Quick Start¶
Installation¶
Requires Python 3.11+. No runtime dependencies.
Verify the installation:
"""Quickstart: Verify installation."""
from pygwire.messages import Query
query = Query(query_string="SELECT 1")
print(query.to_wire()) # Raw wire protocol bytes
Decoding server messages (client-side)¶
Use BackendMessageDecoder to parse messages from a PostgreSQL server:
"""Quickstart: Decoding server messages (client-side)."""
from pygwire import BackendMessageDecoder
from pygwire.messages import AuthenticationOk
decoder = BackendMessageDecoder()
# Feed raw bytes received from the server
decoder.feed(AuthenticationOk().to_wire())
# Iterate over decoded messages
for msg in decoder:
print(f"Decoded: {type(msg).__name__}") # "Decoded: AuthenticationOk"
The decoder handles partial messages automatically. If you feed half a message, it buffers internally until the rest arrives.
Decoding client messages (server/proxy-side)¶
Use FrontendMessageDecoder to decode incoming client messages. The decoder is phase-aware and handles different framing modes automatically:
"""Quickstart: Decoding client messages (server/proxy-side)."""
from pygwire import ConnectionPhase, FrontendMessageDecoder
from pygwire.messages import Query, StartupMessage
decoder = FrontendMessageDecoder()
# Simulate a client sending a startup message
startup = StartupMessage(params={"user": "postgres", "database": "mydb"})
decoder.feed(startup.to_wire())
for msg in decoder:
if isinstance(msg, StartupMessage):
print(f"Client connecting: user={msg.params.get('user')}")
# Transition to authentication phase
decoder.phase = ConnectionPhase.AUTHENTICATING
# After authentication completes, transition to READY
decoder.phase = ConnectionPhase.READY
# Now decode standard messages
query = Query(query_string="SELECT 1")
decoder.feed(query.to_wire())
for msg in decoder:
if isinstance(msg, Query):
print(f"Query: {msg.query_string}")
Phase-aware decoding
The PostgreSQL wire protocol uses different framing modes depending on the connection phase. The decoder's phase property determines how messages are parsed. When using the decoder standalone, you must update the phase property after each state transition. The Connection classes handle this automatically.
Encoding messages¶
All message classes have a to_wire() method that returns the complete wire-format bytes:
"""Quickstart: Encoding messages."""
from pygwire.messages import Query, StartupMessage, Terminate
# Simple query
query = Query(query_string="SELECT * FROM users WHERE id = 1")
print(f"Query wire bytes: {query.to_wire()!r}")
# Startup message
startup = StartupMessage(params={"user": "postgres", "database": "mydb"})
print(f"Startup wire bytes ({len(startup.to_wire())} bytes)")
# Graceful disconnect
terminate = Terminate()
print(f"Terminate wire bytes: {terminate.to_wire()!r}")
Tracking connection state¶
The state machine validates that messages are sent and received in the correct order:
"""Quickstart: Tracking connection state."""
from pygwire import FrontendStateMachine, TransactionStatus
from pygwire.messages import (
AuthenticationOk,
BackendKeyData,
ParameterStatus,
Query,
ReadyForQuery,
StartupMessage,
)
sm = FrontendStateMachine()
# Track what you send
sm.send(StartupMessage(params={"user": "postgres", "database": "mydb"}))
print(sm.phase) # ConnectionPhase.AUTHENTICATING
# Track what you receive
sm.receive(AuthenticationOk())
sm.receive(ParameterStatus(name="server_version", value="15.0"))
sm.receive(BackendKeyData(process_id=1234, secret_key=b"\x00\x00\x00\x01"))
sm.receive(ReadyForQuery(status=TransactionStatus.IDLE))
print(sm.phase) # ConnectionPhase.READY
# Raises StateMachineError for messages invalid in the current phase
sm.send(Query(query_string="SELECT 1"))
print(sm.phase) # ConnectionPhase.SIMPLE_QUERY
Using Connection (decoder + state machine together)¶
The Connection class coordinates a decoder and state machine into a single object. This removes the boilerplate of managing them separately:
"""Quickstart: Using Connection (decoder + state machine together)."""
from pygwire import FrontendConnection, TransactionStatus
from pygwire.messages import (
AuthenticationOk,
BackendKeyData,
ParameterStatus,
ReadyForQuery,
StartupMessage,
)
conn = FrontendConnection()
# send() validates via state machine and returns wire bytes
wire_bytes = conn.send(StartupMessage(params={"user": "postgres", "database": "mydb"}))
print(conn.phase) # ConnectionPhase.AUTHENTICATING
# receive() feeds bytes to decoder, validates each message, and yields them
server_data = (
AuthenticationOk().to_wire()
+ ParameterStatus(name="server_version", value="15.0").to_wire()
+ BackendKeyData(process_id=1234, secret_key=b"\x00\x00\x00\x01").to_wire()
+ ReadyForQuery(status=TransactionStatus.IDLE).to_wire()
)
for msg in conn.receive(server_data):
print(f"Received: {type(msg).__name__}")
print(conn.phase) # ConnectionPhase.READY
Subclass and override on_send() / on_receive() to integrate with your transport. See the Connection reference for details.
Complete example¶
A client connection with MD5 authentication using FrontendConnection:
sock = socket.create_connection(("localhost", 5432))
conn = SocketConnection(sock)
# 1. Send startup
startup = StartupMessage(params={"user": "postgres", "database": "postgres"})
conn.send(startup)
# 2. Handle authentication
while conn.phase != ConnectionPhase.READY:
for msg in conn.recv_messages():
if isinstance(msg, AuthenticationMD5Password):
md5_hash = compute_md5_password("postgres", "postgres", msg.salt)
pwd_msg = PasswordMessage(password=md5_hash)
conn.send(pwd_msg)
# 3. Send a query
query = Query(query_string="SELECT 1 AS num")
conn.send(query)
# 4. Read results
while conn.phase == ConnectionPhase.SIMPLE_QUERY: # type: ignore[comparison-overlap]
for msg in conn.recv_messages():
if isinstance(msg, DataRow):
print(f"Result: {msg.columns}")
# 5. Disconnect
conn.send(Terminate())
sock.close()
This uses a SocketConnection subclass that sends data via on_send():
class SocketConnection(FrontendConnection):
"""Example of adding I/O directly via hooks.
This subclass overrides the hooks to automatically send/receive via socket.
"""
def __init__(self, sock: socket.socket) -> None:
super().__init__()
self.sock = sock
def on_send(self, data: bytes) -> None:
"""Automatically send data to socket."""
self.sock.send(data)
def recv_messages(self) -> Iterator[PGMessage]:
"""Convenience method: receive data and yield decoded messages."""
data = self.sock.recv(4096)
yield from self.receive(data)
And the MD5 hash helper:
def compute_md5_password(password: str, username: str, salt: bytes) -> str:
"""Compute PostgreSQL MD5 password hash.
PostgreSQL's MD5 authentication requires:
1. Hash the password and username: md5(password + username)
2. Hash the result with the salt: md5(hash1 + salt)
3. Prepend "md5" to the final hex string
"""
inner = hashlib.md5(f"{password}{username}".encode()).hexdigest()
outer = hashlib.md5(f"{inner}".encode() + salt).hexdigest()
return f"md5{outer}"
Authentication modes
This example uses MD5 password authentication. For SCRAM-SHA-256 or other methods, see the authentication proxy example.
Next steps¶
- Connection: coordinated decoder + state machine
- Codec: stream decoder details
- Messages: all message classes and fields
- State Machine: protocol phase tracking
- Constants: enums and identifiers