Skip to content

Commit 0a7e8ed

Browse files
authored
Provide topic identifier for system messages (#51)
* provide topic identifier in HandlesMessages * rabbitmq-updates * fold in topic identifiers * revert keyof
1 parent 615fa60 commit 0a7e8ed

19 files changed

Lines changed: 145 additions & 99 deletions

packages/bus-core/src/application-bootstrap/application-bootstrap.spec.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ describe('ApplicationBootstrap', () => {
6161
It.isAny(),
6262
It.isAny(),
6363
TestCommandHandler,
64-
TestCommand
64+
TestCommand,
65+
undefined
6566
),
6667
Times.once()
6768
)

packages/bus-core/src/application-bootstrap/application-bootstrap.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ export class ApplicationBootstrap {
6868
prototype.$resolver,
6969
prototype.$symbol,
7070
handler,
71-
prototype.$message
71+
prototype.$message,
72+
prototype.$topicIdentifier
7273
)
7374
}
7475
}

packages/bus-core/src/handler/handler-registry.spec.ts

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { Logger } from '@node-ts/logger-core'
44
import { TestEvent, TestEventHandler, TestCommandHandler, TestCommand } from '../test'
55
import { Container, interfaces } from 'inversify'
66
import { Message } from '@node-ts/bus-messages'
7+
import * as faker from 'faker'
78

