How to programmatically trigger flows

Hi there!

I need to trigger a flow from a hook extension. I know there is a FlowsService, but this does not contain a trigger method.

I know there is a POST /flows/trigger/<id> endpoint, but I’d much rather use an internal service to avoid an HTTP request.

Any suggestions?

2 Answers

I could successfully trigger a webhook flow (without a web request) using a dynamically imported FlowManager. I guess you should be able to run other flow trigger types as well, but I didn’t test that.

First of all, change the module settings in your extension’s tsconfig.json to make dynamic imports possible.

	"compilerOptions": {
		"module": "node16",
		"moduleResolution": "node16",

If you run Directus via Docker Compose from inside your extension dev folder, make sure you don’t mount the whole dev directory into the container for convenience, as I used to do. It leaks the dev dependencies into the Directus container and breaks module resolution:

volumes:
     - .:/directus/extensions/my-custom-extension # <= remove this if you set it up this way

Then you should be able to import all that good stuff that’s not exposed by the Extension SDK at runtime.

import { defineHook } from '@directus/extensions-sdk';

export default defineHook(({ action }) => {
	action('server.start', async (_meta, ctx) => {
		console.log('[flow-manager] server.start');
		console.log('[flow-manager] incoming accountability:', ctx?.accountability ?? null);
		try {
			// Build import path at runtime so bundler won't include @directus/api
			const base = '@directus' + '/api';

			// Get the FlowManager
			const { getFlowManager } = await import(`${base}/flows`);
			const { default: getDatabase } = await import(`${base}/database/index`);
			const { getSchema } = await import(`${base}/utils/get-schema`);
			const flowManager = getFlowManager();
			await flowManager?.initialize?.();
			console.log('[flow-manager] FlowManager initialized');

			// Build context: use the provided accountability or fall back to internal admin.
			// The server.start hook has no accountability, so this will be admin here.
			const accountability = (ctx as any)?.accountability ?? { admin: true, app: 'server' };
			const database = (ctx as any)?.database ?? getDatabase();
			const schema = (ctx as any)?.schema ?? (await getSchema({ database }));
			const flowCtx = {
				accountability,
				database,
				schema,
			} as any;

			// Change this to the friendly name of your flow
			const flowName = 'my-webhook-flow';

			// Resolve flow by friendly name and run it
			const { FlowsService } = await import(`${base}/services/flows`);
			const flowsService = new FlowsService({ knex: database, schema });
			const flows = await flowsService.readByQuery({
				filter: { name: { _eq: flowName }, status: { _eq: 'active' } },
				fields: ['id', 'trigger', 'options'],
				limit: 1,
			});
			const flow = flows?.[0];
			if (!flow) {
				console.warn(`[flow-manager] Flow not found or inactive: ${flowName}`);
				return;
			}
			if (flow.trigger === 'operation') {
				const res = await flowManager.runOperationFlow(flow.id, {}, flowCtx);
				console.log(`[flow-manager] Ran operation flow ${flowName} (${flow.id})`, res);
			} else if (flow.trigger === 'webhook') {
				const method = (flow.options?.method || 'GET').toUpperCase();
				const key = `${method}-${flow.id}`;
				const res = await flowManager.runWebhookFlow(key, {}, flowCtx);
				console.log(`[flow-manager] Ran webhook flow ${flowName} (${key})`, res);
			} else if (flow.trigger === 'manual') {
				const key = `POST-${flow.id}`;
				const res = await flowManager.runWebhookFlow(key, {}, flowCtx);
				console.log(`[flow-manager] Ran manual flow ${flowName} (${key})`, res);
			} else {
				console.warn(`[flow-manager] Flow trigger not supported in startup: ${flow.trigger}`);
			}
		} catch (err) {
			console.error('Something went wrong executing a flow from the server.start hook:', err);
		}
	});
});

@Nik thanks for the example! I'll give it a go

Hmm, setting the tsconfig to node16 is a rabbit hole I'm currently not willing to jump down.. :grin: For now, I'll just send POST requests to the webhook endpoints. That did lead to another problem though.. I'm sending these requests in the 'server.start' hook, and for some reason the webhook endpoints are not available right away (they return 404). I implemented an incremental retry, and it seems the webhooks are operational after about 30 sec.. super weird. I took a dive into the flow manager and could not spot any reason for this behaviour. No lifecycle events occur within those ...

... 30 seconds (e.g. extensions.load, unload or reload). So no clue why they return 404 in that time window. Also, locally they just work on startup, but not when deployed in our serverless environment.

By the way, I checked whether the flow 'exists' during that window (e.g. await new FlowsService(...).readOne(flowId)), and that returns the flow just fine. So it's an issue with the webhook endpoints, not the flow itself :thinking:
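
For anyone taking the HTTP route, the incremental retry described above could look roughly like the sketch below. This is not the code used in this thread: `retryWithBackoff`, `triggerFlowOverHttp`, and the retry settings are hypothetical names and values chosen for illustration. It assumes a runtime with a global `fetch` (Node 18+) and a flow that uses a webhook trigger accepting POST.

```typescript
// Hypothetical options for the retry helper; none of these names come from Directus.
interface RetryOptions {
	retries: number; // maximum number of attempts
	baseDelayMs: number; // wait before the second attempt
	factor: number; // multiplier applied to the wait after each failure
}

const sleep = (ms: number): Promise<void> => new Promise((resolve) => setTimeout(resolve, ms));

// Calls `attempt` until it resolves or the attempts are used up,
// waiting longer after every failure. Rethrows the last error.
async function retryWithBackoff<T>(attempt: () => Promise<T>, opts: RetryOptions): Promise<T> {
	let delay = opts.baseDelayMs;
	let lastError: unknown;

	for (let i = 1; i <= opts.retries; i++) {
		try {
			return await attempt();
		} catch (err) {
			lastError = err;
			if (i === opts.retries) break; // no point waiting after the final attempt
			await sleep(delay);
			delay *= opts.factor;
		}
	}

	throw lastError;
}

// Hypothetical wrapper: POST to the webhook trigger endpoint and treat any
// non-2xx response (such as the 404 seen right after startup) as retryable.
async function triggerFlowOverHttp(baseUrl: string, flowId: string, body: unknown): Promise<number> {
	return retryWithBackoff(
		async () => {
			const res = await fetch(`${baseUrl}/flows/trigger/${flowId}`, {
				method: 'POST',
				headers: { 'Content-Type': 'application/json' },
				body: JSON.stringify(body),
			});
			if (!res.ok) throw new Error(`Flow trigger returned status ${res.status}`);
			return res.status;
		},
		// Assumed values: waits of 1 + 2 + 4 + 8 + 16 s, enough to span a ~30 s window
		{ retries: 6, baseDelayMs: 1000, factor: 2 },
	);
}
```

Inside the 'server.start' hook this would be called as `await triggerFlowOverHttp(publicUrl, flowId, {})`, where `publicUrl` and `flowId` are whatever your setup provides. Retrying on every non-2xx status is a simplification; you may want to retry only on 404.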

No experience with serverless here, but I do see some delay before routing kicks in on server startup when running Directus behind a Traefik reverse proxy.

Hello @remihuigen

Did you try this?

@ahmad_quvor Nope, not what I'm looking for.. I need to trigger flows immediately after Directus has started. So I need to trigger them from an action('server.start') hook.