Part6:高级技巧:让你的工作流更上一层楼
6️⃣

Part6:高级技巧:让你的工作流更上一层楼

导语:

恭喜你!经过前五篇文章的学习,你已经掌握了 Prefect 的核心概念, 能够使用任务 (Task) 和流程 (Flow) 构建基本的工作流, 并且学会了如何调度和监控流程的执行。 是时候更上一层楼, 探索 Prefect 的高级技巧了! 本篇文章将介绍动态映射、自定义组件、集成外部服务、结果持久化、缓存、Artifacts 等高级功能, 并分享一些最佳实践, 帮助你构建更强大、更灵活、更高效的工作流!

一、回顾:你已经掌握了工作流编排框架的基本用法

在深入学习高级技巧之前, 让我们快速回顾一下前几篇文章的核心知识点:
  • 工作流的概念和应用场景: 工作流是由一系列相互关联的步骤组成的, 用于完成特定的业务目标。 工作流编排框架可以帮助我们自动化执行和管理工作流。
  • 工作流编排框架的作用和优势: 自动化执行、提高效率、减少错误、增强可靠性和可维护性。
  • Prefect 的基本概念:任务 (Task) 和流程 (Flow): 任务是最小的执行单元, 流程是任务的容器, 用于组织和编排任务。
  • 如何安装和配置 Prefect: 使用 pip 或者 uv 安装 Prefect, 以及如何配置 Prefect Cloud 或 Server (可选)。
  • 如何定义任务和流程, 以及如何设置任务的参数和依赖关系: 使用 @task@flow 装饰器, 以及 wait_for 参数。
  • 如何调度和监控流程: 使用 deploymentSchedulesAutomations 以及 Prefect UI 进行监控。
到目前为止, 你已经掌握了构建基本工作流所需的知识和技能。 现在, 让我们一起探索 Prefect 的高级技巧, 让你的工作流更上一层楼!

二、动态映射:处理批量数据不再是难题

在实际应用中, 我们经常需要处理批量数据, 例如对一个列表中的每个元素执行相同的操作。 例如, 你可能需要处理一个包含多个文件名的列表, 然后对每个文件执行下载、处理、上传等操作。
在没有动态映射之前, 我们通常会使用循环来实现类似的功能, 例如:
from prefect import task, flow @task def process_item(item): print(f"Processing item: {item}") @flow def process_all_items(items): for item in items: process_item(item) if __name__ == "__main__": items = [1, 2, 3, 4, 5] process_all_items(items)
这种方式虽然可以实现功能, 但是存在一些局限性:
  • 代码冗长: 需要手动编写循环逻辑。
  • 无法充分利用并发执行的优势: 循环是串行执行的, 无法利用 Prefect 的并发执行能力。
Prefect 提供了 动态映射 (Dynamic Mapping) 功能, 可以帮助我们轻松地处理这种场景。 动态映射允许我们根据输入数据的数量, 动态地创建多个任务实例, 并发地执行它们。 这样可以大大提高工作流的执行效率, 特别是当处理大量数据时。
介绍 map 方法:
Prefect 提供了 map 方法, 可以让我们轻松地实现动态映射。 map 方法接收一个任务和一个可迭代对象作为输入, 然后为可迭代对象中的每个元素创建一个任务实例, 并将元素作为参数传递给任务。
示例代码:
from prefect import task, flow @task def process_item(item): print(f"Processing item: {item}") @flow def process_all_items(items): process_item.map(items) # 使用 map 方法 if __name__ == "__main__": items = [1, 2, 3, 4, 5] process_all_items(items)
在这个例子中, 我们使用 process_item.map(items) 代替了之前的循环逻辑。 map 方法会为 items 列表中的每个元素创建一个 process_item 任务实例, 并将元素作为参数传递给任务。 Prefect 会自动并发地执行这些任务实例。
[1, 2, 3, 4, 5] <-- 输入数据 (items) | | 通过 map 方法 v +----+----+----+----+----+ | 任务 | 任务 | 任务 | 任务 | 任务 | <-- 动态创建多个任务实例 +----+----+----+----+----+ | | | | | | | | | | 并发执行 v v v v v [结果1][结果2][结果3][结果4][结果5] <-- 每个任务的执行结果
动态映射的优势:
  • 简化代码逻辑: 无需手动编写循环逻辑, 代码更加简洁。
  • 提高代码的可读性和可维护性: 代码逻辑更加清晰, 更易于理解和维护。
  • 充分利用并发执行的优势: Prefect 会自动并发地执行任务实例, 提高了工作流的执行效率。
