Log Drain Infrastructure¶
flaude's real-time log streaming relies on Fly.io's log drain mechanism combined with a lightweight local HTTP server. This page explains how the pieces fit together.
How Fly.io delivers logs¶
Fly.io machines emit logs via stdout and stderr. When a log drain is configured on an app, Fly delivers those logs to an HTTP endpoint as NDJSON (newline-delimited JSON):
POST http://your-drain-url/
Content-Type: application/x-ndjson
{"fly":{"app":{"instance":"abc123"}},"message":"Starting Claude Code...","stream":"stdout"}
{"fly":{"app":{"instance":"abc123"}},"message":"Reading files...","stream":"stdout"}
Each line is a JSON object. Multiple log lines may arrive in a single POST body. Fly sends batches every few hundred milliseconds.
LogDrainServer¶
LogDrainServer is a minimal HTTP server built with stdlib asyncio.start_server — no
web framework, no ASGI, no extra dependencies.
LogDrainServer
├── asyncio.start_server (TCP server)
├── _handle_connection (one coroutine per connection)
│ └── _process_request
│ ├── Read HTTP request line + headers
│ ├── Read body (Content-Length bytes)
│ ├── parse_ndjson(body) → list of dicts
│ └── _route_entries → LogCollector.push(machine_id, message)
└── 200 OK response (no body)
The server binds to 0.0.0.0:0 by default — port 0 tells the OS to assign an available
ephemeral port. After server.start(), the actual port is available as server.actual_port.
Stream classification¶
Not all log entries that Fly delivers are user application output. flaude classifies each entry into one of three streams:
| Stream | Source | Handling |
|---|---|---|
stdout |
User process stdout | Forwarded to LogCollector |
stderr |
User process stderr | Forwarded only if include_stderr=True |
system |
Fly infrastructure (lifecycle events, health checks) | Always filtered out |
The classification logic:
- If the entry has an explicit
streamfield, use it directly. - Otherwise, check the
sourcefield — values"fly","proxy", or"machine"indicate Fly infrastructure and are classified as"system". - If neither field is present, default to
"stdout"(user code without stream metadata).
System log entries — machine boot events, health check results, proxy messages — are silently discarded so they never appear in your application's log stream.
LogCollector¶
LogCollector is a registry that maps machine IDs to asyncio.Queue instances:
LogCollector
└── _queues: dict[machine_id → asyncio.Queue[str | None]]
subscribe(machine_id) → asyncio.Queue # create queue for this machine
push(machine_id, line) # route line to correct queue
finish(machine_id) # push None sentinel, remove queue
finish_all() # finish all registered machines
When LogDrainServer routes a log entry, it calls collector.push(machine_id, message).
If no queue is registered for that machine (e.g. subscription hasn't been set up yet), the
line is silently dropped. This is why run_with_logs subscribes to the collector
before creating the machine — to avoid losing early log lines.
The sentinel pattern¶
The end of a machine's log stream is signalled by pushing None (the sentinel) into the
queue:
Queue contents during normal operation:
"Cloning repo..."
"Running Claude Code..."
"Found 3 issues."
None ← sentinel: stream is done
LogStream watches for None and raises StopAsyncIteration when it sees it, which
cleanly ends the async for loop.
LogCollector.finish(machine_id) pushes the sentinel and removes the queue from the
registry. It is called from the _wait_signal_destroy background task in lifecycle.py
after the machine exits — regardless of success or failure, via a finally block.
LogStream¶
LogStream is the async iterator wrapper that your code consumes:
stream = LogStream(queue, item_timeout=30.0, total_timeout=3600.0)
async for line in stream:
print(line)
Internally it calls asyncio.wait_for(queue.get(), timeout=effective_timeout) for each
item. The effective timeout is the minimum of item_timeout and the remaining
total_timeout budget:
When a timeout fires, LogStream sets stream.timed_out = True, marks itself as done,
and raises StopAsyncIteration — ending the async for loop without raising an exception
to the caller. Use asyncio.TimeoutError (from wait_timeout on run_and_destroy /
run_with_logs) for hard wall-clock limits on the entire execution.
End-to-end flow¶
Machine stdout
│
▼ Fly.io log drain
LogDrainServer (HTTP POST, NDJSON)
│ parse_ndjson + parse_log_entry
│ filter: stream != "system"
▼
LogCollector.push(machine_id, message)
│
▼ asyncio.Queue[str | None]
LogStream.__anext__()
│ asyncio.wait_for(queue.get(), timeout)
▼
async for line in stream:
print(line) ← your code
When the machine exits: