Introduction
Some operations simply can’t finish in the time a user is willing to wait for an HTTP response. Video transcoding, report generation, data migrations, batch emails, ML model training, payment processing across services — these tasks take seconds, minutes, or hours.
The instinct to just “make the endpoint wait” breaks down immediately. HTTP connections time out. Load balancers kill idle requests. Users close browser tabs. Worker processes get recycled. And a single slow task blocks a thread that could serve hundreds of fast requests.
This article covers the patterns that production systems use to reliably manage long-running tasks.
The Fundamental Pattern: Accept, Enqueue, Process
Every long-running task system follows the same core architecture:
- Accept — API validates the request and creates a task record
- Enqueue — Task is placed on a durable message queue
- Process — A worker picks up the task and executes it asynchronously
- Notify — Client is informed of progress and completion
The key insight: decouple submission from execution. The API server’s only job is to say “yes, I’ve accepted your task” and hand back a task ID. Everything else happens in the background.
// Express API -- accept task, return immediately
import { randomUUID } from 'crypto';
import { sendToQueue } from './queue';
import { taskStore } from './store';

app.post('/api/tasks', async (req, res) => {
  const { type, params } = req.body;
  const userId = req.user.id;

  // Validate
  if (!VALID_TASK_TYPES.includes(type)) {
    return res.status(400).json({ error: 'Invalid task type' });
  }

  // Create task record
  const taskId = randomUUID();
  await taskStore.create({
    id: taskId,
    userId,
    type,
    params,
    status: 'pending',
    progress: 0,
    createdAt: new Date(),
  });

  // Enqueue for async processing
  await sendToQueue('tasks', {
    taskId,
    type,
    params,
    userId,
  });

  // Return immediately -- HTTP 202 Accepted
  res.status(202).json({
    taskId,
    status: 'pending',
    statusUrl: `/api/tasks/${taskId}`,
  });
});

The client gets a response in milliseconds, regardless of whether the task takes 2 seconds or 2 hours.
Task State Machine
Every task moves through a well-defined set of states. Model this explicitly — it prevents invalid transitions and makes debugging straightforward.
// Task states and valid transitions
type TaskStatus =
  | 'pending'    // Created, waiting in queue
  | 'running'    // Worker picked it up
  | 'progress'   // Actively executing, progress updates
  | 'completed'  // Finished successfully
  | 'failed'     // Failed after all retries exhausted
  | 'retrying'   // Failed, scheduled for retry
  | 'cancelled'  // User cancelled
  | 'timed_out'; // Exceeded max duration

const VALID_TRANSITIONS: Record<TaskStatus, TaskStatus[]> = {
  pending: ['running', 'cancelled', 'timed_out'],
  // 'retrying' lets a reaper reclaim a task from a dead worker;
  // 'timed_out' lets an executing task hit its max duration
  running: ['progress', 'completed', 'failed', 'retrying', 'cancelled', 'timed_out'],
  progress: ['progress', 'completed', 'failed', 'retrying', 'cancelled', 'timed_out'],
  completed: [], // Terminal
  failed: ['retrying'],
  retrying: ['running', 'failed'],
  cancelled: [], // Terminal
  timed_out: ['retrying', 'failed'],
};

async function transitionTask(
  taskId: string,
  newStatus: TaskStatus,
  metadata?: Record<string, unknown>
) {
  const task = await taskStore.get(taskId);
  if (!task) {
    throw new Error(`Task not found: ${taskId}`);
  }
  if (!VALID_TRANSITIONS[task.status]?.includes(newStatus)) {
    throw new Error(
      `Invalid transition: ${task.status} -> ${newStatus}`
    );
  }
  await taskStore.update(taskId, {
    // Spread metadata first so it cannot overwrite status or updatedAt
    ...metadata,
    status: newStatus,
    updatedAt: new Date(),
  });
}

Worker Implementation
Workers are the engines that execute tasks. They must be stateless, crash-safe, and capable of processing any task type.
# Python worker -- consume from queue, execute tasks
import json
import os
import signal
import socket
import traceback
from datetime import datetime


class TaskWorker:
    def __init__(self, queue_client, task_store, handlers):
        self.queue = queue_client
        self.store = task_store
        self.handlers = handlers  # {task_type: handler_fn}
        self.running = True
        signal.signal(signal.SIGTERM, self._shutdown)
        signal.signal(signal.SIGINT, self._shutdown)

    def _shutdown(self, signum, frame):
        """Graceful shutdown -- finish current task, then exit."""
        print(f"Received signal {signum}, shutting down...")
        self.running = False

    def run(self):
        """Main loop: pull tasks, execute, update state."""
        while self.running:
            message = self.queue.receive(
                wait_time=20,            # Long poll
                # 5 min lock; must outlast the task or be extended while it runs
                visibility_timeout=300,
            )
            if not message:
                continue

            task_data = json.loads(message.body)
            task_id = task_data['taskId']

            try:
                # Mark as running
                self.store.update(task_id, {
                    'status': 'running',
                    'startedAt': datetime.utcnow().isoformat(),
                    'workerId': self.worker_id,
                })

                # Dispatch to handler
                handler = self.handlers.get(task_data['type'])
                if not handler:
                    raise ValueError(
                        f"Unknown task type: {task_data['type']}"
                    )

                # Execute with progress callback
                result = handler(
                    task_data['params'],
                    progress_fn=lambda p: self.store.update(
                        task_id, {'progress': p}
                    ),
                )

                # Mark completed
                self.store.update(task_id, {
                    'status': 'completed',
                    'progress': 100,
                    'result': result,
                    'completedAt': datetime.utcnow().isoformat(),
                })

                # Ack the message (removes from queue)
                self.queue.ack(message)
            except Exception as e:
                self.store.update(task_id, {
                    'status': 'failed',
                    'error': str(e),
                    'stackTrace': traceback.format_exc(),
                    'failedAt': datetime.utcnow().isoformat(),
                })
                # Don't ack -- message returns to queue
                # after visibility timeout (automatic retry)

    @property
    def worker_id(self):
        return f"{socket.gethostname()}-{os.getpid()}"

Example Task Handler
# Video transcoding handler
def handle_video_transcode(params, progress_fn):
    """Transcode video to multiple resolutions."""
    source_key = params['sourceKey']
    output_prefix = params['outputPrefix']
    resolutions = params.get('resolutions', [1080, 720, 480])

    local_source = download_from_s3(source_key)
    results = {}

    for i, res in enumerate(resolutions):
        # Report progress per resolution
        base_progress = (i / len(resolutions)) * 100
        progress_fn(int(base_progress))

        output_path = transcode_ffmpeg(
            local_source,
            resolution=res,
            # Bind base_progress now so a late callback cannot
            # pick up a later iteration's value
            on_progress=lambda p, base=base_progress: progress_fn(
                int(base + p / len(resolutions))
            ),
        )
        key = f"{output_prefix}/{res}p.mp4"
        upload_to_s3(key, output_path)
        results[f'{res}p'] = key

    return results

Progress Tracking and Client Notification
Clients need to know what is happening with their task. There are three strategies, each with different tradeoffs.
Strategy 1: Polling
Simplest approach. Client periodically hits the status endpoint.
// Client-side polling
async function waitForTask(taskId, onProgress) {
  const POLL_INTERVAL = 2000; // 2 seconds

  while (true) {
    const res = await fetch(`/api/tasks/${taskId}`);
    // Stop on HTTP errors (e.g. 404) instead of polling forever
    if (!res.ok) {
      throw new Error(`Status request failed: ${res.status}`);
    }
    const task = await res.json();
    onProgress?.(task);

    if (task.status === 'completed') {
      return task.result;
    }
    if (task.status === 'failed') {
      throw new Error(task.error);
    }
    if (task.status === 'cancelled') {
      throw new Error('Task was cancelled');
    }

    await new Promise(r => setTimeout(r, POLL_INTERVAL));
  }
}

// Usage
const result = await waitForTask('abc-123', (task) => {
  progressBar.style.width = `${task.progress}%`;
  statusText.textContent = task.status;
});

Strategy 2: Server-Sent Events (SSE)
Real-time progress without polling overhead. The server pushes updates as they happen.
// Server -- SSE endpoint for task progress
app.get('/api/tasks/:id/stream', (req, res) => {
  const { id } = req.params;

  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
  });

  // Subscribe to task updates
  const unsubscribe = taskStore.subscribe(id, (task) => {
    res.write(`data: ${JSON.stringify(task)}\n\n`);

    // Close stream on terminal states
    if (['completed', 'failed', 'cancelled'].includes(task.status)) {
      res.end();
    }
  });

  req.on('close', () => {
    unsubscribe();
  });
});

// Client -- consume SSE stream
function streamTaskProgress(taskId, onUpdate) {
  return new Promise((resolve, reject) => {
    const source = new EventSource(
      `/api/tasks/${taskId}/stream`
    );

    source.onmessage = (event) => {
      const task = JSON.parse(event.data);
      onUpdate(task);

      if (task.status === 'completed') {
        source.close();
        resolve(task.result);
      } else if (task.status === 'failed') {
        source.close();
        reject(new Error(task.error));
      } else if (task.status === 'cancelled') {
        // The server also ends the stream on cancellation
        source.close();
        reject(new Error('Task was cancelled'));
      }
    };

    source.onerror = () => {
      source.close();
      // Fallback to polling on SSE failure
      waitForTask(taskId, onUpdate).then(resolve, reject);
    };
  });
}

Strategy 3: Webhook Callback
For server-to-server integrations where no user is waiting interactively:
// Worker sends webhook on completion
import crypto from 'crypto';

async function notifyWebhook(task) {
  if (!task.webhookUrl) return;

  const payload = {
    taskId: task.id,
    status: task.status,
    result: task.result,
    completedAt: task.completedAt,
  };
  // Serialize once so the signed bytes are exactly the bytes sent
  const body = JSON.stringify(payload);

  // Sign the payload so receiver can verify authenticity
  const signature = crypto
    .createHmac('sha256', WEBHOOK_SECRET)
    .update(body)
    .digest('hex');

  await fetch(task.webhookUrl, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'X-Signature': signature,
    },
    body,
  });
}

Retry Strategies
Tasks fail. Networks blip, dependencies go down, OOM kills happen. A robust retry strategy is the difference between “it eventually works” and “the data is lost.”
Exponential Backoff with Jitter
The gold standard for retries. Each attempt waits longer, and jitter prevents all failed tasks from retrying at the same instant (thundering herd).
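To make the schedule concrete, the sketch below computes the capped exponential delay for each retry, together with the range that jitter can spread it over. It assumes the same defaults as this section's retry helper (`base_delay=1.0`, `max_delay=60.0`, a jitter factor drawn from 0.5 to 1.5); the function name `backoff_schedule` is illustrative only.

```python
def backoff_schedule(max_retries=5, base_delay=1.0, max_delay=60.0):
    """Return (retry_number, nominal, low, high) for each retry.

    nominal is the capped exponential delay; low and high bound the
    delay after multiplying by a jitter factor in [0.5, 1.5).
    """
    schedule = []
    for attempt in range(max_retries):
        nominal = min(max_delay, base_delay * (2 ** attempt))
        schedule.append((attempt + 1, nominal, nominal * 0.5, nominal * 1.5))
    return schedule


if __name__ == "__main__":
    for retry, nominal, low, high in backoff_schedule():
        print(
            f"retry {retry}: nominal {nominal:.1f}s, "
            f"jittered between {low:.1f}s and {high:.1f}s"
        )
```

With the defaults, the nominal delays are 1, 2, 4, 8, and 16 seconds. Because jitter is applied after the cap in this scheme, a jittered delay can exceed `max_delay` by up to 50%.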
import random
import time


class MaxRetriesExceeded(Exception):
    """Raised when every attempt has failed."""


# RetryableError is defined in the error-classification section that follows
def retry_with_backoff(
    fn,
    max_retries=5,
    base_delay=1.0,
    max_delay=60.0,
    jitter=True,
):
    """Execute fn with exponential backoff retry."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except RetryableError as e:
            if attempt == max_retries:
                raise MaxRetriesExceeded(
                    f"Failed after {max_retries} retries"
                ) from e

            # Exponential delay
            delay = min(max_delay, base_delay * (2 ** attempt))

            # Add jitter to prevent thundering herd
            if jitter:
                delay = delay * (0.5 + random.random())

            print(
                f"Attempt {attempt + 1} failed: {e}. "
                f"Retrying in {delay:.1f}s..."
            )
            time.sleep(delay)

Classifying Errors: Retryable vs. Fatal
Not all errors deserve a retry. Retrying a “file not found” error wastes resources. Retrying a “connection timeout” might succeed.
class RetryableError(Exception):
    """Errors that may succeed on retry."""
    pass


class FatalError(Exception):
    """Errors that will never succeed on retry."""
    pass


def classify_error(error):
    """Decide if an error is worth retrying."""
    # Network/transient errors -- retry
    # (ThrottlingError stands for your client library's throttling exception)
    if isinstance(error, (
        ConnectionError,
        TimeoutError,
        ThrottlingError,
    )):
        return RetryableError(str(error))

    # Service unavailable -- retry
    if hasattr(error, 'status_code') and error.status_code in (
        429,  # Too Many Requests
        502,  # Bad Gateway
        503,  # Service Unavailable
        504,  # Gateway Timeout
    ):
        return RetryableError(str(error))

    # Client errors, validation, auth -- don't retry
    if hasattr(error, 'status_code') and 400 <= error.status_code < 500:
        return FatalError(str(error))

    # Unknown -- retry cautiously
    return RetryableError(str(error))

Dead Letter Queue (DLQ)
When retries are exhausted, don’t silently drop the task. Move it to a dead letter queue for manual inspection.
def process_with_dlq(message, handler, max_retries=3):
    """Process message, send to DLQ after max retries."""
    # ApproximateReceiveCount is 1 on the first delivery
    retry_count = int(
        message.attributes.get('ApproximateReceiveCount', 0)
    )
    try:
        handler(message)
        queue.ack(message)
    except FatalError as e:
        # No point retrying -- send to DLQ immediately
        dlq.send(message.body, metadata={
            'error': str(e),
            'error_type': 'fatal',
            'original_queue': 'tasks',
        })
        queue.ack(message)  # Remove from main queue
    except RetryableError as e:
        if retry_count >= max_retries:
            # Exhausted retries -- send to DLQ
            dlq.send(message.body, metadata={
                'error': str(e),
                'error_type': 'retries_exhausted',
                'retry_count': retry_count,
            })
            queue.ack(message)
        else:
            # Let it return to queue (visibility timeout)
            pass

Checkpointing: Surviving Crashes Mid-Task
For tasks that process large datasets (migrate 10 million rows, process 50,000 images), a crash at 90% shouldn’t mean restarting from 0%. Periodically save a checkpoint so a new worker can resume where the crashed one left off.
from datetime import datetime


class CheckpointedTask:
    """Task that saves progress checkpoints for resumability."""

    def __init__(self, task_id, store):
        self.task_id = task_id
        self.store = store
        self.checkpoint_interval = 1000  # Every N items

    async def run(self, items):
        # Load last checkpoint
        checkpoint = await self.store.get_checkpoint(self.task_id)
        start_offset = checkpoint.get('offset', 0) if checkpoint else 0
        processed = start_offset
        print(f"Resuming from offset {start_offset}")

        for i, item in enumerate(items[start_offset:], start=start_offset):
            # Process single item
            await self.process_item(item)
            processed = i + 1

            # Checkpoint periodically
            if processed % self.checkpoint_interval == 0:
                await self.save_checkpoint(processed, len(items))

        # Final checkpoint
        await self.save_checkpoint(processed, len(items))
        return {'processed': processed}

    async def save_checkpoint(self, offset, total):
        await self.store.set_checkpoint(self.task_id, {
            'offset': offset,
            'total': total,
            # Guard against an empty item list (total == 0)
            'progress': int((offset / total) * 100) if total else 100,
            'updated_at': datetime.utcnow().isoformat(),
        })

    async def process_item(self, item):
        raise NotImplementedError

Idempotent Checkpoints
The key requirement: processing the same item twice must produce the same result. If a worker crashes right after processing item 1001 but before checkpointing, the new worker will re-process item 1001. This must be safe.
async def process_item_idempotent(self, item):
    """Process item only if not already processed."""
    # Check if already processed (idempotency key)
    if await self.store.is_processed(self.task_id, item['id']):
        return  # Skip -- already done

    # Process
    result = await do_work(item)

    # Mark as processed atomically
    await self.store.mark_processed(
        self.task_id,
        item['id'],
        result,
    )

Saga Pattern: Multi-Service Tasks
When a long-running task spans multiple services (reserve inventory, charge payment, create shipment), you can’t use a traditional database transaction. The saga pattern breaks the task into steps, each with a compensation action that undoes the step if a later step fails.
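One way to represent a step and its undo action is a small record type. The sketch below is an assumption about what `SagaStep` and `SagaFailed` might look like (this article uses both names without defining them), plus a tiny in-memory run showing that compensation happens in reverse order. The `run_saga` and `demo` helpers and the step functions are illustrative only.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional


class SagaFailed(Exception):
    """Raised after a step fails and compensation has been attempted."""


@dataclass
class SagaStep:
    name: str
    execute: Callable[[Any], Awaitable[Any]]
    # compensate receives the order and the result that execute returned
    compensate: Optional[Callable[[Any, Any], Awaitable[None]]] = None


async def run_saga(steps, order, log):
    """Run steps in order; on failure, undo completed steps in reverse."""
    completed = []
    for step in steps:
        try:
            result = await step.execute(order)
            completed.append((step, result))
            log.append(f"done:{step.name}")
        except Exception as exc:
            for done_step, done_result in reversed(completed):
                if done_step.compensate:
                    await done_step.compensate(order, done_result)
                    log.append(f"undo:{done_step.name}")
            raise SagaFailed(f"Step '{step.name}' failed: {exc}") from exc


def demo():
    log = []

    async def ok(order):
        return {"id": 1}

    async def boom(order):
        raise RuntimeError("carrier unavailable")

    async def undo(order, result):
        return None

    steps = [
        SagaStep("reserve_inventory", ok, undo),
        SagaStep("charge_payment", ok, undo),
        SagaStep("create_shipment", boom, undo),
    ]
    try:
        asyncio.run(run_saga(steps, {"id": "order-1"}, log))
    except SagaFailed as exc:
        log.append(str(exc))
    return log
```

Calling `demo()` records the two successful steps, then the undo of the payment before the undo of the inventory reservation, which is the ordering a compensating saga relies on.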
Orchestrator-Based Saga
class OrderSaga:
    """Orchestrates multi-step order processing."""

    def __init__(self):
        self.steps = [
            SagaStep(
                name='reserve_inventory',
                execute=self.reserve_inventory,
                compensate=self.release_inventory,
            ),
            SagaStep(
                name='charge_payment',
                execute=self.charge_payment,
                compensate=self.refund_payment,
            ),
            SagaStep(
                name='create_shipment',
                execute=self.create_shipment,
                compensate=self.cancel_shipment,
            ),
            SagaStep(
                name='send_confirmation',
                execute=self.send_confirmation,
                compensate=None,  # Notification -- no undo needed
            ),
        ]

    async def execute(self, order):
        completed_steps = []
        for step in self.steps:
            try:
                result = await step.execute(order)
                completed_steps.append((step, result))
                await self.save_step_state(
                    order.id, step.name, 'completed', result
                )
            except Exception as e:
                await self.save_step_state(
                    order.id, step.name, 'failed', str(e)
                )
                # Compensate in reverse order
                await self.compensate(order, completed_steps)
                raise SagaFailed(
                    f"Step '{step.name}' failed: {e}"
                ) from e

    async def compensate(self, order, completed_steps):
        """Undo completed steps in reverse order."""
        for step, result in reversed(completed_steps):
            if step.compensate:
                try:
                    await step.compensate(order, result)
                    await self.save_step_state(
                        order.id,
                        f'compensate_{step.name}',
                        'completed',
                    )
                except Exception as e:
                    # Compensation failed -- critical!
                    # Alert, manual intervention needed
                    await self.alert_compensation_failure(
                        order.id, step.name, e
                    )

    # Step implementations
    async def reserve_inventory(self, order):
        return await inventory_service.reserve(
            order.items, ttl_minutes=30
        )

    async def release_inventory(self, order, reservation):
        await inventory_service.release(reservation['id'])

    async def charge_payment(self, order):
        return await payment_service.charge(
            order.user_id,
            order.total,
            idempotency_key=f"order-{order.id}",
        )

    async def refund_payment(self, order, charge):
        await payment_service.refund(charge['id'])

    async def create_shipment(self, order):
        return await shipping_service.create(
            order.address, order.items
        )

    async def cancel_shipment(self, order, shipment):
        await shipping_service.cancel(shipment['id'])

    async def send_confirmation(self, order):
        await notification_service.send_email(
            order.user_email,
            template='order_confirmed',
            data=order.to_dict(),
        )

Task Timeouts and Heartbeats
Long-running tasks can hang silently — waiting on a dead connection, stuck in an infinite loop, or blocked on a resource that will never become available. Timeouts and heartbeats detect this.
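For the timeout half, one option is to bound the handler with a hard deadline. A minimal sketch, assuming the handler is a coroutine and using `asyncio.wait_for` from the standard library; the `run_with_timeout` name, the `(status, value)` return shape, and the two demo handlers are illustrative and not part of any library.

```python
import asyncio


async def run_with_timeout(handler, max_seconds):
    """Await handler(), cancelling it if it exceeds max_seconds.

    Returns a (status, value) pair so the caller can record
    'completed' or 'timed_out' in the task store.
    """
    try:
        result = await asyncio.wait_for(handler(), timeout=max_seconds)
        return ("completed", result)
    except asyncio.TimeoutError:
        # wait_for has already cancelled the handler coroutine here
        return ("timed_out", None)


async def _fast():
    await asyncio.sleep(0)
    return "ok"


async def _hung():
    await asyncio.sleep(60)  # stands in for a dead connection
    return "never"


def demo():
    fast = asyncio.run(run_with_timeout(_fast, max_seconds=1.0))
    hung = asyncio.run(run_with_timeout(_hung, max_seconds=0.05))
    return fast, hung
```

This only helps with cooperative code. A handler blocked in a synchronous call or a CPU-bound loop never yields to the event loop, so it cannot be cancelled this way; that case usually calls for running the handler in a separate process that can be killed.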
import threading
from datetime import datetime


class HeartbeatWorker:
    """Worker that sends heartbeats while processing."""

    def __init__(self, task_store, heartbeat_interval=30):
        self.store = task_store
        self.interval = heartbeat_interval

    def process_task(self, task_id, handler):
        # Start heartbeat thread
        stop_heartbeat = threading.Event()
        heartbeat_thread = threading.Thread(
            target=self._heartbeat_loop,
            args=(task_id, stop_heartbeat),
            daemon=True,
        )
        heartbeat_thread.start()
        try:
            result = handler()
            return result
        finally:
            stop_heartbeat.set()
            heartbeat_thread.join(timeout=5)

    def _heartbeat_loop(self, task_id, stop_event):
        """Send periodic heartbeat to prove worker is alive."""
        # wait() returns False on timeout, True once stop_event is set
        while not stop_event.wait(self.interval):
            self.store.update(task_id, {
                'lastHeartbeat': datetime.utcnow().isoformat(),
            })

Reaper: Detecting Dead Tasks
A separate process monitors heartbeats and reclaims tasks from dead workers:
from datetime import datetime, timedelta


async def reap_stale_tasks(task_store, queue, stale_threshold=120):
    """Find tasks with stale heartbeats and re-enqueue them."""
    cutoff = datetime.utcnow() - timedelta(seconds=stale_threshold)

    stale_tasks = await task_store.find({
        'status': {'$in': ['running', 'progress']},
        # Heartbeats are stored as ISO-8601 strings, so compare against a string
        'lastHeartbeat': {'$lt': cutoff.isoformat()},
    })

    for task in stale_tasks:
        print(f"Task {task['id']} appears dead "
              f"(last heartbeat: {task['lastHeartbeat']})")

        # Reset and re-enqueue
        await task_store.update(task['id'], {
            'status': 'retrying',
            'error': 'Worker heartbeat timeout',
        })
        await queue.send({
            'taskId': task['id'],
            'type': task['type'],
            'params': task['params'],
            'retryReason': 'heartbeat_timeout',
        })

Task Scheduling and Priority
Not all tasks are equal. A user waiting for their export should take priority over a background data migration.
# Priority queue with multiple tiers
QUEUES = {
    'critical': 'tasks-critical',  # Payment processing
    'high': 'tasks-high',          # User-initiated exports
    'normal': 'tasks-normal',      # Background processing
    'low': 'tasks-low',            # Data migrations, cleanup
}


def enqueue_task(task, priority='normal'):
    """Route task to appropriate priority queue."""
    queue_name = QUEUES.get(priority, QUEUES['normal'])
    queue_client.send(queue_name, task)


# Workers consume from higher-priority queues first
class PriorityWorker:
    def poll(self):
        """Check queues in priority order."""
        for priority in ['critical', 'high', 'normal', 'low']:
            message = queue_client.receive(
                QUEUES[priority],
                wait_time=1,  # Short poll per queue
            )
            if message:
                return message
        return None

Rate Limiting and Concurrency Control
Prevent tasks from overwhelming downstream services:
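The executor in this section depends on an `AsyncRateLimiter` that the article does not define. A minimal sketch of one possible implementation, assuming a token-bucket design with an `acquire()` coroutine; the class body, the `burst` parameter, and the injectable `clock` and `sleep` hooks are assumptions made for illustration and testing.

```python
import asyncio
import time


class AsyncRateLimiter:
    """Token bucket: up to `burst` calls at once, refilled at max_per_second."""

    def __init__(self, max_per_second, burst=None,
                 clock=time.monotonic, sleep=asyncio.sleep):
        self.rate = float(max_per_second)
        self.capacity = float(burst if burst is not None else max_per_second)
        self.tokens = self.capacity
        self.clock = clock
        self.sleep = sleep
        self.last = clock()
        self.lock = asyncio.Lock()

    def _refill(self):
        # Add tokens for the time elapsed since the last refill, up to capacity
        now = self.clock()
        self.tokens = min(
            self.capacity, self.tokens + (now - self.last) * self.rate
        )
        self.last = now

    async def acquire(self):
        """Wait until one token is available, then consume it."""
        async with self.lock:
            self._refill()
            while self.tokens < 1:
                # Sleep just long enough for one token to accumulate
                await self.sleep((1 - self.tokens) / self.rate)
                self._refill()
            self.tokens -= 1
```

Holding the lock while sleeping makes waiters queue up in arrival order, which keeps the sketch simple at the cost of some throughput under heavy contention. Create the limiter inside the running event loop if you target Python versions older than 3.10.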
import asyncio


class RateLimitedExecutor:
    """Execute tasks with concurrency and rate limits."""

    def __init__(self, max_concurrent=10, max_per_second=50):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        # AsyncRateLimiter is assumed to expose an async acquire() method
        self.rate_limiter = AsyncRateLimiter(max_per_second)

    async def execute(self, task_fn, *args):
        async with self.semaphore:
            await self.rate_limiter.acquire()
            return await task_fn(*args)

Database Schema for Task Management
CREATE TABLE tasks (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    type            TEXT NOT NULL,
    status          TEXT NOT NULL DEFAULT 'pending',
    priority        TEXT NOT NULL DEFAULT 'normal',
    params          JSONB NOT NULL DEFAULT '{}',
    result          JSONB,
    error           TEXT,
    progress        INTEGER DEFAULT 0 CHECK (progress BETWEEN 0 AND 100),

    -- Tracking
    created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    started_at      TIMESTAMPTZ,
    completed_at    TIMESTAMPTZ,
    last_heartbeat  TIMESTAMPTZ,
    worker_id       TEXT,

    -- Retry state
    retry_count     INTEGER DEFAULT 0,
    max_retries     INTEGER DEFAULT 3,
    next_retry_at   TIMESTAMPTZ,

    -- User context
    user_id         UUID REFERENCES users(id),
    webhook_url     TEXT,

    -- Checkpoint for resumability
    checkpoint      JSONB DEFAULT '{}'
);

-- Index for worker polling (find pending tasks)
CREATE INDEX idx_tasks_pending
    ON tasks(priority, created_at)
    WHERE status = 'pending';

-- Index for reaper (find stale running tasks)
CREATE INDEX idx_tasks_stale
    ON tasks(last_heartbeat)
    WHERE status IN ('running', 'progress');

-- Index for user's task list
CREATE INDEX idx_tasks_user ON tasks(user_id, created_at DESC);

Putting It All Together
Here’s the complete lifecycle of a long-running task in a production system:
User clicks "Export Report"
        |
        v
API Server: POST /api/tasks
  - Validate request
  - Create task record (status: pending)
  - Enqueue to priority queue
  - Return 202 {taskId, statusUrl}
        |
        v
Message Queue (durable, persistent)
        |
        v
Worker Pool (auto-scaled)
  - Dequeue task
  - Set status: running, start heartbeat
  - Load checkpoint (if resuming)
  - Execute handler:
      |  - Process batch of items
      |  - Update progress (40%, 60%, 80%)
      |  - Save checkpoint every 1000 items
      |  - If error -> classify retryable vs fatal
      v
  - On success: status=completed, result={...}
  - On failure: status=failed, -> retry queue or DLQ
        |
        v
Client Notification
  - SSE stream pushes progress updates
  - Or: client polls GET /api/tasks/{id}
  - Or: webhook fires on completion
        |
        v
Reaper Process (runs every 60s)
  - Find tasks with stale heartbeats
  - Re-enqueue for another worker to pick up

Decision Cheat Sheet
| Question | Answer |
|---|---|
| When to go async? | Any task >500ms or with unpredictable duration |
| Queue technology? | SQS (simple), RabbitMQ (routing), Kafka (ordering + replay) |
| How to track progress? | Dedicated task store (Redis for speed, PostgreSQL for durability) |
| Client notification? | Polling (simple), SSE (real-time), Webhook (server-to-server) |
| Retry strategy? | Exponential backoff + jitter, classify retryable vs fatal |
| Multi-service tasks? | Saga pattern with compensation |
| Large batch tasks? | Checkpoint every N items for resumability |
| Dead workers? | Heartbeat + reaper process |
| Exhausted retries? | Dead letter queue for manual inspection |
| Priority tasks? | Multiple queues, workers poll highest priority first |
Common Pitfalls
Synchronous long tasks. The endpoint that takes 45 seconds to respond works fine in dev, then times out behind an ALB in production. Always async for anything over 500ms.
No idempotency. If a worker processes an item, crashes before acknowledging, and a new worker reprocesses the same item — you’ve charged a customer twice. Every task handler must be idempotent.
Unbounded retries. Retrying forever on a permanently broken task wastes resources. Set a max retry count and use a DLQ.
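Visibility timeout too short. If a task runs longer than the queue's visibility timeout, the message becomes visible again and a second worker picks it up while the first is still running, so you get duplicate processing. Set the SQS visibility timeout above your expected task duration, or extend it periodically while the task runs; SQS caps it at 12 hours.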
Ignoring compensation. When step 3 of 5 fails in a saga, steps 1-2 have already taken effect (reserved inventory, charged payment). Without compensation logic, you leak state across services.
No heartbeat or timeout. A worker grabs a task, then dies silently. Without heartbeats, the task sits in “running” forever. The reaper pattern catches these.
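The double-charge scenario from the idempotency pitfall can be sketched with an in-memory stand-in for a payment service. The assumption here is that the service stores results by idempotency key and returns the stored result on a repeat call; `FakePaymentService`, its `charge` method, and `handle_order` are hypothetical and only mimic the `idempotency_key` argument used in the saga example.

```python
class FakePaymentService:
    """In-memory stand-in: remembers results by idempotency key."""

    def __init__(self):
        self._by_key = {}
        self.charges_made = 0

    def charge(self, user_id, amount, idempotency_key):
        # A replayed request returns the original result without charging again
        if idempotency_key in self._by_key:
            return self._by_key[idempotency_key]
        self.charges_made += 1
        result = {
            "id": f"ch_{self.charges_made}",
            "user_id": user_id,
            "amount": amount,
        }
        self._by_key[idempotency_key] = result
        return result


def handle_order(service, order):
    """Task handler that is safe to run more than once for the same order."""
    return service.charge(
        order["user_id"],
        order["total"],
        idempotency_key=f"order-{order['id']}",
    )


if __name__ == "__main__":
    service = FakePaymentService()
    order = {"id": 42, "user_id": "u1", "total": 1999}
    first = handle_order(service, order)
    second = handle_order(service, order)  # simulated redelivery after a crash
    print(first == second, service.charges_made)
```

In a real system the lookup and the write need to be one atomic operation, for example a unique constraint on the key. Otherwise two workers racing on the same message can both pass the check.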
Conclusion
Long-running tasks are an inevitability in any system beyond a CRUD app. The patterns are straightforward: decouple submission from execution, track state explicitly, retry intelligently, checkpoint for resilience, and use sagas for multi-service coordination.
The investment in building this infrastructure pays off in user experience (instant response plus progress updates), reliability (retries, DLQ, and checkpointing), and operational visibility (every task has a clear state, history, and audit trail). Variations of this architecture are common wherever work outlives a single request, from payment processing to video transcoding to CI pipelines.