导语:
在之前的文章中,我们学习了工作流编排的基本概念, 了解了任务 (Task) 的定义和使用方法, 甚至还亲自动手构建了你的第一个工作流! 相信你已经体会到了工作流编排的强大之处。 但是, 任务只是工作流的基本单元, 要想构建和管理复杂的工作流, 我们还需要一个更强大的工具, 那就是 流程 (Flow)。 如果说任务是工作流的基本单元,那么流程就是将这些基本单元组织起来,构建成完整工作流的核心。
一、回顾:任务只是工作流中的一部分
我们先来快速回顾一下 任务 的概念。 任务是工作流中最小的执行单元, 它代表了一个独立、可复用、执行特定操作的代码单元。 例如, 从数据库中读取数据、处理数据、发送邮件等等, 都可以定义为一个任务。 任务是构建工作流的基石, 没有任务, 工作流就无从谈起。
虽然任务很重要,但它只是工作流的一部分。 要构建复杂的工作流,我们需要一种方法将多个任务组织起来,并定义它们之间的执行顺序和依赖关系。 这就是流程 (Flow) 发挥作用的地方。
二、什么是流程?如何用流程组织任务
那么,到底什么是流程呢?
定义流程:
简单来说,流程就是一个容器,用于组织和编排多个任务。 它定义了任务的执行顺序、依赖关系、数据传递等。 流程本身也是一个可复用的单元,可以被其他流程调用,从而构建更复杂的工作流。 在 Prefect 中, 使用
@flow
装饰器将 Python 函数转换为流程。流程如何组织任务:
在流程中, 可以通过 直接调用任务 或者使用
wait_for
参数来定义任务之间的依赖关系和执行顺序。 当你在一个流程中调用一个任务时, Prefect 会自动建立流程和任务之间的关联, 并根据任务的依赖关系来安排它们的执行顺序。流程与任务的关系:
流程包含任务,任务在流程中执行。 可以将流程看作一个更大的 “任务”, 它可以包含多个子任务, 甚至可以包含子流程, 从而实现更复杂的功能。
+-----------------+ | 流程(Flow) | | +-------------+ | | | 任务(Task) | | | +-------------+ | | +-------------+ | | | 任务(Task) | | | +-------------+ | | +-------------+ | | | 任务(Task) | | | +-------------+ | +-----------------+
流程的优势:
通过流程, 我们可以将复杂的工作流分解成更小的、更易于管理的模块。 流程提高了代码的可读性、可维护性和可复用性。 想象一下, 如果一个工作流包含几十个甚至上百个任务, 如果没有流程将它们组织起来, 那么整个工作流将会变得非常混乱, 难以理解和维护。
示例代码:
让我们来看一个简单的例子, 演示如何使用流程来组织多个任务:
from prefect import task, flow @task def task_a(): print("Task A executed") @task def task_b(): print("Task B executed") @task def task_c(): print("Task C executed") @flow def my_flow(): task_a() task_b() task_c() if __name__ == "__main__": my_flow()
在这个例子中,我们定义了三个任务
task_a
、task_b
和 task_c
, 然后使用 @flow
装饰器定义了一个名为 my_flow
的流程。 在 my_flow
中, 我们依次调用了这三个任务。 当我们运行 my_flow
时, 这三个任务会按照我们在流程中定义的顺序依次执行。请注意
@flow
装饰器的作用, 它将一个普通的 Python 函数转换成了一个 Prefect 流程。三、流程的调度:定时执行,事件触发,多种方式任你选
在实际应用中, 我们通常需要定期执行工作流, 或者在特定事件发生时触发工作流。 这就是流程调度的作用。
Deployment 的概念和作用:
在介绍流程调度之前,我们需要先了解一下
deployment
的概念。 deployment
是 Prefect 中用于部署和管理流程的重要概念。 通过创建 deployment
, 你可以将流程部署到 Prefect Cloud 或 Server 上, 并进行调度、监控和版本控制。一个
deployment
需要指定流程的入口文件、流程名称、调度计划等信息。 我们可以使用 prefect deployment build
和 prefect deployment apply
命令创建和应用 deployment
,也可以通过 python 代码构建。例如, 你可以使用以下命令创建一个简单的 deployment:
prefect deployment build ./my_flow.py:my_flow -n "my-first-deployment"
然后, 使用:
prefect deployment apply my_flow-deployment.yaml
应用该 deployment。
然后, 我们就可以为这个 deployment 配置调度或者触发器。
我们也可以在 python 代码里面构建 deployment:
from prefect.deployments import Deployment from prefect.server.schemas.schedules import IntervalSchedule from datetime import timedelta def build_deployment(): deployment = Deployment.build_from_flow( flow=my_flow, name="my_flow_deployment", schedule=IntervalSchedule(interval=timedelta(hours=1)) ) deployment.apply() if __name__ == "__main__": build_deployment()
这段代码会在部署一个
my_flow
流程, 并且设置了每小时执行一次的计划。介绍 Prefect 支持的流程调度方式:
Prefect 提供了多种流程调度方式, 可以满足不同的需求:
- 定时执行 (Schedules):
IntervalSchedule
: 设置固定的时间间隔执行流程。 例如, 每隔一小时执行一次, 每隔 10 分钟执行一次等等。
我们可以为流程设置定时调度计划, 让流程按照预定的时间自动执行。 Prefect 提供了三种类型的定时调度:
from prefect.deployments import Deployment from prefect.server.schemas.schedules import IntervalSchedule from datetime import timedelta # ... 省略之前的代码 def build_deployment(): deployment = Deployment.build_from_flow( flow=my_flow, name="my_flow_deployment", schedule=IntervalSchedule(interval=timedelta(hours=1)) # 每小时执行一次 ) deployment.apply() if __name__ == "__main__": build_deployment()
CronSchedule
: 使用 Cron 表达式设置调度计划。 Cron 表达式是一种非常灵活的定时任务表示方式, 可以精确到分钟, 可以表示各种复杂的调度计划。 例如, 每天早上 8 点执行, 每周一到周五的下午 3 点执行等等。+-----------------+ | CronSchedule | | "0 8 * * *" | <-- 每天早上8点执行 +-----------------+ | | 部署 (Deploy) v +-----------------+ | 流程 (Flow) | +-----------------+
from prefect.deployments import Deployment from prefect.server.schemas.schedules import CronSchedule # ... 省略之前的代码 def build_deployment(): deployment = Deployment.build_from_flow( flow=my_flow, name="my_flow_deployment", schedule=CronSchedule(cron="0 8 * * *") # 每天早上 8 点执行 ) deployment.apply() if __name__ == "__main__": build_deployment()
RRuleSchedule
: 使用 iCalendar RFC 中定义的重复规则 (RRule) 来设置调度计划。 RRule 是一种非常强大的重复规则表示方式, 可以表示各种复杂的重复模式。这里由于篇幅关系就不再展开, 读者可以参考 Prefect 相关的文档。
- 事件触发 (Triggers/Automations):
除了定时执行, Prefect 还支持基于事件的触发器, 可以根据特定的事件来触发流程的执行。 例如, 当一个新的文件上传到云存储时, 当收到一个 Webhook 请求时, 都可以触发流程的执行。
+-----------------+ +-----------------+ +-----------------+ | Webhook 事件 | | 文件上传事件 | | 定时事件 | +-----------------+ +-----------------+ +-----------------+ | | | | | | +-----> 触发器 (Trigger) <-----+ | | 触发流程执行 v +-----------------+ | 流程 (Flow) | +-----------------+
事件触发是通过
Automations
来实现的。 Automations
是 Prefect Cloud 或 Server 上提供的一项功能, 可以用于创建触发器 (Triggers)。 触发器可以根据特定的事件来触发流程的执行。 你可以通过 Prefect UI 或者 API 创建和管理触发器。可以配置的触发器类型包括:Webhook 事件、调度事件、手动触发等等。 在创建触发器时, 你需要指定触发条件和要执行的流程的
deployment
。例如, 你可以创建一个 Webhook 触发器, 当收到一个 HTTP POST 请求时, 触发某个流程的执行。 你需要在 Prefect UI 中创建一个
Automation
, 选择 “Webhook” 类型, 然后配置触发条件和要执行的流程的 deployment
。由于配置
Automations
需要 Prefect Cloud 或者 Prefect Server, 这里就不再展开, 读者可以参考 Prefect 相关的文档。- 手动触发:
除了定时执行和事件触发, 你还可以通过 Prefect UI 或者 API 手动触发流程的执行。
+-----------------+ | Prefect UI/API | <-- 手动触发 +-----------------+ | | 执行流程 v +-----------------+ | 流程 (Flow) | +-----------------+
流程调度的优先级:
如果一个流程同时配置了定时调度和触发器, 那么触发器触发的流程执行优先级会高于定时调度。 手动触发的流程执行优先级最高。
Prefect 内置的系统级
Automations
:Prefect 提供了一些内置的系统级
Automations
, 用于处理一些常见的事件, 例如:Notify on failure
: 流程运行失败时发送通知。
Pause on failure
: 流程运行失败时暂停deployment
。
Retry on failure
: 流程运行失败时自动重试。
这些内置的
Automations
可以帮助你更好地管理流程的执行。流程调度最佳实践:
- 选择合适的调度方式, 避免过于频繁或者不必要的执行。
- 合理配置触发器的条件, 避免误触发。
- 监控流程的执行情况, 及时调整调度计划。
不同的调度方式适用于不同的场景, 需要根据实际需求选择合适的调度方式。
四、流程的版本控制:让修改更加安全
在开发和维护工作流的过程中, 我们经常需要修改流程的定义。 版本控制可以帮助我们跟踪流程的变更历史, 方便回滚到之前的版本, 避免因为错误的修改而导致工作流出现问题。
Prefect 提供了强大的版本控制功能。 当你使用
deployment
部署流程时, Prefect 会自动为每个版本的流程创建唯一的标识符。 Prefect 会保存每个版本的流程定义, 方便用户查看和比较不同版本的差异。 你可以使用版本号来指定要执行的流程版本 (这里不详细展开)。版本控制的最佳实践:
- 每次修改流程后, 都应该创建一个新的版本。
- 使用有意义的版本号或者注释来描述每个版本的变更内容。例如, 使用
prefect deployment build ./my_flow.py:my_flow -n "my-deployment" -t v1
部署时指定 tag 为v1
五、流程的监控:实时掌握工作流的运行状态
监控是工作流编排中非常重要的一环。 通过监控, 我们可以实时了解工作流的运行状态, 及时发现和处理问题。
Prefect 提供了多种流程监控功能:
- Prefect UI: Prefect UI 提供了直观的图形界面, 可以查看流程的运行状态、任务的执行情况、日志信息等。 可以查看流程的运行历史记录, 方便分析和排查问题。
- 日志记录:
Prefect 会自动记录流程和任务的执行日志, 方便用户查看和分析。 你可以使用
get_run_logger
获取日志记录器, 在任务中记录自定义的日志信息。
from prefect import task, flow, get_run_logger @task def my_task(): logger = get_run_logger() logger.info("This is a log message from my_task") @flow def my_flow(): my_task() if __name__ == "__main__": my_flow()
- 通知和告警: Prefect 支持将流程的运行状态通知给用户, 例如通过邮件、Slack 等方式。 可以设置告警规则, 在流程执行失败或者出现其他异常情况时发送告警信息。 (这里不详细展开)
监控的最佳实践:
- 定期查看流程的运行状态, 及时发现问题。
- 设置合理的告警规则, 避免错过重要的错误信息。
总结:
在本篇文章中, 我们深入学习了流程 (Flow) 的概念和用法。 我们了解了什么是流程, 如何使用流程来组织任务, 以及 Prefect 提供的各种流程调度、版本控制和监控功能。 流程是构建和管理复杂工作流的核心, 掌握了流程的用法, 你就能够更加高效地使用 Prefect 构建强大的工作流!
Prefect 提供了丰富的功能来帮助我们构建强大的工作流, 本篇文章只介绍了流程的一些基本用法, 还有更多高级用法等待你去探索! 不要害怕尝试, 勇敢地去探索 Prefect 的更多功能, 你会发现工作流编排的无限魅力!
在下一篇文章中, 我们将深入学习工作流编排的高级技巧, 例如动态映射、自定义组件等等。 继续加油! 让我们一起成为工作流编排的高手!