Streaming Logs

run_with_logs lets you watch Claude Code output line by line as it arrives, rather than waiting for the entire execution to complete. It returns a StreamingRun that works as both an async iterator and an async context manager.

Basic usage

from flaude import MachineConfig, run_with_logs

async def stream_claude(app_name: str) -> None:
    config = MachineConfig(
        claude_code_oauth_token="sk-ant-oat-...",
        prompt="Refactor the auth module to use JWT tokens",
        repos=["https://github.com/your-org/your-repo"],
    )

    async with await run_with_logs(app_name, config) as stream:
        async for line in stream:
            print(line)

    result = await stream.result()
    print(f"Done: exit={result.exit_code}, state={result.state}")

The async with block guarantees machine cleanup when the block exits — even if an exception is raised during iteration.

Note

run_with_logs(...) must be awaited to obtain the StreamingRun object; async with then manages that already-created object. This is why the pattern is async with await run_with_logs(...).
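To make the async with await pattern concrete, here is a minimal, self-contained sketch. ToyRun and start_toy_run are hypothetical stand-ins invented for illustration; they are not part of flaude and do not reflect how StreamingRun is implemented. The sketch only shows the general Python mechanics: the coroutine is awaited first, the resulting object is then entered as a context manager, and its exit hook runs even when the body raises.

```python
import asyncio


class ToyRun:
    """Hypothetical stand-in for a StreamingRun-like object (not flaude's class)."""

    def __init__(self) -> None:
        self.cleaned_up = False

    async def __aenter__(self) -> "ToyRun":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        # Runs on normal exit and also when the body raises.
        self.cleaned_up = True
        return False  # do not swallow the exception


async def start_toy_run() -> ToyRun:
    """Coroutine that builds the object; awaiting it yields the context manager."""
    await asyncio.sleep(0)  # placeholder for real startup work
    return ToyRun()


async def demo() -> bool:
    run = None
    try:
        # `await` resolves the coroutine first; `async with` then enters the result.
        async with await start_toy_run() as run:
            raise RuntimeError("simulated failure inside the block")
    except RuntimeError:
        pass
    return run.cleaned_up
```

Calling asyncio.run(demo()) reports whether the cleanup hook ran despite the simulated failure.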

Timeouts

Per-line timeout

item_timeout sets the maximum number of seconds to wait for each individual log line. If no line arrives within that window, iteration stops silently:

async with await run_with_logs(
    app_name, config, item_timeout=30.0
) as stream:
    async for line in stream:
        print(line)

Warning

A per-line timeout that is too short can cut off long-running Claude Code tasks that produce infrequent output. Use total_timeout if you want a hard wall-clock limit.

Total timeout

total_timeout caps the total time spent iterating the stream, regardless of per-line activity:

async with await run_with_logs(
    app_name, config, total_timeout=3600.0
) as stream:
    async for line in stream:
        print(line)

Both can be combined — whichever limit is hit first stops iteration.
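The interaction between the two limits can be sketched with plain asyncio. This is an illustration of the general technique only, not flaude's implementation; iterate_with_timeouts, slow_lines, and demo are hypothetical names. Each wait is bounded by the smaller of the per-line limit and the time remaining before the overall deadline, and iteration ends quietly when either is reached.

```python
import asyncio
from typing import AsyncIterator, List, Optional


async def iterate_with_timeouts(
    source: AsyncIterator[str],
    item_timeout: Optional[float] = None,
    total_timeout: Optional[float] = None,
) -> AsyncIterator[str]:
    """Yield items from `source` until it ends or either limit is reached."""
    loop = asyncio.get_running_loop()
    deadline = None if total_timeout is None else loop.time() + total_timeout
    while True:
        # Wait at most the smaller of the per-item limit and the time left overall.
        limits = []
        if item_timeout is not None:
            limits.append(item_timeout)
        if deadline is not None:
            limits.append(deadline - loop.time())
        wait = min(limits) if limits else None
        if wait is not None and wait <= 0:
            return  # overall deadline already passed
        try:
            line = await asyncio.wait_for(source.__anext__(), timeout=wait)
        except (asyncio.TimeoutError, StopAsyncIteration):
            return  # stop without raising
        yield line


async def slow_lines() -> AsyncIterator[str]:
    """Toy source: two quick lines, then a long silence."""
    yield "first"
    yield "second"
    await asyncio.sleep(5)
    yield "not reached with a short item_timeout"


async def demo() -> List[str]:
    stream = iterate_with_timeouts(slow_lines(), item_timeout=0.05)
    return [line async for line in stream]
```

In this toy run the third line never arrives within the 0.05 second per-line window, so only the first two lines are yielded.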

Collecting all logs

If you want all log lines as a list after the stream completes, use stream.collected_logs:

async with await run_with_logs(app_name, config) as stream:
    async for line in stream:
        pass  # or process each line in real time

all_logs = stream.collected_logs
print(f"Total lines: {len(all_logs)}")

collected_logs accumulates each line as the async iterator yields it, so it contains only the lines you have actually iterated over. If you stop iteration early, you get a partial list.
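The partial-list behaviour follows naturally from recording lines at the moment they are handed to the caller. The sketch below shows that idea with a hypothetical CollectingIterator wrapper; it is an assumption-laden illustration, not the way StreamingRun is actually written.

```python
import asyncio
from typing import AsyncIterator, List


class CollectingIterator:
    """Hypothetical wrapper that records each line as it is handed to the caller."""

    def __init__(self, source: AsyncIterator[str]) -> None:
        self._source = source
        self.collected: List[str] = []

    def __aiter__(self) -> "CollectingIterator":
        return self

    async def __anext__(self) -> str:
        # StopAsyncIteration from the source propagates and ends the loop.
        line = await self._source.__anext__()
        self.collected.append(line)
        return line


async def numbered_lines(count: int) -> AsyncIterator[str]:
    """Toy source that yields `count` lines."""
    for i in range(count):
        yield f"line {i}"


async def demo() -> List[str]:
    stream = CollectingIterator(numbered_lines(5))
    async for line in stream:
        if line == "line 1":
            break  # stop early: later lines are never pulled, so never recorded
    return stream.collected
```

Here the loop breaks after the second line, so the recorded list holds two entries even though the source could have produced five.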

Alternatively, call LogStream.collect() directly through stream.log_stream:

async with await run_with_logs(app_name, config) as stream:
    all_lines = await stream.log_stream.collect()

Getting the result

Call stream.result() after iteration to get the RunResult. By default it raises MachineExitError on non-zero exits:

from flaude import MachineExitError

async with await run_with_logs(app_name, config) as stream:
    async for line in stream:
        print(line)

try:
    result = await stream.result()
    print(f"Exit code: {result.exit_code}")
except MachineExitError as exc:
    print(f"Failed: exit_code={exc.exit_code}, state={exc.state}")
    print("Captured logs:")
    for line in exc.logs:
        print(f"  {line}")

Pass raise_on_failure=False to get the RunResult without an exception, even on failure:

result = await stream.result(raise_on_failure=False)
if result.exit_code != 0:
    print(f"Non-zero exit: {result.exit_code}")

Tip

When using raise_on_failure=False, check result.state as well as result.exit_code. A machine that reaches the failed state (OOM kill, entrypoint crash) may have exit_code=None.
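A small helper can fold both checks into one place. The sketch below uses ResultLike, a hypothetical stand-in carrying only the two fields named above, so that it runs without flaude installed; run_succeeded is likewise an illustrative name, and the only state value it relies on is failed, as described in the tip.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ResultLike:
    """Stand-in with the two fields mentioned above; not flaude's RunResult."""

    exit_code: Optional[int]
    state: str


def run_succeeded(result: ResultLike) -> bool:
    """True only if the run exited with code 0 and did not end in the failed state."""
    # exit_code may be None (e.g. OOM kill), and None == 0 is False.
    return result.exit_code == 0 and result.state != "failed"
```

With a real run, the same function could be applied to the object returned by stream.result(raise_on_failure=False), assuming it exposes exit_code and state as shown in the examples above.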

Structured JSON output

By default, Claude Code outputs human-readable text. Set output_format="stream-json" to receive structured NDJSON events instead — each line is a self-contained JSON object with a type field:

Type        Description
system      Session init, hook events
assistant   Complete assistant message with content blocks
result      Final result with total_cost_usd, usage, duration_ms

config = MachineConfig(
    claude_code_oauth_token="sk-ant-oat-...",
    prompt="Refactor the auth module",
    repos=["https://github.com/your-org/your-repo"],
    output_format="stream-json",
)

import json

async with await run_with_logs(app_name, config) as stream:
    async for line in stream:
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue  # plain-text line from entrypoint.sh, not a JSON event
        if event.get("type") == "result":
            print(f"Cost: ${event['total_cost_usd']:.4f}")
        elif event.get("type") == "assistant":
            print(event["message"]["content"])

Each line in the stream is a JSON string. The log pipeline does not parse it — callers are responsible for json.loads() on each line.

Note

Lines from entrypoint.sh (like [flaude] Starting execution and [flaude:exit:0]) are plain text, not JSON. A robust consumer should handle json.JSONDecodeError for these lines.
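One way to handle the mix of JSON and plain-text lines is a small parsing helper that returns None for anything that is not a JSON object. The functions parse_event and total_cost below are hypothetical helpers written for this page, not part of flaude; they rely only on the standard json module and on the type and total_cost_usd fields described above.

```python
import json
from typing import Any, Dict, Iterable, Optional


def parse_event(line: str) -> Optional[Dict[str, Any]]:
    """Return the decoded event, or None for plain-text lines."""
    try:
        event = json.loads(line)
    except json.JSONDecodeError:
        return None
    # Valid JSON that is not an object (e.g. a bare number) is not an event either.
    return event if isinstance(event, dict) else None


def total_cost(lines: Iterable[str]) -> float:
    """Sum total_cost_usd across all result events in the given lines."""
    cost = 0.0
    for line in lines:
        event = parse_event(line)
        if event is not None and event.get("type") == "result":
            cost += float(event.get("total_cost_usd", 0.0))
    return cost
```

Because the helpers take plain strings, they can be applied either to each line inside the async for loop or to stream.collected_logs after iteration.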

API reference

See LogStream and StreamingRun for the full API.