
pygwire

A low-level PostgreSQL wire protocol codec for Python.

Python 3.11+ | License: MIT

Beta

Pygwire is under active development. The API may change between minor releases until 1.0. See the changelog for migration notes.


Pygwire is a sans-I/O PostgreSQL wire protocol (v3.0 and v3.2) codec. All codec and state machine logic is I/O-independent, making it portable across asyncio, trio, synchronous sockets, or any other transport.

Features

  • Sans-I/O design. The library performs no I/O itself. Bring your own transport.
  • Zero-copy parsing. Uses memoryview for buffer slicing.
  • Complete protocol coverage. All PostgreSQL v3.0 and v3.2 wire protocol messages with connection phase tracking.
  • Zero dependencies. Nothing is needed at runtime beyond the Python standard library.
  • Fully typed. Ships with py.typed marker for PEP 561 support.
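
To make the zero-copy claim concrete, here is a minimal standard-library sketch of slicing a message body out of a receive buffer with memoryview. It illustrates the general technique only and is not pygwire's actual implementation; the variable names are made up for this example. It relies on the standard PostgreSQL framing for regular messages: a one-byte type code, then a big-endian int32 length that counts itself but not the type byte.

```python
import struct

# A ParameterStatus message ('S') as it might sit in a receive buffer:
# type byte, int32 length (includes itself, excludes the type byte), payload.
payload = b"foo\x00bar\x00"
buffer = bytearray(b"S" + struct.pack("!i", len(payload) + 4) + payload)

view = memoryview(buffer)
(length,) = struct.unpack_from("!i", view, 1)  # read the length in place
body = view[5 : 1 + length]                    # slicing a view copies no bytes

# The slice still refers to the original buffer object.
shares_memory = body.obj is buffer

# A copy is made only when the fields are materialized as bytes.
name, value = bytes(body).split(b"\x00")[:2]
```

Deferring the copy to the point where fields are actually extracted is what keeps framing cheap: a decoder can locate message boundaries in a large buffer without duplicating the payloads.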

Architecture

Pygwire is organized into four layers, from low-level to high-level:

Layer          Module                  Purpose
Messages       pygwire.messages        Encode and decode all PostgreSQL protocol messages
Codec          pygwire.codec           Incremental stream decoder with zero-copy framing
State Machine  pygwire.state_machine   Connection phase tracking for framing, disambiguation, and lifecycle
Connection     pygwire.connection      Coordinated decoder + state machine (sans-I/O)

Use the lower layers independently for maximum control, or use Connection for a higher-level API that coordinates them together.
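
One reason framing depends on the connection phase: in the PostgreSQL protocol, the startup packet carries no type byte, while regular messages start with one. The sketch below builds both by hand with struct to show the difference. The helper names (encode_startup, encode_query) are hypothetical and exist only for this illustration; they are not part of pygwire's API.

```python
import struct

# Protocol version is encoded as (major << 16) | minor.
PROTOCOL_3_0 = (3 << 16) | 0  # 196608
PROTOCOL_3_2 = (3 << 16) | 2  # 196610


def encode_startup(params: dict[str, str], version: int = PROTOCOL_3_0) -> bytes:
    # Startup packet: int32 length, int32 version, NUL-terminated
    # key/value strings, then one final NUL. There is no type byte.
    body = struct.pack("!i", version)
    for key, value in params.items():
        body += key.encode() + b"\x00" + value.encode() + b"\x00"
    body += b"\x00"
    return struct.pack("!i", len(body) + 4) + body


def encode_query(sql: str) -> bytes:
    # Regular message: type byte 'Q', int32 length (counts itself,
    # not the type byte), then the NUL-terminated query string.
    payload = sql.encode() + b"\x00"
    return b"Q" + struct.pack("!i", len(payload) + 4) + payload


startup = encode_startup({"user": "postgres"})
query = encode_query("SELECT 1")
```

Because the first four bytes of a startup packet are a length rather than a type code, a decoder cannot frame the stream correctly without knowing which phase the connection is in.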

PostgreSQL naming convention

Pygwire follows PostgreSQL's naming convention: backend = server, frontend = client.

Quick example

FrontendConnection coordinates a decoder and state machine together:

"""Index: Using Connection (recommended).

This example will run if you have a PostgreSQL server running on localhost:5432
with trust authentication configured for the 'postgres' user.
"""

import socket

from pygwire import ConnectionPhase, FrontendConnection
from pygwire.messages import DataRow, Query, StartupMessage

conn = FrontendConnection()
sock = socket.create_connection(("localhost", 5432))

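# Send startup (sendall() retries until every byte is written; send() may not)
sock.sendall(conn.send(StartupMessage(params={"user": "postgres", "database": "postgres"})))

# Handle authentication (requires trust auth)
while conn.phase != ConnectionPhase.READY:
    data = sock.recv(4096)
    if not data:
        raise ConnectionError("server closed the connection during startup")
    for msg in conn.receive(data):
        print(msg)  # handle auth messages

# Send a query and read results.
# One recv() is usually enough for this tiny result; larger results need a read loop.
sock.sendall(conn.send(Query(query_string="SELECT 1")))
for msg in conn.receive(sock.recv(4096)):
    if isinstance(msg, DataRow):
        print(msg.columns)

sock.close()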

Using the low-level API

For maximum control, use the codec, messages, and state machine independently:

"""Index: Using the low-level API."""

from pygwire import BackendMessageDecoder
from pygwire.messages import ParameterStatus, Query

# Decode server messages
decoder = BackendMessageDecoder()
ps = ParameterStatus(name="foo", value="bar")
data_from_server = ps.to_wire()

decoder.feed(data_from_server)
for msg in decoder:
    print(f"{type(msg).__name__}: {msg}")

# Encode client messages
query = Query(query_string="SELECT 1")
wire_bytes = query.to_wire()

print(wire_bytes)

What is sans-I/O?

Pygwire's core never reads from or writes to sockets, files, or any other I/O source. Instead, you:

  1. Feed raw bytes into the decoder (from whatever transport you use)
  2. Read decoded message objects out
  3. Encode messages to bytes and send them yourself

This means pygwire works identically with asyncio, trio, plain sockets, or even in-memory buffers for testing. The sans-I/O manifesto describes this pattern in detail.
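
The three steps above can be sketched with a toy framer built on the standard library alone. ToyDecoder is a hypothetical class written for this illustration, not pygwire's decoder: it only splits the stream into (type, payload) pairs and, for simplicity, copies the payload instead of using zero-copy views. Nothing in it touches a socket, so it can be driven from an in-memory byte string.

```python
import struct
from collections.abc import Iterator


class ToyDecoder:
    """Hypothetical sans-I/O framer for regular (typed) messages."""

    def __init__(self) -> None:
        self._buffer = bytearray()

    def feed(self, data: bytes) -> None:
        # Step 1: the caller hands over bytes from any transport.
        self._buffer += data

    def __iter__(self) -> Iterator[tuple[bytes, bytes]]:
        # Step 2: yield every complete message currently buffered.
        while len(self._buffer) >= 5:
            (length,) = struct.unpack_from("!i", self._buffer, 1)
            if length < 4:
                raise ValueError(f"invalid message length: {length}")
            end = 1 + length
            if len(self._buffer) < end:
                return  # incomplete message: wait for more bytes
            msg_type = bytes(self._buffer[:1])
            payload = bytes(self._buffer[5:end])
            del self._buffer[:end]
            yield msg_type, payload


# Step 3 in reverse: encode a ReadyForQuery ('Z', status 'I' = idle) by hand.
wire = b"Z" + struct.pack("!i", 5) + b"I"

# Feed the message one byte at a time, as a slow network might deliver it.
decoder = ToyDecoder()
decoded: list[tuple[bytes, bytes]] = []
for i in range(len(wire)):
    decoder.feed(wire[i : i + 1])
    decoded.extend(decoder)
```

The decoder produces nothing until the final byte arrives, then yields exactly one message. The same loop works unchanged whether the bytes come from asyncio, trio, a blocking socket, or a test fixture.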

The Connection classes follow the same principle. They coordinate protocol state internally but never perform I/O. Subclass and override on_send() and on_receive() to integrate with your transport layer:

"""Index: Subclassing for I/O integration."""

import socket

from pygwire import ConnectionPhase, FrontendConnection
from pygwire.messages import PGMessage, Query, StartupMessage

sock = socket.create_connection(("localhost", 5432))


class SocketConnection(FrontendConnection):
    def __init__(self, sock: socket.socket) -> None:
        super().__init__()
        self.sock = sock

    def on_send(self, data: bytes) -> None:
        # Use the socket stored on the instance, not the module-level one
        self.sock.sendall(data)

    def on_receive(self, msg: PGMessage) -> None:
        print(f"Received: {msg}")


conn = SocketConnection(sock)
conn.send(StartupMessage(params={"user": "postgres", "database": "postgres"}))
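while conn.phase != ConnectionPhase.READY:
    data = sock.recv(4096)
    if not data:
        raise ConnectionError("server closed the connection during startup")
    for _ in conn.receive(data):
        pass  # messages are handled in on_receive()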

conn.send(Query(query_string="SELECT 1"))  # automatically sends to socket

for _ in conn.receive(sock.recv(4096)):
    pass