Part4:任务的艺术:精通工作流中的“任务”
4️⃣

Part4:任务的艺术:精通工作流中的“任务”

导语:

在之前的文章中,我们已经成功地搭建了第一个工作流,并初步了解了 Prefect 框架的基本用法。今天,我们将深入探讨工作流中的核心组件——任务 (Task)。 任务是构建复杂工作流的基础, 本篇文章的目标是帮助你 “精通” 任务的各种用法,让你能够更加灵活和高效地使用 Prefect 构建强大的工作流。

一、回顾:我们已经成功运行了第一个工作流

在上一篇文章中,我们一起搭建了一个简单的 “Hello World” 工作流。我们定义了一个 hello_task 任务,用于打印 “Hello, World!”, 然后使用一个 hello_flow 流程来调用这个任务。
任务是工作流中最小的执行单元,它是构成复杂工作流的基础。 我们构建的任何工作流,最终都会分解成一个个独立的任务。如果你还没有运行过上篇文章的代码,现在是时候再次运行它,回顾一下!

二、什么是任务?它是如何执行的? + 任务执行顺序的默认行为 + 任务并发执行

那么,到底什么是任务呢? 简单来说,任务 是工作流中不可或缺的组成部分,它代表了工作流中的一个具体操作。它是一个独立的可复用的代码单元,可以执行特定的逻辑。
Prefect 如何将 Python 函数转换为任务呢? Prefect 使用 @task 装饰器,将一个普通的 Python 函数转换为一个可以在工作流中执行的任务。 Prefect 会自动处理任务的执行环境、日志记录、错误处理等细节。
任务的执行环境,可以是本地环境、远程服务器、Docker 容器等等 (可以简单理解为代码实际运行的地方,不深入展开)。 任务的生命周期是指任务从开始执行到最终结束的过程,包括:开始执行、执行中、执行完成、执行失败等等状态。
我们用生活中的例子,再次说明任务的概念,方便大家理解。 在 “泡咖啡” 的例子中, 烧水、磨咖啡豆、冲泡咖啡 都是一个个独立的任务。
任务执行顺序的默认行为:
默认情况下,Prefect 会按照你在 flow 中调用任务的顺序来执行任务。也就是说, 先调用的任务会先执行,后调用的任务会后执行。这种默认的执行顺序适用于简单的线性工作流,但对于更复杂的工作流,我们需要更精细的控制。
任务并发执行:
Prefect 默认情况下,在一个 flow 中定义的任务, 并不一定是串行执行的。 Prefect 会尽可能地并发执行没有依赖关系的任务,以提高工作流的执行效率。也就是说,如果多个任务之间没有依赖关系,那么它们会同时执行, 而不是按顺序执行。 Prefect 会自动处理并发执行中的资源管理和任务调度问题。 当然,我们也可以通过一些方法来控制并发执行的程度, 例如设置任务的运行资源 (可以简单介绍, 不深入展开)。
为了更直观地展示任务的执行流程, 我们可以用一个流程图来表示:
+-------+ +-------+ +-------+ | 任务A | | 任务B | | 任务C | +-------+ +-------+ +-------+ | | / | 顺序执行 | 并发执行 / v v / +-------+ +-------+ / | 任务B | | 任务C | / +-------+ +-------+ / | | / | | / v v / +--------------------+ | 任务D (依赖A和C) | +--------------------+
图中的箭头表示任务的执行顺序和依赖关系。 没有依赖关系的任务可以并发执行, 有依赖关系的任务必须按顺序执行。

三、任务的参数:如何传递数据给任务

