Custom Repository Implementation Guide
This comprehensive guide covers implementing custom repositories for NatsPubsub's Inbox and Outbox patterns, including interfaces, implementations for various databases, and best practices.
Table of Contents
- Overview
- Repository Interfaces
- Outbox Repository Implementation
- Inbox Repository Implementation
- PostgreSQL Implementation
- MySQL Implementation
- MongoDB Implementation
- Redis Implementation
- DynamoDB Implementation
- Testing Custom Repositories
- Performance Optimization
- Best Practices
Overview
NatsPubsub provides repository interfaces for Inbox and Outbox patterns, allowing you to implement custom storage backends for your specific needs.
Why Custom Repositories?
- Database Choice: Use your preferred database (PostgreSQL, MongoDB, Redis, etc.)
- Performance: Optimize queries for your workload
- Cost: Use managed services or cost-effective solutions
- Compliance: Meet specific data residency requirements
- Integration: Integrate with existing database infrastructure
Architecture
Repository Interfaces
Outbox Repository Interface
/**
 * Shape of a single outbox record as exchanged with an OutboxRepository.
 */
export interface OutboxEvent {
/** Unique identifier of the event; repositories use it as the idempotency key. */
eventId: string;
/** Subject stored with the event (presumably the NATS subject to publish to). */
subject: string;
/** Event body as a string (presumably serialized, e.g. JSON — confirm with the publisher). */
payload: string;
/** Optional headers as a string (serialization format is not shown here). */
headers?: string;
/** Lifecycle state of the record; one of the four literal values below. */
status: "pending" | "publishing" | "sent" | "failed";
/** Timestamp stored when the event was enqueued. */
enqueuedAt: Date;
/** Timestamp stored when the event was marked as sent; absent until then. */
sentAt?: Date;
/** Error text stored when the event was marked as failed. */
errorMessage?: string;
}
/**
 * Storage contract for the Outbox pattern: the operations a backend must
 * provide to persist events and track their publishing status.
 */
export interface OutboxRepository {
/**
 * Find or create an outbox event.
 * Should be idempotent: if a record with the same eventId already exists,
 * return the existing record instead of creating a second one.
 *
 * @param params - Fields of the event to store.
 * @returns The newly created or already existing event.
 */
findOrCreate(params: Partial<OutboxEvent>): Promise<OutboxEvent>;
/**
 * Mark event as sent.
 *
 * @param eventId - Identifier of the event to update.
 */
markAsSent(eventId: string): Promise<void>;
/**
 * Mark event as publishing (in-progress).
 *
 * @param eventId - Identifier of the event to update.
 */
markAsPublishing(eventId: string): Promise<void>;
/**
 * Mark event as failed and record why.
 *
 * @param eventId - Identifier of the event to update.
 * @param errorMessage - Description of the failure to store with the event.
 */
markAsFailed(eventId: string, errorMessage: string): Promise<void>;
/**
 * Find pending events.
 *
 * @param options - Optional query options; `limit` caps how many events are returned.
 * @returns Events whose status is "pending".
 */
findPending(options?: { limit?: number }): Promise<OutboxEvent[]>;
/**
 * Cleanup old sent events.
 *
 * @param olderThanDays - Age threshold in days.
 * @returns A count (the implementations in this guide return the number of deleted records).
 */
cleanup(olderThanDays: number): Promise<number>;
/**
 * Reset stale events (stuck in 'publishing').
 *
 * @param staleMinutes - Age threshold in minutes.
 * @returns A count (the implementations in this guide return the number of records reset).
 */
resetStale(staleMinutes: number): Promise<number>;
/**
 * Get count by status (for monitoring). Optional: a repository may omit it.
 *
 * @returns A map from status value to the number of records in that status.
 */
getCountByStatus?(): Promise<Record<string, number>>;
}
Inbox Repository Interface
/**
 * Shape of a single inbox record as exchanged with an InboxRepository.
 */
export interface InboxEvent {
/** Unique identifier of the event; repositories use it as the idempotency key. */
eventId: string;
/** Subject stored with the event (presumably the NATS subject it arrived on). */
subject: string;
/** Event body as a string (presumably serialized, e.g. JSON — confirm with the consumer). */
payload: string;
/** Optional headers as a string (serialization format is not shown here). */
headers?: string;
/** Optional stream name (presumably the JetStream stream the message came from). */
stream?: string;
/** Optional sequence number within `stream`. */
streamSeq?: number;
/** Lifecycle state of the record; one of the three literal values below. */
status: "processing" | "processed" | "failed";
/** Timestamp stored when the event was received. */
receivedAt: Date;
/** Timestamp stored when the event was marked as processed; absent until then. */
processedAt?: Date;
/** Delivery counter (presumably the number of delivery attempts — TODO confirm). */
deliveries: number;
/** Error text stored when the event was marked as failed. */
errorMessage?: string;
}
/**
 * Storage contract for the Inbox pattern: the operations a backend must
 * provide to record received events and track their processing status.
 */
