# Customizing the Pipeline

This guide covers how to customize the pipeline beyond basic recipe usage:
programmatic config modification, custom stages, custom dataloaders, IO
bindings, caching/resume, and observers.

## Programmatic Configuration

This approach still uses the pipeline but modifies stage configurations
programmatically before construction. It builds on Level 2 (see [Getting Started with Pipeline](https://docs.qualcomm.com/doc/80-87189-2/topic/pipeline_getting_started.html)) and
adds programmatic control.

    from qairt.experimental.pipeline.torch.llm.pipeline import LLMPipeline

    pipe = LLMPipeline.from_pretrained(
        "meta-llama/Llama-3.2-3B-Instruct",
        recipe="llama32_recipe.yaml",
    )

    # Modify stage configs programmatically before construct()
    pipe.stages.model_loader.model_config_overrides["input_tokens_per_inference"] = 2073
    pipe.stages.quantization.technique_kwargs["seqmse"]["num_batches"] = 30

    pipe.construct()

    # Evaluate model quality after quantization
    metrics = pipe.evaluate()
    print(f"Perplexity: {metrics}")

    # `device` is not defined in this snippet; it is assumed to be a target
    # device object created earlier.
    result = pipe.generate("Explain transformers briefly.", device=device)
    result.print()

This level also supports:

- Injecting custom dataloaders for calibration
- Building the recipe as a Python dict instead of YAML
- Using `LLMPipeline.load()` to resume from cache
- Adding custom stages via `@register_stage`

## Custom Stages

Register custom stages using the `@register_stage` decorator:

    from typing import Any

    from qairt.experimental.pipeline.torch.common.bases.stage_registry import register_stage
    from qairt.experimental.pipeline.torch.common.bases.stage import (
        Stage, StageInput, StageConfig, StageOutput,
    )

    # Fields the stage consumes
    class MyInput(StageInput):
        model: Any
        tokenizer: Any = None

    # User-facing settings for the stage
    class MyConfig(StageConfig):
        my_parameter: str = "default"

    # Fields the stage produces
    class MyOutput(StageOutput):
        model: Any
        metrics: dict = {}

    @register_stage("my_custom_stage")
    class MyCustomStage(Stage[MyInput, MyConfig, MyOutput]):
        name = "my_custom_stage"
        Input = MyInput
        Config = MyConfig
        Output = MyOutput

        def _execute(self, input: MyInput, config: MyConfig) -> MyOutput:
            # Your stage logic here
            return MyOutput(model=input.model, metrics={"custom": 1.0})

### Stage Contract

Every stage must:

1. Define `Input`, `Config`, and `Output` classes (Pydantic models)
2. Implement `_execute(input, config) -> output`
3. Optionally implement `_pre_hook` and `_post_hook` for setup/teardown
4. Optionally declare dependencies via `get_stage_dependency()`
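The call order implied by this contract can be sketched without the library. The block below is a minimal stand-in, not the qairt base class: `DemoStage`, its `run()` method, and the hook signatures are assumptions made for illustration, and plain dataclasses replace the Pydantic models.

```python
from dataclasses import dataclass


@dataclass
class DemoInput:
    text: str


@dataclass
class DemoConfig:
    suffix: str = "!"


@dataclass
class DemoOutput:
    text: str


class DemoStage:
    """Hypothetical stand-in for a stage base class (not the qairt implementation)."""

    def __init__(self):
        self.calls = []  # records the order in which the hooks ran

    def _pre_hook(self, input, config):
        # Optional setup step (assumed signature)
        self.calls.append("pre")

    def _execute(self, input, config):
        # Required: the stage's actual work
        self.calls.append("execute")
        return DemoOutput(text=input.text + config.suffix)

    def _post_hook(self, output):
        # Optional teardown step (assumed signature)
        self.calls.append("post")

    def run(self, input, config):
        # Assumed driver: setup, then execute, then teardown
        self._pre_hook(input, config)
        output = self._execute(input, config)
        self._post_hook(output)
        return output
```

Calling `DemoStage().run(DemoInput("hi"), DemoConfig())` runs the three steps in order and returns the `DemoOutput` produced by `_execute`.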

### Stage Dependencies

Declare ordering constraints:

    from qairt.experimental.pipeline.torch.common.bases.stage import StageDependencies

    # Define this classmethod on your Stage subclass. The stage classes
    # referenced below must be imported or defined first.
    @classmethod
    def get_stage_dependency(cls) -> StageDependencies:
        return StageDependencies(
            requires=[ModelLoadingStage],    # Must come after
            conflict=[OtherStage],           # Cannot coexist
            optional=[OptionalUpstream],     # If present, must precede
        )
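To make the three constraint kinds concrete, here is a rough sketch of how an ordered stage list could be checked against them. It is not the pipeline's validator: `Deps` and `check_order` are hypothetical names, and stages are identified by name strings rather than by class to keep the example self-contained.

```python
from dataclasses import dataclass, field


@dataclass
class Deps:
    """Hypothetical stand-in for StageDependencies, keyed by stage name."""
    requires: list = field(default_factory=list)
    conflict: list = field(default_factory=list)
    optional: list = field(default_factory=list)


def check_order(order, deps_by_stage):
    """Return human-readable violations for an ordered list of stage names."""
    position = {name: i for i, name in enumerate(order)}
    problems = []
    for stage, deps in deps_by_stage.items():
        if stage not in position:
            continue  # stage is not part of this pipeline
        here = position[stage]
        for req in deps.requires:
            if req not in position:
                problems.append(f"{stage} requires missing stage {req}")
            elif position[req] > here:
                problems.append(f"{stage} must come after {req}")
        for bad in deps.conflict:
            if bad in position:
                problems.append(f"{stage} conflicts with {bad}")
        for opt in deps.optional:
            # Only enforced when the optional upstream stage is present
            if opt in position and position[opt] > here:
                problems.append(f"{stage} must come after optional stage {opt}")
    return problems
```

With `deps = {"quantization": Deps(requires=["model_loader"])}`, the order `["model_loader", "quantization"]` yields no violations, while the reversed order reports that `quantization` must come after `model_loader`.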

## Custom Dataloaders

Inject custom dataloaders for calibration data:

    from torch.utils.data import DataLoader
    from qairt.experimental.pipeline.torch.llm.pipeline import LLMPipeline, LLMPipelineConfig

    config = LLMPipelineConfig.from_recipe("recipe.yaml")

    # Custom calibration dataloader for quantization stage.
    # my_calibration_dataset is your own dataset object (not defined here).
    calibration_loader = DataLoader(my_calibration_dataset, batch_size=1)
    config.add_dataloader("quantization", calibration_loader)

    pipe = LLMPipeline(config)
    pipe.construct()

The dataloader is passed to the quantization recipe’s `apply()` method,
replacing the default internal dataloader construction.
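As an illustration of what a calibration dataset might look like, the sketch below splits a token-id sequence into fixed-length windows. It is an assumption-heavy example: the class name `FixedWindowDataset` and the `input_ids` sample key are hypothetical, and this guide does not specify the sample format the quantization stage expects.

```python
class FixedWindowDataset:
    """Map-style dataset: splits a token-id sequence into fixed-length windows."""

    def __init__(self, token_ids, window):
        if window <= 0:
            raise ValueError("window must be positive")
        # Drop the trailing remainder so every sample has the same length.
        usable = len(token_ids) - len(token_ids) % window
        self._windows = [token_ids[i:i + window] for i in range(0, usable, window)]

    def __len__(self):
        return len(self._windows)

    def __getitem__(self, index):
        # "input_ids" is an assumed key, not a documented requirement.
        return {"input_ids": self._windows[index]}
```

Because it implements `__len__` and `__getitem__`, an object like this can be passed to `torch.utils.data.DataLoader` as a map-style dataset. In practice the samples would likely need to be tensors, converted either in `__getitem__` or in a custom `collate_fn`.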

## IO Bindings

When stage output field names don’t match the next stage’s input field names,
use IO bindings in the recipe to specify the mapping:

    stages:
      my_stage:
        io_bindings:
          - target_field: model            # Field name in next stage's Input
            source_field: optimized_model  # Field name in this stage's Output

When output and input field names match (the common case), the pipeline
wires them automatically — no bindings needed.
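The wiring rule can be sketched as a small function: explicit bindings map a target field to a differently named source field, and everything else is matched by name. `wire_inputs` is a hypothetical helper written for this explanation, not part of the pipeline API.

```python
def wire_inputs(output_fields, input_field_names, io_bindings=()):
    """Build next-stage input kwargs from a previous stage's output fields.

    Explicit bindings take priority; remaining input fields are matched by name.
    """
    bound = {b["target_field"]: b["source_field"] for b in io_bindings}
    kwargs = {}
    for target in input_field_names:
        source = bound.get(target, target)  # fall back to same-name wiring
        if source in output_fields:
            kwargs[target] = output_fields[source]
    return kwargs
```

Given an output with `optimized_model` and `tokenizer` fields and the binding from the recipe above, the next stage receives `model` (taken from `optimized_model`) and `tokenizer` (matched by name). Without the binding, only `tokenizer` would be wired.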

## Caching and Resuming

When `enable_cache: true` is set in the recipe, the pipeline caches
stage outputs to disk. On subsequent runs, stages whose configuration
(and upstream dependencies) haven’t changed are skipped.

### Key-Chain Hashing

Each stage’s cache key is computed from:

1. The stage’s own config (serialized and hashed)
2. The upstream stage’s cache key (forming a chain)

This means any config change invalidates the affected stage *and all downstream stages*.
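A minimal sketch of such a key chain follows. The serialization (JSON with sorted keys) and the hash function (SHA-256) are assumptions chosen for illustration; this guide does not state which ones the pipeline uses, and `stage_key` and `chain_keys` are hypothetical names.

```python
import hashlib
import json


def stage_key(config, upstream_key=""):
    """Hash a stage's config together with the upstream stage's key."""
    payload = json.dumps(config, sort_keys=True) + "|" + upstream_key
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def chain_keys(stages):
    """Compute keys for an ordered list of (name, config) pairs."""
    keys, upstream = {}, ""
    for name, config in stages:
        # Each key folds in the previous key, so changes propagate downstream.
        upstream = stage_key(config, upstream)
        keys[name] = upstream
    return keys
```

Changing only the quantization config in a `model_loader`, `quantization`, `genai_builder` chain leaves the `model_loader` key unchanged but alters the keys of `quantization` and `genai_builder`, which is the invalidation behavior described above.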

### Directory Layout

    pipeline_cache_dir/
    ├── .pipeline_state/
    │   ├── manifest.json      # Stage completion records
    │   └── recipe.yaml        # Saved recipe for resume
    ├── model_loader/
    │   └── <artifact files>
    ├── quantization/
    │   └── <artifact files>
    └── genai_builder/
        └── <artifact files>

### Resuming a Pipeline

Use `LLMPipeline.load()` to resume from a previous run:

    from qairt.experimental.pipeline.torch.llm.pipeline import LLMPipeline

    # Resume from cache directory
    pipe = LLMPipeline.load("./pipeline_cache_dir")
    pipe.construct()  # Skips stages with valid cached outputs

The manifest tracks:

- Completed stage names
- Artifact paths for each stage
- Cache keys for invalidation detection
- Config snapshots for change detection
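One way to picture how these records drive the skip decision is the sketch below. The real `manifest.json` schema is not described in this guide, so the entry layout (`cache_key`, `artifacts`) and the `can_skip` helper are purely hypothetical.

```python
import os


def can_skip(entry, current_key):
    """Decide whether a stage can be skipped given its manifest entry.

    `entry` is a hypothetical record: {"cache_key": str, "artifacts": [paths]}.
    """
    if entry is None:
        return False  # stage never completed
    if entry.get("cache_key") != current_key:
        return False  # own config or an upstream stage changed
    # Every recorded artifact must still exist on disk.
    return all(os.path.exists(p) for p in entry.get("artifacts", []))
```

A stage is skipped only when it has a completion record, its stored key matches the freshly computed one, and its artifacts are still present.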

## Observers

The observer pattern allows monitoring stage execution without modifying
stage logic.

### Built-in Profiler Observer

Enable in recipe with `enable_observers: true`:

    enable_observers: true

The `StageProfilerObserver` records:

- Wall-clock time per stage
- Peak memory usage (RAM and GPU)
- Stage start/end timestamps
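For a feel of what such measurements involve, here is a standard-library sketch that records timestamps, wall-clock time, and peak Python heap usage around a block of code. It is not the `StageProfilerObserver` implementation: `profile_stage` is a hypothetical helper, and `tracemalloc` only sees Python allocations, not process RSS or GPU memory (a GPU peak would need something like `torch.cuda.max_memory_allocated()`).

```python
import time
import tracemalloc
from contextlib import contextmanager


@contextmanager
def profile_stage(name, records):
    """Record start/end timestamps, wall-clock time, and peak Python memory."""
    started_at = time.time()
    t0 = time.perf_counter()
    tracemalloc.start()
    try:
        yield
    finally:
        _, peak = tracemalloc.get_traced_memory()
        tracemalloc.stop()
        records[name] = {
            "started_at": started_at,
            "ended_at": time.time(),
            "elapsed_s": time.perf_counter() - t0,
            "peak_python_bytes": peak,
        }
```

Wrapping work in `with profile_stage("quantization", records):` leaves one entry per stage name in `records`, even if the wrapped code raises.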

### Custom Observer Pattern

Create custom observers by subclassing `StageObserver`:

    from qairt.experimental.pipeline.torch.common.bases.stage_observer import StageObserver

    class MyObserver(StageObserver):
        def on_stage_start(self, stage_name: str, config) -> None:
            print(f"Starting stage: {stage_name}")

        def on_stage_end(self, stage_name: str, output, elapsed: float) -> None:
            print(f"Completed {stage_name} in {elapsed:.1f}s")

        def on_stage_error(self, stage_name: str, error: Exception) -> None:
            print(f"Error in {stage_name}: {error}")
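The sketch below shows when each callback would plausibly fire, using a stand-in runner. How observers are actually attached to the pipeline is not covered here, so `run_with_observers` and `RecordingObserver` are hypothetical and only mirror the three callback signatures shown above.

```python
import time


class RecordingObserver:
    """Stand-in with the same three callbacks; records events instead of printing."""

    def __init__(self):
        self.events = []

    def on_stage_start(self, stage_name, config):
        self.events.append(("start", stage_name))

    def on_stage_end(self, stage_name, output, elapsed):
        self.events.append(("end", stage_name))

    def on_stage_error(self, stage_name, error):
        self.events.append(("error", stage_name))


def run_with_observers(stage_name, fn, config, observers):
    """Call fn(config), notifying every observer before and after."""
    for obs in observers:
        obs.on_stage_start(stage_name, config)
    start = time.perf_counter()
    try:
        output = fn(config)
    except Exception as error:
        # Report the failure, then let the exception propagate.
        for obs in observers:
            obs.on_stage_error(stage_name, error)
        raise
    elapsed = time.perf_counter() - start
    for obs in observers:
        obs.on_stage_end(stage_name, output, elapsed)
    return output
```

A successful stage produces a start event followed by an end event; a failing stage produces a start event followed by an error event, and the stage logic itself stays unaware of the observers.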

## Next Steps

- [Advanced Usage](https://docs.qualcomm.com/doc/80-87189-2/topic/pipeline_expert_usage.html) — For maximum control, bypass the pipeline
and orchestrate building blocks directly
- [Pipeline Configuration](https://docs.qualcomm.com/doc/80-87189-2/topic/pipeline_configuration.html) — Full YAML recipe schema reference

Last Published: Jun 19, 2026
