How to programmatically trigger flows

I could successfully trigger a webhook flow (without web request) using dynamically imported FlowManager. I guess you should be able to run other flow trigger types as well but I didn’t test that.

First of all make sure to change the module config inside your tsconfig.json of your extension to make dynamic imports possible.

	"compilerOptions": {
		"module": "node16",
		"moduleResolution": "node16",

In case you are running Directus Docker Compose right inside your extension dev folder, make sure that you don’t expose the dev directory for convenience reasons like I am used to do it, because it leaks the dev dependencies to the directus container and messes up with module resolutions:

volumes:
     - .:/directus/extensions/my-custom-extension // <= remove that in case you set it up this way

Then you should be able to import all that good stuff that’s not exposed by the Extension SDK at runtime.

import { defineHook } from '@directus/extensions-sdk';

export default defineHook(({ filter, action }) => {
	action('server.start', async (_meta, ctx) => {
		console.log('[flow-manager] server.start');
		console.log('[flow-manager] incoming accountability:', ctx?.accountability ?? null);
		try {
			// Build import path at runtime so bundler won't include @directus/api
			const base = '@directus' + '/api';

            // Get the FlowManager
			const { getFlowManager } = await import(`${base}/flows`);
			const { default: getDatabase } = await import(`${base}/database/index`);
			const { getSchema } = await import(`${base}/utils/get-schema`);
			const flowManager = getFlowManager();
			await flowManager?.initialize?.();
			console.log('[flow-manager] FlowManager initialized');

			// Build context: use provided accountability or fallback to internal admin
            // But since server.start hook does not have accountability it will be admin
			const accountability = (ctx as any)?.accountability ?? { admin: true, app: 'server' };
			const database = (ctx as any)?.database ?? getDatabase();
			const schema = (ctx as any)?.schema ?? (await getSchema({ database }));
			const flowCtx = {
				accountability,
				database,
				schema,
			} as any;

			// Change this to the friendly name of you flow
			const flowName = 'my-webhook-flow';

            // Resolve flow by friendly name and run it
			const { FlowsService } = await import(`${base}/services/flows`);
			const flowsService = new FlowsService({ knex: database, schema });
			const flows = await flowsService.readByQuery({
				filter: { name: { _eq: flowName }, status: { _eq: 'active' } },
				fields: ['id', 'trigger', 'options'],
				limit: 1,
			});
			const flow = flows?.[0];
			if (!flow) {
				console.warn(`[flow-manager] Flow not found or inactive: ${flowName}`);
				return;
			}
			if (flow.trigger === 'operation') {
				const res = await flowManager.runOperationFlow(flow.id, {}, flowCtx);
				console.log(`[flow-manager] Ran operation flow ${flowName} (${flow.id})`, res);
			} else if (flow.trigger === 'webhook') {
				const method = (flow.options?.method || 'GET').toUpperCase();
				const key = `${method}-${flow.id}`;
				const res = await flowManager.runWebhookFlow(key, {}, flowCtx);
				console.log(`[flow-manager] Ran webhook flow ${flowName} (${key})`, res);
			} else if (flow.trigger === 'manual') {
				const key = `POST-${flow.id}`;
				const res = await flowManager.runWebhookFlow(key, {}, flowCtx);
				console.log(`[flow-manager] Ran manual flow ${flowName} (${key})`, res);
			} else {
				console.warn(`[flow-manager] Flow trigger not supported in startup: ${flow.trigger}`);
			}
	
	
		} catch (err) {
			console.error('Something went wrong executing a flow from the server.start hook:', err);
		}
	});
});