Part5:流程的魔力:让复杂工作流井然有序
5️⃣

Part5:流程的魔力:让复杂工作流井然有序

导语:

在之前的文章中,我们学习了工作流编排的基本概念, 了解了任务 (Task) 的定义和使用方法, 甚至还亲自动手构建了你的第一个工作流! 相信你已经体会到了工作流编排的强大之处。 但是, 任务只是工作流的基本单元, 要想构建和管理复杂的工作流, 我们还需要一个更强大的工具, 那就是 流程 (Flow)。 如果说任务是工作流的基本单元,那么流程就是将这些基本单元组织起来,构建成完整工作流的核心。

一、回顾:任务只是工作流中的一部分

我们先来快速回顾一下 任务 的概念。 任务是工作流中最小的执行单元, 它代表了一个独立、可复用、执行特定操作的代码单元。 例如, 从数据库中读取数据、处理数据、发送邮件等等, 都可以定义为一个任务。 任务是构建工作流的基石, 没有任务, 工作流就无从谈起。
虽然任务很重要,但它只是工作流的一部分。 要构建复杂的工作流,我们需要一种方法将多个任务组织起来,并定义它们之间的执行顺序和依赖关系。 这就是流程 (Flow) 发挥作用的地方。

二、什么是流程?如何用流程组织任务

那么,到底什么是流程呢?
定义流程:
简单来说,流程就是一个容器,用于组织和编排多个任务。 它定义了任务的执行顺序、依赖关系、数据传递等。 流程本身也是一个可复用的单元,可以被其他流程调用,从而构建更复杂的工作流。 在 Prefect 中, 使用 @flow 装饰器将 Python 函数转换为流程。
流程如何组织任务:
在流程中, 可以通过 直接调用任务 或者使用 wait_for 参数来定义任务之间的依赖关系和执行顺序。 当你在一个流程中调用一个任务时, Prefect 会自动建立流程和任务之间的关联, 并根据任务的依赖关系来安排它们的执行顺序。
流程与任务的关系:
流程包含任务,任务在流程中执行。 可以将流程看作一个更大的 “任务”, 它可以包含多个子任务, 甚至可以包含子流程, 从而实现更复杂的功能。
+-----------------+ | 流程(Flow) | | +-------------+ | | | 任务(Task) | | | +-------------+ | | +-------------+ | | | 任务(Task) | | | +-------------+ | | +-------------+ | | | 任务(Task) | | | +-------------+ | +-----------------+
流程的优势:
通过流程, 我们可以将复杂的工作流分解成更小的、更易于管理的模块。 流程提高了代码的可读性、可维护性和可复用性。 想象一下, 如果一个工作流包含几十个甚至上百个任务, 如果没有流程将它们组织起来, 那么整个工作流将会变得非常混乱, 难以理解和维护。
示例代码:
让我们来看一个简单的例子, 演示如何使用流程来组织多个任务:
from prefect import task, flow @task def task_a(): print("Task A executed") @task def task_b(): print("Task B executed") @task def task_c(): print("Task C executed") @flow def my_flow(): task_a() task_b() task_c() if __name__ == "__main__": my_flow()
在这个例子中,我们定义了三个任务 task_atask_btask_c, 然后使用 @flow 装饰器定义了一个名为 my_flow 的流程。 在 my_flow 中, 我们依次调用了这三个任务。 当我们运行 my_flow 时, 这三个任务会按照我们在流程中定义的顺序依次执行。
请注意 @flow 装饰器的作用, 它将一个普通的 Python 函数转换成了一个 Prefect 流程。

三、流程的调度:定时执行,事件触发,多种方式任你选