89
describe('HandlerRegistry', () => {
910
let sut: HandlerRegistry
@@ -32,8 +33,8 @@ describe('HandlerRegistry', () => {
3233
expect(handlers).toHaveLength(1)
3334
})
3435

35-
it('should add it to the subscribed bus messages list', () => {
36-
const subscription = sut.subscribedBusMessages.find(s => new s().$name === messageType.NAME)
36+
it('should add it to the subscribed message subscription list', () => {
37+
const subscription = sut.messageSubscriptions.find(s => new s.messageType!().$name === messageType.NAME)
3738
expect(subscription).toBeDefined()
3839
})
3940

@@ -73,13 +74,15 @@ describe('HandlerRegistry', () => {
7374
})
7475

7576
describe('when registering a handler for an external message', () => {
77+
const identifier = faker.random.uuid()
78+
7679
beforeEach(() => {
77-
sut.register((m: Message) => m.$name === messageName, symbol, handler)
80+
sut.register((m: Message) => m.$name === messageName, symbol, handler, undefined, identifier)
7881
})
7982

80-
it('should not include the message in the subscribed bus messages list', () => {
81-
const messageSubscription = sut.subscribedBusMessages.find(m => new m().$name === messageName)
82-
expect(messageSubscription).toBeUndefined()
83+
it('should include the message in the subscribed messages list', () => {
84+
const messageSubscription = sut.messageSubscriptions.find(m => m.topicIdentifier === identifier)
85+
expect(messageSubscription).toBeDefined()
8386
})
8487
})
8588

packages/bus-core/src/handler/handler-registry.ts

Lines changed: 11 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -12,25 +12,11 @@ export interface HandlerRegistration<TMessage extends MessageType> {
1212
resolveHandler (handlerContextContainer: Container): Handler<TMessage>
1313
}
1414

15-
interface HandlerBinding {
16-
symbol: symbol
17-
handler: HandlerType
18-
}
19-
20-
interface RegisteredHandlers {
21-
messageType: ClassConstructor<Message>
22-
handlers: HandlerBinding[]
23-
}
24-
25-
interface HandlerRegistrations {
26-
[key: string]: RegisteredHandlers
27-
}
28-
29-
type MessageName = string
30-
31-
interface HandlerResolver {
15+
export interface HandlerResolver {
3216
handler: HandlerType
3317
symbol: symbol
18+
topicIdentifier: string | undefined
19+
messageType: ClassConstructor<Message> | undefined
3420
resolver (message: unknown): boolean
3521
}
3622

@@ -42,9 +28,7 @@ interface HandlerResolver {
4228
export class HandlerRegistry {
4329

4430
private container: Container
45-
private unhandledMessages: MessageName[] = []
4631
private handlerResolvers: HandlerResolver[] = []
47-
private registeredBusMessages: ClassConstructor<Message>[] = []
4832

4933
constructor (
5034
@inject(LOGGER_SYMBOLS.Logger) private readonly logger: Logger
@@ -57,12 +41,15 @@ export class HandlerRegistry {
5741
* @param symbol A unique symbol to identify the binding of the message to the function
5842
* @param handler The function handler to dispatch messages to as they arrive
5943
* @param messageType The class type of message to handle
44+
* @param topicIdentifier Identifies the topic where the message is sourced from. This topic must exist
45+
* before being consumed as the library assumes it's managed externally
6046
*/
6147
register<TMessage extends MessageType = MessageType> (
6248
resolver: (message: TMessage) => boolean,
6349
symbol: symbol,
6450
handler: HandlerType,
65-
messageType?: ClassConstructor<Message>
51+
messageType?: ClassConstructor<Message>,
52+
topicIdentifier?: string
6653
): void {
6754

6855
const handlerName = getHandlerName(handler)
@@ -84,10 +71,8 @@ export class HandlerRegistry {
8471
}
8572
}
8673

87-
this.handlerResolvers.push({ resolver, symbol, handler })
88-
if (!!messageType) {
89-
this.registeredBusMessages.push(messageType)
90-
}
74+
this.handlerResolvers.push({ messageType, resolver, symbol, handler, topicIdentifier })
75+
9176
this.logger.info(
9277
'Handler registered',
9378
{ messageName: messageType ? messageType.name : undefined, handler: handlerName }
@@ -151,8 +136,8 @@ export class HandlerRegistry {
151136
/**
152137
* Retrieves a list of all messages that have handler registrations
153138
*/
154-
get subscribedBusMessages (): ClassConstructor<Message>[] {
155-
return this.registeredBusMessages
139+
get messageSubscriptions (): HandlerResolver[] {
140+
return this.handlerResolvers
156141
}
157142

158143
private bindHandlers (): void {

packages/bus-core/src/handler/handler.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,16 @@ export interface HandlerPrototype<T extends MessageType> {
1616
*/
1717
$symbol: symbol
1818

19+
/**
20+
* An optional topic identifier where the source message is published to. This is used to
21+
* subscribe the application queue to non-application type messages like infrastructure or
22+
* third party messages.
23+
*
24+
* This field accepts identifiers of the transport used. eg. SQS transports will accept a
25+
* source SNS arn, whereas a RabbitMQ transport would use a topic name.
26+
*/
27+
$topicIdentifier: string | undefined
28+
1929
/**
2030
* The resolver to use that determines if a given message should be dispatched to the handler
2131
* it's attached to

packages/bus-core/src/handler/handles-message.ts

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,52 @@ import { ClassConstructor, isClassConstructor } from '../util'
99
*
1010
* The dispatcher will dispatch received messages to the `handle()` function of your class.
1111
*
12-
* @param messageConstructor The type of message that the function handles
13-
* @param customResolver A custom resolver to use to map messages to the handler. This can be used to handle
12+
* @param messageConstructor A custom resolver to use to map messages to the handler. This can be used to handle
1413
* messages that originate in a different system or that don't conform to the `Message` conventions.
1514
*/
1615
export function HandlesMessage<
1716
TMessage extends MessageType,
1817
THandler extends Handler<TMessage>,
1918
HandlerConstructor extends ClassConstructor<THandler>
2019
> (
21-
resolveWith: ClassConstructor<TMessage> | ((message: TMessage) => boolean)
20+
messageConstructor: ClassConstructor<TMessage>
21+
): (handlerConstructor: HandlerConstructor) => void
22+
23+
/**
24+
* Marks that the decorated class handles a particular message. When a message
25+
* matching the given type is received from the underlying transport it will be dispatched
26+
* to this function.
27+
*
28+
* The dispatcher will dispatch received messages to the `handle()` function of your class.
29+
*
30+
* @param resolveWith A custom resolver to use to map messages to the handler. This can be used to handle
31+
* messages that originate in a different system or that don't conform to the `Message` conventions.
32+
* @param topicIdentifier Identifies the topic where the message is sourced from. This topic must exist
33+
* before being consumed as the library assumes it's managed externally
34+
*/
35+
export function HandlesMessage<
36+
TMessage extends MessageType,
37+
THandler extends Handler<TMessage>,
38+
HandlerConstructor extends ClassConstructor<THandler>
39+
> (
40+
resolveWith: (message: TMessage) => boolean,
41+
topicIdentifier: string
42+
): (handlerConstructor: HandlerConstructor) => void
43+
44+
45+
46+
export function HandlesMessage<
47+
TMessage extends MessageType,
48+
THandler extends Handler<TMessage>,
49+
HandlerConstructor extends ClassConstructor<THandler>
50+
> (
51+
resolveWith: ClassConstructor<TMessage> | ((message: TMessage) => boolean),
52+
topicIdentifier?: string
2253
): (handlerConstructor: HandlerConstructor) => void {
2354
return (handlerConstructor: HandlerConstructor) => {
2455
const prototype = handlerConstructor.prototype as HandlerPrototype<TMessage>
2556
prototype.$symbol = Symbol.for(`node-ts/bus-core/handles-message/${handlerConstructor.name}`)
57+
prototype.$topicIdentifier = topicIdentifier
2658

2759
const isBusMessage = isClassConstructor(resolveWith)
2860

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
export * from './handler-registry'
22
export * from './handles-message'
3-
export { Handler } from './handler'
3+
export { Handler, MessageType } from './handler'

packages/bus-core/src/test/test-resolver-handler.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ export class SystemEvent {
1313
}
1414
}
1515

16-
@HandlesMessage((e: SystemEvent) => e.type === eventType)
16+
@HandlesMessage((e: SystemEvent) => e.type === eventType, '')
1717
export class TestResolverHandler {
1818
constructor (
1919
@inject(MESSAGE_LOGGER) private readonly messageLogger: MessageLogger

packages/bus-core/src/transport/memory-queue.spec.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { TestCommand, TestEvent, TestCommand2, TestSystemMessage } from '../test
33
import { TransportMessage } from '../transport'
44
import { Mock } from 'typemoq'
55
import { Logger } from '@node-ts/logger-core'
6-
import { HandlerRegistry } from '../handler'
6+
import { HandlerRegistry, HandlerResolver } from '../handler'
77
import { MessageAttributes } from '@node-ts/bus-messages'
88
import * as faker from 'faker'
99

@@ -13,7 +13,7 @@ const command2 = new TestCommand2()
1313

1414
describe('MemoryQueue', () => {
1515
let sut: MemoryQueue
16-
const handledMessageNames = [TestCommand, TestEvent]
16+
const handledMessages = [TestCommand, TestEvent]
1717
const messageOptions = new MessageAttributes({
1818
correlationId: faker.random.uuid()
1919
})
@@ -25,8 +25,8 @@ describe('MemoryQueue', () => {
2525

2626
const handlerRegistry = Mock.ofType<HandlerRegistry>()
2727
handlerRegistry
28-
.setup(h => h.subscribedBusMessages)
29-
.returns(() => handledMessageNames)
28+
.setup(h => h.messageSubscriptions)
29+
.returns(() => handledMessages.map(h => ({ messageType: h }) as {} as HandlerResolver))
3030

3131
await sut.initialize(handlerRegistry.object)
3232
})

packages/bus-core/src/transport/memory-queue.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@ export class MemoryQueue implements Transport<InMemoryMessage> {
4646

4747
async initialize (handlerRegistry: HandlerRegistry): Promise<void> {
4848
this.messagesWithHandlers = {}
49-
handlerRegistry.subscribedBusMessages
49+
handlerRegistry.messageSubscriptions
50+
.filter(subscription => !!subscription.messageType)
51+
.map(subscription => subscription.messageType!)
5052
.map(ctor => new ctor().$name)
5153
.forEach(messageName => this.messagesWithHandlers[messageName] = {})
5254
}

0 commit comments

Comments
 (0)