Pipeline Module

Automated scikit-learn pipeline creation, version management, experiment tracking, and production-ready deployment capabilities.

Overview

The Pipeline module builds scikit-learn pipelines from a DataFrame, manages pipeline versions, and tracks experiments against registered pipelines. It is designed to work alongside the framework's preprocessing, AutoML, and deployment modules.

Basic Usage

from ai_ml_framework.pipeline import PipelineCreator

creator = PipelineCreator()
pipeline = creator.create_auto_pipeline(df, target_column='target')

Custom Pipeline

config = {
    'preprocessing': {'scaling': 'standard'},
    'model': {'type': 'random_forest'}
}
pipeline = creator.create_custom_pipeline(df, 'target', config)

PipelineCreator

class PipelineCreator()

Automated scikit-learn pipeline creation.

Key Features

  • Automated pipeline creation
  • Custom configuration support
  • Feature selection integration
  • Dimensionality reduction
  • Model optimization

Methods

create_auto_pipeline(df: pd.DataFrame, target_column: str, model: Any = None) -> Pipeline

Create automated pipeline.

Parameters:
  • df: Input DataFrame
  • target_column: Target column name
  • model: Custom model (optional)
Returns: Scikit-learn Pipeline
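
The internals of create_auto_pipeline are not documented here. As a rough illustration only, an automated pipeline of this kind can be approximated in plain scikit-learn by inferring column roles from dtypes and wiring them into a ColumnTransformer. The function name `sketch_auto_pipeline`, the imputation and scaling choices, and the default model are assumptions made for this example, not part of ai_ml_framework.

```python
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler


def sketch_auto_pipeline(df, target_column, model=None):
    """Hypothetical stand-in for create_auto_pipeline (not the framework's code)."""
    X = df.drop(columns=[target_column])
    y = df[target_column]

    # Infer column roles from dtypes
    numeric_cols = X.select_dtypes(include="number").columns.tolist()
    categorical_cols = X.select_dtypes(exclude="number").columns.tolist()

    numeric = Pipeline([
        ("impute", SimpleImputer(strategy="median")),
        ("scale", StandardScaler()),
    ])
    categorical = OneHotEncoder(handle_unknown="ignore")
    preprocess = ColumnTransformer([
        ("num", numeric, numeric_cols),
        ("cat", categorical, categorical_cols),
    ])

    # Fall back to an assumed default when no model is supplied
    estimator = model if model is not None else RandomForestClassifier(random_state=0)
    pipeline = Pipeline([("preprocess", preprocess), ("model", estimator)])
    return pipeline.fit(X, y)


# Tiny made-up dataset, for demonstration only
demo = pd.DataFrame({
    "age": [25.0, 32.0, float("nan"), 51.0, 46.0, 38.0],
    "city": ["A", "B", "A", "B", "B", "A"],
    "target": [0, 1, 0, 1, 1, 0],
})
auto_pipeline = sketch_auto_pipeline(demo, "target")
print([name for name, _ in auto_pipeline.steps])
```

Passing `model=` swaps the final estimator while leaving the inferred preprocessing unchanged, which is one plausible reading of the optional `model` parameter above.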

create_custom_pipeline(df: pd.DataFrame, target_column: str, config: Dict[str, Any]) -> Pipeline

Create custom pipeline with configuration.

Parameters:
  • df: Input DataFrame
  • target_column: Target column name
  • config: Pipeline configuration
Returns: Scikit-learn Pipeline

optimize_pipeline(pipeline: Pipeline, X: pd.DataFrame, y: pd.Series) -> Pipeline

Optimize pipeline hyperparameters.

Parameters:
  • pipeline: Base pipeline
  • X: Feature matrix
  • y: Target vector
Returns: Optimized pipeline
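
The search strategy behind optimize_pipeline is not described in this document. One common way to tune a scikit-learn pipeline is a cross-validated grid search, sketched below. The helper `sketch_optimize_pipeline`, the parameter grid, and the synthetic data are assumptions for illustration; the framework may use a different method.

```python
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler


def sketch_optimize_pipeline(pipeline, X, y, param_grid, cv=3):
    """Hypothetical stand-in for optimize_pipeline using a plain grid search."""
    search = GridSearchCV(pipeline, param_grid, cv=cv, scoring="accuracy")
    search.fit(X, y)
    # best_estimator_ is the pipeline refit on all of X, y with the best parameters
    return search.best_estimator_, search.best_params_


# Synthetic data, for demonstration only
X, y = make_classification(n_samples=200, n_features=8, random_state=0)
base = Pipeline([
    ("scale", StandardScaler()),
    ("model", RandomForestClassifier(random_state=0)),
])
# Pipeline parameters are addressed as "<step name>__<parameter>"
grid = {"model__n_estimators": [25, 50], "model__max_depth": [3, None]}
best_pipeline, best_params = sketch_optimize_pipeline(base, X, y, grid)
print(best_params)
```

Because the search cross-validates the whole pipeline, preprocessing steps are refit inside each fold and do not leak information from the validation split.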

PipelineManager

class PipelineManager(workspace_dir: str = "ai_ml_workspace")

Advanced pipeline management with versioning.

Key Features

  • Pipeline registration and tracking
  • Version management
  • Experiment tracking
  • Performance comparison
  • Deployment management

Methods

register_pipeline(pipeline_id: str, name: str, config: Dict[str, Any], pipeline_object: Any, metrics: Dict[str, float] = None) -> str

Register pipeline in the system.

Parameters:
  • pipeline_id: Unique pipeline identifier
  • name: Pipeline name
  • config: Pipeline configuration
  • pipeline_object: Pipeline object
  • metrics: Performance metrics as name-to-value pairs (optional)
Returns: Registered pipeline ID

load_pipeline(pipeline_id: str) -> Pipeline

Load pipeline from storage.

Parameters:
  • pipeline_id: Pipeline ID
Returns: Loaded pipeline

create_experiment(name: str, description: str, pipeline_id: str, config: Dict[str, Any], tags: List[str] = None) -> str

Create new experiment.

Parameters:
  • name: Experiment name
  • description: Experiment description
  • pipeline_id: Pipeline ID
  • config: Experiment configuration
  • tags: Experiment tags (optional)
Returns: Experiment ID
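
How PipelineManager lays out its workspace on disk is not specified here. The sketch below shows one simple way a register/load pair could work: each pipeline gets a directory holding a joblib dump plus a JSON metadata file. The class `SketchRegistry`, the file names, and the directory layout are all hypothetical.

```python
import json
import tempfile
from pathlib import Path

import joblib
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler


class SketchRegistry:
    """Hypothetical file-based registry; not the PipelineManager implementation."""

    def __init__(self, workspace_dir):
        self.root = Path(workspace_dir) / "pipelines"
        self.root.mkdir(parents=True, exist_ok=True)

    def register_pipeline(self, pipeline_id, name, config, pipeline_object, metrics=None):
        target = self.root / pipeline_id
        target.mkdir(exist_ok=True)
        # Serialized estimator and human-readable metadata live side by side
        joblib.dump(pipeline_object, target / "pipeline.joblib")
        meta = {"name": name, "config": config, "metrics": metrics or {}}
        (target / "meta.json").write_text(json.dumps(meta, indent=2))
        return pipeline_id

    def load_pipeline(self, pipeline_id):
        return joblib.load(self.root / pipeline_id / "pipeline.joblib")


workspace = tempfile.mkdtemp()  # throwaway directory for the demo
registry = SketchRegistry(workspace)
pipe = Pipeline([("scale", StandardScaler()), ("model", LogisticRegression())])
registry.register_pipeline("pipeline_v1", "Demo", {"model": {"type": "logreg"}}, pipe)
restored = registry.load_pipeline("pipeline_v1")
print([name for name, _ in restored.steps])
```

Keeping metadata in JSON next to the binary dump means configurations and metrics can be inspected or compared without unpickling any estimator.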

Examples

Automated Pipeline Creation

python
from ai_ml_framework.pipeline import PipelineCreator
import pandas as pd

# Load data
df = pd.read_csv('your_data.csv')

# Create automated pipeline
creator = PipelineCreator()
pipeline = creator.create_auto_pipeline(df, target_column='target')

print(f"Pipeline steps: {len(pipeline.steps)}")
for name, step in pipeline.steps:
    print(f"  {name}: {step.__class__.__name__}")

# Use pipeline (predicting on the training features here, for illustration only)
predictions = pipeline.predict(df.drop('target', axis=1))
print(f"Predictions: {predictions[:5]}")

Custom Pipeline Configuration

python
# Custom pipeline configuration
config = {
    'preprocessing': {
        'numeric_features': {
            'imputation': 'median',
            'scaling': 'standard'
        },
        'categorical_features': {
            'encoding': 'onehot'
        }
    },
    'feature_selection': {
        'method': 'selectkbest',
        'k': 10
    },
    'model': {
        'type': 'random_forest',
        'n_estimators': 200,
        'max_depth': 10
    }
}

custom_pipeline = creator.create_custom_pipeline(df, 'target', config)
print(f"Custom pipeline: {len(custom_pipeline.steps)} steps")
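
How the framework interprets each configuration key is not specified in this document. As a sketch under that caveat, the configuration above could translate into scikit-learn components roughly as follows. The function `sketch_pipeline_from_config` and the column names are hypothetical, and the mapping of 'standard', 'onehot', and 'selectkbest' to specific classes is an assumption.

```python
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler


def sketch_pipeline_from_config(config, numeric_cols, categorical_cols):
    """Hypothetical translation of a config dict into scikit-learn objects."""
    pre = config["preprocessing"]
    numeric = Pipeline([
        ("impute", SimpleImputer(strategy=pre["numeric_features"]["imputation"])),
        ("scale", StandardScaler()),  # assumed meaning of 'scaling': 'standard'
    ])
    categorical = OneHotEncoder(handle_unknown="ignore")  # assumed meaning of 'onehot'
    preprocess = ColumnTransformer([
        ("num", numeric, numeric_cols),
        ("cat", categorical, categorical_cols),
    ])
    # Everything in the model section except 'type' is passed through as hyperparameters
    model_params = {k: v for k, v in config["model"].items() if k != "type"}
    return Pipeline([
        ("preprocess", preprocess),
        ("select", SelectKBest(f_classif, k=config["feature_selection"]["k"])),
        ("model", RandomForestClassifier(**model_params)),
    ])


example_config = {
    "preprocessing": {
        "numeric_features": {"imputation": "median", "scaling": "standard"},
        "categorical_features": {"encoding": "onehot"},
    },
    "feature_selection": {"method": "selectkbest", "k": 10},
    "model": {"type": "random_forest", "n_estimators": 200, "max_depth": 10},
}
# Column names are placeholders; the pipeline is built but not fitted here
sketch = sketch_pipeline_from_config(example_config, ["age", "income"], ["city"])
print(sketch.named_steps["model"].get_params()["n_estimators"])
```

Note that with `k: 10`, fitting would require at least 10 features after preprocessing; the sketch only constructs the pipeline and does not fit it.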

Pipeline Management

python
from ai_ml_framework.pipeline import PipelineManager

# Initialize manager
manager = PipelineManager()

# Register pipeline
pipeline_id = manager.register_pipeline(
    pipeline_id='pipeline_v1',
    name='Classification Pipeline v1',
    config=config,
    pipeline_object=pipeline,
    metrics={'accuracy': 0.95, 'f1': 0.93}
)

print(f"Registered pipeline: {pipeline_id}")

# Create experiment
experiment_id = manager.create_experiment(
    name='Classification Experiment',
    description='Testing classification pipeline',
    pipeline_id=pipeline_id,
    config=config,
    tags=['classification', 'production']
)

print(f"Created experiment: {experiment_id}")

# Load pipeline
loaded_pipeline = manager.load_pipeline(pipeline_id)
print(f"Loaded pipeline: {loaded_pipeline}")

# Get workspace summary
summary = manager.get_workspace_summary()
print(f"Workspace summary: {summary}")

Pipeline Comparison

python
# Create multiple pipelines
configs = [
    {'model': {'type': 'random_forest', 'n_estimators': 100}},
    {'model': {'type': 'xgboost', 'n_estimators': 100}},
    {'model': {'type': 'lightgbm', 'n_estimators': 100}}
]

# Distinct loop names keep the earlier `config`, `pipeline`, and `pipeline_id` intact
pipelines = []
for i, candidate_config in enumerate(configs):
    candidate = creator.create_custom_pipeline(df, 'target', candidate_config)
    candidate_id = manager.register_pipeline(
        f'pipeline_v{i+1}',
        f'Pipeline {i+1}',
        candidate_config,
        candidate
    )
    pipelines.append((candidate_id, candidate))

# Compare pipelines
from sklearn.metrics import accuracy_score

# Note: the pipelines were built from this same data, so these accuracies are
# optimistic; use a held-out set for a real comparison.
results = {}
X_test = df.drop('target', axis=1)
y_test = df['target']

for candidate_id, candidate in pipelines:
    predictions = candidate.predict(X_test)
    results[candidate_id] = accuracy_score(y_test, predictions)

print("Pipeline Comparison:")
for candidate_id, accuracy in sorted(results.items(), key=lambda x: x[1], reverse=True):
    print(f"  {candidate_id}: {accuracy:.3f}")

# Get best pipeline
best_pipeline_id = max(results.keys(), key=lambda x: results[x])
print(f"Best pipeline: {best_pipeline_id}")
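
The comparison above scores each pipeline on the data it was built from, which tends to overstate accuracy. A cross-validated comparison avoids that. The sketch below uses only scikit-learn estimators and synthetic data so it runs without ai_ml_framework, xgboost, or lightgbm; the candidate models are stand-ins chosen for the example, not the ones from the configs above.

```python
from sklearn.datasets import make_classification
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Synthetic data, for demonstration only
X, y = make_classification(n_samples=300, n_features=10, random_state=0)

candidates = {
    "random_forest": RandomForestClassifier(n_estimators=100, random_state=0),
    "gradient_boosting": GradientBoostingClassifier(random_state=0),
    "logistic_regression": LogisticRegression(max_iter=1000),
}

cv_results = {}
for name, model in candidates.items():
    candidate = Pipeline([("scale", StandardScaler()), ("model", model)])
    # Each fold is scored on data the pipeline did not see during fitting
    scores = cross_val_score(candidate, X, y, cv=5, scoring="accuracy")
    cv_results[name] = scores.mean()

for name, score in sorted(cv_results.items(), key=lambda item: item[1], reverse=True):
    print(f"  {name}: {score:.3f}")
```

The mean fold accuracy can then be passed as the `metrics` argument when registering the winning pipeline.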

Pipeline Optimization

python
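from sklearn.model_selection import train_test_split

# Hold out a test set so both pipelines are scored on unseen data
train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)
X_train, y_train = train_df.drop('target', axis=1), train_df['target']
X_test, y_test = test_df.drop('target', axis=1), test_df['target']

# Create base pipeline from the training split only
base_pipeline = creator.create_auto_pipeline(train_df, 'target')

# Optimize pipeline
optimized_pipeline = creator.optimize_pipeline(
    base_pipeline,
    X_train,
    y_train
)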

# Compare performance
base_score = base_pipeline.score(X_test, y_test)
optimized_score = optimized_pipeline.score(X_test, y_test)

print(f"Base pipeline score: {base_score:.3f}")
print(f"Optimized pipeline score: {optimized_score:.3f}")
print(f"Improvement: {optimized_score - base_score:.3f}")

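# Register optimized pipeline
# The base pipeline came from create_auto_pipeline, so record that rather than
# reusing an unrelated `config` from an earlier example.
optimized_id = manager.register_pipeline(
    'pipeline_optimized',
    'Optimized Classification Pipeline',
    {'source': 'create_auto_pipeline', 'optimized': True},  # illustrative keys
    optimized_pipeline,
    {'accuracy': optimized_score}
)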