导语:
恭喜你! 经过前面六篇文章的学习, 你已经掌握了 Prefect 工作流编排框架的基础知识和高级技巧。 从任务 (Task) 和流程 (Flow) 的定义, 到调度、监控、版本控制, 再到动态映射、自定义组件、集成外部服务等等, 你已经具备了使用 Prefect 构建强大工作流的能力。 但是, 理论知识的学习固然重要, 只有将它们应用到实践中, 才能真正发挥其价值。 “纸上得来终觉浅,绝知此事要躬行。” 在本篇文章中, 我们将一起完成几个实战案例, 让你亲身体验 Prefect 在解决实际问题中的强大能力!
一、回顾:理论知识不如实践出真知
在前面的文章中, 我们学习了 Prefect 的方方面面:
- 第一篇: 我们了解了工作流的基本概念, 以及工作流编排框架的作用和优势。
- 第二篇: 我们认识到手动执行任务的诸多痛点, 以及 Prefect 如何自动化执行任务, 解放我们的双手。
- 第三篇: 我们动手搭建了第一个工作流, 迈出了实践的第一步。
- 第四篇: 我们深入学习了任务 (Task) 的各种用法, 包括参数传递、依赖关系、错误处理等等。
- 第五篇: 我们掌握了流程 (Flow) 的概念, 学会了如何使用流程来组织任务, 以及如何调度、监控和管理流程。
- 第六篇: 我们探索了 Prefect 的高级技巧, 包括动态映射、自定义组件、集成外部服务、结果持久化、缓存和 Artifacts 等, 让我们的工作流更加强大和灵活。
一路走来, 我们学习了很多 Prefect 的知识, 但只有将这些知识应用到实践中, 才能真正理解它们的精髓, 并将其转化为解决实际问题的能力。
二、案例一:自动备份数据库
问题描述:
数据是任何应用的核心资产, 定期备份数据库是防止数据丢失的重要措施。 手动备份数据库不仅繁琐, 而且容易出错, 难以保证备份的及时性和一致性。
手动备份数据库 +-----+ | | <-- 容易出错, 耗时耗力 +-----+ | | 数据丢失风险! v +---------+ | 数据库 | +---------+
解决方案:
我们可以使用 Prefect 构建一个工作流, 自动执行数据库备份操作, 并将备份文件上传到云存储, 从而实现数据库的自动备份。
+---------+ +-------------+ +------------+ | 数据库 | ----> | 备份任务 | ----> | 上传任务 | +---------+ +-------------+ +------------+ ^ | | | | | | v v | +-----------+ +----------+ | | 定时触发 | | 云存储 | +------------+ (Prefect) +------ -+ (例如 S3) | +-----------+ +----------+
步骤分解:
- 定义一个任务来执行数据库备份命令。 我们将使用
mysqldump
命令来执行数据库备份 (你可以根据自己的数据库类型选择合适的备份命令)。
- 定义一个任务来将备份文件上传到云存储。 我们将使用 AWS S3 作为云存储服务 (你可以根据自己的需求选择其他的云存储服务)。
- 定义一个流程来组织这两个任务, 并设置它们之间的依赖关系。 我们需要先执行备份任务, 然后再执行上传任务, 因此上传任务需要依赖备份任务。
- 使用
Deployment
部署流程, 并设置定时调度计划。 我们将设置一个定时调度计划, 让工作流每天凌晨自动执行。
代码示例:
from prefect import task, flow from prefect.deployments import Deployment from prefect.filesystems import S3 from prefect.server.schemas.schedules import CronSchedule from prefect_shell import ShellOperation import os from datetime import datetime from subprocess import CalledProcessError from prefect.blocks.database import SQLAlchemyCredentials # 使用 Blocks 安全地存储数据库和 AWS 连接信息 DATABASE_BLOCK_NAME = "your-database-block-name" # 替换成你的数据库 Block 名称 S3_BLOCK_NAME = "your-s3-block-name" # 替换成你的 S3 Block 名称 @task def backup_database(backup_dir="backup"): """执行数据库备份命令""" db_block = SQLAlchemyCredentials.load(DATABASE_BLOCK_NAME) # 加载数据库连接信息 # 如果备份目录不存在,则创建 if not os.path.exists(backup_dir): os.makedirs(backup_dir) backup_file = f"{backup_dir}/backup_{datetime.now().strftime('%Y%m%d%H%M%S')}.sql" # 使用 mysqldump 命令备份数据库 try: result = ShellOperation( commands=[ f"mysqldump -h {db_block.host} -P {db_block.port} -u {db_block.username} --password='{db_block.password}' {db_block.database} > {backup_file}" ], working_dir="." ).run() print(f"备份数据库成功: {result}") return backup_file except CalledProcessError as e: print(f"备份数据库失败: {e}") raise @task def upload_backup_to_s3(backup_file): """将备份文件上传到 S3""" s3_block = S3.load(S3_BLOCK_NAME) # 加载 S3 连接信息 try: destination_path = os.path.join("db_backups", os.path.basename(backup_file)) s3_block.upload_from_path(backup_file, destination_path) print(f"上传备份文件到 S3 成功: {backup_file}") except Exception as e: print(f"上传备份文件到 S3 失败: {e}") raise @flow(name="Backup Database") def backup_database_flow(): """备份数据库并将备份文件上传到 S3""" backup_file = backup_database() upload_backup_to_s3(backup_file) def build_deployment(): """创建 Deployment""" deployment = Deployment.build_from_flow( flow=backup_database_flow, name="backup_database_deployment", schedule=CronSchedule(cron="0 0 * * *"), # 每天凌晨执行 ) deployment.apply() if __name__ == "__main__": build_deployment()
代码解释:
- 我们使用了
prefect-shell
库来执行 shell 命令。
backup_database
任务使用mysqldump
命令备份数据库, 并将备份文件保存到本地目录。
upload_backup_to_s3
任务将备份文件上传到 S3。
backup_database_flow
流程组织这两个任务, 并设置了依赖关系。
build_deployment
函数创建了一个Deployment
, 并设置了每天凌晨执行的定时调度计划。
DATABASE_BLOCK_NAME
和S3_BLOCK_NAME
需要替换成你在 Prefect 中创建的相应Blocks
的名称。 你需要事先在 Prefect UI 中创建好SQLAlchemyCredentials
Block (用于存储数据库连接信息) 和S3
Block (用于存储 AWS S3 连接信息)。
运行和监控:
你可以通过
prefect deployment apply
命令部署工作流, 然后在 Prefect UI 中查看工作流的运行状态和日志信息。+---------------------+ | Prefect UI | | +-----------------+ | | | 运行状态 | | <-- 实时监控 | | 日志信息 | | | +-----------------+ | +---------------------+
要点总结:
- 我们使用 Prefect 执行了外部命令 (
mysqldump
) 来备份数据库。
- 我们使用 Prefect 的
Blocks
机制安全地连接到数据库和 AWS S3。
- 我们使用
CronSchedule
设置了定时调度计划, 让工作流每天凌晨自动执行。
- 我们可以通过 Prefect UI 监控工作流的执行状态和日志信息。
三、案例二:RAG 客服系统的工作流
问题描述:
传统的基于关键词匹配的客服系统难以准确理解用户的问题, 导致回答质量不高。 基于检索增强生成 (Retrieval-Augmented Generation, RAG) 的客服系统可以结合检索和生成技术, 提供更准确、更自然的回答。 我们需要一个工作流来定期更新 RAG 系统的知识库, 并处理用户的查询请求。
+----------+ | | | 知识库 | | | +----^-----+ | | 检索 | +--------+ 自然语言 +--------+ +------+ +--------+ | 用户 | ----------> | RAG系统 | ---+ 生成 +--------> | 回答 | 回答质量高 +--------+ +--------+ +------+ +--------+
解决方案:
我们将使用 Prefect 构建一个 RAG 客服系统的工作流, 包括知识库更新和查询处理两个主要流程。
步骤分解:
- 知识库更新流程:
- 定义一个任务来爬取最新的文档数据。
- 定义一个任务来清洗和预处理文档数据。
- 定义一个任务来构建或更新向量数据库 (例如使用 FAISS 或 Pinecone)。
- 定义一个流程来组织这些任务, 并设置它们之间的依赖关系。
- 使用
Deployment
部署流程, 并设置定时调度计划 (例如每周更新一次)。
- 查询处理流程:
- 定义一个任务来接收用户的查询请求。
- 定义一个任务来将用户问题转换为向量表示。
- 定义一个任务来检索向量数据库, 找到与问题最相关的文档片段。
- 定义一个任务来使用 LLM (例如 OpenAI 的 GPT 模型) 生成最终的回答。
- 定义一个任务来将回答返回给用户。
- 定义一个流程来组织这些任务, 并设置它们之间的依赖关系。
- 使用
Deployment
部署流程, 并配置 API 或 Webhook 触发器。
代码示例:
from prefect import task, flow from prefect.deployments import Deployment from prefect.server.schemas.schedules import IntervalSchedule from prefect.blocks.system import Secret from prefect.blocks.core import Block from typing import List import time # 模拟的向量数据库 Block (用于测试) class FakeVectorDatabaseBlock(Block): """ 用于测试的假的向量数据库 Block """ _block_type_name = "Fake Vector Database" def __init__(self, data=None): super().__init__() self.data = data or {} def query(self, vector: List[float], top_k: int = 5) -> List[str]: """ 模拟查询向量数据库 """ print(f"Querying vector database with vector: {vector}") # 在实际应用中, 这里应该使用真正的向量数据库进行查询 # 这里只是简单地返回一些模拟数据 return [f"Relevant document {i}" for i in range(top_k)] # 模拟的外部函数 (你需要根据你的实际情况来实现这些函数) def crawl_documents(): """模拟爬取文档""" print("Crawling documents...") return ["Document 1", "Document 2", "Document 3"] def preprocess_documents(documents): """模拟预处理文档""" print("Preprocessing documents...") return [doc.lower() for doc in documents] def build_vector_db(processed_documents, vector_db_block): """模拟构建向量数据库""" print("Building vector database...") # 在实际应用中, 这里应该使用真正的向量数据库构建逻辑 vector_db_block.data = {doc: [0.1, 0.2, 0.3] for doc in processed_documents} # 假设每个文档的向量都是 [0.1, 0.2, 0.3] def encode_question(question): """模拟将问题转换为向量""" print(f"Encoding question: {question}") # 在实际应用中, 这里应该使用真正的编码模型 return [0.1, 0.2, 0.3] # 假设问题的向量是 [0.1, 0.2, 0.3] def query_vector_db(question_vector, vector_db_block): """模拟查询向量数据库""" print(f"Querying vector database with vector: {question_vector}") # 在实际应用中, 这里应该使用真正的向量数据库进行查询 return vector_db_block.query(question_vector) def generate_answer(relevant_documents, question, api_key): """模拟使用 LLM 生成回答""" print(f"Generating answer based on relevant documents: {relevant_documents} and question: {question}") # 在实际应用中, 这里应该调用 LLM 的 API return f"The answer to your question '{question}' is: This is a simulated answer." def receive_question(): """模拟接收用户问题""" print("Receiving question...") return "What is the meaning of life?" def send_answer(answer): """模拟发送回答""" print(f"Sending answer: {answer}") # 使用 Blocks 安全地存储 API 密钥 VECTOR_DB_BLOCK_NAME = "your-vector-db-block-name" # 替换成你的向量数据库 Block 名称 (如果使用真的数据库) LLM_API_KEY_BLOCK_NAME = "your-llm-api-key-block-name" # 替换成你的 LLM API 密钥 Block 名称 # 在 flow 外部加载 Block vector_db_block = FakeVectorDatabaseBlock() # 使用模拟的 Block llm_api_key_block = Secret.load(LLM_API_KEY_BLOCK_NAME) # 知识库更新流程的任务 @task def crawl_documents_task(): """爬取最新的文档数据""" documents = crawl_documents() return documents @task def preprocess_documents_task(documents): """清洗和预处理文档数据""" processed_documents = preprocess_documents(documents) return processed_documents @task def build_vector_db_task(processed_documents): """构建或更新向量数据库""" build_vector_db(processed_documents, vector_db_block) @flow(name="Update Knowledge Base") def update_knowledge_base_flow(): """更新知识库的流程""" documents = crawl_documents_task() processed_documents = preprocess_documents_task(documents) build_vector_db_task(processed_documents) # 查询处理流程的任务 @task def receive_question_task(): """接收用户的查询请求""" question = receive_question() return question @task def encode_question_task(question): """将用户问题转换为向量表示""" question_vector = encode_question(question) return question_vector @task def query_vector_db_task(question_vector): """检索向量数据库, 找到与问题最相关的文档片段""" relevant_documents = query_vector_db(question_vector, vector_db_block) return relevant_documents @task def generate_answer_task(relevant_documents, question): """使用 LLM 生成最终的回答""" answer = generate_answer(relevant_documents, question, llm_api_key_block.get()) return answer @task def send_answer_task(answer): """将回答返回给用户""" send_answer(answer) @flow(name="Process Query") def process_query_flow(): """处理用户查询的流程""" question = receive_question_task() question_vector = encode_question_task(question) relevant_documents = query_vector_db_task(question_vector) answer = generate_answer_task(relevant_documents, question) send_answer_task(answer) def build_knowledge_base_deployment(): """创建知识库更新流程的 Deployment""" deployment = Deployment.build_from_flow( flow=update_knowledge_base_flow, name="update_knowledge_base", schedule=IntervalSchedule(interval=timedelta(weeks=1)), # 每周更新一次 ) deployment.apply() def build_query_deployment(): """创建查询处理流程的 Deployment""" deployment = Deployment.build_from_flow( flow=process_query_flow, name="process_query", # 配置 API 或 Webhook 触发器 # ... ) deployment.apply() if __name__ == "__main__": build_knowledge_base_deployment() build_query_deployment()
代码解释:
- 这个例子中, 我们定义了两个流程:
update_knowledge_base_flow
和process_query_flow
。
update_knowledge_base_flow
负责定期更新 RAG 系统的知识库, 包括爬取文档、预处理文档和构建向量数据库等步骤。
process_query_flow
负责处理用户的查询请求, 包括接收问题、编码问题、检索向量数据库、生成回答和发送回答等步骤。
知识库更新流程 查询处理流程 +----------+----------+ +----------+----------+ | 爬取文档 | | 接收问题 | +----------+----------+ +----------+----------+ | | v v +----------+----------+ +----------+----------+ | 预处理文档 | | 编码问题 | +----------+----------+ +----------+----------+ | | v v +----------+----------+ +-----> +----------+----------+ | 构建向量数据库 | ----- | 检索向量数据库 | +---------------------+ | +----------+----------+ | | | v | +----------+----------+ | | 生成回答 | | +----------+----------+ | | | v | +----------+----------+ +----- | 发送回答 | +---------------------+
- 我们使用了
Blocks
来安全地存储向量数据库和 LLM 的 API 密钥。
- 我们使用了
FakeVectorDatabaseBlock
作为示例, 你需要根据你的实际情况选择合适的向量数据库, 并创建相应的Block
。
- 我们为
update_knowledge_base_flow
设置了每周更新一次的定时调度计划, 为process_query_flow
配置了 API 或 Webhook 触发器 (具体配置方式取决于你的 API 或 Webhook 实现)。
- 代码中的以
your_
开头的函数 (your_rag_system
,your_api_or_webhook
) 和类 (VectorDatabaseBlock
) 都只是占位符,你需要根据你的实际情况来实现这些模块。
运行和监控:
你可以通过
prefect deployment apply
命令部署这两个流程, 然后在 Prefect UI 中查看流程的运行状态和日志信息。 你也可以通过发送请求到你的 API 或 Webhook 来测试查询处理流程。要点总结:
- 我们使用 Prefect 构建了一个 RAG 客服系统的工作流, 包括知识库更新和查询处理两个主要流程。
- 我们使用了 Prefect 的任务和流程来组织代码, 并设置了任务之间的依赖关系。
- 我们使用了
Blocks
来安全地存储敏感信息。
- 我们为不同的流程设置了不同的调度方式, 包括定时调度和事件触发。
- 我们可以通过 Prefect UI 监控工作流的执行状态和日志信息。
四、案例三:自动化机器学习模型训练
问题描述:
机器学习模型的训练通常需要耗费大量的时间和计算资源。 手动执行模型训练过程容易出错, 且难以跟踪和复现训练结果。
手动训练模型 +---------+ | 数据 | +---------+ | | 手动执行 v +---------+ | 模型训练 | <-- 耗时, 易出错, 难以复现 +---------+ | v +---------+ | 模型 | +---------+
解决方案:
我们可以使用 Prefect 构建一个工作流, 自动化机器学习模型的训练过程。
自动化模型训练 (Prefect) +---------+ +-----------+ +-----------+ +-----------+ +-----------+ | 数据 | ---> | 预处理任务 | ---> | 训练任务 | ---> | 评估任务 | ---> | 保存任务 | +---------+ +-----------+ +-----------+ +-----------+ +-----------+ ^ | | | | v | +----------+ | | Artifacts| +----------------->+ (Prefect UI)| +----------+
步骤分解:
- 定义一个任务来加载和预处理数据。
- 定义一个任务来训练模型 (例如使用 Scikit-learn 训练一个模型)。
- 定义一个任务来评估模型的性能。
- 定义一个任务来保存模型和评估结果 (例如将模型保存到文件, 将评估指标保存到数据库)。
- 定义一个流程来组织这些任务, 并设置它们之间的依赖关系。
- 使用
Deployment
部署流程, 并设置触发条件 (例如当新的训练数据可用时触发)。
代码示例:
from prefect import task, flow from prefect.deployments import Deployment from prefect.server.schemas.schedules import IntervalSchedule from prefect.artifacts import create_markdown_artifact from sklearn.model_selection import train_test_split from sklearn.linear_model import LogisticRegression from sklearn.metrics import accuracy_score import joblib import pandas as pd from sklearn.preprocessing import StandardScaler # 模拟的数据加载和预处理函数 (你需要根据你的实际情况来实现这些函数) def load_data(data_path): """加载数据""" print(f"Loading data from {data_path}...") # 使用 pandas 加载数据 df = pd.read_csv(data_path) return df def preprocess_data(df): """预处理数据""" print("Preprocessing data...") # 假设最后一列是目标列 target_column = df.columns[-1] features = df.drop(columns=[target_column]) target = df[target_column] # 对特征进行标准化 scaler = StandardScaler() scaled_features = scaler.fit_transform(features) return {"features": scaled_features, "target": target} @task def load_and_preprocess_data_task(data_path): """加载和预处理数据""" data = load_data(data_path) preprocessed_data = preprocess_data(data) return preprocessed_data @task def train_model_task(data): """训练模型""" X_train, X_test, y_train, y_test = train_test_split( data["features"], data["target"], test_size=0.2, random_state=42 ) model = LogisticRegression() model.fit(X_train, y_train) return model, X_test, y_test @task def evaluate_model_task(model, X_test, y_test): """评估模型的性能""" y_pred = model.predict(X_test) accuracy = accuracy_score(y_test, y_pred) create_markdown_artifact( key="model-evaluation", markdown=f"# Model Accuracy: {accuracy:.4f}" ) return accuracy @task def save_model_and_results_task(model, accuracy, model_path="model.joblib"): """保存模型和评估结果""" joblib.dump(model, model_path) print(f"Model saved to {model_path}") # 将 accuracy 保存到数据库或文件中 print(f"Model accuracy: {accuracy}") @flow(name="Train Machine Learning Model") def train_model_flow(data_path="data.csv"): """自动化机器学习模型训练的流程""" data = load_and_preprocess_data_task(data_path) model, X_test, y_test = train_model_task(data) accuracy = evaluate_model_task(model, X_test, y_test) save_model_and_results_task(model, accuracy) def build_deployment(): """创建 Deployment""" deployment = Deployment.build_from_flow( flow=train_model_flow, name="train_model_deployment", # 设置触发条件, 例如当新的训练数据可用时触发 # ... ) deployment.apply() if __name__ == "__main__": build_deployment()
代码解释:
- 我们定义了四个任务:
load_and_preprocess_data_task
、train_model_task
、evaluate_model_task
和save_model_and_results_task
。
train_model_flow
流程组织这些任务, 并设置了它们之间的依赖关系。
- 我们使用
create_markdown_artifact
函数创建了一个Artifact
来记录模型的评估指标, 你可以在 Prefect UI 的 flow run 页面中找到model-evaluation
的Artifact
。
+------------------+ | Prefect UI | | +--------------+ | | | Artifacts | | | | +----------+| | | | | Model || | | | | Accuracy || | | | +----------+| | | +--------------+ | +------------------+
- 为了简化代码, 我们假设你已经有了
load_data
和preprocess_data
函数, 你需要根据你的实际情况来实现这些函数。
- 我们使用
LogisticRegression
作为示例模型, 你可以根据你的需求选择其他的机器学习模型。
- 代码中使用了
pandas
、joblib
和scikit-learn
库, 你需要确保已经安装了这些库 (可以使用uv pip install pandas joblib scikit-learn
安装)。
运行和监控:
你可以通过
prefect deployment apply
命令部署工作流, 然后在 Prefect UI 中查看工作流的运行状态和日志信息, 以及 Artifacts
。要点总结:
- 我们使用 Prefect 执行了机器学习模型的训练过程。
- 我们使用 Prefect 保存了训练好的模型和评估结果。
- 我们使用了
Artifacts
记录和展示任务执行结果, 例如模型的评估指标, 方便你在 Prefect UI 中查看。
- 我们可以设置事件触发器, 例如当新的训练数据可用时自动触发模型训练流程。
总结:
在本篇文章中, 我们通过三个实际案例: 自动备份数据库、RAG客服系统和自动化机器学习模型训练, 演示了如何使用 Prefect 解决实际问题。 这些案例涵盖了不同的领域和应用场景, 展示了 Prefect 在构建自动化工作流方面的强大能力。
"纸上得来终觉浅,绝知此事要躬行。" 希望这些案例能够帮助你更好地理解 Prefect 的用法, 并将其应用到你的工作中。 记住, 实践是学习的最好方式! 不断尝试, 你会发现 Prefect 的更多精彩之处。
在下一篇文章中, 我们将对整个系列文章进行总结, 并展望工作流编排的未来发展趋势。 敬请期待!