在实际应用中, 我们通常需要定期执行工作流, 或者在特定事件发生时触发工作流。 这就是流程调度的作用。
Deployment 的概念和作用:
在介绍流程调度之前,我们需要先了解一下 deployment 的概念。 deployment 是 Prefect 中用于部署和管理流程的重要概念。 通过创建 deployment, 你可以将流程部署到 Prefect Cloud 或 Server 上, 并进行调度、监控和版本控制。
一个 deployment 需要指定流程的入口文件、流程名称、调度计划等信息。 我们可以使用 prefect deployment buildprefect deployment apply 命令创建和应用 deployment,也可以通过 python 代码构建。
例如, 你可以使用以下命令创建一个简单的 deployment:
prefect deployment build ./my_flow.py:my_flow -n "my-first-deployment"
然后, 使用:
prefect deployment apply my_flow-deployment.yaml
应用该 deployment。
然后, 我们就可以为这个 deployment 配置调度或者触发器。
我们也可以在 python 代码里面构建 deployment:
from prefect.deployments import Deployment from prefect.server.schemas.schedules import IntervalSchedule from datetime import timedelta def build_deployment(): deployment = Deployment.build_from_flow( flow=my_flow, name="my_flow_deployment", schedule=IntervalSchedule(interval=timedelta(hours=1)) ) deployment.apply() if __name__ == "__main__": build_deployment()
这段代码会在部署一个 my_flow 流程, 并且设置了每小时执行一次的计划。
介绍 Prefect 支持的流程调度方式:
Prefect 提供了多种流程调度方式, 可以满足不同的需求:
  • 定时执行 (Schedules):
    • 我们可以为流程设置定时调度计划, 让流程按照预定的时间自动执行。 Prefect 提供了三种类型的定时调度:
    • IntervalSchedule 设置固定的时间间隔执行流程。 例如, 每隔一小时执行一次, 每隔 10 分钟执行一次等等。
    • from prefect.deployments import Deployment from prefect.server.schemas.schedules import IntervalSchedule from datetime import timedelta # ... 省略之前的代码 def build_deployment(): deployment = Deployment.build_from_flow( flow=my_flow, name="my_flow_deployment", schedule=IntervalSchedule(interval=timedelta(hours=1)) # 每小时执行一次 ) deployment.apply() if __name__ == "__main__": build_deployment()
    • CronSchedule 使用 Cron 表达式设置调度计划。 Cron 表达式是一种非常灵活的定时任务表示方式, 可以精确到分钟, 可以表示各种复杂的调度计划。 例如, 每天早上 8 点执行, 每周一到周五的下午 3 点执行等等。
    • +-----------------+ | CronSchedule | | "0 8 * * *" | <-- 每天早上8点执行 +-----------------+ | | 部署 (Deploy) v +-----------------+ | 流程 (Flow) | +-----------------+
      from prefect.deployments import Deployment from prefect.server.schemas.schedules import CronSchedule # ... 省略之前的代码 def build_deployment(): deployment = Deployment.build_from_flow( flow=my_flow, name="my_flow_deployment", schedule=CronSchedule(cron="0 8 * * *") # 每天早上 8 点执行 ) deployment.apply() if __name__ == "__main__": build_deployment()
    • RRuleSchedule 使用 iCalendar RFC 中定义的重复规则 (RRule) 来设置调度计划。 RRule 是一种非常强大的重复规则表示方式, 可以表示各种复杂的重复模式。
      • 这里由于篇幅关系就不再展开, 读者可以参考 Prefect 相关的文档。
  • 事件触发 (Triggers/Automations):
    • 除了定时执行, Prefect 还支持基于事件的触发器, 可以根据特定的事件来触发流程的执行。 例如, 当一个新的文件上传到云存储时, 当收到一个 Webhook 请求时, 都可以触发流程的执行。
      +-----------------+ +-----------------+ +-----------------+ | Webhook 事件 | | 文件上传事件 | | 定时事件 | +-----------------+ +-----------------+ +-----------------+ | | | | | | +-----> 触发器 (Trigger) <-----+ | | 触发流程执行 v +-----------------+ | 流程 (Flow) | +-----------------+
      事件触发是通过 Automations 来实现的。 Automations 是 Prefect Cloud 或 Server 上提供的一项功能, 可以用于创建触发器 (Triggers)。 触发器可以根据特定的事件来触发流程的执行。 你可以通过 Prefect UI 或者 API 创建和管理触发器。
      可以配置的触发器类型包括:Webhook 事件、调度事件、手动触发等等。 在创建触发器时, 你需要指定触发条件和要执行的流程的 deployment
      例如, 你可以创建一个 Webhook 触发器, 当收到一个 HTTP POST 请求时, 触发某个流程的执行。 你需要在 Prefect UI 中创建一个 Automation, 选择 “Webhook” 类型, 然后配置触发条件和要执行的流程的 deployment
      由于配置 Automations 需要 Prefect Cloud 或者 Prefect Server, 这里就不再展开, 读者可以参考 Prefect 相关的文档。
  • 手动触发:
    • 除了定时执行和事件触发, 你还可以通过 Prefect UI 或者 API 手动触发流程的执行。
      +-----------------+ | Prefect UI/API | <-- 手动触发 +-----------------+ | | 执行流程 v +-----------------+ | 流程 (Flow) | +-----------------+
