Flujo General de Modelación en Machine Learning: Guía Exhaustiva

Análisis Exploratorio y Limpieza de Datos

El análisis exploratorio de datos (EDA) constituye el primer paso fundamental en cualquier proyecto de machine learning, representando hasta el 80% del tiempo total del proyecto. Esta fase permite comprender la estructura, distribución y calidades de los datos antes de proceder con la modelación.

Técnicas de Análisis Exploratorio

El proceso comienza con la exploración de la estructura básica de los datos, incluyendo dimensionalidad, tipos de variables, y presencia de valores faltantes. Se debe examinar cuidadosamente las relaciones temporales para evitar data leakage, especialmente en datasets que contengan información temporal.

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# Análisis inicial de estructura
def basic_data_exploration(df):
    print("=== INFORMACIÓN BÁSICA DEL DATASET ===")
    print(f"Dimensiones: {df.shape}")
    print(f"Tipos de datos:\n{df.dtypes}")
    print(f"Valores nulos:\n{df.isnull().sum()}")
    
    # Estadísticas descriptivas
    print("\n=== ESTADÍSTICAS DESCRIPTIVAS ===")
    print(df.describe())
    
    # Identificación de duplicados
    print(f"\nFilas duplicadas: {df.duplicated().sum()}")
    
    return df

Limpieza y Preparación de Datos

La limpieza de datos implica múltiples operaciones críticas para el éxito del modelo:

  • Detección y corrección de valores anómalos: Utilizando técnicas como IQR (Rango Intercuartílico) o Z-score
  • Tratamiento de valores faltantes: Mediante imputación KNN, medidas de tendencia central, o eliminación estratégica
  • Deduplicación: Aplicando técnicas exactas, difusas y semánticas para identificar registros duplicados
def data_cleaning_pipeline(df, target_column):
    # Tratamiento de outliers usando IQR
    def remove_outliers_iqr(data, column):
        Q1 = data[column].quantile(0.25)
        Q3 = data[column].quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        return data[(data[column] >= lower_bound) & (data[column] <= upper_bound)]
    
    # Imputación de valores faltantes
    from sklearn.impute import KNNImputer
    
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    imputer = KNNImputer(n_neighbors=5)
    df[numeric_cols] = imputer.fit_transform(df[numeric_cols])
    
    # Eliminación de duplicados
    df = df.drop_duplicates()
    
    return df

Ingeniería de Características y Selección de Variables

La ingeniería de características es crucial para mejorar el rendimiento del modelo mediante la creación, transformación y selección de variables predictivas.

Creación de Características

def feature_engineering(df):
    # Características polinomiales
    from sklearn.preprocessing import PolynomialFeatures
    
    # Interacciones entre características
    df['feature_interaction'] = df['feature1'] * df['feature2']
    
    # Características temporales (si aplica)
    if 'timestamp' in df.columns:
        df['hour'] = pd.to_datetime(df['timestamp']).dt.hour
        df['day_of_week'] = pd.to_datetime(df['timestamp']).dt.dayofweek
        df['month'] = pd.to_datetime(df['timestamp']).dt.month
    
    # Transformaciones logarítmicas
    for col in ['sales', 'revenue']:
        if col in df.columns:
            df[f'log_{col}'] = np.log1p(df[col])
    
    return df

Selección de Características

La selección de características debe realizarse dentro de cada fold de validación cruzada para prevenir data leakage:

from sklearn.feature_selection import SelectKBest, f_classif, RFE
from sklearn.ensemble import RandomForestClassifier

def feature_selection_pipeline(X, y, method='univariate'):
    if method == 'univariate':
        # Selección univariada
        selector = SelectKBest(score_func=f_classif, k=10)
        X_selected = selector.fit_transform(X, y)
        selected_features = X.columns[selector.get_support()]
        
    elif method == 'rfe':
        # Eliminación recursiva de características
        estimator = RandomForestClassifier(random_state=42)
        rfe = RFE(estimator, n_features_to_select=10)
        X_selected = rfe.fit_transform(X, y)
        selected_features = X.columns[rfe.support_]
    
    return X_selected, selected_features

División Temporal de Datos

