Async Operations

Artos uses asynchronous processing for long-running tasks like document generation. This guide explains how async operations work and best practices for handling them.

Why Async Operations?

Document generation involves multiple time-consuming steps:
  1. Content Extraction - Read and parse source documents
  2. Classification - Categorize and organize content
  3. Rule Processing - Apply extraction and validation rules
  4. Document Generation - Create formatted output
  5. Post-Processing - Optimize and finalize document
This can take several minutes to complete, making synchronous processing impractical for web APIs. Async operations enable:
  • Immediate Response - Client gets task ID immediately
  • Non-Blocking - Client can continue other work
  • Progress Tracking - Poll status at any time
  • Scalability - Server can process multiple requests
  • Reliability - Failed tasks can be retried

How Async Works in Artos

1. Request Submitted

Client submits document generation request:
POST /api/v1/documents/generate
Content-Type: application/json
Authorization: Bearer TOKEN

{
  "document_type": "CSR",
  "file_paths": ["s3://bucket/protocol.pdf"],
  ...
}

2. Task Queued (202 Accepted)

Request is accepted and queued as a Celery task:
HTTP/1.1 202 Accepted

{
  "message": "Document generation started",
  "task_id": "celery-task-uuid-abc123"
}
Key Points:
  • Response returned immediately (not waiting for completion)
  • HTTP status is 202 (Accepted), not 200 (OK)
  • Client receives task_id for tracking

3. Processing

The task is processed asynchronously in the background:
Time: 0s     - Request received, task queued
Time: 0-5s   - Extracting content from source documents
Time: 5-10s  - Classifying and organizing content
Time: 10-20s - Applying extraction rules
Time: 20-30s - Generating final document
Time: 30s    - Task complete
The client doesn't wait for any of this; it can continue other work immediately.

4. Status Polling

Client polls the status endpoint to track progress:
GET /api/v1/documents/status/{task_id}
Authorization: Bearer TOKEN
Response includes current status:
{
  "task_id": "celery-task-abc123",
  "status": "Generating",
  "progress": null,
  "error": null
}
Status Values:
  • Generating - Task is currently processing
  • Complete - Task finished successfully
  • Failed - Task encountered an error
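Since every polling loop needs to check for these values, a small helper keeps the terminal-state check in one place (a sketch; the status strings are the three values listed above):

```python
# The two statuses that mean polling can stop
TERMINAL_STATUSES = {"Complete", "Failed"}

def is_terminal(status: str) -> bool:
    """True once a task has finished, whether it succeeded or failed."""
    return status in TERMINAL_STATUSES
```

Polling loops can then write `if is_terminal(status['status']): break` instead of repeating the string comparison.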

5. Result Retrieval

Once complete, retrieve the document:
GET /api/v1/documents/{document_id}
Authorization: Bearer TOKEN
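A minimal retrieval helper might look like this. Note the assumptions: the `document_id` is taken to come from your document listing or the completed task's metadata (the source doesn't specify where), and the response is assumed to be JSON; a deployment that returns a binary file would read `response.content` instead.

```python
import requests

def document_url(api_url: str, document_id: str) -> str:
    # Build the retrieval URL for a generated document
    return f"{api_url}/api/v1/documents/{document_id}"

def fetch_document(api_url: str, token: str, document_id: str):
    # Fetch the finished document once its task status is 'Complete'
    response = requests.get(
        document_url(api_url, document_id),
        headers={"Authorization": f"Bearer {token}"},
    )
    response.raise_for_status()  # Surface 4xx/5xx errors as exceptions
    return response.json()       # Assumes a JSON response body
```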

Implementation Pattern

Basic Polling Pattern

import requests
import time

def generate_and_wait(api_url, token, payload, max_wait=600):
    # 1. Submit generation request
    response = requests.post(
        f"{api_url}/api/v1/documents/generate",
        headers={"Authorization": f"Bearer {token}"},
        json=payload
    )

    if response.status_code != 202:
        raise Exception(f"Failed to start generation: {response.json()}")

    task_id = response.json()['task_id']
    print(f"Task started: {task_id}")

    # 2. Poll status until complete
    start_time = time.time()
    while time.time() - start_time < max_wait:
        status_response = requests.get(
            f"{api_url}/api/v1/documents/status/{task_id}",
            headers={"Authorization": f"Bearer {token}"}
        )

        status = status_response.json()
        print(f"Status: {status['status']}")

        if status['status'] == 'Complete':
            print("Document generation complete!")
            return task_id
        elif status['status'] == 'Failed':
            raise Exception(f"Generation failed: {status['error']}")

        # Wait before polling again
        time.sleep(5)

    raise TimeoutError(f"Document generation timed out after {max_wait}s")

# Usage
task_id = generate_and_wait(
    "https://api.artosai.com",
    token,
    payload
)

Polling with Exponential Backoff

Optimize polling by increasing wait time:
def generate_with_backoff(api_url, token, payload):
    # 1. Submit request
    response = requests.post(
        f"{api_url}/api/v1/documents/generate",
        headers={"Authorization": f"Bearer {token}"},
        json=payload
    )
    task_id = response.json()['task_id']

    # 2. Poll with exponential backoff
    wait_time = 1       # Start with 1 second
    max_wait_time = 30  # Cap at 30 seconds

    while True:
        time.sleep(wait_time)

        status = requests.get(
            f"{api_url}/api/v1/documents/status/{task_id}",
            headers={"Authorization": f"Bearer {token}"}
        ).json()

        if status['status'] == 'Complete':
            return task_id
        elif status['status'] == 'Failed':
            raise Exception(f"Failed: {status['error']}")

        # Increase wait time, but cap it
        wait_time = min(wait_time * 1.5, max_wait_time)

Async/Await Pattern

For async Python code:
import asyncio
import aiohttp

async def generate_async(session, api_url, token, payload):
    # 1. Submit request
    async with session.post(
        f"{api_url}/api/v1/documents/generate",
        headers={"Authorization": f"Bearer {token}"},
        json=payload
    ) as response:
        task_id = (await response.json())['task_id']

    # 2. Poll status
    while True:
        await asyncio.sleep(5)

        async with session.get(
            f"{api_url}/api/v1/documents/status/{task_id}",
            headers={"Authorization": f"Bearer {token}"}
        ) as response:
            status = await response.json()

            if status['status'] == 'Complete':
                return task_id
            elif status['status'] == 'Failed':
                raise Exception(f"Failed: {status['error']}")

# Usage
async def main():
    async with aiohttp.ClientSession() as session:
        return await generate_async(session, api_url, token, payload)

task_id = asyncio.run(main())

Polling Strategies

1. Aggressive Polling

Poll frequently for immediate feedback:
# Poll every 2 seconds
while True:
    status = get_status(task_id)
    if status['status'] in ['Complete', 'Failed']:
        break
    time.sleep(2)
Pros: Fast feedback
Cons: Higher server load, more requests

2. Conservative Polling

Poll less frequently to reduce load:
# Poll every 30 seconds
while True:
    status = get_status(task_id)
    if status['status'] in ['Complete', 'Failed']:
        break
    time.sleep(30)
Pros: Lower server load
Cons: Slower feedback

3. Exponential Backoff

Start fast, gradually slow down:
# Start fast, slow down over time
wait = 1
while True:
    status = get_status(task_id)
    if status['status'] in ['Complete', 'Failed']:
        break
    wait = min(wait * 1.5, 30)  # Cap at 30 seconds
    time.sleep(wait)
Pros: Good balance of responsiveness and load
Cons: More complex code

Error Handling

Task Failed

If a task fails during processing:
GET /api/v1/documents/status/{task_id}
Authorization: Bearer TOKEN

{
  "status": "Failed",
  "error": "Missing required section: Safety Analysis"
}
Handle failures:
status = get_status(task_id)

if status['status'] == 'Failed':
    error = status['error']
    print(f"Generation failed: {error}")

    # Log error for debugging
    logger.error("Document generation failed", extra={
        "task_id": task_id,
        "error": error
    })

    # Retry or notify user
    if should_retry():
        generate_and_wait(api_url, token, payload)
    else:
        notify_user_of_failure()

Timeout

Task takes longer than expected:
def generate_with_timeout(task_id, max_wait=600):
    start = time.time()

    while time.time() - start < max_wait:
        status = get_status(task_id)

        if status['status'] in ['Complete', 'Failed']:
            return status

        time.sleep(5)

    raise TimeoutError(
        f"Task {task_id} exceeded {max_wait}s timeout"
    )

Webhook Alternative (Future)

Currently polling is the only way to track status. In the future, Artos may support webhooks:
# Future: Register webhook endpoint
requests.post(
    f"{api_url}/api/v1/documents/generate",
    json={
        **payload,
        "webhook": "https://myapp.com/webhooks/document-complete"
    }
)

# Webhook callback receives completion notification
@app.post("/webhooks/document-complete")
def on_document_complete(event):
    task_id = event['task_id']
    status = event['status']
    document_id = event['document_id']
    # Process completion
Until webhooks are available, use polling.

Best Practices

1. Implement Proper Timeout

Always set a reasonable timeout:
MAX_WAIT = 600  # 10 minutes max

task_id = submit_generation(payload)
result = poll_status(task_id, max_wait=MAX_WAIT)

2. Use Exponential Backoff

Start fast, slow down over time:
wait_time = 1
while True:
    status = get_status(task_id)
    if status['status'] != 'Generating':
        break
    wait_time = min(wait_time * 1.5, 30)
    time.sleep(wait_time)

3. Handle Errors Gracefully

Always handle failure cases:
try:
    task_id = submit_generation(payload)
except Exception as e:
    logger.error(f"Generation submission failed: {e}")
    notify_user_of_failure()

try:
    result = poll_status(task_id)
except TimeoutError:
    logger.error(f"Task {task_id} timed out")
    notify_user_of_timeout()

4. Log Task IDs

Keep task IDs for debugging and support:
logger.info("Document generation started", extra={
    "task_id": task_id,
    "document_type": payload['document_type'],
    "user_id": current_user.id
})

5. Cache Status

Avoid polling the same task multiple times:
# Use caching to avoid duplicate requests
status_cache = {}

def get_cached_status(task_id, cache_ttl=10):
    if task_id in status_cache:
        cached_time, cached_status = status_cache[task_id]
        if time.time() - cached_time < cache_ttl:
            return cached_status

    status = get_status(task_id)
    status_cache[task_id] = (time.time(), status)
    return status