I’m working on an integration where I need to send a message to a RabbitMQ queue every time an item is updated in Directus. The goal is for the message to contain the complete item state before the update, the state after the update, and a list of the changed fields.
I’ve been exploring how to implement this reliably, and the hard part turns out to be handling concurrent updates to the same item.
What I’ve tried:
Using a combination of filter and action hooks on items.update:
filter hook: Read the item’s state from the database to get the “before” data.
action hook: Read the item’s state again to get the “after” data, then dispatch the message.
The core problem is reliably linking the data from the filter hook to its corresponding action hook.
I’ve considered a few patterns, but each seems to have a potential flaw in high-concurrency scenarios, especially with multiple Directus replicas:
Is there a recommended best practice or an official Directus pattern for solving this classic “before/after” state problem?
Any advice or shared experience would be greatly appreciated. Thank you!
I have one concern with this approach. Since the filter hook runs before the database transaction is committed, a consumer could receive the RabbitMQ message and then call the API, only to get the old data before the database has actually updated.
This seems to create a potential race condition. Is my understanding correct?
That might handle the scenario. It basically saves the before state into an array inside the extension and attaches an identifier to the payload, so the before state can be retrieved in the action hook.
That solution would still leave a tiny window between entering the filter and reading the before state, in which a concurrent update could interfere. To close this gap, I think you would need to implement some kind of lock and queue.
import { randomUUID } from 'node:crypto';
import { defineHook } from '@directus/extensions-sdk';

export default defineHook(({ filter, action }) => {
  // Pending updates get cleared when the server or extension restarts.
  // Could be even more robust using a db table as queue.
  const pendingUpdates: any[] = [];

  filter('demo.items.update', async (payload: any, meta, context) => {
    const { database } = context;
    const { keys, collection } = meta;
    const updateId = randomUUID();

    // Attach the update ID to the payload, so that we can identify the update in the action handler
    payload.updateId = updateId;

    // Query items before the update. Can be more than one item when batch updated
    const itemsBeforeUpdate = await database
      .from(collection)
      .whereIn('id', keys)
      .select('*');

    // Create a pending update object to store the before state and updateId
    const pendingUpdate = {
      itemsBeforeUpdate,
      updateId,
    };
    pendingUpdates.push(pendingUpdate);

    // Return the payload with the updateId
    return payload;
  });

  action('demo.items.update', (meta, context) => {
    const { payload, keys, collection } = meta;

    // Read the update ID from the payload
    const updateId = payload.updateId;

    // Find the pending update in the pendingUpdates array
    const pendingUpdate = pendingUpdates.find((item: any) => item.updateId === updateId);
    if (!pendingUpdate) {
      throw new Error(`Pending update with ID ${updateId} not found`);
    }
    const itemsBeforeUpdate = pendingUpdate.itemsBeforeUpdate;

    // Handle your logic here
    // Basically iterate over itemsBeforeUpdate, merge the payload to get the updated state of the item,
    // strip off the identifier, calculate the diff and send to RabbitMQ
    // (don't query items again, as that could lead to wrong results if the items got updated again in the meantime)

    // Remove the pending update from the pendingUpdates array
    pendingUpdates.splice(pendingUpdates.findIndex((item: any) => item.updateId === updateId), 1);
  });
});
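The "Handle your logic here" step in the action handler above could look roughly like the sketch below. The helper name `buildChangeMessages`, the `ChangeMessage` shape, and the `correlationField` parameter are made up for illustration and are not part of Directus. It assumes a shallow merge is good enough, so relational or nested payloads, and values the database changes on its own (defaults, `date_updated`, hashed fields), would not be reflected in the "after" state.

```typescript
type Item = Record<string, unknown>;

// Hypothetical message shape: one entry per updated item.
interface ChangeMessage {
  before: Item;
  after: Item;
  changedFields: string[];
}

// Hypothetical helper: merges the update payload onto each "before" item
// and lists the fields whose value actually differs.
function buildChangeMessages(
  itemsBeforeUpdate: Item[],
  payload: Item,
  correlationField = 'updateId',
): ChangeMessage[] {
  // Copy the payload and strip off the correlation identifier.
  const changes: Item = { ...payload };
  delete changes[correlationField];

  return itemsBeforeUpdate.map((before) => {
    // Shallow merge: payload fields overwrite the previous values.
    const after: Item = { ...before, ...changes };

    // Compare via JSON serialization so plain objects and arrays compare by value.
    // This is sensitive to key order and to Date-vs-string differences.
    const changedFields = Object.keys(changes).filter(
      (field) => JSON.stringify(before[field]) !== JSON.stringify(changes[field]),
    );

    return { before, after, changedFields };
  });
}
```

In the action handler this would be called as `buildChangeMessages(itemsBeforeUpdate, payload)`, and each resulting message would then be published to RabbitMQ.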
I wonder if we could actually make use of the Directus revision system to handle this more reliably. Revision rows are created per item, so we don’t have to take care of batch updates.
action('revisions.create', async (meta, context) => {
  const { payload, key, collection } = meta;
  const { database } = context;

  // Query for the previous revision (next lower ID) of the same collection and same item id
  const previousRevision = await database
    .from('directus_revisions')
    .where('collection', payload.collection)
    .where('item', payload.item)
    .where('id', '<', key)
    .orderBy('id', 'desc')
    .first();

  // There is no previous revision for the first tracked change of an item
  const oldData = previousRevision?.data ?? null;
  const newData = payload.data;
  const delta = payload.delta;

  // Send to RabbitMQ
});
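Turning those revision rows into the before/after/changed-fields message could be sketched as below. This is only an illustration: `buildRevisionMessage`, `parseJsonColumn`, and the `RevisionRow` type are hypothetical names, not Directus APIs. It assumes that `data` holds the item snapshot after the change and `delta` holds the changed fields, as the handler above does, and that a raw query may return JSON columns as strings depending on the database driver.

```typescript
type Json = Record<string, unknown>;

// Hypothetical minimal view of a directus_revisions row.
interface RevisionRow {
  data: Json | string | null;
  delta?: Json | string | null;
}

interface RevisionMessage {
  before: Json | null;
  after: Json | null;
  changedFields: string[];
}

// JSON columns may arrive as strings from a raw query, so parse defensively.
function parseJsonColumn(value: Json | string | null | undefined): Json | null {
  if (value == null) return null;
  return typeof value === 'string' ? (JSON.parse(value) as Json) : value;
}

// Hypothetical helper: builds the message from the previous and current revision.
function buildRevisionMessage(
  previous: RevisionRow | undefined,
  current: RevisionRow,
): RevisionMessage {
  const delta = parseJsonColumn(current.delta) ?? {};
  return {
    // No previous revision means there is no known "before" state.
    before: previous ? parseJsonColumn(previous.data) : null,
    after: parseJsonColumn(current.data),
    changedFields: Object.keys(delta),
  };
}
```

Inside the handler this would be `buildRevisionMessage(previousRevision, payload)`. If two revisions for the same item are written close together, picking the "next lower ID" is still only as reliable as the ordering of the revision inserts.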