在实际应用中,我们经常需要给任务传递一些参数,来改变它的行为,让任务更加灵活和可定制。 如何使用 Prefect 给任务传递参数呢? 其实和普通的 Python 函数一样, 通过函数参数传递参数即可。
我们来看一个例子: 定义一个可以打印不同信息的任务:
from prefect import task, flow @task def print_message(message): print(message) @flow def message_flow(): print_message(message = "Hello, Prefect!") print_message(message = "This is a task with parameter!") if __name__ == "__main__": message_flow()
运行以上代码,你可以看到控制台打印了不同的信息。 在这个例子中, 我们使用 message 参数来传递不同的信息给 print_message 任务。 你可以修改代码, 尝试使用不同的参数来运行任务, 看看结果会有什么变化。
除了函数参数, 我们还可以使用 @task 装饰器的 name 参数来指定任务名称, 使用 @task 装饰器的其他参数来设置任务属性 (例如设置重试次数, 错误处理等等,这里不深入展开)。
我们还可以使用更复杂的数据结构作为参数,例如字典,列表等,让任务接收更丰富的信息:
from prefect import task, flow @task def process_data(data, threshold): results = [] for item in data: if item > threshold: results.append(item) print(results) @flow def my_flow(): data = [1, 5, 10, 3, 8] process_data(data, 5) if __name__ == "__main__": my_flow()
在这个例子中,process_data 任务接收一个列表 data 和一个阈值 threshold 作为参数,并输出大于阈值的元素。

四、任务的依赖关系:如何控制任务的执行顺序 + 显式控制任务执行顺序

在复杂的工作流中,任务之间通常存在依赖关系, 某些任务必须等待其他任务执行完成后才能开始。任务之间的依赖关系是限制任务并发执行的关键。 如果任务之间存在依赖关系, 那么它们就不能同时执行, 必须按照依赖顺序依次执行。 通过合理地定义任务的依赖关系, 我们可以有效地控制任务的并发执行程度。
Prefect 如何定义任务的依赖关系呢? 最简单的方法是在 flow 中,直接调用任务即可建立依赖关系。 Prefect 会按照你在 flow 中调用任务的顺序,依次执行任务。
示例代码 1: 线性执行
from prefect import task, flow import time @task def task_a(): time.sleep(1) print("Task A finished") @task def task_b(): time.sleep(1) print("Task B finished") @flow def my_flow(): task_a() task_b() if __name__ == "__main__": my_flow()
代码解释 1: 在这个例子中, task_a 会先执行,然后 task_b 才会执行。 因为在 my_flow 中,我们先调用了 task_a(),然后才调用了 task_b()time.sleep(1) 用于模拟耗时任务,方便观察执行顺序。
除了在 flow 中直接调用任务来建立依赖关系之外,我们还可以使用 wait_for 参数,来更加显式地控制任务的执行顺序。 使用 wait_for 参数,我们可以指定某些任务必须等待另一些任务完成后才能开始执行。 这种方式更加灵活,适用于复杂的依赖关系。
示例代码 2: 使用 wait_for 显式控制依赖关系
from prefect import task, flow, get_run_logger import time @task def task_a(): time.sleep(1) print("Task A finished") return "Result from A" @task def task_b(): time.sleep(1) print("Task B finished") return "Result from B" @task def task_c(): time.sleep(1) print("Task C finished") @flow def my_flow(): a = task_a() task_c() task_b.submit(wait_for=[a]) # 使用 submit 方法提交 task_b 任务, 并明确说明需要等待 task_a 完成 if __name__ == "__main__": my_flow()
代码解释 2: 在这个例子中,task_ctask_a 会并发执行 (因为没有依赖关系), 而 task_b 必须等待 task_a 完成后才会执行。我们使用 task_b.submit(wait_for=[a]) 显式指定了 task_btask_a 的依赖关系。get_run_logger 可以打印更多日志信息。
wait_for 参数还可以接收一个任务列表, 表示需要等待多个任务完成后才能执行, 例如, task_d.submit(wait_for=[a, c]) 表示 task_d 需要等待 task_atask_c 都完成后才能执行。
示例代码 3: 演示并发执行
from prefect import task, flow import time @task def task_a(): time.sleep(1) print("Task A finished") @task def task_b(): time.sleep(1) print("Task B finished") @task def task_c(): time.sleep(1) print("Task C finished") @flow def my_flow(): task_a() task_b() task_c() if __name__ == "__main__": my_flow()
代码解释 3: 在这个例子中,task_a, task_btask_c 之间没有任何依赖关系。 Prefect 会尽可能地并发执行这三个任务。 你会发现,这三个任务几乎是同时完成的,执行的顺序是不固定的。
任务结果的传递:
在 Prefect 中, 任务的返回值可以被后续的任务使用。 在上面的 示例代码 2 中, task_atask_b 各自返回了一个字符串。 我们可以修改 task_c 来使用这些返回值:
from prefect import task, flow, get_run_logger import time @task def task_a(): time.sleep(1) print("Task A finished") return "Result from A" @task def task_b(): time.sleep(1) print("Task B finished") return "Result from B" @task def task_c(result_a, result_b): time.sleep(1) print(f"Task C received: {result_a} and {result_b}") print("Task C finished") @flow def my_flow(): a = task_a() b = task_b() task_c.submit(result_a=a, result_b=b) # 使用 .submit() 提交, 并且传入依赖任务的返回值。 if __name__ == "__main__": my_flow()
在这个例子中, task_c 接收 result_aresult_b 两个参数, 它们分别是 task_atask_b 的返回值。 Prefect 会自动处理任务结果的传递, 你只需要在 flow 中将任务的结果作为参数传递给后续的任务即可。 关于任务结果的更多用法, 我们将在后续的文章中详细介绍。
鼓励读者动手实践,尝试改变任务的执行顺序,例如:
  • 在示例代码 1 中,调换 task_a()task_b() 的调用顺序,然后运行,观察执行顺序。
  • 在示例代码 2 中,尝试删除 wait_for=[a],然后运行,观察执行顺序。
  • 在示例代码 3 中, 尝试添加更多没有依赖关系的任务,并运行,观察执行情况。
  • 尝试修改代码,让 task_c 依赖于 task_atask_b,观察执行顺序。

