# Async Operations

> Understanding asynchronous processing patterns

Artos uses asynchronous processing for long-running tasks such as document generation. This guide explains how async operations work and describes best practices for handling them.

## Why Async Operations?

Document generation involves multiple time-consuming steps:

1. **Content Extraction** - Read and parse source documents
2. **Classification** - Categorize and organize content
3. **Rule Processing** - Apply extraction and validation rules
4. **Document Generation** - Create formatted output
5. **Post-Processing** - Optimize and finalize document

Together, these steps can take several minutes to complete, which makes synchronous request/response handling impractical for a web API.

Async operations enable:

* **Immediate Response** - Client gets task ID immediately
* **Non-Blocking** - Client can continue other work
* **Progress Tracking** - Poll status at any time
* **Scalability** - Server can process multiple requests concurrently
* **Reliability** - Failed tasks can be retried

## How Async Works in Artos

### 1. Request Submitted

The client submits a document generation request:

```bash theme={null}
POST /api/v1/documents/generate
Content-Type: application/json
Authorization: Bearer TOKEN

{
  "document_type": "CSR",
  "file_paths": ["org-id/documents/protocol.pdf"],
  ...
}
```

### 2. Task Queued (202 Accepted)

The request is accepted and queued as a Celery task:

```json theme={null}
HTTP/1.1 202 Accepted

{
  "message": "Document generation started",
  "task_id": "celery-task-uuid-abc123"
}
```

**Key Points**:

* Response returned immediately (not waiting for completion)
* HTTP status is 202 (Accepted), not 200 (OK)
* Client receives `task_id` for tracking

### 3. Processing

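The task is processed asynchronously in the background. The timeline below is illustrative only; actual durations vary with the source documents and can run to several minutes: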

```
Time: 0s     - Request received, task queued
Time: 0-5s   - Extracting content from source documents
Time: 5-10s  - Classifying and organizing content
Time: 10-20s - Applying extraction rules
Time: 20-30s - Generating final document
Time: 30s    - Task complete
```

The client does not block during this time and can continue with other work.

### 4. Status Polling

The client polls the status endpoint to track progress:

```bash theme={null}
GET /api/v1/documents/status/{task_id}
Authorization: Bearer TOKEN
```

The response includes the current status:

```json theme={null}
{
  "task_id": "celery-task-uuid-abc123",
  "status": "Generating",
  "progress": null,
  "error": null
}
```

**Status Values**:

* **Generating** - Task is currently processing
* **Complete** - Task finished successfully
* **Failed** - Task encountered an error
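
The three status values above can be modeled as a small enum so that polling code only has to ask whether a task has reached a terminal state. This is a minimal sketch: `TaskStatus`, `TERMINAL_STATES`, and `is_terminal` are illustrative names rather than part of the Artos API, and it assumes the status strings are exactly the three listed above.

```python
from enum import Enum


class TaskStatus(str, Enum):
    """Status strings as listed in this guide (assumed to be exhaustive)."""
    GENERATING = "Generating"
    COMPLETE = "Complete"
    FAILED = "Failed"


# States after which further polling is pointless
TERMINAL_STATES = {TaskStatus.COMPLETE, TaskStatus.FAILED}


def is_terminal(status_response: dict) -> bool:
    """Return True once the task has finished, successfully or not.

    Raises ValueError if the status string is not one of the known values.
    """
    return TaskStatus(status_response["status"]) in TERMINAL_STATES
```

Raising on an unknown status string is a deliberate choice here: it surfaces an unexpected value immediately instead of polling forever.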

### 5. Result Retrieval

Once the status is `Complete`, retrieve the document:

```bash theme={null}
GET /api/v1/documents/{document_id}
Authorization: Bearer TOKEN
```

## Implementation Pattern

### Basic Polling Pattern

```python theme={null}
import requests
import time

def generate_and_wait(api_url, token, payload, max_wait=600):
    # 1. Submit generation request
    response = requests.post(
        f"{api_url}/api/v1/documents/generate",
        headers={"Authorization": f"Bearer {token}"},
        json=payload
    )

    if response.status_code != 202:
        # Use the raw text: an error response body is not guaranteed to be JSON
        raise Exception(f"Failed to start generation: {response.status_code} {response.text}")

    task_id = response.json()['task_id']
    print(f"Task started: {task_id}")

    # 2. Poll status until complete
    start_time = time.time()
    while time.time() - start_time < max_wait:
        status_response = requests.get(
            f"{api_url}/api/v1/documents/status/{task_id}",
            headers={"Authorization": f"Bearer {token}"}
        )

        status = status_response.json()
        print(f"Status: {status['status']}")

        if status['status'] == 'Complete':
            print("Document generation complete!")
            return task_id
        elif status['status'] == 'Failed':
            raise Exception(f"Generation failed: {status['error']}")

        # Wait before polling again
        time.sleep(5)

    raise TimeoutError(f"Document generation timed out after {max_wait}s")

# Usage
task_id = generate_and_wait(
    "https://api.artosai.com",
    token,
    payload
)
```

### Polling with Exponential Backoff

Reduce request volume by increasing the wait between polls:

```python theme={null}
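import time

import requests

def generate_with_backoff(api_url, token, payload, max_wait=600):
    headers = {"Authorization": f"Bearer {token}"}

    # 1. Submit request
    response = requests.post(
        f"{api_url}/api/v1/documents/generate",
        headers=headers,
        json=payload
    )
    task_id = response.json()['task_id']

    # 2. Poll with exponential backoff
    wait_time = 1  # Start with 1 second
    max_wait_time = 30  # Cap at 30 seconds
    deadline = time.time() + max_wait  # Never poll forever

    while time.time() < deadline:
        time.sleep(wait_time)

        # Parse the JSON body; the Response object itself is not subscriptable
        status = requests.get(
            f"{api_url}/api/v1/documents/status/{task_id}",
            headers=headers
        ).json()
        if status['status'] == 'Complete':
            return task_id
        elif status['status'] == 'Failed':
            raise Exception(f"Failed: {status['error']}")

        # Increase wait time, but cap it
        wait_time = min(wait_time * 1.5, max_wait_time)

    raise TimeoutError(f"Document generation timed out after {max_wait}s")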
```

### Async/Await Pattern

For async Python code:

```python theme={null}
import asyncio
import aiohttp

async def generate_async(session, api_url, token, payload):
    # 1. Submit request
    async with session.post(
        f"{api_url}/api/v1/documents/generate",
        headers={"Authorization": f"Bearer {token}"},
        json=payload
    ) as response:
        task_id = (await response.json())['task_id']

    # 2. Poll status
    while True:
        await asyncio.sleep(5)

        async with session.get(
            f"{api_url}/api/v1/documents/status/{task_id}",
            headers={"Authorization": f"Bearer {token}"}
        ) as response:
            status = await response.json()

            if status['status'] == 'Complete':
                return task_id
            elif status['status'] == 'Failed':
                raise Exception(f"Failed: {status['error']}")

# Usage: `async with` and `await` must run inside a coroutine
async def main():
    async with aiohttp.ClientSession() as session:
        return await generate_async(session, api_url, token, payload)

task_id = asyncio.run(main())
```
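
One benefit of the async pattern is that several generations can be awaited concurrently. The sketch below assumes a coroutine such as `generate_async` above, wrapped so that it takes a single payload. `generate_many` and `max_concurrent` are hypothetical names, and the limit of 5 is an arbitrary illustration, not a documented Artos limit.

```python
import asyncio


async def generate_many(generate_one, payloads, max_concurrent=5):
    """Run generate_one(payload) for each payload, at most max_concurrent at a time.

    Returns results in the same order as payloads. A failed generation appears
    as an exception object in the list instead of aborting the whole batch.
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(payload):
        # The semaphore bounds how many generations are in flight at once
        async with semaphore:
            return await generate_one(payload)

    return await asyncio.gather(
        *(guarded(p) for p in payloads),
        return_exceptions=True,
    )
```

Inside a coroutine with an open session, this could be called as `await generate_many(lambda p: generate_async(session, api_url, token, p), payloads)`.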

## Polling Strategies

### 1. Aggressive Polling

Poll frequently for immediate feedback:

```python theme={null}
# Poll every 2 seconds
while True:
    status = get_status(task_id)
    if status['status'] in ['Complete', 'Failed']:
        break
    time.sleep(2)
```

**Pros**: Fast feedback
**Cons**: Higher server load, more requests

### 2. Conservative Polling

Poll less frequently to reduce load:

```python theme={null}
# Poll every 30 seconds
while True:
    status = get_status(task_id)
    if status['status'] in ['Complete', 'Failed']:
        break
    time.sleep(30)
```

**Pros**: Lower server load
**Cons**: Slower feedback

### 3. Exponential Backoff (Recommended)

Start fast, gradually slow down:

```python theme={null}
# Start fast, slow down over time
wait = 1
while True:
    status = get_status(task_id)
    if status['status'] in ['Complete', 'Failed']:
        break
    time.sleep(wait)  # First wait is 1 second
    wait = min(wait * 1.5, 30)  # Grow the next wait, capped at 30 seconds
```

**Pros**: Good balance of responsiveness and load
**Cons**: More complex code
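
To make the trade-off between the three strategies concrete, the sketch below counts how many polls each one issues within a 10-minute window. It counts waiting time only and ignores request latency. The helper names (`fixed_delays`, `backoff_delays`, `polls_within`) are illustrative, and the parameters mirror the values used in the snippets above.

```python
from itertools import islice


def fixed_delays(interval):
    """Yield the same wait time forever (aggressive or conservative polling)."""
    while True:
        yield interval


def backoff_delays(initial=1.0, factor=1.5, cap=30.0):
    """Yield wait times that grow by `factor` each poll, capped at `cap`."""
    delay = initial
    while True:
        yield delay
        delay = min(delay * factor, cap)


def polls_within(delays, budget_s):
    """Count how many waits from `delays` fit into `budget_s` seconds."""
    elapsed, count = 0.0, 0
    for delay in delays:
        if elapsed + delay > budget_s:
            break
        elapsed += delay
        count += 1
    return count


print(list(islice(backoff_delays(), 5)))      # → [1.0, 1.5, 2.25, 3.375, 5.0625]
print(polls_within(fixed_delays(2), 600))     # → 300
print(polls_within(fixed_delays(30), 600))    # → 20
print(polls_within(backoff_delays(), 600))    # → 26
```

Over 10 minutes, backoff issues roughly as few requests as 30-second polling while still checking several times in the first few seconds, which is when a short task would finish.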

## Error Handling

### Task Failed

If a task fails during processing:

```bash theme={null}
GET /api/v1/documents/status/task-id
→ status: "Failed"
→ error: "Missing required section: Safety Analysis"
```

Handle failures:

```python theme={null}
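import logging

logger = logging.getLogger(__name__)

status = get_status(task_id)

if status['status'] == 'Failed':
    error = status['error']
    print(f"Generation failed: {error}")

    # Log error for debugging
    logger.error("Document generation failed", extra={
        "task_id": task_id,
        "error": error
    })

    # Retry or notify user (should_retry and notify_user_of_failure
    # are application-specific helpers)
    if should_retry():
        # Keep the new task ID returned by the retry
        task_id = generate_and_wait(api_url, token, payload)
    else:
        notify_user_of_failure()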
```
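
A retry should be bounded so that a task which keeps failing does not loop forever. The sketch below is one way to do that. `GenerationFailed` and `run_with_retries` are hypothetical names, and the attempt count and delay are arbitrary defaults. Whether a retry can help depends on the error: a message like the "Missing required section" example above looks input-related, so resubmitting the same payload would presumably fail again.

```python
import time


class GenerationFailed(Exception):
    """Hypothetical exception for a task that ended in the Failed state."""


def run_with_retries(run_once, max_attempts=3, retry_delay=10.0, sleep=time.sleep):
    """Call run_once() until it succeeds or max_attempts is reached.

    run_once is any zero-argument callable that submits a request, waits for
    the result, and raises GenerationFailed on a Failed status.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return run_once()
        except GenerationFailed as exc:
            last_error = exc
            if attempt < max_attempts:
                sleep(retry_delay)  # Pause before the next attempt
    # Every attempt failed: surface the most recent error to the caller
    raise last_error
```

The `sleep` parameter exists only so the delay can be replaced in tests; production code can leave it at the default.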

### Timeout

If a task takes longer than expected, stop polling and raise a timeout:

```python theme={null}
def generate_with_timeout(task_id, max_wait=600):
    start = time.time()

    while time.time() - start < max_wait:
        status = get_status(task_id)

        if status['status'] in ['Complete', 'Failed']:
            return status

        time.sleep(5)

    raise TimeoutError(
        f"Task {task_id} exceeded {max_wait}s timeout"
    )
```
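
The fixed 5-second loop above can be combined with the capped exponential backoff described under Polling Strategies, so that one helper enforces both the deadline and the growing interval. This is a sketch under stated assumptions: `poll_until_done` is a hypothetical name, `get_status` is any zero-argument callable returning the status dictionary, and `clock` and `sleep` are injectable only to make the logic testable without real waiting.

```python
import time


def poll_until_done(get_status, max_wait=600.0, initial=1.0, factor=1.5,
                    cap=30.0, clock=time.monotonic, sleep=time.sleep):
    """Poll get_status() with capped backoff until a terminal status or timeout."""
    deadline = clock() + max_wait
    delay = initial

    while True:
        status = get_status()
        if status["status"] in ("Complete", "Failed"):
            return status

        remaining = deadline - clock()
        if remaining <= 0:
            raise TimeoutError(f"Task did not finish within {max_wait}s")

        # Never sleep past the deadline
        sleep(min(delay, remaining))
        delay = min(delay * factor, cap)
```

Using `time.monotonic` for the deadline avoids surprises if the system clock is adjusted while the task is running.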

## Webhook Alternative (Future)

Currently, polling is the only way to track task status. In the future, Artos may support webhooks. The following is a hypothetical sketch, not an available API:

```python theme={null}
# Future: Register webhook endpoint
requests.post(
    f"{api_url}/api/v1/documents/generate",
    json={
        **payload,
        "webhook": "https://myapp.com/webhooks/document-complete"
    }
)

# Webhook callback receives completion notification
@app.post("/webhooks/document-complete")
def on_document_complete(event):
    task_id = event['task_id']
    status = event['status']
    document_id = event['document_id']
    # Process completion
```

Until webhooks are available, use polling.

## Best Practices

### 1. Implement Proper Timeout

Always set a reasonable timeout:

```python theme={null}
MAX_WAIT = 600  # 10 minutes max

task_id = submit_generation(payload)
result = poll_status(task_id, max_wait=MAX_WAIT)
```

### 2. Use Exponential Backoff

Start fast, slow down over time:

```python theme={null}
wait_time = 1
while True:
    status = get_status(task_id)
    if status['status'] != 'Generating':
        break
    time.sleep(wait_time)  # First wait is 1 second
    wait_time = min(wait_time * 1.5, 30)  # Grow the next wait, capped at 30 seconds
```

### 3. Handle Errors Gracefully

Always handle failure cases:

```python theme={null}
try:
    task_id = submit_generation(payload)
except Exception as e:
    logger.error(f"Generation submission failed: {e}")
    notify_user_of_failure()
else:
    # Poll only if submission succeeded, so task_id is always defined here
    try:
        result = poll_status(task_id)
    except TimeoutError:
        logger.error(f"Task {task_id} timed out")
        notify_user_of_timeout()
```

### 4. Log Task IDs

Keep task IDs for debugging and support:

```python theme={null}
logger.info("Document generation started", extra={
    "task_id": task_id,
    "document_type": payload['document_type'],
    "user_id": current_user.id
})
```

### 5. Cache Status

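Avoid issuing duplicate status requests for the same task within a short window, for example when several parts of an application check the same task: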

```python theme={null}
# Use caching to avoid duplicate requests
status_cache = {}

def get_cached_status(task_id, cache_ttl=10):
    if task_id in status_cache:
        cached_time, cached_status = status_cache[task_id]
        if time.time() - cached_time < cache_ttl:
            return cached_status

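    # requests needs an absolute URL; api_url and token are assumed to be
    # defined as in the earlier examples
    status = requests.get(
        f"{api_url}/api/v1/documents/status/{task_id}",
        headers={"Authorization": f"Bearer {token}"}
    ).json()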
    status_cache[task_id] = (time.time(), status)
    return status
```

## Related Topics

* **[Document Generation](document-generation)** - Generation pipeline details
* **[Documents API](../api-reference/documents)** - API endpoints and responses
* **[Status Polling](../api-reference/documents#get-document-status)** - Status endpoint documentation
