Connectors

Overview

Connectors are the primary mechanism for extending Artos: they tell the Composer which agents are available and in which contexts to use them. Unlike traditional prompt-based systems, Connectors provide a structured approach to agent composition that delivers higher accuracy and transparent agent selection.

What are Artos Connectors?

Artos Connectors define the relationship between data types, processing requirements, and the appropriate agents to handle specific tasks. They serve as the bridge between your custom workflows and Artos’s intelligent composition system, enabling the Composer to make informed decisions about which agents to use and when.

Key Benefits

  • Higher Accuracy: Structured agent selection instead of prompt-based guessing
  • Transparent Composition: Clear visibility into which agents are used and why
  • Context-Aware Selection: Agents are matched to data types and usage patterns
  • Performance Optimization: Automatic selection of the best-suited agent for each task
  • Extensibility: Straightforward addition of custom agents, workflows, and capabilities

Core Concepts

Agent Discovery

Connectors inform the Composer about available sub-agents and their capabilities. Each connector defines:
  • Agent Types: What kind of processing the agent can perform
  • Input/Output Schemas: Expected data formats and structures
  • Performance Characteristics: Speed, accuracy, and resource requirements
  • Dependencies: Required services or external integrations
  • Capabilities: Specific functions and features the agent provides
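
For illustration, a single agent's discovery metadata might look like the sketch below. The field names mirror the concepts above; the exact descriptor format is an assumption, not the actual wire format.

# Hypothetical descriptor a connector could register with the Composer;
# the schema here is illustrative, not the real discovery format.
data_validator_descriptor = {
    "agentType": "data_validator",
    "inputSchema": {"formats": ["csv", "json"], "maxFileSize": "100MB"},
    "outputSchema": {"validationReport": "json"},
    "performance": {"latency": "low", "accuracy": 0.98},
    "dependencies": ["schema_registry"],
    "capabilities": ["type_checking", "constraint_validation"]
}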

Usage Context

Connectors define when and where specific agents should be applied based on:
  • Data Types: Matching input data to appropriate processing agents
  • Task Requirements: Selecting agents based on desired outcomes
  • Performance Constraints: Choosing agents that meet speed/accuracy requirements
  • Resource Availability: Considering available compute and API quotas
  • Business Rules: Domain-specific requirements and constraints

Data-Agent Mapping

Connectors establish the relationship between data characteristics and agent selection:
{
  "dataType": "tabular_data",
  "format": "csv",
  "size": "large",
  "recommendedAgents": ["data_validator", "table_processor", "summary_generator"],
  "fallbackAgents": ["basic_processor"],
  "constraints": {
    "maxProcessingTime": "30s",
    "requiredAccuracy": 0.95,
    "maxFileSize": "100MB"
  },
  "priority": "high",
  "costOptimization": true
}
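
To make these semantics concrete, here is a minimal Python sketch of how such a mapping could drive selection, preferring recommendedAgents and falling back when a constraint is violated. This is an illustration only; the Composer's actual selection logic is internal to Artos.

# Illustrative only: prefer recommended agents, fall back on constraint violation.
def select_agents(mapping, file_size_mb, available):
    # Parse a "100MB"-style limit from the mapping's constraints
    limit_mb = int(mapping["constraints"]["maxFileSize"].rstrip("MB"))
    preferred = [a for a in mapping["recommendedAgents"] if a in available]
    if file_size_mb <= limit_mb and preferred:
        return preferred
    return [a for a in mapping["fallbackAgents"] if a in available]

agents = select_agents(
    {
        "recommendedAgents": ["data_validator", "table_processor"],
        "fallbackAgents": ["basic_processor"],
        "constraints": {"maxFileSize": "100MB"}
    },
    file_size_mb=250,
    available={"data_validator", "basic_processor"}
)
print(agents)  # ['basic_processor']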

API Endpoints

Create Connector

Authentication Required: All connector endpoints require a valid API key in the Authorization header.

curl -X POST https://api.artosai.com/connectors \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "table_processor_connector",
    "description": "Handles tabular data processing and insertion",
    "agentTypes": ["data_validator", "table_processor"],
    "dataTypes": ["csv", "excel", "json"],
    "usageRules": {
      "when": "data contains tabular information",
      "priority": "high",
      "constraints": {
        "maxFileSize": "10MB",
        "supportedFormats": ["csv", "xlsx", "json"]
      }
    },
    "configuration": {
      "validationRules": {
        "requiredColumns": ["id", "name", "value"],
        "dataTypes": {
          "id": "integer",
          "name": "string",
          "value": "float"
        }
      },
      "processingOptions": {
        "batchSize": 1000,
        "enableTransformation": true
      }
    }
  }'

Response:
{
  "id": "conn_abc123def456",
  "name": "table_processor_connector",
  "description": "Handles tabular data processing and insertion",
  "status": "active",
  "createdAt": "2024-01-15T10:30:00Z",
  "updatedAt": "2024-01-15T10:30:00Z",
  "agentTypes": ["data_validator", "table_processor"],
  "dataTypes": ["csv", "excel", "json"],
  "usageRules": {
    "when": "data contains tabular information",
    "priority": "high",
    "constraints": {
      "maxFileSize": "10MB",
      "supportedFormats": ["csv", "xlsx", "json"]
    }
  },
  "configuration": {
    "validationRules": {
      "requiredColumns": ["id", "name", "value"],
      "dataTypes": {
        "id": "integer",
        "name": "string",
        "value": "float"
      }
    },
    "processingOptions": {
      "batchSize": 1000,
      "enableTransformation": true
    }
  }
}

List Connectors

curl -X GET https://api.artosai.com/connectors \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -G \
  -d "limit=10" \
  -d "offset=0" \
  -d "status=active"

Response:
{
  "connectors": [
    {
      "id": "conn_abc123def456",
      "name": "table_processor_connector",
      "description": "Handles tabular data processing and insertion",
      "status": "active",
      "createdAt": "2024-01-15T10:30:00Z",
      "agentTypes": ["data_validator", "table_processor"],
      "dataTypes": ["csv", "excel", "json"]
    }
  ],
  "pagination": {
    "total": 1,
    "limit": 10,
    "offset": 0,
    "hasMore": false
  }
}
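
Because results are paginated, retrieving every connector means advancing offset until hasMore is false. A small helper, assuming the Python SDK exposes the response fields above as attributes:

# Page through all active connectors; assumes the SDK mirrors the
# response shape shown above (connectors list plus pagination object).
def all_connectors(client, page_size=10):
    offset = 0
    while True:
        page = client.connectors.list(status="active", limit=page_size, offset=offset)
        for connector in page.connectors:
            yield connector
        if not page.pagination.hasMore:
            break
        offset += page_size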

Update Connector

curl -X PUT https://api.artosai.com/connectors/conn_abc123def456 \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "description": "Updated description for table processing",
    "configuration": {
      "validationRules": {
        "requiredColumns": ["id", "name", "value", "category"],
        "dataTypes": {
          "id": "integer",
          "name": "string",
          "value": "float",
          "category": "string"
        }
      }
    }
  }'

Response:
{
  "id": "conn_abc123def456",
  "name": "table_processor_connector",
  "description": "Updated description for table processing",
  "status": "active",
  "updatedAt": "2024-01-15T11:30:00Z",
  "configuration": {
    "validationRules": {
      "requiredColumns": ["id", "name", "value", "category"],
      "dataTypes": {
        "id": "integer",
        "name": "string",
        "value": "float",
        "category": "string"
      }
    }
  }
}

Delete Connector

curl -X DELETE https://api.artosai.com/connectors/conn_abc123def456 \
  -H "Authorization: Bearer YOUR_API_KEY"

Response:
{
  "success": true,
  "message": "Connector deleted successfully"
}

Test Connector

curl -X POST https://api.artosai.com/connectors/conn_abc123def456/test \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "testData": {
      "data": [
        {"id": 1, "name": "Test Item", "value": 100.5},
        {"id": 2, "name": "Test Item 2", "value": 200.3}
      ],
      "format": "json"
    },
    "expectedOutput": {
      "processedRows": 2,
      "validationPassed": true
    }
  }'

Response:
{
  "testId": "test_xyz789",
  "status": "passed",
  "results": {
    "processedRows": 2,
    "validationPassed": true,
    "processingTime": 0.5,
    "accuracy": 1.0
  },
  "details": {
    "agentSelection": ["data_validator", "table_processor"],
    "executionPath": "optimal",
    "performanceMetrics": {
      "latency": "0.5s",
      "throughput": "4 rows/s"
    }
  }
}

SDK Methods

Python SDK

from artos import ArtosClient, ConnectorBuilder

# Initialize client
client = ArtosClient(api_key="your_api_key")

# Create connector using builder pattern
connector = ConnectorBuilder.create("table_processor_connector") \
    .set_description("Handles tabular data processing and insertion") \
    .add_agent_types(["data_validator", "table_processor"]) \
    .add_data_types(["csv", "excel", "json"]) \
    .set_usage_rules({
        "when": "data contains tabular information",
        "priority": "high",
        "constraints": {
            "maxFileSize": "10MB",
            "supportedFormats": ["csv", "xlsx", "json"]
        }
    }) \
    .set_configuration({
        "validationRules": {
            "requiredColumns": ["id", "name", "value"],
            "dataTypes": {
                "id": "integer",
                "name": "string",
                "value": "float"
            }
        },
        "processingOptions": {
            "batchSize": 1000,
            "enableTransformation": True
        }
    }) \
    .build()

# Deploy connector
deployed_connector = client.connectors.deploy(connector)
print(f"Connector deployed: {deployed_connector.id}")

# List connectors
connectors = client.connectors.list(
    status="active",
    limit=10,
    offset=0
)

# Update connector
updated_connector = client.connectors.update(
    connector_id="conn_abc123def456",
    description="Updated description",
    configuration={
        "validationRules": {
            "requiredColumns": ["id", "name", "value", "category"]
        }
    }
)

# Test connector
test_result = client.connectors.test(
    connector_id="conn_abc123def456",
    test_data={
        "data": [
            {"id": 1, "name": "Test Item", "value": 100.5}
        ],
        "format": "json"
    },
    expected_output={
        "processedRows": 1,
        "validationPassed": True
    }
)

# Delete connector
client.connectors.delete("conn_abc123def456")
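
When experimenting, it is easy to leave orphaned connectors behind. A try/finally wrapper around the methods above keeps short-lived deployments tidy; this is a convenience pattern built on the SDK calls shown, not an SDK feature:

# Deploy, test, and always clean up a temporary connector.
def with_temporary_connector(client, connector, test_data, expected_output):
    deployed = client.connectors.deploy(connector)
    try:
        return client.connectors.test(
            connector_id=deployed.id,
            test_data=test_data,
            expected_output=expected_output
        )
    finally:
        client.connectors.delete(deployed.id)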

Configuration Examples

Basic Table Processor Connector

{
  "name": "basic_table_processor",
  "description": "Basic table processing for CSV and Excel files",
  "agentTypes": ["table_processor"],
  "dataTypes": ["csv", "excel"],
  "usageRules": {
    "when": "file extension is .csv or .xlsx",
    "priority": "medium"
  },
  "configuration": {
    "processingOptions": {
      "batchSize": 500,
      "enableTransformation": false
    }
  }
}

Advanced Data Validation Connector

{
  "name": "advanced_data_validator",
  "description": "Comprehensive data validation with custom rules",
  "agentTypes": ["data_validator", "quality_checker"],
  "dataTypes": ["csv", "excel", "json", "xml"],
  "usageRules": {
    "when": "data requires validation",
    "priority": "high",
    "constraints": {
      "maxFileSize": "50MB"
    }
  },
  "configuration": {
    "validationRules": {
      "requiredColumns": ["id", "name", "value", "category", "timestamp"],
      "dataTypes": {
        "id": "integer",
        "name": "string",
        "value": "float",
        "category": "string",
        "timestamp": "datetime"
      },
      "constraints": {
        "id": {
          "minValue": 1,
          "unique": true
        },
        "name": {
          "maxLength": 255,
          "pattern": "^[a-zA-Z0-9\\s-_]+$"
        },
        "value": {
          "minValue": 0,
          "maxValue": 1000000
        },
        "category": {
          "allowedValues": ["A", "B", "C", "D"]
        }
      }
    },
    "qualityChecks": {
      "duplicateDetection": true,
      "outlierDetection": true,
      "completenessCheck": true
    }
  }
}
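
To clarify what these validationRules express, the sketch below applies the same constraints to a single row locally. This is purely illustrative; in practice the data_validator agent enforces the rules server-side.

import re

# Local illustration of the constraint semantics defined above.
def check_row(row):
    errors = []
    if row["id"] < 1:
        errors.append("id must be >= 1")
    if len(row["name"]) > 255 or not re.match(r"^[a-zA-Z0-9\s\-_]+$", row["name"]):
        errors.append("name fails length or pattern constraint")
    if not 0 <= row["value"] <= 1000000:
        errors.append("value out of range")
    if row["category"] not in {"A", "B", "C", "D"}:
        errors.append("category not in allowed values")
    return errors

print(check_row({"id": 1, "name": "Widget-1", "value": 99.5, "category": "A"}))  # []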

Content Processing Connector

{
  "name": "content_processor",
  "description": "Processes text content with optimization and style matching",
  "agentTypes": ["content_optimizer", "style_matcher", "quality_checker"],
  "dataTypes": ["text", "markdown", "html"],
  "usageRules": {
    "when": "content requires optimization",
    "priority": "medium",
    "constraints": {
      "maxContentLength": "10000 characters"
    }
  },
  "configuration": {
    "optimizationSettings": {
      "targetLength": "concise",
      "styleGuide": "professional",
      "removeRedundancy": true,
      "improveReadability": true
    },
    "styleMatching": {
      "tone": "professional",
      "formatting": "standard",
      "brandGuidelines": "default"
    },
    "qualityControl": {
      "grammarCheck": true,
      "spellCheck": true,
      "consistencyCheck": true
    }
  }
}
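
Once deployed, a connector like this is exercised through a workflow. A sketch, assuming an initialized ArtosClient (client), a hypothetical connector id from creation, and the same client.workflows calls used in the integration examples below:

# Run content through the content_processor connector via a workflow.
payload = {"content": "Raw draft text to optimize", "format": "markdown"}
workflow = client.workflows.create({
    "name": "Content Optimization",
    "connectors": ["conn_content_processor_id"],  # hypothetical id from creation
    "input": payload
})
result = client.workflows.execute(workflow.id, payload)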

Integration Patterns

AWS Bedrock Integration

from artos import ArtosClient, ConnectorBuilder
import boto3

class AWSBedrockConnector:
    def __init__(self, api_key, aws_region="us-east-1"):
        self.client = ArtosClient(api_key=api_key)
        self.aws_region = aws_region  # reused in the connector configuration
        self.bedrock = boto3.client('bedrock-runtime', region_name=aws_region)
    
    def create_bedrock_connector(self, model_id, capabilities):
        """Create connector for AWS Bedrock model"""
        connector = ConnectorBuilder.create(f"bedrock_{model_id}") \
            .set_description(f"Connector for AWS Bedrock {model_id}") \
            .add_agent_types(capabilities) \
            .add_data_types(["text", "json"]) \
            .set_usage_rules({
                "when": f"task requires {model_id} capabilities",
                "priority": "high",
                "constraints": {
                    "maxInputLength": "4000 tokens",
                    "requiresAWS": True
                }
            }) \
            .set_configuration({
                "bedrockConfig": {
                    "modelId": model_id,
                    "region": "us-east-1",
                    "maxTokens": 4000,
                    "temperature": 0.7
                },
                "capabilities": capabilities
            }) \
            .build()
        
        return self.client.connectors.deploy(connector)
    
    def execute_with_bedrock(self, connector_id, input_data):
        """Execute workflow using Bedrock connector"""
        workflow = self.client.workflows.create({
            "name": "Bedrock Workflow",
            "connectors": [connector_id],
            "input": input_data
        })
        
        return self.client.workflows.execute(workflow.id, input_data)

# Usage
bedrock_connector = AWSBedrockConnector("your_api_key")
connector = bedrock_connector.create_bedrock_connector(
    model_id="anthropic.claude-3-sonnet-20240229-v1:0",
    capabilities=["text_generation", "content_analysis"]
)

result = bedrock_connector.execute_with_bedrock(
    connector.id,
    {"prompt": "Analyze this document and provide a summary"}
)
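
The bedrock-runtime client stored on the class can also be used to smoke-test the model directly, outside of Artos. A hypothetical helper; the request body follows the Anthropic messages format that Bedrock expects for Claude 3 models:

import json

# Hypothetical helper: verify Bedrock access before wiring the model
# into an Artos workflow.
def smoke_test_model(bedrock, model_id, prompt):
    response = bedrock.invoke_model(
        modelId=model_id,
        contentType="application/json",
        accept="application/json",
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 256,
            "messages": [{"role": "user", "content": prompt}]
        })
    )
    return json.loads(response["body"].read())

# e.g. smoke_test_model(bedrock_connector.bedrock, "anthropic.claude-3-sonnet-20240229-v1:0", "Hello")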

Azure Foundry Integration

from artos import ArtosClient, ConnectorBuilder
from azure.ai.ml import MLClient
from azure.ai.ml.entities import Model, ManagedOnlineEndpoint
from azure.identity import DefaultAzureCredential

class AzureFoundryConnector:
    def __init__(self, api_key, subscription_id, resource_group, workspace_name):
        self.client = ArtosClient(api_key=api_key)
        self.credential = DefaultAzureCredential()
        # Keep the workspace coordinates; they are reused in the connector config
        self.subscription_id = subscription_id
        self.resource_group = resource_group
        self.workspace_name = workspace_name
        self.ml_client = MLClient(
            credential=self.credential,
            subscription_id=subscription_id,
            resource_group_name=resource_group,
            workspace_name=workspace_name
        )
    
    def create_foundry_connector(self, endpoint_name, model_name):
        """Create connector for Azure Foundry endpoint"""
        connector = ConnectorBuilder.create(f"foundry_{endpoint_name}") \
            .set_description(f"Connector for Azure Foundry {endpoint_name}") \
            .add_agent_types(["ml_inference", "data_processing"]) \
            .add_data_types(["json", "csv", "image"]) \
            .set_usage_rules({
                "when": f"task requires {model_name} inference",
                "priority": "high",
                "constraints": {
                    "requiresAzure": True,
                    "maxPayloadSize": "10MB"
                }
            }) \
            .set_configuration({
                "foundryConfig": {
                    "endpointName": endpoint_name,
                    "modelName": model_name,
                    "subscriptionId": self.subscription_id,
                    "resourceGroup": self.resource_group,
                    "workspaceName": self.workspace_name
                },
                "inferenceSettings": {
                    "timeout": 30,
                    "retryAttempts": 3
                }
            }) \
            .build()
        
        return self.client.connectors.deploy(connector)
    
    def deploy_model_endpoint(self, model_path, endpoint_name):
        """Register a model and create an Azure ML online endpoint for it"""
        # Register the model with the Azure ML workspace
        model = self.ml_client.models.create_or_update(
            Model(name=endpoint_name, path=model_path)
        )
        
        # Create the online endpoint; a deployment serving the registered
        # model is attached to the endpoint in a separate step
        endpoint = self.ml_client.online_endpoints.begin_create_or_update(
            ManagedOnlineEndpoint(name=endpoint_name)
        ).result()
        
        return endpoint

# Usage
foundry_connector = AzureFoundryConnector(
    api_key="your_api_key",
    subscription_id="your_subscription_id",
    resource_group="your_resource_group",
    workspace_name="your_workspace"
)

# Deploy model
endpoint = foundry_connector.deploy_model_endpoint(
    model_path="path/to/model",
    endpoint_name="custom-model-endpoint"
)

# Create connector
connector = foundry_connector.create_foundry_connector(
    endpoint_name="custom-model-endpoint",
    model_name="custom-model"
)

Error Handling

Common Error Codes

Error Code | Description | Resolution
CONNECTOR_NOT_FOUND | Connector does not exist | Verify the connector ID and your permissions
INVALID_CONFIGURATION | Connector configuration is invalid | Check the configuration schema and required fields
AGENT_TYPE_NOT_SUPPORTED | Specified agent type is not available | Verify the agent type exists and is accessible
DATA_TYPE_MISMATCH | Input data type doesn’t match connector requirements | Ensure the data format matches the expected types
CONSTRAINT_VIOLATION | Input violates connector constraints | Check file size, format, and other constraints
RATE_LIMIT_EXCEEDED | Too many requests to the connector | Implement exponential backoff and retry logic

Error Handling Examples

from artos import ArtosClient, ConnectorError
import time

class RobustConnectorManager:
    def __init__(self, api_key):
        self.client = ArtosClient(api_key=api_key)
    
    def create_connector_with_retry(self, connector_config, max_retries=3):
        """Create connector with retry logic"""
        for attempt in range(max_retries):
            try:
                connector = self.client.connectors.create(connector_config)
                return connector
            except ConnectorError as e:
                if e.code == "RATE_LIMIT_EXCEEDED":
                    wait_time = 2 ** attempt  # Exponential backoff
                    print(f"Rate limited, waiting {wait_time} seconds...")
                    time.sleep(wait_time)
                    continue
                elif e.code == "INVALID_CONFIGURATION":
                    print(f"Configuration error: {e.message}")
                    raise
                else:
                    print(f"Unexpected error: {e.message}")
                    raise
        # Every attempt was rate-limited
        raise RuntimeError(f"Failed to create connector after {max_retries} attempts")
    
    def test_connector_safely(self, connector_id, test_data):
        """Test connector with comprehensive error handling"""
        try:
            result = self.client.connectors.test(connector_id, test_data)
            return result
        except ConnectorError as e:
            if e.code == "CONNECTOR_NOT_FOUND":
                print(f"Connector {connector_id} not found")
                return None
            elif e.code == "DATA_TYPE_MISMATCH":
                print(f"Data type mismatch: {e.message}")
                # Try to convert data format
                converted_data = self.convert_data_format(test_data)
                return self.client.connectors.test(connector_id, converted_data)
            else:
                print(f"Test failed: {e.message}")
                raise
    
    def convert_data_format(self, data):
        """Convert data to compatible format"""
        # Implementation for data format conversion
        pass

# Usage
manager = RobustConnectorManager("your_api_key")

try:
    connector = manager.create_connector_with_retry({
        "name": "test_connector",
        "description": "Test connector",
        "agentTypes": ["data_validator"],
        "dataTypes": ["csv"]
    })
    
    test_result = manager.test_connector_safely(
        connector.id,
        {"data": "test_data", "format": "csv"}
    )
    
except Exception as e:
    print(f"Failed to create/test connector: {e}")

Rate Limiting

Rate Limits

Endpoint | Rate Limit | Window
Create Connector | 10 requests | 1 minute
List Connectors | 100 requests | 1 minute
Update Connector | 20 requests | 1 minute
Delete Connector | 5 requests | 1 minute
Test Connector | 50 requests | 1 minute

Rate Limit Handling

import time
from artos import ArtosClient, RateLimitError

class RateLimitedConnectorClient:
    def __init__(self, api_key):
        self.client = ArtosClient(api_key=api_key)
        self.request_timestamps = {}  # per-endpoint request history
    
    def _check_rate_limit(self, endpoint):
        """Wait as needed so the request stays within the endpoint's limit"""
        current_time = time.time()
        window_start = current_time - 60  # 1 minute window
        
        # Keep only timestamps inside the current window for this endpoint
        timestamps = [
            ts for ts in self.request_timestamps.get(endpoint, [])
            if ts > window_start
        ]
        
        # Per-endpoint limits (requests per minute)
        limits = {
            "create": 10,
            "list": 100,
            "update": 20,
            "delete": 5,
            "test": 50
        }
        
        if len(timestamps) >= limits.get(endpoint, 10):
            wait_time = 60 - (current_time - timestamps[0])
            if wait_time > 0:
                time.sleep(wait_time)
        
        timestamps.append(time.time())
        self.request_timestamps[endpoint] = timestamps
    
    def create_connector(self, config):
        """Create connector with rate limit handling"""
        self._check_rate_limit("create")
        
        try:
            return self.client.connectors.create(config)
        except RateLimitError:
            print("Rate limit exceeded, waiting 60 seconds...")
            time.sleep(60)
            return self.client.connectors.create(config)
    
    def list_connectors(self, **kwargs):
        """List connectors with rate limit handling"""
        self._check_rate_limit("list")
        
        try:
            return self.client.connectors.list(**kwargs)
        except RateLimitError:
            print("Rate limit exceeded, waiting 60 seconds...")
            time.sleep(60)
            return self.client.connectors.list(**kwargs)

# Usage
client = RateLimitedConnectorClient("your_api_key")

# Multiple requests with automatic rate limit handling
for i in range(15):
    try:
        connector = client.create_connector({
            "name": f"connector_{i}",
            "description": f"Test connector {i}",
            "agentTypes": ["data_validator"],
            "dataTypes": ["csv"]
        })
        print(f"Created connector {i}")
    except Exception as e:
        print(f"Failed to create connector {i}: {e}")

Best Practices

Connector Design

  1. Clear Naming: Use descriptive names that indicate the connector’s purpose
  2. Comprehensive Documentation: Provide detailed descriptions and usage examples
  3. Proper Categorization: Use appropriate agent types and data types
  4. Constraint Definition: Define clear constraints for when the connector should be used
  5. Error Handling: Include proper error handling and fallback mechanisms

Performance Optimization

  1. Efficient Agent Selection: Choose agents that minimize processing time
  2. Batch Processing: Use batch processing for large datasets
  3. Caching: Implement caching for frequently accessed data (see the sketch after this list)
  4. Resource Management: Monitor and optimize resource usage
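
For the caching practice above, a small time-based cache can avoid re-listing connectors on every request. A sketch; tune the TTL to how often your connectors actually change:

import time

# Simple TTL cache around connector listing.
_cache = {"expires": 0.0, "value": None}

def cached_connectors(client, ttl_seconds=60):
    now = time.time()
    if now >= _cache["expires"]:
        _cache["value"] = client.connectors.list(status="active")
        _cache["expires"] = now + ttl_seconds
    return _cache["value"]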

Security Considerations

  1. Input Validation: Validate all input data before processing
  2. Access Control: Implement proper access controls for sensitive data
  3. Data Encryption: Encrypt sensitive data in transit and at rest
  4. Audit Logging: Log all connector operations for security auditing

Testing and Validation

  1. Unit Testing: Test individual connector components (see the sketch after this list)
  2. Integration Testing: Test connector integration with workflows
  3. Performance Testing: Test connector performance under load
  4. Security Testing: Test connector security measures
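
As a starting point for unit testing, a pytest case can assert that a deployed connector passes its own test endpoint. A sketch assuming a TEST_API_KEY environment variable and a previously deployed test connector:

import os
import pytest
from artos import ArtosClient

@pytest.fixture
def client():
    return ArtosClient(api_key=os.environ["TEST_API_KEY"])

def test_connector_validates_rows(client):
    result = client.connectors.test(
        connector_id="conn_abc123def456",
        test_data={
            "data": [{"id": 1, "name": "Test Item", "value": 100.5}],
            "format": "json"
        },
        expected_output={"processedRows": 1, "validationPassed": True}
    )
    assert result.status == "passed"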

Migration Guide

From Legacy Connectors

If you’re migrating from legacy connector implementations:
  1. Update Configuration Format: Convert to new JSON configuration format
  2. Update API Calls: Use new SDK methods and endpoints
  3. Test Thoroughly: Test all connector functionality after migration
  4. Update Documentation: Update any documentation referencing old connectors

Migration Example

# Legacy connector creation
legacy_connector = {
    "connector_name": "old_table_processor",
    "agents": ["validator", "processor"],
    "file_types": ["csv", "xlsx"]
}

# New connector format
new_connector = {
    "name": "table_processor_connector",
    "description": "Handles tabular data processing and insertion",
    "agentTypes": ["data_validator", "table_processor"],
    "dataTypes": ["csv", "excel", "json"],
    "usageRules": {
        "when": "data contains tabular information",
        "priority": "high",
        "constraints": {
            "maxFileSize": "10MB",
            "supportedFormats": ["csv", "xlsx", "json"]
        }
    },
    "configuration": {
        "validationRules": {
            "requiredColumns": ["id", "name", "value"],
            "dataTypes": {
                "id": "integer",
                "name": "string",
                "value": "float"
            }
        },
        "processingOptions": {
            "batchSize": 1000,
            "enableTransformation": True
        }
    }
}

# Migration script
def migrate_connector(legacy_config):
    """Migrate legacy connector to new format"""
    return {
        "name": legacy_config["connector_name"],
        "description": f"Migrated from legacy connector: {legacy_config['connector_name']}",
        "agentTypes": legacy_config["agents"],
        "dataTypes": legacy_config["file_types"],
        "usageRules": {
            "when": "legacy connector would be used",
            "priority": "medium"
        },
        "configuration": {
            "legacyCompatibility": True,
            "migrationDate": "2024-01-15"
        }
    }

# Usage (assumes client = ArtosClient(api_key="your_api_key") as in earlier examples)
migrated_connector = migrate_connector(legacy_connector)
new_connector = client.connectors.create(migrated_connector)