La división adecuada de datos es crítica para evitar data leakage y obtener estimaciones realistas del rendimiento del modelo.

Estrategias de División

from sklearn.model_selection import TimeSeriesSplit, train_test_split

def temporal_data_split(df, target_col, test_size=0.2, time_col=None):
    """
    División respetando el orden temporal para evitar data leakage
    """
    if time_col:
        # División temporal estricta
        df_sorted = df.sort_values(time_col)
        split_idx = int(len(df_sorted) * (1 - test_size))
        
        train_data = df_sorted.iloc[:split_idx]
        test_data = df_sorted.iloc[split_idx:]
        
        X_train = train_data.drop([target_col, time_col], axis=1)
        y_train = train_data[target_col]
        X_test = test_data.drop([target_col, time_col], axis=1)
        y_test = test_data[target_col]
        
    else:
        # División estratificada estándar
        X = df.drop(target_col, axis=1)
        y = df[target_col]
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, stratify=y, random_state=42
        )
    
    return X_train, X_test, y_train, y_test

# Validación cruzada temporal
def temporal_cross_validation(X, y, n_splits=5):
    tscv = TimeSeriesSplit(n_splits=n_splits)
    
    for fold, (train_idx, val_idx) in enumerate(tscv.split(X)):
        X_train_fold = X.iloc[train_idx]
        X_val_fold = X.iloc[val_idx]
        y_train_fold = y.iloc[train_idx]
        y_val_fold = y.iloc[val_idx]
        
        yield X_train_fold, X_val_fold, y_train_fold, y_val_fold

Preprocesamiento: Escalado, Encoding, Imputación

El preprocesamiento debe aplicarse consistentemente entre entrenamiento y producción, calculando las transformaciones únicamente en los datos de entrenamiento.

Pipeline de Preprocesamiento

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, LabelEncoder, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer

def create_preprocessing_pipeline(X_train):
    # Identificar tipos de columnas
    numeric_features = X_train.select_dtypes(include=[np.number]).columns.tolist()
    categorical_features = X_train.select_dtypes(include=['object']).columns.tolist()
    
    # Pipeline para características numéricas
    numeric_pipeline = Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ])
    
    # Pipeline para características categóricas
    categorical_pipeline = Pipeline([
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('encoder', OneHotEncoder(handle_unknown='ignore', sparse=False))
    ])
    
    # Combinador de pipelines
    preprocessor = ColumnTransformer([
        ('num', numeric_pipeline, numeric_features),
        ('cat', categorical_pipeline, categorical_features)
    ])
    
    return preprocessor

Implementación con Validación Cruzada

def preprocessing_with_cv(X, y, model, cv_folds=5):
    from sklearn.model_selection import cross_val_score
    
    # Crear pipeline completo
    full_pipeline = Pipeline([
        ('preprocessor', create_preprocessing_pipeline(X)),
        ('model', model)
    ])
    
    # Validación cruzada con preprocesamiento interno
    cv_scores = cross_val_score(full_pipeline, X, y, cv=cv_folds, scoring='accuracy')
    
    return cv_scores.mean(), cv_scores.std()

Selección y Entrenamiento de Modelos

La selección de modelos debe considerar el trade-off entre interpretabilidad y rendimiento, así como los requisitos específicos del negocio.

Comparación de Modelos

from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.metrics import classification_report, roc_auc_score

def model_comparison(X_train, X_test, y_train, y_test, preprocessor):
    models = {
        'Logistic Regression': LogisticRegression(random_state=42),
        'Random Forest': RandomForestClassifier(random_state=42),
        'Gradient Boosting': GradientBoostingClassifier(random_state=42),
        'SVM': SVC(probability=True, random_state=42)
    }
    
    results = {}
    
    for name, model in models.items():
        # Pipeline completo
        pipeline = Pipeline([
            ('preprocessor', preprocessor),
            ('model', model)
        ])
        
        # Entrenamiento
        pipeline.fit(X_train, y_train)
        
        # Predicciones
        y_pred = pipeline.predict(X_test)
        y_pred_proba = pipeline.predict_proba(X_test)[:, 1]
        
        # Métricas
        auc = roc_auc_score(y_test, y_pred_proba)
        
        results[name] = {
            'model': pipeline,
            'auc': auc,
            'predictions': y_pred,
            'probabilities': y_pred_proba
        }
        
        print(f"\n=== {name} ===")
        print(f"AUC: {auc:.4f}")
        print(classification_report(y_test, y_pred))
    
    return results

