⚡ MLOps实战训练营
学习进度
阶段 5: 自动化流水线(Prefect编排)
目标: 从70%到85%信心值 🎯
📅 第5阶段:自动化流水线(Prefect编排)
🎯 为什么需要自动化流水线?
手动执行ML流程是不可扩展的。想象一下每天都要手动跑数据处理→训练→验证→部署,你会累死。Prefect让整个流程一键执行,这是从"胜任个人项目"到"企业级工程师"的关键跃升。
职场意义:自动化能力是高级工程师必备技能,直接影响薪资水平
🔄 端到端自动化流程
Prefect 2.16+版本API有重大变更,本教程已更新为兼容版本。遇到API不兼容时,查看官方migration guide。
Prefect流水线实战问题集
❌ 问题1: Prefect服务器连接失败
先启动Prefect服务器: prefect server start
深层原因:Prefect 2.x需要服务器端来管理流程状态
最佳实践:生产环境中使用独立的Prefect Cloud或自建服务器
Failed to reach API at http://127.0.0.1:4200/api
❌ 问题2: 数据库迁移错误
删除~/.prefect目录,重新初始化数据库
预防措施:定期备份Prefect数据库,特别是在版本升级前
职场价值:数据库迁移问题在任何系统中都常见,解决它展现你的运维能力
Can't locate revision identified by 'xxxxx'
❌ 问题3: 任务失败但流水线继续执行
使用retries
参数和异常处理机制
高级技巧:设置告警机制,任务失败时发送邮件/Slack通知
实战意义:可靠的错误处理是生产系统的核心要求
没有正确配置任务依赖和错误处理
❌ 问题4: 流水线执行时间过长
使用Prefect UI查看每个任务的执行时间
优化策略:并行执行独立任务,使用缓存避免重复计算
举一反三:性能优化是高级工程师必备技能
上午:构建端到端流水线
"""
Day 4: Prefect自动化流水线
目标:将所有步骤串联成自动化pipeline
"""
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
import subprocess
import pandas as pd
import joblib
import os
import yaml
from datetime import datetime, timedelta
import mlflow
from sklearn.metrics import mean_squared_error, r2_score
@task(name="检查数据更新", retries=2)
def check_data_freshness():
"""检查数据是否需要重新处理"""
raw_data_path = 'data/raw/california_housing.csv'
processed_config_path = 'data/processed/processing_config.yaml'
if not os.path.exists(processed_config_path):
print("📊 未找到处理配置,需要重新处理数据")
return True
# 检查原始数据修改时间
raw_modified = datetime.fromtimestamp(os.path.getmtime(raw_data_path))
# 检查处理配置时间
with open(processed_config_path, 'r') as f:
config = yaml.safe_load(f)
processed_time = datetime.fromisoformat(config['processing_date'])
if raw_modified > processed_time:
print("📊 检测到数据更新,需要重新处理")
return True
else:
print("📊 数据无需更新")
return False
@task(name="数据处理", retries=1)
def process_data():
"""运行数据处理脚本"""
print("🔄 开始数据处理...")
result = subprocess.run(
['python', 'src/day2_data_pipeline.py'],
capture_output=True,
text=True,
cwd=os.getcwd()
)
if result.returncode == 0:
print("✅ 数据处理成功")
return True
else:
print(f"❌ 数据处理失败: {result.stderr}")
raise Exception(f"数据处理失败: {result.stderr}")
@task(name="模型训练", retries=1)
def train_best_model():
"""训练最佳模型(基于之前的对比结果)"""
print("🚀 开始训练最佳模型...")
# 读取之前的对比结果,选择最佳模型
try:
results = pd.read_csv('models/comparison_results.csv')
best_model_name = results.loc[results['val_r2'].idxmax(), 'model_name']
except:
best_model_name = 'random_forest' # 默认选择
print(f"🎯 训练模型: {best_model_name}")
result = subprocess.run(
['python', 'src/day3_model_comparison.py', '--model-type', best_model_name],
capture_output=True,
text=True,
cwd=os.getcwd()
)
if result.returncode == 0:
print(f"✅ {best_model_name} 模型训练成功")
return best_model_name
else:
print(f"❌ 模型训练失败: {result.stderr}")
raise Exception(f"模型训练失败: {result.stderr}")
@task(name="模型验证", retries=1)
def validate_model(model_name):
"""验证模型性能"""
print("🔍 验证模型性能...")
# 加载测试数据
X_test = pd.read_csv('data/processed/X_test.csv')
y_test = pd.read_csv('data/processed/y_test.csv').values.ravel()
# 加载模型
model_path = f'models/{model_name}/model.pkl'
if not os.path.exists(model_path):
raise Exception(f"模型文件不存在: {model_path}")
model = joblib.load(model_path)
# 预测
predictions = model.predict(X_test)
# 计算指标
mse = mean_squared_error(y_test, predictions)
r2 = r2_score(y_test, predictions)
# 设置性能阈值
min_r2 = 0.6 # 最低R²要求
max_mse = 1.0 # 最高MSE要求
if r2 >= min_r2 and mse <= max_mse:
print(f"✅ 模型验证通过 - R²: {r2:.4f}, MSE: {mse:.4f}")
return {
'model_name': model_name,
'r2': r2,
'mse': mse,
'validation_passed': True
}
else:
print(f"❌ 模型验证失败 - R²: {r2:.4f}, MSE: {mse:.4f}")
raise Exception(f"模型性能不达标: R²={r2:.4f} < {min_r2}, MSE={mse:.4f} > {max_mse}")
@task(name="模型注册", retries=1)
def register_model(validation_result):
"""将验证通过的模型注册到MLflow"""
print("📝 注册模型到MLflow...")
model_name = validation_result['model_name']
# 设置MLflow
mlflow.set_experiment("production-models")
with mlflow.start_run(run_name=f"production-{model_name}-{datetime.now().strftime('%Y%m%d-%H%M%S')}"):
# 加载模型
model = joblib.load(f'models/{model_name}/model.pkl')
# 记录生产指标
mlflow.log_metric("production_r2", validation_result['r2'])
mlflow.log_metric("production_mse", validation_result['mse'])
mlflow.log_param("model_type", model_name)
mlflow.log_param("deployment_ready", True)
# 注册模型
model_uri = mlflow.sklearn.log_model(
model,
"model",
registered_model_name="house_price_production_model"
).model_uri
print(f"✅ 模型已注册: {model_uri}")
return model_uri
@task(name="生成报告")
def generate_pipeline_report(validation_result, model_uri):
"""生成流水线执行报告"""
print("📊 生成执行报告...")
report = {
'pipeline_run_time': datetime.now().isoformat(),
'model_performance': validation_result,
'model_uri': model_uri,
'deployment_status': 'ready',
'next_scheduled_run': (datetime.now() + timedelta(days=1)).isoformat()
}
# 保存报告
os.makedirs('reports', exist_ok=True)
report_path = f"reports/pipeline_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.yaml"
with open(report_path, 'w') as f:
yaml.dump(report, f, default_flow_style=False)
print(f"✅ 报告已保存: {report_path}")
return report_path
@flow(name="MLOps-Pipeline", task_runner=ConcurrentTaskRunner())
def mlops_pipeline():
"""完整的MLOps流水线"""
print("🚀 启动MLOps自动化流水线...")
print("=" * 60)
# 1. 检查数据是否需要更新
need_update = check_data_freshness()
# 2. 如果需要,处理数据
if need_update:
process_data()
# 3. 训练最佳模型
model_name = train_best_model()
# 4. 验证模型性能
validation_result = validate_model(model_name)
# 5. 注册模型
model_uri = register_model(validation_result)
# 6. 生成报告
report_path = generate_pipeline_report(validation_result, model_uri)
print("\n🎉 MLOps流水线执行完成!")
print(f"✅ 最佳模型: {validation_result['model_name']}")
print(f"✅ 性能指标: R²={validation_result['r2']:.4f}")
print(f"✅ 模型URI: {model_uri}")
print(f"✅ 执行报告: {report_path}")
return {
'success': True,
'model_name': validation_result['model_name'],
'performance': validation_result,
'model_uri': model_uri,
'report_path': report_path
}
if __name__ == "__main__":
result = mlops_pipeline()
print(f"\n🎯 Pipeline Result: {result}")
晚上:调度和监控设置
"""
设置定时调度和监控 - Prefect 2.16+兼容版
"""
from prefect import flow
from day4_prefect_pipeline import mlops_pipeline
from datetime import datetime
@flow(name="scheduled-mlops-pipeline", log_prints=True)
def scheduled_mlops_pipeline():
"""带调度的MLOps流水线"""
print(f"🕘 定时任务开始执行: {datetime.now()}")
# 调用原始流水线
result = mlops_pipeline()
print(f"🎉 定时任务执行完成: {datetime.now()}")
return result
def test_scheduling():
"""测试调度功能"""
print("🧪 测试调度功能...")
# 直接运行一次作为测试
result = scheduled_mlops_pipeline()
if result and result.get('success'):
print("✅ 调度测试成功!")
return True
else:
print("❌ 调度测试失败")
return False
if __name__ == "__main__":
print("🎯 Day 4: Prefect调度器 - 兼容版本")
print("=" * 50)
# 由于Prefect 2.16+版本变更,使用简化的调度方式
print("⚠️ 注意: Prefect 2.16+版本移除了Deployment类")
print("💡 使用以下替代方案:")
print()
print("方案1: 直接运行测试")
test_scheduling()
print()
print("方案2: 手动调度 (推荐)")
print(" python src/day4_prefect_pipeline.py")
print()
print("方案3: Cron调度 (生产环境)")
print(" 0 9 * * * cd ~/mlops-project && python src/day4_prefect_pipeline.py")
print()
print("🎉 Day 4 调度配置完成!")
运行和配置
# 1. 首先更新Prefect到兼容版本
cd ~/mlops-project
source ~/mlops-env/bin/activate
# 2. 启动Prefect服务器(新终端)
nohup prefect server start --host 0.0.0.0 >> prefect-server.log 2>&1 &
# 3. 运行一次测试流水线
python src/day4_prefect_pipeline.py
# 4. 测试调度功能(3种方案)
# 方案1: 直接测试调度
python src/day4_scheduler.py
# 方案2: 手动调度(最可靠)
python src/day4_prefect_pipeline.py
# 方案3: 设置Cron调度(生产环境)
# 创建调度脚本
cat >> scripts/daily_mlops.sh << 'EOF'
#!/bin/bash
cd ~/mlops-project
source ~/mlops-env/bin/activate
python src/day4_prefect_pipeline.py >> logs/pipeline.log 2>&1
EOF
chmod +x scripts/daily_mlops.sh
mkdir -p logs
# 添加到crontab (每天9点执行)
echo "0 9 * * * ~/mlops-project/scripts/daily_mlops.sh" | crontab -
# 5. 查看Prefect UI
# 浏览器访问: http://localhost:4200
# 6. 开放端口(云服务器)
#sudo ufw allow 4200/tcp
# 7. 提交Day 4成果
git add .
git commit -m "Day 4: Prefect automated pipeline (fixed version)
✅ Fixed Prefect 2.16+ compatibility issues
✅ Automated data processing, training, validation
✅ Model registration and performance reporting
✅ Multiple scheduling solutions provided
✅ Pipeline monitoring dashboard available"
故障排查指南
# 常见问题解决
# 1. Prefect版本问题
pip uninstall prefect
# 3. 检查Prefect服务状态
prefect server start --host 0.0.0.0
# 4. 查看流水线日志
tail -f logs/pipeline.log
# 5. 重置Prefect数据库(如果UI异常)
prefect server database reset
# 6. 检查端口占用
netstat -tlnp | grep 4200
端到端MLOps流水线自动化执行
Prefect工作流编排和依赖管理
实时监控和故障告警机制
定时任务和触发器配置
🏆 阶段 5 成就解锁
- 端到端自动化流水线运行成功
- Prefect UI显示完整流程
- 定时调度配置完成
- 信心值:70% → 85%
🚀 下一步预告
阶段 6: 将进行模型服务化,让你的模型真正为业务提供服务!
训练好的模型只是半成品,只有能通过API提供服务才有业务价值。服务化是ML工程师的核心技能,也是你薪资上涨的关键能力。