export interface InboxRepository {
/**
 * Find or create an inbox event.
 * Should be idempotent: if a record with the same eventId already exists,
 * return the existing record instead of creating a second one.
 *
 * @param params - Fields of the event to store.
 * @returns The newly created or already existing event.
 */
findOrCreate(params: Partial<InboxEvent>): Promise<InboxEvent>;
/**
 * Mark event as processed.
 *
 * @param eventId - Identifier of the event to update.
 */
markAsProcessed(eventId: string): Promise<void>;
/**
 * Mark event as failed and record why.
 *
 * @param eventId - Identifier of the event to update.
 * @param errorMessage - Description of the failure to store with the event.
 */
markAsFailed(eventId: string, errorMessage: string): Promise<void>;
/**
 * Check if event is already processed.
 *
 * @param eventId - Identifier of the event to look up.
 * @returns `true` when the event's status is "processed".
 */
isProcessed(eventId: string): Promise<boolean>;
/**
 * Cleanup old processed events.
 *
 * @param olderThanDays - Age threshold in days.
 * @returns A count (the implementations in this guide return the number of deleted records).
 */
cleanup(olderThanDays: number): Promise<number>;
/**
 * Reset stale events (stuck in 'processing').
 *
 * @param staleMinutes - Age threshold in minutes.
 * @returns A count (the implementations in this guide return the number of records updated).
 */
resetStale(staleMinutes: number): Promise<number>;
/**
 * Get count by status (for monitoring). Optional: a repository may omit it.
 *
 * @returns A map from status value to the number of records in that status.
 */
getCountByStatus?(): Promise<Record<string, number>>;
}
Outbox Repository Implementation
Base Implementation Pattern
import { OutboxRepository, OutboxEvent } from "nats-pubsub";
/**
 * Database-agnostic skeleton of an {@link OutboxRepository}.
 *
 * The public methods express the outbox workflow in terms of a handful of
 * private storage helpers. To target a concrete database, replace the bodies
 * of the helpers at the bottom of the class; the public methods can stay as is.
 */
export class CustomOutboxRepository implements OutboxRepository {
  /**
   * @param db - Connection handle for the helpers to use.
   * @param logger - Optional logger for the helpers to use.
   */
  constructor(
    private db: DatabaseConnection,
    private logger?: Logger,
  ) {}

  /**
   * Insert the event, or return the stored one when its id already exists.
   *
   * @param params - Fields of the event to store.
   * @returns The inserted event, or the existing event on a duplicate key.
   * @throws Whatever `insert` throws when it is not a duplicate-key error,
   *   or when no `eventId` was supplied to look the existing record up by.
   */
  async findOrCreate(params: Partial<OutboxEvent>): Promise<OutboxEvent> {
    try {
      // Insert first: the unique key on the event id decides who wins a race.
      return await this.insert(params);
    } catch (error: unknown) {
      const { eventId } = params;
      // Without an id there is nothing to look up, so surface the original error.
      if (eventId !== undefined && this.isDuplicateKeyError(error)) {
        return await this.findByEventId(eventId);
      }
      throw error;
    }
  }

  /** Set the event's status to "sent" and stamp `sentAt` with the current time. */
  async markAsSent(eventId: string): Promise<void> {
    await this.updateStatus(eventId, "sent", { sentAt: new Date() });
  }

  /** Set the event's status to "publishing". */
  async markAsPublishing(eventId: string): Promise<void> {
    await this.updateStatus(eventId, "publishing");
  }

  /** Set the event's status to "failed" and store `errorMessage`. */
  async markAsFailed(eventId: string, errorMessage: string): Promise<void> {
    await this.updateStatus(eventId, "failed", { errorMessage });
  }

  /**
   * Fetch pending events ordered by `enqueued_at`.
   *
   * @param options - `limit` caps the result size; a missing or zero limit
   *   falls back to 100.
   */
  async findPending(options?: { limit?: number }): Promise<OutboxEvent[]> {
    const limit = options?.limit || 100;
    return await this.query({
      status: "pending",
      orderBy: "enqueued_at",
      limit,
    });
  }

  /**
   * Delete sent events whose `sentAt` is more than `olderThanDays` days ago.
   *
   * @returns The value reported by the `delete` helper.
   */
  async cleanup(olderThanDays: number): Promise<number> {
    const cutoffDate = new Date();
    cutoffDate.setDate(cutoffDate.getDate() - olderThanDays);
    return await this.delete({
      status: "sent",
      sentAtBefore: cutoffDate,
    });
  }

  /**
   * Move events stuck in "publishing" back to "pending".
   *
   * Staleness is judged by the time of the record's last update (i.e. when it
   * entered "publishing"), not by its enqueue time. Keying on the enqueue time
   * would immediately reset any event that had waited in the backlog longer
   * than `staleMinutes` before being claimed, producing duplicate publishes.
   *
   * @param staleMinutes - How long an event may stay in "publishing".
   * @returns The value reported by the `update` helper.
   */
  async resetStale(staleMinutes: number): Promise<number> {
    const cutoffTime = new Date(Date.now() - staleMinutes * 60 * 1000);
    return await this.update(
      {
        status: "publishing",
        updatedAtBefore: cutoffTime,
      },
      { status: "pending" },
    );
  }