Hiperparametrización con Validación Cruzada

La validación cruzada anidada es esencial para obtener estimaciones no sesgadas del rendimiento mientras se optimizan hiperparámetros.

Grid Search con Validación Cruzada Anidada

from sklearn.model_selection import GridSearchCV, cross_val_score
from sklearn.ensemble import RandomForestClassifier

def nested_cross_validation_hypertuning(X, y, param_grid, model, cv_outer=5, cv_inner=3):
    """
    Validación cruzada anidada para hiperparametrización sin sesgo
    """
    from sklearn.model_selection import KFold
    
    # CV externo para evaluación final
    outer_cv = KFold(n_splits=cv_outer, shuffle=True, random_state=42)
    # CV interno para selección de hiperparámetros
    inner_cv = KFold(n_splits=cv_inner, shuffle=True, random_state=42)
    
    # Grid Search para hiperparámetros
    grid_search = GridSearchCV(
        estimator=model,
        param_grid=param_grid,
        cv=inner_cv,
        scoring='roc_auc',
        n_jobs=-1
    )
    
    # Pipeline completo con preprocesamiento
    preprocessor = create_preprocessing_pipeline(X)
    pipeline = Pipeline([
        ('preprocessor', preprocessor),
        ('model', grid_search)
    ])
    
    # Validación cruzada externa
    nested_scores = cross_val_score(
        pipeline, X, y, cv=outer_cv, scoring='roc_auc'
    )
    
    # Entrenar en todos los datos para obtener mejores parámetros
    pipeline.fit(X, y)
    best_params = pipeline.named_steps['model'].best_params_
    
    return {
        'nested_cv_scores': nested_scores,
        'mean_score': nested_scores.mean(),
        'std_score': nested_scores.std(),
        'best_params': best_params,
        'final_model': pipeline
    }

# Ejemplo de uso
param_grid = {
    'n_estimators': [100, 200, 300],
    'max_depth': [None, 5, 10, 15],
    'min_samples_split': [2, 5, 10]
}

model = RandomForestClassifier(random_state=42)
results = nested_cross_validation_hypertuning(X, y, param_grid, model)

Evaluación con Métricas Apropiadas

La evaluación debe incluir métricas relevantes para el problema de negocio y considerar el desbalance de clases si existe.

Métricas Comprehensivas

from sklearn.metrics import (accuracy_score, precision_score, recall_score, 
                           f1_score, roc_auc_score, confusion_matrix, 
                           classification_report)
import matplotlib.pyplot as plt

def comprehensive_evaluation(y_true, y_pred, y_pred_proba, model_name="Model"):
    """
    Evaluación comprehensiva con múltiples métricas
    """
    metrics = {
        'Accuracy': accuracy_score(y_true, y_pred),
        'Precision': precision_score(y_true, y_pred, average='weighted'),
        'Recall': recall_score(y_true, y_pred, average='weighted'),
        'F1-Score': f1_score(y_true, y_pred, average='weighted'),
        'ROC-AUC': roc_auc_score(y_true, y_pred_proba)
    }
    
    print(f"=== EVALUACIÓN: {model_name} ===")
    for metric, value in metrics.items():
        print(f"{metric}: {value:.4f}")
    
    # Matriz de confusión
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title(f'Matriz de Confusión - {model_name}')
    plt.xlabel('Predicción')
    plt.ylabel('Valor Real')
    plt.show()
    
    # Reporte detallado
    print(f"\nReporte de Clasificación:\n{classification_report(y_true, y_pred)}")
    
    return metrics

Selección del Modelo Final y Preparación para Producción

La transición a producción requiere consideraciones de escalabilidad, monitoreo y mantenimiento.

Preparación para Producción

import joblib
import json
from datetime import datetime

