⚡ MLOps实战训练营

学习进度

阶段 5: 自动化流水线(Prefect编排)

目标: 从70%到85%信心值 🎯

📅 第5阶段:自动化流水线(Prefect编排)

🚀 自动化流水线 = 从"能跑模型"到"工程师"的关键跃升!

🎯 为什么需要自动化流水线?

手动执行ML流程是不可扩展的。想象一下每天都要手动跑数据处理→训练→验证→部署,你会累死。Prefect让整个流程一键执行,这是从"胜任个人项目""企业级工程师"的关键跃升。


职场意义:自动化能力是高级工程师必备技能,直接影响薪资水平

🔄 端到端自动化流程

📊 数据检查更新
🔧 数据预处理
🤖 模型训练
✅ 性能验证
📝 模型注册
📋 生成报告
版本兼容性说明

Prefect 2.16+版本API有重大变更,本教程已更新为兼容版本。遇到API不兼容时,查看官方migration guide。

Prefect流水线实战问题集

❌ 问题1: Prefect服务器连接失败

解决方案

先启动Prefect服务器: prefect server start

深层原因:Prefect 2.x需要服务器端来管理流程状态

最佳实践:生产环境中使用独立的Prefect Cloud或自建服务器

错误信息

Failed to reach API at http://127.0.0.1:4200/api

❌ 问题2: 数据库迁移错误

解决方案

删除~/.prefect目录,重新初始化数据库

预防措施:定期备份Prefect数据库,特别是在版本升级前

职场价值:数据库迁移问题在任何系统中都常见,解决它展现你的运维能力

错误信息

Can't locate revision identified by 'xxxxx'

❌ 问题3: 任务失败但流水线继续执行

解决方案

使用retries参数和异常处理机制

高级技巧:设置告警机制,任务失败时发送邮件/Slack通知

实战意义:可靠的错误处理是生产系统的核心要求

设计问题

没有正确配置任务依赖和错误处理

❌ 问题4: 流水线执行时间过长

解决方案

使用Prefect UI查看每个任务的执行时间

优化策略:并行执行独立任务,使用缓存避免重复计算

举一反三:性能优化是高级工程师必备技能

上午:构建端到端流水线

💡 所有代码块默认收起,点击标题栏可展开查看完整代码,支持一键复制
🐍 src/day4_prefect_pipeline.py
"""
Day 4: Prefect自动化流水线
目标:将所有步骤串联成自动化pipeline
"""

from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
import subprocess
import pandas as pd
import joblib
import os
import yaml
from datetime import datetime, timedelta
import mlflow
from sklearn.metrics import mean_squared_error, r2_score

@task(name="检查数据更新", retries=2)
def check_data_freshness():
    """检查数据是否需要重新处理"""
    raw_data_path = 'data/raw/california_housing.csv'
    processed_config_path = 'data/processed/processing_config.yaml'
    
    if not os.path.exists(processed_config_path):
        print("📊 未找到处理配置,需要重新处理数据")
        return True
    
    # 检查原始数据修改时间
    raw_modified = datetime.fromtimestamp(os.path.getmtime(raw_data_path))
    
    # 检查处理配置时间
    with open(processed_config_path, 'r') as f:
        config = yaml.safe_load(f)
    
    processed_time = datetime.fromisoformat(config['processing_date'])
    
    if raw_modified > processed_time:
        print("📊 检测到数据更新,需要重新处理")
        return True
    else:
        print("📊 数据无需更新")
        return False

@task(name="数据处理", retries=1)
def process_data():
    """运行数据处理脚本"""
    print("🔄 开始数据处理...")
    
    result = subprocess.run(
        ['python', 'src/day2_data_pipeline.py'],
        capture_output=True,
        text=True,
        cwd=os.getcwd()
    )
    
    if result.returncode == 0:
        print("✅ 数据处理成功")
        return True
    else:
        print(f"❌ 数据处理失败: {result.stderr}")
        raise Exception(f"数据处理失败: {result.stderr}")

@task(name="模型训练", retries=1)
def train_best_model():
    """训练最佳模型(基于之前的对比结果)"""
    print("🚀 开始训练最佳模型...")
    
    # 读取之前的对比结果,选择最佳模型
    try:
        results = pd.read_csv('models/comparison_results.csv')
        best_model_name = results.loc[results['val_r2'].idxmax(), 'model_name']
    except:
        best_model_name = 'random_forest'  # 默认选择
    
    print(f"🎯 训练模型: {best_model_name}")
    
    result = subprocess.run(
        ['python', 'src/day3_model_comparison.py', '--model-type', best_model_name],
        capture_output=True,
        text=True,
        cwd=os.getcwd()
    )
    
    if result.returncode == 0:
        print(f"✅ {best_model_name} 模型训练成功")
        return best_model_name
    else:
        print(f"❌ 模型训练失败: {result.stderr}")
        raise Exception(f"模型训练失败: {result.stderr}")

