Pattern: Outbox pattern for reliable event publishing
Tags: outbox, event, reliable, at-least-once, transaction, publish
Problem
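Publishing an event after a database write is a dual write: either step can fail on its own, leaving the database updated but the event not sent (or, if the order is reversed, the event sent but the write lost). The write and the publish need to succeed or fail together.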
Solution
Write events to a database outbox table in the same transaction as the business write, and let a separate publisher process relay them to the broker:
-- Outbox table
CREATE TABLE outbox (
    id BIGSERIAL PRIMARY KEY,
    event_type TEXT NOT NULL,
    payload JSONB NOT NULL,
    created_at TIMESTAMPTZ DEFAULT now(),
    published_at TIMESTAMPTZ,
    retry_count INT DEFAULT 0
);

# In application code (db and message_broker are the application's own clients):
import asyncio
import json
from datetime import datetime, timezone

async def create_order(order_data):
    async with db.transaction():
        # Business operation
        order = await db.insert('orders', order_data)
        # Write event to outbox (same transaction!)
        await db.insert('outbox', {
            'event_type': 'order.created',
            'payload': json.dumps({'order_id': order.id, 'total': order.total})
        })
    # Both writes commit or roll back together
    return order

# Separate publisher process polls and publishes:
async def publish_outbox():
    while True:
        events = await db.query(
            'SELECT * FROM outbox WHERE published_at IS NULL ORDER BY id LIMIT 100'
        )
        for event in events:
            try:
                await message_broker.publish(event.event_type, event.payload)
                # Timezone-aware timestamp to match the TIMESTAMPTZ column.
                # A crash between publish and this update re-sends the event
                # on the next poll, hence at-least-once rather than exactly-once.
                await db.update('outbox', event.id,
                                published_at=datetime.now(timezone.utc))
            except Exception:
                # Leave published_at NULL so the row is retried on a later poll
                await db.update('outbox', event.id, retry_count=event.retry_count + 1)
        await asyncio.sleep(1)
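The snippet above relies on an unspecified `db` and `message_broker`, so it cannot be run as-is. The following is a minimal, self-contained sketch of the same flow using Python's standard-library `sqlite3` in place of PostgreSQL. The `orders` schema, the `fail_after_order` flag, and the `publish` callback are assumptions made for illustration, not part of the original pattern.

```python
import json
import sqlite3


def make_db():
    """Create an in-memory database with hypothetical orders and outbox tables."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE orders (id INTEGER PRIMARY KEY, total REAL NOT NULL);
        CREATE TABLE outbox (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            event_type TEXT NOT NULL,
            payload TEXT NOT NULL,
            published_at TEXT,
            retry_count INTEGER NOT NULL DEFAULT 0
        );
    """)
    return conn


def create_order(conn, total, fail_after_order=False):
    """Insert an order and its outbox event in one transaction."""
    # "with conn" commits on success and rolls back if an exception escapes.
    with conn:
        cur = conn.execute("INSERT INTO orders (total) VALUES (?)", (total,))
        order_id = cur.lastrowid
        if fail_after_order:
            # Simulated failure between the two writes (illustration only).
            raise RuntimeError("simulated crash before the outbox write")
        conn.execute(
            "INSERT INTO outbox (event_type, payload) VALUES (?, ?)",
            ("order.created", json.dumps({"order_id": order_id, "total": total})),
        )
    return order_id


def publish_pending(conn, publish, batch_size=100):
    """Run one polling pass; return the number of events marked published."""
    rows = conn.execute(
        "SELECT id, event_type, payload FROM outbox "
        "WHERE published_at IS NULL ORDER BY id LIMIT ?",
        (batch_size,),
    ).fetchall()
    sent = 0
    for event_id, event_type, payload in rows:
        try:
            publish(event_type, payload)
        except Exception:
            # Broker unavailable: count the attempt and leave the row pending.
            with conn:
                conn.execute(
                    "UPDATE outbox SET retry_count = retry_count + 1 WHERE id = ?",
                    (event_id,),
                )
            continue
        with conn:
            conn.execute(
                "UPDATE outbox SET published_at = datetime('now') WHERE id = ?",
                (event_id,),
            )
        sent += 1
    return sent
```

Calling `create_order(conn, 10.0, fail_after_order=True)` raises and leaves neither an order row nor an outbox row behind, which is the atomicity the pattern depends on. A `publish` callback that raises leaves the event pending with `retry_count` incremented, and a later call to `publish_pending` picks it up again.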
Why
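The outbox pattern provides at-least-once delivery by leveraging database transactions: an event row is committed if and only if the business write is committed, and the publisher keeps retrying unpublished rows until the broker accepts them. Events are therefore not lost even if the broker is temporarily down. The same event can be published more than once (for example, if the publisher crashes after publishing but before setting published_at), so consumers should handle duplicates.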
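At-least-once delivery means a consumer may receive the same event more than once. One common way to cope is to record processed event ids and skip repeats. The sketch below assumes each message carries a unique `event_id` (for instance the outbox row id, which the publisher above does not currently send). The `processed_events` table and the `handle_once` helper are hypothetical names.

```python
import sqlite3


def make_consumer_db():
    """Create an in-memory store of already-processed event ids (hypothetical schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE processed_events (event_id INTEGER PRIMARY KEY)")
    conn.commit()
    return conn


def handle_once(conn, event_id, handler):
    """Run handler() only the first time event_id is seen; return True if it ran."""
    # The transaction commits on success and rolls back if handler() raises,
    # so a failed attempt is not recorded and a redelivery is processed again.
    with conn:
        try:
            conn.execute(
                "INSERT INTO processed_events (event_id) VALUES (?)", (event_id,)
            )
        except sqlite3.IntegrityError:
            # Primary-key conflict: this event was already processed.
            return False
        handler()
    return True
```

The dedupe record and the handler's effects are only atomic with each other if the handler writes through the same connection; side effects outside the database (emails, HTTP calls) would need their own idempotency.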