
NatsPubsub Architecture Guide

This comprehensive guide explores the architecture of NatsPubsub, including system design, component breakdown, message flow, reliability patterns, and design decisions.

Overview

NatsPubsub is a production-ready pub/sub library built on NATS JetStream, designed with reliability, scalability, and developer experience as core principles.

Architecture Principles

  1. Reliability First: Built-in patterns for message delivery guarantees
  2. Declarative API: Clean, intuitive interfaces for publishers and subscribers
  3. SOLID Principles: Modular, testable, and maintainable code
  4. Polyglot Design: Ruby and JavaScript implementations with full interoperability
  5. Cloud-Native: Designed for distributed microservices architectures

High-Level Architecture


System Architecture

Layered Architecture

NatsPubsub follows a layered architecture pattern. Its seven layers are listed below, from application code at the top down to storage at the bottom.

Layer Responsibilities

  1. Application Layer

    • Business logic
    • Message handlers
    • Domain models
  2. API Layer

    • Publisher interface
    • Subscriber interface
    • Fluent APIs
  3. Business Logic Layer

    • Message validation
    • Schema enforcement
    • Business rules
  4. Middleware Layer

    • Logging
    • Metrics
    • Tracing
    • Error handling
  5. Reliability Layer

    • Inbox/Outbox patterns
    • DLQ handling
    • Retry logic
    • Circuit breakers
  6. Transport Layer

    • Connection management
    • Subject routing
    • Message serialization
  7. Storage Layer

    • Database persistence
    • Cache management
    • State storage

Component Architecture

Core Components

Publisher Component

Responsibilities:

  • Message publishing
  • Subject building
  • Envelope creation
  • Validation
  • Error handling

Design Pattern: Strategy Pattern

// Publisher uses strategy pattern for different publish types
// (signatures only; method bodies are omitted in this outline)
class Publisher {
  private connectionManager: ConnectionManager;
  private envelopeBuilder: EnvelopeBuilder;
  private subjectBuilder: SubjectBuilder;
  private validator: PublishValidator;

  // Strategy: Publish to topic
  async publishToTopic(topic: string, message: any): Promise<PublishResult>;

  // Strategy: Publish to multiple topics
  async publishToMultipleTopics(
    params: MultiTopicParams,
  ): Promise<MultiTopicPublishResult>;

  // Strategy: Publish with domain/resource/action
  async publishDomainResourceAction(
    params: DomainResourceActionParams,
  ): Promise<PublishResult>;
}

Key Features:

  • Dependency injection for testability
  • Fluent API for batch operations
  • Automatic subject building
  • Message envelope wrapping
  • Validation before publish
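
As a rough illustration of how subject building, envelope wrapping, and validation might fit together, here is a minimal sketch. The `Envelope` fields, `PublishContext`, `validateTopic`, and `buildEnvelope` are hypothetical names chosen for this example and are not taken from the library; only the `{env}.{app}.{topic}` subject format comes from this guide.

```typescript
// Hypothetical envelope shape; the fields NatsPubsub actually emits may differ.
interface Envelope<T> {
  eventId: string;
  subject: string;
  occurredAt: string; // ISO-8601 timestamp
  payload: T;
}

// Hypothetical publish context carrying the configured environment and app name.
interface PublishContext {
  env: string;
  appName: string;
}

// Validation before publish: a concrete topic has non-empty tokens and no wildcards.
function validateTopic(topic: string): void {
  const tokens = topic.split(".");
  if (tokens.some((t) => t.length === 0)) {
    throw new Error(`invalid topic "${topic}": empty token`);
  }
  if (tokens.some((t) => t === "*" || t === ">" || /\s/.test(t))) {
    throw new Error(`invalid topic "${topic}": wildcards and whitespace are not allowed`);
  }
}

// Subject building plus envelope wrapping in one step.
function buildEnvelope<T>(
  ctx: PublishContext,
  topic: string,
  payload: T,
  eventId: string,
  now: Date = new Date(),
): Envelope<T> {
  validateTopic(topic);
  return {
    eventId,
    subject: `${ctx.env}.${ctx.appName}.${topic}`,
    occurredAt: now.toISOString(),
    payload,
  };
}

const envelope = buildEnvelope(
  { env: "production", appName: "orders" },
  "order.created",
  { orderId: "o-1" },
  "evt-1",
);
console.log(envelope.subject); // production.orders.order.created
```

Passing the event ID and clock in as arguments keeps the builder deterministic, which is one way to get the testability that dependency injection is meant to provide.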

Subscriber Component

Responsibilities:

  • Message consumption
  • Message processing
  • Error handling
  • Acknowledgment
  • Retry logic

Design Pattern: Template Method Pattern

// Subscriber uses template method pattern
abstract class Subscriber {
  // Template method
  async call(
    event: Record<string, unknown>,
    metadata: EventMetadata,
  ): Promise<void> {
    // 1. Pre-processing (implemented by base class)
    await this.preProcess(metadata);

    // 2. Handle (implemented by subclass)
    await this.handle(event, metadata);

    // 3. Post-processing (implemented by base class)
    await this.postProcess(metadata);
  }

  // Base-class steps, shown as no-op placeholders in this outline
  protected async preProcess(_metadata: EventMetadata): Promise<void> {}
  protected async postProcess(_metadata: EventMetadata): Promise<void> {}

  // Hook method - must be implemented by subclass
  abstract handle(
    event: Record<string, unknown>,
    metadata: EventMetadata,
  ): Promise<void>;
}

Key Features:

  • Declarative subscription syntax
  • Automatic consumer creation
  • Concurrent message processing
  • Middleware support
  • Error boundaries

Consumer Component

Responsibilities:

  • JetStream consumer management
  • Message fetching
  • Concurrency control
  • Backpressure handling
  • Graceful shutdown

Design Pattern: Observer Pattern

class Consumer {
  private subscriptions: Map<string, ConsumerSubscription>;
  private processing: Set<string>;
  private concurrency: number;

  // Observer pattern - notify on message
  async startConsuming(subscriber: Subscriber): Promise<void> {
    // Create consumer
    const consumer = await this.createConsumer(subscriber);

    // Observe messages
    for await (const msg of consumer) {
      await this.processMessage(msg, subscriber);
    }
  }
}

Configuration Component

Responsibilities:

  • Configuration management
  • Preset application
  • Validation
  • Environment detection

Design Pattern: Singleton Pattern

// Configuration singleton
class Config {
  private static instance: Config;
  private config: NatsPubsubConfig;

  public static getInstance(): Config {
    if (!Config.instance) {
      Config.instance = new Config();
    }
    return Config.instance;
  }

  public configure(options: Partial<NatsPubsubConfig>): void;
  public configureWithPreset(
    preset: PresetName,
    overrides?: Partial<NatsPubsubConfig>,
  ): void;
  public get(): NatsPubsubConfig;
  public validate(): void;
}

Connection Manager

Responsibilities:

  • NATS connection lifecycle
  • Connection pooling
  • Reconnection logic
  • Health checking

Design Pattern: Object Pool Pattern

class ConnectionManager {
  private connection: NatsConnection | null;
  private reconnecting: boolean;
  private healthCheckInterval: NodeJS.Timeout;

  async connect(): Promise<void>;
  async disconnect(): Promise<void>;
  async reconnect(): Promise<void>;
  getJetStream(): JetStreamClient;
  isHealthy(): boolean;
}

Topology Manager

Responsibilities:

  • Stream creation
  • Consumer setup
  • Subject routing
  • Overlap detection

Design Pattern: Builder Pattern

class TopologyManager {
  private streamManager: StreamManager;
  private consumerManager: ConsumerManager;
  private overlapGuard: OverlapGuard;

  async ensureStream(config: StreamConfig): Promise<void>;
  async ensureConsumer(config: ConsumerConfig): Promise<void>;
  async validateTopology(): Promise<ValidationResult>;
}

Message Flow Architecture

Publishing Flow

Subscribing Flow

Batch Publishing Flow


Reliability Patterns

Outbox Pattern Architecture

Components:

  1. Outbox Repository: Database access layer
  2. Outbox Publisher: Publishing coordinator
  3. Background Worker: Processes pending events
  4. Cleanup Service: Maintains table health

Guarantees:

  • At-least-once delivery
  • Transactional consistency
  • Ordering within transaction
  • Automatic retry on failure
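
The interaction between the repository and the background worker can be sketched as follows. This is an in-memory illustration under stated assumptions, not the library's implementation: `InMemoryOutbox`, `OutboxRecord`, and `relayPending` are hypothetical names, the store stands in for a database table, and publishing is synchronous here only to keep the example short.

```typescript
type OutboxStatus = "pending" | "sent";

interface OutboxRecord {
  id: number;
  subject: string;
  payload: string;
  status: OutboxStatus;
  attempts: number;
}

// In-memory stand-in for the outbox table. In a real system the insert
// happens inside the same database transaction as the business write.
class InMemoryOutbox {
  private records: OutboxRecord[] = [];
  private nextId = 1;

  enqueue(subject: string, payload: string): OutboxRecord {
    const record: OutboxRecord = {
      id: this.nextId++,
      subject,
      payload,
      status: "pending",
      attempts: 0,
    };
    this.records.push(record);
    return record;
  }

  // Pending records in insertion order, which preserves ordering.
  pending(): OutboxRecord[] {
    return this.records.filter((r) => r.status === "pending");
  }
}

// One pass of the background worker. A record is marked sent only after the
// publish call returns, so a failure in between leads to a retry on the next
// pass (at-least-once delivery) rather than a lost message.
function relayPending(
  outbox: InMemoryOutbox,
  publish: (record: OutboxRecord) => void,
): number {
  let sent = 0;
  for (const record of outbox.pending()) {
    record.attempts++;
    try {
      publish(record);
      record.status = "sent";
      sent++;
    } catch {
      break; // stop here so later records are not published out of order
    }
  }
  return sent;
}

const outbox = new InMemoryOutbox();
outbox.enqueue("production.orders.order.created", JSON.stringify({ orderId: "o-1" }));
outbox.enqueue("production.orders.order.paid", JSON.stringify({ orderId: "o-1" }));

let failOnce = true;
const delivered: string[] = [];
const flakyPublish = (record: OutboxRecord): void => {
  if (failOnce) {
    failOnce = false;
    throw new Error("broker unavailable");
  }
  delivered.push(record.subject);
};

const firstPass = relayPending(outbox, flakyPublish); // publish fails, nothing is marked sent
const secondPass = relayPending(outbox, flakyPublish); // both records go out, in order
```

Because a record can be published and then fail to be marked as sent, consumers may see duplicates, which is why the outbox is usually paired with the inbox pattern on the receiving side.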

Inbox Pattern Architecture

Components:

  1. Inbox Repository: Database access layer
  2. Inbox Processor: Deduplication coordinator
  3. Cleanup Service: Maintains table health

Guarantees:

  • Effectively-once processing (at-least-once delivery plus deduplication)
  • Idempotent operations
  • Duplicate detection
  • Processing history
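
A minimal sketch of the deduplication step, assuming each message carries a unique event ID. `InMemoryInbox` and `processOnce` are hypothetical names for illustration; a real inbox would use a database table with a unique constraint on the event ID instead of a `Map`.

```typescript
// In-memory stand-in for the inbox table, keyed by event ID.
class InMemoryInbox {
  private processed = new Map<string, Date>();

  // Runs the handler only for event IDs that have not been seen before.
  // Returns true if the handler ran, false if the event was a duplicate.
  processOnce(eventId: string, handler: () => void): boolean {
    if (this.processed.has(eventId)) {
      return false;
    }
    handler();
    // Recorded only after the handler succeeds, so a failed attempt can be retried.
    this.processed.set(eventId, new Date());
    return true;
  }

  // Processing history: when a given event was handled, if it was.
  processedAt(eventId: string): Date | undefined {
    return this.processed.get(eventId);
  }
}

const inbox = new InMemoryInbox();
let ordersHandled = 0;
const handleOrder = (): void => {
  ordersHandled++;
};

const firstDelivery = inbox.processOnce("evt-1", handleOrder); // handler runs
const redelivery = inbox.processOnce("evt-1", handleOrder); // duplicate, skipped
const otherEvent = inbox.processOnce("evt-2", handleOrder); // handler runs
```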

DLQ Architecture

Components:

  1. DLQ Publisher: Sends failed messages to DLQ
  2. DLQ Consumer: Processes DLQ messages
  3. DLQ Handler: Custom failure handling
  4. DLQ Storage: Persistent failed message storage

Features:

  • Automatic retry exhaustion detection
  • Metadata preservation
  • Failure reason tracking
  • Manual replay capability
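
Retry exhaustion detection can be illustrated with a small decision function. This is a sketch under assumptions: `DeliveryInfo`, `DlqEntry`, and `handleDelivery` are hypothetical names, and the function only decides what should happen to a message; it does not talk to NATS.

```typescript
interface DeliveryInfo {
  subject: string;
  deliveryCount: number; // 1 for the first delivery attempt
}

// Hypothetical DLQ record preserving the original metadata and failure reason.
interface DlqEntry {
  originalSubject: string;
  payload: string;
  reason: string;
  deliveryCount: number;
  failedAt: string;
}

type Outcome =
  | { action: "ack" }
  | { action: "retry" }
  | { action: "dead-letter"; entry: DlqEntry };

// Runs the handler and decides what to do with the message afterwards.
function handleDelivery(
  info: DeliveryInfo,
  payload: string,
  maxDeliver: number,
  handler: (payload: string) => void,
  now: Date = new Date(),
): Outcome {
  try {
    handler(payload);
    return { action: "ack" };
  } catch (err) {
    if (info.deliveryCount < maxDeliver) {
      return { action: "retry" }; // attempts remain, let the message be redelivered
    }
    // Retries exhausted: keep the metadata and the failure reason for later replay.
    const reason = err instanceof Error ? err.message : String(err);
    return {
      action: "dead-letter",
      entry: {
        originalSubject: info.subject,
        payload,
        reason,
        deliveryCount: info.deliveryCount,
        failedAt: now.toISOString(),
      },
    };
  }
}

const alwaysFails = (): void => {
  throw new Error("payment declined");
};
const orderSubject = "production.orders.order.created";

const firstAttempt = handleDelivery({ subject: orderSubject, deliveryCount: 1 }, "{}", 5, alwaysFails);
const lastAttempt = handleDelivery({ subject: orderSubject, deliveryCount: 5 }, "{}", 5, alwaysFails);
const succeeded = handleDelivery({ subject: orderSubject, deliveryCount: 1 }, "{}", 5, () => {});
```

Keeping the original subject in the entry is what makes manual replay possible: the stored payload can be republished to where it came from.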

Stream and Consumer Architecture

Stream Hierarchy

Consumer Types

NatsPubsub Uses:

  • Pull Consumers: For controlled message fetching
  • Durable: For persistent subscriptions
  • Batch Fetching: For high throughput

Stream Configuration

interface StreamConfig {
  name: string; // Stream name
  subjects: string[]; // Subject patterns
  retention: "limits" | "interest" | "workqueue";
  storage: "file" | "memory";
  replicas: number; // Replication factor
  maxMsgs: number; // Max messages
  maxBytes: number; // Max storage (bytes)
  maxAge: number; // Message TTL (nanoseconds)
  maxMsgSize: number; // Max message size (bytes)
  duplicateWindow: number; // Duplicate detection window (nanoseconds)
}

Example:

const streamConfig: StreamConfig = {
  name: "production-events-stream",
  subjects: ["production.app.>"],
  retention: "interest", // Delete after all consumers ACK
  storage: "file", // Persistent storage
  replicas: 3, // High availability
  maxMsgs: 1000000, // 1M messages max
  maxBytes: 10 * 1024 * 1024 * 1024, // 10GB
  maxAge: 7 * 24 * 3600 * 1e9, // 7 days
  maxMsgSize: 1024 * 1024, // 1MB per message
  duplicateWindow: 2 * 60 * 1e9, // 2 minutes
};

Consumer Configuration

interface ConsumerConfig {
  durable_name: string; // Consumer name
  filter_subject: string; // Subject filter
  deliver_policy:
    | "all"
    | "last"
    | "new"
    | "by_start_sequence"
    | "by_start_time";
  ack_policy: "explicit" | "none" | "all";
  ack_wait: number; // ACK timeout (nanoseconds)
  max_deliver: number; // Max delivery attempts
  max_ack_pending: number; // Max unacked messages
  replay_policy: "instant" | "original";
}

Example:

const consumerConfig: ConsumerConfig = {
  durable_name: "order-processor",
  filter_subject: "production.app.order.*",
  deliver_policy: "all", // Deliver all messages
  ack_policy: "explicit", // Explicit ACK required
  ack_wait: 30 * 1e9, // 30 seconds
  max_deliver: 5, // 5 attempts
  max_ack_pending: 100, // 100 concurrent messages
  replay_policy: "instant", // Deliver as fast as possible
};
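
The stream and consumer examples express durations in nanoseconds with inline arithmetic such as `30 * 1e9`. A small helper can make such values easier to read and guard against precision loss. The function names below are hypothetical and are not part of NatsPubsub.

```typescript
const NANOS_PER_SECOND = 1e9;

// Converts seconds to nanoseconds, rejecting values that a JavaScript number
// cannot represent exactly as an integer.
function secondsToNanos(seconds: number): number {
  const nanos = seconds * NANOS_PER_SECOND;
  if (!Number.isSafeInteger(nanos)) {
    throw new RangeError(`${seconds}s is not a whole, safely representable number of nanoseconds`);
  }
  return nanos;
}

function minutesToNanos(minutes: number): number {
  return secondsToNanos(minutes * 60);
}

function daysToNanos(days: number): number {
  return secondsToNanos(days * 24 * 3600);
}

const ackWait = secondsToNanos(30); // same value as 30 * 1e9
const duplicateWindow = minutesToNanos(2); // same value as 2 * 60 * 1e9
const maxAge = daysToNanos(7); // same value as 7 * 24 * 3600 * 1e9
```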

Connection Management

Connection Lifecycle

Connection Pool

Features:

  • Round-robin connection selection
  • Automatic health checking
  • Connection recovery
  • Graceful degradation

Health Check System

class HealthCheckSystem {
  private checks: Map<string, HealthCheck>;

  // Register health checks
  registerCheck(name: string, check: HealthCheck): void;

  // Run all health checks
  async runChecks(): Promise<HealthStatus>;

  // Individual checks
  async checkConnection(): Promise<boolean>;
  async checkJetStream(): Promise<boolean>;
  async checkDatabase(): Promise<boolean>;
  async checkDependencies(): Promise<boolean>;
}

Health Check Types:

  1. Liveness: Is the service running?
  2. Readiness: Can the service handle requests?
  3. Startup: Has initialization completed?
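
One way the individual check results could feed the three probe types is sketched below. The mapping in `probeRequirements` is an assumption made for illustration; the guide does not specify which checks each probe depends on, and `evaluateProbe` is a hypothetical name.

```typescript
type ProbeKind = "liveness" | "readiness" | "startup";

interface CheckResult {
  check: string;
  ok: boolean;
}

// Hypothetical mapping from probe type to the checks it depends on.
const probeRequirements: Record<ProbeKind, string[]> = {
  liveness: [], // the process answering at all is enough
  readiness: ["connection", "jetstream", "database"],
  startup: ["connection", "jetstream"],
};

// A probe is healthy when every check it requires has passed.
// A required check with no reported result counts as failing.
function evaluateProbe(
  kind: ProbeKind,
  results: CheckResult[],
): { healthy: boolean; failing: string[] } {
  const byName = new Map(results.map((r) => [r.check, r.ok] as const));
  const failing = probeRequirements[kind].filter((check) => byName.get(check) !== true);
  return { healthy: failing.length === 0, failing };
}

const checkResults: CheckResult[] = [
  { check: "connection", ok: true },
  { check: "jetstream", ok: true },
  { check: "database", ok: false },
];

const liveness = evaluateProbe("liveness", checkResults);
const startup = evaluateProbe("startup", checkResults);
const readiness = evaluateProbe("readiness", checkResults);
console.log(readiness.failing.join(",")); // database
```

Separating the probes this way lets an orchestrator stop routing traffic to an instance with a failing database without restarting it.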

Concurrency Model

Message Processing Concurrency

Concurrency Control

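class ConcurrencyController {
  private processing = 0;
  // Resolvers for callers waiting on a free slot, in FIFO order
  private queue: Array<() => void> = [];

  constructor(private concurrency: number) {}

  async acquire(): Promise<void> {
    if (this.processing < this.concurrency) {
      this.processing++;
      return;
    }
    // No free slot: wait until release() hands this caller one
    await new Promise<void>((resolve) => this.queue.push(resolve));
  }

  release(): void {
    const next = this.queue.shift();
    if (next) {
      // Hand the slot straight to the next waiter; the count stays the same
      next();
    } else {
      this.processing--;
    }
  }

  async execute<T>(fn: () => Promise<T>): Promise<T> {
    await this.acquire();
    try {
      return await fn();
    } finally {
      this.release();
    }
  }
}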

Backpressure Handling


Topology Management

Topology Initialization

Overlap Detection

Examples:

// ✅ Valid: Same stream, different specificity
const validSubscribers = [
  { subject: "production.app.order.*", stream: "orders" },
  { subject: "production.app.order.created", stream: "orders" },
];

// ❌ Invalid: Different streams, overlapping subjects
const invalidSubscribers = [
  { subject: "production.app.order.*", stream: "orders" },
  { subject: "production.app.order.created", stream: "events" },
];
// Error: Subject overlap detected across different streams
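
The check behind these examples can be sketched as a token-by-token comparison of two subject patterns, using the standard NATS wildcards (`*` matches exactly one token, `>` matches one or more trailing tokens). `subjectsOverlap` and `findCrossStreamOverlaps` are hypothetical names; the library's own overlap guard may work differently.

```typescript
// Two patterns overlap when at least one concrete subject matches both.
function subjectsOverlap(a: string, b: string): boolean {
  const tokensA = a.split(".");
  const tokensB = b.split(".");
  const shared = Math.min(tokensA.length, tokensB.length);
  for (let i = 0; i < shared; i++) {
    const x = tokensA[i];
    const y = tokensB[i];
    // ">" absorbs this token and everything after it, and the other pattern
    // still has at least one token here, so a common subject exists.
    if (x === ">" || y === ">") return true;
    if (x !== "*" && y !== "*" && x !== y) return false;
  }
  // Without ">", both patterns must have the same number of tokens.
  return tokensA.length === tokensB.length;
}

interface SubjectBinding {
  subject: string;
  stream: string;
}

// Returns every pair of bindings whose subjects overlap across different streams.
function findCrossStreamOverlaps(
  bindings: SubjectBinding[],
): Array<[SubjectBinding, SubjectBinding]> {
  const conflicts: Array<[SubjectBinding, SubjectBinding]> = [];
  bindings.forEach((first, i) => {
    for (const second of bindings.slice(i + 1)) {
      if (first.stream !== second.stream && subjectsOverlap(first.subject, second.subject)) {
        conflicts.push([first, second]);
      }
    }
  });
  return conflicts;
}

const sameStream = findCrossStreamOverlaps([
  { subject: "production.app.order.*", stream: "orders" },
  { subject: "production.app.order.created", stream: "orders" },
]);
const acrossStreams = findCrossStreamOverlaps([
  { subject: "production.app.order.*", stream: "orders" },
  { subject: "production.app.order.created", stream: "events" },
]);
console.log(sameStream.length, acrossStreams.length); // 0 1
```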

Design Decisions

Why JetStream?

Decision: Use NATS JetStream instead of core NATS

Rationale:

  1. Persistence: Messages persisted to disk/memory
  2. Delivery Guarantees: At-least-once, exactly-once capable
  3. Stream Replay: Consumers can replay message history
  4. Acknowledgments: Explicit message acknowledgment
  5. Consumer Management: Durable consumers survive restarts

Tradeoffs:

  • ✅ Reliability and guarantees
  • ✅ Message persistence
  • ✅ Consumer state management
  • ❌ Slightly higher latency
  • ❌ More complex setup

Why Pull Consumers?

Decision: Use pull consumers instead of push consumers

Rationale:

  1. Backpressure Control: Application controls fetch rate
  2. Horizontal Scaling: Easy to scale consumers
  3. Fair Distribution: Even load across consumers
  4. Batch Fetching: Efficient bulk processing

Tradeoffs:

  • ✅ Better control and scaling
  • ✅ Simpler load balancing
  • ✅ No callback complexity
  • ❌ Application must poll
  • ❌ More application code

Why Inbox/Outbox?

Decision: Built-in support for Inbox/Outbox patterns

Rationale:

  1. Reliability: Solve dual-write problem
  2. Idempotency: Deduplication gives effectively-once processing on top of at-least-once delivery
  3. Best Practice: Industry-standard patterns
  4. Database Agnostic: Works with any database

Tradeoffs:

  • ✅ Strong guarantees
  • ✅ Production-ready
  • ✅ Well-understood patterns
  • ❌ Additional database tables
  • ❌ Background workers needed

Why Declarative API?

Decision: Declarative subscriber syntax with decorators

Rationale:

  1. Developer Experience: Clean, intuitive syntax
  2. Discoverability: Easy to find subscriptions in codebase
  3. Type Safety: Full TypeScript support
  4. Convention: Follows framework conventions (NestJS, Rails)

Example:

// Declarative - Clear and concise
@topicSubscriber("order.created")
class OrderSubscriber {
  async handle(message: OrderMessage) {
    // Process order
  }
}

// vs Imperative - More boilerplate
// (assumes js is a nats.js JetStream client; the stream and consumer
// names are illustrative)
const consumer = await js.consumers.get("orders", "order-processor");
const messages = await consumer.consume();
for await (const msg of messages) {
  const message = msg.json<OrderMessage>();
  // Process order
  msg.ack();
}

Why Environment Prefixing?

Decision: Automatic environment prefixing of subjects

Format: {env}.{app}.{topic}

Rationale:

  1. Isolation: Prevent cross-environment message leaks
  2. Multi-Tenancy: Multiple apps on same NATS cluster
  3. Safety: Dev/staging can't affect production
  4. Organization: Clear ownership of subjects

Example:

// Configuration
NatsPubsub.configure({
  env: "production",
  appName: "orders",
});

// Publishing to 'order.created' becomes:
// Subject: production.orders.order.created

Why Middleware?

Decision: Middleware chain for cross-cutting concerns

Rationale:

  1. Separation of Concerns: Keep handlers focused
  2. Composability: Mix and match middleware
  3. Reusability: Share logic across subscribers
  4. Testability: Test middleware in isolation

Example:

// Middleware composition
NatsPubsub.use(new LoggingMiddleware());
NatsPubsub.use(new MetricsMiddleware());
NatsPubsub.use(new TracingMiddleware());
NatsPubsub.use(new ValidationMiddleware());

// Clean subscriber
@topicSubscriber("order.created")
class OrderSubscriber {
  // No logging, metrics, or tracing code
  async handle(message: OrderMessage) {
    // Pure business logic
  }
}
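
To show what a middleware chain does when a message arrives, here is a minimal composition sketch. It uses plain functions instead of the middleware classes above and runs synchronously to stay short; `MessageContext`, `Middleware`, and `compose` are hypothetical names, and a real chain would most likely be asynchronous.

```typescript
interface MessageContext {
  subject: string;
  payload: unknown;
  log: string[]; // collects what each step did, for illustration only
}

type Next = () => void;
type Middleware = (ctx: MessageContext, next: Next) => void;

// Wraps the handler so that each middleware runs around the next one,
// in registration order, with the handler at the centre.
function compose(
  middlewares: Middleware[],
  handler: (ctx: MessageContext) => void,
): (ctx: MessageContext) => void {
  return (ctx) => {
    const dispatch = (index: number): void => {
      const middleware = middlewares[index];
      if (middleware === undefined) {
        handler(ctx); // end of the chain reached
        return;
      }
      middleware(ctx, () => dispatch(index + 1));
    };
    dispatch(0);
  };
}

const loggingMiddleware: Middleware = (ctx, next) => {
  ctx.log.push(`logging:start ${ctx.subject}`);
  next();
  ctx.log.push("logging:end");
};

const metricsMiddleware: Middleware = (ctx, next) => {
  ctx.log.push("metrics:start");
  try {
    next();
  } finally {
    ctx.log.push("metrics:end"); // recorded even if the handler throws
  }
};

const run = compose([loggingMiddleware, metricsMiddleware], (ctx) => {
  ctx.log.push("handler"); // pure business logic
});

const context: MessageContext = {
  subject: "production.orders.order.created",
  payload: { orderId: "o-1" },
  log: [],
};
run(context);
console.log(context.log.join(" > "));
```

Each middleware can be tested on its own by calling it with a stub `next`, which is the isolation benefit listed in the rationale.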

Scaling Architecture

Horizontal Scaling

Scaling Publishers:

  • Add more publisher instances
  • Each instance publishes independently
  • NATS handles load distribution

Scaling Subscribers:

  • Add more subscriber instances
  • Instances that pull from the same durable consumer share its messages
  • Each instance processes subset of messages

Vertical Scaling

// Increase concurrency per instance
NatsPubsub.configure({
  concurrency: 50, // More concurrent message processing
  perMessageConcurrency: 10, // More concurrent operations per message
  batchSize: 100, // Larger batches
});

// Increase database pool
const pool = new Pool({
  max: 50, // More database connections
  min: 10,
});

Geographic Distribution

Features:

  • NATS super-cluster for global distribution
  • Local processing reduces latency
  • Data sovereignty compliance
  • Disaster recovery

Deployment Architectures

Single Application

Use Case:

  • Small applications
  • Development/testing
  • Single service architecture

Microservices

Use Case:

  • Distributed systems
  • Service isolation
  • Independent scaling

Event-Driven Architecture

Use Case:

  • CQRS pattern
  • Event sourcing
  • Real-time analytics
  • Audit logging

Serverless Architecture

Use Case:

  • Serverless applications
  • Cost optimization
  • Auto-scaling workloads
