@@ -3,39 +3,61 @@ import { MemoryQueue } from '../transport'
33import { BusState } from './bus'
44import { TestEvent } from '../test/test-event'
55import { sleep } from '../util'
6- import { Container } from 'inversify'
6+ import { Container , inject } from 'inversify'
77import { TestContainer } from '../test/test-container'
88import { BUS_SYMBOLS } from '../bus-symbols'
99import { Logger } from '@node-ts/logger-core'
10- import { Mock } from 'typemoq'
11- import { HandlerRegistry } from '../handler'
10+ import { Mock , IMock , Times } from 'typemoq'
11+ import { HandlerRegistry , HandlesMessage } from '../handler'
12+ import { ApplicationBootstrap } from '../application-bootstrap'
1213
1314const event = new TestEvent ( )
15+ type Callback = ( ) => void
16+ const CALLBACK = Symbol . for ( 'Callback' )
17+
18+ @HandlesMessage ( TestEvent )
19+ class TestEventHandler {
20+ constructor (
21+ @inject ( CALLBACK ) private readonly callback : Callback
22+ ) {
23+ }
24+
25+ async handle ( _ : TestEvent ) : Promise < void > {
26+ this . callback ( )
27+ }
28+ }
1429
1530describe ( 'ServiceBus' , ( ) => {
1631 let container : Container
1732
1833 let sut : ServiceBus
34+ let bootstrapper : ApplicationBootstrap
1935 let queue : MemoryQueue
2036
37+ let callback : IMock < Callback >
38+
2139 beforeAll ( async ( ) => {
2240 container = new TestContainer ( ) . silenceLogs ( )
2341 queue = new MemoryQueue ( Mock . ofType < Logger > ( ) . object )
42+
2443 const transport = container . get < MemoryQueue > ( BUS_SYMBOLS . Transport )
2544 const registry = container . get < HandlerRegistry > ( BUS_SYMBOLS . HandlerRegistry )
45+
46+ bootstrapper = container . get < ApplicationBootstrap > ( BUS_SYMBOLS . ApplicationBootstrap )
47+ bootstrapper . registerHandler ( TestEventHandler )
48+
49+ callback = Mock . ofType < Callback > ( )
50+ container . bind ( CALLBACK ) . toConstantValue ( callback . object )
2651 await transport . initialize ( registry )
52+ await bootstrapper . initialize ( container )
2753 sut = container . get ( BUS_SYMBOLS . Bus )
2854 } )
2955
30- describe ( 'when starting the service bus' , ( ) => {
31- beforeAll ( async ( ) => {
32- await sut . start ( )
33- } )
34-
35- afterAll ( async ( ) => {
36- await sut . stop ( )
37- } )
56+ afterAll ( async ( ) => {
57+ await bootstrapper . dispose ( )
58+ } )
3859
60+ describe ( 'when starting the service bus' , ( ) => {
3961 it ( 'should complete into a started state' , ( ) => {
4062 expect ( sut . state ) . toEqual ( BusState . Started )
4163 } )
@@ -45,16 +67,42 @@ describe('ServiceBus', () => {
4567 await expect ( sut . start ( ) ) . rejects . toThrowError ( )
4668 } )
4769 } )
70+ } )
4871
49- describe ( 'and a message is received on the queue' , ( ) => {
50- beforeAll ( async ( ) => {
51- await sut . publish ( event )
52- await sleep ( 1 )
53- } )
5472
55- it ( 'should delete the message from the queue' , ( ) => {
56- expect ( queue . depth ) . toEqual ( 0 )
57- } )
73+ describe ( 'when a message is successfully handled from the queue' , ( ) => {
74+ it ( 'should delete the message from the queue' , async ( ) => {
75+ callback . reset ( )
76+ callback
77+ . setup ( c => c ( ) )
78+ . callback ( ( ) => undefined )
79+ . verifiable ( Times . once ( ) )
80+ await sut . publish ( event )
81+ await sleep ( 10 )
82+
83+ expect ( queue . depth ) . toEqual ( 0 )
84+ callback . verifyAll ( )
85+ } )
86+ } )
87+
88+ describe ( 'and a handled message throw an Error' , ( ) => {
89+
90+ it ( 'should return the message for retry' , async ( ) => {
91+ callback . reset ( )
92+ let callCount = 0
93+ callback
94+ . setup ( c => c ( ) )
95+ . callback ( ( ) => {
96+ if ( callCount ++ === 0 ) {
97+ throw new Error ( )
98+ }
99+ } )
100+ . verifiable ( Times . exactly ( 2 ) )
101+
102+ await sut . publish ( event )
103+ await sleep ( 2000 )
104+
105+ callback . verifyAll ( )
58106 } )
59107 } )
60108} )
0 commit comments