Log Infrastructure
HTTP log drain server and async log streaming components.
LogDrainServer
Minimal HTTP server that receives Fly.io log drain POSTs.
Fly.io sends logs as NDJSON via HTTP POST to a configured endpoint. This server:
- Listens on the specified host/port
- Accepts POST requests on any path
- Parses NDJSON bodies into log entries
- Filters for stdout from subscribed machines
- Pushes matching lines into the LogCollector queues
The server is implemented using stdlib asyncio to avoid extra
dependencies. It handles HTTP/1.1 at a minimal level sufficient for
Fly log drains.
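The receive path described above can be sketched with asyncio.start_server. This is a minimal illustration under stated assumptions, not the code in flaude/log_drain.py: the names make_handler, sink, and demo are hypothetical, the body is assumed to be sized by Content-Length, and the stdout/subscription filtering step is left out.

```python
import asyncio
import json


def make_handler(sink: list):
    """Build a connection handler that appends parsed NDJSON objects to `sink`.

    `sink` is a hypothetical stand-in for the hand-off to LogCollector.
    """

    async def handle(reader: asyncio.StreamReader,
                     writer: asyncio.StreamWriter) -> None:
        try:
            # The request line and headers end at the first blank line.
            head = await reader.readuntil(b"\r\n\r\n")
            request_line, *header_lines = head.decode("latin-1").split("\r\n")
            method = request_line.split(" ", 1)[0]
            headers = {}
            for raw in header_lines:
                if ":" in raw:
                    name, value = raw.split(":", 1)
                    headers[name.strip().lower()] = value.strip()

            # Assumption: the body is sized by Content-Length (no chunked encoding).
            length = int(headers.get("content-length", "0"))
            body = await reader.readexactly(length) if length else b""

            if method == "POST":  # any path is accepted
                for raw_line in body.splitlines():
                    try:
                        obj = json.loads(raw_line)
                    except ValueError:
                        continue  # skip malformed lines
                    if isinstance(obj, dict):
                        sink.append(obj)
                status = "200 OK"
            else:
                status = "405 Method Not Allowed"

            response = (f"HTTP/1.1 {status}\r\nContent-Length: 0\r\n"
                        "Connection: close\r\n\r\n")
            writer.write(response.encode("ascii"))
            await writer.drain()
        finally:
            writer.close()
            await writer.wait_closed()

    return handle


async def demo() -> list:
    """Start the sketch server on an OS-assigned port and POST one NDJSON body."""
    sink: list = []
    server = await asyncio.start_server(make_handler(sink), "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]

    body = b'{"message": "hello"}\nnot json\n{"message": "world"}\n'
    request = (b"POST /any/path HTTP/1.1\r\nHost: localhost\r\n"
               b"Content-Length: %d\r\n\r\n" % len(body)) + body

    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write(request)
    await writer.drain()
    await reader.read()  # read until the server closes the connection
    writer.close()
    await writer.wait_closed()

    server.close()
    await server.wait_closed()
    return sink
```

Running asyncio.run(demo()) returns the two well-formed objects and skips the malformed middle line.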
Initialize the log drain server.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| collector | LogCollector | The :class: … | required |
| host | str | Network interface to bind to. Defaults to … | '0.0.0.0' |
| port | int | TCP port to listen on. Use … | 0 |
| include_stderr | bool | If True, stderr lines are forwarded to collectors in addition to stdout. Defaults to False. | False |
Source code in flaude/log_drain.py
start (async)
Start the HTTP server.
stop (async)
Stop the HTTP server and signal all collectors.
LogCollector
Thread-safe registry mapping machine IDs to asyncio queues.
Each machine gets its own queue. Log entries are routed by machine ID.
subscribe (async)
Get or create a queue for the given machine ID.
The queue yields str log lines. A None sentinel indicates
that the machine has stopped and no more logs will arrive.
push (async)
Push a log line to the queue for machine_id.
If no subscriber exists for this machine, the line is silently dropped.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| machine_id | str | The Fly machine ID whose queue should receive the line. | required |
| line | str | The log line string to enqueue. | required |
finish (async)
Signal that no more logs will arrive for machine_id.
Pushes a None sentinel and removes the queue from the registry.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| machine_id | str | The Fly machine ID whose queue should be finalized. | required |
finish_all (async)
Signal completion for all registered machines.
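The subscribe/push/finish contract described above can be illustrated with a small stand-in class. This is a sketch, not the flaude implementation: SketchCollector is a hypothetical name, and it assumes every call happens on a single event loop, so it uses no locking (the real class may synchronize differently).

```python
import asyncio


class SketchCollector:
    """Illustrative registry mapping machine IDs to asyncio queues."""

    def __init__(self) -> None:
        self._queues = {}  # machine_id -> asyncio.Queue of str or None

    async def subscribe(self, machine_id: str) -> asyncio.Queue:
        # Get or create: repeated calls for one machine share a queue.
        if machine_id not in self._queues:
            self._queues[machine_id] = asyncio.Queue()
        return self._queues[machine_id]

    async def push(self, machine_id: str, line: str) -> None:
        queue = self._queues.get(machine_id)
        if queue is not None:  # no subscriber: drop the line silently
            await queue.put(line)

    async def finish(self, machine_id: str) -> None:
        # Remove the queue from the registry, then send the None sentinel.
        queue = self._queues.pop(machine_id, None)
        if queue is not None:
            await queue.put(None)

    async def finish_all(self) -> None:
        for machine_id in list(self._queues):
            await self.finish(machine_id)


async def demo() -> list:
    collector = SketchCollector()
    queue = await collector.subscribe("m1")
    await collector.push("m1", "hello")
    await collector.push("m2", "dropped")  # nobody subscribed to m2
    await collector.finish("m1")

    items = []
    while True:
        item = await queue.get()
        items.append(item)
        if item is None:  # sentinel: no more logs for this machine
            break
    return items
```

In this sketch the consumer sees the pushed line followed by the None sentinel, while the line for the unsubscribed machine never reaches any queue.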
LogStream
Async iterator wrapper that yields parsed log lines from a queue.
Provides backpressure handling, per-item and overall timeouts, and clean shutdown when the machine completes (sentinel received) or on cancellation.
Usage::

    stream = LogStream(queue, item_timeout=30.0, total_timeout=3600.0)
    async for line in stream:
        print(line)

    # Check final state
    assert stream.done
    print(f"Received {stream.lines_yielded} lines")
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| queue | Queue[str \| None] | The asyncio.Queue to read from (from … | required |
| item_timeout | float \| None | Max seconds to wait for each individual log line. | None |
| total_timeout | float \| None | Max seconds for the entire iteration. | None |
collect (async)
Consume all remaining lines and return them as a list.
Convenience method equivalent to [line async for line in self].
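One way the per-item and overall timeouts could combine is sketched below. SketchStream is a hypothetical class, not LogStream itself. Two behaviours here are assumptions the documentation does not confirm: the overall deadline starts at construction, and a timeout ends iteration quietly rather than raising.

```python
import asyncio
import time


class SketchStream:
    """Illustrative async iterator over a sentinel-terminated queue."""

    def __init__(self, queue, item_timeout=None, total_timeout=None):
        self._queue = queue
        self._item_timeout = item_timeout
        # Assumption: the overall clock starts when the stream is created.
        self._deadline = (None if total_timeout is None
                          else time.monotonic() + total_timeout)
        self.done = False
        self.lines_yielded = 0

    def __aiter__(self):
        return self

    async def __anext__(self) -> str:
        if self.done:
            raise StopAsyncIteration
        # Wait no longer than the smaller of the two remaining budgets.
        timeout = self._item_timeout
        if self._deadline is not None:
            remaining = max(0.0, self._deadline - time.monotonic())
            timeout = remaining if timeout is None else min(timeout, remaining)
        try:
            item = await asyncio.wait_for(self._queue.get(), timeout)
        except asyncio.TimeoutError:
            # Assumption: a timeout stops iteration instead of propagating.
            self.done = True
            raise StopAsyncIteration from None
        if item is None:  # sentinel: the machine has finished
            self.done = True
            raise StopAsyncIteration
        self.lines_yielded += 1
        return item

    async def collect(self) -> list:
        return [line async for line in self]


async def demo():
    queue = asyncio.Queue()
    for item in ("a", "b", None):
        queue.put_nowait(item)
    stream = SketchStream(queue, item_timeout=1.0, total_timeout=5.0)
    lines = await stream.collect()

    # An idle queue ends after item_timeout instead of blocking forever.
    idle = SketchStream(asyncio.Queue(), item_timeout=0.01)
    idle_lines = await idle.collect()
    return lines, stream.lines_yielded, idle_lines, idle.done
```

Backpressure in this sketch is simply the consumer's pace: nothing is read from the queue until the caller asks for the next line.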
LogEntry (dataclass)
Parsed representation of a single Fly.io log drain entry.
Attributes:
| Name | Type | Description |
|---|---|---|
| machine_id | str | The Fly machine ID that produced this log entry. |
| message | str | The log line content. |
| stream | str | Output stream: … |
| timestamp | str | ISO 8601 timestamp string from the log drain payload (may be empty). |
| app_name | str | The Fly.io application name (may be empty if not present in payload). |
| raw | dict[str, Any] | The original parsed JSON dict from the log drain request. |
parse_log_entry
Parse a single Fly.io log drain JSON object into a LogEntry.
Fly log drain NDJSON format varies, but typically contains:
- fly.app.instance or fly.machine.id: machine identifier
- message, log, or msg: the log line content
- stream: "stdout" or "stderr" for user process output
- source: log origin ("app" for user code; "fly"/"proxy"/"machine" for Fly infrastructure messages)
Stream determination logic:
- If the stream field is explicitly set, use it as-is.
- If there is no stream field but source indicates Fly infrastructure ("fly", "proxy", or "machine"), the entry is classified as "system": a Fly-generated lifecycle or health message that should not appear in user-facing log output.
- If neither field is set, default to "stdout" (user application log without explicit stream tagging).
Returns None if the entry cannot be parsed or is missing required fields.
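The field lookup and stream determination rules above could be implemented roughly as follows. This is a sketch under assumptions, not the function in flaude/log_drain.py: SketchEntry, classify_stream, and sketch_parse are hypothetical names; dotted names such as fly.app.instance are assumed to mean nested JSON objects; the "timestamp" key and the fly.app.name location are guesses; and machine ID plus message are assumed to be the required fields.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional

# Sources the rules above treat as Fly infrastructure.
INFRA_SOURCES = {"fly", "proxy", "machine"}


@dataclass
class SketchEntry:
    """Hypothetical mirror of the LogEntry attributes."""
    machine_id: str
    message: str
    stream: str
    timestamp: str
    app_name: str
    raw: Dict[str, Any]


def classify_stream(data: Dict[str, Any]) -> str:
    """Apply the three stream determination rules in order."""
    stream = data.get("stream")
    if stream:  # rule 1: an explicit stream wins
        return str(stream)
    source = data.get("source")
    if isinstance(source, str) and source in INFRA_SOURCES:
        return "system"  # rule 2: infrastructure message
    return "stdout"  # rule 3: untagged user output


def _as_dict(value: Any) -> Dict[str, Any]:
    return value if isinstance(value, dict) else {}


def sketch_parse(data: Dict[str, Any]) -> Optional[SketchEntry]:
    # Assumption: "fly.app.instance" means nested objects, not a dotted key.
    fly = _as_dict(data.get("fly"))
    app = _as_dict(fly.get("app"))
    machine = _as_dict(fly.get("machine"))
    machine_id = app.get("instance") or machine.get("id")

    # Take the first of message/log/msg that holds a string.
    message = None
    for key in ("message", "log", "msg"):
        if isinstance(data.get(key), str):
            message = data[key]
            break

    # Assumption: machine ID and message are the required fields.
    if not machine_id or message is None:
        return None
    return SketchEntry(
        machine_id=str(machine_id),
        message=message,
        stream=classify_stream(data),
        timestamp=str(data.get("timestamp", "")),  # assumed key name
        app_name=str(app.get("name", "")),  # assumed location
        raw=data,
    )
```

Keeping the classification in its own function makes the precedence explicit: an explicit stream value is never overridden by source.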
parse_ndjson
Parse an NDJSON (newline-delimited JSON) body into a list of dicts.
Silently skips malformed lines.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| body | bytes | Raw HTTP request body containing newline-delimited JSON. | required |
Returns:
| Type | Description |
|---|---|
| list[dict[str, Any]] | List of successfully parsed JSON objects. Malformed lines are omitted. |
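A tolerant NDJSON parser of this kind can be written in a few lines with the standard json module. The function name sketch_parse_ndjson is hypothetical, and skipping valid JSON values that are not objects (such as arrays) is an assumption made so the result matches the documented list-of-dicts return type.

```python
import json
from typing import Any, Dict, List


def sketch_parse_ndjson(body: bytes) -> List[Dict[str, Any]]:
    """Parse newline-delimited JSON, skipping anything that fails to decode."""
    objects = []
    for raw_line in body.splitlines():
        if not raw_line.strip():
            continue  # blank line between records
        try:
            value = json.loads(raw_line)
        except ValueError:
            # Covers json.JSONDecodeError and invalid UTF-8 alike.
            continue
        if isinstance(value, dict):  # assumption: non-object values are skipped
            objects.append(value)
    return objects
```

For example, a body containing one valid object, a blank line, a broken line, a JSON array, and a second valid object yields only the two objects.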
drain_queue (async)
Drain all lines from a queue until the sentinel is received.
This is a convenience function for collecting all output. For streaming,
use async_iter_queue instead.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| queue | Queue[str \| None] | The queue to drain (from … | required |
| timeout | float \| None | Max seconds to wait for all output. None = wait forever. | None |
Returns:
| Type | Description |
|---|---|
| list[str] | List of log lines (excluding the sentinel). |
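Draining with a single overall timeout can be sketched by wrapping the whole read loop in asyncio.wait_for. The name sketch_drain is hypothetical, and letting asyncio.TimeoutError propagate to the caller on expiry is an assumption; the real function may handle the timeout differently.

```python
import asyncio


async def sketch_drain(queue: asyncio.Queue, timeout=None) -> list:
    """Collect lines until the None sentinel, within one overall time budget."""
    lines = []

    async def consume() -> None:
        while True:
            item = await queue.get()
            if item is None:  # sentinel: stop and exclude it from the result
                return
            lines.append(item)

    # timeout=None waits forever; otherwise the budget covers the whole drain.
    # Assumption: asyncio.TimeoutError propagates to the caller on expiry.
    await asyncio.wait_for(consume(), timeout)
    return lines


async def demo():
    queue = asyncio.Queue()
    for item in ("a", "b", None):
        queue.put_nowait(item)
    drained = await sketch_drain(queue, timeout=1.0)

    try:
        await sketch_drain(asyncio.Queue(), timeout=0.01)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return drained, timed_out
```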
async_iter_queue (async)
Async iterator that yields log lines from a queue until sentinel.
Usage::

    q = await collector.subscribe("machine-id")
    async for line in async_iter_queue(q):
        print(line)
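The streaming pattern can be illustrated with an async generator that runs alongside a producer. This is a sketch rather than the library function: sketch_iter_queue and the producer coroutine are hypothetical names, and the producer stands in for the log drain pushing lines as they arrive.

```python
import asyncio
from typing import AsyncIterator


async def sketch_iter_queue(queue: asyncio.Queue) -> AsyncIterator[str]:
    """Yield lines as they arrive and stop at the None sentinel."""
    while True:
        item = await queue.get()
        if item is None:
            return
        yield item


async def demo() -> list:
    queue = asyncio.Queue()

    async def producer() -> None:
        # Hypothetical producer: emits lines over time, then the sentinel.
        for line in ("booting", "ready", "exiting"):
            await queue.put(line)
            await asyncio.sleep(0)  # yield control so the consumer can run
        await queue.put(None)

    task = asyncio.create_task(producer())
    seen = [line async for line in sketch_iter_queue(queue)]
    await task
    return seen
```

Unlike draining, each line is available to the caller as soon as it is queued, so output can be displayed while the machine is still running.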