Pipelines API Reference

Pipelines in BatteryML transform raw time-series or summary data into machine-learning-ready Sample objects. Expensive features, such as ICA peaks, are cached to disk under a hash-based key so they are not recomputed on every run.

Usage Example

from src.pipelines.ica_peaks import ICAPeaksPipeline

# Data objects should contain 'curves' and 'targets' keys
pipeline = ICAPeaksPipeline(num_peaks=3, sg_window=51)
samples = pipeline.fit_transform(raw_data)

sample

Canonical Sample schema for battery degradation data.

This is the universal format that all pipelines must produce and all models consume.

Classes

Sample dataclass

Sample(meta: Dict[str, Any] = dict(), x: Union[torch.Tensor, Dict[str, torch.Tensor], np.ndarray, None] = None, y: Union[torch.Tensor, np.ndarray, None] = None, mask: Optional[torch.Tensor] = None, t: Optional[torch.Tensor] = None)

Universal format every pipeline must produce.

This contract enables:
- Model-agnostic pipelines (swap models without changing data code)
- Time-aware models (ODEs use delta_t; LSTMs ignore it)
- Clean validation splits (meta contains grouping keys)
- Composable features (x can be a dict of multiple feature types)

Attributes:

Name Type Description
meta Dict[str, Any]

Metadata for splits, logging, and debugging; not fed to the model.
Required keys:
- 'experiment_id': int (1-5)
- 'cell_id': str ('A', 'B', ..., 'H')
- 'temperature_C': float (10, 25, 40)
Optional keys:
- 'set_idx': int (ageing set index)
- 'rpt_id': int (RPT measurement index)
- 'cycle_idx': int (absolute cycle number)
- 'timestamp': float (seconds or days since experiment start)
- 'delta_t': float (time since previous sample)
- 'cumulative_throughput_Ah': float

x Union[Tensor, Dict[str, Tensor], ndarray, None]

Features the model sees. Can be:
- Tensor of shape (feature_dim,): static features
- Tensor of shape (seq_len, feature_dim): sequences
- Dict: {'summary': tensor, 'ica_peaks': tensor, ...}

y Union[Tensor, ndarray, None]

Target values. Shape (1,) for SOH regression or (n_targets,) for multi-task.

mask Optional[Tensor]

Optional boolean mask for variable-length sequences. Shape (seq_len,).

t Optional[Tensor]

Optional time vector for ODE models. Shape (seq_len,).

Attributes
feature_dim property
feature_dim: int

Get feature dimension.

seq_len property
seq_len: Optional[int]

Get sequence length (None if not a sequence).
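To make the contract and the two properties concrete, here is a minimal self-contained stand-in (a NumPy-only sketch; the real Sample in src/pipelines/sample.py also accepts torch tensors and dict-valued x, and carries the mask and t fields):

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional
import numpy as np

@dataclass
class MiniSample:
    """Toy stand-in mirroring the Sample contract (NumPy-only sketch)."""
    meta: Dict[str, Any] = field(default_factory=dict)
    x: Optional[np.ndarray] = None
    y: Optional[np.ndarray] = None

    @property
    def feature_dim(self) -> int:
        return int(self.x.shape[-1])

    @property
    def seq_len(self) -> Optional[int]:
        # Sequences are 2-D (seq_len, feature_dim); static features are 1-D
        return int(self.x.shape[0]) if self.x.ndim == 2 else None

# A static-feature sample carrying the required meta keys
s = MiniSample(
    meta={'experiment_id': 5, 'cell_id': 'A', 'temperature_C': 25.0},
    x=np.zeros(7, dtype=np.float32),
    y=np.array([0.93], dtype=np.float32),
)
```

For a sequence sample, x would instead be shaped (seq_len, feature_dim) and seq_len would return the first dimension.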

Functions
clone
clone() -> Sample

Create a deep copy of the sample.

Source code in src/pipelines/sample.py
def clone(self) -> 'Sample':
    """Create a deep copy of the sample."""
    import copy
    return copy.deepcopy(self)
to_device
to_device(device: str) -> Sample

Move tensors to specified device (in-place).

Source code in src/pipelines/sample.py
def to_device(self, device: str) -> 'Sample':
    """Move tensors to specified device (in-place)."""
    if isinstance(self.x, torch.Tensor):
        self.x = self.x.to(device)
    elif isinstance(self.x, dict):
        self.x = {k: v.to(device) for k, v in self.x.items()}

    if self.y is not None and isinstance(self.y, torch.Tensor):
        self.y = self.y.to(device)

    if self.t is not None and isinstance(self.t, torch.Tensor):
        self.t = self.t.to(device)

    if self.mask is not None and isinstance(self.mask, torch.Tensor):
        self.mask = self.mask.to(device)

    return self
to_tensor
to_tensor() -> Sample

Convert numpy arrays to torch tensors (in-place).

Source code in src/pipelines/sample.py
def to_tensor(self) -> 'Sample':
    """Convert numpy arrays to torch tensors (in-place)."""
    if isinstance(self.x, np.ndarray):
        self.x = torch.from_numpy(self.x).float()
    elif isinstance(self.x, dict):
        self.x = {
            k: torch.from_numpy(v).float() if isinstance(v, np.ndarray) else v 
            for k, v in self.x.items()
        }

    if isinstance(self.y, np.ndarray):
        self.y = torch.from_numpy(self.y).float()

    if isinstance(self.t, np.ndarray):
        self.t = torch.from_numpy(self.t).float()

    if isinstance(self.mask, np.ndarray):
        self.mask = torch.from_numpy(self.mask).bool()

    return self

base

Base class for preprocessing pipelines.

Classes

BasePipeline

Bases: ABC

Abstract base class for all preprocessing pipelines.

All pipelines must:
1. Accept raw data (DataFrames, arrays)
2. Output Sample objects with consistent structure
3. Support the fit/transform pattern for scalers
4. Be cacheable (expensive computations)

Example usage

pipeline = SummarySetPipeline(include_arrhenius=True)
train_samples = pipeline.fit_transform({'df': train_df})
test_samples = pipeline.transform({'df': test_df})

Functions
fit abstractmethod
fit(data: Dict[str, Any]) -> BasePipeline

Fit any scalers/normalizers on training data.

Parameters:

Name Type Description Default
data Dict[str, Any]

Dictionary containing raw data (e.g., {'df': DataFrame})

required

Returns:

Name Type Description
self BasePipeline

Fitted pipeline instance

Source code in src/pipelines/base.py
@abstractmethod
def fit(self, data: Dict[str, Any]) -> 'BasePipeline':
    """Fit any scalers/normalizers on training data.

    Args:
        data: Dictionary containing raw data (e.g., {'df': DataFrame})

    Returns:
        self: Fitted pipeline instance
    """
    pass
fit_transform
fit_transform(data: Dict[str, Any]) -> List[Sample]

Fit and transform in one call.

Parameters:

Name Type Description Default
data Dict[str, Any]

Dictionary containing raw data

required

Returns:

Type Description
List[Sample]

List of Sample objects

Source code in src/pipelines/base.py
def fit_transform(self, data: Dict[str, Any]) -> List[Sample]:
    """Fit and transform in one call.

    Args:
        data: Dictionary containing raw data

    Returns:
        List of Sample objects
    """
    return self.fit(data).transform(data)
get_feature_names abstractmethod
get_feature_names() -> List[str]

Return list of feature names (for SHAP/interpretability).

Returns:

Type Description
List[str]

List of feature name strings

Source code in src/pipelines/base.py
@abstractmethod
def get_feature_names(self) -> List[str]:
    """Return list of feature names (for SHAP/interpretability).

    Returns:
        List of feature name strings
    """
    pass
get_params
get_params() -> dict

Return pipeline parameters (for caching key).

Returns:

Type Description
dict

Dictionary of pipeline parameters

Source code in src/pipelines/base.py
def get_params(self) -> dict:
    """Return pipeline parameters (for caching key).

    Returns:
        Dictionary of pipeline parameters
    """
    return {}
transform abstractmethod
transform(data: Dict[str, Any]) -> List[Sample]

Transform raw data into Sample objects.

Parameters:

Name Type Description Default
data Dict[str, Any]

Dictionary containing raw data

required

Returns:

Type Description
List[Sample]

List of Sample objects

Source code in src/pipelines/base.py
@abstractmethod
def transform(self, data: Dict[str, Any]) -> List[Sample]:
    """Transform raw data into Sample objects.

    Args:
        data: Dictionary containing raw data

    Returns:
        List of Sample objects
    """
    pass
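A toy pipeline illustrating the fit/transform contract above (a self-contained sketch: it does not subclass the real BasePipeline or produce real Sample objects, and it standardizes features with plain NumPy instead of a scikit-learn scaler):

```python
from typing import Any, Dict, List
import numpy as np

class ToyPipeline:
    """Minimal pipeline following the BasePipeline contract (sketch)."""

    def fit(self, data: Dict[str, Any]) -> 'ToyPipeline':
        X = np.asarray(data['X'], dtype=np.float64)
        # Fit scaler state on training data only
        self.mean_ = X.mean(axis=0)
        self.std_ = X.std(axis=0) + 1e-12
        return self

    def transform(self, data: Dict[str, Any]) -> List[dict]:
        X = np.asarray(data['X'], dtype=np.float64)
        Z = (X - self.mean_) / self.std_
        # Dicts stand in for Sample objects, one per row
        return [{'x': z, 'y': y} for z, y in zip(Z, data['y'])]

    def fit_transform(self, data: Dict[str, Any]) -> List[dict]:
        return self.fit(data).transform(data)

    def get_feature_names(self) -> List[str]:
        return [f'f{i}' for i in range(len(self.mean_))]

train = {'X': [[1.0, 2.0], [3.0, 4.0]], 'y': [0.9, 0.8]}
samples = ToyPipeline().fit_transform(train)
```

The key discipline is that fit() touches training data only; transform() can then be reused on held-out data without leaking statistics.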

registry

Registry for preprocessing pipelines.

Classes

PipelineRegistry

Registry for pipeline classes.

Example usage

@PipelineRegistry.register("summary_set")
class SummarySetPipeline(BasePipeline):
    ...

pipeline = PipelineRegistry.get("summary_set", normalize=True)

Functions
get classmethod
get(name: str, **kwargs) -> BasePipeline

Get a pipeline instance by name.

Parameters:

Name Type Description Default
name str

Registry name of the pipeline

required
**kwargs

Arguments to pass to pipeline constructor

{}

Returns:

Type Description
BasePipeline

Pipeline instance

Raises:

Type Description
ValueError

If pipeline name is not found

Source code in src/pipelines/registry.py
@classmethod
def get(cls, name: str, **kwargs) -> BasePipeline:
    """Get a pipeline instance by name.

    Args:
        name: Registry name of the pipeline
        **kwargs: Arguments to pass to pipeline constructor

    Returns:
        Pipeline instance

    Raises:
        ValueError: If pipeline name is not found
    """
    if name not in cls._pipelines:
        available = list(cls._pipelines.keys())
        raise ValueError(f"Unknown pipeline: {name}. Available: {available}")
    return cls._pipelines[name](**kwargs)
get_class classmethod
get_class(name: str) -> Optional[Type[BasePipeline]]

Get pipeline class by name (without instantiating).

Parameters:

Name Type Description Default
name str

Registry name of the pipeline

required

Returns:

Type Description
Optional[Type[BasePipeline]]

Pipeline class or None if not found

Source code in src/pipelines/registry.py
@classmethod
def get_class(cls, name: str) -> Optional[Type[BasePipeline]]:
    """Get pipeline class by name (without instantiating).

    Args:
        name: Registry name of the pipeline

    Returns:
        Pipeline class or None if not found
    """
    return cls._pipelines.get(name)
list_available classmethod
list_available() -> list

List all registered pipeline names.

Returns:

Type Description
list

List of pipeline names

Source code in src/pipelines/registry.py
@classmethod
def list_available(cls) -> list:
    """List all registered pipeline names.

    Returns:
        List of pipeline names
    """
    return list(cls._pipelines.keys())
register classmethod
register(name: str)

Decorator to register a pipeline class.

Parameters:

Name Type Description Default
name str

Registry name for the pipeline

required

Returns:

Type Description

Decorator function

Source code in src/pipelines/registry.py
@classmethod
def register(cls, name: str):
    """Decorator to register a pipeline class.

    Args:
        name: Registry name for the pipeline

    Returns:
        Decorator function
    """
    def decorator(pipeline_class: Type[BasePipeline]):
        cls._pipelines[name] = pipeline_class
        pipeline_class.name = name
        return pipeline_class
    return decorator
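The decorator pattern above can be exercised end to end with a self-contained miniature of the registry (same logic as the source shown; MiniRegistry and DummyPipeline are illustrative names, not part of the package):

```python
from typing import Dict

class MiniRegistry:
    _pipelines: Dict[str, type] = {}

    @classmethod
    def register(cls, name: str):
        def decorator(pipeline_class: type):
            cls._pipelines[name] = pipeline_class
            pipeline_class.name = name  # same side effect as the real registry
            return pipeline_class
        return decorator

    @classmethod
    def get(cls, name: str, **kwargs):
        if name not in cls._pipelines:
            raise ValueError(f"Unknown pipeline: {name}. "
                             f"Available: {list(cls._pipelines.keys())}")
        return cls._pipelines[name](**kwargs)

    @classmethod
    def list_available(cls) -> list:
        return list(cls._pipelines.keys())

@MiniRegistry.register("dummy")
class DummyPipeline:
    def __init__(self, normalize: bool = True):
        self.normalize = normalize

p = MiniRegistry.get("dummy", normalize=False)
```

Because registration happens at class-definition time, importing a pipeline module is enough to make it available through get().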

cache

Cache expensive preprocessing results to disk.

Classes

PipelineCache

PipelineCache(cache_dir: Path = Path('artifacts/cache'), version: str = 'v1')

Cache expensive preprocessing results to disk.

Cache key components:
- experiment_id, cell_id, rpt_id (data identity)
- pipeline_name + params (transform identity)
- version (manual invalidation)
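The private key-building helper is not shown here, but a deterministic key over those components might look like the following sketch (an assumption for illustration; the real _make_key in src/pipelines/cache.py may differ in layout and hash choice):

```python
import hashlib
import json

def make_cache_key(experiment_id, cell_id, rpt_id,
                   pipeline_name, pipeline_params, version="v1"):
    """Hash the identity of (data, transform, version) into a stable key."""
    payload = {
        'experiment_id': experiment_id,
        'cell_id': cell_id,
        'rpt_id': rpt_id,
        'pipeline_name': pipeline_name,
        'pipeline_params': pipeline_params,
        'version': version,
    }
    # sort_keys makes the key independent of dict insertion order
    blob = json.dumps(payload, sort_keys=True, default=str)
    return hashlib.sha256(blob.encode()).hexdigest()[:16]

k1 = make_cache_key(5, 'A', 3, 'ica_peaks', {'sg_window': 51, 'sg_order': 3})
k2 = make_cache_key(5, 'A', 3, 'ica_peaks', {'sg_order': 3, 'sg_window': 51})
```

Any change to the parameters or to version yields a different key, which is what makes cached entries safe to reuse.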

Example usage

cache = PipelineCache()
result = cache.get_or_compute(
    experiment_id=5, cell_id='A', rpt_id=3,
    pipeline_name='ica_peaks',
    pipeline_params={'sg_window': 51, 'sg_order': 3},
    compute_fn=lambda: expensive_ica_computation(voltage, capacity)
)

Initialize the cache.

Parameters:

Name Type Description Default
cache_dir Path

Directory for cache files

Path('artifacts/cache')
version str

Cache version (change to invalidate all cached data)

'v1'
Source code in src/pipelines/cache.py
def __init__(self, cache_dir: Path = Path("artifacts/cache"), version: str = "v1"):
    """Initialize the cache.

    Args:
        cache_dir: Directory for cache files
        version: Cache version (change to invalidate all cached data)
    """
    self.cache_dir = Path(cache_dir)
    self.cache_dir.mkdir(parents=True, exist_ok=True)
    self.version = version
    self.stats = {'hits': 0, 'misses': 0}
Functions
clear
clear(pipeline_name: Optional[str] = None) -> int

Clear cache (optionally filtered by pipeline name).

Parameters:

Name Type Description Default
pipeline_name Optional[str]

If provided, only clear this pipeline's cache

None

Returns:

Type Description
int

Number of cache entries cleared

Source code in src/pipelines/cache.py
def clear(self, pipeline_name: Optional[str] = None) -> int:
    """Clear cache (optionally filtered by pipeline name).

    Args:
        pipeline_name: If provided, only clear this pipeline's cache

    Returns:
        Number of cache entries cleared
    """
    count = 0
    for meta_path in self.cache_dir.rglob("*.meta.json"):
        if pipeline_name:
            try:
                with open(meta_path) as f:
                    meta = json.load(f)
                if meta.get('pipeline_name') != pipeline_name:
                    continue
            except Exception:
                continue

        # Remove both .pkl and .meta.json
        cache_path = meta_path.with_suffix('').with_suffix('.pkl')
        cache_path.unlink(missing_ok=True)
        meta_path.unlink()
        count += 1

    logger.info(f"Cleared {count} cache entries")
    return count
get
get(experiment_id: int, cell_id: str, rpt_id: Optional[int], pipeline_name: str, pipeline_params: dict) -> Optional[Any]

Retrieve cached result if exists.

Parameters:

Name Type Description Default
experiment_id int

Experiment ID

required
cell_id str

Cell identifier

required
rpt_id Optional[int]

RPT index

required
pipeline_name str

Pipeline name

required
pipeline_params dict

Pipeline parameters

required

Returns:

Type Description
Optional[Any]

Cached result or None if not found

Source code in src/pipelines/cache.py
def get(self,
        experiment_id: int,
        cell_id: str,
        rpt_id: Optional[int],
        pipeline_name: str,
        pipeline_params: dict) -> Optional[Any]:
    """Retrieve cached result if exists.

    Args:
        experiment_id: Experiment ID
        cell_id: Cell identifier
        rpt_id: RPT index
        pipeline_name: Pipeline name
        pipeline_params: Pipeline parameters

    Returns:
        Cached result or None if not found
    """
    key = self._make_key(experiment_id, cell_id, rpt_id, pipeline_name, pipeline_params)
    cache_path = self._get_cache_path(key)

    if cache_path.exists():
        self.stats['hits'] += 1
        logger.debug(f"Cache HIT: {pipeline_name} for cell {cell_id} RPT {rpt_id}")
        try:
            with open(cache_path, 'rb') as f:
                return pickle.load(f)
        except Exception as e:
            logger.warning(f"Failed to load cache: {e}")
            return None

    self.stats['misses'] += 1
    return None
get_or_compute
get_or_compute(experiment_id: int, cell_id: str, rpt_id: Optional[int], pipeline_name: str, pipeline_params: dict, compute_fn: Callable[[], Any]) -> Any

Return cached result or compute and cache.

Parameters:

Name Type Description Default
experiment_id int

Experiment ID

required
cell_id str

Cell identifier

required
rpt_id Optional[int]

RPT index

required
pipeline_name str

Pipeline name

required
pipeline_params dict

Pipeline parameters

required
compute_fn Callable[[], Any]

Function to compute result if not cached

required

Returns:

Type Description
Any

Cached or computed result

Source code in src/pipelines/cache.py
def get_or_compute(self,
                   experiment_id: int,
                   cell_id: str,
                   rpt_id: Optional[int],
                   pipeline_name: str,
                   pipeline_params: dict,
                   compute_fn: Callable[[], Any]) -> Any:
    """Return cached result or compute and cache.

    Args:
        experiment_id: Experiment ID
        cell_id: Cell identifier
        rpt_id: RPT index
        pipeline_name: Pipeline name
        pipeline_params: Pipeline parameters
        compute_fn: Function to compute result if not cached

    Returns:
        Cached or computed result
    """
    # Try cache first
    result = self.get(experiment_id, cell_id, rpt_id, pipeline_name, pipeline_params)

    if result is not None:
        return result

    # Compute (expensive)
    logger.info(f"Computing {pipeline_name} for cell {cell_id} RPT {rpt_id}...")
    result = compute_fn()

    # Cache for next time
    self.set(experiment_id, cell_id, rpt_id, pipeline_name, pipeline_params, result)

    return result
get_size
get_size() -> dict

Get cache size statistics.

Returns:

Type Description
dict

Dictionary with num_files and total_bytes

Source code in src/pipelines/cache.py
def get_size(self) -> dict:
    """Get cache size statistics.

    Returns:
        Dictionary with num_files and total_bytes
    """
    pkl_files = list(self.cache_dir.rglob("*.pkl"))
    total_bytes = sum(f.stat().st_size for f in pkl_files)

    return {
        'num_files': len(pkl_files),
        'total_bytes': total_bytes,
        'total_mb': total_bytes / (1024 * 1024),
    }
get_stats
get_stats() -> dict

Return cache hit/miss statistics.

Returns:

Type Description
dict

Dictionary with hits, misses, hit_rate

Source code in src/pipelines/cache.py
def get_stats(self) -> dict:
    """Return cache hit/miss statistics.

    Returns:
        Dictionary with hits, misses, hit_rate
    """
    total = self.stats['hits'] + self.stats['misses']
    hit_rate = self.stats['hits'] / total if total > 0 else 0
    return {**self.stats, 'hit_rate': hit_rate, 'total': total}
set
set(experiment_id: int, cell_id: str, rpt_id: Optional[int], pipeline_name: str, pipeline_params: dict, result: Any) -> None

Store result in cache.

Parameters:

Name Type Description Default
experiment_id int

Experiment ID

required
cell_id str

Cell identifier

required
rpt_id Optional[int]

RPT index

required
pipeline_name str

Pipeline name

required
pipeline_params dict

Pipeline parameters

required
result Any

Result to cache

required
Source code in src/pipelines/cache.py
def set(self,
        experiment_id: int,
        cell_id: str,
        rpt_id: Optional[int],
        pipeline_name: str,
        pipeline_params: dict,
        result: Any) -> None:
    """Store result in cache.

    Args:
        experiment_id: Experiment ID
        cell_id: Cell identifier
        rpt_id: RPT index
        pipeline_name: Pipeline name
        pipeline_params: Pipeline parameters
        result: Result to cache
    """
    key = self._make_key(experiment_id, cell_id, rpt_id, pipeline_name, pipeline_params)
    cache_path = self._get_cache_path(key)
    meta_path = self._get_meta_path(key)

    # Save result
    with open(cache_path, 'wb') as f:
        pickle.dump(result, f)

    # Save metadata for debugging
    meta = {
        'experiment_id': experiment_id,
        'cell_id': cell_id,
        'rpt_id': rpt_id,
        'pipeline_name': pipeline_name,
        'pipeline_params': pipeline_params,
        'cached_at': datetime.now().isoformat(),
        'version': self.version,
    }
    with open(meta_path, 'w') as f:
        json.dump(meta, f, indent=2, default=str)

    logger.debug(f"Cache SET: {pipeline_name} for cell {cell_id} RPT {rpt_id}")

Functions

get_cache

get_cache(cache_dir: str = 'artifacts/cache', version: str = 'v1') -> PipelineCache

Get or create global cache instance.

Parameters:

Name Type Description Default
cache_dir str

Cache directory

'artifacts/cache'
version str

Cache version

'v1'

Returns:

Type Description
PipelineCache

PipelineCache instance

Source code in src/pipelines/cache.py
def get_cache(cache_dir: str = "artifacts/cache", version: str = "v1") -> PipelineCache:
    """Get or create global cache instance.

    Args:
        cache_dir: Cache directory
        version: Cache version

    Returns:
        PipelineCache instance
    """
    global _cache
    if _cache is None:
        _cache = PipelineCache(Path(cache_dir), version)
    return _cache

reset_cache

reset_cache() -> None

Reset global cache instance.

Source code in src/pipelines/cache.py
def reset_cache() -> None:
    """Reset global cache instance."""
    global _cache
    _cache = None

summary_set

Summary Set Pipeline - Features from Performance Summary data.

Classes

SummarySetPipeline

SummarySetPipeline(include_arrhenius: bool = True, arrhenius_Ea: float = 50000.0, normalize: bool = True)

Bases: BasePipeline

Pipeline using Performance Summary / Summary per Set data.

Features per set:
- Cumulative throughput (Ah)
- Average temperature (K, plus Arrhenius transform)
- Resistance measurements
- Cycle count
- Previous SOH (if available)

Target: SOH or Cell Capacity
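The Arrhenius-derived temperature features can be sketched as follows (a guess at the transform implied by the arrhenius_Ea parameter; the exact definitions live in _extract_features, which is not shown here):

```python
import math

R = 8.314  # universal gas constant, J/(mol*K)

def arrhenius_features(temp_C: float, Ea: float = 50000.0):
    """Temperature features: Kelvin, Arrhenius rate factor, inverse temperature."""
    T = temp_C + 273.15
    # exp(-Ea / RT): proxy for thermally activated degradation rate, grows with T
    arrhenius = math.exp(-Ea / (R * T))
    inv_temp = 1.0 / T
    return T, arrhenius, inv_temp

# Hotter cells should get a larger Arrhenius factor
_, a10, _ = arrhenius_features(10.0)
_, a40, _ = arrhenius_features(40.0)
```

Feeding both the raw rate factor and 1/T gives linear models a chance to fit Arrhenius-like behaviour directly.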

Example usage

pipeline = SummarySetPipeline(include_arrhenius=True)
samples = pipeline.fit_transform({'df': df})

Initialize the pipeline.

Parameters:

Name Type Description Default
include_arrhenius bool

Include Arrhenius temperature features

True
arrhenius_Ea float

Activation energy for Arrhenius (J/mol)

50000.0
normalize bool

Whether to apply StandardScaler normalization

True
Source code in src/pipelines/summary_set.py
def __init__(self, 
             include_arrhenius: bool = True,
             arrhenius_Ea: float = 50000.0,
             normalize: bool = True):
    """Initialize the pipeline.

    Args:
        include_arrhenius: Include Arrhenius temperature features
        arrhenius_Ea: Activation energy for Arrhenius (J/mol)
        normalize: Whether to apply StandardScaler normalization
    """
    self.include_arrhenius = include_arrhenius
    self.arrhenius_Ea = arrhenius_Ea
    self.normalize = normalize
    self.scaler: Optional[StandardScaler] = None
    self.feature_names_: List[str] = []
Functions
fit
fit(data: Dict[str, Any]) -> SummarySetPipeline

Fit scaler on training data.

Parameters:

Name Type Description Default
data Dict[str, Any]

Dictionary with 'df' key containing DataFrame

required

Returns:

Type Description
SummarySetPipeline

self

Source code in src/pipelines/summary_set.py
def fit(self, data: Dict[str, Any]) -> 'SummarySetPipeline':
    """Fit scaler on training data.

    Args:
        data: Dictionary with 'df' key containing DataFrame

    Returns:
        self
    """
    df = data['df']

    # Build feature names
    self.feature_names_ = list(self.FEATURE_COLS) + ['temp_K']
    if self.include_arrhenius:
        self.feature_names_.extend(['arrhenius', 'inv_temp'])

    # Extract all features for scaling
    X = []
    for idx, row in df.iterrows():
        temp_C = row.get('temperature_C', 25)
        X.append(self._extract_features(row, temp_C))
    X = np.vstack(X)

    if self.normalize:
        self.scaler = StandardScaler()
        self.scaler.fit(X)

    return self
get_feature_names
get_feature_names() -> List[str]

Return list of feature names.

Source code in src/pipelines/summary_set.py
def get_feature_names(self) -> List[str]:
    """Return list of feature names."""
    return self.feature_names_
get_params
get_params() -> dict

Return pipeline parameters for caching.

Source code in src/pipelines/summary_set.py
def get_params(self) -> dict:
    """Return pipeline parameters for caching."""
    return {
        'include_arrhenius': self.include_arrhenius,
        'arrhenius_Ea': self.arrhenius_Ea,
        'normalize': self.normalize,
    }
transform
transform(data: Dict[str, Any]) -> List[Sample]

Transform data into Sample objects.

Parameters:

Name Type Description Default
data Dict[str, Any]

Dictionary with 'df' key containing DataFrame

required

Returns:

Type Description
List[Sample]

List of Sample objects

Source code in src/pipelines/summary_set.py
def transform(self, data: Dict[str, Any]) -> List[Sample]:
    """Transform data into Sample objects.

    Args:
        data: Dictionary with 'df' key containing DataFrame

    Returns:
        List of Sample objects
    """
    df = data['df']
    samples = []

    # Pre-compute initial capacity for each cell (first capacity value)
    initial_capacities = {}
    for cell_id in df['cell_id'].unique():
        cell_df = df[df['cell_id'] == cell_id].sort_index()
        target_col = self.TARGET_COL if self.TARGET_COL in cell_df.columns else 'SoH'

        if target_col in cell_df.columns:
            # Get first capacity value
            first_capacity = cell_df[target_col].iloc[0]
            if pd.isna(first_capacity):
                initial_capacities[cell_id] = None
            else:
                # Convert mAh to Ah if needed
                if first_capacity > 100:
                    first_capacity = first_capacity / 1000.0
                initial_capacities[cell_id] = first_capacity
        else:
            initial_capacities[cell_id] = None

    for idx, row in df.iterrows():
        temp_C = row.get('temperature_C', 25)

        # Extract features
        x = self._extract_features(row, temp_C)
        if self.normalize and self.scaler:
            x = self.scaler.transform(x.reshape(1, -1)).flatten()

        # Extract target and normalize to SOH
        cell_id = str(row.get('cell_id', 'unknown'))
        y_raw = row.get(self.TARGET_COL, row.get('SoH', 1.0))

        if pd.isna(y_raw):
            y = 1.0
        else:
            # Convert mAh to Ah if needed
            if y_raw > 100:
                y_raw = y_raw / 1000.0

            # Calculate SOH: current capacity / initial capacity
            initial_capacity = initial_capacities.get(cell_id)
            if initial_capacity and initial_capacity > 0:
                y = y_raw / initial_capacity
            else:
                y = 1.0

        # Build sample
        sample = Sample(
            meta={
                'experiment_id': int(row.get('experiment_id', 5)),
                'cell_id': cell_id,
                'temperature_C': float(temp_C),
                'set_idx': int(idx) if isinstance(idx, (int, np.integer)) else int(row.get('set_idx', 0)),
            },
            x=x,
            y=np.array([y], dtype=np.float32),
        )
        samples.append(sample)

    return samples

ica_peaks

ICA Peaks Pipeline - Extract dQ/dV features from voltage curves.

Classes

ICAPeaksPipeline

ICAPeaksPipeline(sg_window: int = 51, sg_order: int = 3, num_peaks: int = 3, voltage_range: Tuple[float, float] = (3.0, 4.2), resample_points: int = 500, normalize: bool = True, use_cache: bool = True)

Bases: BasePipeline

Pipeline extracting ICA (dQ/dV) peak features from voltage curves.

For each RPT:
1. Load the 0.1C discharge curve
2. Compute dQ/dV with Savitzky-Golay smoothing
3. Extract peak positions, heights, widths, and areas
4. Output a fixed-length feature vector

These features are highly diagnostic for degradation mechanisms:
- Peak shifts → Loss of Lithium Inventory (LLI)
- Peak height changes → Loss of Active Material (LAM)
- Peak width changes → Kinetic degradation / impedance rise
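The dQ/dV computation can be sketched without the project code (a simplified NumPy-only version on a synthetic discharge curve; it uses a crude edge-padded moving average where the real pipeline uses a Savitzky-Golay filter):

```python
import numpy as np

def ica_curve(voltage, capacity, smooth=21):
    """Compute |dQ/dV| at voltage midpoints (simplified sketch)."""
    order = np.argsort(voltage)
    V, Q = voltage[order], capacity[order]
    # Edge-padded moving average (the real pipeline uses savgol_filter)
    Qp = np.pad(Q, smooth // 2, mode='edge')
    Q_smooth = np.convolve(Qp, np.ones(smooth) / smooth, mode='valid')
    dV = np.diff(V)
    dV[dV == 0] = 1e-10
    dQdV = np.diff(Q_smooth) / dV
    V_mid = (V[:-1] + V[1:]) / 2
    # Absolute value: dQ/dV is negative during discharge
    return V_mid, np.abs(dQdV)

# Synthetic curve: capacity changes fastest near a 3.6 V plateau,
# which should show up as the ICA peak
V = np.linspace(3.0, 4.2, 1000)
Q = 2.5 / (1 + np.exp((V - 3.6) / 0.05))  # sigmoid discharge curve, Ah
V_mid, dQdV = ica_curve(V, Q)
peak_voltage = V_mid[np.argmax(dQdV)]
```

As a cell degrades, the same analysis would show the peak moving and shrinking, which is exactly what the peak features below capture.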

Example usage

pipeline = ICAPeaksPipeline(sg_window=51, num_peaks=3)
samples = pipeline.fit_transform({'curves': curves, 'targets': targets})

Initialize the pipeline.

Parameters:

Name Type Description Default
sg_window int

Savitzky-Golay filter window size (must be odd)

51
sg_order int

Savitzky-Golay polynomial order

3
num_peaks int

Number of peaks to extract features for

3
voltage_range Tuple[float, float]

Voltage range for ICA analysis

(3.0, 4.2)
resample_points int

Number of points to resample curves to

500
normalize bool

Whether to apply StandardScaler

True
use_cache bool

Whether to cache computed features

True
Source code in src/pipelines/ica_peaks.py
def __init__(self,
             sg_window: int = 51,
             sg_order: int = 3,
             num_peaks: int = 3,
             voltage_range: Tuple[float, float] = (3.0, 4.2),
             resample_points: int = 500,
             normalize: bool = True,
             use_cache: bool = True):
    """Initialize the pipeline.

    Args:
        sg_window: Savitzky-Golay filter window size (must be odd)
        sg_order: Savitzky-Golay polynomial order
        num_peaks: Number of peaks to extract features for
        voltage_range: Voltage range for ICA analysis
        resample_points: Number of points to resample curves to
        normalize: Whether to apply StandardScaler
        use_cache: Whether to cache computed features
    """
    self.sg_window = sg_window
    self.sg_order = sg_order
    self.num_peaks = num_peaks
    self.voltage_range = voltage_range
    self.resample_points = resample_points
    self.normalize = normalize
    self.use_cache = use_cache

    self.scaler: Optional[StandardScaler] = None
    self.feature_names_: List[str] = []
    self._build_feature_names()
Functions
compute_ica
compute_ica(voltage: np.ndarray, capacity: np.ndarray) -> Tuple[np.ndarray, np.ndarray]

Compute ICA curve (dQ/dV) with smoothing.

Parameters:

Name Type Description Default
voltage ndarray

Voltage array

required
capacity ndarray

Capacity array

required

Returns:

Type Description
Tuple[ndarray, ndarray]

Tuple of (voltage_midpoints, dQ/dV values)

Source code in src/pipelines/ica_peaks.py
def compute_ica(self, voltage: np.ndarray, capacity: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """Compute ICA curve (dQ/dV) with smoothing.

    Args:
        voltage: Voltage array
        capacity: Capacity array

    Returns:
        Tuple of (voltage_midpoints, dQ/dV values)
    """
    # Sort by voltage
    sorted_idx = np.argsort(voltage)
    V = voltage[sorted_idx]
    Q = capacity[sorted_idx]

    # Filter to voltage range
    mask = (V >= self.voltage_range[0]) & (V <= self.voltage_range[1])
    V, Q = V[mask], Q[mask]

    if len(V) < self.sg_window:
        return np.array([]), np.array([])

    # Smooth capacity
    Q_smooth = savgol_filter(Q, self.sg_window, self.sg_order)

    # Compute derivative
    dQ = np.diff(Q_smooth)
    dV = np.diff(V)
    dV[dV == 0] = 1e-10
    dQdV = dQ / dV
    V_mid = (V[:-1] + V[1:]) / 2

    # Additional smoothing on dQ/dV
    if len(dQdV) > self.sg_window:
        dQdV = savgol_filter(dQdV, self.sg_window, self.sg_order)

    # Take absolute value as dQ/dV is negative during discharge
    return V_mid, np.abs(dQdV)
extract_peak_features
extract_peak_features(V: np.ndarray, dQdV: np.ndarray) -> np.ndarray

Extract fixed-length feature vector from ICA curve.

Parameters:

Name Type Description Default
V ndarray

Voltage midpoints

required
dQdV ndarray

dQ/dV values

required

Returns:

Type Description
ndarray

Feature vector

Source code in src/pipelines/ica_peaks.py
def extract_peak_features(self, V: np.ndarray, dQdV: np.ndarray) -> np.ndarray:
    """Extract fixed-length feature vector from ICA curve.

    Args:
        V: Voltage midpoints
        dQdV: dQ/dV values

    Returns:
        Feature vector
    """
    features = []

    if len(V) == 0 or len(dQdV) == 0:
        # Return zeros if curve is invalid
        return np.zeros(len(self.feature_names_), dtype=np.float32)

    # Find peaks
    peaks, properties = find_peaks(
        dQdV, 
        height=0.01,
        distance=200,
        prominence=0.01
    )

    # Get peak widths
    if len(peaks) > 0:
        try:
            widths, width_heights, left_ips, right_ips = peak_widths(dQdV, peaks, rel_height=0.5)
        except Exception:
            widths = np.zeros(len(peaks))
    else:
        widths = np.array([])

    # Sort peaks by height (descending)
    if len(peaks) > 0:
        sorted_idx = np.argsort(dQdV[peaks])[::-1]
        peaks = peaks[sorted_idx]
        if len(widths) == len(peaks):
            widths = widths[sorted_idx]
        else:
            widths = np.zeros(len(peaks))

    # Extract features for top N peaks
    dV = V[1] - V[0] if len(V) > 1 else 0.001
    for i in range(self.num_peaks):
        if i < len(peaks):
            peak_idx = peaks[i]
            features.extend([
                V[peak_idx],                    # voltage
                dQdV[peak_idx],                 # height
                widths[i] * dV if i < len(widths) else 0,  # width in V
                dQdV[peak_idx] * widths[i] * dV if i < len(widths) else 0,  # approx area
            ])
        else:
            features.extend([0.0, 0.0, 0.0, 0.0])

    # Global features
    features.append(np.trapz(np.maximum(dQdV, 0), V))  # total area
    features.append(float(len(peaks)))                  # num peaks detected
    features.append(V[np.argmax(dQdV)] if len(dQdV) > 0 else 0)  # voltage at max

    return np.array(features, dtype=np.float32)
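From the loop above, the feature vector holds four values per peak plus three global values, i.e. num_peaks * 4 + 3 entries (15 with the default num_peaks=3). A sketch of the implied layout (the actual names come from _build_feature_names, which is not shown, so these labels are illustrative):

```python
def ica_feature_names(num_peaks: int = 3):
    """Reconstruct the feature layout implied by extract_peak_features (sketch)."""
    names = []
    for i in range(num_peaks):
        names += [f'peak{i}_voltage', f'peak{i}_height',
                  f'peak{i}_width_V', f'peak{i}_area']
    # Global features appended after the per-peak block
    names += ['total_area', 'num_peaks_detected', 'voltage_at_max']
    return names
```

Keeping the vector fixed-length (zero-padding missing peaks) is what lets tabular models consume curves with varying numbers of detected peaks.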
fit
fit(data: Dict[str, Any]) -> ICAPeaksPipeline

Fit scaler on training data features.

Parameters:

Name Type Description Default
data Dict[str, Any]

Dictionary with 'curves' key containing list of (voltage, capacity, meta) tuples

required

Returns:

Type Description
ICAPeaksPipeline

self

Source code in src/pipelines/ica_peaks.py
def fit(self, data: Dict[str, Any]) -> 'ICAPeaksPipeline':
    """Fit scaler on training data features.

    Args:
        data: Dictionary with 'curves' key containing list of (voltage, capacity, meta) tuples

    Returns:
        self
    """
    curves = data['curves']  # List of (voltage, capacity, meta) tuples

    X = []
    for voltage, capacity, meta in curves:
        features = self._process_single_curve(
            meta['experiment_id'],
            meta['cell_id'],
            meta['rpt_id'],
            voltage, capacity
        )
        X.append(features)
    X = np.vstack(X)

    if self.normalize:
        self.scaler = StandardScaler()
        self.scaler.fit(X)

    return self
get_feature_names
get_feature_names() -> List[str]

Return list of feature names.

Source code in src/pipelines/ica_peaks.py
def get_feature_names(self) -> List[str]:
    """Return list of feature names."""
    return self.feature_names_
get_params
get_params() -> dict

Return pipeline parameters for caching.

Source code in src/pipelines/ica_peaks.py
def get_params(self) -> dict:
    """Return pipeline parameters for caching."""
    return {
        'sg_window': self.sg_window,
        'sg_order': self.sg_order,
        'num_peaks': self.num_peaks,
        'voltage_range': self.voltage_range,
        'resample_points': self.resample_points,
    }
transform
transform(data: Dict[str, Any]) -> List[Sample]

Transform curves into Sample objects.

Parameters:

Name Type Description Default
data Dict[str, Any]

Dictionary with 'curves' and optionally 'targets' keys

required

Returns:

Type Description
List[Sample]

List of Sample objects

Source code in src/pipelines/ica_peaks.py
def transform(self, data: Dict[str, Any]) -> List[Sample]:
    """Transform curves into Sample objects.

    Args:
        data: Dictionary with 'curves' and optionally 'targets' keys

    Returns:
        List of Sample objects
    """
    curves = data['curves']
    targets = data.get('targets', {})  # {(cell_id, rpt_id): soh_value}

    samples = []
    for voltage, capacity, meta in curves:
        features = self._process_single_curve(
            meta['experiment_id'],
            meta['cell_id'],
            meta['rpt_id'],
            voltage, capacity
        )

        if self.normalize and self.scaler:
            features = self.scaler.transform(features.reshape(1, -1)).flatten()

        # Get target
        key = (meta['cell_id'], meta['rpt_id'])
        y_value = targets.get(key, 0.0)

        sample = Sample(
            meta=meta,
            x=features,
            y=np.array([y_value], dtype=np.float32),
        )
        samples.append(sample)

    return samples
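The input layout that `fit` and `transform` expect can be sketched with synthetic arrays (the voltage/capacity values below are illustrative, not real measurements; the `meta` keys follow the docstrings above):

```python
import numpy as np

# Synthetic charge curve: capacity as a function of voltage
voltage = np.linspace(3.0, 4.2, 500)
capacity = 2.5 * (voltage - 3.0) / 1.2  # Ah, linear ramp for illustration

meta = {'experiment_id': 1, 'cell_id': 'A', 'rpt_id': 0}
data = {
    'curves': [(voltage, capacity, meta)],
    'targets': {('A', 0): 0.97},  # keyed by (cell_id, rpt_id) -> SOH
}
# pipeline.fit(data); samples = pipeline.transform(data)
```

Curves whose `(cell_id, rpt_id)` key is missing from `targets` receive `y = 0.0`, so callers should ensure target coverage before training.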

Functions

latent_ode_seq

Latent ODE Sequence Pipeline - Sequences for Neural ODE models.

Classes

LatentODESequencePipeline

LatentODESequencePipeline(time_unit: str = 'days', include_ica: bool = False, max_seq_len: Optional[int] = None, normalize: bool = True)

Bases: BasePipeline

Pipeline creating sequences for Neural ODE / Latent ODE models.

Key insight: Use ageing SET index as timeline (not cycles).
- t = cumulative days or throughput (continuous time)
- x_t = features at each set (summary + optionally ICA peaks)
- Provides explicit time deltas for ODE integration

This is more appropriate than cycle-level for ODEs because:
1. Sets have meaningful time gaps (days/weeks)
2. RPT measurements provide consistent feature snapshots
3. Fewer points = faster ODE integration

Example usage

pipeline = LatentODESequencePipeline(time_unit="days")
samples = pipeline.fit_transform({'df': df})

Initialize the pipeline.

Parameters:

Name Type Description Default
time_unit str

Time unit for ODE integration ("days" or "throughput_Ah")

'days'
include_ica bool

Whether to include ICA peak features

False
max_seq_len Optional[int]

Maximum sequence length (truncate if longer)

None
normalize bool

Whether to apply StandardScaler

True
Source code in src/pipelines/latent_ode_seq.py
def __init__(self,
             time_unit: str = "days",  # "days" or "throughput_Ah"
             include_ica: bool = False,
             max_seq_len: Optional[int] = None,
             normalize: bool = True):
    """Initialize the pipeline.

    Args:
        time_unit: Time unit for ODE integration ("days" or "throughput_Ah")
        include_ica: Whether to include ICA peak features
        max_seq_len: Maximum sequence length (truncate if longer)
        normalize: Whether to apply StandardScaler
    """
    self.time_unit = time_unit
    self.include_ica = include_ica
    self.max_seq_len = max_seq_len
    self.normalize = normalize

    self.scaler: Optional[StandardScaler] = None
    self.feature_names_: List[str] = []
Functions
fit
fit(data: Dict[str, Any]) -> LatentODESequencePipeline

Fit scaler on training sequences.

Parameters:

Name Type Description Default
data Dict[str, Any]

Dictionary with 'df' key containing DataFrame

required

Returns:

Type Description
LatentODESequencePipeline

self

Source code in src/pipelines/latent_ode_seq.py
def fit(self, data: Dict[str, Any]) -> 'LatentODESequencePipeline':
    """Fit scaler on training sequences.

    Args:
        data: Dictionary with 'df' key containing DataFrame

    Returns:
        self
    """
    df = data['df']

    # Build feature names
    self.feature_names_ = [
        'throughput_Ah', 'R_0.1s', 'R_10s', 'temp_K', 'arrhenius'
    ]

    # Collect all feature vectors for scaling
    all_X = []
    for cell_id in df['cell_id'].unique():
        cell_df = df[df['cell_id'] == cell_id]
        sample = self._build_sequence(
            cell_df, cell_id, 
            cell_df['temperature_C'].iloc[0],
            cell_df['experiment_id'].iloc[0]
        )
        all_X.append(sample.x)

    if all_X:
        all_X = np.vstack(all_X)

        if self.normalize:
            self.scaler = StandardScaler()
            self.scaler.fit(all_X)

    return self
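The fitting logic above builds one feature matrix per cell and stacks all timesteps before fitting the scaler, so normalization statistics are computed over every measurement rather than per cell. A minimal sketch of that stacking pattern, using a hypothetical DataFrame with a single feature column:

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({
    'cell_id':       ['A', 'A', 'B'],
    'throughput_Ah': [10.0, 20.0, 15.0],
})

# One (seq_len, feature_dim) matrix per cell, then stack all timesteps
all_X = [
    df[df['cell_id'] == c][['throughput_Ah']].to_numpy()
    for c in df['cell_id'].unique()
]
X = np.vstack(all_X)  # shape (total_timesteps, feature_dim)
```

Fitting `StandardScaler` on the stacked matrix means a cell with more RPT sets contributes more rows to the statistics, which is usually the intended behavior for sequence pipelines.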
get_feature_names
get_feature_names() -> List[str]

Return list of feature names.

Source code in src/pipelines/latent_ode_seq.py
def get_feature_names(self) -> List[str]:
    """Return list of feature names."""
    return self.feature_names_
get_params
get_params() -> dict

Return pipeline parameters for caching.

Source code in src/pipelines/latent_ode_seq.py
def get_params(self) -> dict:
    """Return pipeline parameters for caching."""
    return {
        'time_unit': self.time_unit,
        'include_ica': self.include_ica,
        'max_seq_len': self.max_seq_len,
    }
transform
transform(data: Dict[str, Any]) -> List[Sample]

Transform data into sequence samples (one per cell).

Parameters:

Name Type Description Default
data Dict[str, Any]

Dictionary with 'df' key containing DataFrame

required

Returns:

Type Description
List[Sample]

List of Sample objects (one per cell)

Source code in src/pipelines/latent_ode_seq.py
def transform(self, data: Dict[str, Any]) -> List[Sample]:
    """Transform data into sequence samples (one per cell).

    Args:
        data: Dictionary with 'df' key containing DataFrame

    Returns:
        List of Sample objects (one per cell)
    """
    df = data['df']
    samples = []

    for cell_id in df['cell_id'].unique():
        cell_df = df[df['cell_id'] == cell_id]
        sample = self._build_sequence(
            cell_df, cell_id,
            cell_df['temperature_C'].iloc[0],
            cell_df['experiment_id'].iloc[0]
        )

        if self.normalize and self.scaler:
            # Normalize each timestep
            sample.x = self.scaler.transform(sample.x).astype(np.float32)

        samples.append(sample.to_tensor())

    return samples
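Downstream ODE models consume the time channel of these samples to form integration steps. A minimal sketch of deriving time deltas from a cumulative time vector (illustrative values; the actual attribute layout follows the Sample schema's `t` field):

```python
import numpy as np

# Cumulative time at each ageing set, in days (illustrative values)
t = np.array([0.0, 7.0, 14.5, 30.0])

# Gap between consecutive sets; prepend the first value so the
# initial delta is 0 and the output length matches the sequence
delta_t = np.diff(t, prepend=t[0])
```

Time-aware models integrate over these (typically irregular) gaps, while recurrent models such as LSTMs can simply ignore the time channel, as noted in the Sample contract.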