Skip to content

Commit ff52f3c

Browse files
mgabeler-lee-6rsJustinBeckwith
authored andcommitted
feat: Add optional delay when calling nack() (#255) (#256)
1 parent e47af42 commit ff52f3c

4 files changed

Lines changed: 92 additions & 13 deletions

File tree

handwritten/pubsub/src/connection-pool.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -301,8 +301,8 @@ export class ConnectionPool extends EventEmitter {
301301
ack: () => {
302302
this.subscription.ack_(message);
303303
},
304-
nack: () => {
305-
this.subscription.nack_(message);
304+
nack: (delay?: number) => {
305+
this.subscription.nack_(message, delay);
306306
}
307307
}
308308
return message;

handwritten/pubsub/src/subscriber.ts

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -237,24 +237,41 @@ export class Subscriber extends EventEmitter {
237237
}
238238
const acks = this.inventory_.ack;
239239
const nacks = this.inventory_.nack;
240+
240241
if (!acks.length && !nacks.length) {
241242
return Promise.resolve();
242243
}
244+
243245
const requests: Promise<void>[] = [];
246+
244247
if (acks.length) {
245248
requests.push(
246249
this.acknowledge_(acks).then(() => {
247250
this.inventory_.ack = [];
248251
})
249252
);
250253
}
254+
251255
if (nacks.length) {
252-
requests.push(
253-
this.modifyAckDeadline_(nacks, 0).then(() => {
254-
this.inventory_.nack = [];
255-
})
256-
);
256+
const modAcks = nacks.reduce((table, [ackId, deadline]) => {
257+
if (!table[deadline]) {
258+
table[deadline] = [];
259+
}
260+
261+
table[deadline].push(ackId);
262+
return table;
263+
}, {});
264+
265+
const modAckRequests = Object.keys(modAcks).map(deadline =>
266+
this.modifyAckDeadline_(modAcks[deadline], Number(deadline)));
267+
268+
requests.push.apply(requests, modAckRequests);
269+
270+
Promise.all(modAckRequests).then(() => {
271+
this.inventory_.nack = [];
272+
});
257273
}
274+
258275
return Promise.all(requests);
259276
}
260277
/*!
@@ -371,16 +388,19 @@ export class Subscriber extends EventEmitter {
371388
* @private
372389
*
373390
* @param {object} message - The message object.
391+
* @param {number} [delay=0] - Number of seconds before the message may be redelivered
374392
*/
375-
nack_(message) {
393+
nack_(message, delay = 0) {
376394
const breakLease = this.breakLease_.bind(this, message);
395+
377396
if (this.isConnected_()) {
378-
this.modifyAckDeadline_(message.ackId, 0, message.connectionId).then(
397+
this.modifyAckDeadline_(message.ackId, delay, message.connectionId).then(
379398
breakLease
380399
);
381400
return;
382401
}
383-
this.inventory_.nack.push(message.ackId);
402+
403+
this.inventory_.nack.push([message.ackId, delay]);
384404
this.setFlushTimeout_().then(breakLease);
385405
}
386406
/*!

handwritten/pubsub/test/connection-pool.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -867,6 +867,18 @@ describe('ConnectionPool', function() {
867867

868868
message.nack();
869869
});
870+
871+
it('should create a nack method accepting a delay argument', function(done) {
872+
const delay = Math.random();
873+
874+
SUBSCRIPTION.nack_ = function(message_, delay_) {
875+
assert.strictEqual(message_, message);
876+
assert.strictEqual(delay_, delay);
877+
done();
878+
};
879+
880+
message.nack(delay);
881+
});
870882
});
871883

872884
describe('getAndEmitChannelState', function() {

handwritten/pubsub/test/subscriber.ts

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -650,10 +650,12 @@ describe('Subscriber', function() {
650650
});
651651

652652
it('should send any pending nacks', function() {
653-
const fakeAckIds = (subscriber.inventory_.nack = ['ghi', 'jkl']);
653+
const fakeAckIds = ['ghi', 'jkl'];
654+
655+
subscriber.inventory_.nack = fakeAckIds.map(ackId => [ackId, 0]);
654656

655657
subscriber.modifyAckDeadline_ = function(ackIds, deadline) {
656-
assert.strictEqual(ackIds, fakeAckIds);
658+
assert.deepStrictEqual(ackIds, fakeAckIds);
657659
assert.strictEqual(deadline, 0);
658660
return Promise.resolve();
659661
};
@@ -662,6 +664,22 @@ describe('Subscriber', function() {
662664
assert.strictEqual(subscriber.inventory_.nack.length, 0);
663665
});
664666
});
667+
668+
it('should send any pending delayed nacks', function() {
669+
const fakeAckIds = ['ghi', 'jkl'];
670+
671+
subscriber.inventory_.nack = fakeAckIds.map(ackId => [ackId, 1]);
672+
673+
subscriber.modifyAckDeadline_ = function(ackIds, deadline) {
674+
assert.deepStrictEqual(ackIds, fakeAckIds);
675+
assert.strictEqual(deadline, 1);
676+
return Promise.resolve();
677+
};
678+
679+
return subscriber.flushQueues_().then(function() {
680+
assert.strictEqual(subscriber.inventory_.nack.length, 0);
681+
});
682+
});
665683
});
666684

667685
describe('isConnected_', function() {
@@ -1045,6 +1063,18 @@ describe('Subscriber', function() {
10451063

10461064
subscriber.nack_(MESSAGE);
10471065
});
1066+
1067+
it('should use the delay if passed', function(done) {
1068+
subscriber.modifyAckDeadline_ = function(ackId, deadline, connId) {
1069+
assert.strictEqual(ackId, MESSAGE.ackId);
1070+
assert.strictEqual(deadline, 1);
1071+
assert.strictEqual(connId, MESSAGE.connectionId);
1072+
setImmediate(done);
1073+
return Promise.resolve();
1074+
};
1075+
1076+
subscriber.nack_(MESSAGE, 1);
1077+
});
10481078
});
10491079

10501080
describe('without connection', function() {
@@ -1056,7 +1086,10 @@ describe('Subscriber', function() {
10561086

10571087
it('should queue the message to be nacked if no conn', function(done) {
10581088
subscriber.setFlushTimeout_ = function() {
1059-
assert(subscriber.inventory_.nack.indexOf(MESSAGE.ackId) > -1);
1089+
assert.deepStrictEqual(
1090+
subscriber.inventory_.nack,
1091+
[[MESSAGE.ackId, 0]]
1092+
);
10601093
setImmediate(done);
10611094
return Promise.resolve();
10621095
};
@@ -1072,6 +1105,20 @@ describe('Subscriber', function() {
10721105

10731106
subscriber.nack_(MESSAGE);
10741107
});
1108+
1109+
it('should use the delay if passed when queueing', function(done) {
1110+
subscriber.setFlushTimeout_ = function() {
1111+
assert(
1112+
subscriber.inventory_.nack.findIndex(element => {
1113+
return element[0] === MESSAGE.ackId && element[1] === 1;
1114+
}) > -1
1115+
);
1116+
setImmediate(done);
1117+
return Promise.resolve();
1118+
};
1119+
1120+
subscriber.nack_(MESSAGE, 1);
1121+
});
10751122
});
10761123
});
10771124

0 commit comments

Comments
 (0)