class ProductionModel:
    def __init__(self, model, preprocessor, feature_names, model_metadata):
        self.model = model
        self.preprocessor = preprocessor
        self.feature_names = feature_names
        self.model_metadata = model_metadata
        self.deployment_date = datetime.now()
    
    def predict(self, X):
        """Predicción con validación de características"""
        # Validar características
        if list(X.columns) != self.feature_names:
            missing = set(self.feature_names) - set(X.columns)
            extra = set(X.columns) - set(self.feature_names)
            if missing:
                raise ValueError(f"Características faltantes: {missing}")
            if extra:
                print(f"Advertencia: Características extra ignoradas: {extra}")
            X = X[self.feature_names]
        
        # Preprocesar y predecir
        return self.model.predict(X)
    
    def predict_proba(self, X):
        """Predicción de probabilidades"""
        if list(X.columns) != self.feature_names:
            X = X[self.feature_names]
        return self.model.predict_proba(X)
    
    def save_model(self, filepath):
        """Guardar modelo para producción"""
        model_package = {
            'model': self.model,
            'preprocessor': self.preprocessor,
            'feature_names': self.feature_names,
            'metadata': self.model_metadata,
            'deployment_date': self.deployment_date
        }
        joblib.dump(model_package, filepath)
        print(f"Modelo guardado en: {filepath}")
    
    @classmethod
    def load_model(cls, filepath):
        """Cargar modelo desde archivo"""
        model_package = joblib.load(filepath)
        return cls(
            model=model_package['model'],
            preprocessor=model_package['preprocessor'],
            feature_names=model_package['feature_names'],
            model_metadata=model_package['metadata']
        )

def prepare_production_model(best_model, X_train, feature_names, performance_metrics):
    """
    Preparar modelo para producción con metadata
    """
    metadata = {
        'model_type': type(best_model).__name__,
        'training_samples': len(X_train),
        'features_count': len(feature_names),
        'performance_metrics': performance_metrics,
        'training_date': datetime.now().isoformat()
    }
    
    production_model = ProductionModel(
        model=best_model,
        preprocessor=create_preprocessing_pipeline(X_train),
        feature_names=feature_names,
        model_metadata=metadata
    )
    
    return production_model

Consideraciones Críticas

Prevención de Data Leakage

El data leakage es uno de los problemas más críticos en machine learning. Las estrategias de prevención incluyen:

def prevent_data_leakage_checklist():
    """
    Lista de verificación para prevenir data leakage
    """
    checklist = {
        "División temporal correcta": "✓ Datos futuros nunca en entrenamiento",
        "Preprocesamiento en CV": "✓ Transformaciones calculadas solo en train",
        "Validación de características": "✓ No features que incluyan información del target",
        "Cross-validation apropiada": "✓ Respeta orden temporal si aplica",
        "Test set intacto": "✓ Nunca usado para decisiones de modelado"
    }
    
    for item, status in checklist.items():
        print(f"{status} {item}")
    
    return checklist

Monitoreo de Drift de Datos

El model drift requiere monitoreo continuo en producción:

from scipy.stats import ks_2samp
import warnings

class DataDriftMonitor:
    def __init__(self, reference_data, threshold=0.05):
        self.reference_data = reference_data
        self.threshold = threshold
        self.drift_history = []
    
    def detect_drift(self, new_data, feature_cols):
        """
        Detectar drift usando test Kolmogorov-Smirnov
        """
        drift_detected = {}
        
        for col in feature_cols:
            if col in self.reference_data.columns and col in new_data.columns:
                # Test KS para detectar cambios en distribución
                statistic, p_value = ks_2samp(
                    self.reference_data[col].dropna(),
                    new_data[col].dropna()
                )
                
                drift_detected[col] = {
                    'p_value': p_value,
                    'drift': p_value < self.threshold,
                    'statistic': statistic
                }
                
                if p_value < self.threshold:
                    warnings.warn(f"Drift detectado en característica '{col}' (p-value: {p_value:.4f})")
        
        self.drift_history.append({
            'timestamp': datetime.now(),
            'drift_results': drift_detected
        })
        
        return drift_detected
    
    def get_drift_summary(self):
        """Resumen de drift detectado"""
        if not self.drift_history:
            return "No hay datos de drift disponibles"
        
        latest_drift = self.drift_history[-1]['drift_results']
        drifted_features = [col for col, result in latest_drift.items() if result['drift']]
        
        return {
            'total_features_monitored': len(latest_drift),
            'features_with_drift': len(drifted_features),
            'drifted_features': drifted_features,
            'drift_percentage': len(drifted_features) / len(latest_drift) * 100
        }