@task(name="模型验证", retries=1)
def validate_model(model_name):
    """验证模型性能"""
    print("🔍 验证模型性能...")
    
    # 加载测试数据
    X_test = pd.read_csv('data/processed/X_test.csv')
    y_test = pd.read_csv('data/processed/y_test.csv').values.ravel()
    
    # 加载模型
    model_path = f'models/{model_name}/model.pkl'
    if not os.path.exists(model_path):
        raise Exception(f"模型文件不存在: {model_path}")
    
    model = joblib.load(model_path)
    
    # 预测
    predictions = model.predict(X_test)
    
    # 计算指标
    mse = mean_squared_error(y_test, predictions)
    r2 = r2_score(y_test, predictions)
    
    # 设置性能阈值
    min_r2 = 0.6  # 最低R²要求
    max_mse = 1.0  # 最高MSE要求
    
    if r2 >= min_r2 and mse <= max_mse:
        print(f"✅ 模型验证通过 - R²: {r2:.4f}, MSE: {mse:.4f}")
        return {
            'model_name': model_name,
            'r2': r2,
            'mse': mse,
            'validation_passed': True
        }
    else:
        print(f"❌ 模型验证失败 - R²: {r2:.4f}, MSE: {mse:.4f}")
        raise Exception(f"模型性能不达标: R²={r2:.4f} < {min_r2}, MSE={mse:.4f} > {max_mse}")

@task(name="模型注册", retries=1)
def register_model(validation_result):
    """将验证通过的模型注册到MLflow"""
    print("📝 注册模型到MLflow...")
    
    model_name = validation_result['model_name']
    
    # 设置MLflow
    mlflow.set_experiment("production-models")
    
    with mlflow.start_run(run_name=f"production-{model_name}-{datetime.now().strftime('%Y%m%d-%H%M%S')}"):
        # 加载模型
        model = joblib.load(f'models/{model_name}/model.pkl')
        
        # 记录生产指标
        mlflow.log_metric("production_r2", validation_result['r2'])
        mlflow.log_metric("production_mse", validation_result['mse'])
        mlflow.log_param("model_type", model_name)
        mlflow.log_param("deployment_ready", True)
        
        # 注册模型
        model_uri = mlflow.sklearn.log_model(
            model,
            "model",
            registered_model_name="house_price_production_model"
        ).model_uri
        
        print(f"✅ 模型已注册: {model_uri}")
        return model_uri

@task(name="生成报告")
def generate_pipeline_report(validation_result, model_uri):
    """生成流水线执行报告"""
    print("📊 生成执行报告...")
    
    report = {
        'pipeline_run_time': datetime.now().isoformat(),
        'model_performance': validation_result,
        'model_uri': model_uri,
        'deployment_status': 'ready',
        'next_scheduled_run': (datetime.now() + timedelta(days=1)).isoformat()
    }
    
    # 保存报告
    os.makedirs('reports', exist_ok=True)
    report_path = f"reports/pipeline_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.yaml"
    
    with open(report_path, 'w') as f:
        yaml.dump(report, f, default_flow_style=False)
    
    print(f"✅ 报告已保存: {report_path}")
    return report_path

@flow(name="MLOps-Pipeline", task_runner=ConcurrentTaskRunner())
def mlops_pipeline():
    """完整的MLOps流水线"""
    print("🚀 启动MLOps自动化流水线...")
    print("=" * 60)
    
    # 1. 检查数据是否需要更新
    need_update = check_data_freshness()
    
    # 2. 如果需要,处理数据
    if need_update:
        process_data()
    
    # 3. 训练最佳模型
    model_name = train_best_model()
    
    # 4. 验证模型性能
    validation_result = validate_model(model_name)
    
    # 5. 注册模型
    model_uri = register_model(validation_result)
    
    # 6. 生成报告
    report_path = generate_pipeline_report(validation_result, model_uri)
    
    print("\n🎉 MLOps流水线执行完成!")
    print(f"✅ 最佳模型: {validation_result['model_name']}")
    print(f"✅ 性能指标: R²={validation_result['r2']:.4f}")
    print(f"✅ 模型URI: {model_uri}")
    print(f"✅ 执行报告: {report_path}")
    
    return {
        'success': True,
        'model_name': validation_result['model_name'],
        'performance': validation_result,
        'model_uri': model_uri,
        'report_path': report_path
    }

