Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .changeset/fair-mirrors-enjoy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
"@medusajs/medusa": patch
"@medusajs/workflow-engine-inmemory": patch
"@medusajs/workflow-engine-redis": patch
"@medusajs/core-flows": patch
"@medusajs/orchestration": patch
"@medusajs/workflows-sdk": patch
---

Fix/workflows concurrency
90 changes: 84 additions & 6 deletions integration-tests/http/__tests__/cart/store/cart.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,15 @@ import {
PromotionStatus,
PromotionType,
} from "@medusajs/utils"
import { createAdminUser, generatePublishableKey, generateStoreHeaders, } from "../../../../helpers/create-admin-user"
import {
createAdminUser,
generatePublishableKey,
generateStoreHeaders,
} from "../../../../helpers/create-admin-user"
import { setupTaxStructure } from "../../../../modules/__tests__/fixtures"
import { createAuthenticatedCustomer } from "../../../../modules/helpers/create-authenticated-customer"
import { medusaTshirtProduct } from "../../../__fixtures__/product"
import { setTimeout } from "timers/promises"

jest.setTimeout(100000)

Expand Down Expand Up @@ -150,10 +155,9 @@ medusaIntegrationTestRunner({

describe("GET /store/carts/[id]", () => {
it("should return 404 when trying to fetch a cart that does not exist", async () => {
const response = await api.get(
`/store/carts/fake`,
storeHeadersWithCustomer
).catch((e) => e)
const response = await api
.get(`/store/carts/fake`, storeHeadersWithCustomer)
.catch((e) => e)

expect(response.response.status).toEqual(404)
})
Expand Down Expand Up @@ -1868,6 +1872,80 @@ medusaIntegrationTestRunner({
)
})

it("should successfully complete cart and fail on concurrent complete", async () => {
const paymentCollection = (
await api.post(
`/store/payment-collections`,
{ cart_id: cart.id },
storeHeaders
)
).data.payment_collection

await api.post(
`/store/payment-collections/${paymentCollection.id}/payment-sessions`,
{ provider_id: "pp_system_default" },
storeHeaders
)

await createCartCreditLinesWorkflow.run({
input: [
{
cart_id: cart.id,
amount: 100,
currency_code: "usd",
reference: "test",
reference_id: "test",
},
],
container: appContainer,
})

// Concurrently complete the cart
let completedCart: any[] = []
for (let i = 0; i < 5; i++) {
completedCart.push(
api
.post(`/store/carts/${cart.id}/complete`, {}, storeHeaders)
.catch((e) => e)
)

await setTimeout(25)
}

let all = await Promise.all(completedCart)

let success = all.filter((res) => res.status === 200)
let failure = all.filter((res) => res.status !== 200)

const successData = success[0].data.order
for (const res of success) {
expect(res.data.order).toEqual(successData)
}

expect(failure.length).toBeGreaterThan(0)

expect(successData).toEqual(
expect.objectContaining({
id: expect.any(String),
currency_code: "usd",
credit_lines: [
expect.objectContaining({
amount: 100,
reference: "test",
reference_id: "test",
}),
],
items: expect.arrayContaining([
expect.objectContaining({
unit_price: 1500,
compare_at_unit_price: null,
quantity: 1,
}),
]),
})
)
})

it("should successfully complete cart", async () => {
const paymentCollection = (
await api.post(
Expand All @@ -1883,7 +1961,7 @@ medusaIntegrationTestRunner({
storeHeaders
)

createCartCreditLinesWorkflow.run({
await createCartCreditLinesWorkflow.run({
input: [
{
cart_id: cart.id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,13 @@ export const completeCartWorkflow = createWorkflow(
entity: "order_cart",
fields: ["cart_id", "order_id"],
filters: { cart_id: input.id },
options: {
isList: false,
},
})

const orderId = transform({ orderCart }, ({ orderCart }) => {
return orderCart.data[0]?.order_id
return orderCart?.data?.order_id
})

const cart = useRemoteQueryStep({
Expand Down Expand Up @@ -263,7 +266,7 @@ export const completeCartWorkflow = createWorkflow(
const createdOrders = createOrdersStep([cartToOrder])

const createdOrder = transform({ createdOrders }, ({ createdOrders }) => {
return createdOrders?.[0] ?? undefined
return createdOrders[0]
})

const reservationItemsData = transform(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,6 @@ describe("Transaction Orchestrator", () => {
},
{
action: "three",
async: true,
maxRetries: 0,
next: {
action: "five",
Expand All @@ -228,24 +227,14 @@ describe("Transaction Orchestrator", () => {

await strategy.resume(transaction)

expect(transaction.getErrors()).toHaveLength(2)
expect(transaction.getErrors()).toHaveLength(1)
expect(transaction.getErrors()).toEqual([
{
action: "three",
error: {
error: expect.objectContaining({
message: "Step 3 failed",
name: "Error",
stack: expect.any(String),
},
handlerType: "invoke",
},
{
action: "three",
error: expect.objectContaining({
message: expect.stringContaining(
"Converting circular structure to JSON"
),
stack: expect.any(String),
}),
handlerType: "invoke",
},
Expand Down Expand Up @@ -1052,6 +1041,8 @@ describe("Transaction Orchestrator", () => {

await strategy.resume(transaction)

await new Promise((resolve) => process.nextTick(resolve))

expect(mocks.one).toHaveBeenCalledTimes(1)
expect(mocks.two).toHaveBeenCalledTimes(0)
expect(transaction.getState()).toBe(TransactionState.INVOKING)
Expand Down Expand Up @@ -1148,6 +1139,8 @@ describe("Transaction Orchestrator", () => {

await strategy.resume(transaction)

await new Promise((resolve) => process.nextTick(resolve))

expect(mocks.one).toHaveBeenCalledTimes(1)
expect(mocks.compensateOne).toHaveBeenCalledTimes(0)
expect(mocks.two).toHaveBeenCalledTimes(0)
Expand All @@ -1171,6 +1164,8 @@ describe("Transaction Orchestrator", () => {
transaction,
})

await new Promise((resolve) => process.nextTick(resolve))

expect(resumedTransaction.getState()).toBe(TransactionState.COMPENSATING)
expect(mocks.compensateOne).toHaveBeenCalledTimes(1)

Expand Down Expand Up @@ -1263,6 +1258,7 @@ describe("Transaction Orchestrator", () => {
})

await strategy.resume(transaction)
await new Promise((resolve) => process.nextTick(resolve))

expect(mocks.one).toHaveBeenCalledTimes(1)
expect(mocks.compensateOne).toHaveBeenCalledTimes(1)
Expand Down Expand Up @@ -1335,6 +1331,7 @@ describe("Transaction Orchestrator", () => {
})

await strategy.resume(transaction)
await new Promise((resolve) => process.nextTick(resolve))

expect(transaction.getState()).toBe(TransactionState.DONE)
expect(mocks.one).toHaveBeenCalledTimes(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ describe("WorkflowManager", () => {
it("should continue an asyncronous transaction after reporting a successful step", async () => {
const transaction = await flow.run("deliver-product", "t-id")

await new Promise((resolve) => process.nextTick(resolve))

expect(handlers.get("foo").invoke).toHaveBeenCalledTimes(1)
expect(handlers.get("callExternal").invoke).toHaveBeenCalledTimes(1)
expect(handlers.get("bar").invoke).toHaveBeenCalledTimes(0)
Expand All @@ -135,6 +137,8 @@ describe("WorkflowManager", () => {
it("should revert an asyncronous transaction after reporting a failure step", async () => {
const transaction = await flow.run("deliver-product", "t-id")

await new Promise((resolve) => process.nextTick(resolve))

expect(handlers.get("foo").invoke).toHaveBeenCalledTimes(1)
expect(handlers.get("callExternal").invoke).toHaveBeenCalledTimes(1)
expect(handlers.get("bar").invoke).toHaveBeenCalledTimes(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ describe("WorkflowManager", () => {
const flow = new LocalWorkflow("deliver-product", container)
const transaction = await flow.run("t-id")

await new Promise((resolve) => process.nextTick(resolve))

expect(handlers.get("foo").invoke).toHaveBeenCalledTimes(1)
expect(handlers.get("callExternal").invoke).toHaveBeenCalledTimes(1)
expect(handlers.get("bar").invoke).toHaveBeenCalledTimes(0)
Expand All @@ -177,6 +179,8 @@ describe("WorkflowManager", () => {
const flow = new LocalWorkflow("deliver-product", container)
const transaction = await flow.run("t-id")

await new Promise((resolve) => process.nextTick(resolve))

expect(handlers.get("foo").invoke).toHaveBeenCalledTimes(1)
expect(handlers.get("callExternal").invoke).toHaveBeenCalledTimes(1)
expect(handlers.get("bar").invoke).toHaveBeenCalledTimes(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -951,8 +951,10 @@ export class TransactionOrchestrator extends EventEmitter {
this.executeSyncStep(promise, transaction, step, nextSteps)
)
} else {
// Execute async step in background and continue the execution of the transaction
this.executeAsyncStep(promise, transaction, step, nextSteps)
// Execute async step in background as part of the next event loop cycle and continue the execution of the transaction
process.nextTick(() =>
this.executeAsyncStep(promise, transaction, step, nextSteps)
)
hasAsyncSteps = true
}
}
Expand Down
22 changes: 13 additions & 9 deletions packages/core/workflows-sdk/src/utils/composer/transform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,10 @@ export function transform(
const ret = {
__id: uniqId,
__type: OrchestrationUtils.SymbolWorkflowStepTransformer,
__temporary_storage_key: null as { key: string } | null,
} as WorkflowData & {
__id: string
__type: string
__temporary_storage_key: { key: string } | null
}

const returnFn = async function (
Expand All @@ -176,6 +179,7 @@ export function transform(
): Promise<any> {
if ("transaction" in transactionContext) {
const temporaryDataKey = `${transactionContext.transaction.modelId}_${transactionContext.transaction.transactionId}_${uniqId}`

ret.__temporary_storage_key ??= { key: temporaryDataKey }

if (
Expand All @@ -199,15 +203,14 @@ export function transform(
const fn = functions[i]
const arg = i === 0 ? stepValue : finalResult

finalResult = await fn.apply(fn, [arg, transactionContext])
finalResult = fn.apply(fn, [arg, transactionContext])
if (finalResult instanceof Promise) {
finalResult = await finalResult
}
}

if ("transaction" in transactionContext) {
const temporaryDataKey = ret.__temporary_storage_key!
if (!temporaryDataKey) {
return finalResult
}

transactionContext.transaction.setTemporaryData(
temporaryDataKey,
finalResult
Expand All @@ -217,10 +220,11 @@ export function transform(
return finalResult
}

const proxyfiedRet = proxify<WorkflowData & { __resolver: any }>(
ret as unknown as WorkflowData
)
const proxyfiedRet = proxify<
WorkflowData & { __resolver: any; __temporary_storage_key: string | null }
>(ret as unknown as WorkflowData)
proxyfiedRet.__resolver = returnFn as any
proxyfiedRet.__temporary_storage_key = null as string | null

return proxyfiedRet
}
17 changes: 13 additions & 4 deletions packages/medusa/src/api/store/carts/[id]/complete/route.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { completeCartWorkflow } from "@medusajs/core-flows"
import { completeCartWorkflowId } from "@medusajs/core-flows"
import { prepareRetrieveQuery } from "@medusajs/framework"
import { MedusaRequest, MedusaResponse } from "@medusajs/framework/http"
import { HttpTypes } from "@medusajs/framework/types"
import {
ContainerRegistrationKeys,
MedusaError,
Modules,
} from "@medusajs/framework/utils"
import { refetchCart } from "../../helpers"
import { defaultStoreCartFields } from "../../query-config"
Expand All @@ -14,13 +15,21 @@ export const POST = async (
res: MedusaResponse<HttpTypes.StoreCompleteCartResponse>
) => {
const cart_id = req.params.id
const we = req.scope.resolve(Modules.WORKFLOW_ENGINE)

const { errors, result } = await completeCartWorkflow(req.scope).run({
const { errors, result, transaction } = await we.run(completeCartWorkflowId, {
input: { id: cart_id },
context: { transactionId: cart_id },
transactionId: cart_id,
throwOnError: false,
})

if (!transaction.hasFinished()) {
throw new MedusaError(
MedusaError.Types.CONFLICT,
"Cart is already being completed by another request"
)
}

const query = req.scope.resolve(ContainerRegistrationKeys.QUERY)

// When an error occurs on the workflow, its potentially got to with cart validations, payments
Expand All @@ -47,7 +56,7 @@ export const POST = async (
).remoteQueryConfig.fields
)

if (!statusOKErrors.includes(error.type)) {
if (!statusOKErrors.includes(error?.type)) {
throw error
}

Expand Down
Loading