
Observability

This guide covers observability in NatsPubsub applications: logs, metrics, and distributed tracing, with OpenTelemetry integration for production-grade monitoring.

Overview

Observability is the ability to understand the internal state of your system by examining its external outputs. For distributed messaging systems like NatsPubsub, observability is critical for:

  • Debugging: Tracing message flow across services
  • Performance: Identifying bottlenecks and latency issues
  • Reliability: Detecting failures and anomalies
  • Business Insights: Understanding usage patterns and trends

The Three Pillars



Logs

What: Text records of discrete events that occurred in your system

When to Use:

  • Debugging specific errors
  • Audit trails
  • Development troubleshooting

Example Log Entry:

{
  "timestamp": "2025-01-17T10:30:45.123Z",
  "level": "info",
  "message": "Message published successfully",
  "service": "order-service",
  "trace_id": "abc-123",
  "span_id": "def-456",
  "topic": "order.created",
  "event_id": "evt-789",
  "duration_ms": 12
}

Metrics

What: Numerical measurements of system behavior over time

When to Use:

  • Performance monitoring
  • Capacity planning
  • Alerting on thresholds

Example Metrics:

  • messages_published_total - Counter
  • message_processing_duration_seconds - Histogram
  • consumer_lag_messages - Gauge
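
The three instrument kinds differ in how values accumulate: a counter only goes up, a gauge holds the latest value, and a histogram sorts observations into buckets. The sketch below models those semantics in memory. The class names are illustrative and are not the OpenTelemetry API, and the bucket bounds are arbitrary.

```typescript
// In-memory models of the three instrument kinds (illustrative, not the OTel API).
class Counter {
  private total = 0;

  // Counters are monotonic: negative deltas are rejected.
  add(delta: number): void {
    if (delta < 0) throw new RangeError("counters only increase");
    this.total += delta;
  }

  get value(): number {
    return this.total;
  }
}

class Gauge {
  private current = 0;

  // A gauge keeps only the most recent observation.
  set(value: number): void {
    this.current = value;
  }

  get value(): number {
    return this.current;
  }
}

class Histogram {
  private readonly bounds: number[];
  private readonly counts: number[];

  // `bounds` are ascending upper bucket limits; one extra overflow bucket is added.
  constructor(bounds: number[]) {
    this.bounds = bounds;
    this.counts = new Array<number>(bounds.length + 1).fill(0);
  }

  record(value: number): void {
    const found = this.bounds.findIndex((bound) => value <= bound);
    const index = found === -1 ? this.bounds.length : found;
    this.counts[index] = (this.counts[index] ?? 0) + 1;
  }

  get buckets(): readonly number[] {
    return this.counts;
  }
}
```

With bounds of `[0.1, 0.5, 1]` seconds, a 0.3 s observation lands in the second bucket and a 2 s observation lands in the overflow bucket.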

Traces

What: Records of request paths through your distributed system

When to Use:

  • Understanding request flow
  • Finding bottlenecks
  • Analyzing latency

Example Trace:

Root Span: HTTP POST /orders
├─ Span: Publish order.created (5ms)
├─ Span: Database Insert (15ms)
└─ Child Trace: order.created consumer
   ├─ Span: Reserve inventory (50ms)
   └─ Span: Send notification (25ms)

OpenTelemetry Integration

OpenTelemetry is the industry-standard observability framework that provides vendor-neutral APIs and SDKs.

JavaScript Setup

Installation

npm install @opentelemetry/api \
@opentelemetry/sdk-node \
@opentelemetry/sdk-metrics \
@opentelemetry/sdk-trace-base \
@opentelemetry/auto-instrumentations-node \
@opentelemetry/exporter-trace-otlp-http \
@opentelemetry/exporter-metrics-otlp-http

Basic Configuration

// instrumentation.ts
import { NodeSDK } from "@opentelemetry/sdk-node";
import { getNodeAutoInstrumentations } from "@opentelemetry/auto-instrumentations-node";
import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-http";
import { OTLPMetricExporter } from "@opentelemetry/exporter-metrics-otlp-http";
import { PeriodicExportingMetricReader } from "@opentelemetry/sdk-metrics";

// OTEL_EXPORTER_OTLP_ENDPOINT is a base URL; each signal has its own path.
const otlpEndpoint = (
process.env.OTEL_EXPORTER_OTLP_ENDPOINT || "http://localhost:4318"
).replace(/\/+$/, "");

const sdk = new NodeSDK({
serviceName: "order-service",
traceExporter: new OTLPTraceExporter({
url: `${otlpEndpoint}/v1/traces`,
}),
metricReader: new PeriodicExportingMetricReader({
exporter: new OTLPMetricExporter({
url: `${otlpEndpoint}/v1/metrics`,
}),
exportIntervalMillis: 60000, // Export every 60 seconds
}),
instrumentations: [
getNodeAutoInstrumentations({
"@opentelemetry/instrumentation-fs": {
enabled: false, // Disable noisy file system instrumentation
},
}),
],
});

sdk.start();

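// Graceful shutdown: flush pending telemetry, then exit even if the flush fails
process.on("SIGTERM", () => {
sdk
.shutdown()
.catch((error) => console.error("OpenTelemetry shutdown failed:", error))
.finally(() => process.exit(0));
});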

export default sdk;

NatsPubsub Integration

// src/observability/nats-instrumentation.ts
import { trace, context, SpanStatusCode } from "@opentelemetry/api";
import NatsPubsub from "nats-pubsub";

const tracer = trace.getTracer("nats-pubsub");

// Instrument publishing
export function instrumentPublish() {
const originalPublish = NatsPubsub.publish.bind(NatsPubsub);

NatsPubsub.publish = async function (
topic: string,
message: any,
options?: any,
) {
const span = tracer.startSpan("nats.publish", {
attributes: {
"messaging.system": "nats",
"messaging.destination": topic,
"messaging.operation": "publish",
},
});

try {
// Pass the new span's IDs in the message options so consumers can link to it
const { traceId, spanId, traceFlags } = span.spanContext();
const result = await originalPublish(topic, message, {
...options,
trace_id: traceId,
span_id: spanId,
trace_flags: traceFlags,
});

span.setStatus({ code: SpanStatusCode.OK });
return result;
} catch (error) {
const err = error as Error; // catch variables are `unknown` under strict TypeScript
span.recordException(err);
span.setStatus({
code: SpanStatusCode.ERROR,
message: err.message,
});
throw error;
} finally {
span.end();
}
};
}

// Middleware for subscribers
import { Middleware, TopicMetadata } from "nats-pubsub";

export class TracingMiddleware implements Middleware {
async call(
message: any,
metadata: TopicMetadata,
next: () => Promise<void>,
): Promise<void> {
// Link to the publisher's span when trace context arrived with the message.
// A link wraps a full SpanContext, so it is built before the span starts.
const links =
metadata.trace_id && metadata.span_id
? [
{
context: {
traceId: metadata.trace_id,
spanId: metadata.span_id,
traceFlags: metadata.trace_flags ?? 1, // 1 = sampled
isRemote: true,
},
},
]
: [];

const span = tracer.startSpan("nats.consume", {
attributes: {
"messaging.system": "nats",
"messaging.destination": metadata.topic,
"messaging.operation": "consume",
"messaging.message_id": metadata.event_id,
},
links,
});

try {
await context.with(trace.setSpan(context.active(), span), async () => {
await next();
});

span.setStatus({ code: SpanStatusCode.OK });
} catch (error) {
const err = error as Error; // catch variables are `unknown` under strict TypeScript
span.recordException(err);
span.setStatus({
code: SpanStatusCode.ERROR,
message: err.message,
});
throw error;
} finally {
span.end();
}
}
}

// Initialize
export function initializeObservability() {
instrumentPublish();
NatsPubsub.use(new TracingMiddleware());
}

Ruby Setup

Installation

# Gemfile
gem 'opentelemetry-sdk'
gem 'opentelemetry-exporter-otlp'
gem 'opentelemetry-instrumentation-all'

Configuration

# config/initializers/opentelemetry.rb
require 'opentelemetry/sdk'
require 'opentelemetry/exporter/otlp'
require 'opentelemetry/instrumentation/all'

OpenTelemetry::SDK.configure do |c|
c.service_name = 'order-service'
c.service_version = '1.0.0'

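# Configure OTLP exporter.
# OTEL_EXPORTER_OTLP_ENDPOINT is a base URL, so the traces path is appended here.
otlp_base = ENV.fetch('OTEL_EXPORTER_OTLP_ENDPOINT', 'http://localhost:4318').chomp('/')
c.add_span_processor(
OpenTelemetry::SDK::Trace::Export::BatchSpanProcessor.new(
OpenTelemetry::Exporter::OTLP::Exporter.new(
endpoint: "#{otlp_base}/v1/traces"
)
)
)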

# Auto-instrument common libraries
c.use_all
end

NatsPubsub Integration

# lib/observability/nats_instrumentation.rb
module Observability
module NatsInstrumentation
def self.tracer
@tracer ||= OpenTelemetry.tracer_provider.tracer('nats-pubsub')
end

# Middleware for tracing
class TracingMiddleware
def call(message, context)
# Link to the publisher's span. IDs travel as hex strings, but
# SpanContext expects binary IDs, so pack them first.
links = []
if context.trace_id && context.span_id
links << OpenTelemetry::Trace::Link.new(
OpenTelemetry::Trace::SpanContext.new(
trace_id: [context.trace_id].pack('H*'),
span_id: [context.span_id].pack('H*'),
remote: true
)
)
end

span = Observability::NatsInstrumentation.tracer.start_span(
'nats.consume',
attributes: {
'messaging.system' => 'nats',
'messaging.destination' => context.topic,
'messaging.operation' => 'consume',
'messaging.message_id' => context.event_id
},
links: links
)

begin
OpenTelemetry::Trace.with_span(span) do
yield
end
span.status = OpenTelemetry::Trace::Status.ok
rescue StandardError => e
span.record_exception(e)
span.status = OpenTelemetry::Trace::Status.error(e.message)
raise
ensure
span.finish
end
end
end
end
end

# Register middleware
NatsPubsub.configure do |config|
config.middleware << Observability::NatsInstrumentation::TracingMiddleware.new
end

Distributed Tracing

Trace Context Propagation

Trace context must be propagated across service boundaries to maintain visibility into end-to-end request flows.
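
One widely used wire format for this is the W3C Trace Context `traceparent` header, which packs the version, trace ID, span ID, and flags into a single string. The sketch below encodes and decodes that format so the context can travel as one field instead of several. The function and type names are hypothetical, and whether your message options or headers can carry such a field is an assumption to verify against your NatsPubsub version.

```typescript
// Sketch of W3C "traceparent" encoding and decoding (version 00 only).
// PropagatedContext, formatTraceparent and parseTraceparent are illustrative names.
interface PropagatedContext {
  traceId: string; // 32 lowercase hex characters
  spanId: string; // 16 lowercase hex characters
  traceFlags: number; // bit 0 set means "sampled"
}

const TRACEPARENT_PATTERN = /^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/;

function formatTraceparent(ctx: PropagatedContext): string {
  const flags = (ctx.traceFlags & 0xff).toString(16).padStart(2, "0");
  return `00-${ctx.traceId}-${ctx.spanId}-${flags}`;
}

function parseTraceparent(header: string): PropagatedContext | undefined {
  const match = TRACEPARENT_PATTERN.exec(header.trim());
  if (!match) return undefined;

  const traceId = match[1] ?? "";
  const spanId = match[2] ?? "";
  const flags = match[3] ?? "00";

  // All-zero IDs are defined as invalid by the specification.
  if (/^0+$/.test(traceId) || /^0+$/.test(spanId)) return undefined;

  return { traceId, spanId, traceFlags: parseInt(flags, 16) };
}
```

A subscriber can treat an `undefined` result as "no parent" and start a fresh trace rather than failing the message.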

JavaScript Context Propagation

// Publisher: Inject trace context
import { trace, context as otelContext } from "@opentelemetry/api";

// Tracer used by the subscriber example below
const tracer = trace.getTracer("order-service");

async function publishWithTracing(topic: string, message: any) {
const span = trace.getActiveSpan();

await NatsPubsub.publish(topic, message, {
trace_id: span?.spanContext().traceId,
span_id: span?.spanContext().spanId,
trace_flags: span?.spanContext().traceFlags,
});
}

// Subscriber: Extract and continue trace
class OrderSubscriber extends Subscriber {
async handle(message: any, metadata: TopicMetadata) {
// Create new span linked to parent
const span = tracer.startSpan("process-order", {
links: [
{
context: {
traceId: metadata.trace_id,
spanId: metadata.span_id,
traceFlags: metadata.trace_flags,
},
},
],
});

try {
await otelContext.with(
trace.setSpan(otelContext.active(), span),
async () => {
await this.processOrder(message);
},
);
} finally {
span.end();
}
}
}

Ruby Context Propagation

# Publisher: Inject trace context
def publish_with_tracing(topic, message)
span = OpenTelemetry::Trace.current_span

NatsPubsub.publish(
topic: topic,
message: message,
trace_id: span.context.hex_trace_id,
span_id: span.context.hex_span_id
)
end

# Subscriber: Extract and continue trace
class OrderSubscriber < NatsPubsub::Subscriber
subscribe_to 'order.created'

def handle(message, context)
tracer = OpenTelemetry.tracer_provider.tracer('order-subscriber')

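# Hex IDs from the message must be packed to binary for SpanContext,
# and a link needs both the trace ID and the span ID.
parent_context = OpenTelemetry::Trace::SpanContext.new(
trace_id: [context.trace_id].pack('H*'),
span_id: [context.span_id].pack('H*'),
remote: true
)

span = tracer.start_span(
'process-order',
links: [OpenTelemetry::Trace::Link.new(parent_context)]
)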

begin
OpenTelemetry::Trace.with_span(span) do
process_order(message)
end
ensure
span.finish
end
end
end

Spans and Attributes

Creating Custom Spans

// JavaScript
import { trace, SpanStatusCode } from "@opentelemetry/api";

const tracer = trace.getTracer("order-service");

async function processOrder(orderId: string) {
const span = tracer.startSpan("process-order", {
attributes: {
"order.id": orderId,
"service.name": "order-service",
},
});

try {
// Add events to span
span.addEvent("order-validation-started");
await validateOrder(orderId);
span.addEvent("order-validation-completed");

// Add more attributes
span.setAttribute("order.total", 99.99);
span.setAttribute("order.items_count", 3);

await saveOrder(orderId);

span.setStatus({ code: SpanStatusCode.OK });
} catch (error) {
const err = error as Error; // catch variables are `unknown` under strict TypeScript
span.recordException(err);
span.setStatus({
code: SpanStatusCode.ERROR,
message: err.message,
});
throw error;
} finally {
span.end();
}
}

# Ruby
# A constant is used because a top-level local variable is not visible inside `def`
TRACER = OpenTelemetry.tracer_provider.tracer('order-service')

def process_order(order_id)
span = TRACER.start_span(
'process-order',
attributes: {
'order.id' => order_id,
'service.name' => 'order-service'
}
)

begin
span.add_event('order-validation-started')
validate_order(order_id)
span.add_event('order-validation-completed')

span.set_attribute('order.total', 99.99)
span.set_attribute('order.items_count', 3)

save_order(order_id)

span.status = OpenTelemetry::Trace::Status.ok
rescue StandardError => e
span.record_exception(e)
span.status = OpenTelemetry::Trace::Status.error(e.message)
raise
ensure
span.finish
end
end

Structured Logging

Log Levels and Formatting

JavaScript Structured Logging

// src/observability/logger.ts
import winston from "winston";
import { trace } from "@opentelemetry/api";

export const logger = winston.createLogger({
level: process.env.LOG_LEVEL || "info",
format: winston.format.combine(
winston.format.timestamp(),
winston.format.errors({ stack: true }),
winston.format.json(),
),
defaultMeta: {
service: "order-service",
environment: process.env.NODE_ENV,
},
transports: [
new winston.transports.Console(),
new winston.transports.File({
filename: "logs/error.log",
level: "error",
}),
],
});

// Add trace context to logs
export function logWithTrace(level: string, message: string, meta?: any) {
const span = trace.getActiveSpan();
const spanContext = span?.spanContext();

logger.log(level, message, {
...meta,
trace_id: spanContext?.traceId,
span_id: spanContext?.spanId,
});
}

Ruby Structured Logging

# lib/observability/logger.rb
require 'logger'
require 'json'
require 'time' # provides Time#iso8601

module Observability
class StructuredLogger
def initialize
@logger = Logger.new(STDOUT)
@logger.formatter = proc do |severity, datetime, progname, msg|
log_entry = {
timestamp: datetime.iso8601,
level: severity.downcase,
message: msg.is_a?(String) ? msg : msg[:message],
service: ENV['SERVICE_NAME'] || 'app'
}

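# Add trace context. current_span is never nil; it returns an
# invalid span when none is active, so check validity instead.
span = OpenTelemetry::Trace.current_span
if span.context.valid?
log_entry[:trace_id] = span.context.hex_trace_id
log_entry[:span_id] = span.context.hex_span_id
end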

log_entry.merge!(msg) if msg.is_a?(Hash)
"#{log_entry.to_json}\n"
end
end

def info(message, **fields)
@logger.info({ message: message }.merge(fields))
end
end
end

Correlation IDs

// JavaScript Express middleware
import { v4 as uuidv4 } from "uuid";

app.use((req, res, next) => {
req.correlationId = req.headers["x-correlation-id"] || uuidv4();
res.setHeader("x-correlation-id", req.correlationId);
next();
});

// Publish with correlation ID (inside a request handler, where `req` is in scope)
await NatsPubsub.publish("order.created", orderData, {
correlation_id: req.correlationId,
});
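
Incoming header values are untrusted and, in Node, may arrive as an array when the header is repeated. The sketch below shows one way to resolve a correlation ID defensively before it reaches logs or message options. `resolveCorrelationId` is a hypothetical helper, and the allowed character set and 128-character limit are arbitrary choices for illustration.

```typescript
import { randomUUID } from "node:crypto";

type HeaderValue = string | string[] | undefined;

// Returns the caller-supplied correlation ID when it looks safe,
// otherwise generates a fresh UUID.
function resolveCorrelationId(headers: Record<string, HeaderValue>): string {
  const raw = headers["x-correlation-id"];
  const candidate = Array.isArray(raw) ? raw[0] : raw;

  // Accept only short IDs made of letters, digits, "_", "." and "-"
  // so untrusted input cannot bloat or corrupt log lines.
  if (candidate && /^[\w.-]{1,128}$/.test(candidate)) {
    return candidate;
  }
  return randomUUID();
}
```

In the Express middleware this would replace the inline `||` expression, for example `req.correlationId = resolveCorrelationId(req.headers)`.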

Metrics Collection

Application Metrics

import { metrics } from "@opentelemetry/api";

const meter = metrics.getMeter("order-service");

const orderCounter = meter.createCounter("orders.created", {
description: "Total orders created",
});

orderCounter.add(1, { payment_method: "credit_card" });

Business Metrics

const revenueCounter = meter.createCounter("revenue.total", {
description: "Total revenue",
unit: "USD",
});

revenueCounter.add(order.total, {
product_category: order.category,
});

Visualization Platforms

Jaeger

# docker-compose.yml
services:
  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686" # UI
      - "4318:4318" # OTLP HTTP

Honeycomb

const traceExporter = new OTLPTraceExporter({
url: "https://api.honeycomb.io/v1/traces",
headers: {
"x-honeycomb-team": process.env.HONEYCOMB_API_KEY,
},
});

Best Practices

1. Sampling Strategies

import { NodeSDK } from "@opentelemetry/sdk-node";
import { TraceIdRatioBasedSampler } from "@opentelemetry/sdk-trace-base";

const sdk = new NodeSDK({
sampler: new TraceIdRatioBasedSampler(0.1), // 10% sampling
});
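
Ratio-based sampling decides from the trace ID itself, so every service that sees the same trace reaches the same decision without coordination. The sketch below illustrates that idea in simplified form. It is not the SDK's exact algorithm, and `shouldSample` is a hypothetical name.

```typescript
// Simplified illustration of deterministic, trace-ID-based ratio sampling.
// NOTE: this is not the algorithm used by TraceIdRatioBasedSampler.
function shouldSample(traceId: string, ratio: number): boolean {
  if (ratio >= 1) return true;
  if (ratio <= 0) return false;

  // Treat the last 8 hex characters (32 bits) of the trace ID as a
  // uniformly distributed number and compare it with the ratio threshold.
  const low32 = parseInt(traceId.slice(-8), 16);
  const threshold = Math.floor(ratio * 0x100000000);
  return low32 < threshold;
}
```

Because the decision depends only on the trace ID and the ratio, a trace is either kept in full or dropped in full, provided all services use the same ratio.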

2. Structured Logging

// Good
logger.info("Order created", { order_id: "ORD-123", total: 99.99 });

// Bad
logger.info(`Order ORD-123 created with total 99.99`);
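
The practical benefit of the structured form is that fields can be queried directly instead of being parsed back out of a sentence. As a rough sketch, assuming one JSON object per log line, a filter over captured lines might look like this (`findByField` is a hypothetical helper):

```typescript
interface LogRecord {
  [field: string]: unknown;
}

// Returns every JSON log line whose `field` equals `value`.
function findByField(lines: string[], field: string, value: unknown): LogRecord[] {
  const matches: LogRecord[] = [];
  for (const line of lines) {
    try {
      const record = JSON.parse(line) as LogRecord;
      if (record[field] === value) matches.push(record);
    } catch {
      // Free-text lines are not valid JSON and cannot be queried by field.
    }
  }
  return matches;
}
```

A free-text line such as the "Bad" example is silently skipped here, which is exactly why it is hard to search reliably.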

3. Context Propagation

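// Make the span active while publishing so child spans and logs inherit its context
await context.with(trace.setSpan(context.active(), span), async () => {
await publishMessage();
});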

Troubleshooting

Missing Traces

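Check the exporter configuration and confirm that the SDK is initialized before any instrumented module is loaded. In recent NodeSDK releases start() is synchronous, so wrap it in try/catch:

try {
sdk.start();
console.log("OpenTelemetry started");
} catch (error) {
console.error("Failed:", error);
}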
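
A frequent cause of missing traces is an exporter URL without the signal path: for OTLP over HTTP, `OTEL_EXPORTER_OTLP_ENDPOINT` is a base URL and each signal is sent to `/v1/<signal>` beneath it. The sketch below derives the per-signal URLs so they can be logged at startup and compared with what the collector expects. `otlpSignalUrl` is a hypothetical helper, not part of the SDK.

```typescript
type Signal = "traces" | "metrics" | "logs";

// Builds the OTLP/HTTP URL for one signal from a base endpoint,
// falling back to the default local collector address.
function otlpSignalUrl(baseEndpoint: string | undefined, signal: Signal): string {
  const base = (baseEndpoint?.trim() || "http://localhost:4318").replace(/\/+$/, "");
  return `${base}/v1/${signal}`;
}
```

For example, logging `otlpSignalUrl(process.env.OTEL_EXPORTER_OTLP_ENDPOINT, "traces")` at startup makes a wrong host, port, or path visible immediately.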

