Skip to main content

Python SDK Usage

The Artos Python SDK provides a comprehensive interface for building custom workflows with connectors, post-processing, platform integration, and debugging capabilities. This guide covers all major features with practical examples.

Client Initialization

from artos import ArtosClient

# Initialize client with API key
client = ArtosClient(api_key="your_api_key")

# Alternative initialization with custom configuration
client = ArtosClient(
    api_key="your_api_key",
    api_url="https://api.artosai.com",
    timeout=30,
    retries=3
)

Document Management

Upload Documents

# Upload single document
document = client.documents.upload(
    file_path="path/to/document.docx",
    file_type="docx",  # Optional - auto-detected
    extraction_format="json"
)

print(f"Document uploaded: {document.id}")
print(f"Processing status: {document.status}")

# Upload with custom settings
document = client.documents.upload(
    file_path="clinical-data.csv",
    extraction_format="csv",
    csv_delimiter=",",
    extract_tables=True
)

# Upload waits for processing completion automatically
print(f"Document ready: {document.is_ready}")
print(f"Extracted content available: {document.has_content}")

List Documents

# List all documents
documents = client.documents.list()
print(f"Found {len(documents)} documents")

# List with filters
documents = client.documents.list(
    file_type="docx",
    limit=10,
    page=1
)

# Access document properties
for doc in documents:
    print(f"Document: {doc.filename}")
    print(f"Size: {doc.file_size_mb} MB")
    print(f"Upload date: {doc.upload_date}")
    print(f"Processing status: {doc.status}")

Get Document Details

# Get specific document
document = client.documents.get("document-id")

# Document properties
print(f"ID: {document.id}")
print(f"Filename: {document.filename}")
print(f"Type: {document.file_type}")
print(f"S3 URL: {document.s3_url}")
print(f"Content extracted: {document.has_content}")

# Download document content
content = document.download()
print(f"Downloaded {len(content)} bytes")

Search Documents

# Search across all documents
results = client.documents.search(
    query="patient eligibility criteria",
    top_k=10
)

# Search specific documents
results = client.documents.search(
    query="adverse events",
    source_documents=["doc-1", "doc-2"],
    search_type="hybrid"
)

# Access search results
for result in results:
    print(f"Document: {result.document_title}")
    print(f"Relevance: {result.score}")
    print(f"Snippet: {result.snippet}")
    print(f"Source: {result.url}")

Delete Documents (Admin Only)

# Delete document
client.documents.delete("document-id")

# Delete multiple documents
client.documents.delete_batch(["doc-1", "doc-2", "doc-3"])

Agent Management

List Available Agents

# List all agents
agents = client.agents.list()

# List by type
builtin_agents = client.agents.list(agent_type="builtin")
custom_agents = client.agents.list(agent_type="custom")

# Access agent properties
for agent in agents:
    print(f"Agent: {agent.name}")
    print(f"Type: {agent.type}")
    print(f"Category: {agent.category}")
    print(f"Capabilities: {agent.capabilities}")

Create Custom Agent

# Create custom agent
agent = client.agents.create(
    name="Safety Report Analyzer",
    description="Analyzes clinical trial safety data",
    category="analysis",
    instructions="Extract safety information and create summary tables",
    output_format="html"
)

print(f"Created agent: {agent.id}")
print(f"Agent name: {agent.name}")

Call Agent Directly

# Call agent with documents
result = client.agents.call(
    agent_id="agent-search-001",
    document_urls=["s3://bucket/doc1.docx", "s3://bucket/doc2.docx"],
    instructions="Focus on primary endpoints"
)

print(f"Agent output: {result.output}")
print(f"Processing time: {result.processing_time_seconds}")
print(f"Documents processed: {result.documents_processed}")

Test Custom Agent

# Test agent before using in production
test_result = client.agents.test(
    agent_id="custom-agent-123",
    test_documents=["test-doc-1"],
    instructions="Test analysis"
)

if test_result.success:
    print("Agent test passed")
    print(f"Test output: {test_result.output}")
else:
    print(f"Agent test failed: {test_result.error}")

Update and Delete Agents

