Overview
Database synchronization in LiveSync ensures that changes in your backend database are reliably propagated to all connected clients in realtime. This page covers synchronization patterns, the Models SDK, and best practices for maintaining consistent state across distributed systems.
Synchronization Patterns
Outbox Pattern (Postgres)
The outbox pattern provides transactional guarantees when publishing database changes:
BEGIN ;
-- Update your application data
UPDATE users
SET email = 'newemail@example.com'
WHERE id = 123 ;
-- Write change event to outbox
INSERT INTO outbox (
mutation_id,
channel,
name ,
data
) VALUES (
'mutation-uuid-123' ,
'users:123' ,
'user.updated' ,
jsonb_build_object(
'id' , 123 ,
'email' , 'newemail@example.com'
)
);
COMMIT ;
Benefits:
Transactional consistency between database writes and message publishing
Exactly-once delivery guarantees
Automatic retry on failures
Message ordering preserved per channel
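As a minimal sketch of how application code might assemble the outbox write shown above, the helper below builds a parameterized INSERT for the same four columns. The name `buildOutboxInsert` and the `{ text, values }` return shape are assumptions made for illustration and are not part of LiveSync. The resulting statement would need to run inside the same transaction as the data change.

```javascript
// Hypothetical helper (not part of LiveSync): builds a parameterized
// INSERT for the outbox columns used in the SQL example above.
function buildOutboxInsert({ mutationId, channel, name, data }) {
  if (!mutationId || !channel || !name) {
    throw new Error('mutationId, channel and name are required');
  }
  return {
    text:
      'INSERT INTO outbox (mutation_id, channel, name, data) ' +
      'VALUES ($1, $2, $3, $4)',
    // Serialize the payload so it can be stored in a JSON/JSONB column
    values: [mutationId, channel, name, JSON.stringify(data)],
  };
}

const insert = buildOutboxInsert({
  mutationId: 'mutation-uuid-123',
  channel: 'users:123',
  name: 'user.updated',
  data: { id: 123, email: 'newemail@example.com' },
});
```

Keeping the statement parameterized avoids interpolating user-supplied values into SQL.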
Change Streams (MongoDB)
MongoDB uses Change Streams to watch for document modifications:
// MongoDB automatically detects changes
db . users . updateOne (
{ _id: ObjectId ( '123' ) },
{ $set: { email: 'newemail@example.com' } }
);
// Change stream pipeline routes this to Ably
// No additional code needed!
Benefits:
Native MongoDB integration
Real-time change detection
Full document access with pre/post images
Flexible filtering via aggregation pipelines
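To illustrate the filtering mentioned above, here is a sketch of a change stream aggregation pipeline that keeps only selected operation types and projects a few fields. The helper name `buildChangeStreamPipeline` is hypothetical, and how the pipeline is supplied to the integration is not shown here. The field names (`operationType`, `documentKey`, `fullDocument`, `updateDescription`) are standard change event fields.

```javascript
// Hypothetical helper: returns a change stream pipeline that only
// passes through the given operation types.
function buildChangeStreamPipeline(operationTypes) {
  return [
    // Keep only the operations we care about (for example inserts and updates)
    { $match: { operationType: { $in: operationTypes } } },
    // Project the fields a client needs; _id (the resume token) is kept by default
    {
      $project: {
        operationType: 1,
        documentKey: 1,
        fullDocument: 1,
        'updateDescription.updatedFields': 1,
      },
    },
  ];
}

const pipeline = buildChangeStreamPipeline(['insert', 'update']);
```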
Frontend Data Models
The Models SDK provides a high-level abstraction for managing synchronized state in frontend applications.
Installing the Models SDK
npm install ably @ably-labs/models
Creating a Model
A model represents a synchronized data structure in your application:
import ModelsClient from '@ably-labs/models' ;
import { Realtime } from 'ably' ;
// Initialize Ably
const ably = new Realtime ({ key: 'YOUR_API_KEY' });
// Create Models client
const modelsClient = new ModelsClient ({ ably });
// Define a model
const postModel = modelsClient . models . get ({
channelName: 'posts:123' ,
sync : async () => {
// Fetch initial state from backend
const response = await fetch ( '/api/posts/123' );
const { sequenceId , data } = await response . json ();
return { sequenceId , data };
},
merge : ( state , event ) => {
// Merge incoming changes into state
switch ( event . name ) {
case 'comment.added' :
return {
... state ,
comments: [ ... state . comments , event . data ]
};
case 'comment.deleted' :
return {
... state ,
comments: state . comments . filter ( c => c . id !== event . data . id )
};
default :
return state ;
}
}
});
The Sync Function
The sync function fetches the current state from your backend:
async function sync() {
  // Ask the backend for the current state and its sequence ID
  const response = await fetch('/api/posts/123');
  const result = await response.json();
  return {
    // Current version of the data
    sequenceId: result.sequenceId,
    // The actual data (the backend returns it under `data`)
    data: result.data
  };
}
Backend implementation:
app.get('/api/posts/:id', async (req, res) => {
  const { id } = req.params;
  // Read the post and the sequence ID in one transaction so they are consistent
  const result = await db.transaction(async (tx) => {
    // Fetch the post data
    const post = await tx.query(
      'SELECT * FROM posts WHERE id = $1',
      [id]
    );
    // Get the latest sequence ID.
    // The alias is quoted because Postgres folds unquoted identifiers to lowercase.
    const seq = await tx.query(
      'SELECT COALESCE(MAX(sequence_id), 0) AS "sequenceId" FROM outbox'
    );
    return {
      sequenceId: seq.rows[0].sequenceId.toString(),
      data: post.rows[0]
    };
  });
  res.json(result);
});
The Merge Function
The merge function combines incoming changes with existing state:
function merge(state, event) {
  // Must be pure and deterministic:
  // build new objects and arrays instead of mutating state
  switch (event.name) {
    case 'task.created':
      return { ...state, tasks: [...state.tasks, event.data] };
    case 'task.updated':
      return {
        ...state,
        tasks: state.tasks.map(t =>
          t.id === event.data.id ? { ...t, ...event.data } : t
        )
      };
    case 'task.deleted':
      return {
        ...state,
        tasks: state.tasks.filter(t => t.id !== event.data.id)
      };
    default:
      return state;
  }
}
Important: The merge function must be:
Pure: Same input always produces same output
Deterministic: No randomness or external state
Side-effect free: No API calls, mutations, or logging
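One way to check these properties during development is to deep-freeze the inputs and call the merge function twice. The sketch below assumes an event shape of `{ name, data }`; `deepFreeze` is a hypothetical helper written for this example, not an SDK function.

```javascript
// Recursively freeze a value so that accidental mutation is detectable.
function deepFreeze(value) {
  if (value && typeof value === 'object' && !Object.isFrozen(value)) {
    Object.freeze(value);
    Object.values(value).forEach(deepFreeze);
  }
  return value;
}

// Example merge under test: returns new objects, never mutates its inputs.
function merge(state, event) {
  if (event.name === 'task.created') {
    return { ...state, tasks: [...state.tasks, event.data] };
  }
  return state;
}

const before = deepFreeze({ tasks: [{ id: 1, title: 'Write docs' }] });
const snapshot = JSON.stringify(before);
const event = deepFreeze({
  name: 'task.created',
  data: { id: 2, title: 'Review' },
});

// Pure: calling merge twice with the same inputs gives equal results
const first = merge(before, event);
const second = merge(before, event);
const isDeterministic = JSON.stringify(first) === JSON.stringify(second);

// Side-effect free: the original state is unchanged
const isUnchanged = JSON.stringify(before) === snapshot;
```

A merge function that called `state.tasks.push(...)` would throw a `TypeError` here, because the frozen array cannot be extended.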
Subscribing to Changes
Subscribe to model updates to react to state changes:
// Subscribe to model changes
postModel . subscribe (( err , state ) => {
if ( err ) {
console . error ( 'Model error:' , err );
return ;
}
// Update UI with new state
updateUI ( state );
});
// Subscribe to confirmed changes only
postModel.subscribe(
  (err, state) => {
    if (err) {
      console.error('Model error:', err);
      return;
    }
    updateUI(state);
  },
  { optimistic: false }
);
Optimistic Updates
Optimistic updates provide instant feedback by applying changes locally before server confirmation.
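Conceptually, the state shown to the user can be thought of as the confirmed state with all pending optimistic events merged on top. The sketch below illustrates that idea only; it is not the Models SDK's internal implementation, and `optimisticState` and the `mutationId` field on pending events are assumptions for this example.

```javascript
// Pure merge for a single event type (assumed event shape: { name, data }).
function merge(state, event) {
  if (event.name === 'comment.added') {
    return { ...state, comments: [...state.comments, event.data] };
  }
  return state;
}

// Hypothetical helper: visible state = confirmed state + pending events.
function optimisticState(confirmed, pendingEvents) {
  return pendingEvents.reduce((state, event) => merge(state, event), confirmed);
}

const confirmed = { comments: [{ id: 'c1', text: 'First' }] };
let pending = [
  {
    mutationId: 'm1',
    name: 'comment.added',
    data: { id: 'm1', text: 'Second' },
  },
];

// While the request is in flight, the user already sees both comments
const visible = optimisticState(confirmed, pending);

// If the backend rejects mutation m1, drop it and recompute: the comment disappears
pending = pending.filter(e => e.mutationId !== 'm1');
const rolledBack = optimisticState(confirmed, pending);
```

Because the confirmed state is never mutated, rolling back is just recomputing from it without the rejected event.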
Implementing Optimistic Updates
async function addComment ( postId , commentText ) {
const mutationId = crypto . randomUUID ();
// Apply optimistically
const [ confirmation , cancel ] = await postModel . optimistic ({
mutationId ,
name: 'comment.added' ,
data: {
id: mutationId ,
text: commentText ,
createdAt: new Date (). toISOString (),
status: 'pending'
}
});
try {
// Send to backend
const response = await fetch(`/api/posts/${postId}/comments`, {
method: 'POST' ,
headers: { 'Content-Type' : 'application/json' },
body: JSON . stringify ({
mutationId ,
text: commentText
})
});
if ( ! response . ok ) {
throw new Error ( 'Failed to add comment' );
}
// Wait for confirmation
await confirmation ;
} catch ( error ) {
// Cancel optimistic update on error
cancel ();
throw error ;
}
}
Backend Confirmation
app . post ( '/api/posts/:id/comments' , async ( req , res ) => {
const { id } = req . params ;
const { mutationId , text } = req . body ;
try {
await db . transaction ( async ( tx ) => {
// Create the comment
const comment = await tx . query (
'INSERT INTO comments (post_id, text) VALUES ($1, $2) RETURNING *' ,
[ id , text ]
);
// Write to outbox with mutation_id
await tx . query ( `
INSERT INTO outbox (mutation_id, channel, name, data)
VALUES ($1, $2, $3, $4)
` , [
mutationId ,
`posts:${id}`,
'comment.added' ,
JSON . stringify ( comment . rows [ 0 ])
]);
});
res . json ({ success: true });
} catch ( error ) {
// Write rejection to outbox
await db . query ( `
INSERT INTO outbox (mutation_id, channel, name, rejected)
VALUES ($1, $2, $3, true)
`, [mutationId, `posts:${id}`, 'comment.added']);
res . status ( 500 ). json ({ error: error . message });
}
});
Synchronization Strategies
Full State Synchronization
Send complete object state with each update:
{
"name" : "task.updated" ,
"data" : {
"id" : 123 ,
"title" : "Complete project" ,
"status" : "in_progress" ,
"assignee" : "alice" ,
"dueDate" : "2026-03-15"
}
}
Pros:
Simple to implement
No merge logic needed
Always consistent state
Cons:
Higher bandwidth usage
May exceed message size limits
Less efficient for large objects
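With full state events, the client-side handling reduces to replacing the stored object with the one in the event. A minimal sketch, assuming the model state has a `tasks` array keyed by `id` (the function name `mergeFullState` is hypothetical):

```javascript
// Replace the whole task with the version carried by the event,
// or append it if the task is not in the state yet.
function mergeFullState(state, event) {
  if (event.name !== 'task.updated') {
    return state;
  }
  const incoming = event.data;
  const exists = state.tasks.some(t => t.id === incoming.id);
  return {
    ...state,
    tasks: exists
      ? state.tasks.map(t => (t.id === incoming.id ? incoming : t))
      : [...state.tasks, incoming],
  };
}

const fullStateResult = mergeFullState(
  { tasks: [{ id: 123, title: 'Complete project', status: 'todo' }] },
  {
    name: 'task.updated',
    data: { id: 123, title: 'Complete project', status: 'in_progress' },
  }
);
```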
Delta Synchronization
Send only the changed fields:
{
"name" : "task.updated" ,
"data" : {
"id" : 123 ,
"changes" : {
"status" : "in_progress"
}
}
}
Pros:
Efficient bandwidth usage
Works with large objects
Clear change intent
Cons:
Requires merge logic
More complex implementation
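The merge logic that delta events require can stay small. The following sketch assumes the event payload shape shown above (`{ id, changes }`) and a state with a `tasks` array; `mergeDelta` is a name chosen for this example.

```javascript
// Apply only the changed fields to the matching task, without mutating state.
function mergeDelta(state, event) {
  if (event.name !== 'task.updated') {
    return state;
  }
  const { id, changes } = event.data;
  return {
    ...state,
    tasks: state.tasks.map(t => (t.id === id ? { ...t, ...changes } : t)),
  };
}

const deltaResult = mergeDelta(
  { tasks: [{ id: 123, title: 'Complete project', status: 'todo' }] },
  { name: 'task.updated', data: { id: 123, changes: { status: 'in_progress' } } }
);
```

Fields that are absent from `changes`, such as `title` here, keep their previous values.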
JSON Patch Synchronization
Use standardized JSON Patch format:
{
"name" : "task.updated" ,
"data" : [
{
"op" : "replace" ,
"path" : "/status" ,
"value" : "in_progress"
}
]
}
Pros:
Standardized format
Automatic merge with libraries
Precise change tracking
Cons:
More complex to generate
Requires JSON Patch library
Implementation Example
import { applyPatch } from 'fast-json-patch';
function merge(state, event) {
  if (event.name.endsWith('.patched')) {
    // Apply JSON Patch without mutating state:
    // the fourth argument (mutateDocument) defaults to true, so pass false
    const result = applyPatch(state, event.data, false, false);
    return result.newDocument;
  }
  // Handle other event types
  // ...
  return state;
}
Handling History and Replay
Message History
Retrieve historical changes when a client reconnects:
const channel = ably.channels.get('posts:123');
// Get the last 50 messages (history returns newest first by default)
const history = await channel.history({ limit: 50 });
// Replay oldest first so changes are applied in their original order
for (const message of [...history.items].reverse()) {
  applyChange(message.data);
}
// Subscribe to live updates
await channel.subscribe((message) => {
  applyChange(message.data);
});
Automatic History Replay
The Models SDK automatically handles history replay:
const model = modelsClient . models . get ({
channelName: 'posts:123' ,
sync ,
merge
});
// Model automatically:
// 1. Calls sync() to get initial state
// 2. Rewinds through history to sequenceId
// 3. Replays missed events
// 4. Subscribes to live updates
model . subscribe (( err , state ) => {
// State is always up-to-date
});
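The replay steps listed above can be sketched in plain JavaScript. This is a simplified illustration rather than the SDK's implementation: it assumes every historical event carries a `sequenceId` that can be compared numerically, and `replayMissedEvents` is a hypothetical name.

```javascript
// Apply only the events that are newer than the synced state, oldest first.
function replayMissedEvents(syncResult, history, merge) {
  const syncedAt = Number(syncResult.sequenceId);
  const missed = history
    .filter(e => Number(e.sequenceId) > syncedAt)
    .sort((a, b) => Number(a.sequenceId) - Number(b.sequenceId));
  return missed.reduce((state, e) => merge(state, e), syncResult.data);
}

// Example merge: append the comment carried by the event
const appendComment = (state, event) => ({
  ...state,
  comments: [...state.comments, event.data],
});

// State returned by sync() already includes everything up to sequence 2
const syncResult = { sequenceId: '2', data: { comments: ['a', 'b'] } };

// History arrives newest first and overlaps with the synced state
const historyEvents = [
  { sequenceId: '4', name: 'comment.added', data: 'd' },
  { sequenceId: '3', name: 'comment.added', data: 'c' },
  { sequenceId: '2', name: 'comment.added', data: 'b' },
];

const replayed = replayMissedEvents(syncResult, historyEvents, appendComment);
```

Events at or below the synced sequence ID are skipped, so the overlap between the sync response and channel history does not apply a change twice.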
Channel Design
Design channels for optimal performance:
// Good: Fine-grained channels
'users:123' // User-specific updates
'workspace:abc:tasks' // Workspace tasks
'post:456:comments' // Post comments
// Avoid: Overly broad channels
'all-users' // Too many subscribers
'global-updates' // Too much traffic
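A small helper can keep fine-grained channel names consistent across the backend and frontend. This is a sketch; `channelName` is a hypothetical function, and rejecting colons inside a segment is a design choice made here so that a segment cannot accidentally introduce extra levels.

```javascript
// Build a colon-separated channel name from its segments.
function channelName(...segments) {
  if (segments.length === 0) {
    throw new Error('at least one segment is required');
  }
  return segments
    .map(segment => {
      const value = String(segment);
      if (value === '' || value.includes(':')) {
        throw new Error(`invalid channel segment: "${value}"`);
      }
      return value;
    })
    .join(':');
}

const userChannel = channelName('users', 123);
const tasksChannel = channelName('workspace', 'abc', 'tasks');
```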
Message Batching
Batch multiple changes into single messages:
await db.transaction(async (tx) => {
  const taskIds = [1, 2, 3];
  // Make multiple changes, scoped to the tasks in this batch
  await tx.query(
    'UPDATE tasks SET status = $1 WHERE id = ANY($2)',
    ['done', taskIds]
  );
  await tx.query(
    'UPDATE tasks SET completed_at = NOW() WHERE id = ANY($1)',
    [taskIds]
  );
  // Single outbox entry describing the whole batch
  await tx.query(`
    INSERT INTO outbox (mutation_id, channel, name, data)
    VALUES ($1, $2, $3, $4)
  `, [
    mutationId,
    'tasks:updates',
    'tasks.batch-updated',
    JSON.stringify({ taskIds, status: 'done' })
  ]);
});
Event Buffering
Buffer events to reduce merge operations:
const modelsClient = new ModelsClient ({
ably ,
eventBufferOptions: {
bufferMs: 100 , // Buffer events for 100ms
eventOrderer : ( a , b ) => {
// Custom ordering logic
return a . timestamp - b . timestamp ;
}
}
});
Error Handling
Connection Errors
ably . connection . on ( 'failed' , ( stateChange ) => {
console . error ( 'Connection failed:' , stateChange . reason );
// Show offline UI
showOfflineIndicator ();
});
ably . connection . on ( 'connected' , () => {
// Hide offline UI
hideOfflineIndicator ();
});
Sync Errors
const model = modelsClient . models . get ({
channelName: 'posts:123' ,
sync : async () => {
try {
return await fetchInitialState ();
} catch ( error ) {
// Log error and retry
console . error ( 'Sync failed:' , error );
throw error ; // Models SDK will retry
}
},
merge
});
// Listen for model errors
model . on ( 'errored' , ( error ) => {
console . error ( 'Model error:' , error );
showErrorNotification ( 'Failed to sync data' );
});
Optimistic Update Failures
let cancel;
try {
  let confirmation;
  [confirmation, cancel] = await model.optimistic(event);
  const response = await fetch('/api/update', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(event.data)
  });
  if (!response.ok) {
    throw new Error('Update request failed');
  }
  // Wait with timeout
  await Promise.race([
    confirmation,
    new Promise((_, reject) =>
      setTimeout(() => reject(new Error('Timeout')), 5000)
    )
  ]);
} catch (error) {
  // Roll back explicitly: a failed request or local timeout
  // does not reject `confirmation` by itself
  if (cancel) cancel();
  console.error('Update failed:', error);
  showErrorNotification('Failed to save changes');
}
Best Practices
1. Design Clear Channel Hierarchies
// Resource-based
'posts:123'
'posts:123:comments'
'posts:123:reactions'
// User-scoped
'users:alice:notifications'
'users:alice:messages'
// Workspace-scoped
'workspace:acme:tasks'
'workspace:acme:users'
2. Use Consistent Event Names
// Good naming convention
'resource.action'
'post.created'
'post.updated'
'post.deleted'
'comment.added'
'comment.edited'
// Avoid ambiguity
// Bad: 'update', 'change', 'modified'
// Good: 'updated', 'status-changed', 'content-modified'
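The `resource.action` convention can be enforced with a simple check before an event is written. The sketch below is one possible rule, assuming lowercase names with optional hyphens; the pattern and the function name `isValidEventName` are illustrative choices, not a LiveSync requirement.

```javascript
// One lowercase resource segment, a dot, and one lowercase action segment.
// Hyphens are allowed inside a segment, for example 'task.status-changed'.
const EVENT_NAME_PATTERN = /^[a-z][a-z0-9-]*\.[a-z][a-z0-9-]*$/;

function isValidEventName(name) {
  return typeof name === 'string' && EVENT_NAME_PATTERN.test(name);
}

const checks = {
  created: isValidEventName('post.created'),
  hyphenated: isValidEventName('task.status-changed'),
  missingResource: isValidEventName('update'),
  wrongCase: isValidEventName('Post.Created'),
};
```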
3. Implement Idempotent Operations
function merge(state, event) {
  // Use IDs to ensure idempotency
  switch (event.name) {
    case 'item.added':
      // Applying the same event twice must not add the item twice
      if (state.items.some(i => i.id === event.data.id)) {
        return state;
      }
      // Return a new state object instead of mutating the existing one
      return { ...state, items: [...state.items, event.data] };
    default:
      return state;
  }
}
4. Handle Concurrent Updates
function merge(state, event) {
  // Use timestamps for conflict resolution (last write wins)
  const existingItem = state.items.find(i => i.id === event.data.id);
  if (!existingItem) {
    return { ...state, items: [...state.items, event.data] };
  }
  // Keep newer version; build new objects instead of mutating state
  if (event.data.updatedAt > existingItem.updatedAt) {
    return {
      ...state,
      items: state.items.map(i =>
        i.id === event.data.id ? { ...i, ...event.data } : i
      )
    };
  }
  return state;
}
5. Monitor and Log
const model = modelsClient . models . get ({
channelName: 'posts:123' ,
sync ,
merge : ( state , event ) => {
// Log events for debugging only; remove this in production so merge stays side-effect free
console . log ( 'Merging event:' , event . name , event . data );
return actualMerge ( state , event );
}
});
// Monitor model lifecycle
model . on ( 'syncing' , () => console . log ( 'Model syncing...' ));
model . on ( 'ready' , () => console . log ( 'Model ready' ));
model . on ( 'paused' , () => console . log ( 'Model paused' ));
model . on ( 'disposed' , () => console . log ( 'Model disposed' ));
Next Steps
Conflict Resolution: Learn strategies for resolving conflicts in distributed systems
Postgres Models: Explore the Models SDK for Postgres in depth
Quickstart: Build a complete LiveSync application from scratch