Platform Integration

Overview

Artos’s platform integration approach provides the “best of both worlds”: you gain continuous platform improvements and updates while maintaining full control over your custom systems and workflows. Custom implementations pick up core platform enhancements without requiring manual updates.

What is the “Best of Both Worlds” Approach?

The “best of both worlds” approach means that custom systems built on Artos automatically benefit from core platform updates while maintaining their unique customizations. This is achieved through a modular architecture that separates core functionality from custom implementations.

Key Benefits

  • Automatic Updates: Core platform improvements are automatically available to custom systems
  • Customization Preservation: Your unique customizations remain intact during platform updates
  • Reduced Maintenance: Less manual work required to keep systems up-to-date
  • Enhanced Security: Security patches and improvements are automatically applied
  • Performance Improvements: Core optimizations benefit all custom implementations

Core Architecture

Modular Design

Artos’s modular architecture consists of several layers:
┌─────────────────────────────────────┐
│           Custom Layer              │
│  (Your unique implementations)      │
├─────────────────────────────────────┤
│         Integration Layer           │
│  (Artos SDK & API interfaces)       │
├─────────────────────────────────────┤
│           Core Platform             │
│  (Artos's core functionality)       │
├─────────────────────────────────────┤
│         Infrastructure Layer        │
│  (AI providers, databases, etc.)    │
└─────────────────────────────────────┘

Update Flow

Platform updates flow through the architecture:
  1. Core Platform Updates: Improvements to the core Artos platform
  2. Automatic Propagation: Updates become automatically available to the integration layer
  3. Custom System Benefits: Your custom systems benefit without code changes
  4. Backward Compatibility: Existing customizations continue to work

Workflow Integration

Artos workflows are the foundation for integrating AI-powered processing into your existing systems. A workflow combines custom agents (scripts with preset input/output fields) with connectors and post-processing steps to create powerful, automated data processing pipelines.

Understanding Workflows and Custom Agents

What are Custom Agents?

Custom agents in Artos are essentially scripts that have preset input and output field definitions. They act as the building blocks of workflows, each designed to perform a specific task.

Agent Structure:
  • Input Fields: Defined parameters that the agent expects to receive
  • Processing Logic: The core functionality that transforms input into output
  • Output Fields: Structured data that the agent produces
  • Validation Rules: Constraints on input/output data formats
Agent Types:
  • Data Processors: Transform and validate data structures
  • Content Analyzers: Extract insights from text or documents
  • Format Converters: Convert between different data formats
  • Quality Checkers: Validate data quality and consistency
  • Custom Logic: Domain-specific processing tailored to your needs

Creating Custom Agents

Custom agents can be created through the Artos platform interface or programmatically via the API:
# Create a custom agent with preset fields
agent_definition = {
    "name": "Sales Data Processor",
    "description": "Processes sales data and generates insights",
    "input_fields": {
        "sales_data": {
            "type": "array",
            "description": "Array of sales records",
            "schema": {
                "customer_id": "string",
                "amount": "number",
                "date": "date",
                "product": "string"
            },
            "required": True
        },
        "analysis_type": {
            "type": "string",
            "description": "Type of analysis to perform",
            "enum": ["summary", "trends", "forecasting"],
            "default": "summary"
        },
        "date_range": {
            "type": "object",
            "description": "Date range for analysis",
            "properties": {
                "start_date": "date",
                "end_date": "date"
            },
            "required": False
        }
    },
    "output_fields": {
        "insights": {
            "type": "object",
            "description": "Generated insights from sales data",
            "properties": {
                "total_revenue": "number",
                "top_customers": "array",
                "growth_rate": "number",
                "recommendations": "array"
            }
        },
        "processed_records": {
            "type": "number",
            "description": "Number of records processed"
        },
        "confidence_score": {
            "type": "number",
            "description": "Confidence in analysis results (0-1)"
        }
    },
    "processing_script": """
    def process_sales_data(sales_data, analysis_type, date_range=None):
        # Filter data by date range if provided
        if date_range:
            sales_data = filter_by_date_range(sales_data, date_range)
        
        # Perform analysis based on type
        if analysis_type == "summary":
            return generate_summary(sales_data)
        elif analysis_type == "trends":
            return analyze_trends(sales_data)
        elif analysis_type == "forecasting":
            return generate_forecast(sales_data)
    """,
    "validation_rules": {
        "min_records": 10,
        "max_processing_time": 30,
        "required_fields": ["customer_id", "amount", "date"]
    }
}

# Create the agent ("artos" here is an initialized Artos client; see the integration patterns below)
custom_agent = artos.create_custom_agent(agent_definition)

Where Custom Agents Are Created

Custom agents can be created in several ways:
  1. Artos Dashboard: Visual interface for creating agents with drag-and-drop field definition
  2. API Integration: Programmatic creation via REST API calls
  3. SDK Methods: Using the Artos Python SDK for integrated development
  4. Workflow Builder: Created inline while building workflows
  5. Template Library: Start from pre-built templates and customize

Integration Patterns

Pattern 1: Direct API Integration

Integrate Artos directly into your existing workflows using the REST API:
import requests
import json

class ArtosWorkflowIntegration:
    def __init__(self, api_key, base_url="https://api.artosai.com"):
        self.api_key = api_key
        self.base_url = base_url
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
    
    def create_workflow_with_custom_agents(self, workflow_config):
        """Create a workflow using custom agents"""
        # First, create custom agents if they don't exist
        agent_ids = []
        for agent_def in workflow_config.get("custom_agents", []):
            agent = self.create_custom_agent(agent_def)
            agent_ids.append(agent["id"])
        
        # Create workflow with custom agents
        workflow_data = {
            "name": workflow_config["name"],
            "description": workflow_config["description"],
            "agents": agent_ids + workflow_config.get("builtin_agents", []),
            "connectors": workflow_config.get("connectors", []),
            "postProcessing": workflow_config.get("post_processing", []),
            "configuration": workflow_config.get("configuration", {})
        }
        
        response = requests.post(
            f"{self.base_url}/workflows",
            headers=self.headers,
            json=workflow_data
        )
        return response.json()
    
    def create_custom_agent(self, agent_definition):
        """Create a custom agent with preset fields"""
        response = requests.post(
            f"{self.base_url}/agents/custom",
            headers=self.headers,
            json=agent_definition
        )
        return response.json()
    
    def execute_workflow(self, workflow_id, input_data):
        """Execute a workflow with structured input data"""
        response = requests.post(
            f"{self.base_url}/workflows/{workflow_id}/execute",
            headers=self.headers,
            json={"input": input_data}
        )
        return response.json()
    
    def get_execution_status(self, execution_id):
        """Get execution status and results"""
        response = requests.get(
            f"{self.base_url}/executions/{execution_id}",
            headers=self.headers
        )
        return response.json()

# Example: Create a sales analysis workflow
artos = ArtosWorkflowIntegration("your_api_key")

# Define custom agents for the workflow
workflow_config = {
    "name": "Sales Data Analysis Pipeline",
    "description": "Processes sales data and generates insights",
    "custom_agents": [
        {
            "name": "sales_data_validator",
            "description": "Validates sales data structure",
            "input_fields": {
                "raw_data": {"type": "array", "required": True},
                "validation_rules": {"type": "object", "required": False}
            },
            "output_fields": {
                "validated_data": {"type": "array"},
                "validation_errors": {"type": "array"},
                "records_processed": {"type": "number"}
            }
        },
        {
            "name": "sales_insights_generator",
            "description": "Generates business insights from sales data",
            "input_fields": {
                "sales_data": {"type": "array", "required": True},
                "analysis_period": {"type": "string", "required": True},
                "metrics_to_include": {"type": "array", "required": False}
            },
            "output_fields": {
                "insights": {"type": "object"},
                "recommendations": {"type": "array"},
                "confidence_score": {"type": "number"}
            }
        }
    ],
    "connectors": ["sales_data_connector"],
    "post_processing": ["content_optimizer", "format_standardizer"]
}

# Create the workflow
workflow = artos.create_workflow_with_custom_agents(workflow_config)

# Execute the workflow with your sales data
result = artos.execute_workflow(workflow["id"], {
    "raw_data": [
        {"customer_id": "C001", "amount": 1500, "date": "2024-01-15", "product": "Software License"},
        {"customer_id": "C002", "amount": 800, "date": "2024-01-16", "product": "Consulting"}
    ],
    "analysis_period": "monthly",
    "validation_rules": {
        "min_amount": 0,
        "required_fields": ["customer_id", "amount", "date"]
    }
})

Pattern 2: SDK Integration with Custom Agents

Use the Artos SDK for more advanced integration with custom agent creation:
from artos import ArtosClient, WorkflowBuilder, CustomAgentBuilder

class CustomWorkflowManager:
    def __init__(self, api_key):
        self.client = ArtosClient(api_key)
    
    def create_data_processing_workflow_with_custom_agents(self, config):
        """Create a data processing workflow with custom agents"""
        
        # Create custom agents first
        data_enricher = CustomAgentBuilder.create("data_enricher") \
            .setDescription("Enriches data with external sources") \
            .addInputField("raw_data", "array", required=True) \
            .addInputField("enrichment_sources", "array", required=False) \
            .addOutputField("enriched_data", "array") \
            .addOutputField("enrichment_count", "number") \
            .setProcessingScript("""
                def enrich_data(raw_data, enrichment_sources=None):
                    enriched_records = []
                    for record in raw_data:
                        # Add enrichment logic here
                        enriched_record = add_external_data(record, enrichment_sources)
                        enriched_records.append(enriched_record)
                    return {
                        "enriched_data": enriched_records,
                        "enrichment_count": len(enriched_records)
                    }
            """) \
            .deploy()
        
        quality_scorer = CustomAgentBuilder.create("quality_scorer") \
            .setDescription("Assigns quality scores to data records") \
            .addInputField("data_records", "array", required=True) \
            .addInputField("quality_criteria", "object", required=False) \
            .addOutputField("scored_data", "array") \
            .addOutputField("average_quality", "number") \
            .addOutputField("quality_distribution", "object") \
            .setProcessingScript("""
                def score_quality(data_records, quality_criteria=None):
                    scored_records = []
                    total_score = 0
                    quality_buckets = {"high": 0, "medium": 0, "low": 0}
                    
                    for record in data_records:
                        score = calculate_quality_score(record, quality_criteria)
                        record["quality_score"] = score
                        scored_records.append(record)
                        total_score += score
                        
                        # Categorize quality
                        if score >= 0.8:
                            quality_buckets["high"] += 1
                        elif score >= 0.5:
                            quality_buckets["medium"] += 1
                        else:
                            quality_buckets["low"] += 1
                    
                    return {
                        "scored_data": scored_records,
                        "average_quality": total_score / len(data_records),
                        "quality_distribution": quality_buckets
                    }
            """) \
            .deploy()
        
        # Create workflow using custom agents
        workflow = WorkflowBuilder.create("enhanced_data_processor") \
            .addCustomAgent(data_enricher.id) \
            .addCustomAgent(quality_scorer.id) \
            .addConnector("table_processor_connector") \
            .addPostProcessing("data_validator") \
            .addPostProcessing("content_optimizer") \
            .setConfiguration(config) \
            .setAgentFlow([
                {
                    "agent_id": data_enricher.id,
                    "order": 1,
                    "input_mapping": {
                        "raw_data": "workflow_input.data",
                        "enrichment_sources": "workflow_input.enrichment_config"
                    }
                },
                {
                    "agent_id": quality_scorer.id,
                    "order": 2,
                    "input_mapping": {
                        "data_records": "step_1.enriched_data",
                        "quality_criteria": "workflow_input.quality_config"
                    }
                }
            ]) \
            .build()
        
        return self.client.deploy_workflow(workflow)
    
    def process_data_batch(self, workflow_id, data_batch):
        """Process a batch of data through custom agents"""
        results = []
        for data in data_batch:
            execution = self.client.execute_workflow(workflow_id, {
                "data": data,
                "enrichment_config": {
                    "sources": ["external_api", "reference_database"],
                    "timeout": 10
                },
                "quality_config": {
                    "completeness_weight": 0.4,
                    "accuracy_weight": 0.4,
                    "consistency_weight": 0.2
                }
            })
            results.append(execution.result)
        return results

# Integration with existing system
workflow_manager = CustomWorkflowManager("your_api_key")

# Create workflow with custom agents
workflow = workflow_manager.create_data_processing_workflow_with_custom_agents({
    "batchSize": 1000,
    "validationRules": {
        "requiredFields": ["id", "name", "value"]
    },
    "qualityThreshold": 0.7
})

# Process data through custom agents
data_batch = [
    {"id": 1, "name": "Item 1", "value": 100, "category": "electronics"},
    {"id": 2, "name": "Item 2", "value": 200, "category": "clothing"}
]

results = workflow_manager.process_data_batch(workflow.id, data_batch)

# Results will include enriched data and quality scores from custom agents
for result in results:
    print(f"Quality Score: {result['average_quality']}")
    print(f"Enriched Records: {len(result['scored_data'])}")
    print(f"Quality Distribution: {result['quality_distribution']}")

Pattern 3: Event-Driven Integration

Integrate using webhooks and event-driven patterns:
from flask import Flask, request, jsonify
import requests

app = Flask(__name__)

class EventDrivenIntegration:
    def __init__(self, artos_api_key, webhook_url):
        self.artos_api_key = artos_api_key
        self.webhook_url = webhook_url
        self.setup_webhook()
    
    def setup_webhook(self):
        """Register webhook with Artos"""
        webhook_config = {
            "url": self.webhook_url,
            "events": ["workflow.completed", "workflow.failed"],
            "secret": "your_webhook_secret"
        }
        
        requests.post(
            "https://api.artosai.com/webhooks",
            headers={"Authorization": f"Bearer {self.artos_api_key}"},
            json=webhook_config
        )
    
    def handle_workflow_completion(self, execution_data):
        """Handle workflow completion"""
        # Process the completed workflow
        workflow_id = execution_data["workflow_id"]
        result = execution_data["result"]
        
        # Update your system with the results
        self.update_system_with_results(workflow_id, result)
    
    def update_system_with_results(self, workflow_id, result):
        """Update your system with workflow results"""
        # Implementation specific to your system
        pass

# Webhook endpoint
@app.route('/webhook/artos', methods=['POST'])
def artos_webhook():
    event_type = request.headers.get('X-Artos-Event')
    payload = request.json
    
    if event_type == 'workflow.completed':
        integration.handle_workflow_completion(payload)
    elif event_type == 'workflow.failed':
        # Handle workflow failure
        pass
    
    return jsonify({"status": "received"})

# Initialize integration
integration = EventDrivenIntegration("your_api_key", "https://your-domain.com/webhook/artos")
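
The webhook secret registered above is typically used to verify that incoming requests actually originate from Artos. A minimal sketch of such a check, assuming the platform signs the raw request body with HMAC-SHA256 and sends the digest in an X-Artos-Signature header (the header name and signing scheme are assumptions for illustration, not confirmed Artos behavior):
import hmac
import hashlib

WEBHOOK_SECRET = "your_webhook_secret"

def verify_artos_signature(raw_body: bytes, signature_header: str) -> bool:
    """Return True if the payload matches the HMAC-SHA256 signature (assumed scheme)."""
    expected = hmac.new(WEBHOOK_SECRET.encode(), raw_body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature_header)

@app.route('/webhook/artos-verified', methods=['POST'])
def artos_webhook_verified():
    # Reject requests whose signature does not match the shared secret
    signature = request.headers.get('X-Artos-Signature', '')
    if not verify_artos_signature(request.get_data(), signature):
        return jsonify({"error": "invalid signature"}), 401
    # Delegate to the handler above once the payload is trusted
    return artos_webhook()
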

Integration with Existing Systems

Database Integration

import json

import psycopg2
from artos import ArtosClient

class DatabaseIntegration:
    def __init__(self, db_config, artos_api_key):
        self.db_connection = psycopg2.connect(**db_config)
        self.artos_client = ArtosClient(artos_api_key)
    
    def process_database_records(self, table_name, workflow_id):
        """Process records from database using Artos workflow"""
        cursor = self.db_connection.cursor()
        
        # Fetch records to process
        cursor.execute(f"SELECT * FROM {table_name} WHERE processed = false")
        records = cursor.fetchall()
        
        for record in records:
            # Execute workflow for each record
            execution = self.artos_client.execute_workflow(workflow_id, {
                "record": record
            })
            
            # Update database with results
            cursor.execute(
                f"UPDATE {table_name} SET processed = true, result = %s WHERE id = %s",
                (json.dumps(execution.result), record[0])
            )
        
        self.db_connection.commit()
        cursor.close()
    
    def create_workflow_from_query(self, query, workflow_config):
        """Create workflow based on database query results"""
        cursor = self.db_connection.cursor()
        cursor.execute(query)
        sample_data = cursor.fetchone()
        
        # Create workflow based on data structure
        workflow = self.artos_client.create_workflow({
            "name": "Database Processing Workflow",
            "connectors": workflow_config["connectors"],
            "postProcessing": workflow_config["postProcessing"],
            "configuration": {
                "dataStructure": self.analyze_data_structure(sample_data),
                **workflow_config["configuration"]
            }
        })
        
        return workflow

# Usage
db_integration = DatabaseIntegration(
    db_config={
        "host": "localhost",
        "database": "myapp",
        "user": "username",
        "password": "password"
    },
    artos_api_key="your_api_key"
)

# Process records
db_integration.process_database_records("users", "workflow_123")

Message Queue Integration

import pika
import json
from datetime import datetime

from artos import ArtosClient

class MessageQueueIntegration:
    def __init__(self, rabbitmq_url, artos_api_key):
        self.connection = pika.BlockingConnection(pika.URLParameters(rabbitmq_url))
        self.channel = self.connection.channel()
        self.artos_client = ArtosClient(artos_api_key)
        
        # Setup queues
        self.channel.queue_declare(queue='artos_processing')
        self.channel.queue_declare(queue='artos_results')
    
    def process_message(self, ch, method, properties, body):
        """Process message from queue using Artos workflow"""
        try:
            message_data = json.loads(body)
            workflow_id = message_data["workflow_id"]
            input_data = message_data["input_data"]
            
            # Execute workflow
            execution = self.artos_client.execute_workflow(workflow_id, input_data)
            
            # Send results to results queue
            result_message = {
                "original_message": message_data,
                "execution_id": execution.id,
                "result": execution.result,
                "status": execution.status
            }
            
            self.channel.basic_publish(
                exchange='',
                routing_key='artos_results',
                body=json.dumps(result_message)
            )
            
            ch.basic_ack(delivery_tag=method.delivery_tag)
            
        except Exception as e:
            print(f"Error processing message: {e}")
            ch.basic_nack(delivery_tag=method.delivery_tag)
    
    def start_processing(self):
        """Start consuming messages"""
        self.channel.basic_consume(
            queue='artos_processing',
            on_message_callback=self.process_message
        )
        self.channel.start_consuming()
    
    def send_for_processing(self, workflow_id, input_data):
        """Send data for processing"""
        message = {
            "workflow_id": workflow_id,
            "input_data": input_data,
            "timestamp": datetime.utcnow().isoformat()
        }
        
        self.channel.basic_publish(
            exchange='',
            routing_key='artos_processing',
            body=json.dumps(message)
        )

# Usage
mq_integration = MessageQueueIntegration(
    rabbitmq_url="amqp://localhost:5672",
    artos_api_key="your_api_key"
)

# Send data for processing
mq_integration.send_for_processing("workflow_123", {"data": "process_this"})

# Start processing (in separate thread/process)
mq_integration.start_processing()

Authentication and Authorization

API Key Management

Secure API Key Storage

import os
from cryptography.fernet import Fernet
import base64

class SecureAPIKeyManager:
    def __init__(self, encryption_key=None):
        if encryption_key:
            self.cipher = Fernet(encryption_key)
        else:
            # Generate new key if not provided
            key = Fernet.generate_key()
            self.cipher = Fernet(key)
            print(f"Generated encryption key: {key.decode()}")
    
    def encrypt_api_key(self, api_key):
        """Encrypt API key for secure storage"""
        return self.cipher.encrypt(api_key.encode()).decode()
    
    def decrypt_api_key(self, encrypted_key):
        """Decrypt API key for use"""
        return self.cipher.decrypt(encrypted_key.encode()).decode()
    
    def store_api_key(self, key_name, api_key):
        """Store encrypted API key in environment"""
        encrypted_key = self.encrypt_api_key(api_key)
        os.environ[f"ARTOS_API_KEY_{key_name.upper()}"] = encrypted_key
    
    def get_api_key(self, key_name):
        """Retrieve and decrypt API key"""
        encrypted_key = os.environ.get(f"ARTOS_API_KEY_{key_name.upper()}")
        if encrypted_key:
            return self.decrypt_api_key(encrypted_key)
        return None

# Usage (the encryption key must be a valid Fernet key, e.g. one generated by Fernet.generate_key())
key_manager = SecureAPIKeyManager("your_encryption_key")

# Store API key securely
key_manager.store_api_key("production", "your_actual_api_key")

# Retrieve API key
api_key = key_manager.get_api_key("production")

Multi-Environment Support

import os
from typing import Dict, Optional

from artos import ArtosClient

class EnvironmentManager:
    def __init__(self):
        self.environments = {
            "development": {
                "api_key": os.getenv("ARTOS_API_KEY_DEV"),
                "base_url": "https://api-dev.artosai.com",
                "timeout": 30
            },
            "staging": {
                "api_key": os.getenv("ARTOS_API_KEY_STAGING"),
                "base_url": "https://api-staging.artosai.com",
                "timeout": 60
            },
            "production": {
                "api_key": os.getenv("ARTOS_API_KEY_PROD"),
                "base_url": "https://api.artosai.com",
                "timeout": 120
            }
        }
    
    def get_config(self, environment: str) -> Optional[Dict]:
        """Get configuration for specific environment"""
        return self.environments.get(environment)
    
    def get_current_environment(self) -> str:
        """Get current environment from environment variable"""
        return os.getenv("ARTOS_ENVIRONMENT", "development")
    
    def get_current_config(self) -> Dict:
        """Get configuration for current environment"""
        env = self.get_current_environment()
        return self.get_config(env)

# Usage
env_manager = EnvironmentManager()
config = env_manager.get_current_config()

artos_client = ArtosClient(
    api_key=config["api_key"],
    base_url=config["base_url"],
    timeout=config["timeout"]
)

Role-Based Access Control

from enum import Enum
from typing import List, Dict

class ArtosRole(Enum):
    READER = "reader"
    WRITER = "writer"
    ADMIN = "admin"
    EXECUTOR = "executor"

class RBACManager:
    def __init__(self):
        self.role_permissions = {
            ArtosRole.READER: [
                "workflows:read",
                "executions:read",
                "connectors:read"
            ],
            ArtosRole.WRITER: [
                "workflows:read",
                "workflows:write",
                "connectors:read",
                "connectors:write",
                "executions:read"
            ],
            ArtosRole.EXECUTOR: [
                "workflows:read",
                "executions:read",
                "executions:write"
            ],
            ArtosRole.ADMIN: [
                "workflows:*",
                "executions:*",
                "connectors:*",
                "users:*",
                "system:*"
            ]
        }
    
    def has_permission(self, user_role: ArtosRole, permission: str) -> bool:
        """Check if user has specific permission"""
        permissions = self.role_permissions.get(user_role, [])
        
        # Check exact permission
        if permission in permissions:
            return True
        
        # Check wildcard permissions
        for perm in permissions:
            if perm.endswith("*"):
                base_perm = perm[:-1]
                if permission.startswith(base_perm):
                    return True
        
        return False
    
    def get_user_permissions(self, user_role: ArtosRole) -> List[str]:
        """Get all permissions for a user role"""
        return self.role_permissions.get(user_role, [])

class SecureArtosClient:
    def __init__(self, api_key: str, user_role: ArtosRole):
        self.api_key = api_key
        self.user_role = user_role
        self.rbac = RBACManager()
    
    def create_workflow(self, workflow_data: Dict):
        """Create workflow with permission check"""
        if not self.rbac.has_permission(self.user_role, "workflows:write"):
            raise PermissionError("Insufficient permissions to create workflows")
        
        # Proceed with workflow creation
        return self._create_workflow_internal(workflow_data)
    
    def execute_workflow(self, workflow_id: str, input_data: Dict):
        """Execute workflow with permission check"""
        if not self.rbac.has_permission(self.user_role, "executions:write"):
            raise PermissionError("Insufficient permissions to execute workflows")
        
        # Proceed with workflow execution
        return self._execute_workflow_internal(workflow_id, input_data)

# Usage
rbac_client = SecureArtosClient("your_api_key", ArtosRole.WRITER)

try:
    workflow = rbac_client.create_workflow({"name": "My Workflow"})
    print("Workflow created successfully")
except PermissionError as e:
    print(f"Permission denied: {e}")

Deployment Patterns

Containerized Deployment

Docker Integration

# Dockerfile for Artos integration
FROM python:3.9-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Set environment variables
ENV ARTOS_ENVIRONMENT=production
ENV PYTHONPATH=/app

# Expose port
EXPOSE 8000

# Run application
CMD ["python", "app.py"]
# docker-compose.yml
version: '3.8'

services:
  artos-integration:
    build: .
    ports:
      - "8000:8000"
    environment:
      - ARTOS_API_KEY_PROD=${ARTOS_API_KEY_PROD}
      - ARTOS_ENVIRONMENT=production
      - DATABASE_URL=${DATABASE_URL}
    volumes:
      - ./logs:/app/logs
    restart: unless-stopped
  
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

volumes:
  redis_data:

Kubernetes Deployment

# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: artos-integration
spec:
  replicas: 3
  selector:
    matchLabels:
      app: artos-integration
  template:
    metadata:
      labels:
        app: artos-integration
    spec:
      containers:
      - name: artos-integration
        image: your-registry/artos-integration:latest
        ports:
        - containerPort: 8000
        env:
        - name: ARTOS_API_KEY_PROD
          valueFrom:
            secretKeyRef:
              name: artos-secrets
              key: api-key
        - name: ARTOS_ENVIRONMENT
          value: "production"
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5

---
apiVersion: v1
kind: Service
metadata:
  name: artos-integration-service
spec:
  selector:
    app: artos-integration
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8000
  type: LoadBalancer

---
apiVersion: v1
kind: Secret
metadata:
  name: artos-secrets
type: Opaque
data:
  api-key: <base64-encoded-api-key>

Serverless Deployment

AWS Lambda Integration

# lambda_function.py
import json
import os
from artos import ArtosClient

def lambda_handler(event, context):
    """AWS Lambda handler for Artos integration"""
    
    # Initialize Artos client
    artos_client = ArtosClient(
        api_key=os.environ['ARTOS_API_KEY'],
        base_url=os.environ.get('ARTOS_BASE_URL', 'https://api.artosai.com')
    )
    
    try:
        # Parse event
        body = json.loads(event['body']) if event.get('body') else event
        
        # Extract parameters
        workflow_id = body.get('workflow_id')
        input_data = body.get('input_data', {})
        
        if not workflow_id:
            return {
                'statusCode': 400,
                'body': json.dumps({'error': 'workflow_id is required'})
            }
        
        # Execute workflow
        execution = artos_client.execute_workflow(workflow_id, input_data)
        
        return {
            'statusCode': 200,
            'body': json.dumps({
                'execution_id': execution.id,
                'status': execution.status,
                'result': execution.result
            })
        }
        
    except Exception as e:
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }

# serverless.yml
service: artos-integration

provider:
  name: aws
  runtime: python3.9
  region: us-east-1
  environment:
    ARTOS_API_KEY: ${env:ARTOS_API_KEY}
    ARTOS_BASE_URL: https://api.artosai.com

functions:
  executeWorkflow:
    handler: lambda_function.lambda_handler
    events:
      - http:
          path: /execute
          method: post
          cors: true
    memorySize: 512
    timeout: 300

  createWorkflow:
    handler: lambda_function.create_workflow_handler
    events:
      - http:
          path: /workflows
          method: post
          cors: true
    memorySize: 256
    timeout: 60

plugins:
  - serverless-python-requirements

custom:
  pythonRequirements:
    dockerizePip: non-linux

Azure Functions Integration

# function_app.py
import azure.functions as func
import logging
import json
import os
from artos import ArtosClient

app = func.FunctionApp()

@app.function_name(name="ExecuteWorkflow")
@app.route(route="execute")
def execute_workflow(req: func.HttpRequest) -> func.HttpResponse:
    """Azure Function to execute Artos workflows"""
    
    logging.info('Python HTTP trigger function processed a request.')
    
    try:
        # Get request body
        req_body = req.get_json()
        workflow_id = req_body.get('workflow_id')
        input_data = req_body.get('input_data', {})
        
        if not workflow_id:
            return func.HttpResponse(
                json.dumps({'error': 'workflow_id is required'}),
                status_code=400,
                mimetype='application/json'
            )
        
        # Initialize Artos client
        artos_client = ArtosClient(
            api_key=os.environ['ARTOS_API_KEY'],
            base_url=os.environ.get('ARTOS_BASE_URL', 'https://api.artosai.com')
        )
        
        # Execute workflow
        execution = artos_client.execute_workflow(workflow_id, input_data)
        
        return func.HttpResponse(
            json.dumps({
                'execution_id': execution.id,
                'status': execution.status,
                'result': execution.result
            }),
            status_code=200,
            mimetype='application/json'
        )
        
    except Exception as e:
        logging.error(f'Error executing workflow: {str(e)}')
        return func.HttpResponse(
            json.dumps({'error': str(e)}),
            status_code=500,
            mimetype='application/json'
        )

// host.json
{
  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[3.*, 4.0.0)"
  }
}

Monitoring and Observability

Health Checks

import os
from datetime import datetime

from flask import Flask, jsonify
import requests
from artos import ArtosClient

app = Flask(__name__)

class HealthChecker:
    def __init__(self, artos_api_key):
        self.artos_api_key = artos_api_key
        self.artos_client = ArtosClient(artos_api_key)
    
    def check_artos_connectivity(self):
        """Check connectivity to Artos API"""
        try:
            # Simple API call to test connectivity
            response = requests.get(
                "https://api.artosai.com/health",
                headers={"Authorization": f"Bearer {self.artos_api_key}"},
                timeout=10
            )
            return response.status_code == 200
        except Exception:
            return False
    
    def check_workflow_status(self, workflow_id):
        """Check if specific workflow is accessible"""
        try:
            workflow = self.artos_client.get_workflow(workflow_id)
            return workflow is not None
        except Exception:
            return False
    
    def get_system_health(self):
        """Get overall system health status"""
        artos_healthy = self.check_artos_connectivity()
        
        return {
            "status": "healthy" if artos_healthy else "unhealthy",
            "artos_connectivity": artos_healthy,
            "timestamp": datetime.utcnow().isoformat()
        }

# Health check endpoints
@app.route('/health')
def health_check():
    checker = HealthChecker(os.environ['ARTOS_API_KEY'])
    health_status = checker.get_system_health()
    
    status_code = 200 if health_status['status'] == 'healthy' else 503
    return jsonify(health_status), status_code

@app.route('/ready')
def readiness_check():
    """Readiness check for Kubernetes"""
    checker = HealthChecker(os.environ['ARTOS_API_KEY'])
    health_status = checker.get_system_health()
    
    if health_status['status'] == 'healthy':
        return jsonify({"status": "ready"}), 200
    else:
        return jsonify({"status": "not ready"}), 503

Logging and Metrics

import logging
import time
from functools import wraps
from prometheus_client import Counter, Histogram, generate_latest

# Prometheus metrics
workflow_executions = Counter('artos_workflow_executions_total', 'Total workflow executions', ['workflow_id', 'status'])
execution_duration = Histogram('artos_execution_duration_seconds', 'Workflow execution duration', ['workflow_id'])

class MetricsCollector:
    def __init__(self):
        self.logger = logging.getLogger(__name__)
    
    def log_execution(self, workflow_id, status, duration=None):
        """Log execution metrics"""
        workflow_executions.labels(workflow_id=workflow_id, status=status).inc()
        
        if duration:
            execution_duration.labels(workflow_id=workflow_id).observe(duration)
        
        self.logger.info(f"Workflow {workflow_id} executed with status {status} in {duration}s")
    
    def get_metrics(self):
        """Get Prometheus metrics"""
        return generate_latest()

def monitor_execution(workflow_id):
    """Decorator to monitor workflow execution"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            metrics = MetricsCollector()
            
            try:
                result = func(*args, **kwargs)
                duration = time.time() - start_time
                metrics.log_execution(workflow_id, "success", duration)
                return result
            except Exception as e:
                duration = time.time() - start_time
                metrics.log_execution(workflow_id, "error", duration)
                raise
        return wrapper
    return decorator

# Usage
@monitor_execution("workflow_123")
def execute_workflow(input_data):
    # Workflow execution logic
    pass

# Metrics endpoint
@app.route('/metrics')
def metrics():
    collector = MetricsCollector()
    return collector.get_metrics(), 200, {'Content-Type': 'text/plain'}

Best Practices

1. Security Best Practices

  • API Key Management: Store API keys securely using environment variables or secret management systems (see the sketch after this list)
  • Network Security: Use HTTPS for all API communications
  • Access Control: Implement role-based access control for different user types
  • Input Validation: Validate all input data before processing
  • Error Handling: Never expose sensitive information in error messages
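
As a concrete illustration of the first and fourth points, the client can be constructed so that it never sees a hard-coded key and rejects obviously malformed input before calling the API. A minimal sketch (the validation checks are illustrative, not an exhaustive policy):
import os
from artos import ArtosClient

def build_client() -> ArtosClient:
    # Read the key from the environment (or a secret manager) rather than hard-coding it
    api_key = os.environ.get("ARTOS_API_KEY")
    if not api_key:
        # Fail fast; never include the key itself in error messages or logs
        raise RuntimeError("ARTOS_API_KEY is not set")
    return ArtosClient(api_key)

def execute_safely(client: ArtosClient, workflow_id: str, input_data: dict):
    # Basic input validation before handing data to the platform
    if not isinstance(input_data, dict) or not input_data:
        raise ValueError("input_data must be a non-empty dictionary")
    return client.execute_workflow(workflow_id, input_data)
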

2. Performance Best Practices

  • Connection Pooling: Reuse HTTP connections when possible (see the sketch after this list)
  • Caching: Cache frequently accessed data and workflow configurations
  • Batch Processing: Process data in batches for better efficiency
  • Async Processing: Use asynchronous processing for long-running operations
  • Resource Management: Monitor and optimize resource usage
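
For the REST-based integration shown in Pattern 1, connection reuse and simple batching can be added with a requests.Session. A minimal sketch (the endpoint path follows the earlier examples; whether a workflow accepts a list of records in one call depends on how its input fields are defined):
import requests

class PooledArtosAPI:
    def __init__(self, api_key, base_url="https://api.artosai.com"):
        # A Session keeps the underlying HTTP connection alive across requests
        self.session = requests.Session()
        self.session.headers.update({"Authorization": f"Bearer {api_key}"})
        self.base_url = base_url

    def execute_in_batches(self, workflow_id, records, batch_size=100):
        """Execute the workflow once per batch instead of once per record."""
        results = []
        for start in range(0, len(records), batch_size):
            batch = records[start:start + batch_size]
            response = self.session.post(
                f"{self.base_url}/workflows/{workflow_id}/execute",
                json={"input": {"records": batch}}
            )
            results.append(response.json())
        return results
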

3. Reliability Best Practices

  • Retry Logic: Implement exponential backoff for failed requests (see the sketch after this list)
  • Circuit Breakers: Use circuit breakers to prevent cascading failures
  • Health Checks: Implement comprehensive health checks
  • Monitoring: Set up monitoring and alerting for critical metrics
  • Backup Strategies: Implement backup and recovery procedures
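
Retry logic with exponential backoff can be layered on top of any of the integration patterns above. A minimal sketch:
import time
import random

def with_retries(fn, max_attempts=5, base_delay=1.0):
    """Call fn(); on failure wait base_delay * 2^attempt (plus jitter) and retry."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts, surface the original error
            time.sleep(base_delay * (2 ** attempt) + random.uniform(0, 0.5))

# Example: retry a workflow execution (assuming artos_client is an initialized ArtosClient)
result = with_retries(lambda: artos_client.execute_workflow("workflow_123", {"data": "process_this"}))
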

4. Scalability Best Practices

  • Horizontal Scaling: Design for horizontal scaling from the start
  • Load Balancing: Use load balancers for distributing traffic
  • Database Optimization: Optimize database queries and connections
  • Caching Strategy: Implement appropriate caching strategies, such as a short-lived cache of workflow configurations (see the sketch after this list)
  • Resource Limits: Set appropriate resource limits and quotas
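
Workflow configurations change far less often than they are read, which makes them a natural caching target. A minimal sketch using an in-process TTL cache (a shared cache such as Redis would be the next step for multi-instance deployments):
import time

class WorkflowConfigCache:
    def __init__(self, client, ttl_seconds=300):
        self.client = client
        self.ttl = ttl_seconds
        self._cache = {}  # workflow_id -> (fetched_at, config)

    def get(self, workflow_id):
        entry = self._cache.get(workflow_id)
        if entry and time.time() - entry[0] < self.ttl:
            return entry[1]  # still fresh, skip the API call
        config = self.client.get_workflow(workflow_id)  # cache miss or expired entry
        self._cache[workflow_id] = (time.time(), config)
        return config
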

5. Maintenance Best Practices

  • Version Management: Use semantic versioning for your integrations
  • Documentation: Maintain comprehensive documentation
  • Testing: Implement comprehensive testing strategies
  • Deployment: Use automated deployment pipelines
  • Monitoring: Continuously monitor system health and performance

Migration and Upgrade Guidance

Version Compatibility

import pkg_resources
from artos import __version__ as artos_version

class VersionCompatibility:
    def __init__(self):
        self.required_version = "1.0.0"
        self.current_version = artos_version
    
    def check_compatibility(self):
        """Check if current version is compatible"""
        current = pkg_resources.parse_version(self.current_version)
        required = pkg_resources.parse_version(self.required_version)
        
        return current >= required
    
    def get_upgrade_notes(self):
        """Get upgrade notes for current version"""
        # Implementation to fetch upgrade notes
        pass
    
    def validate_configuration(self, config):
        """Validate configuration for current version"""
        # Implementation to validate configuration
        pass

# Usage
compatibility = VersionCompatibility()
if not compatibility.check_compatibility():
    print("Please upgrade Artos SDK to latest version")

Upgrade Procedures

import json
from datetime import datetime

class UpgradeManager:
    def __init__(self, backup_dir="/backups"):
        self.backup_dir = backup_dir
    
    def create_backup(self, config):
        """Create backup before upgrade"""
        timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
        backup_file = f"{self.backup_dir}/config_backup_{timestamp}.json"
        
        with open(backup_file, 'w') as f:
            json.dump(config, f, indent=2)
        
        return backup_file
    
    def upgrade_configuration(self, old_config):
        """Upgrade configuration to new version"""
        # Implementation to upgrade configuration
        new_config = old_config.copy()
        
        # Apply upgrade transformations
        if "old_field" in new_config:
            new_config["new_field"] = new_config.pop("old_field")
        
        return new_config
    
    def rollback(self, backup_file):
        """Rollback to previous configuration"""
        with open(backup_file, 'r') as f:
            config = json.load(f)
        
        # Restore configuration
        return config

# Usage
upgrade_manager = UpgradeManager()

# Create backup
backup_file = upgrade_manager.create_backup(current_config)

try:
    # Perform upgrade
    new_config = upgrade_manager.upgrade_configuration(current_config)
    # Apply new configuration
except Exception as e:
    # Rollback on failure
    current_config = upgrade_manager.rollback(backup_file)
    print(f"Upgrade failed, rolled back: {e}")

Custom Agent Development Best Practices

Designing Effective Custom Agents

  1. Clear Input/Output Definitions: Define precise field schemas with validation rules and comprehensive documentation
  2. Single Responsibility: Each agent should have one clear, focused purpose to ensure maintainability
  3. Comprehensive Error Handling: Implement robust error handling with meaningful error messages and recovery strategies (see the sketch after this list)
  4. Performance Optimization: Consider processing time, memory usage, and resource efficiency
  5. Reusability: Design agents to be reusable across different workflows and use cases
  6. Security: Implement proper input validation and secure coding practices
  7. Documentation: Provide thorough documentation including examples and troubleshooting guides
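
Applied to the processing script itself, points 1-3 usually amount to validating inputs up front and returning structured, meaningful errors instead of letting exceptions escape. A sketch of such a script body (the output shape loosely mirrors the validation scenarios in the next section):
def process_records(records, rules=None):
    # Single responsibility: validate records, nothing else
    if not isinstance(records, list) or not records:
        return {"validation_passed": False, "errors": ["No data provided for validation"]}

    errors = []
    required = (rules or {}).get("required_fields", [])
    for index, record in enumerate(records):
        missing = [field for field in required if field not in record]
        if missing:
            # Meaningful errors name the offending record and the missing fields
            errors.append(f"Record {index}: missing required field(s): {', '.join(missing)}")

    return {"validation_passed": not errors, "errors": errors}
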

Agent Testing and Validation

import time

from artos import ArtosClient

class AgentTester:
    def __init__(self, api_key):
        self.client = ArtosClient(api_key)
    
    def test_agent_with_multiple_scenarios(self, agent_id, test_scenarios):
        """Test agent with multiple input scenarios"""
        results = []
        
        for scenario in test_scenarios:
            try:
                result = self.client.test_custom_agent(agent_id, scenario['input'])
                test_result = {
                    "scenario": scenario['name'],
                    "status": "passed" if self.validate_output(result, scenario['expected']) else "failed",
                    "output": result,
                    "expected": scenario['expected'],
                    "performance": result.get('performance_metrics', {})
                }
                results.append(test_result)
            except Exception as e:
                results.append({
                    "scenario": scenario['name'],
                    "status": "error",
                    "error": str(e)
                })
        
        return results
    
    def validate_output(self, actual, expected):
        """Validate agent output against expected results"""
        # Implement validation logic based on your requirements
        return self.compare_outputs(actual, expected)
    
    def benchmark_agent_performance(self, agent_id, benchmark_data):
        """Benchmark agent performance with various data sizes"""
        benchmarks = []
        
        for data_size in benchmark_data:
            start_time = time.time()
            result = self.client.execute_custom_agent(agent_id, data_size['input'])
            end_time = time.time()
            
            benchmarks.append({
                "data_size": data_size['size'],
                "execution_time": end_time - start_time,
                "memory_usage": result.get('memory_usage', 0),
                "throughput": data_size['size'] / (end_time - start_time)
            })
        
        return benchmarks
    
    def validate_agent_schema(self, agent_id):
        """Validate agent input/output schema compliance"""
        agent_config = self.client.get_custom_agent(agent_id)
        
        validation_results = {
            "schema_valid": True,
            "issues": [],
            "recommendations": []
        }
        
        # Check input field definitions
        for field_name, field_config in agent_config['input_fields'].items():
            if not field_config.get('description'):
                validation_results['issues'].append(f"Missing description for input field '{field_name}'")
            
            if not field_config.get('type'):
                validation_results['issues'].append(f"Missing type for input field '{field_name}'")
        
        # Check output field definitions
        for field_name, field_config in agent_config['output_fields'].items():
            if not field_config.get('description'):
                validation_results['issues'].append(f"Missing description for output field '{field_name}'")
        
        validation_results['schema_valid'] = len(validation_results['issues']) == 0
        
        return validation_results

# Example usage
tester = AgentTester("your_api_key")

# Test scenarios for a data validation agent
validation_scenarios = [
    {
        "name": "valid_data",
        "input": {
            "data": [{"id": 1, "name": "Test", "value": 100}],
            "validation_rules": {"required_fields": ["id", "name", "value"]}
        },
        "expected": {
            "validation_passed": True,
            "errors": []
        }
    },
    {
        "name": "missing_fields",
        "input": {
            "data": [{"id": 1, "name": "Test"}],
            "validation_rules": {"required_fields": ["id", "name", "value"]}
        },
        "expected": {
            "validation_passed": False,
            "errors": ["Missing required field: value"]
        }
    },
    {
        "name": "empty_dataset",
        "input": {
            "data": [],
            "validation_rules": {"required_fields": ["id", "name", "value"]}
        },
        "expected": {
            "validation_passed": False,
            "errors": ["No data provided for validation"]
        }
    }
]

# Run comprehensive tests
test_results = tester.test_agent_with_multiple_scenarios(
    "validation_agent_123", 
    validation_scenarios
)

# Display results
for result in test_results:
    print(f"Scenario: {result['scenario']} - Status: {result['status']}")
    if result['status'] == 'failed':
        print(f"  Expected: {result['expected']}")
        print(f"  Actual: {result['output']}")

# Validate agent schema
schema_validation = tester.validate_agent_schema("validation_agent_123")
if not schema_validation['schema_valid']:
    print("Schema validation issues:")
    for issue in schema_validation['issues']:
        print(f"  - {issue}")

# Benchmark performance (generate_test_data is a placeholder helper that returns n synthetic records)
benchmark_data = [
    {"size": 100, "input": {"data": generate_test_data(100)}},
    {"size": 1000, "input": {"data": generate_test_data(1000)}},
    {"size": 10000, "input": {"data": generate_test_data(10000)}}
]

performance_results = tester.benchmark_agent_performance("validation_agent_123", benchmark_data)
for result in performance_results:
    print(f"Data size: {result['data_size']} - Execution time: {result['execution_time']:.2f}s")
    print(f"  Throughput: {result['throughput']:.2f} records/sec")

Agent Lifecycle Management

from datetime import datetime

from artos import ArtosClient

class AgentLifecycleManager:
    def __init__(self, api_key):
        self.client = ArtosClient(api_key)
    
    def create_agent_version(self, agent_id, version_config):
        """Create a new version of an existing agent"""
        base_agent = self.client.get_custom_agent(agent_id)
        
        new_version = {
            **base_agent,
            **version_config,
            "version": base_agent.get('version', '1.0.0') + '.1',
            "parent_agent_id": agent_id,
            "created_at": datetime.now().isoformat()
        }
        
        return self.client.create_custom_agent(new_version)
    
    def deploy_agent_to_production(self, agent_id, deployment_config):
        """Deploy agent to production environment"""
        # Run pre-deployment tests
        test_results = self.run_pre_deployment_tests(agent_id)
        
        if not test_results['all_passed']:
            raise Exception(f"Pre-deployment tests failed: {test_results['failures']}")
        
        # Deploy agent
        deployment = self.client.deploy_agent(agent_id, {
            "environment": "production",
            "monitoring": True,
            "alerts": True,
            **deployment_config
        })
        
        # Set up monitoring
        self.setup_agent_monitoring(agent_id)
        
        return deployment
    
    def rollback_agent_deployment(self, agent_id, target_version):
        """Rollback agent to previous version"""
        rollback_result = self.client.rollback_agent(agent_id, target_version)
        
        # Verify rollback success
        current_agent = self.client.get_custom_agent(agent_id)
        if current_agent['version'] == target_version:
            print(f"Successfully rolled back agent {agent_id} to version {target_version}")
        else:
            raise Exception(f"Rollback failed for agent {agent_id}")
        
        return rollback_result
    
    def setup_agent_monitoring(self, agent_id):
        """Set up comprehensive monitoring for agent"""
        monitoring_config = {
            "metrics": [
                "execution_time",
                "success_rate",
                "error_rate",
                "throughput",
                "memory_usage"
            ],
            "alerts": [
                {
                    "metric": "success_rate",
                    "threshold": 0.95,
                    "operator": "less_than",
                    "notification_channels": ["email", "slack"]
                },
                {
                    "metric": "execution_time",
                    "threshold": 30,
                    "operator": "greater_than",
                    "notification_channels": ["email"]
                }
            ],
            "dashboards": [
                "agent_performance",
                "error_analysis",
                "usage_patterns"
            ]
        }
        
        return self.client.setup_agent_monitoring(agent_id, monitoring_config)

# Usage example
lifecycle_manager = AgentLifecycleManager("your_api_key")

# Create new version with improvements
new_version = lifecycle_manager.create_agent_version("sales_processor_v1", {
    "name": "Sales Processor v1.1",
    "description": "Enhanced sales processor with improved error handling",
    "input_fields": {
        # Enhanced input fields with better validation
    },
    "processing_script": """
        # Improved processing logic with better error handling
    """
})

# Deploy to production
deployment = lifecycle_manager.deploy_agent_to_production(new_version.id, {
    "auto_scaling": True,
    "max_instances": 10,
    "health_check_interval": 60
})

# If issues arise, rollback
if deployment['status'] == 'failed':
    lifecycle_manager.rollback_agent_deployment(new_version.id, "1.0.0")

Summary

This comprehensive platform integration documentation provides the foundation for integrating Artos into existing systems while maintaining the benefits of continuous platform improvements. The focus on custom agents with preset input/output fields enables users to create powerful, reusable workflows that can be easily integrated into any system architecture.

Key Takeaways

  1. Custom Agents: Scripts with preset input/output fields provide predictable, type-safe integration points
  2. Workflow Flexibility: Combine agents, connectors, and post-processing for complex data pipelines
  3. Multiple Creation Methods: Dashboard, API, SDK, and template-based approaches serve different user needs
  4. Comprehensive Testing: Built-in testing and validation ensure reliability and performance
  5. Lifecycle Management: Full version control, deployment, and monitoring capabilities
  6. Integration Patterns: Support for API, SDK, event-driven, and database integration approaches
  7. Best Practices: Security, performance, and maintainability guidelines built into the platform

By following these patterns and best practices, organizations can successfully integrate Artos into their existing systems while maintaining the flexibility to adapt and scale as their needs evolve.