Reliably stream Postgres table changes to external systems (queues, webhooks, etc.) with automatic failover and zero event loss.
- Users manage subscriptions directly in the database (`pgstream.subscriptions` table)
- Subscription changes automatically create database triggers on the target tables
- Application writes fire the managed triggers, which insert into a partitioned `events` table
- Postgres Stream streams events via logical replication and delivers them to your sink
flowchart LR
subgraph Postgres
A[subscriptions<br/>table] -->|creates<br/>triggers| B[trigger<br/>on users]
B -->|insert<br/>into| C[events<br/>partitioned<br/>20250112..]
C -->|logical<br/>replication| D[replication<br/>slot]
end
D -->|streams| E[Postgres<br/>Stream]
E -->|delivers| F[sink<br/>queue/http]
style Postgres fill:#dbeafe,stroke:#2563eb,stroke-width:3px
style A fill:#f3e8ff,stroke:#9333ea,stroke-width:2px,color:#581c87
style B fill:#fed7aa,stroke:#ea580c,stroke-width:2px,color:#7c2d12
style C fill:#bfdbfe,stroke:#2563eb,stroke-width:2px,color:#1e40af
style D fill:#99f6e4,stroke:#14b8a6,stroke-width:2px,color:#115e59
style E fill:#fecdd3,stroke:#e11d48,stroke-width:3px,color:#881337
style F fill:#bbf7d0,stroke:#16a34a,stroke-width:3px,color:#14532d
Example of a generated trigger function (simplified for readability):
-- Auto-generated when you insert into subscriptions table
create or replace function pgstream._publish_after_insert_on_users()
returns trigger as $$
declare
v_jsonb_output jsonb := '[]'::jsonb;
v_base_payload jsonb := jsonb_build_object(
'tg_op', tg_op,
'tg_table_name', tg_table_name,
'tg_table_schema', tg_table_schema,
'timestamp', (extract(epoch from now()) * 1000)::bigint
);
begin
-- Check subscription "user-signup" condition
if new.email_verified = true then
v_jsonb_output := v_jsonb_output || (jsonb_build_object(
'tg_name', 'user-signup',
'new', jsonb_build_object('id', new.id, 'email', new.email),
'old', null
) || v_base_payload || '{}'::jsonb); -- payload_extensions would go here
end if;
-- Write to events table if any subscriptions matched
if jsonb_array_length(v_jsonb_output) > 0 then
insert into pgstream.events (payload, stream_id)
select elem, 1
from jsonb_array_elements(v_jsonb_output) as t(elem);
end if;
return new;
end;
$$ language plpgsql
set search_path = ''
security definer;
-- Trigger that calls the function
create constraint trigger pgstream_insert
after insert on public.users
deferrable initially deferred
for each row
when ((new.email_verified = true))
execute procedure pgstream._publish_after_insert_on_users();

Note: The actual generated code handles multiple subscriptions per table, merges `when` clauses with OR logic, and includes all `payload_extensions`. This example shows the structure for a single subscription.
- Single binary - No complex infrastructure or high-availability destinations required
- Postgres-native durability - Events are stored in the database, so WAL can be released immediately
- No data loss - As long as downtime is less than partition retention (7 days by default)
Postgres Stream automatically handles two failure scenarios without operator intervention:
Sink failure (queue unavailable, webhook fails, etc.)
- Checkpoints the failed event and continues consuming the replication stream
- Periodically retries delivering the checkpoint event
- When the sink recovers, replays all missed events via `COPY` from the events table
Slot invalidation (WAL exceeded max_slot_wal_keep_size)
- Detects the "can no longer get changes from replication slot" error
- Queries `confirmed_flush_lsn` from the invalidated slot to find the last processed position
- Sets a failover checkpoint and creates a new slot
- When replication resumes, replays missed events from the checkpoint
Both scenarios use the same replay mechanism: events are read directly from the events table (not WAL), guaranteeing no data loss as long as events are within partition retention.
- Small overhead - An additional INSERT into the `events` table on every subscribed operation
- Partition management - Partition growth needs monitoring if event volume is very high (see the query sketched below)
- Not for dynamic subscriptions - Each subscription change recreates database triggers (an expensive operation)
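If you need to keep an eye on partition growth, the standard Postgres catalogs are enough. The following is a generic catalog query (not something Postgres Stream ships) that lists the partitions of `pgstream.events` with their on-disk sizes:

-- Generic catalog query: list partitions of pgstream.events and their sizes
select
    child.relname                                      as partition_name,
    pg_size_pretty(pg_total_relation_size(child.oid))  as total_size
from pg_inherits i
join pg_class child  on child.oid  = i.inhrelid
join pg_class parent on parent.oid = i.inhparent
join pg_namespace n  on n.oid = parent.relnamespace
where n.nspname = 'pgstream'
  and parent.relname = 'events'
order by child.relname;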
- Postgres 15+ with `wal_level=logical`
- A user with the `replication` privilege (a setup sketch is shown below)
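A minimal setup sketch for these prerequisites (the role name and password are placeholders; `alter system` requires superuser access, and `wal_level` only takes effect after a restart):

-- Check the current WAL level; it must be 'logical'
show wal_level;

-- Enable logical decoding if needed (takes effect after a Postgres restart)
alter system set wal_level = 'logical';

-- Create a user with the replication privilege (placeholder name and password)
create role pgstream_user with login replication password 'change-me';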
Create config.yaml:
stream:
  id: 1 # Unique stream ID
  pg_connection:
    host: localhost
    port: 5432
    name: mydb
    username: postgres
    password: postgres
    tls:
      enabled: false
  batch:
    max_size: 1000 # Events per batch
    max_fill_secs: 5 # Max time to fill batch
sink:
  type: memory # Built-in test sink

# Start Postgres Stream
postgres-stream

# Or with Docker
docker run -v $(pwd)/config.yaml:/config.yaml postgres-stream

Subscriptions define which events to capture. Simply insert into the subscriptions table:
-- Subscribe to verified user signups
insert into pgstream.subscriptions (
key,
stream_id,
operation,
schema_name,
table_name,
when_clause,
column_names,
payload_extensions
) values (
'user-signup', -- Unique identifier
1, -- Stream ID from config
'INSERT', -- Operation: INSERT, UPDATE, or DELETE
'public', -- Schema name
'users', -- Table name
'new.email_verified = true', -- Optional filter (SQL expression)
array['id', 'email', 'created_at'], -- Columns to include in payload
'[]'::jsonb -- Payload extensions (see below)
);

This automatically creates the trigger on `public.users`. Now when you insert a verified user, the event is captured.
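To confirm the trigger was created, you can inspect the catalog directly (a generic query, not part of Postgres Stream):

-- List user-level triggers on public.users; the generated pgstream trigger should appear
select tgname, tgenabled
from pg_trigger
where tgrelid = 'public.users'::regclass
  and not tgisinternal;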
To reduce no-op trigger recreation (important for production), use merge to only update when values actually change:
Click to show merge-based helper function
-- Helper function that only recreates triggers when subscription actually changes
create or replace function set_subscriptions(
p_stream_id bigint,
p_subscriptions pgstream.subscriptions[]
)
returns void
language plpgsql
security definer
set search_path to ''
as $$
begin
create temporary table temp_subscriptions as
select * from unnest(p_subscriptions);
-- Only update if values actually changed (avoids trigger recreation)
merge into pgstream.subscriptions as target
using temp_subscriptions as source
on (target.key = source.key and target.stream_id = p_stream_id)
when matched and (
target.operation is distinct from source.operation or
target.schema_name is distinct from source.schema_name or
target.table_name is distinct from source.table_name or
target.when_clause is distinct from source.when_clause or
target.column_names is distinct from source.column_names or
target.metadata is distinct from source.metadata or
target.payload_extensions is distinct from source.payload_extensions or
target.metadata_extensions is distinct from source.metadata_extensions
) then update set
operation = source.operation,
schema_name = source.schema_name,
table_name = source.table_name,
when_clause = source.when_clause,
column_names = source.column_names,
metadata = source.metadata,
payload_extensions = source.payload_extensions,
metadata_extensions = source.metadata_extensions
when not matched then insert (
key, stream_id, operation, schema_name, table_name,
when_clause, column_names, metadata, payload_extensions, metadata_extensions
) values (
source.key, p_stream_id, source.operation, source.schema_name,
source.table_name, source.when_clause, source.column_names,
source.metadata, source.payload_extensions, source.metadata_extensions
);
-- Remove subscriptions not in input
delete from pgstream.subscriptions
where stream_id = p_stream_id
and not exists (
select 1 from temp_subscriptions
where pgstream.subscriptions.key = temp_subscriptions.key
);
drop table temp_subscriptions;
end;
$$;

Now when a user signs up with a verified email:

insert into users (email, email_verified) values ('user@example.com', true);

The sink receives:
{
"tg_name": "user-signup",
"tg_op": "INSERT",
"tg_table_name": "users",
"tg_table_schema": "public",
"timestamp": 1703001234567,
"new": {
"id": 123,
"email": "[email protected]",
"created_at": "2024-12-12T10:30:00Z"
},
"old": null
}

You can also insert events directly into the `pgstream.events` table instead of using subscriptions. This is useful for custom events not tied to table changes, background jobs, or events from external sources.
insert into pgstream.events (payload, stream_id, metadata)
values (
'{"type": "job-completed", "job_id": 123, "result": "success"}'::jsonb,
1, -- Stream ID from config
'{"topic": "background-jobs"}'::jsonb
);

Required fields:

- `payload` (jsonb) - The event data
- `stream_id` (bigint) - Must match the stream ID from your config

Optional:

- `metadata` (jsonb) - Routing information (topic, partition key, etc.)

The `id` and `created_at` fields are auto-generated.
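Putting these columns together, the `pgstream.events` table is shaped roughly like the sketch below. This is an illustration inferred from the fields discussed in this document, not the exact DDL the project ships; in particular, the partition key and the `lsn` column are assumptions:

-- Approximate shape of the events table (illustrative only)
create table pgstream.events (
    id         bigint generated always as identity,
    stream_id  bigint not null,               -- matches the stream id in config.yaml
    payload    jsonb  not null,               -- the event body delivered to the sink
    metadata   jsonb,                         -- routing information read by sinks
    lsn        pg_lsn,                        -- used to recover after slot invalidation
    created_at timestamptz not null default now()
) partition by range (created_at);            -- daily partitions, managed automatically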
Use payload_extensions to add computed fields to the event payload:
insert into pgstream.subscriptions (
key, stream_id, operation, schema_name, table_name,
when_clause, column_names, payload_extensions
) values (
'order-notification',
1,
'INSERT',
'public',
'orders',
null,
array['id', 'user_id', 'total'],
'[
{"json_path": "order_date", "expression": "new.created_at::date::text"},
{"json_path": "total_formatted", "expression": "''$'' || new.total::text"}
]'::jsonb
);

Result:
{
"tg_name": "order-notification",
"new": {"id": 456, "user_id": 123, "total": 99.99},
"order_date": "2024-12-12",
"total_formatted": "$99.99"
}

Common use cases:

- Computed fields: Add derived values like `new.created_at::date`
- Formatted values: Add display-ready strings
- Context info: Add `auth.uid()`, `current_setting('app.tenant_id')`

Note: For routing information (topic, queue, partition key, etc.), use `metadata` and `metadata_extensions` instead.
Use metadata for routing and sink configuration (topic, queue, partition key, index, etc.). Metadata is stored separately from the payload and read by sinks to determine where and how to deliver events.
Use the metadata column for routing values that are the same for every event:
insert into pgstream.subscriptions (
key, stream_id, operation, schema_name, table_name,
column_names, metadata
) values (
'user-events',
1,
'INSERT',
'public',
'users',
array['id', 'email'],
'{"topic": "user-events", "priority": "high"}'::jsonb
);

Use `metadata_extensions` to compute routing values from row data. The format is identical to `payload_extensions`. You can use any SQL expression, including Supabase auth functions:
insert into pgstream.subscriptions (
key, stream_id, operation, schema_name, table_name,
column_names, metadata_extensions
) values (
'user-events',
1,
'INSERT',
'public',
'users',
array['id', 'email'],
'[
{"json_path": "partition_key", "expression": "new.user_id::text"},
{"json_path": "topic", "expression": "''users-'' || new.region"}
]'::jsonb
);

Use dot notation in `json_path` to create nested objects:
'[
{"json_path": "auth.user_id", "expression": "auth.uid()::text"},
{"json_path": "auth.role", "expression": "auth.role()"}
]'::jsonb

This produces:
{
"auth": {
"user_id": "d0c12345-abcd-1234-efgh-567890abcdef",
"role": "authenticated"
}
}

You can use both static and dynamic metadata together:
insert into pgstream.subscriptions (
key, stream_id, operation, schema_name, table_name,
column_names, metadata, metadata_extensions
) values (
'user-events',
1,
'INSERT',
'public',
'users',
array['id', 'email'],
'{"priority": "high"}'::jsonb,
'[
{"json_path": "topic", "expression": "''users-'' || new.region"},
{"json_path": "partition_key", "expression": "new.user_id::text"}
]'::jsonb
);

The resulting event metadata merges static and dynamic values:
{
"priority": "high",
"topic": "users-eu-west-1",
"partition_key": "123"
}

When the sink fails (e.g., queue goes down), Postgres Stream:
- Saves the failed event's ID as a checkpoint
- Continues consuming the replication stream (events still written to table)
- Periodically retries delivering the checkpoint event
- On success, uses `COPY` to stream all events between the checkpoint and the current position (sketched below)
- Replays missed events in order, then returns to normal streaming
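Conceptually, the replay step looks like the query sketched below; the column names, checkpoint id, and current position are placeholders rather than the exact statement Postgres Stream issues:

-- Illustrative only: re-stream events between the checkpoint and the current position
copy (
    select payload, metadata
    from pgstream.events
    where stream_id = 1
      and id > 42      -- checkpointed event id (placeholder)
      and id <= 100    -- current position (placeholder)
    order by id
) to stdout;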
Guarantees:
- No events lost (as long as downtime < partition retention)
- Events delivered at least once
- Order preserved within partitions
- No WAL retention required (events stored in table)
When the replication slot is invalidated (WAL exceeded max_slot_wal_keep_size), Postgres Stream:
- Detects the "can no longer get changes from replication slot" error
- Queries `confirmed_flush_lsn` from the invalidated slot (Postgres preserves this; see the query below)
- Finds the first event with `lsn > confirmed_flush_lsn`
- Sets a failover checkpoint at that event
- Drops the invalidated slot
- Restarts the pipeline (ETL creates a fresh slot)
- When replication events arrive, triggers failover replay from checkpoint
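The recovery position comes from the standard `pg_replication_slots` view; the slot name below is a placeholder:

-- Last position the invalidated slot had confirmed (slot name is a placeholder)
select slot_name, confirmed_flush_lsn, wal_status
from pg_replication_slots
where slot_name = 'postgres_stream_1';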
Guarantees:
- Automatic recovery without operator intervention
- No events lost (events stored in table, not dependent on WAL)
- Works as long as events are within partition retention
Postgres Stream automatically manages daily partitions in the background:
Retention policy:
- Creates partitions 7 days in advance
- Drops partitions older than 7 days
- Runs on startup and then daily
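Concretely, the managed partitions are daily range partitions of the events table, along the lines of this sketch (the naming scheme and partition key are assumptions based on the examples in this document):

-- Illustrative shape of one automatically managed daily partition
create table pgstream.events_20250112
    partition of pgstream.events
    for values from ('2025-01-12') to ('2025-01-13');

-- Partitions older than the retention window (7 days by default) are dropped
drop table pgstream.events_20250105;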
Create custom sinks to deliver events to any destination (HTTP, Kafka, RabbitMQ, etc.).
Important: Sink dependencies and implementations should be behind feature flags to avoid bloating the binary. Users should only compile the sinks they actually use.
Update Cargo.toml:
[dependencies]
# Existing dependencies...
# Optional sink dependencies
reqwest = { version = "0.11", features = ["json"], optional = true }
rdkafka = { version = "0.36", optional = true }
[features]
# Sink feature flags
sink-http = ["dep:reqwest"]
sink-kafka = ["dep:rdkafka"]

Create a new file src/sink/http.rs:
use etl::error::EtlResult;
use reqwest::Client;
use serde::Deserialize;
use tracing::info;
use crate::sink::Sink;
use crate::types::TriggeredEvent;
#[derive(Clone, Debug, Deserialize)]
pub struct HttpSinkConfig {
pub url: String,
#[serde(default)]
pub headers: std::collections::HashMap<String, String>,
}
#[derive(Clone)]
pub struct HttpSink {
config: HttpSinkConfig,
client: Client,
}
impl HttpSink {
pub fn new(config: HttpSinkConfig) -> Self {
let client = Client::new();
Self { config, client }
}
}
impl Sink for HttpSink {
fn name() -> &'static str {
"http"
}
async fn publish_events(&self, events: Vec<TriggeredEvent>) -> EtlResult<()> {
for event in events {
let mut request = self.client
.post(&self.config.url)
.json(&event.payload);
// Add custom headers
for (key, value) in &self.config.headers {
request = request.header(key, value);
}
let response = request.send().await?;
if !response.status().is_success() {
return Err(etl::etl_error!(
etl::error::ErrorKind::Network,
"HTTP request failed: {}",
response.status()
));
}
info!("published event {} to {}", event.id.id, self.config.url);
}
Ok(())
}
}

Update src/sink/mod.rs to register the new module behind its feature flag:

mod base;
pub mod memory;
#[cfg(feature = "sink-http")]
pub mod http;
pub use base::Sink;

Update src/config/sink.rs:
use serde::Deserialize;
#[cfg(feature = "sink-http")]
use crate::sink::http::HttpSinkConfig;
#[derive(Clone, Debug, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum SinkConfig {
Memory,
#[cfg(feature = "sink-http")]
Http(HttpSinkConfig),
}

Update src/core.rs:
#[cfg(feature = "sink-http")]
use crate::sink::http::HttpSink;
// In start_pipeline_with_config():
let sink = match &config.sink {
SinkConfig::Memory => MemorySink::new(),
#[cfg(feature = "sink-http")]
SinkConfig::Http(cfg) => HttpSink::new(cfg.clone()),
};

Build with the HTTP sink feature:
cargo build --release --features sink-http

Use it in config.yaml:
sink:
  type: http
  url: https://webhook.example.com/events
  headers:
    Authorization: Bearer token123
    X-Custom-Header: value