Skip to content

Log Infrastructure

HTTP log drain server and async log streaming components.

LogDrainServer

LogDrainServer(collector, *, host='0.0.0.0', port=0, include_stderr=False)

Minimal HTTP server that receives Fly.io log drain POSTs.

Fly.io sends logs as NDJSON via HTTP POST to a configured endpoint. This server:

  1. Listens on the specified host/port
  2. Accepts POST requests on any path
  3. Parses NDJSON bodies into log entries
  4. Filters for stdout from subscribed machines
  5. Pushes matching lines into the LogCollector queues

The server is implemented using stdlib asyncio to avoid extra dependencies. It handles HTTP/1.1 at a minimal level sufficient for Fly log drains.

Initialize the log drain server.

Parameters:

Name Type Description Default
collector LogCollector

The :class:LogCollector that receives and routes log entries.

required
host str

Network interface to bind to. Defaults to 0.0.0.0 (all interfaces).

'0.0.0.0'
port int

TCP port to listen on. Use 0 for OS-assigned ephemeral port.

0
include_stderr bool

If True, stderr lines are forwarded to collectors in addition to stdout. Defaults to False.

False
Source code in flaude/log_drain.py
def __init__(
    self,
    collector: LogCollector,
    *,
    host: str = "0.0.0.0",
    port: int = 0,
    include_stderr: bool = False,
) -> None:
    """Set up the drain server (listening does not begin until ``start``).

    Args:
        collector: The :class:`LogCollector` that receives and routes log
            entries.
        host: Interface address to bind. ``0.0.0.0`` binds all interfaces.
        port: TCP port to listen on; ``0`` lets the OS pick a free one.
        include_stderr: When True, stderr lines are forwarded to collectors
            in addition to stdout. Defaults to False.
    """
    self.collector = collector
    self.include_stderr = include_stderr
    self.host = host
    self.port = port
    # Runtime state: populated by start(), cleared by stop().
    self._server: asyncio.Server | None = None
    self._actual_port: int | None = None

actual_port property

actual_port

The port the server is actually listening on (after start).

url property

url

The base URL of the running server, or None if not started.

start async

start()

Start the HTTP server.

Source code in flaude/log_drain.py
async def start(self) -> None:
    """Start the HTTP server."""
    server = await asyncio.start_server(
        self._handle_connection, self.host, self.port
    )
    self._server = server
    # When port=0 the OS assigns an ephemeral port; read the real one
    # back from the first listening socket so callers can build the URL.
    if server.sockets:
        self._actual_port = server.sockets[0].getsockname()[1]
    logger.info("Log drain server listening on %s:%s", self.host, self._actual_port)

stop async

stop()

Stop the HTTP server and signal all collectors.

Source code in flaude/log_drain.py
async def stop(self) -> None:
    """Stop the HTTP server and signal all collectors."""
    server = self._server
    if server is not None:
        # Close the listener, then wait for open connections to unwind.
        server.close()
        await server.wait_closed()
        self._server = None
        self._actual_port = None
    logger.info("Log drain server stopped")

LogCollector

LogCollector()

Thread-safe registry mapping machine IDs to asyncio queues.

Each machine gets its own queue. Log entries are routed by machine ID.

Source code in flaude/log_drain.py
def __init__(self) -> None:
    """Create an empty registry with no subscribed machines."""
    # Maps machine_id -> queue of log lines; a None item is the
    # end-of-stream sentinel.
    self._queues: dict[str, asyncio.Queue[str | None]] = {}
    # Serializes registry mutation across subscribe/push/finish.
    self._lock = asyncio.Lock()

machine_ids property

machine_ids

Return list of currently subscribed machine IDs.

subscribe async

subscribe(machine_id)

Get or create a queue for the given machine ID.

The queue yields str log lines. A None sentinel indicates that the machine has stopped and no more logs will arrive.

Source code in flaude/log_drain.py
async def subscribe(self, machine_id: str) -> asyncio.Queue[str | None]:
    """Get or create a queue for the given machine ID.

    The queue yields ``str`` log lines. A ``None`` sentinel indicates
    that the machine has stopped and no more logs will arrive.
    """
    async with self._lock:
        if machine_id not in self._queues:
            self._queues[machine_id] = asyncio.Queue()
        return self._queues[machine_id]