展开介绍 map 方法的其他参数 (可选):
map 方法还支持一些其他参数, 例如 max_parallelism, 用于控制并发执行的任务实例数量。 你可以根据实际情况设置这些参数, 以优化工作流的性能。

三、自定义组件:扩展框架,满足你的特殊需求

虽然 Prefect 提供了许多内置的任务和流程, 但在实际应用中, 我们可能需要根据自己的特殊需求来扩展框架的功能。 Prefect 支持自定义组件, 允许我们创建自己的任务、流程和其他组件。
自定义组件的优势:
  • 提高代码的可复用性: 可以将常用的操作封装成自定义组件, 并在不同的流程中重复使用。
  • 扩展 Prefect 的功能: 可以根据自己的需求定制 Prefect 的行为, 满足特定的业务需求。
  • 让工作流更加灵活和可定制: 可以根据不同的场景选择使用不同的自定义组件。
介绍自定义任务:
我们可以通过继承 Task 类来创建自定义任务。 在自定义任务中, 可以重写 run 方法来实现自定义的逻辑。 自定义任务可以像内置任务一样在流程中使用。
示例代码:
from prefect import task, flow from prefect.tasks import Task class MyCustomTask(Task): def __init__(self, message, **kwargs): super().__init__(**kwargs) self.message = message def run(self): print(f"Custom task: {self.message}") @flow def my_flow(): custom_task = MyCustomTask(message="Hello from custom task!") custom_task() if __name__ == "__main__": my_flow()
在这个例子中, 我们创建了一个名为 MyCustomTask 的自定义任务, 它继承自 Task 类, 并重写了 run 方法。 在 run 方法中, 我们打印了一条自定义的消息。 我们可以像使用内置任务一样, 在流程中使用自定义任务。
介绍自定义流程 (可选):
除了自定义任务, 我们还可以通过继承 Flow 类来创建自定义流程。 自定义流程可以用于封装更复杂的逻辑, 或者修改流程的默认行为。 (这里不深入展开)

四、集成外部服务:与数据库,云平台无缝对接

在实际应用中, 工作流通常需要与各种外部服务进行交互, 例如数据库、云存储、消息队列等等。 手动管理这些服务的连接信息既繁琐又容易出错, 还存在安全风险。
Prefect 的 Blocks 机制:
Prefect 提供了 Blocks 机制来管理和配置外部服务连接。 通过 Blocks, 我们可以安全地存储连接信息, 例如数据库的用户名和密码, 云平台的 API 密钥等等。
Prefect 提供了许多预定义的 Blocks, 可以方便地连接到常用的外部服务, 例如:
  • Secret:用于存储敏感信息, 如密码、API 密钥等。
  • LocalFileSystem:用于操作本地文件系统。
  • S3Bucket:用于连接到 AWS S3 云存储。
  • KubernetesClusterConfig:用于连接到 Kubernetes 集群。
  • ... 还有许多其他类型的 Blocks, 可以参考 Prefect 的官方文档。
我们也可以创建自定义的 Blocks 来连接其他服务。
示例代码:
以下示例演示如何使用 S3Bucket Block 连接到 AWS S3 云存储,并在任务中上传文件:
from prefect import task, flow from prefect_aws import AwsCredentials from prefect_aws.s3 import S3Bucket # 提前在 Prefect UI 中创建好 S3Bucket 和 AwsCredentials 的 Block @task def upload_file_to_s3(filename, bucket_block_name, credentials_block_name): aws_credentials = AwsCredentials.load(credentials_block_name) s3_bucket = S3Bucket.load(bucket_block_name, credentials=aws_credentials) s3_bucket.upload_from_path(filename, f"s3://{s3_bucket.bucket_name}/{filename}") @flow def my_flow(): upload_file_to_s3("my_file.txt", "my-s3-bucket", "my-aws-credentials") if __name__ == "__main__": # 创建一个测试文件 with open("my_file.txt", "w") as f: f.write("This is a test file.") my_flow()
注意: 上述代码中 my-s3-bucketmy-aws-credentials 假设你已经提前在 Prefect UI 中创建好了。
在这个例子中, 我们使用了 S3Bucket Block 来连接到 AWS S3 云存储。 我们首先使用 AwsCredentials.load() 加载 AWS 凭证, 然后使用 S3Bucket.load() 加载 S3 存储桶的配置信息。 最后, 我们使用 s3_bucket.upload_from_path() 方法将文件上传到 S3 存储桶。
+-----------------+ +-----------------+ | Prefect 任务 | | AWS S3 Bucket | | | | | | upload_file | ---> | my-s3-bucket | | | | | +-----------------+ +-----------------+ ^ ^ | +------------+ | | | S3Bucket | | | | Block | | | +------------+ | | | +-------------------------+
Blocks 的优势:
  • 安全地存储和管理连接信息: 避免将敏感信息硬编码在代码中, 提高了安全性。
  • 方便地在不同的流程和任务中复用连接信息: 无需在每个任务中都重复配置连接信息。
  • 简化与外部服务的交互: Prefect 提供了统一的接口来访问不同的外部服务, 让我们可以更专注于业务逻辑的实现。

