📊 MLOps实战训练营

学习进度

阶段 3: 数据版本管理(DVC实战)

目标: 从30%到50%信心值 🎯

📅 第3阶段:数据版本管理(DVC实战)

🚀 掌握DVC = 掌握数据科学项目的时光机器!

🎯 为什么需要DVC?

在实际项目中,数据会不断更新,模型会不断调优。DVC让你能回答"这个模型是用哪版数据训练的?"这种关键问题,这在生产环境中是必需的能力。


职场价值:掌握数据版本控制,你就超越了80%的数据科学家!

DVC常见问题速查手册

❌ 问题1: dvc: command not found

解决方案

pip install dvc[s3]==3.48.4

深层原因:DVC需要单独安装,不像git那样系统自带

职场价值:掌握数据版本管理,你就超越了80%的数据科学家

❌ 问题2: git add .dvc 失败

解决方案

确保先git init,DVC依赖于Git

深层原因:DVC是Git的扩展,不是独立工具

举一反三:大多数MLOps工具都不是孤立的,理解工具间关系很重要

❌ 问题3: 大文件推送失败

解决方案

检查.dvcignore配置,排除超大文件

深层原因:DVC默认限制单文件大小,防止仓库过大

最佳实践:生产中使用S3/Azure作为DVC远程存储

上午:DVC初始化

💡 所有代码块默认收起,点击标题栏可展开查看完整代码,支持一键复制
🔧 DVC环境配置
# 初始化DVC
cd ~/mlops-project
dvc init

# 配置本地远程存储(学习阶段)
mkdir -p /tmp/dvc-storage
dvc remote add -d local /tmp/dvc-storage

# 从Git移除数据文件
git rm --cached data/raw/california_housing.csv
git commit -m "Remove data file from Git tracking"

# DVC接管追踪数据文件
dvc add data/raw/california_housing.csv

# 提交DVC配置
git add data/raw/california_housing.csv.dvc
git add .gitignore
git commit -m "Add data file to DVC tracking"

# 查看生成的文件
ls -la data/raw/
cat data/raw/california_housing.csv.dvc

晚上:数据处理Pipeline

🐍 src/day2_data_pipeline.py
"""
Day 2: 数据处理Pipeline
目标:建立可重现的数据处理流程
"""

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, RobustScaler
import yaml
import os
import joblib
from datetime import datetime

def load_raw_data():
    """加载原始数据"""
    print("📊 加载原始数据...")
    df = pd.read_csv('data/raw/california_housing.csv')
    
    print(f"原始数据形状: {df.shape}")
    print(f"缺失值统计:")
    print(df.isnull().sum())
    
    return df

def clean_data(df):
    """数据清洗"""
    print("🧹 数据清洗中...")
    
    # 记录清洗前状态
    before_shape = df.shape
    
    # 1. 删除缺失值
    df_clean = df.dropna()
    
    # 2. 删除异常值(使用IQR方法)
    for column in df_clean.select_dtypes(include=[np.number]).columns:
        Q1 = df_clean[column].quantile(0.25)
        Q3 = df_clean[column].quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        
        df_clean = df_clean[
            (df_clean[column] >= lower_bound) & 
            (df_clean[column] <= upper_bound)
        ]
    
    print(f"清洗前: {before_shape}")
    print(f"清洗后: {df_clean.shape}")
    print(f"删除了 {before_shape[0] - df_clean.shape[0]} 行数据")
    
    return df_clean

def feature_engineering(df):
    """特征工程"""
    print("⚒️ 特征工程中...")
    
    df_fe = df.copy()
    
    # 1. 创建新特征
    df_fe['RoomsPerHousehold'] = df_fe['AveRooms'] / df_fe['AveOccup']
    df_fe['BedroomsPerRoom'] = df_fe['AveBedrms'] / df_fe['AveRooms']
    df_fe['PopulationPerHousehold'] = df_fe['Population'] / df_fe['HouseAge']
    
    # 2. 对数变换(针对偏斜分布)
    skewed_features = ['Population', 'AveOccup']
    for feature in skewed_features:
        if feature in df_fe.columns:
            df_fe[f'{feature}_log'] = np.log1p(df_fe[feature])
    
    print(f"特征工程后: {df_fe.shape}")
    print(f"新增特征: {df_fe.shape[1] - df.shape[1]} 个")
    
    return df_fe