  /** Return the number of events per status, as reported by `groupCount`. */
  async getCountByStatus(): Promise<Record<string, number>> {
    return await this.groupCount("status");
  }

  // Helper methods (implement based on your database)

  /** Insert a new record; must fail with a duplicate-key error if the id exists. */
  private async insert(params: Partial<OutboxEvent>): Promise<OutboxEvent> {
    throw new Error("Not implemented");
  }

  /** Load the record with the given event id. */
  private async findByEventId(eventId: string): Promise<OutboxEvent> {
    throw new Error("Not implemented");
  }

  /**
   * Set `status` (plus any `extra` fields) on one record. Implementations
   * must also refresh the record's last-update timestamp, which
   * `resetStale` relies on.
   */
  private async updateStatus(
    eventId: string,
    status: string,
    extra?: Record<string, any>,
  ): Promise<void> {
    throw new Error("Not implemented");
  }

  /** Return records matching `criteria` (status, orderBy, limit). */
  private async query(criteria: any): Promise<OutboxEvent[]> {
    throw new Error("Not implemented");
  }

  /** Delete records matching `criteria` and return a count. */
  private async delete(criteria: any): Promise<number> {
    throw new Error("Not implemented");
  }

  /** Apply `updates` to records matching `criteria` and return a count. */
  private async update(criteria: any, updates: any): Promise<number> {
    throw new Error("Not implemented");
  }

  /** Return record counts grouped by `field`. */
  private async groupCount(field: string): Promise<Record<string, number>> {
    throw new Error("Not implemented");
  }

  /** Tell whether `error` is the database's duplicate-key violation. */
  private isDuplicateKeyError(error: any): boolean {
    throw new Error("Not implemented");
  }
}
Inbox Repository Implementation
Base Implementation Pattern
import { InboxRepository, InboxEvent } from "nats-pubsub";
/**
 * Database-agnostic skeleton of an {@link InboxRepository}.
 *
 * The public methods express the inbox workflow in terms of a handful of
 * private storage helpers. To target a concrete database, replace the bodies
 * of the helpers at the bottom of the class; the public methods can stay as is.
 */
export class CustomInboxRepository implements InboxRepository {
  /**
   * @param db - Connection handle for the helpers to use.
   * @param logger - Optional logger for the helpers to use.
   */
  constructor(
    private db: DatabaseConnection,
    private logger?: Logger,
  ) {}

  /**
   * Insert the event as "processing", or return the stored one when its id
   * already exists.
   *
   * @param params - Fields of the event to store.
   * @returns The inserted event, or the existing event on a duplicate key.
   * @throws Whatever `insert` throws when it is not a duplicate-key error or
   *   no `eventId` was supplied; an `Error` when the duplicate record can no
   *   longer be found (e.g. it was deleted between the two statements).
   */
  async findOrCreate(params: Partial<InboxEvent>): Promise<InboxEvent> {
    try {
      // Insert first: the unique key on the event id decides who wins a race.
      return await this.insert({
        ...params,
        status: "processing",
        receivedAt: new Date(),
      });
    } catch (error: unknown) {
      const { eventId } = params;
      // Without an id there is nothing to look up, so surface the original error.
      if (eventId === undefined || !this.isDuplicateKeyError(error)) {
        throw error;
      }
      const existing = await this.findByEventId(eventId);
      if (existing === null) {
        // Never hand `null` to callers that were promised an InboxEvent.
        throw new Error(
          `Inbox event ${eventId} was reported as a duplicate but could not be loaded`,
        );
      }
      return existing;
    }
  }

  /** Set the event's status to "processed" and stamp `processedAt` with the current time. */
  async markAsProcessed(eventId: string): Promise<void> {
    await this.updateStatus(eventId, "processed", {
      processedAt: new Date(),
    });
  }

  /** Set the event's status to "failed" and store `errorMessage`. */
  async markAsFailed(eventId: string, errorMessage: string): Promise<void> {
    await this.updateStatus(eventId, "failed", { errorMessage });
  }

  /**
   * Tell whether the event exists and has status "processed".
   * An unknown event id yields `false`.
   */
  async isProcessed(eventId: string): Promise<boolean> {
    const event = await this.findByEventId(eventId);
    return event?.status === "processed";
  }

  /**
   * Delete processed events whose `processedAt` is more than `olderThanDays`
   * days ago.
   *
   * @returns The value reported by the `delete` helper.
   */
  async cleanup(olderThanDays: number): Promise<number> {
    const cutoffDate = new Date();
    cutoffDate.setDate(cutoffDate.getDate() - olderThanDays);
    return await this.delete({
      status: "processed",
      processedAtBefore: cutoffDate,
    });
  }