五、结果持久化:保存和复用任务的结果

默认情况下,Prefect 任务的结果只保存在内存中,流程执行完成后就会消失。 在某些情况下, 我们需要将任务的结果持久化保存下来, 以便后续使用或者分析。 例如, 机器学习模型训练任务的结果 (模型文件) 就需要保存下来, 以便后续进行模型部署和推理。
应用场景:
  • 中间结果的缓存和复用: 对于一些计算量大、耗时长的任务, 可以将其结果持久化保存下来, 并在后续的流程中复用, 避免重复计算。
  • 任务结果的长期保存和分析: 可以将任务的结果保存到数据库、文件系统或者云存储中, 以便进行长期的保存和分析。
  • 构建数据管道: 可以将一个任务的结果保存下来, 并作为另一个任务的输入, 从而构建数据管道。
Prefect 支持的结果持久化方式:
Prefect 支持多种结果持久化方式, 包括:
  • 本地文件系统 (Local File System): 可以将任务的结果保存到本地文件系统中。
  • 云存储服务: 可以将任务的结果保存到云存储服务中, 例如 AWS S3, Google Cloud Storage, Azure Blob Storage 等等。
我们可以使用 @task 装饰器的 persist_result 参数来控制是否需要持久化任务的结果, 使用 result_storage 参数来指定存储的位置和方式。 还可以使用 cache_result_in_memory 参数控制是否在内存中缓存结果。
示例代码:
from prefect import task, flow from prefect.filesystems import LocalFileSystem @task(persist_result=True, result_storage=LocalFileSystem.load("my-result-storage")) def my_task(): result = {"message": "This is a task result."} return result @flow def my_flow(): my_task() if __name__ == "__main__": # 首先,你需要创建一个名为 `my-result-storage` 的 LocalFileSystem Block, 可以在 Prefect UI 中创建。 my_flow()
在这个例子中, 我们将 my_taskpersist_result 参数设置为 True, 表示需要将任务的结果持久化保存下来。 我们使用 result_storage 参数指定了存储的位置为名为 my-result-storageLocalFileSystem Block。
注意事项:
  • 选择合适的存储方式, 需要考虑存储成本、访问速度、安全性等因素。
  • 注意清理过期的结果, 避免存储空间的浪费。

六、缓存:避免重复执行,提高效率

Prefect 提供了缓存机制, 可以避免重复执行已经执行过的任务, 从而提高工作流的执行效率。 特别是对于那些计算量大、耗时长的任务, 缓存可以节省大量的执行时间。
缓存的原理:
Prefect 会根据任务的 输入参数代码的哈希值 来计算一个缓存键 (Cache Key)。 如果一个任务的缓存键已经存在, 并且结果已经缓存, 那么 Prefect 会直接返回缓存的结果, 而不会重新执行任务。
+-----------------+ +-----------------+ | 任务 (Task) | | 缓存 (Cache) | | 输入参数 | ---> | 计算缓存键 | | 代码 | ---> | (Cache Key) | +-----------------+ +-----------------+ | ^ | | | 检查缓存是否存在 | v | +-----------------+ +-----------------+ | 执行任务 | | 返回缓存结果 | +-----------------+ +-----------------+
如何使用缓存:
我们可以使用 @task 装饰器的 cache_key_fn 参数指定缓存键的计算方法, 使用 cache_expiration 参数指定缓存的过期时间。
示例代码:
from prefect import task, flow from datetime import timedelta @task(cache_key_fn=lambda context, parameters: parameters["item"], cache_expiration=timedelta(minutes=1)) def my_task(item): print(f"Processing item: {item}") return item * 2 @flow def my_flow(): my_task(1) my_task(2) my_task(1) # 由于缓存, 该任务不会被执行 if __name__ == "__main__": my_flow()
在这个例子中, 我们为 my_task 启用了缓存, 并使用 cache_key_fn 参数指定了缓存键的计算方法为 lambda context, parameters: parameters["item"], 这表示缓存键将根据任务的 item 参数的值来计算。 我们还使用 cache_expiration 参数指定了缓存的过期时间为 1 分钟。
当我们运行 my_flow 时, 你会发现 my_task(1) 只执行了一次, 第二次调用 my_task(1) 时, Prefect 直接返回了缓存的结果。
注意事项:
  • 确保缓存键的计算方法能够唯一地标识任务的输入和代码。
  • 合理设置缓存的过期时间, 避免缓存过期导致的数据不一致问题。

七、 Artifacts: 记录和展示任务执行结果

Artifacts 是 Prefect 中用于记录和展示任务执行结果的一种机制。 你可以使用 create_link_artifact, create_markdown_artifact, create_table_artifact 等函数在任务中创建不同类型的 ArtifactsArtifacts 会显示在 Prefect UI 中, 方便用户查看任务的执行结果。
例如, 你可以在机器学习模型训练任务中创建一个 Artifacts 来展示模型的评估指标。
示例代码:
from prefect import task, flow, artifacts @task def my_task(): result = "This is a task result." artifacts.create_markdown_artifact( markdown=f"## Task Result\\n\\n{result}", key="task-result" ) @flow def my_flow(): my_task() if __name__ == "__main__": my_flow()
在这个例子中, 我们在 my_task 中使用 create_markdown_artifact 函数创建了一个 Markdown 类型的 Artifacts, 用于展示任务的执行结果。 当你在 Prefect UI 中查看这个任务的执行结果时, 你会看到一个名为 “task-result” 的 Artifacts, 其中包含了任务执行结果的 Markdown 文本。

八、最佳实践:分享一些使用工作流编排框架的经验

除了上述介绍的高级技巧之外, 这里还有一些使用工作流编排框架的最佳实践:
  • 模块化设计: 将复杂的工作流分解成更小的、更易于管理的模块 (任务和流程)。 这样可以提高代码的可读性、可维护性和可复用性。
  • 参数化任务: 使用参数来配置任务的行为, 提高任务的灵活性和可复用性。 避免将配置信息硬编码在任务的代码中。
  • 充分利用并发: 通过合理地定义任务的依赖关系和使用动态映射, 充分利用 Prefect 的并发执行能力, 提高工作流的执行效率。
  • 错误处理: 考虑各种可能出现的错误情况, 并进行相应的处理, 保证工作流的健壮性。 使用 try...except 语句捕获异常, 使用 retries 参数设置重试策略, 使用 on_failure 参数定义任务失败时的回调函数。
  • 日志记录: 使用 get_run_logger 记录详细的日志信息, 方便排查问题和跟踪工作流的执行情况。
  • 版本控制: 使用版本控制来管理工作流的变更历史, 方便回滚到之前的版本。
  • 监控和告警: 设置监控和告警规则, 及时发现和处理问题。 使用 Prefect UI 监控流程的执行状态, 设置 Automations 来发送告警信息。
  • 代码规范: 遵循良好的代码规范, 提高代码的可读性和可维护性。 使用有意义的变量名和函数名, 添加必要的注释。
  • Task Runner 的选择和使用:
    • Prefect 提供了不同类型的 Task Runner, 用于执行任务。 包括 SequentialTaskRunner, ConcurrentTaskRunner, DaskTaskRunnerRayTaskRunner (后面两个用于分布式任务)。
    • 默认情况下, Prefect 使用 ConcurrentTaskRunner 来并发执行任务。
    • 你可以根据任务的特点和资源情况选择合适的 Task Runner。 例如, 如果你的任务是 CPU 密集型的, 可以使用 DaskTaskRunnerRayTaskRunner 来利用多核 CPU 的优势。
    • 可以在流程上配置 task_runner 属性来使用不同的 Task Runner
    • from prefect import flow from prefect.task_runners import SequentialTaskRunner @flow(task_runner=SequentialTaskRunner()) def my_flow(): # ...
  • 调试技巧:
    • 使用 get_run_logger 记录详细的日志信息, 方便调试。
    • 使用 Prefect UI 查看任务的执行状态和日志信息。
    • 使用 flow.visualize() 方法可以可视化流程的结构和任务之间的依赖关系。
      • my_flow.visualize()
    • 使用 Python 调试器 (例如 pdb) 来调试任务的代码。

总结:

在本篇文章中, 我们学习了 Prefect 的一些高级技巧, 包括动态映射、自定义组件、集成外部服务、结果持久化、缓存、Artifacts 以及一些最佳实践。 这些高级技巧可以帮助我们构建更强大、更灵活、更高效的工作流!
希望你能够将这些技巧应用到实际的工作流开发中, 不断提升你的工作流编排能力。 Prefect 是一个非常强大的工具, 还有很多高级功能等待你去探索!
在下一篇文章中, 我们将通过实战案例来巩固所学知识。 继续加油! 让我们一起成为工作流编排的高手!