Skip to content

Commit 9ab62e3

Browse files
feat(orchestrator): triggering kafka workflows with cloud events. backend only code (#2512)
* feat: triggering kafka workflows with cloud events. backend only code for https://redhat.atlassian.net/browse/RHIDP-9143 * squash: add the changeset * squash: only log the clientid and brokers * squash: only log the clientid and brokers * squash: kafka service options should be undefined if not there * squash: return an error if there is no kafka implementation when triggering as a CE * squash: update the config.d.ts * squash: adding more tests that mock the kafka calls * squash: add orchestrator.kafka config example in app-config * squash: remove rogue console.log * squash: addd config.d.ts to package.json * squash: Update workspaces/orchestrator/plugins/orchestrator-backend/src/service/api/v2.ts Co-authored-by: Oleksandr Andriienko <oandriie@redhat.com> * squash: add delay for retry * squash: update tests and key is now configurable * squash: update yarn.lock * squash: dedupe * squash: remove redundant * squash: small fix * squash: sonarcube cleanup * squash: dedupe * squash: dedupe --------- Co-authored-by: Oleksandr Andriienko <oandriie@redhat.com>
1 parent cd78380 commit 9ab62e3

File tree

15 files changed

+1014
-26
lines changed

15 files changed

+1014
-26
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@red-hat-developer-hub/backstage-plugin-orchestrator-backend': minor
3+
---
4+
5+
feature: add the ability to trigger kafka based workflows from the plugin

workspaces/orchestrator/app-config.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,18 @@ orchestrator:
166166
# logStreamSelectors:
167167
# - label: 'selector'
168168
# value: 'value'
169+
kafka:
170+
# A logical identifier of an application.
171+
# https://kafka.js.org/docs/configuration#client-id
172+
clientId: orchestratorKafka
173+
# logLevel override for the orchestrator kafka services. Defaults to INFO which is 4
174+
# logLevel values based on KafkaJS values https://kafka.js.org/docs/configuration#logging
175+
# logLevel: 5 // DEBUG
176+
brokers:
177+
- localhost:9092
178+
# https://kafka.js.org/docs/producing#message-key
179+
# Optional and will just default to an empty string
180+
# messageKey: messageKey
169181
sonataFlowService:
170182
# uncomment the next line to use podman instead of docker
171183
# runtime: podman
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright Red Hat, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
export interface Config {
18+
/**
19+
* Configuration for the Orchestrator plugin.
20+
*/
21+
orchestrator?: {
22+
kafka?: {
23+
// A logical identifier of an application.
24+
// https://kafka.js.org/docs/configuration#client-id
25+
clientId: string;
26+
// logLevel override for the orchestrator kafka services
27+
// logLevel values based on KafkaJS values https://kafka.js.org/docs/configuration#logging
28+
// export enum logLevel {
29+
// NOTHING = 0,
30+
// ERROR = 1,
31+
// WARN = 2,
32+
// INFO = 4,
33+
// DEBUG = 5,
34+
// }
35+
logLevel?: 0 | 1 | 2 | 4 | 5;
36+
/**
37+
* List of brokers in the Kafka cluster to connect to.
38+
*/
39+
brokers: string[];
40+
/**
41+
* https://kafka.js.org/docs/producing#message-key
42+
*/
43+
messageKey?: string;
44+
/**
45+
* Optional SSL connection parameters to connect to the cluster. Passed directly to Node tls.connect.
46+
* See https://nodejs.org/dist/latest-v8.x/docs/api/tls.html#tls_tls_createsecurecontext_options
47+
*/
48+
ssl?:
49+
| {
50+
ca?: string[];
51+
/** @visibility secret */
52+
key?: string;
53+
cert?: string;
54+
rejectUnauthorized?: boolean;
55+
}
56+
| boolean;
57+
/**
58+
* Optional SASL connection parameters.
59+
*/
60+
sasl?: {
61+
mechanism: 'plain' | 'scram-sha-256' | 'scram-sha-512';
62+
username: string;
63+
/** @visibility secret */
64+
password: string;
65+
};
66+
};
67+
};
68+
}

workspaces/orchestrator/plugins/orchestrator-backend/package.json

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,13 @@
4343
],
4444
"files": [
4545
"app-config.yaml",
46+
"config.d.ts",
4647
"dist",
4748
"dist-dynamic/*.*",
4849
"dist-dynamic/dist/**",
4950
"static"
5051
],
52+
"configSchema": "config.d.ts",
5153
"scripts": {
5254
"start": "backstage-cli package start",
5355
"build": "backstage-cli package build",
@@ -78,11 +80,13 @@
7880
"@red-hat-developer-hub/backstage-plugin-orchestrator-node": "workspace:^",
7981
"@urql/core": "^6.0.1",
8082
"ajv-formats": "^2.1.1",
81-
"cloudevents": "^8.0.0",
83+
"cloudevents": "^10.0.0",
8284
"express": "^4.21.2",
8385
"express-promise-router": "^4.1.1",
8486
"fs-extra": "^10.1.0",
8587
"isomorphic-git": "^1.23.0",
88+
"js-yaml": "^4.1.0",
89+
"kafkajs": "^2.2.4",
8690
"lodash": "^4.18.1",
8791
"luxon": "^3.7.2",
8892
"openapi-backend": "^5.10.5",

workspaces/orchestrator/plugins/orchestrator-backend/src/service/Helper.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2024 The Backstage Authors
2+
* Copyright Red Hat, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -13,6 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16+
1617
import type { LoggerService } from '@backstage/backend-plugin-api';
1718
import type { Config } from '@backstage/config';
1819

workspaces/orchestrator/plugins/orchestrator-backend/src/service/OrchestratorService.test.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,4 +327,30 @@ describe('OrchestratorService', () => {
327327
expect(result).toBeDefined();
328328
});
329329
});
330+
331+
describe('executeWorkflowAsCloudEvent', () => {
332+
const executeResponse: WorkflowExecutionResponse = {
333+
id: createInstanceIdMock(1),
334+
};
335+
336+
beforeEach(() => {
337+
jest.clearAllMocks();
338+
});
339+
340+
it('should execute the operation', async () => {
341+
sonataFlowServiceMock.executeWorkflowAsCloudEvent = jest
342+
.fn()
343+
.mockResolvedValue(executeResponse);
344+
345+
const result = await orchestratorService.executeWorkflowAsCloudEvent({
346+
definitionId,
347+
workflowSource: 'local',
348+
workflowEventType: 'lock-event',
349+
contextAttribute: 'lockId',
350+
inputData,
351+
});
352+
353+
expect(result).toBeDefined();
354+
});
355+
});
330356
});

