Async Operations
Artos uses asynchronous processing for long-running tasks like document generation. This guide explains how async operations work and best practices for handling them.
Why Async Operations?
Document generation involves multiple time-consuming steps:
- Content Extraction - Read and parse source documents
- Classification - Categorize and organize content
- Rule Processing - Apply extraction and validation rules
- Document Generation - Create formatted output
- Post-Processing - Optimize and finalize document
This can take several minutes to complete, making synchronous processing impractical for web APIs.
Async operations enable:
- Immediate Response - Client gets task ID immediately
- Non-Blocking - Client can continue other work
- Progress Tracking - Poll status at any time
- Scalability - Server can process multiple requests
- Reliability - Failed tasks can be retried
How Async Works in Artos
1. Request Submitted
Client submits document generation request:
POST /api/v1/documents/generate
Content-Type: application/json
Authorization: Bearer TOKEN
{
"document_type": "CSR",
"file_paths": ["org-id/documents/protocol.pdf"],
...
}
2. Task Queued (202 Accepted)
Request is accepted and queued as a Celery task:
HTTP/1.1 202 Accepted
{
"message": "Document generation started",
"task_id": "celery-task-uuid-abc123"
}
Key Points:
- Response returned immediately (not waiting for completion)
- HTTP status is 202 (Accepted), not 200 (OK)
- Client receives task_id for tracking
3. Processing
The task is processed asynchronously in the background:
Time: 0s - Request received, task queued
Time: 0-5s - Extracting content from source documents
Time: 5-10s - Classifying and organizing content
Time: 10-20s - Applying extraction rules
Time: 20-30s - Generating final document
Time: 30s - Task complete
The client doesn’t wait for any of this - it can continue other work immediately.
4. Status Polling
Client polls the status endpoint to track progress:
GET /api/v1/documents/status/{task_id}
Authorization: Bearer TOKEN
Response includes current status:
{
"task_id": "celery-task-uuid-abc123",
"status": "Generating",
"progress": null,
"error": null
}
Status Values:
- Generating - Task is currently processing
- Complete - Task finished successfully
- Failed - Task encountered an error
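Since Generating is the only in-flight state, a polling loop only needs to know which statuses are terminal. A tiny helper (a sketch built from the status strings listed above) makes that explicit:

```python
# Terminal statuses, taken from the documented status values above.
TERMINAL_STATUSES = {"Complete", "Failed"}

def is_terminal(status):
    """Return True once a task has finished, successfully or not."""
    return status in TERMINAL_STATUSES
```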
5. Result Retrieval
Once complete, retrieve the document:
GET /api/v1/documents/{document_id}
Authorization: Bearer TOKEN
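The polling examples below all return the task_id rather than the finished document, so here is a minimal retrieval sketch in the same requests style. It assumes the client already knows the document_id (the status response shown above does not include it) and that the endpoint returns JSON:

```python
import requests

def get_document(api_url, token, document_id):
    """Fetch a generated document after its task reports Complete."""
    response = requests.get(
        f"{api_url}/api/v1/documents/{document_id}",
        headers={"Authorization": f"Bearer {token}"},
    )
    response.raise_for_status()  # surface 4xx/5xx errors instead of parsing bad data
    return response.json()
```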
Implementation Pattern
Basic Polling Pattern
import requests
import time

def generate_and_wait(api_url, token, payload, max_wait=600):
    # 1. Submit generation request
    response = requests.post(
        f"{api_url}/api/v1/documents/generate",
        headers={"Authorization": f"Bearer {token}"},
        json=payload
    )
    if response.status_code != 202:
        raise Exception(f"Failed to start generation: {response.json()}")

    task_id = response.json()['task_id']
    print(f"Task started: {task_id}")

    # 2. Poll status until complete
    start_time = time.time()
    while time.time() - start_time < max_wait:
        status_response = requests.get(
            f"{api_url}/api/v1/documents/status/{task_id}",
            headers={"Authorization": f"Bearer {token}"}
        )
        status = status_response.json()
        print(f"Status: {status['status']}")

        if status['status'] == 'Complete':
            print("Document generation complete!")
            return task_id
        elif status['status'] == 'Failed':
            raise Exception(f"Generation failed: {status['error']}")

        # Wait before polling again
        time.sleep(5)

    raise TimeoutError(f"Document generation timed out after {max_wait}s")

# Usage
task_id = generate_and_wait(
    "https://api.artosai.com",
    token,
    payload
)
Polling with Exponential Backoff
Optimize polling by increasing wait time:
def generate_with_backoff(api_url, token, payload):
    # 1. Submit request
    response = requests.post(...)
    task_id = response.json()['task_id']

    # 2. Poll with exponential backoff
    wait_time = 1       # Start with 1 second
    max_wait_time = 30  # Cap at 30 seconds

    while True:
        time.sleep(wait_time)

        status = requests.get(...).json()
        if status['status'] == 'Complete':
            return task_id
        elif status['status'] == 'Failed':
            raise Exception(f"Failed: {status['error']}")

        # Increase wait time, but cap it
        wait_time = min(wait_time * 1.5, max_wait_time)
Async/Await Pattern
For async Python code:
import asyncio
import aiohttp
async def generate_async(session, api_url, token, payload):
    # 1. Submit request
    async with session.post(
        f"{api_url}/api/v1/documents/generate",
        headers={"Authorization": f"Bearer {token}"},
        json=payload
    ) as response:
        task_id = (await response.json())['task_id']

    # 2. Poll status
    while True:
        await asyncio.sleep(5)
        async with session.get(
            f"{api_url}/api/v1/documents/status/{task_id}",
            headers={"Authorization": f"Bearer {token}"}
        ) as response:
            status = await response.json()
            if status['status'] == 'Complete':
                return task_id
            elif status['status'] == 'Failed':
                raise Exception(f"Failed: {status['error']}")

# Usage (must run inside an async function, e.g. via asyncio.run)
async with aiohttp.ClientSession() as session:
    task_id = await generate_async(session, api_url, token, payload)
Polling Strategies
1. Aggressive Polling
Poll frequently for immediate feedback:
# Poll every 2 seconds
while True:
    status = get_status(task_id)
    if status['status'] in ['Complete', 'Failed']:
        break
    time.sleep(2)
Pros: Fast feedback
Cons: Higher server load, more requests
2. Conservative Polling
Poll less frequently to reduce load:
# Poll every 30 seconds
while True:
    status = get_status(task_id)
    if status['status'] in ['Complete', 'Failed']:
        break
    time.sleep(30)
Pros: Lower server load
Cons: Slower feedback
3. Exponential Backoff (Recommended)
Start fast, gradually slow down:
# Start fast, slow down over time
wait = 1
while True:
    status = get_status(task_id)
    if status['status'] in ['Complete', 'Failed']:
        break
    wait = min(wait * 1.5, 30)  # Cap at 30 seconds
    time.sleep(wait)
Pros: Good balance of responsiveness and load
Cons: More complex code
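A common refinement of exponential backoff (not described above, but standard practice for polling clients) is to add random jitter, so that many clients tracking tasks at once do not synchronize their requests. A sketch of a "full jitter" delay generator:

```python
import random

def backoff_delays(base=1.0, factor=1.5, cap=30.0):
    """Yield successive wait times: a random fraction of a capped
    exponential ceiling ("full jitter")."""
    wait = base
    while True:
        yield random.uniform(0, wait)
        wait = min(wait * factor, cap)
```

Each poll then sleeps for next(delays) seconds instead of a deterministic interval, spreading load across clients while keeping the same average growth.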
Error Handling
Task Failed
If a task fails during processing:
GET /api/v1/documents/status/task-id
→ status: "Failed"
→ error: "Missing required section: Safety Analysis"
Handle failures:
status = get_status(task_id)
if status['status'] == 'Failed':
    error = status['error']
    print(f"Generation failed: {error}")

    # Log error for debugging
    logger.error("Document generation failed", extra={
        "task_id": task_id,
        "error": error
    })

    # Retry or notify user
    if should_retry():
        generate_and_wait(api_url, token, payload)
    else:
        notify_user_of_failure()
Timeout
Task takes longer than expected:
def generate_with_timeout(task_id, max_wait=600):
    start = time.time()
    while time.time() - start < max_wait:
        status = get_status(task_id)
        if status['status'] in ['Complete', 'Failed']:
            return status
        time.sleep(5)
    raise TimeoutError(
        f"Task {task_id} exceeded {max_wait}s timeout"
    )
Webhook Alternative (Future)
Currently polling is the only way to track status. In the future, Artos may support webhooks:
# Future: Register webhook endpoint
requests.post(
    f"{api_url}/api/v1/documents/generate",
    json={
        **payload,
        "webhook": "https://myapp.com/webhooks/document-complete"
    }
)

# Webhook callback receives completion notification
@app.post("/webhooks/document-complete")
def on_document_complete(event):
    task_id = event['task_id']
    status = event['status']
    document_id = event['document_id']
    # Process completion
Until webhooks are available, use polling.
Best Practices
1. Implement Proper Timeout
Always set a reasonable timeout:
MAX_WAIT = 600 # 10 minutes max
task_id = submit_generation(payload)
result = poll_status(task_id, max_wait=MAX_WAIT)
2. Use Exponential Backoff
Start fast, slow down over time:
wait_time = 1
while True:
    status = get_status(task_id)
    if status['status'] != 'Generating':
        break
    wait_time = min(wait_time * 1.5, 30)
    time.sleep(wait_time)
3. Handle Errors Gracefully
Always handle failure cases:
try:
    task_id = submit_generation(payload)
except Exception as e:
    logger.error(f"Generation submission failed: {e}")
    notify_user_of_failure()
    raise  # stop here - there is no task to poll

try:
    result = poll_status(task_id)
except TimeoutError:
    logger.error(f"Task {task_id} timed out")
    notify_user_of_timeout()
4. Log Task IDs
Keep task IDs for debugging and support:
logger.info("Document generation started", extra={
    "task_id": task_id,
    "document_type": payload['document_type'],
    "user_id": current_user.id
})
5. Cache Status
Avoid polling the same task multiple times:
# Use caching to avoid duplicate requests
status_cache = {}

def get_cached_status(task_id, cache_ttl=10):
    if task_id in status_cache:
        cached_time, cached_status = status_cache[task_id]
        if time.time() - cached_time < cache_ttl:
            return cached_status

    status = requests.get(
        f"{api_url}/api/v1/documents/status/{task_id}",
        headers={"Authorization": f"Bearer {token}"}
    ).json()
    status_cache[task_id] = (time.time(), status)
    return status