if __name__ == "__main__":
    result = mlops_pipeline()
    print(f"\n🎯 Pipeline Result: {result}")

晚上:调度和监控设置

🐍 src/day4_scheduler.py
"""
设置定时调度和监控 - Prefect 2.16+兼容版
"""

from prefect import flow
from day4_prefect_pipeline import mlops_pipeline
from datetime import datetime

@flow(name="scheduled-mlops-pipeline", log_prints=True)
def scheduled_mlops_pipeline():
    """带调度的MLOps流水线"""
    print(f"🕘 定时任务开始执行: {datetime.now()}")
    
    # 调用原始流水线
    result = mlops_pipeline()
    
    print(f"🎉 定时任务执行完成: {datetime.now()}")
    return result

def test_scheduling():
    """测试调度功能"""
    print("🧪 测试调度功能...")
    
    # 直接运行一次作为测试
    result = scheduled_mlops_pipeline()
    
    if result and result.get('success'):
        print("✅ 调度测试成功!")
        return True
    else:
        print("❌ 调度测试失败")
        return False

if __name__ == "__main__":
    print("🎯 Day 4: Prefect调度器 - 兼容版本")
    print("=" * 50)
    
    # 由于Prefect 2.16+版本变更,使用简化的调度方式
    print("⚠️  注意: Prefect 2.16+版本移除了Deployment类")
    print("💡 使用以下替代方案:")
    print()
    print("方案1: 直接运行测试")
    test_scheduling()
    
    print()
    print("方案2: 手动调度 (推荐)")
    print("  python src/day4_prefect_pipeline.py")
    
    print()
    print("方案3: Cron调度 (生产环境)")
    print("  0 9 * * * cd ~/mlops-project && python src/day4_prefect_pipeline.py")
    
    print()
    print("🎉 Day 4 调度配置完成!")

运行和配置

🔧 完整部署和测试流程
# 1. 首先更新Prefect到兼容版本
cd ~/mlops-project
source ~/mlops-env/bin/activate

# 2. 启动Prefect服务器(新终端)
nohup prefect server start --host 0.0.0.0 >> prefect-server.log 2>&1 &

# 3. 运行一次测试流水线
python src/day4_prefect_pipeline.py

# 4. 测试调度功能(3种方案)

# 方案1: 直接测试调度
python src/day4_scheduler.py

# 方案2: 手动调度(最可靠)
python src/day4_prefect_pipeline.py

# 方案3: 设置Cron调度(生产环境)
# 创建调度脚本
cat >> scripts/daily_mlops.sh << 'EOF'
#!/bin/bash
cd ~/mlops-project
source ~/mlops-env/bin/activate
python src/day4_prefect_pipeline.py >> logs/pipeline.log 2>&1
EOF

chmod +x scripts/daily_mlops.sh
mkdir -p logs

# 添加到crontab (每天9点执行)
echo "0 9 * * * ~/mlops-project/scripts/daily_mlops.sh" | crontab -

# 5. 查看Prefect UI
# 浏览器访问: http://localhost:4200

# 6. 开放端口(云服务器)
#sudo ufw allow 4200/tcp

# 7. 提交Day 4成果
git add .
git commit -m "Day 4: Prefect automated pipeline (fixed version)

✅ Fixed Prefect 2.16+ compatibility issues
✅ Automated data processing, training, validation
✅ Model registration and performance reporting  
✅ Multiple scheduling solutions provided
✅ Pipeline monitoring dashboard available"

故障排查指南

🔧 常见问题解决方案
# 常见问题解决

# 1. Prefect版本问题
pip uninstall prefect

# 3. 检查Prefect服务状态
prefect server start --host 0.0.0.0

# 4. 查看流水线日志
tail -f logs/pipeline.log

# 5. 重置Prefect数据库(如果UI异常)
prefect server database reset

# 6. 检查端口占用
netstat -tlnp | grep 4200
⚡ 流程自动化

端到端MLOps流水线自动化执行

🔧 任务编排

Prefect工作流编排和依赖管理

📊 流程监控

实时监控和故障告警机制

⏰ 调度管理

定时任务和触发器配置

🏆 阶段 5 成就解锁

  • 端到端自动化流水线运行成功
  • Prefect UI显示完整流程
  • 定时调度配置完成
  • 信心值:70% → 85%

🚀 下一步预告

阶段 6: 将进行模型服务化,让你的模型真正为业务提供服务!

训练好的模型只是半成品,只有能通过API提供服务才有业务价值。服务化是ML工程师的核心技能,也是你薪资上涨的关键能力。