导语:
在之前的文章中,我们了解了什么是工作流编排,以及它的重要性。今天,我们终于要开始动手实践了!准备好你的键盘,让我们一起开启工作流编排的实践之旅!本篇文章的重点是 “动手”,我们将从理论走向实践,一步步地搭建你的第一个工作流。
一、回顾:
在开始实践之前,我们先来简单回顾一下工作流的核心概念:
- 工作流: 简单来说,工作流就是一系列有顺序、相互关联的步骤,用于完成特定目标。你可以把它想象成一条流水线,每个步骤都有其特定的任务,并且这些任务之间存在着依赖关系。
- 任务(Task): 任务是工作流中的最小执行单元,代表一个独立的、可重复执行的操作。
- 流程(Flow): 流程是由多个任务组成,描述了整个工作流程。流程可以理解为任务的容器,它定义了任务的执行顺序和相互关系。
- 依赖关系: 依赖关系定义了任务之间的执行顺序,例如,任务 B 可能需要等待任务 A 执行完成才能开始。
为了更好地理解这些概念,我们不妨再次回顾一下 “泡咖啡” 的流程:烧水、磨咖啡豆、冲泡咖啡。在这个例子中,烧水、磨咖啡豆、冲泡咖啡都是一个个独立的 任务; 而把这些任务按照一定的顺序组合起来,就形成了一个完整的 流程;同时,冲泡咖啡这个任务必须等到烧水这个任务完成之后才能开始, 这就体现了任务之间的 依赖关系 。
小挑战:
请回忆一下,我们上次提到的工作流在日常生活中的三个例子是什么? 试着用你自己的话,描述一下任务、流程和依赖关系的概念。
二、选择一款合适的编排框架:Prefect,Airflow,你喜欢哪个?
为了让工作流自动化,我们需要使用工作流编排框架。在之前的文章中,我们介绍了一些常见的工作流编排框架,例如 Prefect,Airflow,Luigi 以及一些云平台的编排服务。在本篇文章中,我们将使用 Prefect 框架,来搭建我们的第一个工作流。
选择 Prefect 的原因有很多,首先,它易于上手,对初学者友好;其次,它拥有现代化的 Python API,使用起来非常方便;最后,它功能强大,能够满足我们的入门需求,并且为后续学习和深入打下坚实的基础。
当然,选择哪个框架最终还是取决于你的需求。如果你是 Python 开发者,并且希望快速上手,Prefect 是一个不错的选择。如果你需要处理大规模数据,或者需要更强大的调度功能,可以考虑 Airflow。如果你对云平台更熟悉,也可以尝试云平台的编排服务。重要的是选择适合自己需求和技术栈的工具,不要过分追求 “最好的”。
三、安装与配置
磨刀不误砍柴工,我们先来安装 Prefect,为接下来的实践做好准备。为了获得更快的安装速度,并体验更现代的 Python 开发流程,我们将使用
uv
来管理我们的 Python 环境和依赖。你可以通过以下步骤安装 Prefect 并配置
uv
环境:- 安装
uv
(如果尚未安装): - macOS/Linux: 使用
curl -fsSL <https://astral.sh/uv/install.sh> | sh
- Windows: 从
uv
的 GitHub Release 页面下载对应的可执行文件。
uv
可以通过以下方式安装(具体方法请参考 uv
官方文档):确保
uv
的可执行文件在你的 PATH 环境变量中。 你可以使用 uv --version
来验证是否安装成功。- 创建并激活虚拟环境 (推荐):
uv venv .venv # 在当前目录下创建名为 .venv 的虚拟环境 source .venv/bin/activate # Linux/Mac .venv\\Scripts\\activate # Windows ```3. **使用 `uv` 安装 Prefect:** ```bash uv pip install prefect
这将会安装 Prefect 框架及其必要的依赖包。
- 验证安装是否成功:
运行
prefect version
命令,如果能正确显示版本号,就表示安装成功了!本篇文章,我们先使用本地环境进行练习,后续会介绍 Prefect Cloud 和 Prefect Server 的使用。
常见问题:
- 如果
uv
命令无法找到,请确保它在你的 PATH 环境变量中。
- 如果安装速度仍然较慢,请检查你的网络连接。
- 如果出现其他错误信息,请参考
uv
或 Prefect 的官方文档。
版本兼容性提示:
Prefect 框架的版本更新较快,为了确保后续的代码示例能够正常运行,建议使用当前最新的稳定版本。你可以在 Prefect 官方文档中查看最新版本的安装说明。如果你的 Python 版本过旧,可能需要升级到 Python 3.7 或更高版本。 同时也请注意
uv
和 Prefect 版本的兼容性。四、编写你的第一个工作流代码
万事开头难,我们先从一个简单的
Hello, World!
工作流开始。复制下面的代码,粘贴到你的编辑器中,保存为 hello_world.py
,然后尝试运行它!from prefect import task, flow @task def hello_task(): print("Hello, World!") @flow def hello_flow(): hello_task() if __name__ == "__main__": hello_flow()
代码解释:
from prefect import task, flow
: 这行代码导入了 Prefect 中task
和flow
模块,它们是我们定义任务和流程的核心工具。
@task
:@task
装饰器将一个普通的 Python 函数hello_task
转换为一个可以在工作流中执行的任务。
def hello_task(): print("Hello, World!")
: 这是一个简单的 Python 函数,它打印 "Hello, World!", 它被@task
装饰器装饰后,成为了一个可在工作流中执行的任务。
@flow
:@flow
装饰器将一个普通的 Python 函数hello_flow
转换为一个可以包含多个任务的工作流。
def hello_flow(): hello_task()
: 这个函数定义了我们的工作流,它调用了之前定义的hello_task
任务。
if __name__ == "__main__": hello_flow()
: 这是 Python 的标准写法,用于直接运行这个脚本时调用hello_flow
流程,启动工作流。
运行结果分析:
当你运行这段代码时,你会在控制台中看到输出:
Hello, World!
, 这表明我们的任务已经成功执行了。 你可以思考一下,如果修改打印内容,运行结果会发生什么变化?预期结果:
当你成功运行这段代码后, 你会在终端看到类似这样的输出:
15:30:27.995 | INFO | prefect.engine - Created flow run 'crimson-jay' for flow 'hello-flow' 15:30:28.099 | INFO | prefect.task_runner - Executing 'hello_task' 15:30:28.101 | INFO | prefect.task_runner - 'hello_task' finished in 0.001 seconds Hello, World! 15:30:28.102 | INFO | prefect.engine - Completed flow run 'crimson-jay'
其中,
Hello, World!
是我们任务输出的结果,其他的为 Prefect 框架运行的日志。五、运行与监控:
运行你的第一个工作流的方式有很多种。 你可以使用
python hello_world.py
命令直接运行,也可以使用 Prefect 提供的命令运行:prefect flow run hello_world.py
。运行后,你可以在控制台中看到工作流的执行日志。 通过日志,你可以了解工作流的运行情况,例如任务的执行时间,以及是否有错误发生。
如果你安装了 Prefect UI,你可以在浏览器中查看工作流的执行状态和日志。 在后续的文章中,我们会更详细地介绍 Prefect UI 的使用方法。通过监控,我们可以实时了解工作流的运行状态,方便排查问题。
非可视化监控:
如果你没有安装 Prefect UI,你也可以使用 Prefect 命令行工具查看任务的执行状态:
prefect flow inspect hello_flow.py
(或者使用对应的流程名称)。 这将会输出关于流程和任务的详细信息,包括执行状态,运行时间等等。修改代码练习:
试着修改
hello_task
函数,让它输出 Hello, Prefect!
,然后重新运行工作流,看看结果是否发生了变化。你也可以尝试添加一个 print("This is my first flow!")
到 hello_flow
函数里面,看看运行结果。总结:
今天,我们一起搭建了第一个工作流,并且了解了 Prefect 框架的基本用法。实践出真知,不要忘记多动手尝试,你会发现工作流编排并没有想象中那么难。恭喜你,你已经成功迈出了学习工作流编排的第一步!
接下来,你可以尝试用你自己的代码,来构建一个简单的工作流。 别忘了,实践是最好的老师! 多动手尝试,你会发现工作流编排的乐趣无穷! 如果你遇到任何问题,都可以查阅 Prefect 的官方文档,或者在社区中寻求帮助。
下一篇文章,我们将深入学习任务的概念和用法,让你的工作流更加强大。继续加油!让我们一起探索工作流编排的更多可能性!