push async

push(machine_id, line)

Push a log line to the queue for machine_id.

If no subscriber exists for this machine, the line is silently dropped.

Parameters:

Name Type Description Default
machine_id str

The Fly machine ID whose queue should receive the line.

required
line str

The log line string to enqueue.

required
Source code in flaude/log_drain.py
async def push(self, machine_id: str, line: str) -> None:
    """Enqueue *line* for the subscriber of *machine_id*.

    Lines addressed to machines without a registered queue are dropped
    silently.

    Args:
        machine_id: The Fly machine ID whose queue should receive the line.
        line: The log line string to enqueue.
    """
    # Only the dict lookup needs the lock; put() happens outside it so a
    # slow consumer cannot stall the registry.
    async with self._lock:
        target = self._queues.get(machine_id)
    if target is None:
        return
    await target.put(line)

finish async

finish(machine_id)

Signal that no more logs will arrive for machine_id.

Pushes a None sentinel and removes the queue from the registry.

Parameters:

Name Type Description Default
machine_id str

The Fly machine ID whose queue should be finalized.

required
Source code in flaude/log_drain.py
async def finish(self, machine_id: str) -> None:
    """Mark *machine_id* as complete: no more log lines will arrive.

    Removes the machine's queue from the registry and delivers the
    sentinel so consumers can terminate their iteration.

    Args:
        machine_id: The Fly machine ID whose queue should be finalized.
    """
    async with self._lock:
        queue = self._queues.pop(machine_id, None)
    if queue is None:
        return
    await queue.put(_SENTINEL)

finish_all async

finish_all()

Signal completion for all registered machines.

Source code in flaude/log_drain.py
async def finish_all(self) -> None:
    """Signal completion for all registered machines."""
    # Snapshot the IDs under the lock, then release it: finish()
    # re-acquires the same lock per machine, so holding it across the
    # calls would deadlock.
    async with self._lock:
        pending = list(self._queues)
    for machine_id in pending:
        await self.finish(machine_id)

LogStream

LogStream(queue, *, item_timeout=None, total_timeout=None)

Async iterator wrapper that yields parsed log lines from a queue.

Provides backpressure handling, per-item and overall timeouts, and clean shutdown when the machine completes (sentinel received) or on cancellation.

Usage::

stream = LogStream(queue, item_timeout=30.0, total_timeout=3600.0)
async for line in stream:
    print(line)

# Check final state
assert stream.done
print(f"Received {stream.lines_yielded} lines")

Parameters:

Name Type Description Default
queue Queue[str | None]

The asyncio.Queue to read from (from LogCollector.subscribe). A None sentinel signals end-of-stream.

required
item_timeout float | None

Max seconds to wait for each individual log line. None means wait indefinitely for each item.

None
total_timeout float | None

Max seconds for the entire iteration. None means no overall time limit.

None
Source code in flaude/log_drain.py
def __init__(
    self,
    queue: asyncio.Queue[str | None],
    *,
    item_timeout: float | None = None,
    total_timeout: float | None = None,
) -> None:
    """Wrap *queue* in an async iterator with optional timeouts.

    Args:
        queue: The queue to read from (from ``LogCollector.subscribe``).
            A ``None`` sentinel signals end-of-stream.
        item_timeout: Max seconds to wait for each individual line;
            None waits indefinitely per item.
        total_timeout: Max seconds for the entire iteration; None means
            no overall limit.
    """
    self._queue = queue
    self._item_timeout = item_timeout
    self._total_timeout = total_timeout

    # Mutable iteration state, updated as the stream is consumed.
    self._started = False
    self._deadline: float | None = None
    self._lines_yielded = 0
    self._timed_out = False
    self._done = False

done property

done

True if the stream has finished (sentinel received or timed out).

timed_out property

timed_out

True if the stream ended due to a timeout.

lines_yielded property

lines_yielded

Number of log lines yielded so far.

collect async

collect()

