Skip to content

Concurrent Execution

ConcurrentExecutor runs multiple Claude Code prompts in parallel, each on its own ephemeral Fly machine. All machines are cleaned up regardless of individual success or failure.

Basic batch execution

from flaude import ConcurrentExecutor, ExecutionRequest, MachineConfig

async def run_parallel_reviews(app_name: str) -> None:
    executor = ConcurrentExecutor(app_name, max_concurrency=3)

    requests = [
        ExecutionRequest(
            config=MachineConfig(
                claude_code_oauth_token="sk-ant-oat-...",
                prompt="Review the auth module for security issues",
                repos=["https://github.com/your-org/your-repo"],
            ),
            tag="auth-review",
        ),
        ExecutionRequest(
            config=MachineConfig(
                claude_code_oauth_token="sk-ant-oat-...",
                prompt="Review the billing module for security issues",
                repos=["https://github.com/your-org/your-repo"],
            ),
            tag="billing-review",
        ),
        ExecutionRequest(
            config=MachineConfig(
                claude_code_oauth_token="sk-ant-oat-...",
                prompt="Review the API module for security issues",
                repos=["https://github.com/your-org/your-repo"],
            ),
            tag="api-review",
        ),
    ]

    batch = await executor.run_batch(requests)
    print(f"{batch.succeeded}/{batch.total} reviews succeeded")

Tags

The tag field on ExecutionRequest is a free-form string for correlating results back to requests. It is returned as-is in ExecutionResult.tag. Use it to track which request produced which result.

Concurrency limits

max_concurrency caps the number of machines running simultaneously. Without a limit all machines start at once:

# At most 5 machines run at the same time
executor = ConcurrentExecutor(app_name, max_concurrency=5)

# No limit — all machines start immediately
executor = ConcurrentExecutor(app_name)

Tip

Fly.io accounts have machine creation rate limits. If you're running hundreds of concurrent executions, set a max_concurrency to avoid hitting API limits.

Iterating batch results

BatchResult.results is a list of ExecutionResult in the same order as the input requests. Check .success and .error on each:

batch = await executor.run_batch(requests)

for result in batch.results:
    if result.success:
        print(f"[{result.tag}] OK (exit_code={result.run_result.exit_code})")
    elif result.error is not None:
        print(f"[{result.tag}] ERROR: {result.error}")
    else:
        # Completed but non-zero exit code
        exit_code = result.run_result.exit_code if result.run_result else None
        print(f"[{result.tag}] FAILED (exit_code={exit_code})")

Checking overall success

if batch.all_succeeded:
    print("All tasks completed successfully")
else:
    print(f"{batch.failed} tasks failed out of {batch.total}")

Handling partial failures

ConcurrentExecutor never raises an exception from run_batch — failures are captured in ExecutionResult.error. This means the batch always completes even if individual machines fail:

batch = await executor.run_batch(requests)

failed = [r for r in batch.results if not r.success]
if failed:
    for result in failed:
        tag = result.tag
        if result.error:
            print(f"[{tag}] Exception: {result.error}")
        elif result.run_result:
            print(f"[{tag}] Non-zero exit: {result.run_result.exit_code}")

Warning

ExecutionResult.success is True only when exit_code == 0. A machine that exits with code 1 (e.g. Claude Code encountered an error) is treated as failed even though the machine itself ran successfully.

Single-execution convenience

Use run_one when you have a single config but want the ExecutionResult wrapper (for example, to avoid handling MachineExitError directly):

result = await executor.run_one(
    config=MachineConfig(
        claude_code_oauth_token="sk-ant-oat-...",
        prompt="Generate a test suite for the payments module",
        repos=["https://github.com/your-org/your-repo"],
    ),
    tag="generate-tests",
)

if result.success:
    print("Tests generated successfully")
else:
    print(f"Generation failed: {result.error}")