Skip to content

Connection

The Connection classes coordinate a decoder and state machine together, providing a higher-level sans-I/O API for the PostgreSQL wire protocol.

Overview

Without Connection, you manage a decoder and state machine separately:

"""Connection: Overview - Without Connection."""

import socket

from pygwire import BackendMessageDecoder, FrontendStateMachine
from pygwire.messages import StartupMessage

decoder = BackendMessageDecoder()
sm = FrontendStateMachine()
sock = socket.create_connection(("localhost", 5432))
startup_msg = StartupMessage(params={"user": "postgres", "database": "postgres"})

sm.send(startup_msg)  # Update the state machine
decoder.phase = sm.phase  # Sync decoder with state machine
sock.send(startup_msg.to_wire())

decoder.feed(sock.recv(4096))
for msg in decoder:
    sm.receive(msg)  # Update the state machine
    decoder.phase = sm.phase  # Sync decoder with state machine
    print(msg)

With Connection, both are coordinated in a single object:

"""Connection: Overview - With Connection."""

import socket

from pygwire import FrontendConnection
from pygwire.messages import StartupMessage

conn = FrontendConnection()
sock = socket.create_connection(("localhost", 5432))
startup_msg = StartupMessage(params={"user": "postgres", "database": "postgres"})

sock.send(conn.send(startup_msg))

for msg in conn.receive(sock.recv(4096)):
    # state machine is updated automatically
    print(msg)

Connection types

Class Role Decoder State Machine
FrontendConnection Client BackendMessageDecoder FrontendStateMachine
BackendConnection Server FrontendMessageDecoder BackendStateMachine

Connection (abstract base)

Base class. Use FrontendConnection or BackendConnection.

Attributes

Attribute Type
decoder BackendMessageDecoder \| FrontendMessageDecoder
state_machine FrontendStateMachine \| BackendStateMachine

send(msg) -> bytes

Validate the message against the state machine, encode it to wire format, and call on_send().

Returns the wire-format bytes.

Raises StateMachineError if the message is not valid for the current phase.

receive(data) -> Iterator[PGMessage]

Feed raw bytes to the decoder and yield decoded messages. Each message is validated against the state machine and passed to on_receive().

Raises ProtocolError if message framing is invalid. Raises StateMachineError if a decoded message is not valid for the current phase.

on_send(data) -> None

Hook called after encoding a message. Override to add I/O (write to socket) or logging.

Default implementation does nothing.

on_receive(msg) -> None

Hook called after decoding and validating a message. Override to add logging, metrics, or custom handling.

Default implementation does nothing.

Properties

Property Type Description
phase ConnectionPhase Current connection phase (delegates to state machine)
is_active bool True if connection has not terminated or failed
is_ready bool True if connection is ready to accept queries
pending_syncs int Number of pending Sync responses (for pipelined extended queries)

FrontendConnection

Client-side connection. Uses BackendMessageDecoder + FrontendStateMachine.

from pygwire import FrontendConnection

conn = FrontendConnection()

Parameters:

  • initial_phase (ConnectionPhase, default STARTUP): Starting connection phase. Use a later phase (e.g., READY) for connection pooling or proxying where startup is already complete.
  • strict (bool, default True): If True, state machine violations raise StateMachineError. If False, violations are logged as warnings and the connection continues.

Client example

"""Connection: Complete client example."""

import hashlib
import socket

from pygwire import ConnectionPhase, FrontendConnection, messages

conn = FrontendConnection()
sock = socket.create_connection(("localhost", 5432))

# Send startup
sock.send(conn.send(messages.StartupMessage(params={"user": "postgres", "database": "postgres"})))


def compute_md5_password(password: str, username: str, salt: bytes) -> str:
    inner = hashlib.md5(f"{password}{username}".encode()).hexdigest()
    outer = hashlib.md5(f"{inner}".encode() + salt).hexdigest()
    return f"md5{outer}"


# Handle authentication
while conn.phase != ConnectionPhase.READY:
    for msg in conn.receive(sock.recv(4096)):
        if isinstance(msg, messages.AuthenticationMD5Password):
            md5_hash = compute_md5_password(password="postgres", username="postgres", salt=msg.salt)
            sock.send(conn.send(messages.PasswordMessage(password=md5_hash)))
        print(msg)