  /**
   * Mark events that have been "processing" since before the cutoff as
   * "failed" with the message "Processing timeout".
   *
   * @param staleMinutes - How long an event may stay in "processing".
   * @returns The value reported by the `update` helper.
   */
  async resetStale(staleMinutes: number): Promise<number> {
    const cutoffTime = new Date();
    cutoffTime.setMinutes(cutoffTime.getMinutes() - staleMinutes);
    return await this.update(
      {
        status: "processing",
        receivedAtBefore: cutoffTime,
      },
      {
        status: "failed",
        errorMessage: "Processing timeout",
      },
    );
  }

  /** Return the number of events per status, as reported by `groupCount`. */
  async getCountByStatus(): Promise<Record<string, number>> {
    return await this.groupCount("status");
  }

  // Helper methods (implement based on your database)

  /** Insert a new record; must fail with a duplicate-key error if the id exists. */
  private async insert(params: Partial<InboxEvent>): Promise<InboxEvent> {
    throw new Error("Not implemented");
  }

  /** Load the record with the given event id, or `null` when there is none. */
  private async findByEventId(eventId: string): Promise<InboxEvent | null> {
    throw new Error("Not implemented");
  }

  /** Set `status` (plus any `extra` fields) on one record. */
  private async updateStatus(
    eventId: string,
    status: string,
    extra?: Record<string, any>,
  ): Promise<void> {
    throw new Error("Not implemented");
  }

  /** Delete records matching `criteria` and return a count. */
  private async delete(criteria: any): Promise<number> {
    throw new Error("Not implemented");
  }

  /** Apply `updates` to records matching `criteria` and return a count. */
  private async update(criteria: any, updates: any): Promise<number> {
    throw new Error("Not implemented");
  }

  /** Return record counts grouped by `field`. */
  private async groupCount(field: string): Promise<Record<string, number>> {
    throw new Error("Not implemented");
  }

  /** Tell whether `error` is the database's duplicate-key violation. */
  private isDuplicateKeyError(error: any): boolean {
    throw new Error("Not implemented");
  }
}
PostgreSQL Implementation
Schema
-- Outbox table
-- One row per outgoing event. event_id is the primary key, which is what the
-- repository's INSERT ... ON CONFLICT (event_id) relies on for idempotency.
-- NOTE(review): all time columns are TIMESTAMP (without time zone); confirm
-- that every writer uses the same zone, or consider TIMESTAMPTZ.
CREATE TABLE nats_outbox_events (
event_id VARCHAR(255) PRIMARY KEY,
subject VARCHAR(500) NOT NULL,
payload TEXT NOT NULL,
headers TEXT,
status VARCHAR(50) NOT NULL DEFAULT 'pending',
enqueued_at TIMESTAMP NOT NULL DEFAULT NOW(),
sent_at TIMESTAMP,
error_message TEXT,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);
-- Serves lookups filtered by status and ordered/filtered by enqueued_at
-- (e.g. fetching pending events oldest first).
CREATE INDEX idx_outbox_status_enqueued ON nats_outbox_events(status, enqueued_at);
-- Partial index: only rows that have a sent_at value are indexed.
CREATE INDEX idx_outbox_sent_at ON nats_outbox_events(sent_at) WHERE sent_at IS NOT NULL;
-- Inbox table
-- One row per received event. event_id is the primary key, which is what the
-- repository's INSERT ... ON CONFLICT (event_id) relies on for idempotency.
CREATE TABLE nats_inbox_events (
event_id VARCHAR(255) PRIMARY KEY,
subject VARCHAR(500) NOT NULL,
payload TEXT NOT NULL,
headers TEXT,
stream VARCHAR(255),
stream_seq BIGINT,
status VARCHAR(50) NOT NULL DEFAULT 'processing',
received_at TIMESTAMP NOT NULL DEFAULT NOW(),
processed_at TIMESTAMP,
deliveries INTEGER DEFAULT 1,
error_message TEXT,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);
-- Partial unique index: a (stream, stream_seq) pair may appear only once,
-- enforced only for rows where stream is set.
CREATE UNIQUE INDEX idx_inbox_stream_seq ON nats_inbox_events(stream, stream_seq)
WHERE stream IS NOT NULL;
-- Serves lookups filtered by status and processed_at (e.g. cleanup of old
-- processed rows).
CREATE INDEX idx_inbox_status_processed ON nats_inbox_events(status, processed_at);
-- Serves lookups filtered by received_at (e.g. finding rows received before a cutoff).
CREATE INDEX idx_inbox_received_at ON nats_inbox_events(received_at);
Implementation with pg
import { Pool } from "pg";
import {
OutboxRepository,
OutboxEvent,
InboxRepository,
InboxEvent,
} from "nats-pubsub";
/**
 * {@link OutboxRepository} backed by the `nats_outbox_events` PostgreSQL
 * table, accessed through a `pg` connection pool.
 */
