📊 MLOps实战训练营
学习进度
阶段 3: 数据版本管理(DVC实战)
目标: 从30%到50%信心值 🎯
📅 第3阶段:数据版本管理(DVC实战)
🚀 掌握DVC = 掌握数据科学项目的时光机器!
🎯 为什么需要DVC?
在实际项目中,数据会不断更新,模型会不断调优。DVC让你能回答"这个模型是用哪版数据训练的?"这种关键问题,这在生产环境中是必需的能力。
职场价值:掌握数据版本控制,你就超越了80%的数据科学家!
DVC常见问题速查手册
❌ 问题1: dvc: command not found
解决方案
pip install dvc[s3]==3.48.4
深层原因:DVC需要单独安装,不像git那样系统自带
职场价值:掌握数据版本管理,你就超越了80%的数据科学家
❌ 问题2: git add .dvc 失败
解决方案
确保先git init
,DVC依赖于Git
深层原因:DVC是Git的扩展,不是独立工具
举一反三:大多数MLOps工具都不是孤立的,理解工具间关系很重要
❌ 问题3: 大文件推送失败
解决方案
检查.dvcignore
配置,排除超大文件
深层原因:DVC默认限制单文件大小,防止仓库过大
最佳实践:生产中使用S3/Azure作为DVC远程存储
上午:DVC初始化
💡 所有代码块默认收起,点击标题栏可展开查看完整代码,支持一键复制
🔧 DVC环境配置
# 初始化DVC
cd ~/mlops-project
dvc init
# 配置本地远程存储(学习阶段)
mkdir -p /tmp/dvc-storage
dvc remote add -d local /tmp/dvc-storage
# 从Git移除数据文件
git rm --cached data/raw/california_housing.csv
git commit -m "Remove data file from Git tracking"
# DVC接管追踪数据文件
dvc add data/raw/california_housing.csv
# 提交DVC配置
git add data/raw/california_housing.csv.dvc
git add .gitignore
git commit -m "Add data file to DVC tracking"
# 查看生成的文件
ls -la data/raw/
cat data/raw/california_housing.csv.dvc
晚上:数据处理Pipeline
🐍 src/day2_data_pipeline.py
"""
Day 2: 数据处理Pipeline
目标:建立可重现的数据处理流程
"""
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, RobustScaler
import yaml
import os
import joblib
from datetime import datetime
def load_raw_data():
"""加载原始数据"""
print("📊 加载原始数据...")
df = pd.read_csv('data/raw/california_housing.csv')
print(f"原始数据形状: {df.shape}")
print(f"缺失值统计:")
print(df.isnull().sum())
return df
def clean_data(df):
"""数据清洗"""
print("🧹 数据清洗中...")
# 记录清洗前状态
before_shape = df.shape
# 1. 删除缺失值
df_clean = df.dropna()
# 2. 删除异常值(使用IQR方法)
for column in df_clean.select_dtypes(include=[np.number]).columns:
Q1 = df_clean[column].quantile(0.25)
Q3 = df_clean[column].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
df_clean = df_clean[
(df_clean[column] >= lower_bound) &
(df_clean[column] <= upper_bound)
]
print(f"清洗前: {before_shape}")
print(f"清洗后: {df_clean.shape}")
print(f"删除了 {before_shape[0] - df_clean.shape[0]} 行数据")
return df_clean
def feature_engineering(df):
"""特征工程"""
print("⚒️ 特征工程中...")
df_fe = df.copy()
# 1. 创建新特征
df_fe['RoomsPerHousehold'] = df_fe['AveRooms'] / df_fe['AveOccup']
df_fe['BedroomsPerRoom'] = df_fe['AveBedrms'] / df_fe['AveRooms']
df_fe['PopulationPerHousehold'] = df_fe['Population'] / df_fe['HouseAge']
# 2. 对数变换(针对偏斜分布)
skewed_features = ['Population', 'AveOccup']
for feature in skewed_features:
if feature in df_fe.columns:
df_fe[f'{feature}_log'] = np.log1p(df_fe[feature])
print(f"特征工程后: {df_fe.shape}")
print(f"新增特征: {df_fe.shape[1] - df.shape[1]} 个")
return df_fe
def split_and_scale_data(df, test_size=0.2, val_size=0.1):
"""分割和标准化数据"""
print("✂️ 分割和标准化数据...")
# 分离特征和目标
X = df.drop('MedHouseVal', axis=1)
y = df['MedHouseVal']
# 第一次分割:训练+验证 vs 测试
X_temp, X_test, y_temp, y_test = train_test_split(
X, y, test_size=test_size, random_state=42
)
# 第二次分割:训练 vs 验证
X_train, X_val, y_train, y_val = train_test_split(
X_temp, y_temp, test_size=val_size/(1-test_size), random_state=42
)
print(f"训练集: {X_train.shape}")
print(f"验证集: {X_val.shape}")
print(f"测试集: {X_test.shape}")
# 标准化特征(只在训练集上拟合)
scaler = RobustScaler() # 对异常值更鲁棒
X_train_scaled = pd.DataFrame(
scaler.fit_transform(X_train),
columns=X_train.columns,
index=X_train.index
)
X_val_scaled = pd.DataFrame(
scaler.transform(X_val),
columns=X_val.columns,
index=X_val.index
)
X_test_scaled = pd.DataFrame(
scaler.transform(X_test),
columns=X_test.columns,
index=X_test.index
)
return (X_train_scaled, X_val_scaled, X_test_scaled,
y_train, y_val, y_test, scaler)
def save_processed_data(X_train, X_val, X_test, y_train, y_val, y_test, scaler):
"""保存处理后的数据"""
print("💾 保存处理后的数据...")
os.makedirs('data/processed', exist_ok=True)
# 保存数据
X_train.to_csv('data/processed/X_train.csv', index=False)
X_val.to_csv('data/processed/X_val.csv', index=False)
X_test.to_csv('data/processed/X_test.csv', index=False)
y_train.to_csv('data/processed/y_train.csv', index=False)
y_val.to_csv('data/processed/y_val.csv', index=False)
y_test.to_csv('data/processed/y_test.csv', index=False)
# 保存标准化器
joblib.dump(scaler, 'data/processed/scaler.pkl')
# 保存数据处理配置
config = {
'processing_date': datetime.now().isoformat(),
'train_shape': list(X_train.shape),
'val_shape': list(X_val.shape),
'test_shape': list(X_test.shape),
'features': list(X_train.columns),
'scaler_type': 'RobustScaler'
}
with open('data/processed/processing_config.yaml', 'w') as f:
yaml.dump(config, f)
print("✅ 数据保存完成")
def main():
"""主处理流程"""
print("🎯 Day 2: 数据处理Pipeline")
print("=" * 50)
# 1. 加载原始数据
df = load_raw_data()
# 2. 数据清洗
df_clean = clean_data(df)
# 3. 特征工程
df_fe = feature_engineering(df_clean)
# 4. 分割和标准化
(X_train, X_val, X_test,
y_train, y_val, y_test, scaler) = split_and_scale_data(df_fe)
# 5. 保存处理后的数据
save_processed_data(X_train, X_val, X_test, y_train, y_val, y_test, scaler)
print("\n🎉 Day 2 完成!")
print("✅ 数据处理pipeline建立完成")
print("✅ 训练/验证/测试集准备就绪")
print("✅ 数据版本管理已配置")
if __name__ == "__main__":
main()
创建DVC Pipeline
📝 dvc.yaml - DVC Pipeline配置
# 创建配置文件 vi /root/mlops-project/dvc.yaml
stages:
data_processing:
cmd: python src/day2_data_pipeline.py
deps:
- src/day2_data_pipeline.py
- data/raw/california_housing.csv
outs:
- data/processed/X_train.csv
- data/processed/X_val.csv
- data/processed/X_test.csv
- data/processed/y_train.csv
- data/processed/y_val.csv
- data/processed/y_test.csv
- data/processed/scaler.pkl
- data/processed/processing_config.yaml
运行和提交
🔧 执行DVC Pipeline
# 运行数据处理
python src/day2_data_pipeline.py
# 运行DVC pipeline
dvc repro
# 推送数据到远程存储
dvc push
# 提交代码变更
git add .
git commit -m "Day 2: Data processing pipeline with DVC
- Implemented comprehensive data cleaning
- Added feature engineering (3 new features)
- Created train/val/test splits
- Added data versioning with DVC
- Pipeline reproducible with dvc repro"
🏆 阶段 3 成就解锁
- DVC数据版本管理工作正常
- 数据处理pipeline可重现
- 训练/验证/测试集准备完毕
- 信心值:30% → 50%
🚀 下一步预告
阶段 4: 将进行多模型对比实验(MLflow进阶),让你系统性地比较6种算法!
你将学会向客户证明:"我们测试了6种算法,随机森林在你的数据上效果最好,准确率提升了15%"。