# Send query and read results
sock.send(conn.send(messages.Query(query_string="SELECT 1")))
while conn.phase == ConnectionPhase.SIMPLE_QUERY:
    for msg in conn.receive(sock.recv(4096)):
        print(msg)

BackendConnection

Server-side connection. Uses FrontendMessageDecoder + BackendStateMachine.

from pygwire import BackendConnection

conn = BackendConnection()

Parameters:

  • initial_phase (ConnectionPhase, default STARTUP): Starting connection phase. Use a later phase (e.g., READY) for connection pooling or proxying where startup is already complete.
  • strict (bool, default True): If True, state machine violations raise StateMachineError. If False, violations are logged as warnings and the connection continues.

Server example

from pygwire import BackendConnection
from pygwire import TransactionStatus
from pygwire.messages import AuthenticationOk, ReadyForQuery, StartupMessage

conn = BackendConnection()

for msg in conn.receive(client_data):
    if isinstance(msg, StartupMessage):
        client_sock.send(conn.send(AuthenticationOk()))
        client_sock.send(conn.send(ReadyForQuery(status=TransactionStatus.IDLE)))

Subclassing for I/O

Override on_send() and on_receive() to integrate with your transport:

"""Connection: Subclassing for I/O - synchronous."""

import socket

from pygwire import ConnectionPhase, FrontendConnection
from pygwire.messages import PGMessage, Query, StartupMessage


class SocketConnection(FrontendConnection):
    def __init__(self, sock: socket.socket) -> None:
        super().__init__()
        self.sock = sock

    def on_send(self, data: bytes) -> None:
        self.sock.send(data)

    def on_receive(self, msg: PGMessage) -> None:
        print(f"Received: {msg}")


sock = socket.create_connection(("localhost", 5432))
conn = SocketConnection(sock)
conn.send(StartupMessage(params={"user": "postgres", "database": "postgres"}))
while conn.phase != ConnectionPhase.READY:
    for _ in conn.receive(sock.recv(4096)):
        ...

conn.send(Query(query_string="SELECT 1"))  # automatically sends to socket

for _ in conn.receive(sock.recv(4096)):
    pass

Async example

"""Connection: Subclassing for I/O - asynchronous."""

import asyncio
from collections.abc import AsyncIterator

from pygwire import FrontendConnection
from pygwire.messages import PGMessage


class AsyncConnection(FrontendConnection):
    def __init__(
        self,
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
    ) -> None:
        super().__init__()
        self._reader = reader
        self._writer = writer

    def on_send(self, data: bytes) -> None:
        self._writer.write(data)

    async def send_message(self, msg: PGMessage) -> None:
        self.send(msg)
        await self._writer.drain()

    async def recv_messages(self) -> AsyncIterator[PGMessage]:
        data = await self._reader.read(8192)
        if not data:
            return
        for msg in self.receive(data):
            yield msg

See the authentication proxy example for a complete async proxy using this pattern.


Phase tracking

The phase property delegates to the state machine. Use it to drive protocol loops:

"""Connection: Phase tracking."""

import socket

from pygwire import ConnectionPhase, FrontendConnection
from pygwire.messages import Query, StartupMessage

conn = FrontendConnection()
sock = socket.create_connection(("localhost", 5432))

# Send startup
startup_msg = StartupMessage(params={"user": "postgres", "database": "postgres"})
sock.send(conn.send(startup_msg))

# Authentication loop (Using trust auth)
while conn.phase != ConnectionPhase.READY:
    for msg in conn.receive(sock.recv(4096)):
        print(msg)
        print(conn.phase)

# Query loop
sock.send(conn.send(Query(query_string="SELECT 1")))
while conn.phase == ConnectionPhase.SIMPLE_QUERY:
    for msg in conn.receive(sock.recv(4096)):
        print(msg)
        print(conn.phase)

When to use Connection vs low-level API

Use Connection when:

  • Building a client or server that follows the standard protocol flow
  • You want decoder + state machine coordination without boilerplate
  • You want hooks for I/O integration

Use the low-level API when:

  • You need the decoder without state tracking (e.g., passive protocol analysis)
  • You need to manipulate the decoder or state machine independently
  • You are building a proxy that needs separate state machines for each side