export class PostgresOutboxRepository implements OutboxRepository {
  /**
   * @param pool - Connection pool used for every query.
   */
  constructor(private pool: Pool) {}

  /**
   * Insert the event as "pending", or return the stored row when a row with
   * the same `event_id` already exists.
   *
   * @param params - Event fields; `eventId`, `subject` and `payload` map to
   *   NOT NULL columns.
   * @returns The inserted or pre-existing event.
   * @throws Error when the conflicting row can no longer be found (e.g. it
   *   was deleted between the INSERT and the follow-up SELECT).
   */
  async findOrCreate(params: Partial<OutboxEvent>): Promise<OutboxEvent> {
    // ON CONFLICT DO NOTHING makes RETURNING yield zero rows for a duplicate.
    const insertSql = `
      INSERT INTO nats_outbox_events (
        event_id, subject, payload, headers, status, enqueued_at
      ) VALUES ($1, $2, $3, $4, $5, $6)
      ON CONFLICT (event_id) DO NOTHING
      RETURNING *
    `;
    const inserted = await this.pool.query(insertSql, [
      params.eventId,
      params.subject,
      params.payload,
      params.headers,
      "pending",
      new Date(),
    ]);
    if (inserted.rows.length > 0) {
      return this.mapRow(inserted.rows[0]);
    }

    // Event already exists, fetch it
    const fetched = await this.pool.query(
      "SELECT * FROM nats_outbox_events WHERE event_id = $1",
      [params.eventId],
    );
    const existing = fetched.rows[0];
    if (!existing) {
      throw new Error(
        `Outbox event ${String(params.eventId)} conflicted on insert but could not be loaded`,
      );
    }
    return this.mapRow(existing);
  }

  /** Set status to 'sent' and stamp both `sent_at` and `updated_at` with the current time. */
  async markAsSent(eventId: string): Promise<void> {
    const query = `
      UPDATE nats_outbox_events
      SET status = 'sent', sent_at = $1, updated_at = $1
      WHERE event_id = $2
    `;
    await this.pool.query(query, [new Date(), eventId]);
  }

  /** Set status to 'publishing' and refresh `updated_at`. */
  async markAsPublishing(eventId: string): Promise<void> {
    const query = `
      UPDATE nats_outbox_events
      SET status = 'publishing', updated_at = $1
      WHERE event_id = $2
    `;
    await this.pool.query(query, [new Date(), eventId]);
  }

  /** Set status to 'failed', store `errorMessage` and refresh `updated_at`. */
  async markAsFailed(eventId: string, errorMessage: string): Promise<void> {
    const query = `
      UPDATE nats_outbox_events
      SET status = 'failed', error_message = $1, updated_at = $2
      WHERE event_id = $3
    `;
    await this.pool.query(query, [errorMessage, new Date(), eventId]);
  }

  /**
   * Fetch pending events, oldest `enqueued_at` first.
   *
   * @param options - `limit` caps the result size; a missing or zero limit
   *   falls back to 100.
   */
  async findPending(options?: { limit?: number }): Promise<OutboxEvent[]> {
    const limit = options?.limit || 100;
    const query = `
      SELECT * FROM nats_outbox_events
      WHERE status = 'pending'
      ORDER BY enqueued_at ASC
      LIMIT $1
    `;
    const result = await this.pool.query(query, [limit]);
    return result.rows.map((row: any) => this.mapRow(row));
  }

  /**
   * Delete sent events whose `sent_at` is more than `olderThanDays` days ago.
   *
   * @returns Number of rows deleted.
   */
  async cleanup(olderThanDays: number): Promise<number> {
    const cutoffDate = new Date();
    cutoffDate.setDate(cutoffDate.getDate() - olderThanDays);
    const query = `
      DELETE FROM nats_outbox_events
      WHERE status = 'sent' AND sent_at < $1
    `;
    const result = await this.pool.query(query, [cutoffDate]);
    return result.rowCount ?? 0;
  }

  /**
   * Move events stuck in 'publishing' back to 'pending'.
   *
   * Staleness is measured from `updated_at`, which `markAsPublishing`
   * refreshes, i.e. from the moment the event entered 'publishing'. Comparing
   * `enqueued_at` instead would immediately reset any event that had waited
   * in the backlog longer than `staleMinutes` before being claimed, causing
   * it to be published twice.
   *
   * @param staleMinutes - How long an event may stay in 'publishing'.
   * @returns Number of rows reset.
   */
  async resetStale(staleMinutes: number): Promise<number> {
    const now = new Date();
    const cutoffTime = new Date(now.getTime() - staleMinutes * 60 * 1000);
    const query = `
      UPDATE nats_outbox_events
      SET status = 'pending', updated_at = $1
      WHERE status = 'publishing' AND updated_at < $2
    `;
    const result = await this.pool.query(query, [now, cutoffTime]);
    return result.rowCount ?? 0;
  }