# Update custom agent
updated_agent = client.agents.update(
    agent_id="custom-agent-123",
    name="Enhanced Safety Analyzer",
    instructions="Updated analysis instructions"
)

# Delete custom agent
client.agents.delete("custom-agent-123")

Connectors API

Create Connectors

from artos import ConnectorBuilder

# Create a connector using the builder pattern
connector = ConnectorBuilder.create("table_processor_connector") \
    .set_description("Handles tabular data processing and insertion") \
    .add_agent_types(["data_validator", "table_processor"]) \
    .add_data_types(["csv", "excel", "json"]) \
    .set_usage_rules({
        "when": "data contains tabular information",
        "priority": "high",
        "constraints": {
            "maxFileSize": "10MB",
            "supportedFormats": ["csv", "xlsx", "json"]
        }
    }) \
    .set_configuration({
        "validationRules": {
            "requiredColumns": ["id", "name", "value"],
            "dataTypes": {
                "id": "integer",
                "name": "string",
                "value": "float"
            }
        },
        "processingOptions": {
            "batchSize": 1000,
            "enableTransformation": True
        }
    }) \
    .build()

# Deploy connector
deployed_connector = client.connectors.deploy(connector)
print(f"Connector deployed with ID: {deployed_connector.id}")

Manage Connectors

# List all connectors
connectors = client.connectors.list()
for conn in connectors:
    print(f"Connector: {conn.name}")
    print(f"Status: {conn.status}")
    print(f"Agent Types: {conn.agent_types}")

# Get specific connector
connector = client.connectors.get("conn_abc123")

# Update connector
updated_connector = client.connectors.update(
    connector_id="conn_abc123",
    description="Updated description",
    configuration={
        "validationRules": {
            "requiredColumns": ["id", "name", "value", "category"]
        }
    }
)

# Delete connector
client.connectors.delete("conn_abc123")

# Get connector schema
schema = client.connectors.get_schema("conn_abc123")

Connector Integration Examples

# AWS Bedrock Integration
import boto3
from artos_sdk import ConnectorBuilder

bedrock_client = boto3.client('bedrock-runtime', region_name='us-east-1')

connector = ConnectorBuilder.create("bedrock_processor") \
    .defineAgent("bedrock_analyzer") \
    .defineAgent("bedrock_summarizer") \
    .setDataTypes(["text", "json"]) \
    .setUsageRules({
        "when": "using AWS Bedrock for analysis",
        "priority": "high",
        "constraints": {
            "maxInputLength": "10000",
            "model": "anthropic.claude-3-sonnet-20240229-v1:0"
        }
    }) \
    .setConfiguration({
        "bedrockConfig": {
            "client": bedrock_client,
            "modelId": "anthropic.claude-3-sonnet-20240229-v1:0",
            "maxTokens": 4000
        }
    }) \
    .deploy()

# Azure Foundry Integration
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
ml_client = MLClient(credential, subscription_id, resource_group, workspace_name)

connector = ConnectorBuilder.create("azure_processor") \
    .defineAgent("azure_analyzer") \
    .defineAgent("azure_transformer") \
    .setDataTypes(["text", "tabular"]) \
    .setUsageRules({
        "when": "using Azure Foundry for processing",
        "priority": "high",
        "constraints": {
            "maxFileSize": "20MB",
            "computeType": "Standard_NC6"
        }
    }) \
    .setConfiguration({
        "azureConfig": {
            "mlClient": ml_client,
            "computeTarget": "Standard_NC6",
            "modelName": "custom-transformer"
        }
    }) \
    .deploy()

Post-Processing API

Configure Post-Processing Pipeline

from artos import ProcessingStep

# Create processing step
step = ProcessingStep(
    name="Custom Data Validator",
    description="Validates data consistency and completeness",
    agent_type="data_validator",
    order=4,
    configuration={
        "validationRules": {
            "requiredFields": ["id", "name", "value"],
            "dataTypes": {
                "id": "integer",
                "name": "string",
                "value": "float"
            },
            "constraints": {
                "minValue": 0,
                "maxLength": 255
            }
        }
    },
    conditions={
        "contentType": ["tabular", "structured"],
        "minRows": 1
    }
)

