Ruby API Reference
Complete API reference for the NatsPubsub Ruby library.
Table of Contents
- Core Module
- Publisher API
- Subscriber API
- Pool API
- Outbox Pattern API
- Middleware API
- Configuration API
- Models
- Utilities
- ActiveRecord Integration
Core Module
NatsPubsub
The main module for all NatsPubsub functionality.
Module Methods
configure(overrides = {}, &block)
Configure the library with custom settings.
Parameters:
overrides- Hash of configuration overridesblock- Optional block for configuration
Returns: Config - Configuration object
Example:
NatsPubsub.configure do |config|
config.env = 'production'
config.app_name = 'my-service'
config.nats_urls = 'nats://nats.example.com:4222'
config.concurrency = 10
config.use_dlq = true
config.use_outbox = true
config.use_inbox = true
end
config
Get the current configuration.
Returns: Config - Current configuration object
Example:
config = NatsPubsub.config
puts "Environment: #{config.env}"
puts "App Name: #{config.app_name}"
reset!
Reset configuration to defaults.
Example:
NatsPubsub.reset!
use_outbox?
Check if outbox pattern is enabled.
Returns: Boolean
use_inbox?
Check if inbox pattern is enabled.
Returns: Boolean
use_dlq?
Check if dead letter queue is enabled.
Returns: Boolean
ensure_topology!
Establish connection and ensure stream topology.
Returns: JetStream context
Example:
NatsPubsub.ensure_topology!
puts "Topology ensured"
publish(topic:, message:, **options)
Publish a message to a topic.
Parameters:
topic- Topic name (e.g., 'orders.created', 'notifications.email.send')message- Message payload (Hash)options- Additional options (event_id, trace_id, correlation_id, occurred_at, message_type)
Returns: PublishResult - Result object with success status and details
Example:
# Simple publish
result = NatsPubsub.publish(
topic: 'orders.created',
message: { order_id: '123', amount: 99.99 }
)
puts "Published: #{result.event_id}" if result.success?
# With metadata
result = NatsPubsub.publish(
topic: 'orders.created',
message: { order_id: '123' },
trace_id: 'trace-123',
correlation_id: 'corr-456'
)
batch(&block)
Create a batch publisher for publishing multiple messages efficiently.
Yields: FluentBatch - Batch publisher instance
Returns: FluentBatch - Batch publisher instance for chaining
Example:
# Block syntax
result = NatsPubsub.batch do |b|
b.add 'user.created', { id: 1, name: 'Alice' }
b.add 'user.created', { id: 2, name: 'Bob' }
b.with_options trace_id: 'batch-123'
end.publish
# Chaining syntax
result = NatsPubsub.batch
.add('user.created', { id: 1 })
.add('notification.sent', { user_id: 1 })
.publish
setup!(&block)
Configure and initialize in one call.
Yields: Config - Configuration block
Returns: JetStream context
Example:
NatsPubsub.setup! do |config|
config.app_name = 'my_app'
config.nats_urls = ['nats://localhost:4222']
config.env = 'production'
end
setup_with_preset!(preset, &block)
Setup with a configuration preset.
Parameters:
preset- Preset name (:development, :production, :testing)
Yields: Config - Configuration block for customization
Returns: JetStream context
Example:
NatsPubsub.setup_with_preset!(:production) do |config|
config.nats_urls = ENV['NATS_URLS']
config.app_name = 'my-app'
end
connect!
Connect to NATS (idempotent).
Returns: JetStream context
Example:
NatsPubsub.connect!
health_check
Perform comprehensive health check.
Returns: Core::HealthCheck::Result - Health check result
Example:
status = NatsPubsub.health_check
puts "Status: #{status.status}"
puts "Healthy: #{status.healthy?}"
quick_health_check
Perform quick health check (connection only).
Returns: Core::HealthCheck::Result - Health check result
Example:
status = NatsPubsub.quick_health_check
puts "Healthy: #{status.healthy?}"
health_check_middleware
Get health check middleware for Rack apps.
Returns: Proc - Rack middleware
Example:
# Sinatra
get '/health' do
status, headers, body = NatsPubsub.health_check_middleware.call(env)
[status, headers, body]
end
# Rails
get '/health', to: NatsPubsub.health_check_middleware
quick_health_check_middleware
Get quick health check middleware for Rack apps.
Returns: Proc - Rack middleware
Publisher API
Publisher Class
Handles message publishing to NATS JetStream.
Constructor
Publisher.new
Example:
publisher = NatsPubsub::Publisher.new
Methods
publish(*args, **kwargs)
Polymorphic publish method supporting multiple patterns.
Signatures:
- Topic-based (positional):
publish(topic, message, **opts) - Topic-based (keyword):
publish(topic:, message:, **opts) - Domain/resource/action:
publish(domain:, resource:, action:, payload:, **opts) - Multi-topic:
publish(topics:, message:, **opts)
Returns: PublishResult or Hash of results
Examples:
# Topic-based (positional)
result = publisher.publish('orders.created', { order_id: '123' })
# Topic-based (keyword)
result = publisher.publish(
topic: 'orders.created',
message: { order_id: '123' }
)
# Domain/resource/action
result = publisher.publish(
domain: 'orders',
resource: 'order',
action: 'created',
payload: { id: '123' }
)
# Multi-topic
results = publisher.publish(
topics: ['orders.created', 'notifications.sent'],
message: { id: '123' }
)
publish_to_topic(topic, message, **options)
Publish to a single topic (internal method).
Parameters:
topic- Topic namemessage- Message payloadoptions- Additional options
Returns: PublishResult
publish_event(domain, resource, action, payload, **options)
Publish using domain/resource/action pattern (internal method).
Parameters:
domain- Business domainresource- Resource typeaction- Event actionpayload- Event payloadoptions- Additional options
Returns: PublishResult
publish_to_topics(topics, message, **options)
Publish to multiple topics (internal method).
Parameters:
topics- Array of topic namesmessage- Message payloadoptions- Additional options
Returns: Hash - Results keyed by topic
FluentBatch Class
Fluent interface for batch publishing.
Methods
add(topic, message)
Add a message to the batch.
Parameters:
topic- Topic namemessage- Message payload
Returns: self - For chaining
with_options(options)
Set options for all messages in the batch.
Parameters:
options- Publish options
Returns: self - For chaining
publish
Publish all messages in the batch.
Returns: FluentBatch::Result - Result with statistics
Example:
result = NatsPubsub.batch
.add('user.created', { id: 1, name: 'Alice' })
.add('user.created', { id: 2, name: 'Bob' })
.add('notification.sent', { user_id: 1 })
.with_options(trace_id: 'batch-123')
.publish
puts "Success: #{result.success_count}, Failed: #{result.failure_count}"
Subscriber API
Subscriber Module
Include this module in your subscriber classes to handle NATS JetStream messages.
Class Methods
subscribe_to(*topics, **options)
Subscribe to one or more topics.
Parameters:
topics- Topic names to subscribe tooptions- Additional subscription options
Example:
class NotificationSubscriber
include NatsPubsub::Subscriber
subscribe_to 'notifications.email'
# or with multiple topics
subscribe_to 'notifications.email', 'notifications.sms'
# or with wildcards
subscribe_to 'users.user.*'
jetstream_options retry: 3, ack_wait: 60
def handle(message, context)
# Handle the message
puts "Received: #{message}"
puts "Event ID: #{context.event_id}"
puts "Trace ID: #{context.trace_id}"
end
end
subscribe_to_wildcard(topic, **options)
Subscribe to all subtopics using wildcard (>).
Parameters:
topic- Topic nameoptions- Additional subscription options
Example:
class AllNotificationsSubscriber
include NatsPubsub::Subscriber
subscribe_to_wildcard 'notifications' # Subscribes to notifications.>
def handle(message, context)
puts "Notification on topic: #{context.topic}"
end
end
jetstream_options(opts = {})
Configure JetStream-specific options.
Parameters:
opts- Options hash
Options:
:retry- Number of retries (default: 5):ack_wait- ACK wait timeout in seconds (default: 30):max_deliver- Maximum delivery attempts (default: 5):dead_letter- Enable DLQ (default: true):batch_size- Batch size for fetching (default: 25)
Returns: Hash - Merged options
Example:
class EmailSubscriber
include NatsPubsub::Subscriber
subscribe_to 'notifications.email'
jetstream_options retry: 3, ack_wait: 60, max_deliver: 10
def handle(message, context)
send_email(message)
end
end
topic_subscriptions
Get all topic subscriptions.
Returns: Array<Hash> - Array of subscription hashes
all_subscriptions
Get all subscriptions.
Returns: Array<Hash> - Array of all subscription hashes
Instance Methods
handle(message, context)
Override this method to handle messages.
Parameters:
message- Message payload (Hash)context- Message context (Core::MessageContext)
Raises: NotImplementedError - If not overridden
Example:
def handle(message, context)
# Your message processing logic
OrderService.create_order(message)
# Access context information
logger.info "Processed event #{context.event_id}"
logger.info "Trace: #{context.trace_id}"
logger.info "Delivery attempt: #{context.deliveries}"
end
on_error(error_context)
Optional: Override for custom error handling.
Parameters:
error_context- Error context (Core::ErrorContext)
Returns: Symbol - Error action (:retry, :discard, :dlq)
Example:
def on_error(error_context)
case error_context.error
when ValidationError
Core::ErrorAction::DISCARD
when NetworkError
Core::ErrorAction::RETRY
else
Core::ErrorAction::DLQ
end
end
logger
Access to logger.
Returns: Logger - Logger instance
from_topic?(context, topic_name)
Check if message is from a specific topic.
Parameters:
context- Message contexttopic_name- Topic name to check
Returns: Boolean
Example:
def handle(message, context)
if from_topic?(context, 'notifications.email')
send_email(message)
elsif from_topic?(context, 'notifications.sms')
send_sms(message)
end
end
Pool API
Pool Class
Manages concurrent subscriber workers.
Constructor
Pool.new(concurrency: 5)
Parameters:
concurrency- Number of concurrent workers (default: 5)
Methods
start!
Start the worker pool.
Example:
pool = NatsPubsub::Subscribers::Pool.new(concurrency: 10)
pool.start!
stop!
Stop the worker pool gracefully.
Example:
pool.stop!
Outbox Pattern API
OutboxPublisher Class
Handles publishing messages using the Outbox pattern.
Class Methods
publish(subject:, envelope:, event_id:, &block)
Publish using the Outbox pattern.
Parameters:
subject- NATS subjectenvelope- Message envelopeevent_id- Event identifierblock- Block containing actual publish logic
Returns: PublishResult
Example:
OutboxPublisher.publish(
subject: subject,
envelope: envelope,
event_id: event_id
) do
# Actual NATS publish
jts.publish(subject, Oj.dump(envelope), header: headers)
end
OutboxRepository Class
Persistence layer for outbox events.
Methods
find_or_create(params)
Find or create an outbox event.
Parameters:
params- Event parameters
Returns: OutboxEvent - Outbox event model
find_pending(limit: 100)
Find pending events.
Parameters:
limit- Maximum number of events
Returns: Array<OutboxEvent>
mark_as_sent(event_id)
Mark event as sent.
Parameters:
event_id- Event identifier
mark_as_failed(event_id, error)
Mark event as failed.
Parameters:
event_id- Event identifiererror- Error message
cleanup(older_than)
Cleanup old sent events.
Parameters:
older_than- Date threshold
Returns: Integer - Number of deleted events
Middleware API
Middleware::Chain Class
Manages middleware execution chain.
Methods
add(middleware)
Add middleware to the chain.
Parameters:
middleware- Middleware instance
call(message, context, &handler)
Execute the middleware chain.
Parameters:
message- Message payloadcontext- Message contexthandler- Final handler block
Built-in Middleware
Logging Middleware
Logs message processing.
Example:
NatsPubsub.config.server_middleware do |chain|
chain.add NatsPubsub::Middleware::Logging.new
end
StructuredLogging Middleware
Structured logging with context.
Example:
NatsPubsub.config.server_middleware do |chain|
chain.add NatsPubsub::Middleware::StructuredLogging.new
end
ActiveRecord Middleware
ActiveRecord connection management.
Example:
NatsPubsub.config.server_middleware do |chain|
chain.add NatsPubsub::Middleware::ActiveRecord.new
end
Custom Middleware
Example:
class TimingMiddleware
def call(message, context)
start_time = Time.now
yield
duration = Time.now - start_time
puts "Processing took #{duration}s"
end
end
NatsPubsub.config.server_middleware do |chain|
chain.add TimingMiddleware.new
end
Configuration API
Config Class
Configuration object.
Attributes
nats_urls- NATS server URL(s)env- Environment nameapp_name- Application namedestination_app- Destination application namemax_deliver- Maximum delivery attemptsack_wait- Acknowledgment wait timebackoff- Backoff strategyuse_outbox- Enable outbox patternuse_inbox- Enable inbox patterninbox_model- Inbox model class nameoutbox_model- Outbox model class nameuse_dlq- Enable dead letter queuedlq_max_attempts- DLQ max attemptsdlq_stream_suffix- DLQ stream suffixlogger- Logger instanceconcurrency- Concurrency levelconnection_pool_size- Connection pool sizeconnection_pool_timeout- Connection pool timeout
Methods
initialize(preset: nil)
Create a new configuration.
Parameters:
preset- Optional preset name
stream_name
Get stream name for the environment.
Returns: String - Stream name
Example:
config.stream_name
# => "production-events-stream"
event_subject(domain, resource, action)
Build event subject.
Parameters:
domain- Business domainresource- Resource typeaction- Event action
Returns: String - NATS subject
Example:
subject = config.event_subject('orders', 'order', 'created')
# => "production.myapp.orders.order.created"
dlq_subject
Get DLQ subject.
Returns: String - DLQ subject
dlq_stream_name
Get DLQ stream name.
Returns: String - DLQ stream name
durable_name
Get durable consumer name.
Returns: String - Durable name
server_middleware(&block)
Access/configure server middleware.
Yields: Middleware::Chain - Middleware chain
Returns: Middleware::Chain
Example:
config.server_middleware do |chain|
chain.add CustomMiddleware.new
end
apply_preset!(preset_name)
Apply a configuration preset.
Parameters:
preset_name- Preset name (:development, :production, :testing)
Example:
config.apply_preset!(:production)
validate!
Validate configuration. Raises ConfigurationError if invalid.
Example:
begin
config.validate!
rescue NatsPubsub::ConfigurationError => e
puts "Invalid config: #{e.message}"
end
Models
OutboxEvent Model
ActiveRecord model for outbox events.
Attributes
event_id- Event identifier (string)subject- NATS subject (string)payload- Message payload (text)headers- Message headers (text)status- Event status (string: 'pending', 'publishing', 'sent', 'failed')attempts- Number of attempts (integer)last_error- Last error message (text)enqueued_at- Enqueued timestamp (datetime)sent_at- Sent timestamp (datetime)
Scopes
pending- Events with pending statusfailed- Events with failed statussent- Events with sent status
Example:
# Find pending events
pending = NatsPubsub::OutboxEvent.pending.limit(100)
# Find failed events
failed = NatsPubsub::OutboxEvent.failed
# Cleanup old sent events
old_events = NatsPubsub::OutboxEvent.sent.where('sent_at < ?', 7.days.ago)
old_events.delete_all
InboxEvent Model
ActiveRecord model for inbox events.
Attributes
event_id- Event identifier (string)subject- NATS subject (string)payload- Message payload (text)headers- Message headers (text)status- Event status (string: 'pending', 'processing', 'processed', 'failed')deliveries- Number of deliveries (integer)last_error- Last error message (text)received_at- Received timestamp (datetime)processed_at- Processed timestamp (datetime)
Scopes
pending- Events with pending statusfailed- Events with failed statusprocessed- Events with processed status
Example:
# Check if already processed
event = NatsPubsub::InboxEvent.find_by(event_id: 'event-123')
if event&.processed?
puts "Already processed"
end
# Find failed events for retry
failed = NatsPubsub::InboxEvent.failed.limit(10)
Utilities
Subject Class
Subject parsing and building utilities.
Class Methods
from_event(env:, app_name:, domain:, resource:, action:)
Build NATS subject from event components.
Parameters:
env- Environment nameapp_name- Application namedomain- Business domainresource- Resource typeaction- Event action
Returns: Subject - Subject object
Example:
subject = NatsPubsub::Subject.from_event(
env: 'production',
app_name: 'myapp',
domain: 'orders',
resource: 'order',
action: 'created'
)
puts subject.to_s
# => "production.myapp.orders.order.created"
from_topic(env:, app_name:, topic:)
Build NATS subject from topic.
Parameters:
env- Environment nameapp_name- Application nametopic- Topic name
Returns: Subject - Subject object
Example:
subject = NatsPubsub::Subject.from_topic(
env: 'production',
app_name: 'myapp',
topic: 'orders.created'
)
puts subject.to_s
# => "production.myapp.orders.created"
parse(subject_string)
Parse NATS subject string.
Parameters:
subject_string- Subject string
Returns: Subject - Subject object
Example:
subject = NatsPubsub::Subject.parse('production.myapp.orders.created')
puts subject.env # => "production"
puts subject.app_name # => "myapp"
puts subject.topic # => "orders.created"
normalize_topic(name)
Normalize topic name.
Parameters:
name- Topic name
Returns: String - Normalized topic
Duration Module
Duration parsing utilities.
Module Methods
parse(str)
Parse duration string.
Parameters:
str- Duration string (e.g., '30s', '5m', '1h')
Returns: Integer - Duration in milliseconds
Example:
ms = NatsPubsub::Duration.parse('30s') # => 30000
ms = NatsPubsub::Duration.parse('5m') # => 300000
ms = NatsPubsub::Duration.parse('1h') # => 3600000
to_nanos(milliseconds)
Convert milliseconds to nanoseconds.
Parameters:
milliseconds- Duration in milliseconds
Returns: Integer - Duration in nanoseconds
from_nanos(nanoseconds)
Convert nanoseconds to milliseconds.
Parameters:
nanoseconds- Duration in nanoseconds
Returns: Integer - Duration in milliseconds
Error Classes
NatsPubsub::Error- Base error classNatsPubsub::ConfigurationError- Configuration errorsNatsPubsub::ConnectionError- Connection errorsNatsPubsub::PublishError- Publishing errorsNatsPubsub::SubscriptionError- Subscription errors
Example:
begin
NatsPubsub.publish(topic: 'test', message: {})
rescue NatsPubsub::PublishError => e
puts "Publish failed: #{e.message}"
end
ActiveRecord Integration
Publishable Module
Include in ActiveRecord models to enable automatic event publishing.
Class Methods
publishes_event(event_name, if: nil, unless: nil, &block)
Configure automatic event publishing.
Parameters:
event_name- Event name or :auto for automatic namingif- Conditional proc/symbolunless- Negative conditional proc/symbolblock- Block to customize event payload
Example:
class Order < ApplicationRecord
include NatsPubsub::ActiveRecord::Publishable
# Automatic event publishing on create
publishes_event :created, on: :create
# With custom payload
publishes_event :updated, on: :update do |order|
{
id: order.id,
status: order.status,
total: order.total,
updated_at: order.updated_at
}
end
# With conditions
publishes_event :completed, on: :update, if: :status_changed_to_completed?
private
def status_changed_to_completed?
saved_change_to_status? && status == 'completed'
end
end
# Usage
order = Order.create!(total: 99.99)
# Automatically publishes to 'orders.order.created'
order.update!(status: 'shipped')
# Automatically publishes to 'orders.order.updated'
Instance Methods
publish_event(action, payload = nil)
Manually publish an event.
Parameters:
action- Event actionpayload- Optional payload (defaults to model attributes)
Example:
order = Order.find(123)
order.publish_event(:shipped, {
id: order.id,
shipped_at: Time.now
})
Rails Integration
Railtie
Automatic Rails integration.
Rake Tasks
nats_pubsub:install
Install NatsPubsub in Rails app.
rails nats_pubsub:install
nats_pubsub:config
Generate configuration file.
rails generate nats_pubsub:config
nats_pubsub:subscriber NAME
Generate a subscriber.
rails generate nats_pubsub:subscriber EmailNotification
nats_pubsub:migrations
Generate database migrations.
rails generate nats_pubsub:migrations
Generators
Config Generator
rails generate nats_pubsub:config
Creates config/initializers/nats_pubsub.rb
Subscriber Generator
rails generate nats_pubsub:subscriber NotificationSubscriber
Creates app/subscribers/notification_subscriber.rb
Migrations Generator
rails generate nats_pubsub:migrations
Creates migrations for inbox and outbox tables.
Testing Support
Testing Module
Test helpers for NatsPubsub.
Methods
enable_test_mode!
Enable test mode (messages are captured, not published).
Example:
RSpec.configure do |config|
config.before(:each) do
NatsPubsub::Testing.enable_test_mode!
end
config.after(:each) do
NatsPubsub::Testing.reset!
end
end
disable_test_mode!
Disable test mode.
reset!
Clear captured messages.
published_messages
Get captured messages.
Returns: Array - Array of published messages
Example:
NatsPubsub.publish(topic: 'test', message: { id: 1 })
messages = NatsPubsub::Testing.published_messages
expect(messages.length).to eq(1)
expect(messages.first[:topic]).to eq('test')
RSpec Matchers
have_published_message
Matcher for published messages.
Example:
# Basic usage
expect(NatsPubsub).to have_published_message('orders.created')
# With message content
expect(NatsPubsub).to have_published_message('orders.created')
.with_message(order_id: '123')
# With options
expect(NatsPubsub).to have_published_message('orders.created')
.with_options(trace_id: 'trace-123')