流程调度的优先级:
如果一个流程同时配置了定时调度和触发器, 那么触发器触发的流程执行优先级会高于定时调度。 手动触发的流程执行优先级最高。
Prefect 内置的系统级 Automations
Prefect 提供了一些内置的系统级 Automations, 用于处理一些常见的事件, 例如:
  • Notify on failure: 流程运行失败时发送通知。
  • Pause on failure: 流程运行失败时暂停 deployment
  • Retry on failure: 流程运行失败时自动重试。
这些内置的 Automations 可以帮助你更好地管理流程的执行。
流程调度最佳实践:
  • 选择合适的调度方式, 避免过于频繁或者不必要的执行。
  • 合理配置触发器的条件, 避免误触发。
  • 监控流程的执行情况, 及时调整调度计划。
不同的调度方式适用于不同的场景, 需要根据实际需求选择合适的调度方式。

四、流程的版本控制:让修改更加安全

在开发和维护工作流的过程中, 我们经常需要修改流程的定义。 版本控制可以帮助我们跟踪流程的变更历史, 方便回滚到之前的版本, 避免因为错误的修改而导致工作流出现问题。
Prefect 提供了强大的版本控制功能。 当你使用 deployment 部署流程时, Prefect 会自动为每个版本的流程创建唯一的标识符。 Prefect 会保存每个版本的流程定义, 方便用户查看和比较不同版本的差异。 你可以使用版本号来指定要执行的流程版本 (这里不详细展开)。
版本控制的最佳实践:
  • 每次修改流程后, 都应该创建一个新的版本。
  • 使用有意义的版本号或者注释来描述每个版本的变更内容。例如, 使用 prefect deployment build ./my_flow.py:my_flow -n "my-deployment" -t v1 部署时指定 tag 为 v1

五、流程的监控:实时掌握工作流的运行状态

监控是工作流编排中非常重要的一环。 通过监控, 我们可以实时了解工作流的运行状态, 及时发现和处理问题。
Prefect 提供了多种流程监控功能:
  • Prefect UI: Prefect UI 提供了直观的图形界面, 可以查看流程的运行状态、任务的执行情况、日志信息等。 可以查看流程的运行历史记录, 方便分析和排查问题。
  • 日志记录: Prefect 会自动记录流程和任务的执行日志, 方便用户查看和分析。 你可以使用 get_run_logger 获取日志记录器, 在任务中记录自定义的日志信息。
    • from prefect import task, flow, get_run_logger @task def my_task(): logger = get_run_logger() logger.info("This is a log message from my_task") @flow def my_flow(): my_task() if __name__ == "__main__": my_flow()
  • 通知和告警: Prefect 支持将流程的运行状态通知给用户, 例如通过邮件、Slack 等方式。 可以设置告警规则, 在流程执行失败或者出现其他异常情况时发送告警信息。 (这里不详细展开)
监控的最佳实践:
  • 定期查看流程的运行状态, 及时发现问题。
  • 设置合理的告警规则, 避免错过重要的错误信息。

总结:

在本篇文章中, 我们深入学习了流程 (Flow) 的概念和用法。 我们了解了什么是流程, 如何使用流程来组织任务, 以及 Prefect 提供的各种流程调度、版本控制和监控功能。 流程是构建和管理复杂工作流的核心, 掌握了流程的用法, 你就能够更加高效地使用 Prefect 构建强大的工作流!
Prefect 提供了丰富的功能来帮助我们构建强大的工作流, 本篇文章只介绍了流程的一些基本用法, 还有更多高级用法等待你去探索! 不要害怕尝试, 勇敢地去探索 Prefect 的更多功能, 你会发现工作流编排的无限魅力!
在下一篇文章中, 我们将深入学习工作流编排的高级技巧, 例如动态映射、自定义组件等等。 继续加油! 让我们一起成为工作流编排的高手!