# Add step to pipeline
pipeline = client.post_processing.get_pipeline()
pipeline.add_step(step)
updated_pipeline = client.post_processing.update_pipeline(pipeline)

# Test processing step
test_result = client.post_processing.test_step(
    step_id="custom_data_validator_abc123",
    input_data={
        "data": [
            {"id": 1, "name": "Test Item", "value": 100.5}
        ],
        "format": "json"
    },
    expected_output={
        "validationPassed": True,
        "processedRows": 1
    }
)

# Execute post-processing
execution_result = client.post_processing.execute({
    "content": "Your content to process here...",
    "contentType": "document",
    "pipelineId": "pipeline_xyz789",
    "options": {
        "skipSteps": ["quality_checker"],
        "customConfig": {
            "targetLength": "detailed",
            "styleGuide": "academic"
        }
    }
})

print(f"Processing completed: {execution_result.status}")
print(f"Quality score: {execution_result.qualityScore}")

Manage Processing Steps

# Get current pipeline
pipeline = client.post_processing.get_pipeline()

# Add processing step
step = client.post_processing.add_step({
    "name": "Custom Data Validator",
    "description": "Validates data consistency and completeness",
    "agentType": "data_validator",
    "order": 4,
    "configuration": {
        "validationRules": {
            "requiredFields": ["id", "name", "value"],
            "dataTypes": {
                "id": "integer",
                "name": "string",
                "value": "float"
            }
        }
    },
    "conditions": {
        "contentType": ["tabular", "structured"],
        "minRows": 1
    }
})

# Update processing step
updated_step = client.post_processing.update_step(
    step_id="custom_data_validator_abc123",
    configuration={
        "validationRules": {
            "requiredFields": ["id", "name", "value", "category"]
        }
    }
)

# Remove processing step
client.post_processing.remove_step("custom_data_validator_abc123")

# Execute post-processing manually
execution_result = client.post_processing.execute({
    "content": "Your content to process here...",
    "contentType": "document",
    "pipelineId": "pipeline_xyz789",
    "options": {
        "skipSteps": ["quality_checker"],
        "customConfig": {
            "targetLength": "detailed",
            "styleGuide": "academic"
        }
    }
})

Frontend Customization

ArtosApp Component Integration

# Generate configuration for the ArtosApp React component
from artos import FrontendCustomizer

customizer = FrontendCustomizer()

# Create ArtosApp configuration
app_config = customizer.create_app_config({
    "apiKey": "your_api_key",
    "theme": "light",
    "userId": "user_123",
    "userEmail": "[email protected]", 
    "userRole": "admin",
    "onWorkflowCreate": "handleWorkflowCreate",
    "onWorkflowExecute": "handleWorkflowExecute",
    "onError": "handleError",
    "customConfig": {
        "branding": {
            "companyName": "Your Company",
            "primaryColor": "#FF6B35",
            "secondaryColor": "#004E89"
        },
        "features": {
            "workflowCreation": True,
            "workflowExecution": True,
            "debugging": True,
            "analytics": True
        }
    }
})

# Generate React component code
component_code = customizer.generate_component_code({
    "componentType": "ArtosApp",
    "config": app_config,
    "authWrapper": True,
    "customStyling": True
})

print("Generated ArtosApp component:")
print(component_code)

Custom Styling and Theming

# Create custom theme for ArtosApp
theme_config = customizer.create_theme({
    "primary": "#FF6B35",
    "primaryLight": "#FF8C61", 
    "primaryDark": "#E55A2B",
    "secondary": "#004E89",
    "secondaryLight": "#1A6BA8",
    "secondaryDark": "#003A6B",
    "bgPrimary": "#FFFFFF",
    "bgSecondary": "#F8F9FA",
    "textPrimary": "#212529",
    "textSecondary": "#6C757D",
    "borderPrimary": "#DEE2E6",
    "fontFamily": "'Inter', -apple-system, BlinkMacSystemFont, sans-serif",
    "spacing": {
        "xs": "0.25rem",
        "sm": "0.5rem", 
        "md": "1rem",
        "lg": "1.5rem",
        "xl": "2rem"
    }
})

