
Execution

Core functions for running Claude Code prompts on Fly.io machines.

run async

run(app_name, config, *, name=None, token=None, wait_timeout=3600.0)

Execute a Claude Code prompt on a Fly machine with guaranteed cleanup.

Creates a Fly machine, waits for the Claude Code process to exit, and always destroys the machine afterwards — regardless of success, failure, or cancellation.

Parameters:

Name          Type           Default   Description
app_name      str            required  The Fly app to run in.
config        MachineConfig  required  Machine configuration including prompt, repos, credentials.
name          str | None     None      Optional human-readable machine name.
token         str | None     None      Explicit Fly API token.
wait_timeout  float          3600.0    Max seconds to wait for machine to exit.

Returns:

Type       Description
RunResult  A RunResult with exit details.

Raises:

Type          Description
FlyAPIError   If machine creation fails.
TimeoutError  If the machine doesn't exit within wait_timeout.

Source code in flaude/runner.py
async def run(
    app_name: str,
    config: MachineConfig,
    *,
    name: str | None = None,
    token: str | None = None,
    wait_timeout: float = 3600.0,
) -> RunResult:
    """Execute a Claude Code prompt on a Fly machine with guaranteed cleanup.

    Creates a Fly machine, waits for the Claude Code process to exit, and
    **always** destroys the machine afterwards — regardless of success, failure,
    or cancellation.

    Args:
        app_name: The Fly app to run in.
        config: Machine configuration including prompt, repos, credentials.
        name: Optional human-readable machine name.
        token: Explicit Fly API token.
        wait_timeout: Max seconds to wait for machine to exit.

    Returns:
        A :class:`RunResult` with exit details.

    Raises:
        FlyAPIError: If machine creation fails.
        TimeoutError: If the machine doesn't exit within *wait_timeout*.
    """
    machine: FlyMachine | None = None
    destroyed = False
    state: str = ""
    exit_code: int | None = None

    try:
        machine = await create_machine(app_name, config, name=name, token=token)
        logger.info("Machine %s created, waiting for exit…", machine.id)

        state, exit_code = await wait_for_machine_exit(
            app_name,
            machine.id,
            token=token,
            timeout=wait_timeout,
        )

        logger.info(
            "Machine %s exited: state=%s exit_code=%s",
            machine.id,
            state,
            exit_code,
        )
    finally:
        if machine is not None:
            logger.info("Destroying machine %s (finally block)", machine.id)
            destroyed = await _cleanup_machine(app_name, machine.id, token=token)
            logger.info(
                "Machine %s cleanup %s",
                machine.id,
                "succeeded" if destroyed else "FAILED",
            )

    return RunResult(
        machine_id=machine.id,
        exit_code=exit_code,
        state=state,
        destroyed=destroyed,
    )

run_and_destroy async

run_and_destroy(app_name, config, *, name=None, token=None, wait_timeout=3600.0, raise_on_failure=True)

Execute a Claude Code prompt with automatic cleanup, optionally raising on failure.

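This is the recommended entry point. It wraps run and, when raise_on_failure is True, raises MachineExitError if the run is considered a failure (a non-zero exit code or a failed machine state).

Parameters:

Name              Type           Default   Description
app_name          str            required  The Fly app to run in.
config            MachineConfig  required  Machine configuration.
name              str | None     None      Optional machine name.
token             str | None     None      Explicit Fly API token.
wait_timeout      float          3600.0    Max seconds to wait for exit.
raise_on_failure  bool           True      If True (default), raise MachineExitError when the run fails.

Returns:

Type       Description
RunResult  A RunResult with exit details.

Raises:

Type              Description
MachineExitError  If raise_on_failure is True and the run is considered a failure.
FlyAPIError       If machine creation fails (propagated from run).
TimeoutError      If the machine doesn't exit within wait_timeout (propagated from run).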

Source code in flaude/runner.py
async def run_and_destroy(
    app_name: str,
    config: MachineConfig,
    *,
    name: str | None = None,
    token: str | None = None,
    wait_timeout: float = 3600.0,
    raise_on_failure: bool = True,
) -> RunResult:
    """Execute a Claude Code prompt with automatic cleanup, optionally raising on
    failure.

    This is the recommended entry point. It wraps :func:`run` and optionally
    raises :class:`MachineExitError` when the process exits with a non-zero
    code.

    Args:
        app_name: The Fly app to run in.
        config: Machine configuration.
        name: Optional machine name.
        token: Explicit Fly API token.
        wait_timeout: Max seconds to wait for exit.
        raise_on_failure: If True (default), raise on non-zero exit codes.

    Returns:
        A :class:`RunResult` with exit details.
    """
    result = await run(
        app_name,
        config,
        name=name,
        token=token,
        wait_timeout=wait_timeout,
    )

    if raise_on_failure and _is_failure(result.exit_code, result.state):
        raise MachineExitError(
            machine_id=result.machine_id,
            exit_code=result.exit_code,
            state=result.state,
        )

    return result

run_with_logs async

run_with_logs(app_name, config, *, name=None, token=None, wait_timeout=3600.0, item_timeout=None, total_timeout=None, collector=None, server=None, server_port=0, include_stderr=False)

Launch a Claude Code execution and return a streaming log handle.

Sets up the log drain infrastructure before creating the machine so that no early log lines are lost. The returned StreamingRun yields log lines as they arrive and guarantees machine cleanup.

Parameters:

Name            Type                   Default   Description
app_name        str                    required  The Fly app to run in.
config          MachineConfig          required  Machine configuration (prompt, repos, credentials, …).
name            str | None             None      Optional human-readable machine name.
token           str | None             None      Explicit Fly API token.
wait_timeout    float                  3600.0    Max seconds to wait for machine to exit.
item_timeout    float | None           None      Per-line timeout for the log stream (None = no limit).
total_timeout   float | None           None      Overall timeout for the log stream (None = no limit).
collector       LogCollector | None    None      Existing LogCollector to reuse (e.g. for concurrent runs sharing one server). If None, a new one is created.
server          LogDrainServer | None  None      Existing LogDrainServer to reuse. If None, a new server is started and is stopped on cleanup.
server_port     int                    0         Port for the auto-created server (0 = auto-assign).
include_stderr  bool                   False     Whether the auto-created server includes stderr lines in the log stream (has no effect when server is passed).

Returns:

Type          Description
StreamingRun  A StreamingRun that can be iterated for log lines.

Raises:

Type         Description
FlyAPIError  If machine creation fails.

Source code in flaude/lifecycle.py
async def run_with_logs(
    app_name: str,
    config: MachineConfig,
    *,
    name: str | None = None,
    token: str | None = None,
    wait_timeout: float = 3600.0,
    item_timeout: float | None = None,
    total_timeout: float | None = None,
    collector: LogCollector | None = None,
    server: LogDrainServer | None = None,
    server_port: int = 0,
    include_stderr: bool = False,
) -> StreamingRun:
    """Launch a Claude Code execution and return a streaming log handle.

    Sets up log drain infrastructure **before** creating the machine so
    that no early log lines are lost.  The returned :class:`StreamingRun`
    yields log lines as they arrive and guarantees machine cleanup.

    Args:
        app_name: The Fly app to run in.
        config: Machine configuration (prompt, repos, credentials, …).
        name: Optional human-readable machine name.
        token: Explicit Fly API token.
        wait_timeout: Max seconds to wait for machine to exit.
        item_timeout: Per-line timeout for the log stream (``None`` = no limit).
        total_timeout: Overall timeout for the log stream (``None`` = no limit).
        collector: Existing :class:`LogCollector` to reuse (e.g. for concurrent
            runs sharing one server).  If ``None``, a new one is created.
        server: Existing :class:`LogDrainServer` to reuse.  If ``None``, a new
            server is started and will be stopped on cleanup.
        server_port: Port for the auto-created server (0 = auto-assign).
        include_stderr: Whether the log stream should include stderr lines.

    Returns:
        A :class:`StreamingRun` that can be iterated for log lines.

    Raises:
        FlyAPIError: If machine creation fails.
    """
    owns_server = server is None

    # --- 1. Set up log drain infrastructure BEFORE machine creation --------
    if collector is None:
        collector = LogCollector()

    if server is None:
        server = LogDrainServer(
            collector,
            port=server_port,
            include_stderr=include_stderr,
        )
        await server.start()
        logger.info("Log drain server started on port %s", server.actual_port)

    # --- 2. Create the machine --------------------------------------------
    machine: FlyMachine | None = None
    try:
        machine = await create_machine(app_name, config, name=name, token=token)
        logger.info("Machine %s created for streaming run", machine.id)
    except Exception:
        # If machine creation fails and we own the server, clean up
        if owns_server:
            await server.stop()
        raise

    # --- 3. Subscribe to logs for this machine ----------------------------
    queue = await collector.subscribe(machine.id)

    # --- 4. Wrap in a LogStream -------------------------------------------
    log_stream = LogStream(
        queue,
        item_timeout=item_timeout,
        total_timeout=total_timeout,
    )

    # --- 5. Kick off background wait + destroy task -----------------------
    result_task = asyncio.create_task(
        _wait_signal_destroy(
            app_name,
            machine,
            collector,
            token=token,
            wait_timeout=wait_timeout,
        ),
        name=f"flaude-wait-{machine.id}",
    )

    return StreamingRun(
        log_stream=log_stream,
        result_future=result_task,
        collector=collector,
        server=server,
        machine=machine,
        owns_server=owns_server,
    )

StreamingRun

StreamingRun(*, log_stream, result_future, collector, server, machine, owns_server)

Handle for a running machine execution with live log streaming.

Acts as an async iterator that yields log lines from the machine's stdout. After iteration completes (or on early exit), call result() to get the RunResult with exit details.

Usage:

run = await run_with_logs(app, config)
async for line in run:
    print(line)
result = await run.result()

The object can also be used as an async context manager for guaranteed cleanup:

async with await run_with_logs(app, config) as run:
    async for line in run:
        print(line)
# machine destroyed, server stopped
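The simplified stand-in below shows three behaviours described above:

- Async iteration that records each yielded line.
- An async context manager whose exit runs cleanup.
- A cleanup method that does nothing on repeat calls.

StreamingRunSketch is hypothetical. It omits the log queue, the timeouts and the machine handling of the real class.

```python
import asyncio
from collections.abc import AsyncIterator


class StreamingRunSketch:
    """Hypothetical, simplified stand-in for StreamingRun (not flaude's code)."""

    def __init__(self, lines: list[str]) -> None:
        self._lines = lines
        self._collected: list[str] = []
        self._cleaned_up = False
        self.cleanup_calls = 0  # counts real cleanups, for demonstration only

    def __aiter__(self) -> AsyncIterator[str]:
        return self._iterate()

    async def _iterate(self) -> AsyncIterator[str]:
        for line in self._lines:
            await asyncio.sleep(0)  # stand-in for awaiting the next queued log line
            self._collected.append(line)  # remembered so result() can inspect it later
            yield line

    @property
    def collected_logs(self) -> list[str]:
        return list(self._collected)

    async def cleanup(self) -> None:
        if self._cleaned_up:  # idempotent: later calls are no-ops
            return
        self._cleaned_up = True
        self.cleanup_calls += 1

    async def __aenter__(self) -> "StreamingRunSketch":
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        await self.cleanup()


async def demo() -> StreamingRunSketch:
    async with StreamingRunSketch(["a", "b", "c"]) as run:
        async for line in run:
            if line == "b":
                break  # early exit still triggers cleanup via __aexit__
    await run.cleanup()  # second call does nothing
    return run
```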

Source code in flaude/lifecycle.py
def __init__(
    self,
    *,
    log_stream: LogStream,
    result_future: asyncio.Task[RunResult],
    collector: LogCollector,
    server: LogDrainServer | None,
    machine: FlyMachine,
    owns_server: bool,
) -> None:
    self._log_stream = log_stream
    self._result_future = result_future
    self._collector = collector
    self._server = server
    self._machine = machine
    self._owns_server = owns_server
    self._cleaned_up = False
    self._collected_logs: list[str] = []

machine_id property

machine_id

The Fly machine ID for this execution.

log_stream property

log_stream

The underlying LogStream instance.

done property

done

True if the log stream has finished.

collected_logs property

collected_logs

Log lines collected so far during async iteration.

result async

result(*, raise_on_failure=True)

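Wait for the machine to exit and return the RunResult.

This also triggers cleanup of the log drain if not already done.

The exit code is taken from the [flaude:exit:N] marker, which entrypoint.sh writes with the real process exit code. The Fly Machines API reports the VM-level exit code, which includes the kernel shutdown sequence. That code can be non-zero even when Claude Code exited cleanly. The API value is therefore used only when no marker is found in the collected log lines, for example because the machine was killed before entrypoint.sh could write it. If the logs contain a workspace manifest, the returned RunResult also lists the workspace files.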

Parameters:

Name              Type  Default  Description
raise_on_failure  bool  True     If True (default), raise MachineExitError when the machine exits with a non-zero code or a failed state, including any collected log lines in the exception.

Returns:

Type       Description
RunResult  The RunResult with exit details.

Raises:

Type              Description
MachineExitError  If raise_on_failure is True and the run is considered a failure (non-zero exit or failed state).

Source code in flaude/lifecycle.py
async def result(self, *, raise_on_failure: bool = True) -> RunResult:
    """Wait for the machine to exit and return the :class:`RunResult`.

    This also triggers cleanup of the log drain if not already done.

    The exit code is determined from the Fly Machines API response.
    When the API does not populate the exit code (e.g. because the
    machine was force-destroyed or reached ``failed`` state without a
    clean process exit), the collected log lines are searched for a
    ``[flaude:exit:N]`` marker written by *entrypoint.sh*.

    Args:
        raise_on_failure: If True (default), raise :class:`MachineExitError`
            when the machine exits with a non-zero code or a ``failed``
            state, including any collected log lines in the exception.

    Raises:
        MachineExitError: If *raise_on_failure* is True and the run is
            considered a failure (non-zero exit or ``failed`` state).
    """
    try:
        run_result = await self._result_future
    finally:
        await self.cleanup()

    # Prefer the log-based exit code over the Fly API exit code.
    # The Fly API reports the VM-level exit code which includes the
    # kernel shutdown sequence — this can be non-zero even when Claude
    # Code (and the entrypoint) exited cleanly.  entrypoint.sh always
    # writes [flaude:exit:N] with the *real* process exit code.
    log_exit_code = extract_exit_code_from_logs(self._collected_logs)
    effective_exit_code = (
        log_exit_code if log_exit_code is not None else run_result.exit_code
    )

    # Extract workspace manifest from logs
    workspace_files = extract_workspace_manifest_from_logs(self._collected_logs)

    if raise_on_failure and _is_failure(effective_exit_code, run_result.state):
        raise MachineExitError(
            machine_id=run_result.machine_id,
            exit_code=effective_exit_code,
            state=run_result.state,
            logs=self._collected_logs,
        )

    # Always return the effective (log-preferred) exit code, enriched with
    # workspace files if a manifest was found
    return RunResult(
        machine_id=run_result.machine_id,
        exit_code=effective_exit_code,
        state=run_result.state,
        destroyed=run_result.destroyed,
        workspace_files=tuple(workspace_files or ()),
    )

cleanup async

cleanup()

Stop the log drain server (if owned) and release resources.

Safe to call multiple times — subsequent calls are no-ops.

Source code in flaude/lifecycle.py
async def cleanup(self) -> None:
    """Stop the log drain server (if owned) and release resources.

    Safe to call multiple times — subsequent calls are no-ops.
    """
    if self._cleaned_up:
        return
    self._cleaned_up = True

    if self._owns_server and self._server is not None:
        try:
            await self._server.stop()
            logger.debug("Log drain server stopped (owned by StreamingRun)")
        except Exception:
            logger.warning("Error stopping log drain server", exc_info=True)

RunResult dataclass

RunResult(machine_id, exit_code, state, destroyed, workspace_files=())

Result of a completed flaude execution.

Attributes:

Name             Type             Description
machine_id       str              The Fly machine ID that ran the task.
exit_code        int | None       Process exit code (0 = success).
state            str              Final machine state (e.g. stopped, failed).
destroyed        bool             Whether the machine was successfully destroyed.
workspace_files  tuple[str, ...]  Tuple of relative file paths in the workspace.

wait_for_machine_exit async

wait_for_machine_exit(app_name, machine_id, *, token=None, poll_interval=_POLL_INTERVAL_SECONDS, timeout=3600.0)

Poll a Fly machine until it reaches a terminal state.

Uses the Fly Machines API long-poll endpoint GET /apps/{app_name}/machines/{id}/wait?state=stopped first, then fetches the machine once to read its final state and exit code. If the wait endpoint returns an error, it falls back to polling GET /apps/{app_name}/machines/{id} every poll_interval seconds until the machine reaches a terminal state. A 404 while polling is reported as the state destroyed.

Parameters:

Name           Type        Default                 Description
app_name       str         required                The Fly app the machine belongs to.
machine_id     str         required                The machine to wait on.
token          str | None  None                    Explicit API token.
poll_interval  float       _POLL_INTERVAL_SECONDS  Seconds between poll attempts (fallback only).
timeout        float       3600.0                  Maximum seconds to wait before giving up.

Returns:

Type                    Description
tuple[str, int | None]  A tuple of (final_state, exit_code). exit_code may be None if the machine was destroyed before it could be read.

Raises:

Type          Description
TimeoutError  If the machine doesn't exit within timeout.
FlyAPIError   If a polling request fails with a status other than 404.

Source code in flaude/runner.py
async def wait_for_machine_exit(
    app_name: str,
    machine_id: str,
    *,
    token: str | None = None,
    poll_interval: float = _POLL_INTERVAL_SECONDS,
    timeout: float = 3600.0,
) -> tuple[str, int | None]:
    """Poll a Fly machine until it reaches a terminal state.

    Uses the Fly Machines API ``GET /machines/{id}/wait`` endpoint first,
    falling back to polling ``GET /machines/{id}`` if wait is unavailable.

    Args:
        app_name: The Fly app the machine belongs to.
        machine_id: The machine to wait on.
        token: Explicit API token.
        poll_interval: Seconds between poll attempts (fallback only).
        timeout: Maximum seconds to wait before giving up.

    Returns:
        A tuple of (final_state, exit_code). exit_code may be None if
        the machine was destroyed before we could read it.

    Raises:
        asyncio.TimeoutError: If the machine doesn't exit within *timeout*.
    """
    # Try the blocking wait endpoint first — it long-polls until the machine
    # reaches the requested state, which is more efficient than polling.
    try:
        await fly_get(
            f"/apps/{app_name}/machines/{machine_id}/wait?state=stopped",
            token=token,
            timeout=timeout,
        )
        # Wait succeeded — now fetch final state to get exit code
        data = await fly_get(
            f"/apps/{app_name}/machines/{machine_id}",
            token=token,
        )
        if data and isinstance(data, dict):
            state = data.get("state", "unknown")
            exit_code = _extract_exit_code(data)
            return state, exit_code
        return "stopped", None
    except FlyAPIError:
        # Wait endpoint failed — fall back to polling
        logger.debug("Wait endpoint failed for %s, falling back to polling", machine_id)
    except TimeoutError:
        raise

    # Fallback: poll GET /machines/{id}
    deadline = asyncio.get_event_loop().time() + timeout
    while True:
        if asyncio.get_event_loop().time() > deadline:
            raise TimeoutError(f"Machine {machine_id} did not exit within {timeout}s")

        try:
            data = await fly_get(
                f"/apps/{app_name}/machines/{machine_id}",
                token=token,
            )
        except FlyAPIError as exc:
            if exc.status_code == 404:
                # Machine already gone — treat as destroyed
                return "destroyed", None
            raise

        if data and isinstance(data, dict):
            state = data.get("state", "unknown")
            if state in _TERMINAL_STATES:
                exit_code = _extract_exit_code(data)
                return state, exit_code

        await asyncio.sleep(poll_interval)
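The fallback branch of wait_for_machine_exit is a polling loop bounded by a deadline. The sketch below isolates that loop. get_state, NotFound and the contents of TERMINAL_STATES are assumptions that stand in for fly_get, a 404 FlyAPIError and flaude's _TERMINAL_STATES. The sketch reads the clock through asyncio.get_running_loop(), the preferred way to reach the event loop from inside a coroutine.

```python
import asyncio
from collections.abc import Awaitable, Callable

TERMINAL_STATES = frozenset({"stopped", "failed", "destroyed"})  # assumed contents


class NotFound(Exception):
    """Hypothetical stand-in for a FlyAPIError with status_code 404."""


async def poll_until_terminal(
    get_state: Callable[[], Awaitable[str]],
    *,
    poll_interval: float = 1.0,
    timeout: float = 3600.0,
) -> str:
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout  # the loop's monotonic clock
    while True:
        if loop.time() > deadline:
            raise TimeoutError(f"machine did not exit within {timeout}s")
        try:
            state = await get_state()
        except NotFound:
            return "destroyed"  # machine already gone
        if state in TERMINAL_STATES:
            return state
        await asyncio.sleep(poll_interval)


def fake_states(*states: str) -> Callable[[], Awaitable[str]]:
    """Hypothetical helper: returns the given states in order, one per poll."""
    it = iter(states)

    async def get_state() -> str:
        return next(it)

    return get_state
```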

run_session_turn async

run_session_turn(app_name, machine_id, config, *, token=None, wait_timeout=3600.0, raise_on_failure=True)

Execute a single turn of a session on an existing stopped machine.

Updates the machine's config (new prompt, same session ID), starts it, waits for the Claude Code process to exit, and leaves the machine in stopped state for the next turn. Does NOT destroy the machine.

Parameters:

Name              Type           Default   Description
app_name          str            required  The Fly app the session belongs to.
machine_id        str            required  The stopped machine to resume.
config            MachineConfig  required  Updated config with the new prompt (must include session_id).
token             str | None     None      Explicit Fly API token.
wait_timeout      float          3600.0    Max seconds to wait for machine to exit.
raise_on_failure  bool           True      If True, raise MachineExitError when the turn fails.

Returns:

Type       Description
RunResult  A RunResult with exit details; destroyed is always False.

Raises:

Type              Description
MachineExitError  If raise_on_failure is True and the turn is considered a failure.
TimeoutError      If the machine doesn't exit within wait_timeout.

Source code in flaude/runner.py
async def run_session_turn(
    app_name: str,
    machine_id: str,
    config: MachineConfig,
    *,
    token: str | None = None,
    wait_timeout: float = 3600.0,
    raise_on_failure: bool = True,
) -> RunResult:
    """Execute a single turn of a session on an existing stopped machine.

    Updates the machine's config (new prompt, same session ID), starts it,
    waits for the Claude Code process to exit, and leaves the machine in
    ``stopped`` state for the next turn. Does NOT destroy the machine.

    Args:
        app_name: The Fly app the session belongs to.
        machine_id: The stopped machine to resume.
        config: Updated config with new ``prompt`` (must include ``session_id``).
        token: Explicit Fly API token.
        wait_timeout: Max seconds to wait for machine to exit.
        raise_on_failure: If True, raise on non-zero exit.

    Returns:
        A :class:`RunResult` with exit details. ``destroyed`` is always False.
    """
    # 1. Update machine config (injects new FLAUDE_PROMPT + FLAUDE_SESSION_ID)
    await update_machine(app_name, machine_id, config, token=token)

    # 2. Start the stopped machine
    await start_machine(app_name, machine_id, token=token)
    logger.info("Session turn started on machine %s", machine_id)

    # 3. Wait for exit (machine stops itself after claude -p exits)
    state, exit_code = await wait_for_machine_exit(
        app_name,
        machine_id,
        token=token,
        timeout=wait_timeout,
    )

    logger.info(
        "Session turn complete on machine %s: state=%s exit_code=%s",
        machine_id,
        state,
        exit_code,
    )

    result = RunResult(
        machine_id=machine_id,
        exit_code=exit_code,
        state=state,
        destroyed=False,
    )

    if raise_on_failure and _is_failure(result.exit_code, result.state):
        raise MachineExitError(
            machine_id=machine_id,
            exit_code=exit_code,
            state=state,
        )

    return result
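The turn sequence (update config, start, wait, leave stopped) can be simulated in memory. FakeSessionMachine and session_turn are hypothetical. Real turns go through update_machine, start_machine and wait_for_machine_exit against the Fly Machines API. The environment variable names come from the comment in the source above. Refusing to update a machine that is not stopped is an assumption of this sketch.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class FakeSessionMachine:
    """Hypothetical in-memory machine; flaude talks to the Fly Machines API instead."""

    machine_id: str
    state: str = "stopped"
    env: dict[str, str] = field(default_factory=dict)
    turns: int = 0

    async def update(self, prompt: str, session_id: str) -> None:
        if self.state != "stopped":  # assumed precondition for this sketch
            raise RuntimeError("config can only be updated while the machine is stopped")
        # New prompt each turn, same session ID across turns.
        self.env.update(FLAUDE_PROMPT=prompt, FLAUDE_SESSION_ID=session_id)

    async def start(self) -> None:
        self.state = "started"

    async def wait_for_exit(self) -> tuple[str, int]:
        self.turns += 1
        self.state = "stopped"  # the machine stops itself after the process exits
        return self.state, 0


async def session_turn(machine: FakeSessionMachine, prompt: str, session_id: str) -> tuple[str, int]:
    await machine.update(prompt, session_id)
    await machine.start()
    return await machine.wait_for_exit()  # left stopped, never destroyed


async def demo() -> FakeSessionMachine:
    machine = FakeSessionMachine("m-42")
    for prompt in ["Summarize the repo", "Now add tests"]:
        await session_turn(machine, prompt, session_id="sess-1")
    return machine
```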

extract_exit_code_from_logs

extract_exit_code_from_logs(logs)

Parse the [flaude:exit:N] marker written by entrypoint.sh.

Scans logs in reverse order and returns the first exit code found, which is the last marker written. Returns None if no marker is present, e.g. when the container was killed before entrypoint.sh could write it.

StreamingRun.result() prefers this log-based exit code over the one reported by the Fly Machines API. The API reports the VM-level exit code, which can be non-zero even when the process exited cleanly. The marker is also the only possible source of an exit code when the API reports none. This can happen if the machine is force-destroyed or reaches the failed state without a clean exit.

Parameters:

Name  Type       Default   Description
logs  list[str]  required  Log lines collected from the machine's stdout/stderr.

Returns:

Type        Description
int | None  The integer exit code extracted from [flaude:exit:N], or None if no such marker is found.

Source code in flaude/runner.py
def extract_exit_code_from_logs(logs: list[str]) -> int | None:
    """Parse the ``[flaude:exit:N]`` marker written by *entrypoint.sh*.

    Scans *logs* in reverse order and returns the first exit code found.
    Returns ``None`` if no marker is present — e.g. when the container was
    killed before the Claude Code process could write it.

    This is used as a fallback when the Fly Machines API does not report
    an exit code (which can happen if the machine is force-destroyed or
    reaches the ``failed`` state without a clean exit).

    Args:
        logs: Log lines collected from the machine's stdout/stderr.

    Returns:
        The integer exit code extracted from ``[flaude:exit:N]``, or
        ``None`` if no such marker is found.
    """
    for line in reversed(logs):
        m = _EXIT_MARKER_RE.search(line)
        if m:
            return int(m.group(1))
    return None
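extract_exit_code_from_logs can only find a marker that something has printed. In the real image, entrypoint.sh prints it, but this page does not show that script. The Python analogue below runs a command and prints [flaude:exit:N] after the command returns. run_and_mark is hypothetical. The marker is printed only after the child exits, so killing the wrapper first leaves no marker, which is the None case described above.

```python
import subprocess
import sys


def run_and_mark(cmd: list[str]) -> str:
    """Run cmd, then emit the exit marker on stdout (a rough analogue of entrypoint.sh)."""
    completed = subprocess.run(cmd)  # the child inherits this process's stdout/stderr
    marker = f"[flaude:exit:{completed.returncode}]"
    print(marker, flush=True)  # flush so a log drain sees it before the process ends
    return marker


if __name__ == "__main__":
    run_and_mark([sys.executable, "-c", "import sys; sys.exit(3)"])  # prints [flaude:exit:3]
```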