Skip to content

Commit 1a78476

Browse files
authored
fix(workflow-sdk): Async/nested runAsStep propagation (#12675)
FIXES CLO-524 **What** Add hidden stepDefinition object as part of the step argument and ensure the runAsStep handlers rely on the latest definition when config is being used on the returned step in order to ensure async configuration propagation and nested configuration
1 parent b456044 commit 1a78476

File tree

10 files changed

+210
-23
lines changed

10 files changed

+210
-23
lines changed

.changeset/twelve-bears-wait.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
"@medusajs/workflow-engine-inmemory": patch
3+
"@medusajs/workflow-engine-redis": patch
4+
"@medusajs/workflows-sdk": patch
5+
---
6+
7+
fix(workflow-sdk): Async propagation

packages/core/workflows-sdk/src/utils/composer/__tests__/index.spec.ts

Lines changed: 132 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,143 @@ import { transform } from "../transform"
99
import { WorkflowData } from "../type"
1010
import { when } from "../when"
1111
import { createHook } from "../create-hook"
12+
import { TransactionStepsDefinition } from "@medusajs/orchestration"
1213

1314
let count = 1
1415
const getNewWorkflowId = () => `workflow-${count++}`
1516

1617
describe("Workflow composer", () => {
17-
describe("running sub workflows", () => {
18+
describe("when running workflows as sub-workflows", () => {
19+
describe("handling of async and nested workflow configurations", () => {
20+
it("should set the runAsStep as nested and async when parent workflow is async", async () => {
21+
const subworkflowStep1 = createStep("step1", async (_, context) => {
22+
return new StepResponse({ result: "sub workflow step1" })
23+
})
24+
25+
const subWorkflowId = getNewWorkflowId()
26+
const subWorkflow = createWorkflow(
27+
subWorkflowId,
28+
function (input: WorkflowData<string>) {
29+
subworkflowStep1()
30+
return new WorkflowResponse(void 0)
31+
}
32+
)
33+
34+
const step1 = createStep(
35+
{ name: "step1", async: true },
36+
async (_, context) => {
37+
return new StepResponse({ result: "step1" })
38+
}
39+
)
40+
41+
const workflowId = getNewWorkflowId()
42+
const workflow = createWorkflow(workflowId, function () {
43+
step1()
44+
45+
const subWorkflowRes = subWorkflow.runAsStep({
46+
input: "hi from outside",
47+
})
48+
49+
return new WorkflowResponse(subWorkflowRes)
50+
})
51+
52+
expect(workflow().getFlow().async).toBe(true)
53+
expect(subWorkflow().getFlow().async).toBeUndefined()
54+
55+
const runAsStep = workflow().getFlow()
56+
.next! as TransactionStepsDefinition
57+
58+
expect(runAsStep.action).toBe(`${subWorkflowId}-as-step`)
59+
expect(runAsStep.async).toBe(true)
60+
expect(runAsStep.nested).toBe(true)
61+
})
62+
63+
it("should set the runAsStep as nested and async when parent workflow is sync but sub workflow is async", async () => {
64+
const subworkflowStep1 = createStep(
65+
{ name: "step1", async: true },
66+
async (_, context) => {
67+
return new StepResponse({ result: "sub workflow step1" })
68+
}
69+
)
70+
71+
const subWorkflowId = getNewWorkflowId()
72+
const subWorkflow = createWorkflow(
73+
subWorkflowId,
74+
function (input: WorkflowData<string>) {
75+
subworkflowStep1()
76+
return new WorkflowResponse(void 0)
77+
}
78+
)
79+
80+
const step1 = createStep("step1", async (_, context) => {
81+
return new StepResponse({ result: "step1" })
82+
})
83+
84+
const workflowId = getNewWorkflowId()
85+
const workflow = createWorkflow(workflowId, function () {
86+
step1()
87+
88+
const subWorkflowRes = subWorkflow.runAsStep({
89+
input: "hi from outside",
90+
})
91+
92+
return new WorkflowResponse(subWorkflowRes)
93+
})
94+
95+
expect(workflow().getFlow().async).toBeUndefined()
96+
expect(subWorkflow().getFlow().async).toBe(true)
97+
98+
const runAsStep = workflow().getFlow()
99+
.next! as TransactionStepsDefinition
100+
101+
expect(runAsStep.action).toBe(`${subWorkflowId}-as-step`)
102+
expect(runAsStep.async).toBe(true)
103+
expect(runAsStep.nested).toBe(true)
104+
})
105+
106+
it("should set the runAsStep as nested and async when parent workflow is sync as well as sub workflow but the step is configured as async", async () => {
107+
const subworkflowStep1 = createStep("step1", async (_, context) => {
108+
return new StepResponse({ result: "sub workflow step1" })
109+
})
110+
111+
const subWorkflowId = getNewWorkflowId()
112+
const subWorkflow = createWorkflow(
113+
subWorkflowId,
114+
function (input: WorkflowData<string>) {
115+
subworkflowStep1()
116+
return new WorkflowResponse({})
117+
}
118+
)
119+
120+
const step1 = createStep("step1", async (_, context) => {
121+
return new StepResponse({ result: "step1" })
122+
})
123+
124+
const workflowId = getNewWorkflowId()
125+
const workflow = createWorkflow(workflowId, function () {
126+
step1()
127+
128+
const subWorkflowRes = subWorkflow
129+
.runAsStep({
130+
input: "hi from outside",
131+
})
132+
.config({ async: true })
133+
134+
return new WorkflowResponse(subWorkflowRes)
135+
})
136+
137+
expect(workflow().getFlow().async).toBeUndefined()
138+
expect(subWorkflow().getFlow().async).toBeUndefined()
139+
140+
const runAsStep = workflow().getFlow()
141+
.next! as TransactionStepsDefinition
142+
143+
expect(runAsStep.action).toBe(`${subWorkflowId}-as-step`)
144+
expect(runAsStep.async).toBe(true)
145+
expect(runAsStep.nested).toBe(true)
146+
})
147+
})
148+
18149
it("should succeed", async function () {
19150
const step1 = createStep("step1", async (_, context) => {
20151
return new StepResponse({ result: "step1" })

packages/core/workflows-sdk/src/utils/composer/create-step.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import {
44
WorkflowStepHandler,
55
WorkflowStepHandlerArguments,
66
} from "@medusajs/orchestration"
7-
import { isString, OrchestrationUtils } from "@medusajs/utils"
7+
import { isDefined, isString, OrchestrationUtils } from "@medusajs/utils"
88
import { ulid } from "ulid"
99
import { resolveValue, StepResponse } from "./helpers"
1010
import { createStepHandler } from "./helpers/create-step-handler"
@@ -173,6 +173,10 @@ export function applyStep<
173173
...localConfig,
174174
}
175175

176+
if (isDefined(newConfig.nested)) {
177+
newConfig.nested ||= newConfig.async
178+
}
179+
176180
delete localConfig.name
177181

178182
const handler = createStepHandler.bind(this)({

packages/core/workflows-sdk/src/utils/composer/create-workflow.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ export function createWorkflow<TData, TResult, THooks extends any[]>(
198198
},
199199
async (stepInput: TData, stepContext) => {
200200
const { container, ...sharedContext } = stepContext
201+
const isAsync = stepContext[" stepDefinition"]?.async
201202

202203
const workflowEngine = container.resolve(Modules.WORKFLOW_ENGINE, {
203204
allowUnregistered: true,
@@ -212,7 +213,7 @@ export function createWorkflow<TData, TResult, THooks extends any[]>(
212213
}
213214

214215
let transaction
215-
if (workflowEngine && runAsAsync) {
216+
if (workflowEngine && isAsync) {
216217
transaction = await workflowEngine.run(name, {
217218
input: stepInput as any,
218219
context: executionContext,
@@ -227,7 +228,7 @@ export function createWorkflow<TData, TResult, THooks extends any[]>(
227228

228229
return new StepResponse(
229230
transaction.result,
230-
runAsAsync ? stepContext.transactionId : transaction
231+
isAsync ? stepContext.transactionId : transaction
231232
)
232233
},
233234
async (transaction, stepContext) => {
@@ -237,6 +238,7 @@ export function createWorkflow<TData, TResult, THooks extends any[]>(
237238
}
238239

239240
const { container, ...sharedContext } = stepContext
241+
const isAsync = stepContext[" stepDefinition"]?.async
240242

241243
const workflowEngine = container.resolve(Modules.WORKFLOW_ENGINE, {
242244
allowUnregistered: true,
@@ -252,7 +254,7 @@ export function createWorkflow<TData, TResult, THooks extends any[]>(
252254

253255
const transactionId = step.__step__ + "-" + stepContext.transactionId
254256

255-
if (workflowEngine && runAsAsync) {
257+
if (workflowEngine && isAsync) {
256258
await workflowEngine.cancel(name, {
257259
transactionId: transactionId,
258260
context: executionContext,

packages/core/workflows-sdk/src/utils/composer/helpers/create-step-handler.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@ function buildStepContext({
2121

2222
stepArguments.context!.idempotencyKey = idempotencyKey
2323

24-
const flowMetadata = stepArguments.transaction.getFlow()?.metadata
24+
const flow = stepArguments.transaction.getFlow()
25+
const flowMetadata = flow?.metadata
26+
const stepDefinition = stepArguments.step.definition
27+
2528
const executionContext: StepExecutionContext = {
2629
workflowId: metadata.model_id,
2730
stepName: metadata.action,
@@ -36,6 +39,7 @@ function buildStepContext({
3639
preventReleaseEvents: flowMetadata?.preventReleaseEvents ?? false,
3740
transactionId: stepArguments.context!.transactionId,
3841
context: stepArguments.context!,
42+
" stepDefinition": stepDefinition,
3943
" getStepResult"(
4044
stepId: string,
4145
action: "invoke" | "compensate" = "invoke"

packages/core/workflows-sdk/src/utils/composer/type.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,11 @@ export interface StepExecutionContext {
198198
* Adding a space hides the method from the autocomplete
199199
*/
200200
" getStepResult"(stepId: string, action?: "invoke" | "compensate"): any
201+
202+
/**
203+
* Get access to the definition of the step.
204+
*/
205+
" stepDefinition": TransactionStepsDefinition
201206
}
202207

203208
export type WorkflowTransactionContext = StepExecutionContext &

packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -208,8 +208,12 @@ export class WorkflowOrchestratorService {
208208
await this.triggerParentStep(ret.transaction, result)
209209
}
210210

211-
if (throwOnError && ret.thrownError) {
212-
throw ret.thrownError
211+
if (throwOnError && (ret.thrownError || ret.errors?.length)) {
212+
if (ret.thrownError) {
213+
throw ret.thrownError
214+
}
215+
216+
throw ret.errors[0].error
213217
}
214218

215219
return { acknowledgement, ...ret }
@@ -317,8 +321,12 @@ export class WorkflowOrchestratorService {
317321
await this.triggerParentStep(ret.transaction, result)
318322
}
319323

320-
if (throwOnError && ret.thrownError) {
321-
throw ret.thrownError
324+
if (throwOnError && (ret.thrownError || ret.errors?.length)) {
325+
if (ret.thrownError) {
326+
throw ret.thrownError
327+
}
328+
329+
throw ret.errors[0].error
322330
}
323331

324332
return { acknowledgement, ...ret }
@@ -411,8 +419,12 @@ export class WorkflowOrchestratorService {
411419
await this.triggerParentStep(ret.transaction, result)
412420
}
413421

414-
if (throwOnError && ret.thrownError) {
415-
throw ret.thrownError
422+
if (throwOnError && (ret.thrownError || ret.errors?.length)) {
423+
if (ret.thrownError) {
424+
throw ret.thrownError
425+
}
426+
427+
throw ret.errors[0].error
416428
}
417429

418430
return ret
@@ -477,8 +489,12 @@ export class WorkflowOrchestratorService {
477489
await this.triggerParentStep(ret.transaction, result)
478490
}
479491

480-
if (throwOnError && ret.thrownError) {
481-
throw ret.thrownError
492+
if (throwOnError && (ret.thrownError || ret.errors?.length)) {
493+
if (ret.thrownError) {
494+
throw ret.thrownError
495+
}
496+
497+
throw ret.errors[0].error
482498
}
483499

484500
return ret

packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_async_compensate.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import {
22
createStep,
33
createWorkflow,
4-
parallelize,
54
StepResponse,
65
WorkflowResponse,
76
} from "@medusajs/framework/workflows-sdk"

packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,9 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
495495
transactionId: "transaction_1",
496496
},
497497
stepResponse: { uhuuuu: "yeaah!" },
498+
options: {
499+
throwOnError: false,
500+
},
498501
})
499502
;({ data: executionsList } = await query.graph({
500503
entity: "workflow_executions",

packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -268,8 +268,12 @@ export class WorkflowOrchestratorService {
268268
await this.triggerParentStep(ret.transaction, result)
269269
}
270270

271-
if (throwOnError && ret.thrownError) {
272-
throw ret.thrownError
271+
if (throwOnError && (ret.thrownError || ret.errors?.length)) {
272+
if (ret.thrownError) {
273+
throw ret.thrownError
274+
}
275+
276+
throw ret.errors[0].error
273277
}
274278

275279
return { acknowledgement, ...ret }
@@ -373,8 +377,12 @@ export class WorkflowOrchestratorService {
373377
await this.triggerParentStep(ret.transaction, result)
374378
}
375379

376-
if (throwOnError && ret.thrownError) {
377-
throw ret.thrownError
380+
if (throwOnError && (ret.thrownError || ret.errors?.length)) {
381+
if (ret.thrownError) {
382+
throw ret.thrownError
383+
}
384+
385+
throw ret.errors[0].error
378386
}
379387

380388
return { acknowledgement, ...ret }
@@ -467,8 +475,12 @@ export class WorkflowOrchestratorService {
467475
await this.triggerParentStep(ret.transaction, result)
468476
}
469477

470-
if (throwOnError && ret.thrownError) {
471-
throw ret.thrownError
478+
if (throwOnError && (ret.thrownError || ret.errors?.length)) {
479+
if (ret.thrownError) {
480+
throw ret.thrownError
481+
}
482+
483+
throw ret.errors[0].error
472484
}
473485

474486
return ret
@@ -534,8 +546,12 @@ export class WorkflowOrchestratorService {
534546
await this.triggerParentStep(ret.transaction, result)
535547
}
536548

537-
if (throwOnError && ret.thrownError) {
538-
throw ret.thrownError
549+
if (throwOnError && (ret.thrownError || ret.errors?.length)) {
550+
if (ret.thrownError) {
551+
throw ret.thrownError
552+
}
553+
554+
throw ret.errors[0].error
539555
}
540556

541557
return ret

0 commit comments

Comments
 (0)