# Generate CSS variables
css_variables = customizer.generate_css_variables(theme_config)

# Generate component-specific styles
component_styles = customizer.generate_component_styles({
    "artosApp": {
        "header": {
            "background": "linear-gradient(135deg, var(--artos-primary) 0%, var(--artos-secondary) 100%)",
            "color": "white",
            "padding": "var(--artos-spacing-lg)",
            "borderRadius": "var(--artos-border-radius-lg)"
        },
        "button": {
            "background": "linear-gradient(135deg, var(--artos-primary) 0%, var(--artos-primary-dark) 100%)",
            "border": "none",
            "color": "white",
            "padding": "var(--artos-spacing-sm) var(--artos-spacing-md)",
            "borderRadius": "var(--artos-border-radius-md)",
            "transition": "all var(--artos-transition-fast)"
        }
    }
})

print("Generated styling configuration:")
print(f"CSS Variables: {css_variables}")
print(f"Component Styles: {component_styles}")

Platform Integration

Workflow Integration Patterns

from artos_sdk import PlatformIntegrator

integrator = PlatformIntegrator("your_api_key")

# Direct API Integration
class ArtosIntegration:
    def __init__(self, api_key):
        self.client = integrator.create_client(api_key)
    
    def create_workflow(self, workflow_data):
        """Create a new workflow"""
        return self.client.create_workflow(workflow_data)
    
    def execute_workflow(self, workflow_id, input_data):
        """Execute a workflow with input data"""
        return self.client.execute_workflow(workflow_id, input_data)
    
    def get_execution_status(self, execution_id):
        """Get execution status and results"""
        return self.client.get_execution_status(execution_id)

# SDK Integration
class CustomWorkflowManager:
    def __init__(self, api_key):
        self.client = integrator.create_sdk_client(api_key)
    
    def create_data_processing_workflow(self, config):
        """Create a data processing workflow"""
        workflow = self.client.create_workflow({
            "name": "Data Processing Pipeline",
            "connectors": ["table_processor_connector"],
            "postProcessing": ["data_validator", "content_optimizer"],
            "configuration": config
        })
        return workflow
    
    def process_data_batch(self, workflow_id, data_batch):
        """Process a batch of data"""
        results = []
        for data in data_batch:
            execution = self.client.execute_workflow(workflow_id, data)
            results.append(execution.result)
        return results

# Event-Driven Integration
class EventDrivenIntegration:
    def __init__(self, api_key, webhook_url):
        self.client = integrator.create_event_client(api_key, webhook_url)
    
    def setup_webhook(self):
        """Register webhook with Artos"""
        webhook_config = {
            "url": self.webhook_url,
            "events": ["workflow.completed", "workflow.failed"],
            "secret": "your_webhook_secret"
        }
        return self.client.register_webhook(webhook_config)
    
    def handle_workflow_completion(self, execution_data):
        """Handle workflow completion"""
        workflow_id = execution_data["workflow_id"]
        result = execution_data["result"]
        # Process the results
        return self.process_results(workflow_id, result)

Database Integration

# Database integration example
class DatabaseIntegration:
    def __init__(self, db_config, artos_api_key):
        self.db_connection = integrator.create_db_connection(db_config)
        self.artos_client = integrator.create_artos_client(artos_api_key)
    
    def process_database_records(self, table_name, workflow_id):
        """Process records from database using Artos workflow"""
        cursor = self.db_connection.cursor()
        
        # Fetch records to process
        cursor.execute(f"SELECT * FROM {table_name} WHERE processed = false")
        records = cursor.fetchall()
        
        for record in records:
            # Execute workflow for each record
            execution = self.artos_client.execute_workflow(workflow_id, {
                "record": record
            })
            
            # Update database with results
            cursor.execute(
                f"UPDATE {table_name} SET processed = true, result = %s WHERE id = %s",
                (json.dumps(execution.result), record[0])
            )
        
        self.db_connection.commit()
        cursor.close()

Message Queue Integration