  /**
   * Count events per status.
   *
   * @returns Map from status to row count.
   */
  async getCountByStatus(): Promise<Record<string, number>> {
    const query = `
      SELECT status, COUNT(*) as count
      FROM nats_outbox_events
      GROUP BY status
    `;
    const result = await this.pool.query(query);
    const counts: Record<string, number> = {};
    for (const row of result.rows) {
      // COUNT(*) is a bigint, which pg hands back as a decimal string.
      counts[row.status] = Number.parseInt(row.count, 10);
    }
    return counts;
  }

  /** Convert a snake_case table row into the camelCase OutboxEvent shape. */
  private mapRow(row: any): OutboxEvent {
    return {
      eventId: row.event_id,
      subject: row.subject,
      payload: row.payload,
      headers: row.headers,
      status: row.status,
      enqueuedAt: row.enqueued_at,
      sentAt: row.sent_at,
      errorMessage: row.error_message,
    };
  }
}
/**
 * {@link InboxRepository} backed by the `nats_inbox_events` PostgreSQL table,
 * accessed through a `pg` connection pool.
 */
export class PostgresInboxRepository implements InboxRepository {
  /**
   * @param pool - Connection pool used for every query.
   */
  constructor(private pool: Pool) {}

  /**
   * Insert the event as "processing", or return the stored row when a row
   * with the same `event_id` already exists.
   *
   * @param params - Event fields; `eventId`, `subject` and `payload` map to
   *   NOT NULL columns. A missing or zero `deliveries` is stored as 1.
   * @returns The inserted or pre-existing event.
   * @throws Error when the conflicting row can no longer be found (e.g. it
   *   was deleted between the INSERT and the follow-up SELECT).
   */
  async findOrCreate(params: Partial<InboxEvent>): Promise<InboxEvent> {
    // ON CONFLICT DO NOTHING makes RETURNING yield zero rows for a duplicate.
    const insertSql = `
      INSERT INTO nats_inbox_events (
        event_id, subject, payload, headers, stream, stream_seq,
        status, received_at, deliveries
      ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
      ON CONFLICT (event_id) DO NOTHING
      RETURNING *
    `;
    const inserted = await this.pool.query(insertSql, [
      params.eventId,
      params.subject,
      params.payload,
      params.headers,
      params.stream,
      params.streamSeq,
      "processing",
      new Date(),
      params.deliveries || 1,
    ]);
    if (inserted.rows.length > 0) {
      return this.mapRow(inserted.rows[0]);
    }

    // Event already exists, fetch it
    const fetched = await this.pool.query(
      "SELECT * FROM nats_inbox_events WHERE event_id = $1",
      [params.eventId],
    );
    const existing = fetched.rows[0];
    if (!existing) {
      throw new Error(
        `Inbox event ${String(params.eventId)} conflicted on insert but could not be loaded`,
      );
    }
    return this.mapRow(existing);
  }

  /** Set status to 'processed' and stamp both `processed_at` and `updated_at` with the current time. */
  async markAsProcessed(eventId: string): Promise<void> {
    const query = `
      UPDATE nats_inbox_events
      SET status = 'processed', processed_at = $1, updated_at = $1
      WHERE event_id = $2
    `;
    await this.pool.query(query, [new Date(), eventId]);
  }

  /** Set status to 'failed', store `errorMessage` and refresh `updated_at`. */
  async markAsFailed(eventId: string, errorMessage: string): Promise<void> {
    const query = `
      UPDATE nats_inbox_events
      SET status = 'failed', error_message = $1, updated_at = $2
      WHERE event_id = $3
    `;
    await this.pool.query(query, [errorMessage, new Date(), eventId]);
  }

  /**
   * Tell whether a row with this id exists in status 'processed'.
   * An unknown id yields `false`.
   */
  async isProcessed(eventId: string): Promise<boolean> {
    const query = `
      SELECT 1 FROM nats_inbox_events
      WHERE event_id = $1 AND status = 'processed'
    `;
    const result = await this.pool.query(query, [eventId]);
    return result.rows.length > 0;
  }

  /**
   * Delete processed events whose `processed_at` is more than
   * `olderThanDays` days ago.
   *
   * @returns Number of rows deleted.
   */
  async cleanup(olderThanDays: number): Promise<number> {
    const cutoffDate = new Date();
    cutoffDate.setDate(cutoffDate.getDate() - olderThanDays);
    const query = `
      DELETE FROM nats_inbox_events
      WHERE status = 'processed' AND processed_at < $1
    `;
    const result = await this.pool.query(query, [cutoffDate]);
    return result.rowCount ?? 0;
  }