五、任务的错误处理:让工作流更加健壮

在实际应用中,任务执行过程中可能会发生错误,我们需要对这些错误进行处理,才能保证工作流的健壮性。
Prefect 如何处理任务的错误呢? Prefect 默认会自动重试失败的任务 (当然, 你可以设置禁止重试)。 我们可以使用 @task 装饰器的 retryretries 参数来设置重试策略, 例如指定重试的次数, 重试的时间间隔等等。 也可以使用 on_failure 参数定义任务失败时的回调函数, 例如发送告警信息,记录错误日志等等。
除了 retries 参数, Prefect 还支持其他错误处理方式, 例如:
  • 使用 try...except 语句捕获任务中的异常,并进行处理。
  • 使用 on_failure 参数定义任务失败时的回调函数,例如发送告警信息,记录错误日志等等。
  • 使用 state_handlers 参数自定义任务状态的处理逻辑,例如在任务成功、失败、重试等状态下执行不同的操作。
我们来看一个使用 retries 参数的简单例子, 定义一个可能会执行失败的任务, 并使用 @task 的参数设置重试策略:
from prefect import task, flow import random @task(retries=3) def flaky_task(): if random.random() < 0.5: raise Exception("Task failed!") print("Task succeeded!") @flow def flaky_flow(): flaky_task() if __name__ == "__main__": flaky_flow()
在这个例子中, flaky_task 有 50% 的概率会执行失败。 我们设置了 retries=3, 所以如果任务执行失败, Prefect 会自动重试, 最多重试3次。
错误处理的最佳实践:
  • 尽可能地捕获和处理所有可能出现的错误。
  • 记录详细的错误日志,方便排查问题。
  • 避免因为一个任务的失败而导致整个工作流崩溃。

总结:

本篇文章中, 我们学习了任务的定义,执行, 参数, 依赖关系和错误处理。 同时我们也学习了 Prefect 默认会并发执行没有依赖关系的任务, 并可以通过定义依赖关系来控制任务的执行顺序。 任务是构成工作流的基本单元,理解和掌握任务的各种用法,对于构建高效和健壮的工作流至关重要。
Prefect 提供了丰富的功能来帮助我们构建强大的工作流, 本篇文章只介绍了任务的一些基本用法, 还有更多高级用法等待你去探索! 不要害怕尝试, 勇敢地去探索 Prefect 的更多功能, 你会发现工作流编排的无限魅力!
接下来, 在下一篇文章中, 我们将深入学习流程 (Flow) 的概念和用法, 进一步探索工作流编排的奥秘。 继续加油! 让我们一起成为工作流编排的高手!