# Message queue integration example
class MessageQueueIntegration:
    def __init__(self, rabbitmq_url, artos_api_key):
        self.connection = integrator.create_mq_connection(rabbitmq_url)
        self.channel = self.connection.channel()
        self.artos_client = integrator.create_artos_client(artos_api_key)
        
        # Setup queues
        self.channel.queue_declare(queue='artos_processing')
        self.channel.queue_declare(queue='artos_results')
    
    def process_message(self, ch, method, properties, body):
        """Process message from queue using Artos workflow"""
        try:
            message_data = json.loads(body)
            workflow_id = message_data["workflow_id"]
            input_data = message_data["input_data"]
            
            # Execute workflow
            execution = self.artos_client.execute_workflow(workflow_id, input_data)
            
            # Send results to results queue
            result_message = {
                "original_message": message_data,
                "execution_id": execution.id,
                "result": execution.result,
                "status": execution.status
            }
            
            self.channel.basic_publish(
                exchange='',
                routing_key='artos_results',
                body=json.dumps(result_message)
            )
            
            ch.basic_ack(delivery_tag=method.delivery_tag)
            
        except Exception as e:
            print(f"Error processing message: {e}")
            ch.basic_nack(delivery_tag=method.delivery_tag)

Debugging and Analytics

Component Testing

from artos_sdk import ComponentTester, TestCase

class ConnectorTestSuite:
    def __init__(self, api_key):
        self.tester = ComponentTester(api_key)
    
    def test_table_processor_connector(self):
        """Test table processor connector in isolation"""
        test_case = TestCase(
            name="Table Processor Connector Test",
            component_type="connector",
            component_id="table_processor_connector",
            input_data={
                "data": [
                    {"id": 1, "name": "Item 1", "value": 100},
                    {"id": 2, "name": "Item 2", "value": 200}
                ],
                "format": "json"
            },
            expected_output={
                "processed_rows": 2,
                "validation_passed": True,
                "output_format": "structured"
            }
        )
        
        result = self.tester.run_test(test_case)
        return result
    
    def test_content_optimizer(self):
        """Test content optimizer component"""
        test_case = TestCase(
            name="Content Optimizer Test",
            component_type="post_processor",
            component_id="content_optimizer",
            input_data={
                "content": "This is a very long and verbose piece of content that needs optimization.",
                "target_length": "concise",
                "style_guide": "professional"
            },
            expected_output={
                "length_reduction": ">20%",
                "readability_improvement": ">10%",
                "style_compliance": True
            }
        )
        
        result = self.tester.run_test(test_case)
        return result

# Usage
test_suite = ConnectorTestSuite("your_api_key")
table_result = test_suite.test_table_processor_connector()
content_result = test_suite.test_content_optimizer()

print(f"Table processor test: {table_result.status}")
print(f"Content optimizer test: {content_result.status}")

Interactive Debugging

from artos_sdk import Debugger

class InteractiveDebugger:
    def __init__(self, api_key):
        self.debugger = Debugger(api_key)
    
    def debug_workflow_step(self, workflow_id, step_id, input_data):
        """Debug a specific workflow step interactively"""
        debug_session = self.debugger.start_session(
            workflow_id=workflow_id,
            step_id=step_id,
            input_data=input_data
        )
        
        # Set breakpoints
        debug_session.set_breakpoint("data_validation")
        debug_session.set_breakpoint("content_processing")
        
        # Start debugging
        result = debug_session.run()
        
        # Inspect intermediate results
        for breakpoint in debug_session.breakpoints:
            print(f"Breakpoint {breakpoint.name}:")
            print(f"  Input: {breakpoint.input_data}")
            print(f"  Output: {breakpoint.output_data}")
            print(f"  Variables: {breakpoint.variables}")
        
        return result
    
    def step_through_execution(self, execution_id):
        """Step through execution step by step"""
        debug_session = self.debugger.attach_to_execution(execution_id)
        
        while debug_session.has_next_step():
            step_result = debug_session.next_step()
            print(f"Step: {step_result.step_name}")
            print(f"  Duration: {step_result.duration}")
            print(f"  Status: {step_result.status}")
            print(f"  Output: {step_result.output}")
            
            if step_result.has_errors():
                print(f"  Errors: {step_result.errors}")
        
        return debug_session.get_final_result()

