导语:
在之前的文章中,我们已经成功地搭建了第一个工作流,并初步了解了 Prefect 框架的基本用法。今天,我们将深入探讨工作流中的核心组件——任务 (Task)。 任务是构建复杂工作流的基础, 本篇文章的目标是帮助你 “精通” 任务的各种用法,让你能够更加灵活和高效地使用 Prefect 构建强大的工作流。
一、回顾:我们已经成功运行了第一个工作流
在上一篇文章中,我们一起搭建了一个简单的 “Hello World” 工作流。我们定义了一个
hello_task
任务,用于打印 “Hello, World!”, 然后使用一个 hello_flow
流程来调用这个任务。任务是工作流中最小的执行单元,它是构成复杂工作流的基础。 我们构建的任何工作流,最终都会分解成一个个独立的任务。如果你还没有运行过上篇文章的代码,现在是时候再次运行它,回顾一下!
二、什么是任务?它是如何执行的? + 任务执行顺序的默认行为 + 任务并发执行
那么,到底什么是任务呢? 简单来说,任务 是工作流中不可或缺的组成部分,它代表了工作流中的一个具体操作。它是一个独立的可复用的代码单元,可以执行特定的逻辑。
Prefect 如何将 Python 函数转换为任务呢? Prefect 使用
@task
装饰器,将一个普通的 Python 函数转换为一个可以在工作流中执行的任务。 Prefect 会自动处理任务的执行环境、日志记录、错误处理等细节。任务的执行环境,可以是本地环境、远程服务器、Docker 容器等等 (可以简单理解为代码实际运行的地方,不深入展开)。 任务的生命周期是指任务从开始执行到最终结束的过程,包括:开始执行、执行中、执行完成、执行失败等等状态。
我们用生活中的例子,再次说明任务的概念,方便大家理解。 在 “泡咖啡” 的例子中, 烧水、磨咖啡豆、冲泡咖啡 都是一个个独立的任务。
任务执行顺序的默认行为:
默认情况下,Prefect 会按照你在
flow
中调用任务的顺序来执行任务。也就是说, 先调用的任务会先执行,后调用的任务会后执行。这种默认的执行顺序适用于简单的线性工作流,但对于更复杂的工作流,我们需要更精细的控制。任务并发执行:
Prefect 默认情况下,在一个
flow
中定义的任务, 并不一定是串行执行的。 Prefect 会尽可能地并发执行没有依赖关系的任务,以提高工作流的执行效率。也就是说,如果多个任务之间没有依赖关系,那么它们会同时执行, 而不是按顺序执行。 Prefect 会自动处理并发执行中的资源管理和任务调度问题。 当然,我们也可以通过一些方法来控制并发执行的程度, 例如设置任务的运行资源 (可以简单介绍, 不深入展开)。为了更直观地展示任务的执行流程, 我们可以用一个流程图来表示:
+-------+ +-------+ +-------+ | 任务A | | 任务B | | 任务C | +-------+ +-------+ +-------+ | | / | 顺序执行 | 并发执行 / v v / +-------+ +-------+ / | 任务B | | 任务C | / +-------+ +-------+ / | | / | | / v v / +--------------------+ | 任务D (依赖A和C) | +--------------------+
图中的箭头表示任务的执行顺序和依赖关系。 没有依赖关系的任务可以并发执行, 有依赖关系的任务必须按顺序执行。
三、任务的参数:如何传递数据给任务
在实际应用中,我们经常需要给任务传递一些参数,来改变它的行为,让任务更加灵活和可定制。 如何使用 Prefect 给任务传递参数呢? 其实和普通的 Python 函数一样, 通过函数参数传递参数即可。
我们来看一个例子: 定义一个可以打印不同信息的任务:
from prefect import task, flow @task def print_message(message): print(message) @flow def message_flow(): print_message(message = "Hello, Prefect!") print_message(message = "This is a task with parameter!") if __name__ == "__main__": message_flow()
运行以上代码,你可以看到控制台打印了不同的信息。 在这个例子中, 我们使用
message
参数来传递不同的信息给 print_message
任务。 你可以修改代码, 尝试使用不同的参数来运行任务, 看看结果会有什么变化。除了函数参数, 我们还可以使用
@task
装饰器的 name
参数来指定任务名称, 使用 @task
装饰器的其他参数来设置任务属性 (例如设置重试次数, 错误处理等等,这里不深入展开)。我们还可以使用更复杂的数据结构作为参数,例如字典,列表等,让任务接收更丰富的信息:
from prefect import task, flow @task def process_data(data, threshold): results = [] for item in data: if item > threshold: results.append(item) print(results) @flow def my_flow(): data = [1, 5, 10, 3, 8] process_data(data, 5) if __name__ == "__main__": my_flow()
在这个例子中,
process_data
任务接收一个列表 data
和一个阈值 threshold
作为参数,并输出大于阈值的元素。四、任务的依赖关系:如何控制任务的执行顺序 + 显式控制任务执行顺序
在复杂的工作流中,任务之间通常存在依赖关系, 某些任务必须等待其他任务执行完成后才能开始。任务之间的依赖关系是限制任务并发执行的关键。 如果任务之间存在依赖关系, 那么它们就不能同时执行, 必须按照依赖顺序依次执行。 通过合理地定义任务的依赖关系, 我们可以有效地控制任务的并发执行程度。
Prefect 如何定义任务的依赖关系呢? 最简单的方法是在
flow
中,直接调用任务即可建立依赖关系。 Prefect 会按照你在 flow
中调用任务的顺序,依次执行任务。示例代码 1: 线性执行
from prefect import task, flow import time @task def task_a(): time.sleep(1) print("Task A finished") @task def task_b(): time.sleep(1) print("Task B finished") @flow def my_flow(): task_a() task_b() if __name__ == "__main__": my_flow()
代码解释 1: 在这个例子中,
task_a
会先执行,然后 task_b
才会执行。 因为在 my_flow
中,我们先调用了 task_a()
,然后才调用了 task_b()
。time.sleep(1)
用于模拟耗时任务,方便观察执行顺序。除了在
flow
中直接调用任务来建立依赖关系之外,我们还可以使用 wait_for
参数,来更加显式地控制任务的执行顺序。 使用 wait_for
参数,我们可以指定某些任务必须等待另一些任务完成后才能开始执行。 这种方式更加灵活,适用于复杂的依赖关系。示例代码 2: 使用
wait_for
显式控制依赖关系from prefect import task, flow, get_run_logger import time @task def task_a(): time.sleep(1) print("Task A finished") return "Result from A" @task def task_b(): time.sleep(1) print("Task B finished") return "Result from B" @task def task_c(): time.sleep(1) print("Task C finished") @flow def my_flow(): a = task_a() task_c() task_b.submit(wait_for=[a]) # 使用 submit 方法提交 task_b 任务, 并明确说明需要等待 task_a 完成 if __name__ == "__main__": my_flow()
代码解释 2: 在这个例子中,
task_c
和 task_a
会并发执行 (因为没有依赖关系), 而 task_b
必须等待 task_a
完成后才会执行。我们使用 task_b.submit(wait_for=[a])
显式指定了 task_b
对 task_a
的依赖关系。get_run_logger
可以打印更多日志信息。wait_for
参数还可以接收一个任务列表, 表示需要等待多个任务完成后才能执行, 例如, task_d.submit(wait_for=[a, c])
表示 task_d
需要等待 task_a
和 task_c
都完成后才能执行。示例代码 3: 演示并发执行
from prefect import task, flow import time @task def task_a(): time.sleep(1) print("Task A finished") @task def task_b(): time.sleep(1) print("Task B finished") @task def task_c(): time.sleep(1) print("Task C finished") @flow def my_flow(): task_a() task_b() task_c() if __name__ == "__main__": my_flow()
代码解释 3: 在这个例子中,
task_a
, task_b
和 task_c
之间没有任何依赖关系。 Prefect 会尽可能地并发执行这三个任务。 你会发现,这三个任务几乎是同时完成的,执行的顺序是不固定的。任务结果的传递:
在 Prefect 中, 任务的返回值可以被后续的任务使用。 在上面的
示例代码 2
中, task_a
和 task_b
各自返回了一个字符串。 我们可以修改 task_c
来使用这些返回值:from prefect import task, flow, get_run_logger import time @task def task_a(): time.sleep(1) print("Task A finished") return "Result from A" @task def task_b(): time.sleep(1) print("Task B finished") return "Result from B" @task def task_c(result_a, result_b): time.sleep(1) print(f"Task C received: {result_a} and {result_b}") print("Task C finished") @flow def my_flow(): a = task_a() b = task_b() task_c.submit(result_a=a, result_b=b) # 使用 .submit() 提交, 并且传入依赖任务的返回值。 if __name__ == "__main__": my_flow()
在这个例子中,
task_c
接收 result_a
和 result_b
两个参数, 它们分别是 task_a
和 task_b
的返回值。 Prefect 会自动处理任务结果的传递, 你只需要在 flow
中将任务的结果作为参数传递给后续的任务即可。 关于任务结果的更多用法, 我们将在后续的文章中详细介绍。鼓励读者动手实践,尝试改变任务的执行顺序,例如:
- 在示例代码 1 中,调换
task_a()
和task_b()
的调用顺序,然后运行,观察执行顺序。
- 在示例代码 2 中,尝试删除
wait_for=[a]
,然后运行,观察执行顺序。
- 在示例代码 3 中, 尝试添加更多没有依赖关系的任务,并运行,观察执行情况。
- 尝试修改代码,让
task_c
依赖于task_a
和task_b
,观察执行顺序。
五、任务的错误处理:让工作流更加健壮
在实际应用中,任务执行过程中可能会发生错误,我们需要对这些错误进行处理,才能保证工作流的健壮性。
Prefect 如何处理任务的错误呢? Prefect 默认会自动重试失败的任务 (当然, 你可以设置禁止重试)。 我们可以使用
@task
装饰器的 retry
和 retries
参数来设置重试策略, 例如指定重试的次数, 重试的时间间隔等等。 也可以使用 on_failure
参数定义任务失败时的回调函数, 例如发送告警信息,记录错误日志等等。除了
retries
参数, Prefect 还支持其他错误处理方式, 例如:- 使用
try...except
语句捕获任务中的异常,并进行处理。
- 使用
on_failure
参数定义任务失败时的回调函数,例如发送告警信息,记录错误日志等等。
- 使用
state_handlers
参数自定义任务状态的处理逻辑,例如在任务成功、失败、重试等状态下执行不同的操作。
我们来看一个使用
retries
参数的简单例子, 定义一个可能会执行失败的任务, 并使用 @task
的参数设置重试策略:from prefect import task, flow import random @task(retries=3) def flaky_task(): if random.random() < 0.5: raise Exception("Task failed!") print("Task succeeded!") @flow def flaky_flow(): flaky_task() if __name__ == "__main__": flaky_flow()
在这个例子中,
flaky_task
有 50% 的概率会执行失败。 我们设置了 retries=3
, 所以如果任务执行失败, Prefect 会自动重试, 最多重试3次。错误处理的最佳实践:
- 尽可能地捕获和处理所有可能出现的错误。
- 记录详细的错误日志,方便排查问题。
- 避免因为一个任务的失败而导致整个工作流崩溃。
总结:
本篇文章中, 我们学习了任务的定义,执行, 参数, 依赖关系和错误处理。 同时我们也学习了 Prefect 默认会并发执行没有依赖关系的任务, 并可以通过定义依赖关系来控制任务的执行顺序。 任务是构成工作流的基本单元,理解和掌握任务的各种用法,对于构建高效和健壮的工作流至关重要。
Prefect 提供了丰富的功能来帮助我们构建强大的工作流, 本篇文章只介绍了任务的一些基本用法, 还有更多高级用法等待你去探索! 不要害怕尝试, 勇敢地去探索 Prefect 的更多功能, 你会发现工作流编排的无限魅力!
接下来, 在下一篇文章中, 我们将深入学习流程 (Flow) 的概念和用法, 进一步探索工作流编排的奥秘。 继续加油! 让我们一起成为工作流编排的高手!