Trade-off Interpretabilidad vs Rendimiento

La decisión entre interpretabilidad y rendimiento debe basarse en los requisitos del negocio:

def interpretability_vs_performance_analysis():
    """
    Framework para evaluar trade-off interpretabilidad vs rendimiento
    """
    
    model_characteristics = {
        'Linear Regression': {'interpretability': 9, 'performance': 6, 'complexity': 2},
        'Decision Tree': {'interpretability': 8, 'performance': 7, 'complexity': 4},
        'Random Forest': {'interpretability': 5, 'performance': 8, 'complexity': 6},
        'XGBoost': {'interpretability': 4, 'performance': 9, 'complexity': 7},
        'Neural Networks': {'interpretability': 2, 'performance': 9, 'complexity': 9}
    }
    
    # Criterios de selección según contexto
    selection_criteria = {
        'high_stakes_decisions': 'Priorizar interpretabilidad (>7)',
        'production_efficiency': 'Balance interpretabilidad-rendimiento (5-7)',
        'pure_performance': 'Priorizar rendimiento (>8)',
        'regulatory_compliance': 'Máxima interpretabilidad (>8)'
    }
    
    return model_characteristics, selection_criteria

Escalabilidad y Recursos Computacionales

Las consideraciones de escalabilidad son críticas para el éxito en producción:

class ScalabilityPlanner:
    def __init__(self, expected_data_volume, latency_requirements):
        self.data_volume = expected_data_volume
        self.latency_requirements = latency_requirements
    
    def recommend_scaling_strategy(self):
        """
        Recomendar estrategia de escalabilidad
        """
        recommendations = []
        
        # Basado en volumen de datos
        if self.data_volume > 1e6:  # > 1M registros
            recommendations.append("Considerar procesamiento distribuido (Spark)")
            recommendations.append("Implementar batch processing")
        
        # Basado en latencia
        if self.latency_requirements < 100:  # < 100ms
            recommendations.append("Optimizar modelo para inferencia rápida")
            recommendations.append("Considerar model quantization")
            recommendations.append("Implementar caching de predicciones")
        
        # Estrategias generales
        recommendations.extend([
            "Monitoreo continuo de recursos",
            "Auto-scaling basado en demanda",
            "Load balancing para múltiples instancias",
            "Evaluación periódica de performance"
        ])
        
        return recommendations
    
    def estimate_resources(self, model_size_mb, predictions_per_second):
        """
        Estimar recursos necesarios
        """
        # Estimaciones básicas
        memory_gb = max(model_size_mb / 1024 * 2, 1)  # 2x model size mínimo
        cpu_cores = max(predictions_per_second / 100, 1)  # ~100 pred/core/second
        
        return {
            'estimated_memory_gb': memory_gb,
            'estimated_cpu_cores': cpu_cores,
            'storage_requirements': f"{model_size_mb * 1.5}MB",
            'monitoring_overhead': "10-15% recursos adicionales"
        }

Conclusión

El flujo de modelación en machine learning es un proceso iterativo y complejo que requiere atención meticulosa a cada etapa. La implementación exitosa depende de:

  1. Prevención rigurosa de data leakage mediante división temporal adecuada y preprocesamiento consistente
  2. Validación cruzada anidada para hiperparametrización sin sesgo
  3. Monitoreo continuo de drift de datos y performance del modelo
  4. Balance consciente entre interpretabilidad y rendimiento según requisitos del negocio
  5. Consideraciones de escalabilidad desde el diseño inicial

La adopción de estas mejores prácticas asegura modelos robustos, confiables y preparados para entornos de producción exigentes, maximizando el valor de negocio mientras se mantiene la integridad técnica del sistema de machine learning.

Flujo General de Modelación en Machine Learning - Guía Exhaustiva

Author

Juan Fuentes

Publish Date

07 - 15 - 2023