pygwire¶
A low-level PostgreSQL wire protocol codec for Python.
Beta
Pygwire is under active development. The API may change between minor releases until 1.0. See the changelog for migration notes.
Pygwire is a sans-I/O PostgreSQL wire protocol (v3.0 and v3.2) codec. All codec and state machine logic is I/O-independent, making it portable across asyncio, trio, synchronous sockets, or any other transport.
Features¶
- Sans-I/O design. No I/O dependencies. Bring your own transport.
- Zero-copy parsing. Uses
memoryviewfor buffer slicing. - Complete protocol coverage. All PostgreSQL v3.0 and v3.2 wire protocol messages with connection phase tracking.
- Zero dependencies. No runtime dependencies.
- Fully typed. Ships with
py.typedmarker for PEP 561 support.
Architecture¶
Pygwire is organized into four layers, from low-level to high-level:
| Layer | Module | Purpose |
|---|---|---|
| Messages | pygwire.messages |
Encode and decode all PostgreSQL protocol messages |
| Codec | pygwire.codec |
Incremental stream decoder with zero-copy framing |
| State Machine | pygwire.state_machine |
Connection phase tracking for framing, disambiguation, and lifecycle |
| Connection | pygwire.connection |
Coordinated decoder + state machine (sans-I/O) |
Use the lower layers independently for maximum control, or use Connection for a higher-level API that coordinates them together.
PostgreSQL naming convention
Pygwire follows PostgreSQL's naming convention: backend = server, frontend = client.
Quick example¶
Using Connection (recommended)¶
FrontendConnection coordinates a decoder and state machine together:
"""Index: Using Connection (recommended).
This example will run if you have a PostgreSQL server running on localhost:5432
with trust authentication configured for the 'postgres' user.
"""
import socket
from pygwire import ConnectionPhase, FrontendConnection
from pygwire.messages import DataRow, Query, StartupMessage
conn = FrontendConnection()
sock = socket.create_connection(("localhost", 5432))
# Send startup
sock.send(conn.send(StartupMessage(params={"user": "postgres", "database": "postgres"})))
# Handle authentication (requires trust auth)
while conn.phase != ConnectionPhase.READY:
for msg in conn.receive(sock.recv(4096)):
print(msg) # handle auth messages
# Send a query and read results
sock.send(conn.send(Query(query_string="SELECT 1")))
for msg in conn.receive(sock.recv(4096)):
if isinstance(msg, DataRow):
print(msg.columns)
Using the low-level API¶
For maximum control, use the codec, messages, and state machine independently:
"""Index: Using the low-level API."""
from pygwire import BackendMessageDecoder
from pygwire.messages import ParameterStatus, Query
# Decode server messages
decoder = BackendMessageDecoder()
ps = ParameterStatus(name="foo", value="bar")
data_from_server = ps.to_wire()
decoder.feed(data_from_server)
for msg in decoder:
print(f"{type(msg).__name__}: {msg}")
# Encode client messages
query = Query(query_string="SELECT 1")
wire_bytes = query.to_wire()
print(wire_bytes)
What is sans-I/O?¶
Pygwire's core never reads from or writes to sockets, files, or any other I/O source. Instead, you:
- Feed raw bytes into the decoder (from whatever transport you use)
- Read decoded message objects out
- Encode messages to bytes and send them yourself
This means pygwire works identically with asyncio, trio, plain sockets, or even in-memory buffers for testing. The sans-I/O manifesto describes this pattern in detail.
The Connection classes follow the same principle. They coordinate protocol state internally but never perform I/O. Subclass and override on_send() and on_receive() to integrate with your transport layer:
"""Index: Subclassing for I/O integration."""
import socket
from pygwire import ConnectionPhase, FrontendConnection
from pygwire.messages import PGMessage, Query, StartupMessage
sock = socket.create_connection(("localhost", 5432))
class SocketConnection(FrontendConnection):
def __init__(self, sock: socket.socket) -> None:
super().__init__()
self.sock = sock
def on_send(self, data: bytes) -> None:
sock.send(data)
def on_receive(self, msg: PGMessage) -> None:
print(f"Received: {msg}")
conn = SocketConnection(sock)
conn.send(StartupMessage(params={"user": "postgres", "database": "postgres"}))
while conn.phase != ConnectionPhase.READY:
for _ in conn.receive(sock.recv(4096)):
...
conn.send(Query(query_string="SELECT 1")) # automatically sends to socket
for _ in conn.receive(sock.recv(4096)):
pass