Post-Processing

The Post-Processing system manages the sequence of agents that refine content after execution, including content optimization, style matching, and quality control. It keeps the initial generation phase lean while the pipeline maintains quality and consistency.

Overview

Post-processing lets you configure and manage the ordered sequence of agents that refine content after execution. The pipeline handles content optimization, style matching, quality control, and custom processing steps to ensure high-quality output.

What is Post-Processing?

Post-processing is a series of specialized agents that work together to refine and enhance content after the initial generation phase. This multi-step approach ensures that output meets quality standards, matches desired styles, and maintains consistency across different content types.

Key Benefits

  • Content Optimization: Lean content generation with automatic refinement
  • Style Consistency: Maintain brand voice and formatting standards
  • Quality Control: Automated validation, hallucination detection, and error prevention
  • Custom Processing: Add specialized steps for your specific needs
  • Iterative Improvement: Continuous refinement through multiple stages
  • Performance: Streamlined processing with targeted improvements

Core Concepts

Processing Pipeline

The post-processing pipeline consists of multiple stages, each handled by specialized agents:
  1. Content Optimization: Streamline and improve generated content
  2. Style Matching: Apply consistent formatting and tone
  3. Quality Control: Validate accuracy and detect issues
  4. Custom Processing: Apply domain-specific refinements
  5. Final Review: Ensure all requirements are met

Processing Steps

Each processing step can be configured with specific parameters and conditions:
{
  "stepId": "content_optimizer",
  "name": "Content Optimization",
  "description": "Streamlines and improves generated content",
  "agentType": "content_optimizer",
  "order": 1,
  "enabled": true,
  "configuration": {
    "targetLength": "concise",
    "styleGuide": "professional",
    "removeRedundancy": true,
    "improveReadability": true,
    "optimizationLevel": "aggressive"
  },
  "conditions": {
    "contentType": ["document", "report"],
    "minLength": 100,
    "maxLength": 10000
  },
  "performance": {
    "timeout": 30,
    "retryAttempts": 3,
    "fallbackEnabled": true
  }
}

API Endpoints

View Processing Pipeline

Authentication Required: All post-processing endpoints require a valid API key in the Authorization header.
curl -X GET https://api.artosai.com/post-processing/pipeline \
  -H "Authorization: Bearer YOUR_API_KEY"

Example response:
{
  "pipeline": {
    "id": "pipeline_xyz789",
    "name": "Default Post-Processing Pipeline",
    "description": "Standard content refinement pipeline",
    "status": "active",
    "steps": [
      {
        "stepId": "content_optimizer",
        "name": "Content Optimization",
        "description": "Streamlines and improves generated content",
        "agentType": "content_optimizer",
        "order": 1,
        "enabled": true,
        "configuration": {
          "targetLength": "concise",
          "styleGuide": "professional",
          "removeRedundancy": true
        }
      },
      {
        "stepId": "style_matcher",
        "name": "Style Matching",
        "description": "Applies consistent formatting and tone",
        "agentType": "style_matcher",
        "order": 2,
        "enabled": true,
        "configuration": {
          "tone": "professional",
          "formatting": "standard",
          "brandGuidelines": "default"
        }
      },
      {
        "stepId": "quality_checker",
        "name": "Quality Control",
        "description": "Validates accuracy and detects issues",
        "agentType": "quality_checker",
        "order": 3,
        "enabled": true,
        "configuration": {
          "hallucinationDetection": true,
          "factChecking": true,
          "consistencyCheck": true
        }
      }
    ],
    "createdAt": "2024-01-15T10:30:00Z",
    "updatedAt": "2024-01-15T10:30:00Z"
  }
}

Add Processing Step

curl -X POST https://api.artosai.com/post-processing/steps \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Custom Data Validator",
    "description": "Validates data consistency and completeness",
    "agentType": "data_validator",
    "order": 4,
    "configuration": {
      "validationRules": {
        "requiredFields": ["id", "name", "value"],
        "dataTypes": {
          "id": "integer",
          "name": "string",
          "value": "float"
        },
        "constraints": {
          "minValue": 0,
          "maxLength": 255
        }
      }
    },
    "conditions": {
      "contentType": ["tabular", "structured"],
      "minRows": 1
    }
  }'

Example response:
{
  "stepId": "custom_data_validator_abc123",
  "name": "Custom Data Validator",
  "description": "Validates data consistency and completeness",
  "agentType": "data_validator",
  "order": 4,
  "enabled": true,
  "configuration": {
    "validationRules": {
      "requiredFields": ["id", "name", "value"],
      "dataTypes": {
        "id": "integer",
        "name": "string",
        "value": "float"
      },
      "constraints": {
        "minValue": 0,
        "maxLength": 255
      }
    }
  },
  "conditions": {
    "contentType": ["tabular", "structured"],
    "minRows": 1
  },
  "createdAt": "2024-01-15T10:30:00Z"
}

Modify Processing Step

curl -X PUT https://api.artosai.com/post-processing/steps/custom_data_validator_abc123 \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "configuration": {
      "validationRules": {
        "requiredFields": ["id", "name", "value", "category"],
        "dataTypes": {
          "id": "integer",
          "name": "string",
          "value": "float",
          "category": "string"
        }
      }
    }
  }'

Example response:
{
  "stepId": "custom_data_validator_abc123",
  "name": "Custom Data Validator",
  "description": "Validates data consistency and completeness",
  "agentType": "data_validator",
  "order": 4,
  "enabled": true,
  "configuration": {
    "validationRules": {
      "requiredFields": ["id", "name", "value", "category"],
      "dataTypes": {
        "id": "integer",
        "name": "string",
        "value": "float",
        "category": "string"
      },
      "constraints": {
        "minValue": 0,
        "maxLength": 255
      }
    }
  },
  "updatedAt": "2024-01-15T11:30:00Z"
}

Remove Processing Step

curl -X DELETE https://api.artosai.com/post-processing/steps/custom_data_validator_abc123 \
  -H "Authorization: Bearer YOUR_API_KEY"

Example response:
{
  "success": true,
  "message": "Processing step removed successfully"
}

Test Processing Step

curl -X POST https://api.artosai.com/post-processing/steps/custom_data_validator_abc123/test \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "inputData": {
      "data": [
        {"id": 1, "name": "Test Item", "value": 100.5},
        {"id": 2, "name": "Test Item 2", "value": 200.3}
      ],
      "format": "json"
    },
    "expectedOutput": {
      "validationPassed": true,
      "processedRows": 2
    }
  }'

Example response:
{
  "testId": "test_xyz789",
  "status": "passed",
  "results": {
    "validationPassed": true,
    "processedRows": 2,
    "processingTime": 0.3,
    "accuracy": 1.0
  },
  "details": {
    "validationResults": {
      "requiredFields": "passed",
      "dataTypes": "passed",
      "constraints": "passed"
    },
    "performanceMetrics": {
      "latency": "0.3s",
      "throughput": "6.7 rows/s"
    }
  }
}

SDK Methods

Python SDK

from artos import ArtosClient, PostProcessingPipeline, ProcessingStep

# Initialize client
client = ArtosClient(api_key="your_api_key")

# Create processing step
step = ProcessingStep(
    name="Custom Data Validator",
    description="Validates data consistency and completeness",
    agent_type="data_validator",
    order=4,
    configuration={
        "validationRules": {
            "requiredFields": ["id", "name", "value"],
            "dataTypes": {
                "id": "integer",
                "name": "string",
                "value": "float"
            },
            "constraints": {
                "minValue": 0,
                "maxLength": 255
            }
        }
    },
    conditions={
        "contentType": ["tabular", "structured"],
        "minRows": 1
    }
)

# Add step to pipeline
pipeline = client.post_processing.get_pipeline()
pipeline.add_step(step)
updated_pipeline = client.post_processing.update_pipeline(pipeline)

# Test processing step
test_result = client.post_processing.test_step(
    step_id="custom_data_validator_abc123",
    input_data={
        "data": [
            {"id": 1, "name": "Test Item", "value": 100.5}
        ],
        "format": "json"
    },
    expected_output={
        "validationPassed": True,
        "processedRows": 1
    }
)

# List all processing steps
steps = client.post_processing.list_steps(
    pipeline_id="pipeline_xyz789",
    enabled_only=True
)

# Update step configuration
updated_step = client.post_processing.update_step(
    step_id="custom_data_validator_abc123",
    configuration={
        "validationRules": {
            "requiredFields": ["id", "name", "value", "category"]
        }
    }
)

# Remove step from pipeline
client.post_processing.remove_step("custom_data_validator_abc123")

Processing Step Details

Content Optimization

The content optimizer step streamlines and improves generated content:
{
  "stepId": "content_optimizer",
  "name": "Content Optimization",
  "agentType": "content_optimizer",
  "configuration": {
    "targetLength": "concise",
    "styleGuide": "professional",
    "removeRedundancy": true,
    "improveReadability": true,
    "optimizationLevel": "aggressive",
    "preserveKeyInformation": true,
    "targetAudience": "technical",
    "contentType": "document"
  },
  "conditions": {
    "contentType": ["document", "report", "summary"],
    "minLength": 100,
    "maxLength": 10000
  }
}

Style Matching

The style matcher step applies consistent formatting and tone:
{
  "stepId": "style_matcher",
  "name": "Style Matching",
  "agentType": "style_matcher",
  "configuration": {
    "tone": "professional",
    "formatting": "standard",
    "brandGuidelines": "default",
    "language": "en",
    "formality": "formal",
    "technicalLevel": "intermediate",
    "customStyleRules": {
      "sentenceLength": "medium",
      "paragraphStructure": "standard",
      "terminology": "industry_standard"
    }
  },
  "conditions": {
    "contentType": ["document", "email", "report"],
    "targetAudience": ["internal", "external"]
  }
}

Quality Control

The quality checker step validates accuracy and detects issues:
{
  "stepId": "quality_checker",
  "name": "Quality Control",
  "agentType": "quality_checker",
  "configuration": {
    "hallucinationDetection": true,
    "factChecking": true,
    "consistencyCheck": true,
    "grammarCheck": true,
    "spellCheck": true,
    "plagiarismDetection": false,
    "accuracyThreshold": 0.95,
    "confidenceScoring": true,
    "errorReporting": {
      "level": "detailed",
      "includeSuggestions": true
    }
  },
  "conditions": {
    "contentType": ["document", "report", "analysis"],
    "minLength": 50
  }
}

Custom Processing Steps

Create custom processing steps for domain-specific requirements:
{
  "stepId": "custom_data_validator",
  "name": "Custom Data Validator",
  "agentType": "data_validator",
  "configuration": {
    "validationRules": {
      "requiredFields": ["id", "name", "value", "category"],
      "dataTypes": {
        "id": "integer",
        "name": "string",
        "value": "float",
        "category": "string"
      },
      "constraints": {
        "id": {
          "minValue": 1,
          "unique": true
        },
        "name": {
          "maxLength": 255,
          "pattern": "^[a-zA-Z0-9\\s-_]+$"
        },
        "value": {
          "minValue": 0,
          "maxValue": 1000000
        },
        "category": {
          "allowedValues": ["A", "B", "C", "D"]
        }
      }
    },
    "qualityChecks": {
      "duplicateDetection": true,
      "outlierDetection": true,
      "completenessCheck": true,
      "dataIntegrityCheck": true
    },
    "errorHandling": {
      "onValidationFailure": "log_and_continue",
      "onDataTypeMismatch": "attempt_conversion",
      "onConstraintViolation": "flag_for_review"
    }
  },
  "conditions": {
    "contentType": ["tabular", "structured"],
    "minRows": 1,
    "maxRows": 100000
  }
}

Integration Examples

AWS Bedrock Integration

from artos import ArtosClient, ProcessingStep
import boto3

class AWSBedrockPostProcessor:
    def __init__(self, api_key, aws_region="us-east-1"):
        self.client = ArtosClient(api_key=api_key)
        self.aws_region = aws_region
        self.bedrock = boto3.client('bedrock-runtime', region_name=aws_region)
    
    def create_bedrock_optimizer_step(self, model_id):
        """Create content optimization step using AWS Bedrock"""
        step = ProcessingStep(
            name=f"Bedrock Content Optimizer ({model_id})",
            description=f"Content optimization using AWS Bedrock {model_id}",
            agent_type="content_optimizer",
            order=1,
            configuration={
                "bedrockConfig": {
                    "modelId": model_id,
                    "region": "us-east-1",
                    "maxTokens": 4000,
                    "temperature": 0.7
                },
                "optimizationSettings": {
                    "targetLength": "concise",
                    "styleGuide": "professional",
                    "removeRedundancy": True,
                    "improveReadability": True
                }
            },
            conditions={
                "contentType": ["document", "report"],
                "minLength": 100
            }
        )
        
        return self.client.post_processing.add_step(step)
    
    def create_bedrock_quality_checker(self, model_id):
        """Create quality checking step using AWS Bedrock"""
        step = ProcessingStep(
            name=f"Bedrock Quality Checker ({model_id})",
            description=f"Quality checking using AWS Bedrock {model_id}",
            agent_type="quality_checker",
            order=3,
            configuration={
                "bedrockConfig": {
                    "modelId": model_id,
                    "region": "us-east-1",
                    "maxTokens": 2000,
                    "temperature": 0.3
                },
                "qualityChecks": {
                    "hallucinationDetection": True,
                    "factChecking": True,
                    "consistencyCheck": True,
                    "accuracyThreshold": 0.95
                }
            },
            conditions={
                "contentType": ["document", "report", "analysis"],
                "minLength": 50
            }
        )
        
        return self.client.post_processing.add_step(step)

# Usage
bedrock_processor = AWSBedrockPostProcessor("your_api_key")

# Create optimization step
optimizer_step = bedrock_processor.create_bedrock_optimizer_step(
    "anthropic.claude-3-sonnet-20240229-v1:0"
)

# Create quality checker step
quality_step = bedrock_processor.create_bedrock_quality_checker(
    "anthropic.claude-3-sonnet-20240229-v1:0"
)

Azure Foundry Integration

from artos import ArtosClient, ProcessingStep
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

class AzureFoundryPostProcessor:
    def __init__(self, api_key, subscription_id, resource_group, workspace_name):
        self.client = ArtosClient(api_key=api_key)
        self.credential = DefaultAzureCredential()
        # Keep the identifiers; the step builders below reference them
        self.subscription_id = subscription_id
        self.resource_group = resource_group
        self.workspace_name = workspace_name
        self.ml_client = MLClient(
            credential=self.credential,
            subscription_id=subscription_id,
            resource_group_name=resource_group,
            workspace_name=workspace_name
        )
    
    def create_foundry_style_matcher(self, endpoint_name, model_name):
        """Create style matching step using Azure Foundry"""
        step = ProcessingStep(
            name=f"Foundry Style Matcher ({endpoint_name})",
            description=f"Style matching using Azure Foundry {endpoint_name}",
            agent_type="style_matcher",
            order=2,
            configuration={
                "foundryConfig": {
                    "endpointName": endpoint_name,
                    "modelName": model_name,
                    "subscriptionId": self.subscription_id,
                    "resourceGroup": self.resource_group,
                    "workspaceName": self.workspace_name
                },
                "styleSettings": {
                    "tone": "professional",
                    "formatting": "standard",
                    "brandGuidelines": "default",
                    "language": "en",
                    "formality": "formal"
                }
            },
            conditions={
                "contentType": ["document", "email", "report"],
                "targetAudience": ["internal", "external"]
            }
        )
        
        return self.client.post_processing.add_step(step)
    
    def create_foundry_data_validator(self, endpoint_name, model_name):
        """Create data validation step using Azure Foundry"""
        step = ProcessingStep(
            name=f"Foundry Data Validator ({endpoint_name})",
            description=f"Data validation using Azure Foundry {endpoint_name}",
            agent_type="data_validator",
            order=4,
            configuration={
                "foundryConfig": {
                    "endpointName": endpoint_name,
                    "modelName": model_name,
                    "subscriptionId": self.subscription_id,
                    "resourceGroup": self.resource_group,
                    "workspaceName": self.workspace_name
                },
                "validationRules": {
                    "requiredFields": ["id", "name", "value"],
                    "dataTypes": {
                        "id": "integer",
                        "name": "string",
                        "value": "float"
                    },
                    "constraints": {
                        "minValue": 0,
                        "maxLength": 255
                    }
                }
            },
            conditions={
                "contentType": ["tabular", "structured"],
                "minRows": 1
            }
        )
        
        return self.client.post_processing.add_step(step)

# Usage
foundry_processor = AzureFoundryPostProcessor(
    api_key="your_api_key",
    subscription_id="your_subscription_id",
    resource_group="your_resource_group",
    workspace_name="your_workspace"
)

# Create style matcher step
style_step = foundry_processor.create_foundry_style_matcher(
    endpoint_name="style-matcher-endpoint",
    model_name="style-matcher-model"
)

# Create data validator step
validator_step = foundry_processor.create_foundry_data_validator(
    endpoint_name="data-validator-endpoint",
    model_name="data-validator-model"
)

Error Handling

Common Error Codes

Error Code | Description | Resolution
STEP_NOT_FOUND | Processing step does not exist | Verify step ID and permissions
INVALID_CONFIGURATION | Step configuration is invalid | Check configuration schema and required fields
AGENT_TYPE_NOT_SUPPORTED | Specified agent type is not available | Verify agent type exists and is accessible
CONDITION_VIOLATION | Step conditions are not met | Ensure input data meets step conditions
PROCESSING_TIMEOUT | Step processing exceeded timeout | Increase timeout or optimize processing
RATE_LIMIT_EXCEEDED | Too many requests to processing step | Implement exponential backoff and retry logic

Error Handling Examples

from artos import ArtosClient, PostProcessingError
import time

class RobustPostProcessor:
    def __init__(self, api_key):
        self.client = ArtosClient(api_key=api_key)
    
    def add_step_with_retry(self, step_config, max_retries=3):
        """Add processing step with retry logic"""
        for attempt in range(max_retries):
            try:
                step = self.client.post_processing.add_step(step_config)
                return step
            except PostProcessingError as e:
                if e.code == "RATE_LIMIT_EXCEEDED":
                    wait_time = 2 ** attempt  # Exponential backoff
                    print(f"Rate limited, waiting {wait_time} seconds...")
                    time.sleep(wait_time)
                    continue
                elif e.code == "INVALID_CONFIGURATION":
                    print(f"Configuration error: {e.message}")
                    raise
                else:
                    print(f"Unexpected error: {e.message}")
                    raise
        # All retries were rate-limited; surface a clear failure
        raise RuntimeError(f"Failed to add step after {max_retries} attempts")
    
    def test_step_safely(self, step_id, test_data):
        """Test processing step with comprehensive error handling"""
        try:
            result = self.client.post_processing.test_step(step_id, test_data)
            return result
        except PostProcessingError as e:
            if e.code == "STEP_NOT_FOUND":
                print(f"Step {step_id} not found")
                return None
            elif e.code == "CONDITION_VIOLATION":
                print(f"Condition violation: {e.message}")
                # Try to adjust test data to meet conditions
                adjusted_data = self.adjust_test_data(test_data, e.details)
                return self.client.post_processing.test_step(step_id, adjusted_data)
            elif e.code == "PROCESSING_TIMEOUT":
                print(f"Processing timeout: {e.message}")
                # Try with reduced data size
                reduced_data = self.reduce_data_size(test_data)
                return self.client.post_processing.test_step(step_id, reduced_data)
            else:
                print(f"Test failed: {e.message}")
                raise
    
    def adjust_test_data(self, data, condition_details):
        """Adjust test data to meet step conditions"""
        # Domain-specific: pad or trim the payload based on the reported
        # condition_details; returning the data unchanged is a safe default
        return data
    
    def reduce_data_size(self, data):
        """Reduce data size to avoid timeout"""
        # Simple heuristic: keep the first half of the rows
        input_data = dict(data.get("inputData", {}))
        rows = input_data.get("data", [])
        if rows:
            input_data["data"] = rows[:max(1, len(rows) // 2)]
        return {**data, "inputData": input_data}

# Usage
processor = RobustPostProcessor("your_api_key")

try:
    step = processor.add_step_with_retry({
        "name": "test_step",
        "description": "Test processing step",
        "agentType": "data_validator",
        "order": 1,
        "configuration": {
            "validationRules": {
                "requiredFields": ["id", "name"]
            }
        }
    })
    
    test_result = processor.test_step_safely(
        step.id,
        {
            "inputData": {
                "data": [{"id": 1, "name": "Test"}],
                "format": "json"
            }
        }
    )
    
except Exception as e:
    print(f"Failed to add/test step: {e}")

Rate Limiting

Rate Limits

Endpoint | Rate Limit | Window
View Pipeline | 100 requests | 1 minute
Add Step | 20 requests | 1 minute
Update Step | 30 requests | 1 minute
Remove Step | 10 requests | 1 minute
Test Step | 50 requests | 1 minute

Rate Limit Handling

import time
from artos import ArtosClient, RateLimitError

class RateLimitedPostProcessor:
    def __init__(self, api_key):
        self.client = ArtosClient(api_key=api_key)
        # Track timestamps per endpoint, since each endpoint has its own limit
        self.request_timestamps = {}
    
    def _check_rate_limit(self, endpoint):
        """Wait if the next request would exceed the endpoint's rate limit"""
        current_time = time.time()
        window_start = current_time - 60  # 1 minute window
        
        # Keep only timestamps inside the current window
        timestamps = [
            ts for ts in self.request_timestamps.get(endpoint, [])
            if ts > window_start
        ]
        
        # Per-endpoint limits (requests per minute)
        limits = {
            "view": 100,
            "add": 20,
            "update": 30,
            "remove": 10,
            "test": 50
        }
        
        if len(timestamps) >= limits.get(endpoint, 20):
            wait_time = 60 - (current_time - timestamps[0])
            if wait_time > 0:
                time.sleep(wait_time)
        
        timestamps.append(time.time())
        self.request_timestamps[endpoint] = timestamps
    
    def add_step(self, step_config):
        """Add step with rate limit handling"""
        self._check_rate_limit("add")
        
        try:
            return self.client.post_processing.add_step(step_config)
        except RateLimitError:
            print("Rate limit exceeded, waiting 60 seconds...")
            time.sleep(60)
            return self.client.post_processing.add_step(step_config)
    
    def test_step(self, step_id, test_data):
        """Test step with rate limit handling"""
        self._check_rate_limit("test")
        
        try:
            return self.client.post_processing.test_step(step_id, test_data)
        except RateLimitError:
            print("Rate limit exceeded, waiting 60 seconds...")
            time.sleep(60)
            return self.client.post_processing.test_step(step_id, test_data)

# Usage
client = RateLimitedPostProcessor("your_api_key")

# Multiple requests with automatic rate limit handling
for i in range(25):
    try:
        step = client.add_step({
            "name": f"test_step_{i}",
            "description": f"Test step {i}",
            "agentType": "data_validator",
            "order": i + 1,
            "configuration": {
                "validationRules": {
                    "requiredFields": ["id", "name"]
                }
            }
        })
        print(f"Added step {i}")
    except Exception as e:
        print(f"Failed to add step {i}: {e}")

Best Practices

Pipeline Design

  1. Logical Ordering: Arrange steps in logical processing order
  2. Conditional Processing: Use conditions to apply steps only when needed
  3. Performance Optimization: Place fast steps before slow ones
  4. Error Handling: Include error handling and fallback mechanisms
  5. Monitoring: Add monitoring and logging for each step
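
The sketch below illustrates the first two points using the SDK calls shown earlier; the step names, ordering, and configuration values are illustrative rather than prescriptive.

from artos import ArtosClient, ProcessingStep

client = ArtosClient(api_key="your_api_key")

# Fast, broadly applicable steps run first; expensive checks run last
steps = [
    ProcessingStep(
        name="Style Matching",
        description="Applies consistent formatting and tone",
        agent_type="style_matcher",
        order=1,
        configuration={"tone": "professional", "formatting": "standard"},
        # Conditional processing: only run on document-like content
        conditions={"contentType": ["document", "report"]},
    ),
    ProcessingStep(
        name="Quality Control",
        description="Validates accuracy and detects issues",
        agent_type="quality_checker",
        order=2,
        configuration={"hallucinationDetection": True, "factChecking": True},
        # Skip trivially short content
        conditions={"minLength": 50},
    ),
]

pipeline = client.post_processing.get_pipeline()
for step in steps:
    pipeline.add_step(step)
client.post_processing.update_pipeline(pipeline)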

Configuration Management

  1. Environment-Specific Configs: Use different configurations for dev/staging/prod
  2. Version Control: Track configuration changes in version control
  3. Documentation: Document configuration options and their impact
  4. Testing: Test configurations with representative data
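
A minimal sketch of environment-specific configuration, assuming settings are kept in version control and selected at deploy time; the environment names and configuration values here are illustrative.

import os

from artos import ArtosClient

# Per-environment optimizer settings, tracked in version control
STEP_CONFIGS = {
    "dev": {"optimizationLevel": "conservative", "targetLength": "detailed"},
    "staging": {"optimizationLevel": "moderate", "targetLength": "concise"},
    "prod": {"optimizationLevel": "aggressive", "targetLength": "concise"},
}

env = os.environ.get("ARTOS_ENV", "dev")
client = ArtosClient(api_key="your_api_key")

client.post_processing.update_step(
    step_id="content_optimizer",
    configuration=STEP_CONFIGS[env],
)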

Performance Optimization

  1. Step Optimization: Optimize individual step performance
  2. Parallel Processing: Use parallel processing where possible
  3. Caching: Implement caching for expensive operations
  4. Resource Management: Monitor and optimize resource usage
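
As a simple example of the caching point above, a client-side cache avoids refetching step listings that rarely change between executions; this is a sketch, not a prescribed pattern.

from functools import lru_cache

from artos import ArtosClient

client = ArtosClient(api_key="your_api_key")

# Cache step listings per pipeline; call cache_clear() after editing steps
@lru_cache(maxsize=32)
def enabled_steps(pipeline_id):
    return client.post_processing.list_steps(
        pipeline_id=pipeline_id,
        enabled_only=True
    )

steps = enabled_steps("pipeline_xyz789")  # hits the API once per pipeline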

Security Considerations

  1. Input Validation: Validate all input data before processing
  2. Access Control: Implement proper access controls for sensitive data
  3. Data Encryption: Encrypt sensitive data in transit and at rest
  4. Audit Logging: Log all processing operations for security auditing
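
A minimal input-validation sketch for the first point; the size limit mirrors the maxRows condition used in the examples above and should be adapted to your own pipeline.

MAX_ROWS = 100000  # matches the maxRows condition shown earlier

def validate_payload(payload):
    """Reject malformed or oversized input before it reaches a step."""
    rows = payload.get("data")
    if not isinstance(rows, list) or not rows:
        raise ValueError("'data' must be a non-empty list")
    if len(rows) > MAX_ROWS:
        raise ValueError(f"too many rows: {len(rows)} > {MAX_ROWS}")
    return payload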

Monitoring and Analytics

Performance Metrics

Track key performance indicators for your post-processing pipeline:
from artos import ArtosClient
import time

class PostProcessingMonitor:
    def __init__(self, api_key):
        self.client = ArtosClient(api_key=api_key)
    
    def get_pipeline_metrics(self, pipeline_id, time_range="24h"):
        """Get pipeline performance metrics"""
        metrics = self.client.post_processing.get_metrics(
            pipeline_id=pipeline_id,
            time_range=time_range
        )
        
        return {
            "totalExecutions": metrics.total_executions,
            "successRate": metrics.success_rate,
            "averageProcessingTime": metrics.avg_processing_time,
            "errorRate": metrics.error_rate,
            "stepMetrics": metrics.step_metrics
        }
    
    def get_step_metrics(self, step_id, time_range="24h"):
        """Get individual step performance metrics"""
        metrics = self.client.post_processing.get_step_metrics(
            step_id=step_id,
            time_range=time_range
        )
        
        return {
            "executions": metrics.executions,
            "successRate": metrics.success_rate,
            "averageProcessingTime": metrics.avg_processing_time,
            "errorCount": metrics.error_count,
            "performanceTrend": metrics.performance_trend
        }
    
    def monitor_pipeline_health(self, pipeline_id):
        """Monitor pipeline health and alert on issues"""
        metrics = self.get_pipeline_metrics(pipeline_id)
        
        alerts = []
        
        if metrics["successRate"] < 0.95:
            alerts.append({
                "type": "low_success_rate",
                "message": f"Success rate is {metrics['successRate']:.2%}",
                "severity": "warning"
            })
        
        if metrics["averageProcessingTime"] > 30:
            alerts.append({
                "type": "high_processing_time",
                "message": f"Average processing time is {metrics['averageProcessingTime']:.2f}s",
                "severity": "warning"
            })
        
        if metrics["errorRate"] > 0.05:
            alerts.append({
                "type": "high_error_rate",
                "message": f"Error rate is {metrics['errorRate']:.2%}",
                "severity": "critical"
            })
        
        return alerts

# Usage
monitor = PostProcessingMonitor("your_api_key")

# Get pipeline metrics
pipeline_metrics = monitor.get_pipeline_metrics("pipeline_xyz789")
print(f"Pipeline success rate: {pipeline_metrics['successRate']:.2%}")

# Get step metrics
step_metrics = monitor.get_step_metrics("custom_data_validator_abc123")
print(f"Step processing time: {step_metrics['averageProcessingTime']:.2f}s")

# Monitor pipeline health
alerts = monitor.monitor_pipeline_health("pipeline_xyz789")
for alert in alerts:
    print(f"{alert['severity'].upper()}: {alert['message']}")

Real-Time Monitoring

import asyncio
from artos import ArtosClient

class RealTimePostProcessingMonitor:
    def __init__(self, api_key):
        self.client = ArtosClient(api_key=api_key)
    
    async def monitor_executions(self, pipeline_id):
        """Monitor post-processing executions in real-time"""
        async for execution in self.client.post_processing.monitor_executions(pipeline_id):
            print(f"Execution {execution.id}: {execution.status}")
            
            if execution.status == "completed":
                print(f"  Processing time: {execution.processing_time:.2f}s")
                print(f"  Steps completed: {len(execution.completed_steps)}")
                
                for step in execution.completed_steps:
                    print(f"    Step {step.name}: {step.duration:.2f}s")
            
            elif execution.status == "failed":
                print(f"  Error: {execution.error}")
                print(f"  Failed step: {execution.failed_step}")
    
    async def monitor_step_performance(self, step_id):
        """Monitor individual step performance in real-time"""
        async for metric in self.client.post_processing.monitor_step_performance(step_id):
            print(f"Step {step_id} performance update:")
            print(f"  Success rate: {metric.success_rate:.2%}")
            print(f"  Average time: {metric.avg_processing_time:.2f}s")
            print(f"  Error count: {metric.error_count}")

# Usage
async def main():
    monitor = RealTimePostProcessingMonitor("your_api_key")
    
    # Run both monitors concurrently; each consumes a long-lived stream
    await asyncio.gather(
        monitor.monitor_executions("pipeline_xyz789"),
        monitor.monitor_step_performance("custom_data_validator_abc123"),
    )

# Run monitoring
asyncio.run(main())

This reference covers the Post-Processing API end to end: endpoints, SDK methods, integration patterns, error handling, rate limiting, best practices, and monitoring. The API-level and SDK-level examples together should be enough to configure and manage post-processing pipelines effectively.