  /**
   * Mark events that are still 'processing' and were received before the
   * cutoff as 'failed' with the message 'Processing timeout'.
   *
   * @param staleMinutes - How long an event may stay in 'processing'.
   * @returns Number of rows updated.
   */
  async resetStale(staleMinutes: number): Promise<number> {
    const cutoffTime = new Date();
    cutoffTime.setMinutes(cutoffTime.getMinutes() - staleMinutes);
    const query = `
      UPDATE nats_inbox_events
      SET status = 'failed', error_message = 'Processing timeout', updated_at = $1
      WHERE status = 'processing' AND received_at < $2
    `;
    const result = await this.pool.query(query, [new Date(), cutoffTime]);
    return result.rowCount ?? 0;
  }

  /**
   * Count events per status.
   *
   * @returns Map from status to row count.
   */
  async getCountByStatus(): Promise<Record<string, number>> {
    const query = `
      SELECT status, COUNT(*) as count
      FROM nats_inbox_events
      GROUP BY status
    `;
    const result = await this.pool.query(query);
    const counts: Record<string, number> = {};
    for (const row of result.rows) {
      // COUNT(*) is a bigint, which pg hands back as a decimal string.
      counts[row.status] = Number.parseInt(row.count, 10);
    }
    return counts;
  }

  /**
   * Convert a snake_case table row into the camelCase InboxEvent shape.
   *
   * `stream_seq` is a BIGINT column, and pg returns BIGINT values as strings
   * by default; it is converted so `streamSeq` really is a number as the
   * InboxEvent type declares. Values above Number.MAX_SAFE_INTEGER would
   * lose precision. A NULL column value is passed through unchanged.
   */
  private mapRow(row: any): InboxEvent {
    return {
      eventId: row.event_id,
      subject: row.subject,
      payload: row.payload,
      headers: row.headers,
      stream: row.stream,
      streamSeq: row.stream_seq == null ? row.stream_seq : Number(row.stream_seq),
      status: row.status,
      receivedAt: row.received_at,
      processedAt: row.processed_at,
      deliveries: row.deliveries,
      errorMessage: row.error_message,
    };
  }
}
MySQL Implementation
Schema
-- Outbox table
-- One row per outgoing event. event_id is the primary key, which is what the
-- repository's INSERT IGNORE relies on to skip duplicates.
-- updated_at is maintained by MySQL itself (ON UPDATE CURRENT_TIMESTAMP), so
-- the UPDATE statements in the repository do not set it explicitly.
CREATE TABLE nats_outbox_events (
event_id VARCHAR(255) PRIMARY KEY,
subject VARCHAR(500) NOT NULL,
payload LONGTEXT NOT NULL,
headers TEXT,
status VARCHAR(50) NOT NULL DEFAULT 'pending',
enqueued_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
sent_at TIMESTAMP NULL,
error_message TEXT,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
-- Serves lookups filtered by status and ordered/filtered by enqueued_at
-- (e.g. fetching pending events oldest first).
INDEX idx_outbox_status_enqueued (status, enqueued_at),
-- Serves lookups filtered by sent_at.
INDEX idx_outbox_sent_at (sent_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
-- Inbox table
-- One row per received event, keyed by event_id.
CREATE TABLE nats_inbox_events (
event_id VARCHAR(255) PRIMARY KEY,
subject VARCHAR(500) NOT NULL,
payload LONGTEXT NOT NULL,
headers TEXT,
stream VARCHAR(255),
stream_seq BIGINT,
status VARCHAR(50) NOT NULL DEFAULT 'processing',
received_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
processed_at TIMESTAMP NULL,
deliveries INT DEFAULT 1,
error_message TEXT,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
-- A (stream, stream_seq) pair may appear only once.
UNIQUE KEY idx_inbox_stream_seq (stream, stream_seq),
-- Serves lookups filtered by status and processed_at.
INDEX idx_inbox_status_processed (status, processed_at),
-- Serves lookups filtered by received_at.
INDEX idx_inbox_received_at (received_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
Implementation with mysql2
import mysql from "mysql2/promise";
import { OutboxRepository, OutboxEvent } from "nats-pubsub";
/**
 * {@link OutboxRepository} backed by the `nats_outbox_events` MySQL table,
 * accessed through a `mysql2/promise` connection pool.
 *
 * `updated_at` is maintained by the table definition
 * (`ON UPDATE CURRENT_TIMESTAMP`), so the UPDATE statements here never set it.
 */
export class MySQLOutboxRepository implements OutboxRepository {
  /**
   * @param pool - Connection pool used for every statement.
   */
  constructor(private pool: mysql.Pool) {}

  /**
   * Insert the event as 'pending' unless a row with the same `event_id`
   * exists, then return the stored row.
   *
   * @param params - Event fields; `headers` is optional and stored as NULL
   *   when omitted.
   * @returns The inserted or pre-existing event.
   * @throws Error when the row cannot be read back after the insert.
   */
  async findOrCreate(params: Partial<OutboxEvent>): Promise<OutboxEvent> {
    // One connection for both statements so the read follows the write.
    const conn = await this.pool.getConnection();
    try {
      // Try insert with INSERT IGNORE
      const insertQuery = `
        INSERT IGNORE INTO nats_outbox_events (
          event_id, subject, payload, headers, status, enqueued_at
        ) VALUES (?, ?, ?, ?, 'pending', NOW())
      `;
      await conn.execute(insertQuery, [
        params.eventId,
        params.subject,
        params.payload,
        // mysql2 rejects `undefined` bind parameters; SQL NULL must be JS null.
        params.headers ?? null,
      ]);

      // Fetch the event (either just inserted or already existing)
      const [rows] = await conn.execute(
        "SELECT * FROM nats_outbox_events WHERE event_id = ?",
        [params.eventId],
      );
      const stored = (rows as any[])[0];
      if (!stored) {
        throw new Error(
          `Outbox event ${String(params.eventId)} could not be loaded after insert`,
        );
      }
      return this.mapRow(stored);
    } finally {
      conn.release();
    }
  }

  /** Set status to 'sent' and stamp `sent_at` with the database's NOW(). */
  async markAsSent(eventId: string): Promise<void> {
    const query = `
      UPDATE nats_outbox_events
      SET status = 'sent', sent_at = NOW()
      WHERE event_id = ?
    `;
    await this.pool.execute(query, [eventId]);
  }

  /** Set status to 'publishing'. */
  async markAsPublishing(eventId: string): Promise<void> {
    const query = `
      UPDATE nats_outbox_events
      SET status = 'publishing'
      WHERE event_id = ?
    `;
    await this.pool.execute(query, [eventId]);
  }

  /** Set status to 'failed' and store `errorMessage`. */
  async markAsFailed(eventId: string, errorMessage: string): Promise<void> {
    const query = `
      UPDATE nats_outbox_events
      SET status = 'failed', error_message = ?
      WHERE event_id = ?
    `;
    await this.pool.execute(query, [errorMessage, eventId]);
  }

  /**
   * Fetch pending events, oldest `enqueued_at` first.
   *
   * Uses `pool.query` rather than `pool.execute`: a `LIMIT ?` placeholder
   * bound to a JS number through a server-side prepared statement is rejected
   * by recent MySQL servers ("Incorrect arguments to mysqld_stmt_execute").
   *
   * @param options - `limit` caps the result size; a missing or zero limit
   *   falls back to 100.
   */
  async findPending(options?: { limit?: number }): Promise<OutboxEvent[]> {
    const limit = options?.limit || 100;
    const query = `
      SELECT * FROM nats_outbox_events
      WHERE status = 'pending'
      ORDER BY enqueued_at ASC
      LIMIT ?
    `;
    const [rows] = await this.pool.query(query, [limit]);
    return (rows as any[]).map((row) => this.mapRow(row));
  }

  /**
   * Delete sent events whose `sent_at` is more than `olderThanDays` days ago.
   *
   * @returns Number of rows deleted.
   */
  async cleanup(olderThanDays: number): Promise<number> {
    const query = `
      DELETE FROM nats_outbox_events
      WHERE status = 'sent'
        AND sent_at < DATE_SUB(NOW(), INTERVAL ? DAY)
    `;
    const [result] = await this.pool.execute(query, [olderThanDays]);
    return (result as any).affectedRows || 0;
  }

  /**
   * Move events stuck in 'publishing' back to 'pending'.
   *
   * Staleness is measured from `updated_at`, which MySQL refreshes when
   * `markAsPublishing` changes the row, i.e. from the moment the event
   * entered 'publishing'. Comparing `enqueued_at` instead would immediately
   * reset any event that had waited in the backlog longer than
   * `staleMinutes` before being claimed, causing it to be published twice.
   *
   * @param staleMinutes - How long an event may stay in 'publishing'.
   * @returns Number of rows reset.
   */
  async resetStale(staleMinutes: number): Promise<number> {
    const query = `
      UPDATE nats_outbox_events
      SET status = 'pending'
      WHERE status = 'publishing'
        AND updated_at < DATE_SUB(NOW(), INTERVAL ? MINUTE)
    `;
    const [result] = await this.pool.execute(query, [staleMinutes]);
    return (result as any).affectedRows || 0;
  }

  /**
   * Count events per status.
   *
   * @returns Map from status to row count.
   */
  async getCountByStatus(): Promise<Record<string, number>> {
    const query = `
      SELECT status, COUNT(*) as count
      FROM nats_outbox_events
      GROUP BY status
    `;
    const [rows] = await this.pool.execute(query);
    const counts: Record<string, number> = {};
    for (const row of rows as any[]) {
      // The driver may deliver COUNT(*) as a number or a numeric string.
      counts[row.status] = Number.parseInt(String(row.count), 10);
    }
    return counts;
  }

  /** Convert a snake_case table row into the camelCase OutboxEvent shape. */
  private mapRow(row: any): OutboxEvent {
    return {
      eventId: row.event_id,
      subject: row.subject,
      payload: row.payload,
      headers: row.headers,
      status: row.status,
      enqueuedAt: row.enqueued_at,
      sentAt: row.sent_at,
      errorMessage: row.error_message,
    };
  }
}