Consume all remaining lines and return them as a list.

Convenience method equivalent to [line async for line in self].

Source code in flaude/log_drain.py
async def collect(self) -> list[str]:
    """Consume all remaining lines and return them as a list.

    Convenience wrapper around iterating the stream to exhaustion.
    """
    return [line async for line in self]

LogEntry dataclass

LogEntry(machine_id, message, stream, timestamp='', app_name='', raw=dict())

Parsed representation of a single Fly.io log drain entry.

Attributes:

Name Type Description
machine_id str

The Fly machine ID that produced this log entry.

message str

The log line content.

stream str

Output stream — "stdout", "stderr", or "system" (Fly infrastructure messages).

timestamp str

ISO 8601 timestamp string from the log drain payload (may be empty).

app_name str

The Fly.io application name (may be empty if not present in payload).

raw dict[str, Any]

The original parsed JSON dict from the log drain request.

parse_log_entry

parse_log_entry(raw)

Parse a single Fly.io log drain JSON object into a LogEntry.

Fly log drain NDJSON format varies, but typically contains:

  - fly.app.instance or fly.machine.id: machine identifier
  - message, log, or msg: the log line content
  - stream: "stdout" or "stderr" for user process output
  - source: log origin — "app" for user code, "fly"/"proxy"/"machine" for Fly infrastructure messages

Stream determination logic:

  1. If the stream field is explicitly set, use it as-is.
  2. If no stream field but source indicates Fly infrastructure ("fly", "proxy", or "machine"), the entry is classified as "system" — a Fly-generated lifecycle or health message that should not appear in user-facing log output.
  3. If neither field is set, default to "stdout" (user application log without explicit stream tagging).

Returns None if the entry cannot be parsed or is missing required fields.

Source code in flaude/log_drain.py
def parse_log_entry(raw: dict[str, Any]) -> LogEntry | None:
    """Parse a single Fly.io log drain JSON object into a LogEntry.

    Fly log drain NDJSON format varies, but typically contains:
    - ``fly.app.instance`` or ``fly.machine.id``: machine identifier
    - ``message``, ``log``, or ``msg``: the log line content
    - ``stream``: ``"stdout"`` or ``"stderr"`` for user process output
    - ``source``: log origin — ``"app"`` for user code, ``"fly"``/``"proxy"``/
      ``"machine"`` for Fly infrastructure messages

    Stream determination logic:

    1. If the ``stream`` field is explicitly set, use it as-is.
    2. If no ``stream`` field but ``source`` indicates Fly infrastructure
       (``"fly"``, ``"proxy"``, or ``"machine"``), the entry is classified
       as ``"system"`` — a Fly-generated lifecycle or health message that
       should not appear in user-facing log output.
    3. If neither field is set, default to ``"stdout"`` (user application
       log without explicit stream tagging).

    Returns None if the entry cannot be parsed or is missing required fields.
    """

    def _subdict(container: dict[str, Any], key: str) -> dict[str, Any]:
        # The payload is untrusted external input: a field that is usually
        # a nested object may arrive as a string/number/null. Treat any
        # non-dict value as absent instead of raising AttributeError.
        value = container.get(key)
        return value if isinstance(value, dict) else {}

    # Hoist the nested metadata dicts once instead of re-walking the
    # fly -> app chain for both machine_id and app_name.
    fly = _subdict(raw, "fly")
    app = _subdict(fly, "app")
    machine = _subdict(fly, "machine")

    # Extract machine ID — Fly uses different field names across versions
    machine_id = (
        app.get("instance")
        or machine.get("id")
        or raw.get("instance")
        or raw.get("machine_id")
        or ""
    )
    if not machine_id:
        return None

    # Extract message — accept multiple field name aliases
    message = raw.get("message") or raw.get("log") or raw.get("msg") or ""
    if not isinstance(message, str):
        message = str(message)

    # Determine stream type, carefully separating user output from system logs.
    #
    # Fly's "source" field indicates log *origin* (who produced the log), not
    # the output stream. Sources "fly", "proxy", and "machine" are Fly
    # infrastructure; "app" (or absent) means user process output.
    explicit_stream = raw.get("stream") or ""
    if explicit_stream:
        # Explicit stream tag — trust it directly ("stdout", "stderr", etc.)
        stream = explicit_stream
    else:
        source = raw.get("source") or ""
        if source in ("fly", "proxy", "machine"):
            # Fly infrastructure message (lifecycle events, health checks, etc.)
            stream = "system"
        else:
            # No stream indicator and no system source — treat as user stdout.
            # This covers apps that write to stdout without stream metadata.
            stream = "stdout"

    # Extract optional metadata
    timestamp = raw.get("timestamp") or raw.get("time") or raw.get("ts") or ""
    app_name = app.get("name") or ""

    return LogEntry(
        machine_id=machine_id,
        message=message,
        stream=stream,
        timestamp=str(timestamp),
        app_name=app_name,
        raw=raw,
    )

