Pipelines API Reference¶
Pipelines in BatteryML transform raw time-series or summary data into machine-learning-ready Sample objects. They use a hash-based cache to avoid recomputing expensive features such as ICA peaks.
Usage Example¶
```python
from src.pipelines.ica_peaks import ICAPeaksPipeline

# Data objects should contain 'curves' and 'targets' keys
pipeline = ICAPeaksPipeline(num_peaks=3, sg_window=51)
samples = pipeline.fit_transform(raw_data)
```
sample ¶
Canonical Sample schema for battery degradation data.
This is the universal format that all pipelines must produce and all models consume.
Classes¶
Sample
dataclass
¶
```python
Sample(
    meta: Dict[str, Any] = dict(),
    x: Union[torch.Tensor, Dict[str, torch.Tensor], np.ndarray, None] = None,
    y: Union[torch.Tensor, np.ndarray, None] = None,
    mask: Optional[torch.Tensor] = None,
    t: Optional[torch.Tensor] = None,
)
```
Universal format every pipeline must produce.
This contract enables:

- Model-agnostic pipelines (swap models without changing data code)
- Time-aware models (ODEs use delta_t; LSTMs ignore it)
- Clean validation splits (meta contains grouping keys)
- Composable features (x can be a dict of multiple feature types)
Attributes:

| Name | Type | Description |
|---|---|---|
| `meta` | `Dict[str, Any]` | Metadata for splits, logging, debugging — not fed to the model. Required keys: `'experiment_id'` (int, 1-5), `'cell_id'` (str, 'A', ..., 'H'), `'temperature_C'` (float: 10, 25, 40). Optional keys: `'set_idx'` (ageing set index), `'rpt_id'` (RPT measurement index), `'cycle_idx'` (absolute cycle number), `'timestamp'` (seconds or days since experiment start), `'delta_t'` (time since previous sample), `'cumulative_throughput_Ah'`. |
| `x` | `Union[Tensor, Dict[str, Tensor], ndarray, None]` | Features the model sees. Can be a tensor of shape `(feature_dim,)` (static features), a tensor of shape `(seq_len, feature_dim)` (sequences), or a dict such as `{'summary': tensor, 'ica_peaks': tensor, ...}`. |
| `y` | `Union[Tensor, ndarray, None]` | Target values. Shape `(1,)` for SOH regression or `(n_targets,)` for multi-task. |
| `mask` | `Optional[Tensor]` | Optional boolean mask for variable-length sequences. Shape `(seq_len,)`. |
| `t` | `Optional[Tensor]` | Optional time vector for ODE models. Shape `(seq_len,)`. |
Functions¶
clone ¶
to_device ¶
Move tensors to specified device (in-place).
Source code in src/pipelines/sample.py
to_tensor ¶
Convert numpy arrays to torch tensors (in-place).
Source code in src/pipelines/sample.py
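As a sketch of the schema, a Sample for a single RPT measurement might be built like this. The dataclass below is a NumPy-only stand-in mirroring the documented fields; the real class and its torch-based to_tensor/to_device helpers live in src/pipelines/sample.py.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional
import numpy as np

@dataclass
class Sample:  # NumPy-only stand-in for src.pipelines.sample.Sample
    meta: Dict[str, Any] = field(default_factory=dict)
    x: Optional[np.ndarray] = None
    y: Optional[np.ndarray] = None
    mask: Optional[np.ndarray] = None
    t: Optional[np.ndarray] = None

# One RPT measurement: static features plus an SOH target
sample = Sample(
    meta={'experiment_id': 5, 'cell_id': 'A', 'temperature_C': 25.0, 'rpt_id': 3},
    x=np.zeros(12, dtype=np.float32),      # static feature vector, shape (feature_dim,)
    y=np.array([0.93], dtype=np.float32),  # SOH target, shape (1,)
)
```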
base ¶
Base class for preprocessing pipelines.
Classes¶
BasePipeline ¶
Bases: ABC
Abstract base class for all preprocessing pipelines.
All pipelines must:

1. Accept raw data (DataFrames, arrays)
2. Output Sample objects with consistent structure
3. Support the fit/transform pattern for scalers
4. Be cacheable (expensive computations)
Example usage:

```python
pipeline = SummarySetPipeline(include_arrhenius=True)
train_samples = pipeline.fit_transform({'df': train_df})
test_samples = pipeline.transform({'df': test_df})
```
Functions¶
fit
abstractmethod
¶
Fit any scalers/normalizers on training data.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `data` | `Dict[str, Any]` | Dictionary containing raw data (e.g., `{'df': DataFrame}`) | *required* |

Returns:

| Name | Type | Description |
|---|---|---|
| `self` | `BasePipeline` | Fitted pipeline instance |
Source code in src/pipelines/base.py
fit_transform ¶
Fit and transform in one call.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `data` | `Dict[str, Any]` | Dictionary containing raw data | *required* |

Returns:

| Type | Description |
|---|---|
| `List[Sample]` | List of Sample objects |
get_feature_names
abstractmethod
¶
Return list of feature names (for SHAP/interpretability).
Returns:

| Type | Description |
|---|---|
| `List[str]` | List of feature name strings |
get_params ¶
Return pipeline parameters (for caching key).
Returns:

| Type | Description |
|---|---|
| `dict` | Dictionary of pipeline parameters |
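The fit/transform contract can be illustrated with a toy subclass. The base class below is a simplified stand-in; the real ABC in src/pipelines/base.py also requires get_feature_names and produces Sample objects rather than plain floats.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List

class BasePipeline(ABC):  # simplified stand-in for src.pipelines.base.BasePipeline
    @abstractmethod
    def fit(self, data: Dict[str, Any]) -> "BasePipeline": ...

    @abstractmethod
    def transform(self, data: Dict[str, Any]) -> List[Any]: ...

    def fit_transform(self, data: Dict[str, Any]) -> List[Any]:
        # Default behaviour: fit on the data, then transform it in one call
        return self.fit(data).transform(data)

    def get_params(self) -> dict:
        # Constructor args and fitted state, usable as a cache key
        return dict(vars(self))

class MeanScalePipeline(BasePipeline):
    """Toy pipeline: divides each value by the training mean."""
    def fit(self, data):
        self.mean_ = sum(data['values']) / len(data['values'])
        return self

    def transform(self, data):
        return [v / self.mean_ for v in data['values']]

samples = MeanScalePipeline().fit_transform({'values': [1.0, 2.0, 3.0]})
# mean_ is 2.0, so samples == [0.5, 1.0, 1.5]
```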
registry ¶
Registry for preprocessing pipelines.
Classes¶
PipelineRegistry ¶
Registry for pipeline classes.
Example usage:

```python
@PipelineRegistry.register("summary_set")
class SummarySetPipeline(BasePipeline):
    pass

pipeline = PipelineRegistry.get("summary_set", normalize=True)
```
Functions¶
get
classmethod
¶
Get a pipeline instance by name.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Registry name of the pipeline | *required* |
| `**kwargs` | | Arguments to pass to the pipeline constructor | `{}` |

Returns:

| Type | Description |
|---|---|
| `BasePipeline` | Pipeline instance |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If the pipeline name is not found |
Source code in src/pipelines/registry.py
get_class
classmethod
¶
Get pipeline class by name (without instantiating).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Registry name of the pipeline | *required* |

Returns:

| Type | Description |
|---|---|
| `Optional[Type[BasePipeline]]` | Pipeline class, or None if not found |
Source code in src/pipelines/registry.py
list_available
classmethod
¶
List all registered pipeline names.
Returns:

| Type | Description |
|---|---|
| `list` | List of pipeline names |
register
classmethod
¶
Decorator to register a pipeline class.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Registry name for the pipeline | *required* |

Returns:

| Type | Description |
|---|---|
| | Decorator function |
Source code in src/pipelines/registry.py
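The register/get pattern above can be sketched as a minimal registry. This is an illustration of the pattern, not the project's exact implementation in src/pipelines/registry.py.

```python
from typing import Callable, Dict

class PipelineRegistry:  # sketch of the registry pattern
    _registry: Dict[str, type] = {}

    @classmethod
    def register(cls, name: str) -> Callable[[type], type]:
        def decorator(pipeline_cls: type) -> type:
            cls._registry[name] = pipeline_cls  # map name -> class
            return pipeline_cls
        return decorator

    @classmethod
    def get(cls, name: str, **kwargs):
        if name not in cls._registry:
            raise ValueError(f"Unknown pipeline '{name}'. Available: {sorted(cls._registry)}")
        return cls._registry[name](**kwargs)  # instantiate with constructor kwargs

    @classmethod
    def list_available(cls) -> list:
        return sorted(cls._registry)

@PipelineRegistry.register("toy")
class ToyPipeline:
    def __init__(self, scale: float = 1.0):
        self.scale = scale

pipeline = PipelineRegistry.get("toy", scale=2.0)
```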
cache ¶
Cache expensive preprocessing results to disk.
Classes¶
PipelineCache ¶
Cache expensive preprocessing results to disk.
Cache key components:

- experiment_id, cell_id, rpt_id (data identity)
- pipeline_name + params (transform identity)
- version (manual invalidation)
Example usage:

```python
cache = PipelineCache()
result = cache.get_or_compute(
    experiment_id=5, cell_id='A', rpt_id=3,
    pipeline_name='ica_peaks',
    pipeline_params={'sg_window': 51, 'sg_order': 3},
    compute_fn=lambda: expensive_ica_computation(voltage, capacity),
)
```
Initialize the cache.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `cache_dir` | `Path` | Directory for cache files | `Path('artifacts/cache')` |
| `version` | `str` | Cache version (change to invalidate all cached data) | `'v1'` |
Source code in src/pipelines/cache.py
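One plausible way to combine the key components above into a deterministic cache key is to hash a canonical JSON serialization. This is an assumption for illustration; the actual scheme in src/pipelines/cache.py may differ.

```python
import hashlib
import json
from typing import Optional

def make_cache_key(experiment_id: int, cell_id: str, rpt_id: Optional[int],
                   pipeline_name: str, pipeline_params: dict,
                   version: str = "v1") -> str:
    """Deterministic key: identity fields plus a hash of the sorted params."""
    payload = json.dumps(
        {"exp": experiment_id, "cell": cell_id, "rpt": rpt_id,
         "pipeline": pipeline_name, "params": pipeline_params, "version": version},
        sort_keys=True,  # key order must not affect the hash
    )
    return hashlib.sha256(payload.encode()).hexdigest()[:16]

k1 = make_cache_key(5, "A", 3, "ica_peaks", {"sg_window": 51, "sg_order": 3})
k2 = make_cache_key(5, "A", 3, "ica_peaks", {"sg_order": 3, "sg_window": 51})
# k1 == k2: parameter dict order does not change the key
```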
Functions¶
clear ¶
Clear cache (optionally filtered by pipeline name).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `pipeline_name` | `Optional[str]` | If provided, only clear this pipeline's cache | `None` |

Returns:

| Type | Description |
|---|---|
| `int` | Number of cache entries cleared |
Source code in src/pipelines/cache.py
get ¶
```python
get(
    experiment_id: int, cell_id: str, rpt_id: Optional[int],
    pipeline_name: str, pipeline_params: dict,
) -> Optional[Any]
```
Retrieve cached result if exists.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `experiment_id` | `int` | Experiment ID | *required* |
| `cell_id` | `str` | Cell identifier | *required* |
| `rpt_id` | `Optional[int]` | RPT index | *required* |
| `pipeline_name` | `str` | Pipeline name | *required* |
| `pipeline_params` | `dict` | Pipeline parameters | *required* |

Returns:

| Type | Description |
|---|---|
| `Optional[Any]` | Cached result, or None if not found |
Source code in src/pipelines/cache.py
get_or_compute ¶
```python
get_or_compute(
    experiment_id: int, cell_id: str, rpt_id: Optional[int],
    pipeline_name: str, pipeline_params: dict,
    compute_fn: Callable[[], Any],
) -> Any
```
Return cached result or compute and cache.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `experiment_id` | `int` | Experiment ID | *required* |
| `cell_id` | `str` | Cell identifier | *required* |
| `rpt_id` | `Optional[int]` | RPT index | *required* |
| `pipeline_name` | `str` | Pipeline name | *required* |
| `pipeline_params` | `dict` | Pipeline parameters | *required* |
| `compute_fn` | `Callable[[], Any]` | Function to compute the result if not cached | *required* |

Returns:

| Type | Description |
|---|---|
| `Any` | Cached or computed result |
Source code in src/pipelines/cache.py
get_size ¶
Get cache size statistics.
Returns:

| Type | Description |
|---|---|
| `dict` | Dictionary with num_files and total_bytes |
Source code in src/pipelines/cache.py
get_stats ¶
Return cache hit/miss statistics.
Returns:

| Type | Description |
|---|---|
| `dict` | Dictionary with hits, misses, hit_rate |
Source code in src/pipelines/cache.py
set ¶
```python
set(
    experiment_id: int, cell_id: str, rpt_id: Optional[int],
    pipeline_name: str, pipeline_params: dict, result: Any,
) -> None
```
Store result in cache.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `experiment_id` | `int` | Experiment ID | *required* |
| `cell_id` | `str` | Cell identifier | *required* |
| `rpt_id` | `Optional[int]` | RPT index | *required* |
| `pipeline_name` | `str` | Pipeline name | *required* |
| `pipeline_params` | `dict` | Pipeline parameters | *required* |
| `result` | `Any` | Result to cache | *required* |
Source code in src/pipelines/cache.py
Functions¶
get_cache ¶
Get or create global cache instance.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `cache_dir` | `str` | Cache directory | `'artifacts/cache'` |
| `version` | `str` | Cache version | `'v1'` |

Returns:

| Type | Description |
|---|---|
| `PipelineCache` | PipelineCache instance |
Source code in src/pipelines/cache.py
summary_set ¶
Summary Set Pipeline - Features from Performance Summary data.
Classes¶
SummarySetPipeline ¶
```python
SummarySetPipeline(
    include_arrhenius: bool = True,
    arrhenius_Ea: float = 50000.0,
    normalize: bool = True,
)
```
Bases: BasePipeline
Pipeline using Performance Summary / Summary per Set data.
Features per set:

- Cumulative throughput (Ah)
- Average temperature (K, plus Arrhenius transform)
- Resistance measurements
- Cycle count
- Previous SOH (if available)
Target: SOH or Cell Capacity
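The Arrhenius transform plausibly takes the standard rate-factor form exp(-Ea / (R*T)). The sketch below is an assumption about the feature, not the exact code in src/pipelines/summary_set.py.

```python
import math

R = 8.314  # universal gas constant, J/(mol*K)

def arrhenius_feature(temperature_C: float, Ea: float = 50000.0) -> float:
    """Arrhenius rate factor exp(-Ea / (R*T)) at the cell's temperature.
    Hypothetical form of the 'Arrhenius transform' feature."""
    T_kelvin = temperature_C + 273.15  # convert to kelvin
    return math.exp(-Ea / (R * T_kelvin))

# The factor grows with temperature, mirroring thermally accelerated ageing
factors = {T: arrhenius_feature(T) for T in (10.0, 25.0, 40.0)}
```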
Example usage:

```python
pipeline = SummarySetPipeline(include_arrhenius=True)
samples = pipeline.fit_transform({'df': df})
```
Initialize the pipeline.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `include_arrhenius` | `bool` | Include Arrhenius temperature features | `True` |
| `arrhenius_Ea` | `float` | Activation energy for the Arrhenius transform (J/mol) | `50000.0` |
| `normalize` | `bool` | Whether to apply StandardScaler normalization | `True` |
Source code in src/pipelines/summary_set.py
Functions¶
fit ¶
Fit scaler on training data.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `data` | `Dict[str, Any]` | Dictionary with 'df' key containing DataFrame | *required* |

Returns:

| Type | Description |
|---|---|
| `SummarySetPipeline` | self |
Source code in src/pipelines/summary_set.py
get_feature_names ¶
get_params ¶
transform ¶
Transform data into Sample objects.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `data` | `Dict[str, Any]` | Dictionary with 'df' key containing DataFrame | *required* |

Returns:

| Type | Description |
|---|---|
| `List[Sample]` | List of Sample objects |
Source code in src/pipelines/summary_set.py
ica_peaks ¶
ICA Peaks Pipeline - Extract dQ/dV features from voltage curves.
Classes¶
ICAPeaksPipeline ¶
```python
ICAPeaksPipeline(
    sg_window: int = 51,
    sg_order: int = 3,
    num_peaks: int = 3,
    voltage_range: Tuple[float, float] = (3.0, 4.2),
    resample_points: int = 500,
    normalize: bool = True,
    use_cache: bool = True,
)
```
Bases: BasePipeline
Pipeline extracting ICA (dQ/dV) peak features from voltage curves.
For each RPT:

1. Load the 0.1C discharge curve
2. Compute dQ/dV with Savitzky-Golay smoothing
3. Extract peak positions, heights, widths, and areas
4. Output a fixed-length feature vector
These features are highly diagnostic for degradation mechanisms:

- Peak shifts → Loss of Lithium Inventory (LLI)
- Peak height changes → Loss of Active Material (LAM)
- Peak width changes → Kinetic degradation / impedance rise
Example usage:

```python
pipeline = ICAPeaksPipeline(sg_window=51, num_peaks=3)
samples = pipeline.fit_transform({'curves': curves, 'targets': targets})
```
Initialize the pipeline.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `sg_window` | `int` | Savitzky-Golay filter window size (must be odd) | `51` |
| `sg_order` | `int` | Savitzky-Golay polynomial order | `3` |
| `num_peaks` | `int` | Number of peaks to extract features for | `3` |
| `voltage_range` | `Tuple[float, float]` | Voltage range for ICA analysis | `(3.0, 4.2)` |
| `resample_points` | `int` | Number of points to resample curves to | `500` |
| `normalize` | `bool` | Whether to apply StandardScaler | `True` |
| `use_cache` | `bool` | Whether to cache computed features | `True` |
Source code in src/pipelines/ica_peaks.py
Functions¶
compute_ica ¶
Compute ICA curve (dQ/dV) with smoothing.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `voltage` | `ndarray` | Voltage array | *required* |
| `capacity` | `ndarray` | Capacity array | *required* |

Returns:

| Type | Description |
|---|---|
| `Tuple[ndarray, ndarray]` | Tuple of (voltage_midpoints, dQ/dV values) |
Source code in src/pipelines/ica_peaks.py
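The smoothing-plus-differentiation step described above might look like this. It is a sketch built on scipy's `savgol_filter`; resampling and edge handling in the real ica_peaks.py may differ.

```python
import numpy as np
from scipy.signal import savgol_filter

def compute_ica(voltage: np.ndarray, capacity: np.ndarray,
                sg_window: int = 51, sg_order: int = 3):
    """dQ/dV via finite differences on a Savitzky-Golay-smoothed
    capacity curve (sketch of the documented approach)."""
    q_smooth = savgol_filter(capacity, sg_window, sg_order)
    dV = np.diff(voltage)
    dQ = np.diff(q_smooth)
    valid = np.abs(dV) > 1e-6  # guard against near-flat voltage steps
    v_mid = 0.5 * (voltage[:-1] + voltage[1:])[valid]
    return v_mid, dQ[valid] / dV[valid]
```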
extract_peak_features ¶
Extract fixed-length feature vector from ICA curve.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `V` | `ndarray` | Voltage midpoints | *required* |
| `dQdV` | `ndarray` | dQ/dV values | *required* |

Returns:

| Type | Description |
|---|---|
| `ndarray` | Feature vector |
Source code in src/pipelines/ica_peaks.py
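Peak extraction can be sketched with scipy's `find_peaks`. This simplified version reports only positions and heights, zero-padded to a fixed length; the real extractor also computes widths and areas.

```python
import numpy as np
from scipy.signal import find_peaks

def extract_peak_features(V: np.ndarray, dQdV: np.ndarray, num_peaks: int = 3) -> np.ndarray:
    """Voltages and heights of the strongest peaks, zero-padded so the
    output length is fixed regardless of how many peaks were found."""
    idx, props = find_peaks(dQdV, height=0.0)
    order = np.argsort(props['peak_heights'])[::-1][:num_peaks]  # tallest first
    idx = idx[order]
    feats = np.zeros(2 * num_peaks)
    feats[:len(idx)] = V[idx]                           # peak voltages
    feats[num_peaks:num_peaks + len(idx)] = dQdV[idx]   # peak heights
    return feats
```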
fit ¶
Fit scaler on training data features.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `data` | `Dict[str, Any]` | Dictionary with 'curves' key containing list of (voltage, capacity, meta) tuples | *required* |

Returns:

| Type | Description |
|---|---|
| `ICAPeaksPipeline` | self |
Source code in src/pipelines/ica_peaks.py
get_feature_names ¶
get_params ¶
Return pipeline parameters for caching.
Source code in src/pipelines/ica_peaks.py
transform ¶
Transform curves into Sample objects.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `data` | `Dict[str, Any]` | Dictionary with 'curves' and optionally 'targets' keys | *required* |

Returns:

| Type | Description |
|---|---|
| `List[Sample]` | List of Sample objects |
Source code in src/pipelines/ica_peaks.py
Functions¶
latent_ode_seq ¶
Latent ODE Sequence Pipeline - Sequences for Neural ODE models.
Classes¶
LatentODESequencePipeline ¶
```python
LatentODESequencePipeline(
    time_unit: str = 'days',
    include_ica: bool = False,
    max_seq_len: Optional[int] = None,
    normalize: bool = True,
)
```
Bases: BasePipeline
Pipeline creating sequences for Neural ODE / Latent ODE models.
Key insight: use the ageing SET index as the timeline (not cycles).

- t = cumulative days or throughput (continuous time)
- x_t = features at each set (summary + optionally ICA peaks)
- Provides explicit time deltas for ODE integration
This is more appropriate than cycle-level sequences for ODEs because:

1. Sets have meaningful time gaps (days/weeks)
2. RPT measurements provide consistent feature snapshots
3. Fewer points mean faster ODE integration
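Building the continuous time axis and the explicit deltas might be as simple as the sketch below, assuming one timestamp per ageing set in days; column names and units in latent_ode_seq.py may differ.

```python
import numpy as np

def build_time_sequence(timestamps_days):
    """Rebased time vector t (starts at zero) and per-step deltas for
    ODE integration (sketch)."""
    t = np.asarray(timestamps_days, dtype=float)
    t = t - t[0]                        # continuous time since the first set
    delta_t = np.diff(t, prepend=0.0)   # gap since the previous set
    return t, delta_t

t, delta_t = build_time_sequence([10.0, 17.0, 31.0])
# t == [0, 7, 21]; delta_t == [0, 7, 14]
```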
Example usage:

```python
pipeline = LatentODESequencePipeline(time_unit="days")
samples = pipeline.fit_transform({'df': df})
```
Initialize the pipeline.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `time_unit` | `str` | Time unit for ODE integration (`"days"` or `"throughput_Ah"`) | `'days'` |
| `include_ica` | `bool` | Whether to include ICA peak features | `False` |
| `max_seq_len` | `Optional[int]` | Maximum sequence length (truncate if longer) | `None` |
| `normalize` | `bool` | Whether to apply StandardScaler | `True` |
Source code in src/pipelines/latent_ode_seq.py
Functions¶
fit ¶
Fit scaler on training sequences.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `data` | `Dict[str, Any]` | Dictionary with 'df' key containing DataFrame | *required* |

Returns:

| Type | Description |
|---|---|
| `LatentODESequencePipeline` | self |
Source code in src/pipelines/latent_ode_seq.py
get_feature_names ¶
get_params ¶
transform ¶
Transform data into sequence samples (one per cell).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `data` | `Dict[str, Any]` | Dictionary with 'df' key containing DataFrame | *required* |

Returns:

| Type | Description |
|---|---|
| `List[Sample]` | List of Sample objects (one per cell) |