NatsPubsub Internals
This comprehensive guide provides a deep dive into the internal workings of NatsPubsub, including publishing mechanisms, subscribing mechanics, message formats, acknowledgment protocols, and performance optimizations.
Table of Contents
- Overview
- Publishing Internals
- Subscribing Internals
- Message Envelope Structure
- Acknowledgment Protocol
- Retry Mechanism
- Connection Pooling Internals
- Memory Management
- Performance Optimizations
- Serialization and Deserialization
- Error Handling Internals
- Middleware Pipeline
Overview
NatsPubsub is built on several layers of abstraction, each optimized for performance, reliability, and developer experience.
Architecture Layers
Publishing Internals
Publishing Pipeline
Subject Building Algorithm
Input:
- Environment:
production - App name:
orders - Topic:
order.created
Algorithm:
function buildSubject(env: string, app: string, topic: string): string {
// 1. Normalize inputs
const normalizedEnv = env.toLowerCase().trim();
const normalizedApp = app
.toLowerCase()
.trim()
.replace(/[^a-z0-9-]/g, "-");
const normalizedTopic = topic.toLowerCase().trim();
// 2. Validate components
if (!normalizedEnv || !normalizedApp || !normalizedTopic) {
throw new Error("Invalid subject components");
}
// 3. Handle wildcards
// Single level wildcard: * -> *
// Multi level wildcard: > -> >
const topicParts = normalizedTopic.split(".");
const validatedParts = topicParts.map((part) => {
if (part === "*" || part === ">") return part;
if (!/^[a-z0-9-]+$/.test(part)) {
throw new Error(`Invalid topic part: ${part}`);
}
return part;
});
// 4. Build subject
return `${normalizedEnv}.${normalizedApp}.${validatedParts.join(".")}`;
}
// Example:
buildSubject("production", "orders", "order.created");
// Returns: "production.orders.order.created"
Envelope Construction
Envelope Structure:
interface MessageEnvelope {
event_id: string; // Unique identifier
occurred_at: string; // ISO 8601 timestamp
message: Record<string, unknown>; // Actual payload
metadata?: {
trace_id?: string; // Distributed tracing
correlation_id?: string; // Request correlation
causation_id?: string; // Causal relationship
user_id?: string; // User context
[key: string]: unknown; // Custom metadata
};
}
Building Algorithm:
class EnvelopeBuilder {
constructor(private appName: string) {}
buildEnvelope(
message: Record<string, unknown>,
options?: {
eventId?: string;
occurredAt?: Date;
traceId?: string;
metadata?: Record<string, unknown>;
},
): MessageEnvelope {
// 1. Generate or use provided event ID
const eventId = options?.eventId || this.generateEventId();
// 2. Set timestamp
const occurredAt = options?.occurredAt || new Date();
// 3. Extract trace context (from global context, headers, etc.)
const traceId = options?.traceId || this.extractTraceId();
// 4. Build envelope
const envelope: MessageEnvelope = {
event_id: eventId,
occurred_at: occurredAt.toISOString(),
message,
};
// 5. Add metadata if present
if (options?.metadata || traceId) {
envelope.metadata = {
...options?.metadata,
trace_id: traceId,
app_name: this.appName,
};
}
return envelope;
}
private generateEventId(): string {
// Use UUIDv4 or custom ID generation
return `${this.appName}-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
}
private extractTraceId(): string | undefined {
// Extract from async local storage, continuation-local-storage, etc.
return undefined;
}
}
Message Serialization
class MessageSerializer {
serialize(envelope: MessageEnvelope): Uint8Array {
// 1. Convert to JSON
const json = JSON.stringify(envelope);
// 2. Compress if enabled
if (this.compressionEnabled && json.length > this.compressionThreshold) {
return this.compress(json);
}
// 3. Convert to bytes
return new TextEncoder().encode(json);
}
deserialize(data: Uint8Array): MessageEnvelope {
// 1. Check if compressed
if (this.isCompressed(data)) {
data = this.decompress(data);
}
// 2. Convert to string
const json = new TextDecoder().decode(data);
// 3. Parse JSON
return JSON.parse(json) as MessageEnvelope;
}
private compress(data: string): Uint8Array {
// Use gzip, zlib, or other compression
return gzip(Buffer.from(data));
}
private decompress(data: Uint8Array): Uint8Array {
// Decompress
return gunzip(data);
}
private isCompressed(data: Uint8Array): boolean {
// Check magic bytes for gzip
return data[0] === 0x1f && data[1] === 0x8b;
}
}
JetStream Publishing
async publishToJetStream(
subject: string,
data: Uint8Array,
options: {
msgID?: string;
headers?: Record<string, string>;
timeout?: number;
}
): Promise<PubAck> {
const js = this.connection.jetstream();
// 1. Build headers
const headers = this.buildHeaders(options.headers, options.msgID);
// 2. Publish with timeout
const pubAck = await Promise.race([
js.publish(subject, data, {
msgID: options.msgID,
headers,
}),
this.timeout(options.timeout || 5000),
]);
// 3. Verify acknowledgment
if (!pubAck.stream || !pubAck.seq) {
throw new Error('Invalid publish acknowledgment');
}
return pubAck;
}
private buildHeaders(
customHeaders?: Record<string, string>,
msgID?: string
): Headers {
const headers = headers();
// Add message ID for deduplication
if (msgID) {
headers.set('Nats-Msg-Id', msgID);
}
// Add custom headers
if (customHeaders) {
Object.entries(customHeaders).forEach(([key, value]) => {
headers.set(key, value);
});
}
// Add metadata
headers.set('Nats-Published-At', new Date().toISOString());
return headers;
}
Subscribing Internals
Consumer Creation
Message Fetching Loop
class Consumer {
private processing = new Set<string>();
private concurrency: number;
private batchSize: number;
async startConsuming(subscriber: Subscriber): Promise<void> {
const consumer = await this.createPullConsumer(subscriber);
// Main fetch loop
while (!this.stopping) {
try {
// 1. Check available capacity
const availableSlots = this.concurrency - this.processing.size;
if (availableSlots <= 0) {
await this.waitForCapacity();
continue;
}
// 2. Fetch batch of messages
const batch = Math.min(availableSlots, this.batchSize);
const messages = await consumer.fetch({
max_messages: batch,
expires: 30000, // 30 seconds
});
// 3. Process messages concurrently
for await (const msg of messages) {
// Don't await - process in background
this.processMessage(msg, subscriber);
}
} catch (error) {
this.handleFetchError(error);
await this.backoff();
}
}
}
private async processMessage(
msg: JsMsg,
subscriber: Subscriber,
): Promise<void> {
const msgId = this.getMessageId(msg);
this.processing.add(msgId);
try {
// 1. Deserialize message
const envelope = this.deserialize(msg.data);
// 2. Build metadata
const metadata = this.buildMetadata(msg, envelope);
// 3. Execute middleware chain
await this.middlewareChain.execute(
envelope.message,
metadata,
async () => {
// 4. Call subscriber handler
await subscriber.handle(envelope.message, metadata);
},
);
// 5. ACK message
msg.ack();
} catch (error) {
// 6. Handle error (retry or DLQ)
await this.handleProcessingError(msg, error);
} finally {
this.processing.delete(msgId);
}
}
private buildMetadata(msg: JsMsg, envelope: MessageEnvelope): EventMetadata {
const info = msg.info;
return {
subject: msg.subject,
eventId: envelope.event_id,
occurredAt: new Date(envelope.occurred_at),
receivedAt: new Date(),
stream: info.stream,
streamSeq: info.streamSequence,
consumerSeq: info.consumerSequence,
deliveries: info.delivered,
pending: info.pending,
headers: this.extractHeaders(msg),
...envelope.metadata,
};
}
}
Message Processing Pipeline
Concurrent Processing
class ConcurrentProcessor {
private slots: Semaphore;
private processing: Map<string, Promise<void>>;
constructor(concurrency: number) {
this.slots = new Semaphore(concurrency);
this.processing = new Map();
}
async process(
messageId: string,
handler: () => Promise<void>,
): Promise<void> {
// 1. Acquire slot
await this.slots.acquire();
// 2. Create processing promise
const promise = (async () => {
try {
await handler();
} finally {
// 3. Release slot
this.slots.release();
this.processing.delete(messageId);
}
})();
// 4. Track processing
this.processing.set(messageId, promise);
return promise;
}
async waitForCompletion(): Promise<void> {
await Promise.all(Array.from(this.processing.values()));
}
getProcessingCount(): number {
return this.processing.size;
}
}
class Semaphore {
private available: number;
private waiting: Array<() => void> = [];
constructor(private capacity: number) {
this.available = capacity;
}
async acquire(): Promise<void> {
if (this.available > 0) {
this.available--;
return;
}
// Wait for availability
return new Promise((resolve) => {
this.waiting.push(resolve);
});
}
release(): void {
const next = this.waiting.shift();
if (next) {
next();
} else {
this.available++;
}
}
}
Message Envelope Structure
Envelope Format
interface MessageEnvelope {
// Core fields
event_id: string; // Unique identifier for idempotency
occurred_at: string; // ISO 8601 timestamp
message: Record<string, unknown>; // Actual payload
// Optional metadata
metadata?: {
// Tracing
trace_id?: string; // Distributed tracing ID
span_id?: string; // Span ID for tracing
parent_span_id?: string; // Parent span ID
// Correlation
correlation_id?: string; // Request correlation
causation_id?: string; // Event causation
// Context
user_id?: string; // User context
tenant_id?: string; // Multi-tenancy
session_id?: string; // Session context
// Application
app_name?: string; // Source application
app_version?: string; // Application version
environment?: string; // Environment (prod, dev, etc.)
// Custom
[key: string]: unknown;
};
}
Example Envelopes
Simple Message:
{
"event_id": "order-123-1234567890",
"occurred_at": "2025-01-17T10:30:00.000Z",
"message": {
"order_id": "ORD-123",
"customer_id": "CUST-456",
"total": 99.99,
"status": "pending"
}
}
Rich Message with Metadata:
{
"event_id": "payment-789-1234567891",
"occurred_at": "2025-01-17T10:31:00.000Z",
"message": {
"payment_id": "PAY-789",
"order_id": "ORD-123",
"amount": 99.99,
"currency": "USD",
"status": "completed"
},
"metadata": {
"trace_id": "abc123xyz789",
"span_id": "span-001",
"correlation_id": "req-123",
"causation_id": "order-123-1234567890",
"user_id": "user-456",
"tenant_id": "tenant-001",
"app_name": "payment-service",
"app_version": "1.2.3",
"environment": "production",
"payment_method": "credit_card",
"payment_provider": "stripe"
}
}
Envelope Size Optimization
class EnvelopeOptimizer {
// Estimate envelope size
estimateSize(envelope: MessageEnvelope): number {
return Buffer.from(JSON.stringify(envelope)).length;
}
// Optimize envelope by removing optional fields
optimize(envelope: MessageEnvelope, maxSize: number): MessageEnvelope {
let size = this.estimateSize(envelope);
if (size <= maxSize) {
return envelope;
}
// Remove optional metadata fields
const optimized = { ...envelope };
if (optimized.metadata) {
// Remove in priority order (least important first)
const removable = [
"app_version",
"session_id",
"parent_span_id",
"span_id",
];
for (const field of removable) {
if (size <= maxSize) break;
delete optimized.metadata[field];
size = this.estimateSize(optimized);
}
}
if (size > maxSize) {
throw new Error(`Message too large: ${size} bytes (max: ${maxSize})`);
}
return optimized;
}
}
Acknowledgment Protocol
ACK Types
ACK Implementation
class AckHandler {
async ack(msg: JsMsg): Promise<void> {
try {
// Acknowledge successful processing
await msg.ack();
this.metrics.incrementAcks();
} catch (error) {
this.logger.error("ACK failed", { error });
throw error;
}
}
async nak(msg: JsMsg, delay?: number): Promise<void> {
try {
if (delay) {
// NAK with delay (exponential backoff)
await msg.nakWithDelay(delay * 1000000000); // Convert to nanoseconds
} else {
// NAK immediately (instant retry)
await msg.nak();
}
this.metrics.incrementNaks();
} catch (error) {
this.logger.error("NAK failed", { error });
throw error;
}
}
async term(msg: JsMsg): Promise<void> {
try {
// Terminate processing (permanent failure)
await msg.term();
this.metrics.incrementTerms();
} catch (error) {
this.logger.error("TERM failed", { error });
throw error;
}
}
async inProgress(msg: JsMsg): Promise<void> {
try {
// Extend ACK wait time
await msg.working();
this.metrics.incrementInProgress();
} catch (error) {
this.logger.error("In-progress update failed", { error });
throw error;
}
}
}
ACK Wait Extension
For long-running tasks:
class LongRunningProcessor {
async processWithProgress(
msg: JsMsg,
handler: () => Promise<void>,
): Promise<void> {
const ackWait = 30000; // 30 seconds
const progressInterval = 20000; // 20 seconds
// Start progress heartbeat
const heartbeat = setInterval(async () => {
try {
await msg.working();
this.logger.debug("Extended ACK wait");
} catch (error) {
this.logger.error("Failed to extend ACK wait", { error });
}
}, progressInterval);
try {
// Process message
await handler();
// ACK on success
await msg.ack();
} catch (error) {
// NAK on failure
await msg.nak();
throw error;
} finally {
clearInterval(heartbeat);
}
}
}
Retry Mechanism
Retry Strategy
interface RetryConfig {
maxAttempts: number;
backoff: number[]; // Delays in milliseconds
backoffMultiplier?: number; // For exponential backoff
maxBackoff?: number; // Maximum delay
}
class RetryStrategy {
constructor(private config: RetryConfig) {}
shouldRetry(attempt: number): boolean {
return attempt < this.config.maxAttempts;
}
getDelay(attempt: number): number {
// Use configured backoff if available
if (attempt < this.config.backoff.length) {
return this.config.backoff[attempt];
}
// Calculate exponential backoff
const base = this.config.backoff[this.config.backoff.length - 1] || 1000;
const multiplier = this.config.backoffMultiplier || 2;
const delay =
base * Math.pow(multiplier, attempt - this.config.backoff.length);
// Apply max backoff cap
return Math.min(delay, this.config.maxBackoff || 60000);
}
}
// Default configuration
const defaultRetry: RetryConfig = {
maxAttempts: 5,
backoff: [1000, 2000, 5000, 10000, 30000], // 1s, 2s, 5s, 10s, 30s
};
Retry Implementation
class RetryHandler {
private strategy: RetryStrategy;
async handleWithRetry(
msg: JsMsg,
handler: () => Promise<void>,
): Promise<void> {
const info = msg.info;
const attempt = info.delivered;
try {
await handler();
await msg.ack();
} catch (error) {
if (this.strategy.shouldRetry(attempt)) {
// Calculate delay
const delay = this.strategy.getDelay(attempt);
this.logger.warn("Processing failed, will retry", {
attempt,
delay,
error: error.message,
});
// NAK with delay
await msg.nakWithDelay(delay * 1000000); // Convert to nanoseconds
} else {
this.logger.error("Max retries exceeded", {
attempt,
error: error.message,
});
// Send to DLQ
await this.sendToDLQ(msg, error);
// ACK original message
await msg.ack();
}
}
}
}
Retry Visualization
Connection Pooling Internals
Pool Architecture
class ConnectionPool {
private connections: NatsConnection[] = [];
private available: NatsConnection[] = [];
private inUse: Set<NatsConnection> = new Set();
private waiting: Array<(conn: NatsConnection) => void> = [];
private readonly minSize: number;
private readonly maxSize: number;
private size: number = 0;
constructor(config: PoolConfig) {
this.minSize = config.minSize || 2;
this.maxSize = config.maxSize || 10;
}
async initialize(): Promise<void> {
// Create minimum connections
for (let i = 0; i < this.minSize; i++) {
const conn = await this.createConnection();
this.connections.push(conn);
this.available.push(conn);
this.size++;
}
}
async acquire(): Promise<NatsConnection> {
// Return available connection
if (this.available.length > 0) {
const conn = this.available.pop()!;
this.inUse.add(conn);
return conn;
}
// Create new connection if under max
if (this.size < this.maxSize) {
const conn = await this.createConnection();
this.connections.push(conn);
this.inUse.add(conn);
this.size++;
return conn;
}
// Wait for available connection
return new Promise((resolve) => {
this.waiting.push(resolve);
});
}
release(conn: NatsConnection): void {
this.inUse.delete(conn);
// Give to waiting requester
const next = this.waiting.shift();
if (next) {
this.inUse.add(conn);
next(conn);
return;
}
// Add back to available
this.available.push(conn);
}
async destroy(): Promise<void> {
// Close all connections
await Promise.all(this.connections.map((conn) => conn.close()));
this.connections = [];
this.available = [];
this.inUse.clear();
this.size = 0;
}
}
Connection Health Monitoring
class HealthMonitor {
private healthChecks: Map<NatsConnection, HealthStatus> = new Map();
private checkInterval: NodeJS.Timeout;
startMonitoring(pool: ConnectionPool): void {
this.checkInterval = setInterval(async () => {
await this.checkAllConnections(pool);
}, 10000); // Check every 10 seconds
}
private async checkAllConnections(pool: ConnectionPool): Promise<void> {
for (const conn of pool.getConnections()) {
const healthy = await this.checkConnection(conn);
if (!healthy) {
await this.handleUnhealthyConnection(conn, pool);
}
}
}
private async checkConnection(conn: NatsConnection): Promise<boolean> {
try {
// Check connection state
if (conn.isClosed()) {
return false;
}
// Ping NATS server
await conn.flush();
return true;
} catch (error) {
this.logger.error("Health check failed", { error });
return false;
}
}
private async handleUnhealthyConnection(
conn: NatsConnection,
pool: ConnectionPool,
): Promise<void> {
this.logger.warn("Unhealthy connection detected, reconnecting");
// Remove from pool
pool.removeConnection(conn);
try {
// Close old connection
await conn.close();
} catch (error) {
// Ignore close errors
}
// Create replacement
try {
const newConn = await pool.createConnection();
pool.addConnection(newConn);
} catch (error) {
this.logger.error("Failed to create replacement connection", { error });
}
}
stopMonitoring(): void {
clearInterval(this.checkInterval);
}
}
Memory Management
Memory Pool for Messages
class MessagePool {
private pool: Array<Uint8Array> = [];
private readonly maxSize: number;
private readonly bufferSize: number;
constructor(maxSize: number = 100, bufferSize: number = 64 * 1024) {
this.maxSize = maxSize;
this.bufferSize = bufferSize;
}
acquire(): Uint8Array {
// Reuse from pool
const buffer = this.pool.pop();
if (buffer) {
return buffer;
}
// Allocate new buffer
return new Uint8Array(this.bufferSize);
}
release(buffer: Uint8Array): void {
if (this.pool.length < this.maxSize) {
// Clear buffer
buffer.fill(0);
// Return to pool
this.pool.push(buffer);
}
// Else let GC handle it
}
clear(): void {
this.pool = [];
}
}
Garbage Collection Optimization
class GCOptimizer {
private readonly heapThreshold = 0.85; // 85% of max heap
monitorMemory(): void {
setInterval(() => {
const usage = process.memoryUsage();
const heapUsed = usage.heapUsed;
const heapTotal = usage.heapTotal;
const ratio = heapUsed / heapTotal;
if (ratio > this.heapThreshold) {
this.triggerGC();
}
}, 30000); // Check every 30 seconds
}
private triggerGC(): void {
if (global.gc) {
global.gc();
this.logger.info("Triggered garbage collection");
}
}
}
Memory-Efficient Processing
class StreamProcessor {
async processLargeDataset(query: string): Promise<void> {
// Use streaming instead of loading all at once
const stream = await this.database.stream(query);
for await (const row of stream) {
// Process one row at a time
await this.processRow(row);
// Allow GC between rows
if (this.shouldYield()) {
await this.yield();
}
}
}
private shouldYield(): boolean {
// Yield every N iterations
return this.iteration++ % 100 === 0;
}
private async yield(): Promise<void> {
// Give event loop a chance to run
await new Promise((resolve) => setImmediate(resolve));
}
}
Performance Optimizations
Message Batching
class BatchOptimizer {
private batch: Array<Message> = [];
private timer?: NodeJS.Timeout;
private readonly maxBatchSize: number;
private readonly maxWaitMs: number;
constructor(maxBatchSize = 100, maxWaitMs = 1000) {
this.maxBatchSize = maxBatchSize;
this.maxWaitMs = maxWaitMs;
}
add(message: Message): void {
this.batch.push(message);
if (this.batch.length >= this.maxBatchSize) {
this.flush();
} else if (!this.timer) {
this.timer = setTimeout(() => this.flush(), this.maxWaitMs);
}
}
private async flush(): Promise<void> {
if (this.batch.length === 0) return;
clearTimeout(this.timer);
this.timer = undefined;
const toProcess = this.batch.splice(0);
try {
await this.processBatch(toProcess);
} catch (error) {
this.logger.error("Batch processing failed", { error });
}
}
private async processBatch(messages: Message[]): Promise<void> {
// Process all messages in parallel
await Promise.all(messages.map((msg) => this.processMessage(msg)));
}
}
Caching Layer
class CachingLayer {
private cache: Map<string, CacheEntry> = new Map();
private readonly maxSize: number;
private readonly ttl: number;
constructor(maxSize = 1000, ttl = 60000) {
this.maxSize = maxSize;
this.ttl = ttl;
this.startCleanup();
}
get(key: string): any | null {
const entry = this.cache.get(key);
if (!entry) {
return null;
}
// Check expiry
if (Date.now() - entry.timestamp > this.ttl) {
this.cache.delete(key);
return null;
}
// Update access time for LRU
entry.lastAccess = Date.now();
return entry.value;
}
set(key: string, value: any): void {
// Evict if at capacity
if (this.cache.size >= this.maxSize) {
this.evictOldest();
}
this.cache.set(key, {
value,
timestamp: Date.now(),
lastAccess: Date.now(),
});
}
private evictOldest(): void {
let oldest: string | null = null;
let oldestTime = Infinity;
for (const [key, entry] of this.cache.entries()) {
if (entry.lastAccess < oldestTime) {
oldest = key;
oldestTime = entry.lastAccess;
}
}
if (oldest) {
this.cache.delete(oldest);
}
}
private startCleanup(): void {
setInterval(() => {
const now = Date.now();
for (const [key, entry] of this.cache.entries()) {
if (now - entry.timestamp > this.ttl) {
this.cache.delete(key);
}
}
}, this.ttl);
}
}
Serialization and Deserialization
JSON Serialization
class JSONSerializer {
serialize(obj: any): Uint8Array {
// Fast JSON stringify
const json = JSON.stringify(obj);
return new TextEncoder().encode(json);
}
deserialize(data: Uint8Array): any {
const json = new TextDecoder().decode(data);
return JSON.parse(json);
}
}
Protocol Buffers (Optional)
// For high-performance scenarios
class ProtobufSerializer {
serialize(message: any): Uint8Array {
// Use protobuf for efficient serialization
return MessageProto.encode(message).finish();
}
deserialize(data: Uint8Array): any {
return MessageProto.decode(data);
}
}
Error Handling Internals
Error Classification
enum ErrorCategory {
TRANSIENT = "transient", // Temporary, retry
PERMANENT = "permanent", // Permanent, don't retry
VALIDATION = "validation", // Invalid input
INFRASTRUCTURE = "infrastructure", // System failure
}
class ErrorClassifier {
classify(error: Error): ErrorCategory {
// Network errors - transient
if (this.isNetworkError(error)) {
return ErrorCategory.TRANSIENT;
}
// Validation errors - permanent
if (this.isValidationError(error)) {
return ErrorCategory.PERMANENT;
}
// Database errors - check if transient
if (this.isDatabaseError(error)) {
return this.classifyDatabaseError(error);
}
// Default to permanent
return ErrorCategory.PERMANENT;
}
private isNetworkError(error: Error): boolean {
return (
error.message.includes("ECONNREFUSED") ||
error.message.includes("ETIMEDOUT") ||
error.message.includes("ENOTFOUND")
);
}
}
Middleware Pipeline
Middleware Execution
class MiddlewareChain {
private middlewares: Middleware[] = [];
use(middleware: Middleware): void {
this.middlewares.push(middleware);
}
async execute(
message: any,
metadata: any,
handler: () => Promise<void>,
): Promise<void> {
let index = 0;
const next = async (): Promise<void> => {
if (index < this.middlewares.length) {
const middleware = this.middlewares[index++];
await middleware.call(message, metadata, next);
} else {
await handler();
}
};
await next();
}
}
Related Documentation
- Architecture Guide - System architecture
- Custom Repositories - Custom storage implementation
- Security Guide - Security best practices
- Performance Guide - Performance optimization
Navigation: