Flujo General de Modelación en Machine Learning: Guía Exhaustiva
Análisis Exploratorio y Limpieza de Datos
El análisis exploratorio de datos (EDA) constituye el primer paso fundamental en cualquier proyecto de machine learning, representando hasta el 80% del tiempo total del proyecto. Esta fase permite comprender la estructura, distribución y calidades de los datos antes de proceder con la modelación.
Técnicas de Análisis Exploratorio
El proceso comienza con la exploración de la estructura básica de los datos, incluyendo dimensionalidad, tipos de variables, y presencia de valores faltantes. Se debe examinar cuidadosamente las relaciones temporales para evitar data leakage, especialmente en datasets que contengan información temporal.
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
# Análisis inicial de estructura
def basic_data_exploration(df):
print("=== INFORMACIÓN BÁSICA DEL DATASET ===")
print(f"Dimensiones: {df.shape}")
print(f"Tipos de datos:\n{df.dtypes}")
print(f"Valores nulos:\n{df.isnull().sum()}")
# Estadísticas descriptivas
print("\n=== ESTADÍSTICAS DESCRIPTIVAS ===")
print(df.describe())
# Identificación de duplicados
print(f"\nFilas duplicadas: {df.duplicated().sum()}")
return df
Limpieza y Preparación de Datos
La limpieza de datos implica múltiples operaciones críticas para el éxito del modelo:
- Detección y corrección de valores anómalos: Utilizando técnicas como IQR (Rango Intercuartílico) o Z-score
- Tratamiento de valores faltantes: Mediante imputación KNN, medidas de tendencia central, o eliminación estratégica
- Deduplicación: Aplicando técnicas exactas, difusas y semánticas para identificar registros duplicados
def data_cleaning_pipeline(df, target_column):
# Tratamiento de outliers usando IQR
def remove_outliers_iqr(data, column):
Q1 = data[column].quantile(0.25)
Q3 = data[column].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
return data[(data[column] >= lower_bound) & (data[column] <= upper_bound)]
# Imputación de valores faltantes
from sklearn.impute import KNNImputer
numeric_cols = df.select_dtypes(include=[np.number]).columns
imputer = KNNImputer(n_neighbors=5)
df[numeric_cols] = imputer.fit_transform(df[numeric_cols])
# Eliminación de duplicados
df = df.drop_duplicates()
return df
Ingeniería de Características y Selección de Variables
La ingeniería de características es crucial para mejorar el rendimiento del modelo mediante la creación, transformación y selección de variables predictivas.
Creación de Características
def feature_engineering(df):
# Características polinomiales
from sklearn.preprocessing import PolynomialFeatures
# Interacciones entre características
df['feature_interaction'] = df['feature1'] * df['feature2']
# Características temporales (si aplica)
if 'timestamp' in df.columns:
df['hour'] = pd.to_datetime(df['timestamp']).dt.hour
df['day_of_week'] = pd.to_datetime(df['timestamp']).dt.dayofweek
df['month'] = pd.to_datetime(df['timestamp']).dt.month
# Transformaciones logarítmicas
for col in ['sales', 'revenue']:
if col in df.columns:
df[f'log_{col}'] = np.log1p(df[col])
return df
Selección de Características
La selección de características debe realizarse dentro de cada fold de validación cruzada para prevenir data leakage:
from sklearn.feature_selection import SelectKBest, f_classif, RFE
from sklearn.ensemble import RandomForestClassifier
def feature_selection_pipeline(X, y, method='univariate'):
if method == 'univariate':
# Selección univariada
selector = SelectKBest(score_func=f_classif, k=10)
X_selected = selector.fit_transform(X, y)
selected_features = X.columns[selector.get_support()]
elif method == 'rfe':
# Eliminación recursiva de características
estimator = RandomForestClassifier(random_state=42)
rfe = RFE(estimator, n_features_to_select=10)
X_selected = rfe.fit_transform(X, y)
selected_features = X.columns[rfe.support_]
return X_selected, selected_features
División Temporal de Datos
La división adecuada de datos es crítica para evitar data leakage y obtener estimaciones realistas del rendimiento del modelo.
Estrategias de División
from sklearn.model_selection import TimeSeriesSplit, train_test_split
def temporal_data_split(df, target_col, test_size=0.2, time_col=None):
"""
División respetando el orden temporal para evitar data leakage
"""
if time_col:
# División temporal estricta
df_sorted = df.sort_values(time_col)
split_idx = int(len(df_sorted) * (1 - test_size))
train_data = df_sorted.iloc[:split_idx]
test_data = df_sorted.iloc[split_idx:]
X_train = train_data.drop([target_col, time_col], axis=1)
y_train = train_data[target_col]
X_test = test_data.drop([target_col, time_col], axis=1)
y_test = test_data[target_col]
else:
# División estratificada estándar
X = df.drop(target_col, axis=1)
y = df[target_col]
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, stratify=y, random_state=42
)
return X_train, X_test, y_train, y_test
# Validación cruzada temporal
def temporal_cross_validation(X, y, n_splits=5):
tscv = TimeSeriesSplit(n_splits=n_splits)
for fold, (train_idx, val_idx) in enumerate(tscv.split(X)):
X_train_fold = X.iloc[train_idx]
X_val_fold = X.iloc[val_idx]
y_train_fold = y.iloc[train_idx]
y_val_fold = y.iloc[val_idx]
yield X_train_fold, X_val_fold, y_train_fold, y_val_fold
Preprocesamiento: Escalado, Encoding, Imputación
El preprocesamiento debe aplicarse consistentemente entre entrenamiento y producción, calculando las transformaciones únicamente en los datos de entrenamiento.
Pipeline de Preprocesamiento
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, LabelEncoder, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
def create_preprocessing_pipeline(X_train):
# Identificar tipos de columnas
numeric_features = X_train.select_dtypes(include=[np.number]).columns.tolist()
categorical_features = X_train.select_dtypes(include=['object']).columns.tolist()
# Pipeline para características numéricas
numeric_pipeline = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
# Pipeline para características categóricas
categorical_pipeline = Pipeline([
('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
('encoder', OneHotEncoder(handle_unknown='ignore', sparse=False))
])
# Combinador de pipelines
preprocessor = ColumnTransformer([
('num', numeric_pipeline, numeric_features),
('cat', categorical_pipeline, categorical_features)
])
return preprocessor
Implementación con Validación Cruzada
def preprocessing_with_cv(X, y, model, cv_folds=5):
from sklearn.model_selection import cross_val_score
# Crear pipeline completo
full_pipeline = Pipeline([
('preprocessor', create_preprocessing_pipeline(X)),
('model', model)
])
# Validación cruzada con preprocesamiento interno
cv_scores = cross_val_score(full_pipeline, X, y, cv=cv_folds, scoring='accuracy')
return cv_scores.mean(), cv_scores.std()
Selección y Entrenamiento de Modelos
La selección de modelos debe considerar el trade-off entre interpretabilidad y rendimiento, así como los requisitos específicos del negocio.
Comparación de Modelos
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.metrics import classification_report, roc_auc_score
def model_comparison(X_train, X_test, y_train, y_test, preprocessor):
models = {
'Logistic Regression': LogisticRegression(random_state=42),
'Random Forest': RandomForestClassifier(random_state=42),
'Gradient Boosting': GradientBoostingClassifier(random_state=42),
'SVM': SVC(probability=True, random_state=42)
}
results = {}
for name, model in models.items():
# Pipeline completo
pipeline = Pipeline([
('preprocessor', preprocessor),
('model', model)
])
# Entrenamiento
pipeline.fit(X_train, y_train)
# Predicciones
y_pred = pipeline.predict(X_test)
y_pred_proba = pipeline.predict_proba(X_test)[:, 1]
# Métricas
auc = roc_auc_score(y_test, y_pred_proba)
results[name] = {
'model': pipeline,
'auc': auc,
'predictions': y_pred,
'probabilities': y_pred_proba
}
print(f"\n=== {name} ===")
print(f"AUC: {auc:.4f}")
print(classification_report(y_test, y_pred))
return results
Hiperparametrización con Validación Cruzada
La validación cruzada anidada es esencial para obtener estimaciones no sesgadas del rendimiento mientras se optimizan hiperparámetros.
Grid Search con Validación Cruzada Anidada
from sklearn.model_selection import GridSearchCV, cross_val_score
from sklearn.ensemble import RandomForestClassifier
def nested_cross_validation_hypertuning(X, y, param_grid, model, cv_outer=5, cv_inner=3):
"""
Validación cruzada anidada para hiperparametrización sin sesgo
"""
from sklearn.model_selection import KFold
# CV externo para evaluación final
outer_cv = KFold(n_splits=cv_outer, shuffle=True, random_state=42)
# CV interno para selección de hiperparámetros
inner_cv = KFold(n_splits=cv_inner, shuffle=True, random_state=42)
# Grid Search para hiperparámetros
grid_search = GridSearchCV(
estimator=model,
param_grid=param_grid,
cv=inner_cv,
scoring='roc_auc',
n_jobs=-1
)
# Pipeline completo con preprocesamiento
preprocessor = create_preprocessing_pipeline(X)
pipeline = Pipeline([
('preprocessor', preprocessor),
('model', grid_search)
])
# Validación cruzada externa
nested_scores = cross_val_score(
pipeline, X, y, cv=outer_cv, scoring='roc_auc'
)
# Entrenar en todos los datos para obtener mejores parámetros
pipeline.fit(X, y)
best_params = pipeline.named_steps['model'].best_params_
return {
'nested_cv_scores': nested_scores,
'mean_score': nested_scores.mean(),
'std_score': nested_scores.std(),
'best_params': best_params,
'final_model': pipeline
}
# Ejemplo de uso
param_grid = {
'n_estimators': [100, 200, 300],
'max_depth': [None, 5, 10, 15],
'min_samples_split': [2, 5, 10]
}
model = RandomForestClassifier(random_state=42)
results = nested_cross_validation_hypertuning(X, y, param_grid, model)
Evaluación con Métricas Apropiadas
La evaluación debe incluir métricas relevantes para el problema de negocio y considerar el desbalance de clases si existe.
Métricas Comprehensivas
from sklearn.metrics import (accuracy_score, precision_score, recall_score,
f1_score, roc_auc_score, confusion_matrix,
classification_report)
import matplotlib.pyplot as plt
def comprehensive_evaluation(y_true, y_pred, y_pred_proba, model_name="Model"):
"""
Evaluación comprehensiva con múltiples métricas
"""
metrics = {
'Accuracy': accuracy_score(y_true, y_pred),
'Precision': precision_score(y_true, y_pred, average='weighted'),
'Recall': recall_score(y_true, y_pred, average='weighted'),
'F1-Score': f1_score(y_true, y_pred, average='weighted'),
'ROC-AUC': roc_auc_score(y_true, y_pred_proba)
}
print(f"=== EVALUACIÓN: {model_name} ===")
for metric, value in metrics.items():
print(f"{metric}: {value:.4f}")
# Matriz de confusión
cm = confusion_matrix(y_true, y_pred)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.title(f'Matriz de Confusión - {model_name}')
plt.xlabel('Predicción')
plt.ylabel('Valor Real')
plt.show()
# Reporte detallado
print(f"\nReporte de Clasificación:\n{classification_report(y_true, y_pred)}")
return metrics
Selección del Modelo Final y Preparación para Producción
La transición a producción requiere consideraciones de escalabilidad, monitoreo y mantenimiento.
Preparación para Producción
import joblib
import json
from datetime import datetime
class ProductionModel:
def __init__(self, model, preprocessor, feature_names, model_metadata):
self.model = model
self.preprocessor = preprocessor
self.feature_names = feature_names
self.model_metadata = model_metadata
self.deployment_date = datetime.now()
def predict(self, X):
"""Predicción con validación de características"""
# Validar características
if list(X.columns) != self.feature_names:
missing = set(self.feature_names) - set(X.columns)
extra = set(X.columns) - set(self.feature_names)
if missing:
raise ValueError(f"Características faltantes: {missing}")
if extra:
print(f"Advertencia: Características extra ignoradas: {extra}")
X = X[self.feature_names]
# Preprocesar y predecir
return self.model.predict(X)
def predict_proba(self, X):
"""Predicción de probabilidades"""
if list(X.columns) != self.feature_names:
X = X[self.feature_names]
return self.model.predict_proba(X)
def save_model(self, filepath):
"""Guardar modelo para producción"""
model_package = {
'model': self.model,
'preprocessor': self.preprocessor,
'feature_names': self.feature_names,
'metadata': self.model_metadata,
'deployment_date': self.deployment_date
}
joblib.dump(model_package, filepath)
print(f"Modelo guardado en: {filepath}")
@classmethod
def load_model(cls, filepath):
"""Cargar modelo desde archivo"""
model_package = joblib.load(filepath)
return cls(
model=model_package['model'],
preprocessor=model_package['preprocessor'],
feature_names=model_package['feature_names'],
model_metadata=model_package['metadata']
)
def prepare_production_model(best_model, X_train, feature_names, performance_metrics):
"""
Preparar modelo para producción con metadata
"""
metadata = {
'model_type': type(best_model).__name__,
'training_samples': len(X_train),
'features_count': len(feature_names),
'performance_metrics': performance_metrics,
'training_date': datetime.now().isoformat()
}
production_model = ProductionModel(
model=best_model,
preprocessor=create_preprocessing_pipeline(X_train),
feature_names=feature_names,
model_metadata=metadata
)
return production_model
Consideraciones Críticas
Prevención de Data Leakage
El data leakage es uno de los problemas más críticos en machine learning. Las estrategias de prevención incluyen:
def prevent_data_leakage_checklist():
"""
Lista de verificación para prevenir data leakage
"""
checklist = {
"División temporal correcta": "✓ Datos futuros nunca en entrenamiento",
"Preprocesamiento en CV": "✓ Transformaciones calculadas solo en train",
"Validación de características": "✓ No features que incluyan información del target",
"Cross-validation apropiada": "✓ Respeta orden temporal si aplica",
"Test set intacto": "✓ Nunca usado para decisiones de modelado"
}
for item, status in checklist.items():
print(f"{status} {item}")
return checklist
Monitoreo de Drift de Datos
El model drift requiere monitoreo continuo en producción:
from scipy.stats import ks_2samp
import warnings
class DataDriftMonitor:
def __init__(self, reference_data, threshold=0.05):
self.reference_data = reference_data
self.threshold = threshold
self.drift_history = []
def detect_drift(self, new_data, feature_cols):
"""
Detectar drift usando test Kolmogorov-Smirnov
"""
drift_detected = {}
for col in feature_cols:
if col in self.reference_data.columns and col in new_data.columns:
# Test KS para detectar cambios en distribución
statistic, p_value = ks_2samp(
self.reference_data[col].dropna(),
new_data[col].dropna()
)
drift_detected[col] = {
'p_value': p_value,
'drift': p_value < self.threshold,
'statistic': statistic
}
if p_value < self.threshold:
warnings.warn(f"Drift detectado en característica '{col}' (p-value: {p_value:.4f})")
self.drift_history.append({
'timestamp': datetime.now(),
'drift_results': drift_detected
})
return drift_detected
def get_drift_summary(self):
"""Resumen de drift detectado"""
if not self.drift_history:
return "No hay datos de drift disponibles"
latest_drift = self.drift_history[-1]['drift_results']
drifted_features = [col for col, result in latest_drift.items() if result['drift']]
return {
'total_features_monitored': len(latest_drift),
'features_with_drift': len(drifted_features),
'drifted_features': drifted_features,
'drift_percentage': len(drifted_features) / len(latest_drift) * 100
}
Trade-off Interpretabilidad vs Rendimiento
La decisión entre interpretabilidad y rendimiento debe basarse en los requisitos del negocio:
def interpretability_vs_performance_analysis():
"""
Framework para evaluar trade-off interpretabilidad vs rendimiento
"""
model_characteristics = {
'Linear Regression': {'interpretability': 9, 'performance': 6, 'complexity': 2},
'Decision Tree': {'interpretability': 8, 'performance': 7, 'complexity': 4},
'Random Forest': {'interpretability': 5, 'performance': 8, 'complexity': 6},
'XGBoost': {'interpretability': 4, 'performance': 9, 'complexity': 7},
'Neural Networks': {'interpretability': 2, 'performance': 9, 'complexity': 9}
}
# Criterios de selección según contexto
selection_criteria = {
'high_stakes_decisions': 'Priorizar interpretabilidad (>7)',
'production_efficiency': 'Balance interpretabilidad-rendimiento (5-7)',
'pure_performance': 'Priorizar rendimiento (>8)',
'regulatory_compliance': 'Máxima interpretabilidad (>8)'
}
return model_characteristics, selection_criteria
Escalabilidad y Recursos Computacionales
Las consideraciones de escalabilidad son críticas para el éxito en producción:
class ScalabilityPlanner:
def __init__(self, expected_data_volume, latency_requirements):
self.data_volume = expected_data_volume
self.latency_requirements = latency_requirements
def recommend_scaling_strategy(self):
"""
Recomendar estrategia de escalabilidad
"""
recommendations = []
# Basado en volumen de datos
if self.data_volume > 1e6: # > 1M registros
recommendations.append("Considerar procesamiento distribuido (Spark)")
recommendations.append("Implementar batch processing")
# Basado en latencia
if self.latency_requirements < 100: # < 100ms
recommendations.append("Optimizar modelo para inferencia rápida")
recommendations.append("Considerar model quantization")
recommendations.append("Implementar caching de predicciones")
# Estrategias generales
recommendations.extend([
"Monitoreo continuo de recursos",
"Auto-scaling basado en demanda",
"Load balancing para múltiples instancias",
"Evaluación periódica de performance"
])
return recommendations
def estimate_resources(self, model_size_mb, predictions_per_second):
"""
Estimar recursos necesarios
"""
# Estimaciones básicas
memory_gb = max(model_size_mb / 1024 * 2, 1) # 2x model size mínimo
cpu_cores = max(predictions_per_second / 100, 1) # ~100 pred/core/second
return {
'estimated_memory_gb': memory_gb,
'estimated_cpu_cores': cpu_cores,
'storage_requirements': f"{model_size_mb * 1.5}MB",
'monitoring_overhead': "10-15% recursos adicionales"
}
Conclusión
El flujo de modelación en machine learning es un proceso iterativo y complejo que requiere atención meticulosa a cada etapa. La implementación exitosa depende de:
- Prevención rigurosa de data leakage mediante división temporal adecuada y preprocesamiento consistente
- Validación cruzada anidada para hiperparametrización sin sesgo
- Monitoreo continuo de drift de datos y performance del modelo
- Balance consciente entre interpretabilidad y rendimiento según requisitos del negocio
- Consideraciones de escalabilidad desde el diseño inicial
La adopción de estas mejores prácticas asegura modelos robustos, confiables y preparados para entornos de producción exigentes, maximizando el valor de negocio mientras se mantiene la integridad técnica del sistema de machine learning.
Hi :)
Matemáticas
Vectores
Álgebra Lineal
Geometría Analítica
Producto Punto
Espacios Vectoriales
Ortogonalidad
Normalización
Funciones
Álgebra
Composición de Funciones
Función Inversa
Combinación de Funciones
Transformaciones Gráficas
Aplicaciones Económicas
Interés Compuesto
Proporcionalidad
R
Data
Machine Learning
Aprendizaje Supervisado
Inteligencia Artificial
Clasificación
Regresión
Deep Learning