Anatalyst Pipeline Core API¶
This document provides detailed documentation for the core components of the Anatalyst
framework. The core modules form the foundation of the pipeline system, handling tasks such as configuration parsing, data management, module execution, and pipeline orchestration.
Overview¶
Anatalyst is designed around a modular architecture that allows for flexible composition of analysis workflows. Each analysis is broken down into a series of modules, each of which performs a specific task on the data. The core components facilitate this by:
- Configuration Management: Reading and validating YAML configuration files that define pipeline steps
- Data Context: Managing shared data between modules and providing checkpointing capabilities
- Module System: Defining the base interface for all analysis modules
- Pipeline Execution: Orchestrating the execution of modules in sequence
Core Components¶
ConfigParser¶
The ConfigParser class handles the reading and validation of YAML configuration files that define pipeline structure and parameters.
from sc_pipeline.core.config import ConfigParser
parser = ConfigParser()
config = parser.parse("path/to/config.yaml")
Key Methods¶
- parse(config_file): Parses a YAML configuration file and returns the validated configuration dictionary.
- _validate_config(config): Internal method that validates the structure of the configuration.
- _set_defaults(config): Internal method that sets default values for optional configuration parameters.
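As a quick illustration (a minimal sketch, assuming parse() returns a plain dictionary mirroring the YAML layout shown in the next section):
config = ConfigParser().parse("path/to/config.yaml")
pipeline_name = config["pipeline"]["name"]      # assumed top-level 'pipeline' section
for module_spec in config["modules"]:           # assumed top-level 'modules' list
    print(module_spec["name"], module_spec["type"])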
Configuration Structure¶
A basic configuration file should have the following structure:
pipeline:
  name: my_pipeline
  output_dir: ./output
  r_memory_limit_gb: 8   # Optional
  figure_defaults:       # Optional
    width: 8
    height: 6
  checkpointing:         # Optional
    enabled: true
    modules_to_checkpoint: all  # or a list of module names
    max_checkpoints: 5
modules:
  - name: module1
    type: ModuleType
    params:
      param1: value1
      param2: value2
  - name: module2
    type: AnotherModuleType
    params:
      param1: value1
Scope¶
In order to provide module-level access to the pipeline's configuration, the executor adds the entire parsed configuration to the DataContext shortly after its instantiation, before any modules are run. It can be accessed from inside any module using data_context.get('CONFIG').
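For example, a module could look up a global setting from inside its run method (a minimal sketch; the configuration keys read here are just examples):
class MyModule(AnalysisModule):
    def run(self, data_context):
        # The executor stores the full parsed configuration under 'CONFIG'
        config = data_context.get('CONFIG')
        output_dir = config['pipeline']['output_dir']  # example: read a global setting
        ...
        return True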
DataContext¶
The DataContext class provides a shared storage space for data passed between modules in the pipeline. It also offers checkpointing capabilities to save and restore pipeline state. Anything created inside a module that is not explicitly written to a file or to the DataContext will be unavailable to subsequent modules.
from sc_pipeline.core.data_context import DataContext
# Create a data context with checkpointing enabled
data_context = DataContext(checkpoint_dir="./checkpoints", max_checkpoints=3)
# Store and retrieve data
data_context.set("key", value)
value = data_context.get("key")
# Save and load checkpoints
data_context.save_checkpoint("after_module1")
data_context.load_checkpoint("after_module1")
Figure Generation¶
To better facilitate record keeping and report generation at the end of the workflow, the DataContext class has a built-in add_figure method. It takes the name of the module generating the figure, plus optional title, description, caption, and image_path arguments. This is designed to be compatible with the ReportGenerator module, which essentially strings these elements together into an HTML document. The save_figure method of the AnalysisModule class returns the path of the saved image, which can be passed straight to DataContext.add_figure() if you want to include the generated figure.
class MyModule(AnalysisModule):
    def run(self, data_context: DataContext):
        title = "My Figure"
        desc = "A description of my figure"
        caption = "A caption for my figure"
        # adata and pca_key would normally come from earlier modules via the data context
        fig = sc.pl.embedding(  # Returns a matplotlib figure (from scanpy)
            adata,
            basis=pca_key,
            color=None,
            return_fig=True,
            title=title
        )
        img_path = self.save_figure(data_context, self.name, fig)
        data_context.add_figure(
            module_name=self.name,
            title=title,
            description=desc,
            caption=caption,
            image_path=img_path
        )
        return True
Key Methods¶
- set(key, value): Store data by key
- get(key, default=None): Retrieve data by key, with an optional default value
- __contains__(key): Check if a key exists (key in context)
- keys(): Get all available data keys
- save_checkpoint(checkpoint_name): Save the current state to a checkpoint file (a .pkl file of the DataContext itself)
- load_checkpoint(checkpoint_name): Load data from a checkpoint file
- add_figure(module_name, title=None, description=None, image_path=None, caption=None): Add a figure to be included in reports
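A few of these methods in combination (a minimal sketch; the keys and checkpoint name are hypothetical):
# Guard against a missing key before using it
if "normalized_adata" in data_context:
    adata = data_context.get("normalized_adata")
else:
    adata = data_context.get("adata", default=None)

# Inspect everything currently stored
print(list(data_context.keys()))

# Snapshot the state so a later run can resume from this point
data_context.save_checkpoint("after_normalization")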
AnalysisModule¶
The AnalysisModule class is the base class for all analysis modules in the pipeline. It defines the interface that modules must implement and provides common functionality.
from sc_pipeline.core.module import AnalysisModule
class MyModule(AnalysisModule):
    """Custom module implementation."""

    PARAMETER_SCHEMA = {
        'param1': {
            'type': str,
            'required': True,
            'description': 'Description of parameter 1'
        },
        'param2': {
            'type': int,
            'default': 10,
            'description': 'Description of parameter 2'
        }
    }

    def __init__(self, name, params):
        super().__init__(name, params)
        self.required_inputs = ["input1", "input2"]
        self.outputs = ["output1"]

    def run(self, data_context):
        # Implementation of module functionality
        # Access parameters with self.params.get('param_name', default_value)
        # Access inputs with data_context.get('input_name')
        # Store outputs with data_context.set('output_name', value)
        return True  # Return True if successful, False otherwise
Key Methods¶
- __init__(name, params): Initialize the module with a name and parameters
- run(data_context): Execute the module's analysis (must be implemented by subclasses)
- validate_inputs(data_context): Check if all required inputs are available
- validate_outputs(data_context): Check if all expected outputs were created
- _validate_params(provided_params): Validate parameters against the schema and apply defaults
- get_metadata(): Return metadata about the module
- save_figure(data_context, module_name, fig, figsize=None, name=None, output_dir=None, dpi=300): Save a matplotlib figure
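For quick interactive testing, a module can also be driven by hand rather than through the executor (a hypothetical sketch; the input values and checkpoint directory are placeholders):
from sc_pipeline.core.data_context import DataContext

ctx = DataContext(checkpoint_dir="./checkpoints")
ctx.set("input1", {"example": "data"})
ctx.set("input2", [1, 2, 3])

module = MyModule("my_module", {"param1": "value1"})
if module.validate_inputs(ctx):
    success = module.run(ctx)
    assert success and module.validate_outputs(ctx)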
Parameter Schema Definition¶
Module parameters can be defined using the PARAMETER_SCHEMA class variable. This schema is used to validate parameters and apply defaults. Each parameter entry should include:
- type: The expected Python type (e.g., str, int, float, bool, list, dict)
- required: Whether the parameter is required (default: False)
- default: Default value if not provided (optional)
- description: Human-readable description of the parameter (optional)
For list parameters, you can specify the expected element type using element_type, as in the sketch below.
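For instance, a list parameter constrained to string elements might be declared like this (a sketch following the conventions above; the parameter name is hypothetical):
PARAMETER_SCHEMA = {
    'genes_to_plot': {
        'type': list,
        'element_type': str,   # each list entry must be a string
        'default': [],
        'description': 'Gene names to include in the plot'
    }
}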
PipelineExecutor¶
The PipelineExecutor class handles the orchestration of pipeline execution, running modules in sequence according to the configuration.
from sc_pipeline.core.executor import PipelineExecutor
# Create and run a pipeline from a configuration file
executor = PipelineExecutor("path/to/config.yaml")
success = executor.run()
# Optionally, start from a checkpoint
success = executor.run(start_from="after_module1")
Key Methods¶
- __init__(config_file): Initialize the executor with a configuration file
- register_module_type(module_type, module_class): Register a module type with its implementing class
- run(start_from=None): Execute the pipeline, optionally starting from a checkpoint
- _get_module_class(module_type): Internal method to get the class for a module type
- _set_global_settings(): Internal method to store global pipeline settings
Utility Modules¶
RBridge¶
The RBridge class provides a bridge for calling R functions from Python, using a temporary workspace directory for file exchange.
from sc_pipeline.utils.r_bridge import RBridge
# Create an R bridge with a specified memory limit
r_bridge = RBridge(r_script_dir="./r_scripts", memory_limit_gb=16)
# Run an R script with arguments
success, stdout, stderr = r_bridge.run_r_script("script.R", {"arg1": "value1", "arg2": "value2"})
# Get the path to a file in the workspace
file_path = r_bridge.get_workspace_path("output.txt")
# Clean up when done
r_bridge.cleanup_workspace()
AnnData Utilities¶
The adata_utils module provides utilities for working with AnnData objects, particularly focused on managing layers.
from sc_pipeline.utils.adata_utils import save_layer, set_active_layer
# Save data as a layer in AnnData
adata = save_layer(adata, name="raw_counts", data=None, make_active=False)
# Set a specific layer as the active layer (X matrix)
adata = set_active_layer(adata, layer_name="normalized")
Pipeline Execution Flow¶
- The PipelineExecutor parses the configuration file using ConfigParser
- Global settings are stored in the DataContext
- Each module is executed in sequence:
  - The module class is dynamically imported based on the module type
  - An instance of the module is created with the specified name and parameters
  - Required inputs are validated
  - The module's run method is called with the data context
  - Outputs are validated
  - If checkpointing is enabled, a checkpoint is created
- If any module fails, execution stops and returns False
- If all modules complete successfully, execution returns True
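Conceptually, the flow above boils down to a loop like the following (a simplified sketch, not the actual PipelineExecutor implementation; logging and error handling are omitted):
for spec in config["modules"]:
    module_cls = self._get_module_class(spec["type"])        # dynamic import by type
    module = module_cls(spec["name"], spec.get("params", {}))
    if not module.validate_inputs(data_context):
        return False
    if not module.run(data_context):
        return False
    if not module.validate_outputs(data_context):
        return False
    if checkpointing_enabled:                                 # per the checkpointing config
        data_context.save_checkpoint(f"after_{spec['name']}")
return True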
Checkpointing and Error Recovery¶
The pipeline supports checkpointing to save and restore state between runs. This allows for recovery from failures without having to restart the entire pipeline.
To enable checkpointing, configure the checkpointing section in your configuration:
pipeline:
  # ... other settings ...
  checkpointing:
    enabled: true
    modules_to_checkpoint: all  # or a list of module names
    max_checkpoints: 5  # leaving this at 1 will always keep only the last successfully run module's checkpoint
To resume from a checkpoint, use the start_from parameter when running the pipeline:
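# Resume from the state saved after module1 completed successfully
executor = PipelineExecutor("path/to/config.yaml")
success = executor.run(start_from="after_module1")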
Module Development Guidelines¶
When developing new modules for the pipeline, follow these guidelines:
- Inherit from AnalysisModule
- Define PARAMETER_SCHEMA to specify parameters and their validation rules
- Define required_inputs and outputs in the __init__ method
- Always define a logger instance: self.logger = logging.getLogger(f"Module.{name}")
- Implement the run(data_context) method to perform the module's functionality
- Handle exceptions and log appropriate messages
- Return True if successful, False otherwise
Example implementation:
import logging
from sc_pipeline.core.module import AnalysisModule
class MyModule(AnalysisModule):
    """A custom module for the pipeline."""

    PARAMETER_SCHEMA = {
        'param1': {
            'type': str,
            'required': True,
            'description': 'Description of parameter 1'
        },
        'param2': {
            'type': int,
            'default': 10,
            'description': 'Description of parameter 2'
        }
    }

    def __init__(self, name, params):
        super().__init__(name, params)
        self.logger = logging.getLogger(f"Module.{name}")
        self.required_inputs = ["input1"]
        self.outputs = ["output1"]

    def run(self, data_context):
        try:
            # Get inputs
            input_data = data_context.get("input1")

            # Get parameters
            param1 = self.params.get('param1')
            param2 = self.params.get('param2', 10)

            self.logger.info(f"Processing data with parameters: {param1}, {param2}")

            # Process data
            output_data = self._process_data(input_data, param1, param2)

            # Store outputs
            data_context.set("output1", output_data)

            self.logger.info("Processing completed successfully")
            return True
        except Exception as e:
            self.logger.error(f"Error in processing: {e}", exc_info=True)
            return False

    def _process_data(self, data, param1, param2):
        # Implementation of data processing
        pass
Troubleshooting and Debugging¶
Logging¶
The pipeline uses Python's logging module to provide detailed logs for troubleshooting. Adjust the log level to see more or less detail:
import logging
# Set log level for detailed output
logging.basicConfig(level=logging.DEBUG)
# For less detailed output
logging.basicConfig(level=logging.INFO)
Logger hierarchy:
- ConfigParser: Messages related to configuration parsing
- DataContext: Messages related to data management and checkpointing
- PipelineExecutor: Messages related to pipeline execution
- Module.<name>: Messages from specific modules (e.g., Module.data_loading)
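Because these are standard Python loggers, individual components can be tuned independently of the global level, for example:
import logging

# Verbose output from the executor only
logging.getLogger("PipelineExecutor").setLevel(logging.DEBUG)

# Quiet a chatty module instance named data_loading
logging.getLogger("Module.data_loading").setLevel(logging.WARNING)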
Advanced Configuration¶
Configuration File Structure¶
The configuration file controls the entire pipeline. Here's a more detailed explanation of its structure:
pipeline:
  name: my_pipeline        # Name of the pipeline
  output_dir: ./output     # Directory for output files
  r_memory_limit_gb: 8     # Memory limit for R scripts
  figure_defaults:         # Default figure settings
    width: 8
    height: 6
  checkpointing:           # Checkpointing configuration
    enabled: true
    modules_to_checkpoint: all  # or a list of module names
    max_checkpoints: 5
modules:                   # List of modules to execute
  - name: module1          # Name of this module instance
    type: ModuleType       # Type of module (class name)
    params:                # Module-specific parameters
      param1: value1
      param2: value2
  - name: module2
    type: AnotherModuleType
    params:
      param1: value1
Dynamic Module Loading¶
The pipeline automatically tries to import module classes based on their type. For example, if the configuration specifies type: DataLoading, the executor will try to import a DataLoading class from the pipeline's module directory. The executor also exposes a register_module_type method that takes a module_type key and the module_class to return for it. This would allow registering modules that live outside of the module folder, but that workflow is as of yet unimplemented. For now, custom modules are best placed in the module directory.
Report Generation¶
The pipeline includes a ReportGenerator module that can generate Markdown and HTML reports from the results of other modules. To add content to the report, modules can add figures using the data_context.add_figure() method:
data_context.add_figure(
    module_name=self.name,
    title="Figure Title",
    description="Description of the figure",
    image_path=img_path,
    caption="Figure caption"
)
Best Practices¶
- Immutability: Modules should avoid modifying input data directly, instead creating modified copies or using layers
- Error Handling: Always catch exceptions and log appropriate messages
- Validation: Validate inputs and parameters before processing
- Documentation: Provide detailed docstrings and parameter descriptions
- Logging: Use the logger to provide informative messages at appropriate levels
Conclusion¶
The Anatalyst core provides a flexible and extensible framework for building analysis pipelines that leverage both R and Python libraries. By understanding these core components, you can effectively develop new modules and customize the pipeline for specific research needs.
For more information on specific modules, see the Module Documentation.