Introduction
In machine learning, preprocessing data consistently across training and inference is crucial for model performance. If the two phases apply different transformations, the model receives inputs at inference time that it never saw during training, which degrades predictions and produces behavior that is hard to debug.
Amazon SageMaker provides a powerful platform for building and deploying machine learning models. This article will guide you through the process of using the same preprocessing code for both training and inference within SageMaker.
SageMaker Pipelines: The Key to Consistency
SageMaker Pipelines is a key component in ensuring consistent preprocessing. Pipelines let you define your machine learning workflow as a series of interconnected steps, including data preprocessing. Because the preprocessing logic lives in a single, versioned step, the same code can be reused wherever data needs to be transformed, for both training and inference.
Defining a Preprocessing Step
1. Define a Preprocessing Function
First, you’ll need to create a Python function that encapsulates your preprocessing logic.
```python
import pandas as pd
from sklearn.preprocessing import StandardScaler


def preprocess_data(data):
    """Preprocesses data by scaling numerical features."""
    # Assuming data is a pandas DataFrame
    numerical_features = ['feature1', 'feature2']  # Replace with actual feature names
    scaler = StandardScaler()
    data[numerical_features] = scaler.fit_transform(data[numerical_features])
    return data
```
2. Packaging as a SageMaker Script
Next, package this function into a script (for example, `preprocess.py`) that SageMaker can execute as a processing job.
```python
# preprocess.py
import pandas as pd
from sklearn.preprocessing import StandardScaler


def preprocess_data(data):
    """Preprocesses data by scaling numerical features."""
    # Assuming data is a pandas DataFrame
    numerical_features = ['feature1', 'feature2']  # Replace with actual feature names
    scaler = StandardScaler()
    data[numerical_features] = scaler.fit_transform(data[numerical_features])
    return data


if __name__ == "__main__":
    # SageMaker Processing mounts the input data and collects outputs at these paths;
    # they match the ProcessingInput/ProcessingOutput definitions shown later.
    # Adjust the file names to your actual data.
    input_path = "/opt/ml/processing/input/data.csv"
    output_path = "/opt/ml/processing/output/processed_data.csv"

    data = pd.read_csv(input_path)
    processed_data = preprocess_data(data)
    processed_data.to_csv(output_path, index=False)
    print(f"Processed data written to {output_path}")
```
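Note that `preprocess_data` fits the scaler on whatever data it is given, so running it independently at inference time would compute new scaling parameters instead of reusing the ones learned from the training data. To keep the transformations truly identical, the fitted scaler can be persisted during preprocessing and reloaded later. The helper names below are illustrative rather than part of the script above; this is a minimal sketch assuming joblib is available:

```python
import joblib
import pandas as pd
from sklearn.preprocessing import StandardScaler

NUMERICAL_FEATURES = ["feature1", "feature2"]  # Replace with actual feature names


def fit_and_save_scaler(train_data: pd.DataFrame, path: str) -> pd.DataFrame:
    """Fit the scaler on the training data only, persist it, and return the scaled data."""
    scaler = StandardScaler()
    train_data[NUMERICAL_FEATURES] = scaler.fit_transform(train_data[NUMERICAL_FEATURES])
    joblib.dump(scaler, path)  # e.g. save alongside the processing output
    return train_data


def apply_saved_scaler(data: pd.DataFrame, path: str) -> pd.DataFrame:
    """Reload the fitted scaler and apply the exact same transformation to new data."""
    scaler = joblib.load(path)
    data[NUMERICAL_FEATURES] = scaler.transform(data[NUMERICAL_FEATURES])
    return data
```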
Integration with SageMaker Pipelines
1. Create a Processing Step
In your SageMaker Pipeline, define a Processing step using the script you created:
```python
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.steps import ProcessingStep

processor = SKLearnProcessor(
    framework_version="1.2-1",
    role=role,
    instance_count=1,
    instance_type="ml.m5.large",
    base_job_name="preprocess",
)

# Define the processing step
processing_step = ProcessingStep(
    name="Preprocess",
    processor=processor,
    inputs=[ProcessingInput(source=train_data_source, destination="/opt/ml/processing/input")],
    outputs=[ProcessingOutput(output_name="processed_data", source="/opt/ml/processing/output")],
    code="preprocess.py",
)
```
2. Training and Inference
You can now seamlessly integrate this preprocessing step into your training and inference pipelines:
- **Training:** Pass the output of the processing step as input to your training job.
- **Inference:** Include the same preprocessing logic in your inference path so that identical transformations are applied to incoming data (one way to do this is sketched below).
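For a real-time endpoint, one way to apply the same transformations to incoming requests is inside the model's inference entry point. The sketch below assumes a hypothetical `inference.py` served by the SageMaker scikit-learn container, with the fitted scaler packaged alongside the model artifacts as `scaler.joblib`; the file names and feature list are illustrative assumptions, not outputs of the pipeline in this article:

```python
# inference.py -- hypothetical entry point for the SageMaker scikit-learn container.
import json
import os

import joblib
import pandas as pd

NUMERICAL_FEATURES = ["feature1", "feature2"]  # Must match the training-time features


def model_fn(model_dir):
    """Load the trained model and the scaler fitted during preprocessing."""
    model = joblib.load(os.path.join(model_dir, "model.joblib"))
    scaler = joblib.load(os.path.join(model_dir, "scaler.joblib"))
    return {"model": model, "scaler": scaler}


def input_fn(request_body, request_content_type):
    """Deserialize a JSON request into a pandas DataFrame."""
    if request_content_type == "application/json":
        return pd.DataFrame(json.loads(request_body))
    raise ValueError(f"Unsupported content type: {request_content_type}")


def predict_fn(input_data, artifacts):
    """Apply the same scaling used at training time, then predict."""
    input_data[NUMERICAL_FEATURES] = artifacts["scaler"].transform(
        input_data[NUMERICAL_FEATURES]
    )
    return artifacts["model"].predict(input_data)
```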
Example
Below is an example of how to use the same preprocessing code for both training and inference in a SageMaker Pipeline:
```python
from sagemaker.inputs import TrainingInput
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.sklearn import SKLearn
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.workflow.pipeline import Pipeline

# Assumes `role` holds a SageMaker execution role ARN.

# Define pipeline parameters
training_data_source = ParameterString(name="TrainingData", default_value="s3://bucket/data.csv")
model_name = ParameterString(name="ModelName", default_value="my_model")

# Define processing step (using the script you created)
processor = SKLearnProcessor(
    framework_version="1.2-1",
    role=role,
    instance_count=1,
    instance_type="ml.m5.large",
    base_job_name="preprocess",
)

processing_step = ProcessingStep(
    name="Preprocess",
    processor=processor,
    inputs=[ProcessingInput(source=training_data_source, destination="/opt/ml/processing/input")],
    outputs=[ProcessingOutput(output_name="processed_data", source="/opt/ml/processing/output")],
    code="preprocess.py",
)

# Define training step
estimator = SKLearn(
    entry_point="train.py",
    role=role,
    instance_count=1,
    instance_type="ml.m5.large",
    framework_version="1.2-1",
    base_job_name="sklearn-trainer",
)

training_step = TrainingStep(
    name="Training",
    estimator=estimator,
    inputs={
        "train": TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs[
                "processed_data"
            ].S3Output.S3Uri
        )
    },
)

# Define pipeline
pipeline = Pipeline(
    name="preprocessing-pipeline",
    parameters=[training_data_source, model_name],
    steps=[processing_step, training_step],
)

# Create the pipeline
pipeline.create(role_arn=role)

# Start the pipeline execution
pipeline.start()
```
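The example references a `train.py` entry point that isn't shown in this article. Below is a minimal sketch of what it might look like, assuming the processed CSV is written as `processed_data.csv`, the target column is named `label`, and a simple scikit-learn classifier is used; all of these are illustrative assumptions, not requirements of the pipeline:

```python
# train.py -- minimal sketch of the training entry point referenced above.
import argparse
import os

import joblib
import pandas as pd
from sklearn.linear_model import LogisticRegression

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    # SageMaker exposes the input channel and model directory via these environment variables.
    parser.add_argument("--train", default=os.environ.get("SM_CHANNEL_TRAIN", "/opt/ml/input/data/train"))
    parser.add_argument("--model-dir", default=os.environ.get("SM_MODEL_DIR", "/opt/ml/model"))
    args = parser.parse_args()

    # Load the already-preprocessed data produced by the processing step.
    data = pd.read_csv(os.path.join(args.train, "processed_data.csv"))
    X, y = data.drop(columns=["label"]), data["label"]

    model = LogisticRegression()
    model.fit(X, y)

    # Persist the model where SageMaker expects to find it.
    joblib.dump(model, os.path.join(args.model_dir, "model.joblib"))
```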
Benefits of Consistent Preprocessing
Using the same preprocessing code for both training and inference provides several significant benefits:
- Improved Model Performance: Ensures the model is trained and tested on consistent data, leading to better generalization and performance.
- Reduced Errors: Eliminates inconsistencies in data transformations that can introduce errors and unexpected behavior.
- Simplified Workflow: Centralizes preprocessing logic within a single pipeline, making it easier to manage and maintain.
Conclusion
By using SageMaker Pipelines and defining a dedicated preprocessing step, you can ensure consistent data transformations for both training and inference in your machine learning workflows. This leads to better model performance, fewer errors, and a streamlined development process.