parse_ndjson

parse_ndjson(body)

Parse an NDJSON (newline-delimited JSON) body into a list of dicts.

Silently skips malformed lines.

Parameters:

Name Type Description Default
body bytes

Raw HTTP request body containing newline-delimited JSON.

required

Returns:

Type Description
list[dict[str, Any]]

List of successfully parsed JSON objects. Malformed lines are omitted.

Source code in flaude/log_drain.py
def parse_ndjson(body: bytes) -> list[dict[str, Any]]:
    """Parse an NDJSON (newline-delimited JSON) body into a list of dicts.

    Silently skips malformed lines and non-object JSON values.

    Args:
        body: Raw HTTP request body containing newline-delimited JSON.

    Returns:
        List of successfully parsed JSON objects. Malformed lines are omitted.
    """
    entries: list[dict[str, Any]] = []
    for line in body.split(b"\n"):
        line = line.strip()
        if not line:
            continue
        try:
            obj = json.loads(line)
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError are both
            # ValueError subclasses, so a single ValueError catch covers
            # malformed JSON and undecodable bytes alike.
            logger.debug("Skipping malformed NDJSON line: %s", line[:200])
            continue
        # Only keep JSON objects; scalars/arrays are not log drain entries.
        if isinstance(obj, dict):
            entries.append(obj)
    return entries

drain_queue async

drain_queue(queue, *, timeout=None)

Drain all lines from a queue until the sentinel is received.

This is a convenience function for collecting all output. For streaming, use async_iter_queue instead.

Parameters:

Name Type Description Default
queue Queue[str | None]

The queue to drain (from LogCollector.subscribe).

required
timeout float | None

Max seconds to wait for all output. None = wait forever.

None

Returns:

Type Description
list[str]

List of log lines (excluding the sentinel).

Source code in flaude/log_drain.py
async def drain_queue(
    queue: asyncio.Queue[str | None],
    *,
    timeout: float | None = None,
) -> list[str]:
    """Drain all lines from a queue until the sentinel is received.

    This is a convenience function for collecting all output. For streaming,
    use ``async_iter_queue`` instead.

    Args:
        queue: The queue to drain (from ``LogCollector.subscribe``).
        timeout: Max seconds to wait for all output. None = wait forever.

    Returns:
        List of log lines (excluding the sentinel).
    """
    lines: list[str] = []

    async def _drain() -> list[str]:
        while True:
            item = await queue.get()
            if item is None:
                return lines
            lines.append(item)

    if timeout is not None:
        return await asyncio.wait_for(_drain(), timeout=timeout)
    return await _drain()

async_iter_queue async

async_iter_queue(queue)

Async iterator that yields log lines from a queue until sentinel.

Usage::

q = await collector.subscribe("machine-id")
async for line in async_iter_queue(q):
    print(line)
Source code in flaude/log_drain.py
async def async_iter_queue(
    queue: asyncio.Queue[str | None],
) -> Any:  # AsyncIterator[str] — using Any to avoid import issues
    """Async iterator that yields log lines from a queue until sentinel.

    Usage::

        q = await collector.subscribe("machine-id")
        async for line in async_iter_queue(q):
            print(line)
    """
    while True:
        item = await queue.get()
        if item is None:
            return
        yield item