Rails Integration Guide
This guide covers integrating NatsPubsub into a Ruby on Rails application, from initial setup through production deployment.
Table of Contents
- Overview
- Prerequisites
- Installation
- Configuration
- Generators
- Publishing from Rails
- ActiveRecord Integration
- Creating Subscribers
- Background Jobs Integration
- Testing in Rails
- Web UI
- Database Setup
- Running in Production
- Best Practices
- Troubleshooting
- Example Application
Overview
NatsPubsub integrates seamlessly with Rails through a Railtie that provides:
- Automatic Configuration Loading: Config files in config/initializers/nats_pubsub.rb
- Rails Generators: Scaffolding for subscribers, migrations, and configuration
- ActiveRecord Integration: Publish model events with callbacks
- Web UI: Sinatra-based monitoring interface
- Background Job Support: Works with Sidekiq, Resque, and ActiveJob
- Testing Helpers: RSpec matchers and test modes
- CLI Integration: Rake tasks and executable for running subscribers
Prerequisites
Before integrating NatsPubsub with Rails:
- Ruby: 3.2 or higher
- Rails: 6.0 or higher (7.0+ recommended)
- Database: PostgreSQL (required for Inbox/Outbox)
- NATS Server: With JetStream enabled
- Redis (optional): For Sidekiq/background jobs
Installation
Step 1: Add to Gemfile
# Gemfile
gem 'nats_pubsub', '~> 0.1'
Step 2: Install
bundle install
Step 3: Run Install Generator
rails generate nats_pubsub:install
This creates:
- config/initializers/nats_pubsub.rb - Configuration file
- app/subscribers/ - Directory for subscribers
- .env.example - Environment variable template
Step 4: Configure Environment
# Copy and configure environment variables
cp .env.example .env
Edit .env:
NATS_URLS=nats://localhost:4222
APP_NAME=my-rails-app
RAILS_ENV=development
CONCURRENCY=10
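The initializer reads these variables with ENV.fetch. As a minimal sketch of parsing and validating them in plain Ruby, the helper below takes an ENV-like hash and returns typed settings. The name `load_nats_env` and the comma-separated form of NATS_URLS are assumptions made for illustration; they are not part of NatsPubsub.

```ruby
# Hypothetical helper (not part of NatsPubsub): parse settings from an ENV-like hash.
# Assumption: NATS_URLS may hold several comma-separated server URLs.
def load_nats_env(env)
  urls = env.fetch('NATS_URLS', 'nats://localhost:4222')
            .split(',')
            .map(&:strip)
            .reject(&:empty?)

  # Integer() raises on values such as "ten", unlike String#to_i which returns 0.
  concurrency = Integer(env.fetch('CONCURRENCY', '10'))
  raise ArgumentError, 'CONCURRENCY must be positive' unless concurrency.positive?

  {
    servers: urls,
    app_name: env.fetch('APP_NAME', 'myapp'),
    concurrency: concurrency
  }
end

settings = load_nats_env(
  'NATS_URLS' => 'nats://localhost:4222, nats://localhost:4223',
  'APP_NAME' => 'my-rails-app',
  'CONCURRENCY' => '10'
)
puts settings.inspect
```

Failing fast on a malformed CONCURRENCY value at boot is usually easier to diagnose than a subscriber process that silently runs with zero workers.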
Step 5: Start NATS Server
# Using Docker
docker run -d -p 4222:4222 --name nats-server nats:latest -js
# Or using Homebrew (macOS)
brew install nats-server
nats-server -js
Configuration
Basic Configuration
# config/initializers/nats_pubsub.rb
NatsPubsub.configure do |config|
# Connection settings
config.servers = ENV.fetch('NATS_URLS', 'nats://localhost:4222')
config.env = Rails.env
config.app_name = ENV.fetch('APP_NAME', 'myapp')
# Consumer settings
config.concurrency = ENV.fetch('CONCURRENCY', 10).to_i
config.max_deliver = 5
config.ack_wait = 30_000 # 30 seconds
config.backoff = [1_000, 5_000, 15_000, 30_000, 60_000]
# Features
config.use_dlq = true
config.use_inbox = false
config.use_outbox = false
# Logging
config.logger = Rails.logger
end
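To make the relationship between max_deliver and backoff concrete, here is a small sketch of how a retry delay could be chosen per delivery attempt. It assumes that attempt 1 uses the first entry and that attempts beyond the end of the list reuse the last entry. The guide does not state how NatsPubsub applies the list, so treat `backoff_delay_ms` as a hypothetical illustration.

```ruby
# Hypothetical helper: delay in milliseconds before redelivering after a failed attempt.
# Assumption: attempts past the end of the list reuse the last configured delay.
def backoff_delay_ms(attempt, backoff)
  raise ArgumentError, 'attempt must be >= 1' if attempt < 1
  raise ArgumentError, 'backoff must not be empty' if backoff.empty?

  backoff[[attempt - 1, backoff.length - 1].min]
end

backoff = [1_000, 5_000, 15_000, 30_000, 60_000]
(1..6).each do |attempt|
  puts "attempt #{attempt}: wait #{backoff_delay_ms(attempt, backoff)} ms"
end
```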
Advanced Configuration
# config/initializers/nats_pubsub.rb
NatsPubsub.configure do |config|
config.servers = ENV.fetch('NATS_URLS', 'nats://localhost:4222')
config.env = Rails.env
config.app_name = ENV.fetch('APP_NAME', 'myapp')
# Authentication (if using NATS with auth)
config.user = ENV['NATS_USER']
config.password = ENV['NATS_PASSWORD']
# TLS Configuration
if Rails.env.production?
config.tls = {
ca_file: Rails.root.join('config', 'certs', 'ca.pem').to_s,
cert_file: Rails.root.join('config', 'certs', 'client-cert.pem').to_s,
key_file: Rails.root.join('config', 'certs', 'client-key.pem').to_s
}
end
# Stream Configuration
config.stream_config = {
max_msgs: 1_000_000,
max_bytes: 10 * 1024 * 1024 * 1024, # 10GB
max_age: 7 * 24 * 60 * 60 * 1_000_000_000, # 7 days in nanoseconds
storage: 'file', # or 'memory'
replicas: Rails.env.production? ? 3 : 1
}
# Inbox/Outbox Configuration
config.use_inbox = Rails.env.production?
config.use_outbox = Rails.env.production?
config.outbox = {
batch_size: 100,
poll_interval: 5_000, # 5 seconds
stale_timeout: 5.minutes,
retention_period: 7.days
}
config.inbox = {
stale_timeout: 5.minutes,
retention_period: 30.days
}
# Middleware
config.middleware.use NatsPubsub::Middleware::StructuredLogging
config.middleware.use CustomMetricsMiddleware
config.middleware.use ErrorReportingMiddleware
# Custom logger
config.logger = ActiveSupport::Logger.new(Rails.root.join('log', 'nats_pubsub.log'))
config.logger.level = Rails.env.production? ? Logger::INFO : Logger::DEBUG
end
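The stream limits above are written as inline arithmetic (max_age in nanoseconds, max_bytes in bytes). A pair of small helpers can make the units explicit. The helper names are hypothetical and only reproduce the arithmetic already shown in the configuration.

```ruby
NANOSECONDS_PER_SECOND = 1_000_000_000

# Convert whole days to nanoseconds, the unit used for max_age above.
def days_to_nanoseconds(days)
  days * 24 * 60 * 60 * NANOSECONDS_PER_SECOND
end

# Convert gibibytes to bytes, the unit used for max_bytes above.
def gib_to_bytes(gib)
  gib * 1024**3
end

puts days_to_nanoseconds(7) # => 604800000000000
puts gib_to_bytes(10)       # => 10737418240
```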
Environment-Specific Configuration
# config/initializers/nats_pubsub.rb
NatsPubsub.configure do |config|
config.servers = ENV.fetch('NATS_URLS', 'nats://localhost:4222')
config.env = Rails.env
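config.app_name = ENV.fetch('APP_NAME', 'myapp')
# Assign a logger first so the level can be adjusted per environment below
config.logger = Rails.logger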
case Rails.env
when 'development'
config.concurrency = 2
config.use_inbox = false
config.use_outbox = false
config.logger.level = Logger::DEBUG
when 'test'
config.concurrency = 1
config.use_inbox = false
config.use_outbox = false
when 'staging'
config.concurrency = 5
config.use_inbox = true
config.use_outbox = true
config.max_deliver = 3
when 'production'
config.concurrency = 20
config.use_inbox = true
config.use_outbox = true
config.max_deliver = 5
config.backoff = [1_000, 5_000, 15_000, 30_000, 60_000, 120_000]
end
end
Generators
NatsPubsub provides several Rails generators to scaffold common patterns.
Install Generator
Generates initial setup files.
rails generate nats_pubsub:install
Created Files:
- config/initializers/nats_pubsub.rb - Configuration
- app/subscribers/.keep - Subscribers directory
- .env.example - Environment template
Subscriber Generator
Generate a new subscriber class.
rails generate nats_pubsub:subscriber User created
This creates:
# app/subscribers/user_created_subscriber.rb
class UserCreatedSubscriber < NatsPubsub::Subscriber
subscribe_to "user.created"
def handle(message, context)
# Process user.created event
Rails.logger.info "Processing user.created: #{message.inspect}"
# Your business logic here
end
end
With Namespace:
rails generate nats_pubsub:subscriber Notifications::Email sent
Creates:
# app/subscribers/notifications/email_sent_subscriber.rb
module Notifications
class EmailSentSubscriber < NatsPubsub::Subscriber
subscribe_to "notifications.email.sent"
def handle(message, context)
# Process notifications.email.sent event
end
end
end
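The two generator runs above imply a naming convention: the resource and action arguments determine the class name, the file path, and the subscribed topic. The sketch below reproduces that mapping in plain Ruby for the two examples shown. It is a hypothetical re-implementation for illustration, not the generator's actual code.

```ruby
# Minimal CamelCase -> snake_case conversion (stand-in for ActiveSupport's String#underscore).
def underscore(name)
  name.gsub(/([a-z\d])([A-Z])/, '\1_\2').downcase
end

# Hypothetical mapping from generator arguments to class name, file path, and topic.
def subscriber_names(resource, action)
  parts = resource.split('::')
  namespaces = parts[0...-1]
  class_name = "#{parts.last}#{action.capitalize}Subscriber"

  {
    class_name: (namespaces + [class_name]).join('::'),
    path: File.join('app/subscribers', *namespaces.map { |n| underscore(n) },
                    "#{underscore(class_name)}.rb"),
    topic: (parts.map { |p| underscore(p) } + [action.downcase]).join('.')
  }
end

simple = subscriber_names('User', 'created')
namespaced = subscriber_names('Notifications::Email', 'sent')
puts simple.inspect
puts namespaced.inspect
```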
Migration Generator
Generate Inbox and Outbox tables.
rails generate nats_pubsub:migrations
Creates:
# db/migrate/20250117000000_create_nats_pubsub_inbox.rb
class CreateNatsPubsubInbox < ActiveRecord::Migration[7.0]
def change
create_table :nats_pubsub_inbox, id: false, primary_key: :event_id do |t|
t.string :event_id, null: false, limit: 255
t.string :subject, null: false, limit: 500
t.text :payload, null: false
t.text :headers
t.string :stream, limit: 255
t.bigint :stream_seq
t.string :status, null: false, default: 'processing', limit: 50
t.timestamp :received_at, null: false, default: -> { 'CURRENT_TIMESTAMP' }
t.timestamp :processed_at
t.integer :deliveries, default: 1
t.text :error_message
t.timestamps
end
add_index :nats_pubsub_inbox, :event_id, unique: true
add_index :nats_pubsub_inbox, [:stream, :stream_seq],
unique: true,
name: 'idx_inbox_stream_seq',
where: 'stream IS NOT NULL'
add_index :nats_pubsub_inbox, [:status, :processed_at]
add_index :nats_pubsub_inbox, :received_at
end
end
# db/migrate/20250117000001_create_nats_pubsub_outbox.rb
class CreateNatsPubsubOutbox < ActiveRecord::Migration[7.0]
def change
create_table :nats_pubsub_outbox, id: false, primary_key: :event_id do |t|
t.string :event_id, null: false, limit: 255
t.string :subject, null: false, limit: 500
t.text :payload, null: false
t.text :headers
t.string :status, null: false, default: 'pending', limit: 50
t.timestamp :enqueued_at, null: false, default: -> { 'CURRENT_TIMESTAMP' }
t.timestamp :sent_at
t.text :error_message
t.timestamps
end
add_index :nats_pubsub_outbox, :event_id, unique: true
add_index :nats_pubsub_outbox, [:status, :enqueued_at]
add_index :nats_pubsub_outbox, :sent_at
end
end
Run migrations:
rails db:migrate
Config Generator
Generate a custom configuration preset.
rails generate nats_pubsub:config production
Creates:
# config/nats_pubsub/production.rb
NatsPubsub::ConfigPresets.define :production do
concurrency 20
max_deliver 5
use_inbox true
use_outbox true
backoff [1_000, 5_000, 15_000, 30_000, 60_000]
end
Publishing from Rails
Publishing from Controllers
# app/controllers/users_controller.rb
class UsersController < ApplicationController
def create
@user = User.new(user_params)
if @user.save
# Publish user.created event
NatsPubsub.publish(
topic: 'user.created',
message: {
id: @user.id,
email: @user.email,
name: @user.name,
created_at: @user.created_at
},
trace_id: request.request_id
)
render json: @user, status: :created
else
render json: { errors: @user.errors }, status: :unprocessable_entity
end
end
def update
if @user.update(user_params)
# Publish user.updated event
NatsPubsub.publish(
topic: 'user.updated',
message: {
id: @user.id,
changes: @user.previous_changes
},
trace_id: request.request_id
)
render json: @user
else
render json: { errors: @user.errors }, status: :unprocessable_entity
end
end
private
def user_params
params.require(:user).permit(:email, :name, :password)
end
end
Publishing from Models
Using Callbacks
# app/models/order.rb
class Order < ApplicationRecord
belongs_to :user
has_many :order_items
after_create :publish_created_event
after_update :publish_updated_event, if: :saved_change_to_status?
after_destroy :publish_cancelled_event
private
def publish_created_event
NatsPubsub.publish(
topic: 'order.created',
message: {
id: id,
user_id: user_id,
total: total,
status: status,
items: order_items.map { |item|
{ product_id: item.product_id, quantity: item.quantity, price: item.price }
}
}
)
end
def publish_updated_event
NatsPubsub.publish(
topic: 'order.updated',
message: {
id: id,
status: status,
previous_status: status_before_last_save
}
)
end
def publish_cancelled_event
NatsPubsub.publish(
topic: 'order.cancelled',
message: {
id: id,
user_id: user_id,
cancelled_at: Time.current
}
)
end
end
Publishing from Services
# app/services/order_processing_service.rb
class OrderProcessingService
def initialize(order)
@order = order
end
def process
ActiveRecord::Base.transaction do
# Update inventory
@order.order_items.each do |item|
item.product.decrement!(:stock, item.quantity)
end
# Update order status
@order.update!(status: 'processing')
# Publish event (with outbox, this is transactional)
NatsPubsub.publish(
topic: 'order.processing_started',
message: {
order_id: @order.id,
user_id: @order.user_id,
total: @order.total,
started_at: Time.current
}
)
end
# Process payment
payment_result = PaymentGateway.charge(@order)
if payment_result.success?
@order.update!(status: 'paid')
NatsPubsub.publish(
topic: 'order.payment_completed',
message: {
order_id: @order.id,
payment_id: payment_result.id,
amount: @order.total
}
)
else
handle_payment_failure(payment_result)
end
rescue StandardError => e
handle_error(e)
end
private
def handle_payment_failure(result)
@order.update!(status: 'payment_failed')
NatsPubsub.publish(
topic: 'order.payment_failed',
message: {
order_id: @order.id,
reason: result.error_message
}
)
end
def handle_error(error)
Rails.logger.error("Order processing failed: #{error.message}")
NatsPubsub.publish(
topic: 'order.processing_failed',
message: {
order_id: @order.id,
error: error.message
}
)
raise
end
end
Publishing from Background Jobs
# app/jobs/send_welcome_email_job.rb
class SendWelcomeEmailJob < ApplicationJob
queue_as :default
def perform(user_id)
user = User.find(user_id)
# Send email
UserMailer.welcome_email(user).deliver_now
# Publish event
NatsPubsub.publish(
topic: 'notification.email.sent',
message: {
user_id: user.id,
email_type: 'welcome',
sent_at: Time.current
}
)
end
end
ActiveRecord Integration
Publishable Concern
Create a reusable concern for publishable models:
# app/models/concerns/publishable.rb
module Publishable
extend ActiveSupport::Concern
included do
after_create :publish_created_event
after_update :publish_updated_event
after_destroy :publish_destroyed_event
end
private
def publish_created_event
publish_event('created', as_event_payload)
end
def publish_updated_event
return unless saved_changes.any?
publish_event('updated', as_event_payload.merge(
changes: saved_changes.except('updated_at')
))
end
def publish_destroyed_event
publish_event('destroyed', { id: id })
end
def publish_event(action, payload)
topic = "#{model_name.singular}.#{action}"
NatsPubsub.publish(
topic: topic,
message: payload,
event_id: "#{model_name.singular}-#{id}-#{action}-#{Time.current.to_i}"
)
end
def as_event_payload
# Override in model to customize payload
attributes.except('created_at', 'updated_at')
end
end
Use in models:
# app/models/product.rb
class Product < ApplicationRecord
include Publishable
# Override to customize event payload
def as_event_payload
{
id: id,
name: name,
price: price,
sku: sku,
available: in_stock?
}
end
end
Model Callbacks
Selective Publishing
# app/models/user.rb
class User < ApplicationRecord
after_create :publish_welcome_event
after_update :publish_profile_updated_event, if: :profile_changed?
after_update :publish_email_verified_event, if: :saved_change_to_email_verified_at?
private
def publish_welcome_event
NatsPubsub.publish(
topic: 'user.welcome',
message: {
id: id,
email: email,
name: name
}
)
end
def publish_profile_updated_event
NatsPubsub.publish(
topic: 'user.profile_updated',
message: {
id: id,
changes: previous_changes.slice('name', 'bio', 'avatar_url')
}
)
end
def publish_email_verified_event
NatsPubsub.publish(
topic: 'user.email_verified',
message: {
id: id,
email: email,
verified_at: email_verified_at
}
)
end
def profile_changed?
saved_change_to_name? || saved_change_to_bio? || saved_change_to_avatar_url?
end
end
Transactional Publishing
With Outbox enabled, publishing is automatically transactional:
# app/models/payment.rb
class Payment < ApplicationRecord
belongs_to :order
def process!
ActiveRecord::Base.transaction do
# Update payment status
update!(status: 'completed', completed_at: Time.current)
# Update order
order.update!(status: 'paid')
# Publish event (stored in outbox table in same transaction)
NatsPubsub.publish(
topic: 'payment.completed',
message: {
payment_id: id,
order_id: order_id,
amount: amount,
completed_at: completed_at
}
)
# If transaction rolls back, event is not published
end
end
end
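The guarantee described above, that the event row and the business data commit or roll back together, can be illustrated without a database. The sketch below uses an in-memory stand-in named `FakeDatabase` (hypothetical, for illustration only) whose `transaction` method restores both the data rows and the outbox when the block raises.

```ruby
# In-memory stand-in for a database that holds business rows and an outbox table.
class FakeDatabase
  attr_reader :rows, :outbox

  def initialize
    @rows = []
    @outbox = []
  end

  # Keep every write made in the block, or none of them if the block raises.
  def transaction
    rows_before = @rows.dup
    outbox_before = @outbox.dup
    yield self
  rescue StandardError
    @rows = rows_before
    @outbox = outbox_before
    raise
  end
end

db = FakeDatabase.new

# Successful transaction: the row and its event are stored together.
db.transaction do |tx|
  tx.rows << { payment_id: 1, status: 'completed' }
  tx.outbox << { topic: 'payment.completed', message: { payment_id: 1 } }
end

# Failing transaction: the row and its event are discarded together.
begin
  db.transaction do |tx|
    tx.rows << { payment_id: 2, status: 'completed' }
    tx.outbox << { topic: 'payment.completed', message: { payment_id: 2 } }
    raise 'order update failed'
  end
rescue RuntimeError => e
  puts "rolled back: #{e.message}"
end

puts db.outbox.length
```

A separate worker then reads the outbox and publishes to NATS, so a consumer never sees an event for a change that did not commit.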
Creating Subscribers
Basic Subscriber
# app/subscribers/order_created_subscriber.rb
class OrderCreatedSubscriber < NatsPubsub::Subscriber
subscribe_to "order.created"
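def handle(message, context)
order_id = message['id']
# Process order
OrderProcessingService.new(order_id).call
Rails.logger.info "Processed order: #{order_id}"
rescue ActiveRecord::RecordNotFound
# Non-retryable: log and return so the message is ACKed instead of retried
Rails.logger.error "Order not found: #{order_id}"
end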
end
Wildcard Subscriptions
Single-Level Wildcard (*)
# app/subscribers/user_events_subscriber.rb
class UserEventsSubscriber < NatsPubsub::Subscriber
subscribe_to "user.*" # Matches user.created, user.updated, etc.
def handle(message, context)
action = context.topic.split('.').last # created, updated, destroyed
case action
when 'created'
handle_user_created(message)
when 'updated'
handle_user_updated(message)
when 'destroyed'
handle_user_destroyed(message)
end
end
private
def handle_user_created(message)
# Send welcome email
UserMailer.welcome_email(message['email']).deliver_later
end
def handle_user_updated(message)
# Update search index
SearchIndexJob.perform_later('User', message['id'])
end
def handle_user_destroyed(message)
# Cleanup user data
DataCleanupJob.perform_later('User', message['id'])
end
end
Multi-Level Wildcard (>)
# app/subscribers/audit_log_subscriber.rb
class AuditLogSubscriber < NatsPubsub::Subscriber
subscribe_to ">" # Matches ALL events from this app
def handle(message, context)
AuditLog.create!(
event_type: context.topic,
event_id: context.event_id,
subject: context.subject,
payload: message,
occurred_at: context.occurred_at,
trace_id: context.trace_id
)
end
end
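The two wildcard forms follow NATS subject semantics: `*` matches exactly one dot-separated token, and `>` matches one or more trailing tokens. The matcher below is a plain-Ruby sketch of those rules applied to topic strings. The function name is hypothetical, and it ignores any environment or application prefix the library may add to subjects.

```ruby
# Return true when a dot-separated subject matches a NATS-style pattern.
# "*" matches exactly one token; ">" matches one or more remaining tokens.
def subject_matches?(pattern, subject)
  pattern_tokens = pattern.split('.')
  subject_tokens = subject.split('.')

  pattern_tokens.each_with_index do |token, index|
    # ">" needs at least one token left in the subject at this position.
    return index < subject_tokens.length if token == '>'
    return false if index >= subject_tokens.length
    return false unless token == '*' || token == subject_tokens[index]
  end

  pattern_tokens.length == subject_tokens.length
end

puts subject_matches?('user.*', 'user.created')
puts subject_matches?('user.*', 'user.profile.updated')
puts subject_matches?('user.>', 'user.profile.updated')
puts subject_matches?('>', 'order.created')
```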
Error Handling in Subscribers
# app/subscribers/payment_processor_subscriber.rb
class PaymentProcessorSubscriber < NatsPubsub::Subscriber
subscribe_to "payment.pending"
def handle(message, context)
payment_id = message['payment_id']
begin
payment = Payment.find(payment_id)
# Process payment
result = PaymentGateway.charge(payment)
if result.success?
payment.update!(
status: 'completed',
transaction_id: result.transaction_id
)
# Publish success event
NatsPubsub.publish(
topic: 'payment.completed',
message: {
payment_id: payment.id,
transaction_id: result.transaction_id
}
)
else
handle_payment_failure(payment, result)
end
rescue Payment::GatewayError => e
# Retryable error - let it retry
Rails.logger.warn "Payment gateway error: #{e.message}"
raise # Will retry with backoff
rescue ActiveRecord::RecordNotFound => e
# Non-retryable error - don't retry
Rails.logger.error "Payment not found: #{payment_id}"
# Don't raise - message will be ACKed and not retried
rescue StandardError => e
# Unexpected error - log and send to error tracking
Rails.logger.error "Unexpected error processing payment: #{e.message}"
Sentry.capture_exception(e, extra: { payment_id: payment_id })
raise # Will retry
end
end
private
def handle_payment_failure(payment, result)
payment.update!(
status: 'failed',
error_message: result.error_message
)
NatsPubsub.publish(
topic: 'payment.failed',
message: {
payment_id: payment.id,
reason: result.error_message
}
)
end
end
Middleware Integration
# app/subscribers/order_subscriber.rb
class OrderSubscriber < NatsPubsub::Subscriber
subscribe_to "order.*"
# Use middleware for this subscriber
use TimingMiddleware
use ValidationMiddleware
def handle(message, context)
# Your logic here
end
end
# app/middleware/timing_middleware.rb
class TimingMiddleware
def call(message, context, next_middleware)
start_time = Time.current
next_middleware.call
duration = Time.current - start_time
Rails.logger.info "Subscriber took #{duration}s to process"
if duration > 5.0
SlackNotifier.warn("Slow subscriber: #{context.subscriber_class} took #{duration}s")
end
end
end
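The middleware above receives `next_middleware` and decides when to call it. As a sketch of how such a chain can be composed, the code below wraps a handler in middleware from the outside in, so the first registered middleware runs first and finishes last. `build_chain` and `RecordingMiddleware` are hypothetical names, and the sketch passes middleware instances rather than classes. NatsPubsub's internal chain may be built differently.

```ruby
# Compose middlewares around a handler; returns a callable taking (message, context).
def build_chain(middlewares, handler)
  middlewares.reverse.inject(handler) do |next_step, middleware|
    lambda do |message, context|
      middleware.call(message, context, -> { next_step.call(message, context) })
    end
  end
end

# Example middleware that records when it runs relative to the handler.
class RecordingMiddleware
  def initialize(name, log)
    @name = name
    @log = log
  end

  def call(_message, _context, next_middleware)
    @log << "#{@name}:before"
    next_middleware.call
    @log << "#{@name}:after"
  end
end

log = []
handler = ->(message, _context) { log << "handled:#{message['id']}" }
chain = build_chain(
  [RecordingMiddleware.new('timing', log), RecordingMiddleware.new('validation', log)],
  handler
)

chain.call({ 'id' => 1 }, {})
puts log.inspect
```

Because each middleware wraps everything after it, a timing middleware registered first measures the validation middleware and the handler together.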
Background Jobs Integration
Sidekiq Integration
Setup
# Gemfile
gem 'sidekiq'
gem 'redis'
# config/initializers/sidekiq.rb
Sidekiq.configure_server do |config|
config.redis = { url: ENV.fetch('REDIS_URL', 'redis://localhost:6379/0') }
end
Sidekiq.configure_client do |config|
config.redis = { url: ENV.fetch('REDIS_URL', 'redis://localhost:6379/0') }
end
Publishing from Sidekiq Jobs
# app/workers/order_fulfillment_worker.rb
class OrderFulfillmentWorker
include Sidekiq::Worker
sidekiq_options retry: 3, queue: :critical
def perform(order_id)
order = Order.find(order_id)
# Process fulfillment
fulfillment = FulfillmentService.new(order).process
# Publish event
NatsPubsub.publish(
topic: 'order.fulfilled',
message: {
order_id: order.id,
tracking_number: fulfillment.tracking_number,
carrier: fulfillment.carrier,
fulfilled_at: Time.current
}
)
end
end
Triggering Sidekiq from Subscribers
# app/subscribers/order_paid_subscriber.rb
class OrderPaidSubscriber < NatsPubsub::Subscriber
subscribe_to "order.paid"
def handle(message, context)
order_id = message['order_id']
# Enqueue Sidekiq job for processing
OrderFulfillmentWorker.perform_async(order_id)
end
end
ActiveJob Integration
# config/application.rb
class Application < Rails::Application
config.active_job.queue_adapter = :sidekiq
end
# app/jobs/send_notification_job.rb
class SendNotificationJob < ApplicationJob
queue_as :notifications
def perform(user_id, notification_type, data)
user = User.find(user_id)
# Send notification
NotificationService.send(user, notification_type, data)
# Publish event
NatsPubsub.publish(
topic: "notification.#{notification_type}.sent",
message: {
user_id: user.id,
notification_type: notification_type,
sent_at: Time.current
}
)
end
end
# app/subscribers/user_created_subscriber.rb
class UserCreatedSubscriber < NatsPubsub::Subscriber
subscribe_to "user.created"
def handle(message, context)
user_id = message['id']
# Trigger ActiveJob
SendNotificationJob.perform_later(user_id, 'welcome', { name: message['name'] })
end
end
Outbox Worker
Run outbox worker with Sidekiq:
# app/workers/outbox_publisher_worker.rb
class OutboxPublisherWorker
include Sidekiq::Worker
sidekiq_options queue: :outbox, retry: false
def perform
repository = NatsPubsub::Publisher::OutboxRepository.new
publisher = NatsPubsub::Publisher::OutboxPublisher.new(repository)
# Process pending events
results = publisher.publish_pending(limit: 100) do |event|
# Extract topic from subject
topic = event.subject.split('.')[2..-1].join('.')
# Publish to NATS
NatsPubsub.publish(
topic: topic,
message: JSON.parse(event.payload)['message'],
headers: event.headers ? JSON.parse(event.headers) : {}
)
end
Rails.logger.info "Published #{results.length} outbox events"
# Reschedule if there are more pending
OutboxPublisherWorker.perform_in(5.seconds) if results.length > 0
end
end
# config/initializers/sidekiq.rb
Sidekiq.configure_server do |config|
# Start outbox worker on Sidekiq startup
config.on(:startup) do
OutboxPublisherWorker.perform_async
end
end
# Or use sidekiq-cron
# config/schedule.yml
outbox_publisher:
cron: "*/5 * * * * *" # Every 5 seconds
class: "OutboxPublisherWorker"
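The topic extraction in the outbox worker relies on subjects having the form `<env>.<app_name>.<topic>`, as in the subject `production.myapp.order.created` used elsewhere in this guide. The sketch below makes that assumption explicit and adds a guard for malformed subjects, which the inline `split('.')[2..-1]` expression does not have. Both helper names are hypothetical.

```ruby
# Assumed subject layout: "<env>.<app_name>.<topic>".
def subject_for(env, app_name, topic)
  [env, app_name, topic].join('.')
end

# Strip the env and app_name prefix; reject subjects that carry no topic.
def topic_from_subject(subject)
  tokens = subject.split('.')
  raise ArgumentError, "subject has no topic: #{subject}" if tokens.length < 3

  tokens[2..].join('.')
end

subject = subject_for('production', 'myapp', 'order.created')
puts subject
puts topic_from_subject(subject)
```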
Testing in Rails
RSpec Setup
# spec/rails_helper.rb
require 'nats_pubsub/testing'
RSpec.configure do |config|
# Enable test mode (fake by default)
config.before(:suite) do
NatsPubsub::Testing.fake!
end
# Clear published messages between tests
config.before(:each) do
NatsPubsub::Testing.reset!
end
# Include helpers
config.include NatsPubsub::Testing::Helpers
end
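Conceptually, a fake test mode replaces the network publisher with an in-memory recorder that specs can inspect and reset. The class below is a plain-Ruby sketch of that idea. `FakePublisher` and its methods are hypothetical and do not describe how NatsPubsub::Testing is implemented.

```ruby
# Hypothetical in-memory publisher: records events instead of sending them to NATS.
class FakePublisher
  attr_reader :published

  def initialize
    @published = []
  end

  # Same keyword shape as the publish calls in this guide; extra keywords become metadata.
  def publish(topic:, message:, **metadata)
    @published << { topic: topic, message: message, metadata: metadata }
    true
  end

  # Forget everything recorded so far (what a per-example reset would do).
  def reset!
    @published.clear
  end

  # True if an event on the topic was recorded whose message includes the given pairs.
  def published?(topic, including: {})
    @published.any? do |event|
      event[:topic] == topic &&
        including.all? { |key, value| event[:message][key] == value }
    end
  end
end

publisher = FakePublisher.new
publisher.publish(
  topic: 'user.created',
  message: { email: 'test@example.com', name: 'Test User' },
  trace_id: 'trace-123'
)

found = publisher.published?('user.created', including: { email: 'test@example.com' })
trace_id = publisher.published.first[:metadata][:trace_id]
publisher.reset!
empty_after_reset = publisher.published.empty?
puts found, trace_id, empty_after_reset
```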
Testing Publishers
Testing Controller Publishing
# spec/controllers/users_controller_spec.rb
require 'rails_helper'
RSpec.describe UsersController, type: :controller do
describe 'POST #create' do
let(:valid_params) do
{ user: { email: 'test@example.com', name: 'Test User', password: 'password123' } }
end
it 'publishes user.created event' do
post :create, params: valid_params
expect(NatsPubsub).to have_published_event('user.created')
.with_message(hash_including(
email: 'test@example.com',
name: 'Test User'
))
end
it 'includes trace_id in event' do
request.headers['X-Request-ID'] = 'trace-123'
post :create, params: valid_params
expect(NatsPubsub).to have_published_event('user.created')
.with_metadata(hash_including(trace_id: 'trace-123'))
end
end
end
Testing Model Publishing
# spec/models/order_spec.rb
require 'rails_helper'
RSpec.describe Order, type: :model do
describe 'callbacks' do
it 'publishes order.created event on create' do
order = create(:order)
expect(NatsPubsub).to have_published_event('order.created')
.with_message(hash_including(
id: order.id,
user_id: order.user_id,
total: order.total
))
end
it 'publishes order.updated event on status change' do
order = create(:order, status: 'pending')
NatsPubsub::Testing.reset!
order.update!(status: 'paid')
expect(NatsPubsub).to have_published_event('order.updated')
.with_message(hash_including(
id: order.id,
status: 'paid',
previous_status: 'pending'
))
end
it 'does not publish on non-status updates' do
order = create(:order)
NatsPubsub::Testing.reset!
order.update!(notes: 'Updated notes')
expect(NatsPubsub).not_to have_published_event('order.updated')
end
end
end
Testing Subscribers
# spec/subscribers/order_created_subscriber_spec.rb
require 'rails_helper'
require 'ostruct' # OpenStruct is used for the context double below
RSpec.describe OrderCreatedSubscriber do
let(:subscriber) { described_class.new }
let(:message) do
{
'id' => 123,
'user_id' => 456,
'total' => 99.99,
'status' => 'pending'
}
end
let(:context) do
OpenStruct.new(
event_id: 'evt-123',
subject: 'production.myapp.order.created',
topic: 'order.created',
occurred_at: Time.current
)
end
describe '#handle' do
it 'processes the order' do
expect(OrderProcessingService).to receive(:new).with(123).and_call_original
expect_any_instance_of(OrderProcessingService).to receive(:call)
subscriber.handle(message, context)
end
it 'logs the processing' do
allow(OrderProcessingService).to receive_message_chain(:new, :call)
expect(Rails.logger).to receive(:info).with('Processed order: 123')
subscriber.handle(message, context)
end
context 'when order not found' do
before do
allow(OrderProcessingService).to receive(:new).and_raise(ActiveRecord::RecordNotFound)
end
it 'logs the error and does not raise' do
expect(Rails.logger).to receive(:error).with(/Order not found/)
expect { subscriber.handle(message, context) }.not_to raise_error
end
end
end
end
Integration Tests
# spec/integration/order_workflow_spec.rb
require 'rails_helper'
RSpec.describe 'Order Workflow', type: :integration do
# Use inline mode for integration tests
before { NatsPubsub::Testing.inline! }
after { NatsPubsub::Testing.fake! }
it 'completes full order workflow' do
user = create(:user)
# Create order
order = create(:order, user: user, status: 'pending')
# Should have published order.created and triggered fulfillment
expect(OrderFulfillmentWorker).to have_enqueued_sidekiq_job(order.id)
# Process payment
order.update!(status: 'paid')
# Should have published order.updated
expect(NatsPubsub).to have_published_event('order.updated')
.with_message(hash_including(status: 'paid'))
# Fulfill order
perform_enqueued_jobs
# Should have published order.fulfilled
expect(NatsPubsub).to have_published_event('order.fulfilled')
end
end
Web UI
Mounting the Web UI
The Web UI provides monitoring for Inbox and Outbox events.
# config/routes.rb
Rails.application.routes.draw do
# Mount NatsPubsub Web UI
mount NatsPubsub::Web => '/nats_pubsub'
# Your other routes...
end
Access at: http://localhost:3000/nats_pubsub
Authentication
Basic Authentication
# config/initializers/nats_pubsub.rb
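NatsPubsub::Web.use(Rack::Auth::Basic) do |username, password|
# Constant-time comparison avoids leaking credentials through timing differences
ActiveSupport::SecurityUtils.secure_compare(username, ENV.fetch('NATS_UI_USERNAME')) &
ActiveSupport::SecurityUtils.secure_compare(password, ENV.fetch('NATS_UI_PASSWORD'))
end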
Devise Authentication
# config/routes.rb
authenticate :user, ->(user) { user.admin? } do
mount NatsPubsub::Web => '/nats_pubsub'
end
Custom Authentication
# lib/nats_pubsub_auth.rb
class NatsPubsubAuth
def initialize(app)
@app = app
end
def call(env)
request = Rack::Request.new(env)
session = request.session
if session[:user_id] && User.find_by(id: session[:user_id])&.admin?
@app.call(env)
else
[401, { 'Content-Type' => 'text/html' }, ['Unauthorized']]
end
end
end
# config/initializers/nats_pubsub.rb
NatsPubsub::Web.use NatsPubsubAuth
Customizing the UI
# config/initializers/nats_pubsub.rb
NatsPubsub::Web.configure do |config|
# Set page title
config.title = 'My App - Event Monitor'
# Set number of items per page
config.per_page = 50
# Enable/disable features
config.enable_retry = true
config.enable_delete = Rails.env.development?
end
Database Setup
Inbox/Outbox Tables
Generate and run migrations:
rails generate nats_pubsub:migrations
rails db:migrate
Running Migrations
Development
rails db:migrate
Production
# Run on deployment
bundle exec rails db:migrate
# Or with Capistrano
cap production deploy:migrate
Rollback
rails db:rollback STEP=2
Running in Production
Production Configuration
# config/initializers/nats_pubsub.rb
NatsPubsub.configure do |config|
config.servers = ENV.fetch('NATS_URLS')
config.env = 'production'
config.app_name = ENV.fetch('APP_NAME')
# Enable reliability patterns
config.use_inbox = true
config.use_outbox = true
config.use_dlq = true
# Scale for production
config.concurrency = 20
config.max_deliver = 5
# TLS for security
config.tls = {
ca_file: '/app/certs/ca.pem',
cert_file: '/app/certs/client-cert.pem',
key_file: '/app/certs/client-key.pem'
}
# Structured logging
config.logger = Logger.new(STDOUT)
config.logger.level = Logger::INFO
config.logger.formatter = proc do |severity, datetime, progname, msg|
{
timestamp: datetime.iso8601,
severity: severity,
progname: progname,
message: msg,
app: config.app_name,
env: config.env
}.to_json + "\n"
end
end
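The formatter above can be tried outside Rails to see the exact line it emits. This standalone sketch uses only the Ruby standard library, writes to a StringIO instead of STDOUT, and substitutes literal strings for config.app_name and config.env.

```ruby
require 'json'
require 'logger'
require 'stringio'
require 'time' # provides Time#iso8601 on Rubies where it is not built in

output = StringIO.new
logger = Logger.new(output)
logger.level = Logger::INFO
logger.formatter = proc do |severity, datetime, progname, msg|
  {
    timestamp: datetime.iso8601,
    severity: severity,
    progname: progname,
    message: msg,
    app: 'myapp',     # stands in for config.app_name
    env: 'production' # stands in for config.env
  }.to_json + "\n"
end

logger.debug('dropped because the level is INFO')
logger.info('subscriber started')

lines = output.string.lines
entry = JSON.parse(lines.first)
puts entry['severity']
puts entry['message']
```

One JSON object per line keeps the output compatible with log shippers that parse line-delimited JSON.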
Deployment
With Capistrano
# Capfile
require 'capistrano/bundler'
require 'capistrano/rails/migrations'
# config/deploy.rb
set :application, 'myapp'
set :repo_url, 'git@github.com:myorg/myapp.git'
# Start subscriber process after deployment
namespace :deploy do
after :publishing, :restart_subscribers do
on roles(:app) do
execute :sudo, :systemctl, :restart, 'nats-subscribers'
end
end
end
Systemd Service
# /etc/systemd/system/nats-subscribers.service
[Unit]
Description=NatsPubsub Subscriber Process
After=network.target
[Service]
Type=simple
User=deploy
WorkingDirectory=/var/www/myapp/current
Environment=RAILS_ENV=production
ExecStart=/usr/local/bin/bundle exec nats_pubsub
Restart=on-failure
RestartSec=10
[Install]
WantedBy=multi-user.target
Start service:
sudo systemctl enable nats-subscribers
sudo systemctl start nats-subscribers
sudo systemctl status nats-subscribers
Docker
# Dockerfile
FROM ruby:3.2
WORKDIR /app
COPY Gemfile Gemfile.lock ./
RUN bundle install
COPY . .
# Compile assets
RUN bundle exec rails assets:precompile
# Run subscribers
CMD ["bundle", "exec", "nats_pubsub"]
# docker-compose.yml
version: "3.8"
services:
web:
build: .
command: bundle exec rails server -b 0.0.0.0
ports:
- "3000:3000"
environment:
- NATS_URLS=nats://nats:4222
- DATABASE_URL=postgresql://postgres:password@db:5432/myapp
depends_on:
- db
- nats
subscribers:
build: .
command: bundle exec nats_pubsub
environment:
- NATS_URLS=nats://nats:4222
- DATABASE_URL=postgresql://postgres:password@db:5432/myapp
- CONCURRENCY=10
depends_on:
- db
- nats
outbox_worker:
build: .
command: bundle exec rake nats_pubsub:outbox:worker
environment:
- NATS_URLS=nats://nats:4222
- DATABASE_URL=postgresql://postgres:password@db:5432/myapp
depends_on:
- db
- nats
nats:
image: nats:latest
command: -js
ports:
- "4222:4222"
db:
image: postgres:15
environment:
POSTGRES_PASSWORD: password
POSTGRES_DB: myapp
volumes:
- postgres_data:/var/lib/postgresql/data
volumes:
postgres_data:
Monitoring
Health Checks
# config/routes.rb
Rails.application.routes.draw do
get '/health', to: 'health#check'
end
# app/controllers/health_controller.rb
class HealthController < ApplicationController
def check
checks = {
nats: nats_healthy?,
database: database_healthy?,
outbox: outbox_healthy?
}
status = checks.values.all? ? :ok : :service_unavailable
render json: {
status: status == :ok ? 'healthy' : 'unhealthy',
checks: checks,
timestamp: Time.current
}, status: status
end
private
def nats_healthy?
NatsPubsub.connection.connected?
rescue StandardError
false
end
def database_healthy?
ActiveRecord::Base.connection.active?
rescue StandardError
false
end
def outbox_healthy?
# Check for backlog
pending_count = NatsOutboxEvent.pending.count
pending_count < 1000
rescue StandardError
false
end
end
Metrics Collection
# app/middleware/metrics_middleware.rb
class MetricsMiddleware
def call(message, context, next_middleware)
start_time = Time.current
next_middleware.call
duration = Time.current - start_time
# Send to metrics service (Prometheus, Datadog, etc.)
Metrics.histogram('nats.subscriber.duration', duration, tags: {
topic: context.topic,
subscriber: context.subscriber_class
})
Metrics.increment('nats.subscriber.processed', tags: {
topic: context.topic
})
rescue StandardError => e
Metrics.increment('nats.subscriber.error', tags: {
topic: context.topic,
error: e.class.name
})
raise
end
end
Scaling
Horizontal Scaling
Run multiple subscriber processes:
# docker-compose.yml
services:
subscribers_1:
build: .
command: bundle exec nats_pubsub
environment:
- CONCURRENCY=10
subscribers_2:
build: .
command: bundle exec nats_pubsub
environment:
- CONCURRENCY=10
subscribers_3:
build: .
command: bundle exec nats_pubsub
environment:
- CONCURRENCY=10
Vertical Scaling
Increase concurrency:
# config/initializers/nats_pubsub.rb
NatsPubsub.configure do |config|
config.concurrency = 50 # Increase for more throughput
end
Best Practices
1. Use Meaningful Topic Names
# Good: Clear hierarchy
NatsPubsub.publish(topic: 'order.created', message: data)
NatsPubsub.publish(topic: 'order.payment.completed', message: data)
# Bad: Flat, vague names
NatsPubsub.publish(topic: 'event1', message: data)
2. Keep Message Payloads Small
# Good: IDs and references
NatsPubsub.publish(
topic: 'order.created',
message: {
id: order.id,
user_id: order.user_id,
total: order.total
}
)
# Bad: Entire object graphs
NatsPubsub.publish(
topic: 'order.created',
message: order.as_json(include: [:user, :items, :shipping_address])
)
3. Use Event IDs for Idempotency
# Consistent event IDs enable deduplication
NatsPubsub.publish(
topic: 'order.created',
message: { id: order.id },
event_id: "order-#{order.id}-created"
)
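A deterministic event ID is what lets a consumer recognise a redelivered message. The sketch below shows the idea with an in-memory set. `InMemoryInbox` is a hypothetical stand-in for the Inbox table, where the unique index on event_id plays the role of the set.

```ruby
require 'set'

# Hypothetical in-memory inbox: remembers event IDs that were already handled.
class InMemoryInbox
  def initialize
    @seen = Set.new
  end

  # Run the block only the first time an event_id is seen.
  # Returns true when the block ran and false for a duplicate.
  def process_once(event_id)
    return false unless @seen.add?(event_id)

    yield
    true
  end
end

# Same ID format as the publish call above.
def order_created_event_id(order_id)
  "order-#{order_id}-created"
end

inbox = InMemoryInbox.new
handled = []

first = inbox.process_once(order_created_event_id(42)) { handled << 42 }
second = inbox.process_once(order_created_event_id(42)) { handled << 42 }

puts handled.inspect
```

This simplified version marks an ID as seen before the handler finishes, so a production inbox would also need to record failures and allow retries for events that did not complete.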
4. Handle Errors Appropriately
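class PaymentSubscriber < NatsPubsub::Subscriber
subscribe_to 'payment.pending'
def handle(message, context)
process_payment(message) # placeholder for your business logic
rescue Timeout::Error => e
# Retryable errors: re-raise so the message is redelivered with backoff
Rails.logger.warn("Transient failure: #{e.message}")
raise
rescue ActiveRecord::RecordInvalid => e
# Non-retryable errors: log and don't raise
Rails.logger.error("Invalid payment: #{e.message}")
end
end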
5. Use Transactions with Outbox
ActiveRecord::Base.transaction do
order.update!(status: 'paid')
# Published in same transaction
NatsPubsub.publish(
topic: 'order.paid',
message: { order_id: order.id }
)
end
6. Monitor Metrics
# Track key metrics
Metrics.gauge('nats.outbox.pending', NatsOutboxEvent.pending.count)
Metrics.gauge('nats.inbox.failed', NatsInboxEvent.failed.count)
7. Use Wildcards Strategically
# Good: Specific wildcard
subscribe_to 'order.*' # order.created, order.updated
# Bad: Too broad
subscribe_to '>' # ALL events
8. Test Thoroughly
# Test publishing
expect(NatsPubsub).to have_published_event('order.created')
# Test subscriber behavior
subscriber.handle(message, context)
Troubleshooting
Common Issues
Issue: Subscribers Not Starting
Symptom: bundle exec nats_pubsub exits immediately
Solution:
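# Check for syntax errors (ruby -c only checks the first file it is given, so check each file separately)
find app/subscribers -name '*.rb' -exec bundle exec ruby -c {} \;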
# Check logs
tail -f log/nats_pubsub.log
# Verify NATS connection
nc -zv localhost 4222
Issue: Messages Not Being Published
Symptom: Events not appearing in NATS
Solution:
# Check connection
NatsPubsub.connection.connected?
# Check subject pattern
Rails.logger.info NatsPubsub.config.subject_for('order.created')
# Verify stream exists
# nats stream ls
Issue: Duplicate Processing
Symptom: Messages processed multiple times
Solution:
# Enable Inbox pattern
NatsPubsub.configure do |config|
config.use_inbox = true
end
# Run migrations
rails generate nats_pubsub:migrations
rails db:migrate
Issue: High Database Load
Symptom: Slow queries on inbox/outbox tables
Solution:
-- Add indexes (skip if the generated migrations already created indexes on these columns)
CREATE INDEX idx_outbox_status ON nats_pubsub_outbox(status, enqueued_at);
CREATE INDEX idx_inbox_status ON nats_pubsub_inbox(status, processed_at);
-- Cleanup old events
DELETE FROM nats_pubsub_outbox WHERE status = 'sent' AND sent_at < NOW() - INTERVAL '7 days';
Example Application
Complete Rails example:
# config/initializers/nats_pubsub.rb
NatsPubsub.configure do |config|
config.servers = ENV.fetch('NATS_URLS', 'nats://localhost:4222')
config.env = Rails.env
config.app_name = 'ecommerce'
config.concurrency = 10
config.use_outbox = Rails.env.production?
config.use_inbox = Rails.env.production?
config.logger = Rails.logger
end
# app/models/order.rb
class Order < ApplicationRecord
belongs_to :user
has_many :order_items
after_create :publish_created_event
def complete_payment!
transaction do
update!(status: 'paid', paid_at: Time.current)
NatsPubsub.publish(
topic: 'order.paid',
message: {
order_id: id,
user_id: user_id,
total: total,
paid_at: paid_at
}
)
end
end
private
def publish_created_event
NatsPubsub.publish(
topic: 'order.created',
message: {
order_id: id,
user_id: user_id,
total: total,
items: order_items.map(&:as_json)
}
)
end
end
# app/subscribers/order_paid_subscriber.rb
class OrderPaidSubscriber < NatsPubsub::Subscriber
subscribe_to 'order.paid'
def handle(message, context)
order = Order.find(message['order_id'])
# Trigger fulfillment
OrderFulfillmentJob.perform_later(order.id)
# Send confirmation email
OrderMailer.payment_confirmation(order).deliver_later
Rails.logger.info "Order #{order.id} payment processed"
end
end
# app/subscribers/order_fulfilled_subscriber.rb
class OrderFulfilledSubscriber < NatsPubsub::Subscriber
subscribe_to 'order.fulfilled'
def handle(message, context)
order = Order.find(message['order_id'])
# Update inventory
InventoryService.decrement_stock(order)
# Send shipping notification
OrderMailer.shipping_notification(order).deliver_later
end
end