Python SDK Usage
The Artos Python SDK provides a comprehensive interface for building custom workflows with connectors, post-processing, platform integration, and debugging capabilities. This guide covers all major features with practical examples.
Client Initialization
from artos import ArtosClient

# Initialize client with API key
client = ArtosClient(api_key="your_api_key")

# Alternative initialization with custom configuration
client = ArtosClient(
    api_key="your_api_key",
    api_url="https://api.artosai.com",
    timeout=30,
    retries=3
)
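If you prefer not to hard-code credentials, the key can be read from an environment variable before constructing the client. This is a minimal sketch; the ARTOS_API_KEY variable name is an illustrative choice, not something the SDK requires.
import os
from artos import ArtosClient

# Read the key from the environment (ARTOS_API_KEY is an arbitrary name chosen here)
api_key = os.environ.get("ARTOS_API_KEY")
if not api_key:
    raise RuntimeError("Set ARTOS_API_KEY before creating the client")

client = ArtosClient(api_key=api_key)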
Document Management
Upload Documents
# Upload single document
document = client.documents.upload(
    file_path="path/to/document.docx",
    file_type="docx",  # Optional - auto-detected
    extraction_format="json"
)
print(f"Document uploaded: {document.id}")
print(f"Processing status: {document.status}")

# Upload with custom settings
document = client.documents.upload(
    file_path="clinical-data.csv",
    extraction_format="csv",
    csv_delimiter=",",
    extract_tables=True
)

# Upload waits for processing completion automatically
print(f"Document ready: {document.is_ready}")
print(f"Extracted content available: {document.has_content}")
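To load several files at once, loop over a directory and reuse the same upload call. A small sketch built on the upload signature shown above; the data/ directory and .docx filter are illustrative.
from pathlib import Path

# Upload every .docx file in a local folder (folder name is illustrative)
uploaded = []
for path in Path("data").glob("*.docx"):
    doc = client.documents.upload(file_path=str(path))
    uploaded.append(doc)
    print(f"Uploaded {path.name} as {doc.id}")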
List Documents
# List all documents
documents = client.documents.list()
print(f"Found {len(documents)} documents")

# List with filters
documents = client.documents.list(
    file_type="docx",
    limit=10,
    page=1
)

# Access document properties
for doc in documents:
    print(f"Document: {doc.filename}")
    print(f"Size: {doc.file_size_mb} MB")
    print(f"Upload date: {doc.upload_date}")
    print(f"Processing status: {doc.status}")
Get Document Details
# Get specific document
document = client.documents.get("document-id")
# Document properties
print(f"ID: {document.id}")
print(f"Filename: {document.filename}")
print(f"Type: {document.file_type}")
print(f"S3 URL: {document.s3_url}")
print(f"Content extracted: {document.has_content}")
# Download document content
content = document.download()
print(f"Downloaded {len(content)} bytes")
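Since download() returns the raw bytes, persisting the file locally is straightforward. A minimal sketch; the output filename is arbitrary.
# Write the downloaded bytes to a local file (filename is illustrative)
content = document.download()
with open("document-copy.docx", "wb") as fh:
    fh.write(content)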
Search Documents
# Search across all documents
results = client.documents.search(
    query="patient eligibility criteria",
    top_k=10
)

# Search specific documents
results = client.documents.search(
    query="adverse events",
    source_documents=["doc-1", "doc-2"],
    search_type="hybrid"
)

# Access search results
for result in results:
    print(f"Document: {result.document_title}")
    print(f"Relevance: {result.score}")
    print(f"Snippet: {result.snippet}")
    print(f"Source: {result.url}")
Delete Documents (Admin Only)
# Delete document
client.documents.delete("document-id")
# Delete multiple documents
client.documents.delete_batch(["doc-1", "doc-2", "doc-3"])
Agent Management
List Available Agents
# List all agents
agents = client.agents.list()

# List by type
builtin_agents = client.agents.list(agent_type="builtin")
custom_agents = client.agents.list(agent_type="custom")

# Access agent properties
for agent in agents:
    print(f"Agent: {agent.name}")
    print(f"Type: {agent.type}")
    print(f"Category: {agent.category}")
    print(f"Capabilities: {agent.capabilities}")
Create Custom Agent
# Create custom agent
agent = client.agents.create(
    name="Safety Report Analyzer",
    description="Analyzes clinical trial safety data",
    category="analysis",
    instructions="Extract safety information and create summary tables",
    output_format="html"
)
print(f"Created agent: {agent.id}")
print(f"Agent name: {agent.name}")
Call Agent Directly
# Call agent with documents
result = client.agents.call(
    agent_id="agent-search-001",
    document_urls=["s3://bucket/doc1.docx", "s3://bucket/doc2.docx"],
    instructions="Focus on primary endpoints"
)
print(f"Agent output: {result.output}")
print(f"Processing time: {result.processing_time_seconds}")
print(f"Documents processed: {result.documents_processed}")
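The two APIs above can also be combined: run a document search first, then feed the matching sources to an agent. This is only a sketch under the assumption that the url field of a search result is accepted by agents.call as a document URL; verify the URL format returned in your workspace before relying on it.
# Hypothetical composition: search first, then analyze the matching documents
hits = client.documents.search(query="adverse events", top_k=5)
urls = [hit.url for hit in hits]  # assumes hit.url is usable as a document URL

result = client.agents.call(
    agent_id="agent-search-001",
    document_urls=urls,
    instructions="Summarize adverse events across these sources"
)
print(result.output)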
Test Custom Agent
# Test agent before using in production
test_result = client.agents.test(
    agent_id="custom-agent-123",
    test_documents=["test-doc-1"],
    instructions="Test analysis"
)

if test_result.success:
    print("Agent test passed")
    print(f"Test output: {test_result.output}")
else:
    print(f"Agent test failed: {test_result.error}")
Update and Delete Agents
# Update custom agent
updated_agent = client.agents.update(
    agent_id="custom-agent-123",
    name="Enhanced Safety Analyzer",
    instructions="Updated analysis instructions"
)

# Delete custom agent
client.agents.delete("custom-agent-123")
Connectors API
Create Connectors
from artos import ConnectorBuilder

# Create a connector using the builder pattern
connector = ConnectorBuilder.create("table_processor_connector") \
    .set_description("Handles tabular data processing and insertion") \
    .add_agent_types(["data_validator", "table_processor"]) \
    .add_data_types(["csv", "excel", "json"]) \
    .set_usage_rules({
        "when": "data contains tabular information",
        "priority": "high",
        "constraints": {
            "maxFileSize": "10MB",
            "supportedFormats": ["csv", "xlsx", "json"]
        }
    }) \
    .set_configuration({
        "validationRules": {
            "requiredColumns": ["id", "name", "value"],
            "dataTypes": {
                "id": "integer",
                "name": "string",
                "value": "float"
            }
        },
        "processingOptions": {
            "batchSize": 1000,
            "enableTransformation": True
        }
    }) \
    .build()

# Deploy connector
deployed_connector = client.connectors.deploy(connector)
print(f"Connector deployed with ID: {deployed_connector.id}")
Manage Connectors
# List all connectors
connectors = client.connectors.list()
for conn in connectors:
    print(f"Connector: {conn.name}")
    print(f"Status: {conn.status}")
    print(f"Agent Types: {conn.agent_types}")

# Get specific connector
connector = client.connectors.get("conn_abc123")

# Update connector
updated_connector = client.connectors.update(
    connector_id="conn_abc123",
    description="Updated description",
    configuration={
        "validationRules": {
            "requiredColumns": ["id", "name", "value", "category"]
        }
    }
)

# Delete connector
client.connectors.delete("conn_abc123")

# Get connector schema
schema = client.connectors.get_schema("conn_abc123")
Connector Integration Examples
# AWS Bedrock Integration
import boto3
from artos import ConnectorBuilder

bedrock_client = boto3.client('bedrock-runtime', region_name='us-east-1')

connector = ConnectorBuilder.create("bedrock_processor") \
    .add_agent_types(["bedrock_analyzer", "bedrock_summarizer"]) \
    .add_data_types(["text", "json"]) \
    .set_usage_rules({
        "when": "using AWS Bedrock for analysis",
        "priority": "high",
        "constraints": {
            "maxInputLength": "10000",
            "model": "anthropic.claude-3-sonnet-20240229-v1:0"
        }
    }) \
    .set_configuration({
        "bedrockConfig": {
            "client": bedrock_client,
            "modelId": "anthropic.claude-3-sonnet-20240229-v1:0",
            "maxTokens": 4000
        }
    }) \
    .build()
client.connectors.deploy(connector)

# Azure Foundry Integration
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
# subscription_id, resource_group, and workspace_name identify your Azure ML workspace
ml_client = MLClient(credential, subscription_id, resource_group, workspace_name)

connector = ConnectorBuilder.create("azure_processor") \
    .add_agent_types(["azure_analyzer", "azure_transformer"]) \
    .add_data_types(["text", "tabular"]) \
    .set_usage_rules({
        "when": "using Azure Foundry for processing",
        "priority": "high",
        "constraints": {
            "maxFileSize": "20MB",
            "computeType": "Standard_NC6"
        }
    }) \
    .set_configuration({
        "azureConfig": {
            "mlClient": ml_client,
            "computeTarget": "Standard_NC6",
            "modelName": "custom-transformer"
        }
    }) \
    .build()
client.connectors.deploy(connector)
Post-Processing API
Configure Post-Processing Pipeline
from artos import ProcessingStep

# Create processing step
step = ProcessingStep(
    name="Custom Data Validator",
    description="Validates data consistency and completeness",
    agent_type="data_validator",
    order=4,
    configuration={
        "validationRules": {
            "requiredFields": ["id", "name", "value"],
            "dataTypes": {
                "id": "integer",
                "name": "string",
                "value": "float"
            },
            "constraints": {
                "minValue": 0,
                "maxLength": 255
            }
        }
    },
    conditions={
        "contentType": ["tabular", "structured"],
        "minRows": 1
    }
)

# Add step to pipeline
pipeline = client.post_processing.get_pipeline()
pipeline.add_step(step)
updated_pipeline = client.post_processing.update_pipeline(pipeline)

# Test processing step
test_result = client.post_processing.test_step(
    step_id="custom_data_validator_abc123",
    input_data={
        "data": [
            {"id": 1, "name": "Test Item", "value": 100.5}
        ],
        "format": "json"
    },
    expected_output={
        "validationPassed": True,
        "processedRows": 1
    }
)

# Execute post-processing
execution_result = client.post_processing.execute({
    "content": "Your content to process here...",
    "contentType": "document",
    "pipelineId": "pipeline_xyz789",
    "options": {
        "skipSteps": ["quality_checker"],
        "customConfig": {
            "targetLength": "detailed",
            "styleGuide": "academic"
        }
    }
})
print(f"Processing completed: {execution_result.status}")
print(f"Quality score: {execution_result.qualityScore}")
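When executing a pipeline programmatically you will usually branch on the returned status rather than just printing it. A small sketch; the literal status strings are assumptions, so compare against whatever values your deployment actually returns.
# Branch on the execution outcome (status values are illustrative)
if execution_result.status == "completed":
    print(f"Pipeline finished with quality score {execution_result.qualityScore}")
else:
    print(f"Pipeline did not complete cleanly: {execution_result.status}")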
Manage Processing Steps
# Get current pipeline
pipeline = client.post_processing.get_pipeline()

# Add processing step
step = client.post_processing.add_step({
    "name": "Custom Data Validator",
    "description": "Validates data consistency and completeness",
    "agentType": "data_validator",
    "order": 4,
    "configuration": {
        "validationRules": {
            "requiredFields": ["id", "name", "value"],
            "dataTypes": {
                "id": "integer",
                "name": "string",
                "value": "float"
            }
        }
    },
    "conditions": {
        "contentType": ["tabular", "structured"],
        "minRows": 1
    }
})

# Update processing step
updated_step = client.post_processing.update_step(
    step_id="custom_data_validator_abc123",
    configuration={
        "validationRules": {
            "requiredFields": ["id", "name", "value", "category"]
        }
    }
)

# Remove processing step
client.post_processing.remove_step("custom_data_validator_abc123")
Frontend Customization
ArtosApp Component Integration
# Generate configuration for the ArtosApp React component
from artos import FrontendCustomizer

customizer = FrontendCustomizer()

# Create ArtosApp configuration
app_config = customizer.create_app_config({
    "apiKey": "your_api_key",
    "theme": "light",
    "userId": "user_123",
    "userEmail": "user@example.com",
    "userRole": "admin",
    "onWorkflowCreate": "handleWorkflowCreate",
    "onWorkflowExecute": "handleWorkflowExecute",
    "onError": "handleError",
    "customConfig": {
        "branding": {
            "companyName": "Your Company",
            "primaryColor": "#FF6B35",
            "secondaryColor": "#004E89"
        },
        "features": {
            "workflowCreation": True,
            "workflowExecution": True,
            "debugging": True,
            "analytics": True
        }
    }
})

# Generate React component code
component_code = customizer.generate_component_code({
    "componentType": "ArtosApp",
    "config": app_config,
    "authWrapper": True,
    "customStyling": True
})
print("Generated ArtosApp component:")
print(component_code)
Custom Styling and Theming
# Create custom theme for ArtosApp
theme_config = customizer.create_theme({
    "primary": "#FF6B35",
    "primaryLight": "#FF8C61",
    "primaryDark": "#E55A2B",
    "secondary": "#004E89",
    "secondaryLight": "#1A6BA8",
    "secondaryDark": "#003A6B",
    "bgPrimary": "#FFFFFF",
    "bgSecondary": "#F8F9FA",
    "textPrimary": "#212529",
    "textSecondary": "#6C757D",
    "borderPrimary": "#DEE2E6",
    "fontFamily": "'Inter', -apple-system, BlinkMacSystemFont, sans-serif",
    "spacing": {
        "xs": "0.25rem",
        "sm": "0.5rem",
        "md": "1rem",
        "lg": "1.5rem",
        "xl": "2rem"
    }
})

# Generate CSS variables
css_variables = customizer.generate_css_variables(theme_config)

# Generate component-specific styles
component_styles = customizer.generate_component_styles({
    "artosApp": {
        "header": {
            "background": "linear-gradient(135deg, var(--artos-primary) 0%, var(--artos-secondary) 100%)",
            "color": "white",
            "padding": "var(--artos-spacing-lg)",
            "borderRadius": "var(--artos-border-radius-lg)"
        },
        "button": {
            "background": "linear-gradient(135deg, var(--artos-primary) 0%, var(--artos-primary-dark) 100%)",
            "border": "none",
            "color": "white",
            "padding": "var(--artos-spacing-sm) var(--artos-spacing-md)",
            "borderRadius": "var(--artos-border-radius-md)",
            "transition": "all var(--artos-transition-fast)"
        }
    }
})

print("Generated styling configuration:")
print(f"CSS Variables: {css_variables}")
print(f"Component Styles: {component_styles}")
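If generate_css_variables returns a CSS string (an assumption; check the return type in your SDK version), the output can be written straight to a stylesheet that the frontend build picks up.
# Persist the generated variables so the frontend can import them (path is illustrative)
with open("artos-theme.css", "w") as fh:
    fh.write(css_variables)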
Platform Integration
Workflow Integration Patterns
from artos import PlatformIntegrator

integrator = PlatformIntegrator("your_api_key")

# Direct API Integration
class ArtosIntegration:
    def __init__(self, api_key):
        self.client = integrator.create_client(api_key)

    def create_workflow(self, workflow_data):
        """Create a new workflow"""
        return self.client.create_workflow(workflow_data)

    def execute_workflow(self, workflow_id, input_data):
        """Execute a workflow with input data"""
        return self.client.execute_workflow(workflow_id, input_data)

    def get_execution_status(self, execution_id):
        """Get execution status and results"""
        return self.client.get_execution_status(execution_id)

# SDK Integration
class CustomWorkflowManager:
    def __init__(self, api_key):
        self.client = integrator.create_sdk_client(api_key)

    def create_data_processing_workflow(self, config):
        """Create a data processing workflow"""
        workflow = self.client.create_workflow({
            "name": "Data Processing Pipeline",
            "connectors": ["table_processor_connector"],
            "postProcessing": ["data_validator", "content_optimizer"],
            "configuration": config
        })
        return workflow

    def process_data_batch(self, workflow_id, data_batch):
        """Process a batch of data"""
        results = []
        for data in data_batch:
            execution = self.client.execute_workflow(workflow_id, data)
            results.append(execution.result)
        return results

# Event-Driven Integration
class EventDrivenIntegration:
    def __init__(self, api_key, webhook_url):
        self.client = integrator.create_event_client(api_key, webhook_url)
        self.webhook_url = webhook_url

    def setup_webhook(self):
        """Register webhook with Artos"""
        webhook_config = {
            "url": self.webhook_url,
            "events": ["workflow.completed", "workflow.failed"],
            "secret": "your_webhook_secret"
        }
        return self.client.register_webhook(webhook_config)

    def handle_workflow_completion(self, execution_data):
        """Handle workflow completion"""
        workflow_id = execution_data["workflow_id"]
        result = execution_data["result"]
        # Process the results
        return self.process_results(workflow_id, result)
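On the receiving side you need an HTTP endpoint for the events registered above. A minimal Flask sketch under stated assumptions: the X-Artos-Signature header name and HMAC-SHA256 scheme are hypothetical placeholders, not a documented Artos contract, so adapt them to whatever the webhook registration actually guarantees.
import hashlib
import hmac
import json

from flask import Flask, request, abort

app = Flask(__name__)
WEBHOOK_SECRET = b"your_webhook_secret"

@app.route("/artos/webhook", methods=["POST"])
def artos_webhook():
    # Hypothetical signature check; header name and scheme are assumptions
    signature = request.headers.get("X-Artos-Signature", "")
    expected = hmac.new(WEBHOOK_SECRET, request.data, hashlib.sha256).hexdigest()
    if not hmac.compare_digest(signature, expected):
        abort(401)

    event = json.loads(request.data)
    if event.get("event") == "workflow.completed":
        print(f"Workflow {event.get('workflow_id')} finished")
    return "", 204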
Database Integration
# Database integration example
import json

class DatabaseIntegration:
    def __init__(self, db_config, artos_api_key):
        self.db_connection = integrator.create_db_connection(db_config)
        self.artos_client = integrator.create_artos_client(artos_api_key)

    def process_database_records(self, table_name, workflow_id):
        """Process records from database using Artos workflow"""
        cursor = self.db_connection.cursor()

        # Fetch records to process
        cursor.execute(f"SELECT * FROM {table_name} WHERE processed = false")
        records = cursor.fetchall()

        for record in records:
            # Execute workflow for each record
            execution = self.artos_client.execute_workflow(workflow_id, {
                "record": record
            })

            # Update database with results
            cursor.execute(
                f"UPDATE {table_name} SET processed = true, result = %s WHERE id = %s",
                (json.dumps(execution.result), record[0])
            )

        self.db_connection.commit()
        cursor.close()
Message Queue Integration
# Message queue integration example
import json

class MessageQueueIntegration:
    def __init__(self, rabbitmq_url, artos_api_key):
        self.connection = integrator.create_mq_connection(rabbitmq_url)
        self.channel = self.connection.channel()
        self.artos_client = integrator.create_artos_client(artos_api_key)

        # Setup queues
        self.channel.queue_declare(queue='artos_processing')
        self.channel.queue_declare(queue='artos_results')

    def process_message(self, ch, method, properties, body):
        """Process message from queue using Artos workflow"""
        try:
            message_data = json.loads(body)
            workflow_id = message_data["workflow_id"]
            input_data = message_data["input_data"]

            # Execute workflow
            execution = self.artos_client.execute_workflow(workflow_id, input_data)

            # Send results to results queue
            result_message = {
                "original_message": message_data,
                "execution_id": execution.id,
                "result": execution.result,
                "status": execution.status
            }
            self.channel.basic_publish(
                exchange='',
                routing_key='artos_results',
                body=json.dumps(result_message)
            )

            ch.basic_ack(delivery_tag=method.delivery_tag)
        except Exception as e:
            print(f"Error processing message: {e}")
            ch.basic_nack(delivery_tag=method.delivery_tag)
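To actually drain the artos_processing queue, register process_message as the consumer callback and start consuming. A short sketch assuming the channel object behaves like a pika BlockingConnection channel, which is what the calls above suggest; the broker URL is a placeholder.
# Start consuming messages (assumes a pika-style channel API)
mq = MessageQueueIntegration("amqp://guest:guest@localhost:5672/", "your_api_key")
mq.channel.basic_consume(
    queue='artos_processing',
    on_message_callback=mq.process_message
)
mq.channel.start_consuming()  # blocks until interrupted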
Debugging and Analytics
Component Testing
from artos import ComponentTester, TestCase

class ConnectorTestSuite:
    def __init__(self, api_key):
        self.tester = ComponentTester(api_key)

    def test_table_processor_connector(self):
        """Test table processor connector in isolation"""
        test_case = TestCase(
            name="Table Processor Connector Test",
            component_type="connector",
            component_id="table_processor_connector",
            input_data={
                "data": [
                    {"id": 1, "name": "Item 1", "value": 100},
                    {"id": 2, "name": "Item 2", "value": 200}
                ],
                "format": "json"
            },
            expected_output={
                "processed_rows": 2,
                "validation_passed": True,
                "output_format": "structured"
            }
        )
        result = self.tester.run_test(test_case)
        return result

    def test_content_optimizer(self):
        """Test content optimizer component"""
        test_case = TestCase(
            name="Content Optimizer Test",
            component_type="post_processor",
            component_id="content_optimizer",
            input_data={
                "content": "This is a very long and verbose piece of content that needs optimization.",
                "target_length": "concise",
                "style_guide": "professional"
            },
            expected_output={
                "length_reduction": ">20%",
                "readability_improvement": ">10%",
                "style_compliance": True
            }
        )
        result = self.tester.run_test(test_case)
        return result

# Usage
test_suite = ConnectorTestSuite("your_api_key")
table_result = test_suite.test_table_processor_connector()
content_result = test_suite.test_content_optimizer()
print(f"Table processor test: {table_result.status}")
print(f"Content optimizer test: {content_result.status}")
Interactive Debugging
from artos import Debugger

class InteractiveDebugger:
    def __init__(self, api_key):
        self.debugger = Debugger(api_key)

    def debug_workflow_step(self, workflow_id, step_id, input_data):
        """Debug a specific workflow step interactively"""
        debug_session = self.debugger.start_session(
            workflow_id=workflow_id,
            step_id=step_id,
            input_data=input_data
        )

        # Set breakpoints
        debug_session.set_breakpoint("data_validation")
        debug_session.set_breakpoint("content_processing")

        # Start debugging
        result = debug_session.run()

        # Inspect intermediate results
        for breakpoint in debug_session.breakpoints:
            print(f"Breakpoint {breakpoint.name}:")
            print(f"  Input: {breakpoint.input_data}")
            print(f"  Output: {breakpoint.output_data}")
            print(f"  Variables: {breakpoint.variables}")

        return result

    def step_through_execution(self, execution_id):
        """Step through execution step by step"""
        debug_session = self.debugger.attach_to_execution(execution_id)

        while debug_session.has_next_step():
            step_result = debug_session.next_step()
            print(f"Step: {step_result.step_name}")
            print(f"  Duration: {step_result.duration}")
            print(f"  Status: {step_result.status}")
            print(f"  Output: {step_result.output}")

            if step_result.has_errors():
                print(f"  Errors: {step_result.errors}")

        return debug_session.get_final_result()

# Usage
debugger = InteractiveDebugger("your_api_key")

# Debug specific step
result = debugger.debug_workflow_step(
    workflow_id="workflow_123",
    step_id="data_validator",
    input_data={"test_data": "sample"}
)

# Step through execution
final_result = debugger.step_through_execution("execution_456")
Analytics and Performance Monitoring
from artos import AnalyticsClient

class AnalyticsManager:
    def __init__(self, api_key):
        self.client = AnalyticsClient(api_key)

    def get_performance_metrics(self, component_id, start_date, end_date, granularity="hour"):
        """Get performance metrics for a component"""
        return self.client.get_performance_metrics(
            component_id=component_id,
            start_date=start_date,
            end_date=end_date,
            granularity=granularity
        )

    def get_success_rates(self, workflow_id, time_window="7d", group_by="step"):
        """Get success rates for a workflow"""
        return self.client.get_success_rates(
            workflow_id=workflow_id,
            time_window=time_window,
            group_by=group_by
        )

    def get_usage_statistics(self, start_date, end_date, group_by="day"):
        """Get usage statistics"""
        return self.client.get_usage_statistics(
            start_date=start_date,
            end_date=end_date,
            group_by=group_by
        )

    def define_custom_metric(self, metric_config):
        """Define a custom metric"""
        return self.client.define_custom_metric(metric_config)

    def get_custom_metric_data(self, metric_id, start_date, end_date):
        """Get data for a custom metric"""
        return self.client.get_custom_metric_data(
            metric_id=metric_id,
            start_date=start_date,
            end_date=end_date
        )

# Usage
analytics = AnalyticsManager("your_api_key")

# Get performance metrics
performance = analytics.get_performance_metrics(
    component_id="table_processor_connector",
    start_date="2024-01-01",
    end_date="2024-01-15"
)

# Get success rates
success_rates = analytics.get_success_rates(
    workflow_id="workflow_123",
    time_window="7d"
)

# Define custom metric
custom_metric = analytics.define_custom_metric({
    "name": "data_quality_score",
    "type": "score",
    "calculation": {
        "method": "weighted_average",
        "weights": {
            "completeness": 0.3,
            "accuracy": 0.4,
            "consistency": 0.3
        }
    }
})
Performance Optimization
from artos import OptimizationClient

class PerformanceOptimizer:
    def __init__(self, api_key):
        self.client = OptimizationClient(api_key)

    def analyze_performance(self, workflow_id, time_range="7d"):
        """Analyze workflow performance and identify bottlenecks"""
        analysis = self.client.analyze_performance(
            workflow_id=workflow_id,
            time_range=time_range
        )

        print("Performance Analysis:")
        print(f"  Overall Success Rate: {analysis.overall_success_rate}")
        print(f"  Average Execution Time: {analysis.avg_execution_time}")
        print(f"  Bottlenecks: {analysis.bottlenecks}")
        print(f"  Recommendations: {analysis.recommendations}")

        return analysis

    def optimize_workflow(self, workflow_id, optimization_target="performance"):
        """Optimize workflow based on analysis"""
        optimization = self.client.optimize_workflow(
            workflow_id=workflow_id,
            target=optimization_target
        )

        print("Optimization Results:")
        print(f"  Original Performance: {optimization.original_metrics}")
        print(f"  Optimized Performance: {optimization.optimized_metrics}")
        print(f"  Improvements: {optimization.improvements}")

        return optimization

    def a_b_test_workflows(self, workflow_a_id, workflow_b_id, test_duration="1d"):
        """A/B test two workflow versions"""
        test_result = self.client.a_b_test_workflows(
            workflow_a_id=workflow_a_id,
            workflow_b_id=workflow_b_id,
            duration=test_duration
        )

        print("A/B Test Results:")
        print(f"  Workflow A Performance: {test_result.workflow_a_metrics}")
        print(f"  Workflow B Performance: {test_result.workflow_b_metrics}")
        print(f"  Winner: {test_result.winner}")
        print(f"  Confidence: {test_result.confidence}")

        return test_result

# Usage
optimizer = PerformanceOptimizer("your_api_key")

# Analyze performance
analysis = optimizer.analyze_performance("workflow_123")

# Optimize workflow
optimization = optimizer.optimize_workflow("workflow_123", "performance")

# A/B test workflows
test_result = optimizer.a_b_test_workflows("workflow_123", "workflow_456")
Error Handling and Best Practices
Comprehensive Error Handling
import time

from artos import ArtosClient, ErrorHandler
# ConnectorNotFoundError, ConnectorConfigurationError, and ConnectorExecutionError
# are the SDK's connector exception types; import them from the exceptions module
# of your installed SDK version.

class RobustArtosClient:
    def __init__(self, api_key):
        self.client = ArtosClient(api_key)
        self.error_handler = ErrorHandler()

    def execute_workflow_with_retry(self, workflow_id, input_data, max_retries=3):
        """Execute workflow with automatic retry logic"""
        for attempt in range(max_retries):
            try:
                result = self.client.execute_workflow(workflow_id, input_data)
                return result
            except Exception as e:
                if attempt == max_retries - 1:
                    raise e
                # Wait before retry with exponential backoff
                wait_time = 2 ** attempt
                print(f"Attempt {attempt + 1} failed, retrying in {wait_time} seconds...")
                time.sleep(wait_time)

    def handle_connector_errors(self, connector_id, operation):
        """Handle connector-specific errors"""
        try:
            return operation()
        except ConnectorNotFoundError:
            print(f"Connector {connector_id} not found")
            return None
        except ConnectorConfigurationError as e:
            print(f"Configuration error in connector {connector_id}: {e}")
            # Attempt to fix configuration
            return self.fix_connector_configuration(connector_id, e)
        except ConnectorExecutionError as e:
            print(f"Execution error in connector {connector_id}: {e}")
            # Log error and return fallback result
            return self.get_fallback_result(connector_id)

    def validate_workflow_configuration(self, workflow_config):
        """Validate workflow configuration before deployment"""
        validation_result = self.error_handler.validate_config(workflow_config)

        if not validation_result.is_valid:
            print("Configuration validation failed:")
            for error in validation_result.errors:
                print(f"  - {error}")
            return False
        return True
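A short usage sketch for the wrapper above; the workflow ID and payload are placeholders.
# Retry a flaky workflow execution with exponential backoff
robust = RobustArtosClient("your_api_key")
result = robust.execute_workflow_with_retry(
    workflow_id="workflow_123",
    input_data={"record": {"id": 1}},
    max_retries=3
)
print(result)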
Best Practices Implementation
from datetime import datetime

class ArtosBestPractices:
    def __init__(self, api_key):
        self.client = ArtosClient(api_key)

    def create_production_workflow(self, workflow_config):
        """Create workflow following production best practices"""
        # Validate configuration (see validate_workflow_configuration in RobustArtosClient above)
        if not self.validate_workflow_configuration(workflow_config):
            raise ValueError("Invalid workflow configuration")

        # Create workflow with proper naming
        workflow_name = f"{workflow_config['name']}-{datetime.now().strftime('%Y%m%d')}"
        workflow_config['name'] = workflow_name

        # Add monitoring and logging
        workflow_config['monitoring'] = {
            'enable_logging': True,
            'enable_metrics': True,
            'alert_thresholds': {
                'success_rate': 0.95,
                'execution_time': 30
            }
        }

        # Create workflow
        workflow = self.client.create_workflow(workflow_config)

        # Set up monitoring
        self.setup_monitoring(workflow.id)

        return workflow

    def setup_monitoring(self, workflow_id):
        """Set up comprehensive monitoring for workflow"""
        # Set up performance alerts
        self.client.alerts.create({
            'workflow_id': workflow_id,
            'type': 'performance',
            'threshold': 0.95,
            'notification_channels': ['email', 'slack']
        })

        # Set up error alerts
        self.client.alerts.create({
            'workflow_id': workflow_id,
            'type': 'error',
            'threshold': 0.05,
            'notification_channels': ['email', 'slack']
        })

    def implement_graceful_degradation(self, workflow_id, fallback_config):
        """Implement graceful degradation for workflow"""
        # Create fallback workflow
        fallback_workflow = self.client.create_workflow(fallback_config)

        # Set up automatic fallback
        self.client.workflows.set_fallback(
            workflow_id=workflow_id,
            fallback_workflow_id=fallback_workflow.id,
            trigger_conditions=['execution_failure', 'timeout']
        )

        return fallback_workflow
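Tying it together, a hedged usage sketch: the workflow configuration fields shown are placeholders, and validate_workflow_configuration must exist on the class (for example, delegating to the validator shown in RobustArtosClient above).
# Hypothetical production rollout using the helper above
best_practices = ArtosBestPractices("your_api_key")
workflow = best_practices.create_production_workflow({
    "name": "safety-report-pipeline",            # placeholder name
    "connectors": ["table_processor_connector"],
    "postProcessing": ["data_validator"]
})

# Register a simpler fallback in case the main workflow fails or times out
fallback = best_practices.implement_graceful_degradation(
    workflow_id=workflow.id,
    fallback_config={"name": "safety-report-fallback"}
)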