def split_and_scale_data(df, test_size=0.2, val_size=0.1):
    """分割和标准化数据"""
    print("✂️ 分割和标准化数据...")
    
    # 分离特征和目标
    X = df.drop('MedHouseVal', axis=1)
    y = df['MedHouseVal']
    
    # 第一次分割:训练+验证 vs 测试
    X_temp, X_test, y_temp, y_test = train_test_split(
        X, y, test_size=test_size, random_state=42
    )
    
    # 第二次分割:训练 vs 验证
    X_train, X_val, y_train, y_val = train_test_split(
        X_temp, y_temp, test_size=val_size/(1-test_size), random_state=42
    )
    
    print(f"训练集: {X_train.shape}")
    print(f"验证集: {X_val.shape}")
    print(f"测试集: {X_test.shape}")
    
    # 标准化特征(只在训练集上拟合)
    scaler = RobustScaler()  # 对异常值更鲁棒
    X_train_scaled = pd.DataFrame(
        scaler.fit_transform(X_train),
        columns=X_train.columns,
        index=X_train.index
    )
    X_val_scaled = pd.DataFrame(
        scaler.transform(X_val),
        columns=X_val.columns,
        index=X_val.index
    )
    X_test_scaled = pd.DataFrame(
        scaler.transform(X_test),
        columns=X_test.columns,
        index=X_test.index
    )
    
    return (X_train_scaled, X_val_scaled, X_test_scaled, 
            y_train, y_val, y_test, scaler)

def save_processed_data(X_train, X_val, X_test, y_train, y_val, y_test, scaler):
    """保存处理后的数据"""
    print("💾 保存处理后的数据...")
    
    os.makedirs('data/processed', exist_ok=True)
    
    # 保存数据
    X_train.to_csv('data/processed/X_train.csv', index=False)
    X_val.to_csv('data/processed/X_val.csv', index=False)
    X_test.to_csv('data/processed/X_test.csv', index=False)
    
    y_train.to_csv('data/processed/y_train.csv', index=False)
    y_val.to_csv('data/processed/y_val.csv', index=False)
    y_test.to_csv('data/processed/y_test.csv', index=False)
    
    # 保存标准化器
    joblib.dump(scaler, 'data/processed/scaler.pkl')
    
    # 保存数据处理配置
    config = {
        'processing_date': datetime.now().isoformat(),
        'train_shape': list(X_train.shape),
        'val_shape': list(X_val.shape),
        'test_shape': list(X_test.shape),
        'features': list(X_train.columns),
        'scaler_type': 'RobustScaler'
    }
    
    with open('data/processed/processing_config.yaml', 'w') as f:
        yaml.dump(config, f)
    
    print("✅ 数据保存完成")

def main():
    """主处理流程"""
    print("🎯 Day 2: 数据处理Pipeline")
    print("=" * 50)
    
    # 1. 加载原始数据
    df = load_raw_data()
    
    # 2. 数据清洗
    df_clean = clean_data(df)
    
    # 3. 特征工程
    df_fe = feature_engineering(df_clean)
    
    # 4. 分割和标准化
    (X_train, X_val, X_test, 
     y_train, y_val, y_test, scaler) = split_and_scale_data(df_fe)
    
    # 5. 保存处理后的数据
    save_processed_data(X_train, X_val, X_test, y_train, y_val, y_test, scaler)
    
    print("\n🎉 Day 2 完成!")
    print("✅ 数据处理pipeline建立完成")
    print("✅ 训练/验证/测试集准备就绪")
    print("✅ 数据版本管理已配置")

if __name__ == "__main__":
    main()

创建DVC Pipeline

📝 dvc.yaml - DVC Pipeline配置
# 创建配置文件 vi /root/mlops-project/dvc.yaml
stages:
  data_processing:
    cmd: python src/day2_data_pipeline.py
    deps:
      - src/day2_data_pipeline.py
      - data/raw/california_housing.csv
    outs:
      - data/processed/X_train.csv
      - data/processed/X_val.csv
      - data/processed/X_test.csv
      - data/processed/y_train.csv
      - data/processed/y_val.csv
      - data/processed/y_test.csv
      - data/processed/scaler.pkl
      - data/processed/processing_config.yaml

运行和提交

🔧 执行DVC Pipeline
# 运行数据处理
python src/day2_data_pipeline.py

# 运行DVC pipeline
dvc repro

# 推送数据到远程存储
dvc push

# 提交代码变更
git add .
git commit -m "Day 2: Data processing pipeline with DVC

- Implemented comprehensive data cleaning
- Added feature engineering (3 new features)
- Created train/val/test splits
- Added data versioning with DVC
- Pipeline reproducible with dvc repro"

🏆 阶段 3 成就解锁

  • DVC数据版本管理工作正常
  • 数据处理pipeline可重现
  • 训练/验证/测试集准备完毕
  • 信心值:30% → 50%

🚀 下一步预告

阶段 4: 将进行多模型对比实验(MLflow进阶),让你系统性地比较6种算法!

你将学会向客户证明:"我们测试了6种算法,随机森林在你的数据上效果最好,准确率提升了15%"。