@@ -8,7 +8,11 @@ import {
88} from 'n8n-workflow' ;
99import { NatsConnection , jetstream , jetstreamManager } from '../../bundled/nats-bundled' ;
1010import { createNatsConnection , closeNatsConnection } from '../../utils/NatsConnection' ;
11- import { parseNatsMessage , validateStreamName , validateConsumerName } from '../../utils/NatsHelpers' ;
11+ import {
12+ parseNatsMessage ,
13+ validateStreamName ,
14+ validateConsumerName ,
15+ } from '../../utils/NatsHelpers' ;
1216import { NodeLogger } from '../../utils/NodeLogger' ;
1317
1418export class NatsJetstreamTrigger implements INodeType {
@@ -72,7 +76,8 @@ export class NatsJetstreamTrigger implements INodeType {
7276 name : 'maxBytes' ,
7377 type : 'number' ,
7478 default : 0 ,
75- description : 'Maximum bytes to fetch in each pull. If set, takes priority over Max Messages.' ,
79+ description :
80+ 'Maximum bytes to fetch in each pull. If set, takes priority over Max Messages.' ,
7681 hint : 'Set to 0 to use Max Messages instead. These options are mutually exclusive' ,
7782 } ,
7883 {
@@ -135,18 +140,18 @@ export class NatsJetstreamTrigger implements INodeType {
135140 sku : 'WIDGET-001' ,
136141 name : 'Premium Widget' ,
137142 quantity : 2 ,
138- price : 49.99
139- }
143+ price : 49.99 ,
144+ } ,
140145 ] ,
141146 shipping : {
142147 address : '123 Main St, Anytown, NY 12345' ,
143148 method : 'standard' ,
144- cost : 9.99
149+ cost : 9.99 ,
145150 } ,
146151 payment : {
147152 method : 'credit_card' ,
148153 last4 : '4242' ,
149- status : 'authorized'
154+ status : 'authorized' ,
150155 } ,
151156 status : 'confirmed' ,
152157 timestamp : Date . now ( ) ,
@@ -156,7 +161,7 @@ export class NatsJetstreamTrigger implements INodeType {
156161 'X-Order-Source' : 'web-app' ,
157162 'X-User-Agent' : 'Mozilla/5.0 (example)' ,
158163 'X-Request-ID' : 'req-' + Math . random ( ) . toString ( 36 ) . substr ( 2 , 12 ) ,
159- 'X-Correlation-ID' : 'corr-' + Math . random ( ) . toString ( 36 ) . substr ( 2 , 12 )
164+ 'X-Correlation-ID' : 'corr-' + Math . random ( ) . toString ( 36 ) . substr ( 2 , 12 ) ,
160165 } ,
161166 // JetStream specific metadata
162167 seq : Math . floor ( Math . random ( ) * 10000 ) + 1000 ,
@@ -198,7 +203,9 @@ export class NatsJetstreamTrigger implements INodeType {
198203 await jsm . streams . info ( streamName ) ;
199204 } catch ( error : any ) {
200205 if ( error . code === 'STREAM_NOT_FOUND' ) {
201- throw new ApplicationError ( `JetStream stream '${ streamName } ' not found. Please create the stream first.` ) ;
206+ throw new ApplicationError (
207+ `JetStream stream '${ streamName } ' not found. Please create the stream first.` ,
208+ ) ;
202209 }
203210 throw error ;
204211 }
@@ -208,7 +215,9 @@ export class NatsJetstreamTrigger implements INodeType {
208215 consumer = await js . consumers . get ( streamName , consumerName ) ;
209216 } catch ( error : any ) {
210217 if ( error . code === 'CONSUMER_NOT_FOUND' ) {
211- throw new ApplicationError ( `JetStream consumer '${ consumerName } ' not found in stream '${ streamName } '. Please create the consumer first.` ) ;
218+ throw new ApplicationError (
219+ `JetStream consumer '${ consumerName } ' not found in stream '${ streamName } '. Please create the consumer first.` ,
220+ ) ;
212221 }
213222 throw error ;
214223 }
@@ -236,7 +245,7 @@ export class NatsJetstreamTrigger implements INodeType {
236245 for await ( const msg of messageIterator ) {
237246 try {
238247 const parsedMessage = parseNatsMessage ( msg ) ;
239-
248+
240249 // Add JetStream-specific metadata
241250 const jetstreamMessage = {
242251 ...parsedMessage ,
@@ -252,9 +261,9 @@ export class NatsJetstreamTrigger implements INodeType {
252261 // Acknowledge the message
253262 msg . ack ( ) ;
254263 } catch ( messageError : any ) {
255- nodeLogger . error ( 'Error processing JetStream message:' , {
264+ nodeLogger . error ( 'Error processing JetStream message:' , {
256265 error : messageError ,
257- seq : msg . seq
266+ seq : msg . seq ,
258267 } ) ;
259268 // NAK the message for redelivery
260269 msg . nak ( ) ;
@@ -273,4 +282,4 @@ export class NatsJetstreamTrigger implements INodeType {
273282 throw new ApplicationError ( `Failed to setup JetStream consumer: ${ error . message } ` ) ;
274283 }
275284 }
276- }
285+ }
0 commit comments