workspaces/orchestrator/plugins/orchestrator-backend/src/service/OrchestratorService.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,18 @@ export class OrchestratorService {
143143
});
144144
}
145145

146+
public async executeWorkflowAsCloudEvent(args: {
147+
definitionId: string;
148+
workflowSource: string;
149+
workflowEventType: string;
150+
contextAttribute: string;
151+
inputData?: ProcessInstanceVariables;
152+
authTokens?: Array<AuthToken>;
153+
backstageToken?: string;
154+
}) {
155+
return await this.sonataFlowService.executeWorkflowAsCloudEvent(args);
156+
}
157+
146158
public async executeWorkflow(args: {
147159
definitionId: string;
148160
serviceUrl: string;

workspaces/orchestrator/plugins/orchestrator-backend/src/service/SonataFlowService.test.ts

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,14 @@
1616

1717
import { LoggerService } from '@backstage/backend-plugin-api';
1818

19+
import { OrchestratorKafkaServiceOptions } from '../types/kafka';
1920
import { DataIndexService } from './DataIndexService';
2021
import { SonataFlowService } from './SonataFlowService';
2122

23+
jest.mock('node:crypto', () => ({
24+
randomUUID: () => '12345',
25+
}));
26+
2227
describe('SonataFlowService', () => {
2328
let loggerMock: jest.Mocked<LoggerService>;
2429
let dataIndexServiceMock: jest.Mocked<DataIndexService>;
@@ -155,6 +160,112 @@ describe('SonataFlowService', () => {
155160
);
156161
});
157162
});
163+
164+
describe('executeWorkflowAsCloudEvent', () => {
165+
const runErrorTestAsCloudEventNoKafkaImplementation =
166+
async (): Promise<void> => {
167+
await sonataFlowService.executeWorkflowAsCloudEvent({
168+
definitionId,
169+
workflowSource: 'workflowSource',
170+
workflowEventType: 'workflowEventType',
171+
contextAttribute: 'contextAttribute',
172+
});
173+
};
174+
beforeEach(() => {
175+
jest.clearAllMocks();
176+
});
177+
178+
it('should return the an error when no orchestrator kafka config is implemented', async () => {
179+
let result;
180+
try {
181+
await runErrorTestAsCloudEventNoKafkaImplementation();
182+
} catch (error: any) {
183+
result = error;
184+
}
185+
186+
expect(result).toBeDefined();
187+
expect(result.message).toEqual(
188+
'No Orchestrator kafka implementation added',
189+
);
190+
});
191+
it('should return the contextAttributeId on successful send', async () => {
192+
const kafkaServiceOptionsMock: OrchestratorKafkaServiceOptions = {
193+
clientId: 'kafkaClientId',
194+
brokers: ['localhost:9091'],
195+
};
196+
const sonataFlowServiceWithKafka = new SonataFlowService(
197+
dataIndexServiceMock,
198+
loggerMock,
199+
kafkaServiceOptionsMock,
200+
);
201+
const spy = jest
202+
.spyOn(
203+
sonataFlowServiceWithKafka.getOrchestratorKafkaImpl() as any,
204+
'producer',
205+
)
206+
.mockImplementation(() => {
207+
return {
208+
connect: jest.fn(),
209+
send: jest.fn(),
210+
disconnect: jest.fn(),
211+
};
212+
});
213+
const result =
214+
await sonataFlowServiceWithKafka.executeWorkflowAsCloudEvent({
215+
definitionId,
216+
workflowSource: 'workflowSource',
217+
workflowEventType: 'workflowEventType',
218+
contextAttribute: 'lockid',
219+
});
220+
expect(spy).toHaveBeenCalled();
221+
expect(result).toBeDefined();
222+
expect(result?.id).toBeDefined();
223+
expect(result?.id).toEqual('12345');
224+
});
225+
226+
it('should error on a bad connection', async () => {
227+
const kafkaServiceOptionsMock: OrchestratorKafkaServiceOptions = {
228+
clientId: 'kafkaClientId',
229+
brokers: ['localhost:9091'],
230+
};
231+
const sonataFlowServiceWithKafka = new SonataFlowService(
232+
dataIndexServiceMock,
233+
loggerMock,
234+
kafkaServiceOptionsMock,
235+
);
236+
jest
237+
.spyOn(
238+
sonataFlowServiceWithKafka.getOrchestratorKafkaImpl() as any,
239+
'producer',
240+
)
241+
.mockImplementation(() => {
242+
return {
243+
connect: jest
244+
.fn()
245+
.mockRejectedValue(new Error('Wrong Connection Info')),
246+
send: jest.fn(),
247+
disconnect: jest.fn(),
248+
};
249+
});
250+
let result;
251+
try {
252+
result = await sonataFlowServiceWithKafka.executeWorkflowAsCloudEvent({
253+
definitionId,
254+
workflowSource: 'workflowSource',
255+
workflowEventType: 'workflowEventType',
256+
contextAttribute: 'lockid',
257+
});
258+
} catch (error: any) {
259+
result = error;
260+
}
261+
262+
expect(result).toBeDefined();
263+
expect(result.message).toEqual(
264+
'Error with Kafka client with connection Options: clientId: kafkaClientId and broker: ["localhost:9091"]',
265+
);
266+
});
267+
});
268+
158269
describe('executeWorkflow', () => {
159270
const inputData = { var1: 'value1' };
160271
const urlToFetch = 'http://example.com/workflows/workflow-123';

0 commit comments

Comments
 (0)