# Usage
debugger = InteractiveDebugger("your_api_key")

# Debug specific step
result = debugger.debug_workflow_step(
    workflow_id="workflow_123",
    step_id="data_validator",
    input_data={"test_data": "sample"}
)

# Step through execution
final_result = debugger.step_through_execution("execution_456")

Analytics and Performance Monitoring

from artos_sdk import AnalyticsClient

class AnalyticsManager:
    def __init__(self, api_key):
        self.client = AnalyticsClient(api_key)
    
    def get_performance_metrics(self, component_id, start_date, end_date, granularity="hour"):
        """Get performance metrics for a component"""
        return self.client.get_performance_metrics(
            component_id=component_id,
            start_date=start_date,
            end_date=end_date,
            granularity=granularity
        )
    
    def get_success_rates(self, workflow_id, time_window="7d", group_by="step"):
        """Get success rates for a workflow"""
        return self.client.get_success_rates(
            workflow_id=workflow_id,
            time_window=time_window,
            group_by=group_by
        )
    
    def get_usage_statistics(self, start_date, end_date, group_by="day"):
        """Get usage statistics"""
        return self.client.get_usage_statistics(
            start_date=start_date,
            end_date=end_date,
            group_by=group_by
        )
    
    def define_custom_metric(self, metric_config):
        """Define a custom metric"""
        return self.client.define_custom_metric(metric_config)
    
    def get_custom_metric_data(self, metric_id, start_date, end_date):
        """Get data for a custom metric"""
        return self.client.get_custom_metric_data(
            metric_id=metric_id,
            start_date=start_date,
            end_date=end_date
        )

# Usage
analytics = AnalyticsManager("your_api_key")

# Get performance metrics
performance = analytics.get_performance_metrics(
    component_id="table_processor_connector",
    start_date="2024-01-01",
    end_date="2024-01-15"
)

# Get success rates
success_rates = analytics.get_success_rates(
    workflow_id="workflow_123",
    time_window="7d"
)

# Define custom metric
custom_metric = analytics.define_custom_metric({
    "name": "data_quality_score",
    "type": "score",
    "calculation": {
        "method": "weighted_average",
        "weights": {
            "completeness": 0.3,
            "accuracy": 0.4,
            "consistency": 0.3
        }
    }
})

Performance Optimization

from artos_sdk import OptimizationClient

class PerformanceOptimizer:
    def __init__(self, api_key):
        self.client = OptimizationClient(api_key)
    
    def analyze_performance(self, workflow_id, time_range="7d"):
        """Analyze workflow performance and identify bottlenecks"""
        analysis = self.client.analyze_performance(
            workflow_id=workflow_id,
            time_range=time_range
        )
        
        print("Performance Analysis:")
        print(f"  Overall Success Rate: {analysis.overall_success_rate}")
        print(f"  Average Execution Time: {analysis.avg_execution_time}")
        print(f"  Bottlenecks: {analysis.bottlenecks}")
        print(f"  Recommendations: {analysis.recommendations}")
        
        return analysis
    
    def optimize_workflow(self, workflow_id, optimization_target="performance"):
        """Optimize workflow based on analysis"""
        optimization = self.client.optimize_workflow(
            workflow_id=workflow_id,
            target=optimization_target
        )
        
        print("Optimization Results:")
        print(f"  Original Performance: {optimization.original_metrics}")
        print(f"  Optimized Performance: {optimization.optimized_metrics}")
        print(f"  Improvements: {optimization.improvements}")
        
        return optimization
    
    def a_b_test_workflows(self, workflow_a_id, workflow_b_id, test_duration="1d"):
        """A/B test two workflow versions"""
        test_result = self.client.a_b_test_workflows(
            workflow_a_id=workflow_a_id,
            workflow_b_id=workflow_b_id,
            duration=test_duration
        )
        
        print("A/B Test Results:")
        print(f"  Workflow A Performance: {test_result.workflow_a_metrics}")
        print(f"  Workflow B Performance: {test_result.workflow_b_metrics}")
        print(f"  Winner: {test_result.winner}")
        print(f"  Confidence: {test_result.confidence}")
        
        return test_result

