A status-based state machine beats a traditional task queue for multi-stage AI pipelines because AI workloads have unpredictable durations, fail in ways that require human-readable state, and need automatic recovery from crashes — three things Celery and RQ handle poorly out of the box.

When your pipeline has 7 stages, each taking anywhere from 30 seconds to 2 hours, and a server restart at any point needs to resume exactly where it left off, the architectural choice between a state machine and a task queue determines whether your system is recoverable or fragile.

What makes AI pipelines different from normal background jobs

Traditional task queues were designed for jobs that take seconds — send an email, resize an image, process a webhook. The job starts, runs, finishes, done. If it fails, retry it. The queue manages ordering and concurrency.

AI pipelines break this model in three ways. First, individual stages can run for minutes to hours (video generation, batch image generation). Second, the pipeline is sequential — stage 4 can't start until stage 3 finishes. Third, failures are often partial — you might have 28 of 30 images generated when something crashes, and you need to know exactly which two are missing.

Task queues treat jobs as opaque units. They know "running" or "failed" but not "generated 28 of 30 storyboard images and crashed on image 29." A state machine with a database-backed status field captures exactly that level of detail.

How a status-based state machine works in practice

The pattern is straightforward. Each pipeline job has a status field in the database that progresses through a defined sequence of states. A scheduler polls for jobs that need work and advances them one stage at a time.

# Status progression for a multi-stage AI pipeline
PIPELINE_STATUSES = [
    "pending",           # Job created, waiting to start
    "script_analysis",   # Stage 1: LLM analyzing input
    "audio_generation",  # Stage 2: TTS + music generation
    "transcription",     # Stage 3: Word-level timestamp extraction
    "storyboard",        # Stage 4: Image generation (batch)
    "video_generation",  # Stage 5: Image-to-video (batch)
    "assembly",          # Stage 6: Final render and asset generation
    "complete",          # Done
    "failed",            # Terminal failure
    "cancelled"          # User-cancelled
]

Each status tells you exactly what the system is doing and what it did last. When something crashes, the status field is a snapshot of where to resume.

Why task queues struggle with long-running AI stages

Celery and RQ use broker-based job distribution. A worker picks up a task from Redis or RabbitMQ, runs it, and acks completion. If the worker dies mid-task, the broker eventually requeues the job. This works fine for 5-second jobs. It breaks for 45-minute video generation batches.

The problems compound. Visibility timeouts need to exceed your longest possible stage duration — but you don't know what that is when AI models have variable response times. If you set the timeout too low, the broker requeues a job that's still running, and now two workers are hitting the same API concurrently. Set it too high, and a genuinely failed job sits invisible for an hour before anyone notices.

State machines avoid this entirely. The scheduler checks the database, sees "status = video_generation, updated_at = 47 minutes ago," and can make an intelligent decision: is this still running, or is it stale? That decision can use context — checking whether the worker process is still alive, whether partial results exist in storage — instead of relying on a dumb timeout.

How to implement stale job recovery

Stale job recovery is the strongest argument for the state machine approach. When a server restarts, you need every in-progress job to resume from where it stopped — not restart from scratch, and not sit forgotten in a dead worker's memory.

from datetime import datetime, timedelta

STALE_THRESHOLDS = {
    "script_analysis": timedelta(minutes=10),
    "audio_generation": timedelta(minutes=15),
    "transcription": timedelta(minutes=10),
    "storyboard": timedelta(minutes=45),
    "video_generation": timedelta(hours=2),
    "assembly": timedelta(minutes=20),
}

def recover_stale_jobs(db_session):
    """Find and reset jobs stuck in a processing state."""
    for status, threshold in STALE_THRESHOLDS.items():
        cutoff = datetime.utcnow() - threshold
        stale_jobs = db_session.query(Job).filter(
            Job.status == status,
            Job.updated_at < cutoff,
            Job.cancelled == False
        ).all()

        for job in stale_jobs:
            # Check for partial results before resetting
            completed_items = count_completed_items(job, status)
            job.progress_offset = completed_items
            job.retry_count += 1

            if job.retry_count > 3:
                job.status = "failed"
                job.error = f"Exceeded retries at {status}"
            else:
                # Re-enter the same status — scheduler will pick it up
                job.updated_at = datetime.utcnow()

        db_session.commit()

The key detail is progress_offset. When a storyboard generation stage crashed after completing 28 of 30 images, recovery doesn't regenerate all 30. It picks up at image 29. This saves both time and API costs — which matter when each image generation call costs money.

How to prevent duplicate scheduling with file-based locks

In multi-worker deployments (gunicorn with multiple workers, for example), APScheduler can initialize in each worker process. Without a lock, every worker runs the scheduler, and you get duplicate job processing.

import fcntl
import atexit

LOCK_FILE = "/tmp/pipeline_scheduler.lock"

def acquire_scheduler_lock():
    """Ensure only one worker process runs the scheduler."""
    lock_handle = open(LOCK_FILE, "w")
    try:
        fcntl.flock(lock_handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
        atexit.register(lambda: lock_handle.close())
        return True
    except BlockingIOError:
        lock_handle.close()
        return False

# In your Flask app factory
if acquire_scheduler_lock():
    scheduler = APScheduler()
    scheduler.add_job(
        process_pipeline_jobs,
        trigger="interval",
        seconds=15,
        id="pipeline_processor"
    )
    scheduler.start()

The file-based lock using fcntl.flock is non-blocking and automatically releases when the process dies. No external dependencies like Redis required. The scheduler polls every 15 seconds, checks for jobs in actionable states, and advances them.

How to handle thread-safe cancellation mid-pipeline

Users will cancel jobs while they're running. In a task queue, cancelling a running task means revoking it from the broker — but the worker might already be mid-execution with no clean way to stop. In a state machine, cancellation is a status change that every stage checks before proceeding.

def process_storyboard_stage(job):
    """Generate storyboard images with cancellation checks."""
    scenes = get_scenes(job)

    for i, scene in enumerate(scenes):
        # Check cancellation before each expensive API call
        job_fresh = db.session.query(Job).get(job.id)
        if job_fresh.status == "cancelled":
            cleanup_partial_results(job, "storyboard")
            return

        generate_image(scene, job)
        job.progress = f"{i + 1}/{len(scenes)}"
        job.updated_at = datetime.utcnow()
        db.session.commit()

    # Advance to next stage
    job.status = "video_generation"
    db.session.commit()

The cancellation check happens before each expensive operation — not after. This means you stop spending money the moment the user cancels, and the cleanup function removes any partial assets from storage so you don't accumulate orphaned files.

When a task queue is actually the right choice

State machines aren't universally better. Task queues win when you have high-volume, short-duration, independent jobs — thousands of image thumbnails to generate, emails to send, webhooks to deliver. The broker handles distribution and concurrency better than a polling scheduler at that scale.

The crossover point is when jobs become multi-stage, long-running, and need human-readable progress. If someone needs to look at a dashboard and understand "this job is generating video, it's on clip 3 of 7, and it's been running for 12 minutes" — that's a state machine. If the job is "process this thing and tell me when it's done" — that's a task queue.

The complete state machine pattern for AI pipelines

A state machine built on a database status field, APScheduler for polling, file-based locks for single-scheduler enforcement, and stale job recovery with progress offsets gives you everything a multi-stage AI pipeline needs: crash recovery, human-readable state, cancellation safety, and cost-efficient retries. Task queues solve a different problem — high-throughput job distribution — and bolting on the state management an AI pipeline requires means fighting the abstraction instead of using one designed for the workload.