Building on flaude

This guide covers patterns for building tools and services on top of flaude — things like CI integrations, automated review pipelines, and multi-tenant execution services.

Sharing log infrastructure across concurrent machines

When running many machines in parallel with run_with_logs, creating a separate LogDrainServer per machine wastes resources. Instead, share a single server and collector across all machines:

from flaude import (
    LogCollector,
    LogDrainServer,
    MachineConfig,
    run_with_logs,
)
import asyncio

async def run_parallel_with_shared_drain(app_name: str, tasks: list[dict]) -> None:
    # One server and collector shared across all machines
    collector = LogCollector()
    server = LogDrainServer(collector, port=0)  # port=0 = auto-assign
    await server.start()

    try:
        streams = await asyncio.gather(*[
            run_with_logs(
                app_name,
                MachineConfig(
                    claude_code_oauth_token="sk-ant-oat-...",
                    prompt=task["prompt"],
                    repos=task["repos"],
                ),
                collector=collector,  # reuse shared collector
                server=server,        # reuse shared server
            )
            for task in tasks
        ])

        # Consume all streams concurrently
        async def consume(stream, label: str) -> None:
            async with stream:
                async for line in stream:
                    print(f"[{label}] {line}")

        await asyncio.gather(*[
            consume(stream, tasks[i]["label"])
            for i, stream in enumerate(streams)
        ])
    finally:
        await server.stop()

Note

When you pass collector= and server= to run_with_logs, the StreamingRun does not own the server and will not stop it on cleanup. You are responsible for calling server.stop() when all machines are done.

Using metadata for tracking

The metadata field on MachineConfig attaches arbitrary key-value pairs to the Fly machine. Use it to correlate machines with your own tracking system:

from flaude import MachineConfig, run_and_destroy

async def run_tracked(app_name: str, job_id: str, user_id: str) -> None:
    config = MachineConfig(
        claude_code_oauth_token="sk-ant-oat-...",
        prompt="Generate a comprehensive test suite for the payments module",
        repos=["https://github.com/your-org/your-repo"],
        metadata={
            "job_id": job_id,
            "user_id": user_id,
            "triggered_by": "api",
        },
    )

    result = await run_and_destroy(app_name, config)
    print(f"Job {job_id} completed: exit_code={result.exit_code}")

Metadata appears in the Fly dashboard and can be queried via the Machines API, making it useful for debugging and cleanup of orphaned machines.
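An orphan sweep, for example, could list machines via the Machines API and match their metadata against your job tracker. The HTTP call is elided here; the selection logic, given machine records already fetched as parsed JSON, might look like the sketch below. The `find_orphans` helper and the assumption that metadata appears under `config.metadata` in each record are illustrative, not part of flaude's API:

```python
def find_orphans(machines: list[dict], active_job_ids: set[str]) -> list[str]:
    """Return IDs of machines whose metadata job_id is no longer active.

    `machines` is assumed to be the parsed machine list from the Fly
    Machines API, with the metadata set via MachineConfig exposed under
    each record's config.metadata.
    """
    orphans = []
    for machine in machines:
        meta = machine.get("config", {}).get("metadata", {})
        job_id = meta.get("job_id")
        # Machines with no job_id were not launched by this system; skip them.
        if job_id is not None and job_id not in active_job_ids:
            orphans.append(machine["id"])
    return orphans
```

Each returned ID can then be destroyed through whatever cleanup path you already use.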

Custom environment variables

Pass extra environment variables to the machine via the env field. These are merged with the variables flaude sets automatically (credentials, repos, prompt):

from flaude import MachineConfig, run_and_destroy

async def run_with_env(app_name: str) -> None:
    config = MachineConfig(
        claude_code_oauth_token="sk-ant-oat-...",
        prompt="Run the integration tests against the staging API",
        repos=["https://github.com/your-org/your-repo"],
        env={
            "API_BASE_URL": "https://staging.api.your-org.com",
            "TEST_TIMEOUT": "120",
        },
    )

    result = await run_and_destroy(app_name, config)

Warning

User-supplied env values are merged after flaude's required variables. Avoid using keys like CLAUDE_CODE_OAUTH_TOKEN, FLAUDE_PROMPT, or FLAUDE_REPOS in env — they will override flaude's values and break execution.
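One defensive pattern is to reject reserved keys before constructing the config. The helper below is a sketch: `RESERVED_ENV_KEYS` lists only the names called out in the warning above and is not guaranteed to be exhaustive.

```python
# Keys flaude sets itself; overriding them breaks execution.
# Illustrative list based on the warning above -- not necessarily complete.
RESERVED_ENV_KEYS = {"CLAUDE_CODE_OAUTH_TOKEN", "FLAUDE_PROMPT", "FLAUDE_REPOS"}

def validate_env(env: dict[str, str]) -> dict[str, str]:
    """Raise if user-supplied env would shadow flaude's own variables."""
    clashes = RESERVED_ENV_KEYS & env.keys()
    if clashes:
        raise ValueError(
            f"env keys would override flaude internals: {sorted(clashes)}"
        )
    return env
```

Call it on any user-supplied mapping before passing it as `env=` to MachineConfig.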

CI integration (GitHub Actions)

Run flaude as a step in a GitHub Actions workflow to automate code review or generation on every pull request:

# .github/workflows/claude-review.yml
name: Claude Code Review
on:
  pull_request:
    types: [opened, synchronize]

jobs:
  review:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"

      - name: Install flaude
        run: pip install flaude

      - name: Run Claude review
        env:
          FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }}
          CLAUDE_CODE_OAUTH_TOKEN: ${{ secrets.CLAUDE_CODE_OAUTH_TOKEN }}
          GITHUB_TOKEN: ${{ github.token }}
          PR_NUMBER: ${{ github.event.pull_request.number }}
        run: python scripts/review.py

# scripts/review.py
import asyncio
import os
from flaude import MachineConfig, MachineExitError, ensure_app, run_and_destroy

async def main() -> None:
    pr_number = os.environ["PR_NUMBER"]
    repo = os.environ["GITHUB_REPOSITORY"]

    app = await ensure_app("my-flaude-ci")

    config = MachineConfig(
        claude_code_oauth_token=os.environ["CLAUDE_CODE_OAUTH_TOKEN"],
        github_username=os.environ["GITHUB_ACTOR"],
        github_token=os.environ["GITHUB_TOKEN"],
        prompt=(
            f"Review the changes in PR #{pr_number}. "
            "Focus on correctness, security, and test coverage. "
            "Leave a GitHub review comment summarizing your findings."
        ),
        repos=[f"https://github.com/{repo}"],
        metadata={"pr_number": pr_number, "repo": repo},
    )

    try:
        result = await run_and_destroy(app.name, config)
        print(f"Review complete: exit_code={result.exit_code}")
    except MachineExitError as exc:
        print(f"Review failed: {exc}")
        raise SystemExit(1)

asyncio.run(main())

Choosing between ConcurrentExecutor and manual asyncio.gather

Use ConcurrentExecutor when:

- You have a fixed list of prompts to run in parallel
- You do not need per-machine log streaming
- You want built-in concurrency limiting and BatchResult aggregation

executor = ConcurrentExecutor(app_name, max_concurrency=5)
batch = await executor.run_batch(requests)

Use manual asyncio.gather with run_with_logs when:

- You need real-time log streaming from each machine simultaneously
- You want to react to individual machine output as it arrives
- You are building a service that routes logs to different destinations per machine

collector = LogCollector()
server = LogDrainServer(collector)
await server.start()

streams = await asyncio.gather(*[
    run_with_logs(app_name, cfg, collector=collector, server=server)
    for cfg in configs
])

# Route each stream to its own handler concurrently
await asyncio.gather(*[handle_stream(s, label) for s, label in zip(streams, labels)])
await server.stop()

The manual approach gives you full control at the cost of more boilerplate.
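The handle_stream callable in the snippet above is left to you. A minimal version that routes each machine's output to its own file might look like this; it assumes only what the earlier consume example shows, namely that the stream returned by run_with_logs is both an async context manager and an async iterable of lines:

```python
from pathlib import Path

async def handle_stream(stream, label: str) -> None:
    """Write one machine's log lines to its own file under logs/."""
    out = Path("logs") / f"{label}.log"
    out.parent.mkdir(parents=True, exist_ok=True)
    async with stream:  # closes the stream once iteration finishes
        with out.open("w") as f:
            async for line in stream:
                f.write(line + "\n")
```

Swapping the file write for an HTTP post or a queue put gives you per-machine routing to any destination.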