# Usage
optimizer = PerformanceOptimizer("your_api_key")

# Analyze performance
analysis = optimizer.analyze_performance("workflow_123")

# Optimize workflow
optimization = optimizer.optimize_workflow("workflow_123", "performance")

# A/B test workflows
test_result = optimizer.a_b_test_workflows("workflow_123", "workflow_456")

Error Handling and Best Practices

Comprehensive Error Handling

from artos_sdk import ErrorHandler

class RobustArtosClient:
    def __init__(self, api_key):
        self.client = ArtosClient(api_key)
        self.error_handler = ErrorHandler()
    
    def execute_workflow_with_retry(self, workflow_id, input_data, max_retries=3):
        """Execute workflow with automatic retry logic"""
        for attempt in range(max_retries):
            try:
                result = self.client.execute_workflow(workflow_id, input_data)
                return result
            except Exception as e:
                if attempt == max_retries - 1:
                    raise e
                
                # Wait before retry with exponential backoff
                wait_time = 2 ** attempt
                print(f"Attempt {attempt + 1} failed, retrying in {wait_time} seconds...")
                time.sleep(wait_time)
    
    def handle_connector_errors(self, connector_id, operation):
        """Handle connector-specific errors"""
        try:
            return operation()
        except ConnectorNotFoundError:
            print(f"Connector {connector_id} not found")
            return None
        except ConnectorConfigurationError as e:
            print(f"Configuration error in connector {connector_id}: {e}")
            # Attempt to fix configuration
            return self.fix_connector_configuration(connector_id, e)
        except ConnectorExecutionError as e:
            print(f"Execution error in connector {connector_id}: {e}")
            # Log error and return fallback result
            return self.get_fallback_result(connector_id)
    
    def validate_workflow_configuration(self, workflow_config):
        """Validate workflow configuration before deployment"""
        validation_result = self.error_handler.validate_config(workflow_config)
        
        if not validation_result.is_valid:
            print("Configuration validation failed:")
            for error in validation_result.errors:
                print(f"  - {error}")
            return False
        
        return True

Best Practices Implementation

class ArtosBestPractices:
    def __init__(self, api_key):
        self.client = ArtosClient(api_key)
    
    def create_production_workflow(self, workflow_config):
        """Create workflow following production best practices"""
        # Validate configuration
        if not self.validate_workflow_configuration(workflow_config):
            raise ValueError("Invalid workflow configuration")
        
        # Create workflow with proper naming
        workflow_name = f"{workflow_config['name']}-{datetime.now().strftime('%Y%m%d')}"
        
        # Add monitoring and logging
        workflow_config['monitoring'] = {
            'enable_logging': True,
            'enable_metrics': True,
            'alert_thresholds': {
                'success_rate': 0.95,
                'execution_time': 30
            }
        }
        
        # Create workflow
        workflow = self.client.create_workflow(workflow_config)
        
        # Set up monitoring
        self.setup_monitoring(workflow.id)
        
        return workflow
    
    def setup_monitoring(self, workflow_id):
        """Set up comprehensive monitoring for workflow"""
        # Set up performance alerts
        self.client.alerts.create({
            'workflow_id': workflow_id,
            'type': 'performance',
            'threshold': 0.95,
            'notification_channels': ['email', 'slack']
        })
        
        # Set up error alerts
        self.client.alerts.create({
            'workflow_id': workflow_id,
            'type': 'error',
            'threshold': 0.05,
            'notification_channels': ['email', 'slack']
        })
    
    def implement_graceful_degradation(self, workflow_id, fallback_config):
        """Implement graceful degradation for workflow"""
        # Create fallback workflow
        fallback_workflow = self.client.create_workflow(fallback_config)
        
        # Set up automatic fallback
        self.client.workflows.set_fallback(
            workflow_id=workflow_id,
            fallback_workflow_id=fallback_workflow.id,
            trigger_conditions=['execution_failure', 'timeout']
        )
        
        return fallback_workflow
This comprehensive SDK usage documentation covers all the major features and capabilities of the Artos platform, providing developers with practical examples and best practices for building robust, scalable applications.