Skip to content

Commit 669f365

Browse files
anguillanneuffhinkel
authored andcommitted
Add Pub/Sub ack deadline example (#315)
Add Pub/Sub ack deadline example with 1 worker.
1 parent d2bd0e5 commit 669f365

1 file changed

Lines changed: 56 additions & 65 deletions

File tree

handwritten/pubsub/samples/subscriptions.js

Lines changed: 56 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -340,94 +340,85 @@ function synchronousPull(projectName, subscriptionName) {
340340
);
341341
// The maximum number of messages returned for this request.
342342
// Pub/Sub may return fewer than the number specified.
343-
const maxMessages = 3;
344-
const ackDeadlineSeconds = 30;
343+
const maxMessages = 1;
344+
const newAckDeadlineSeconds = 30;
345345
const request = {
346346
subscription: formattedSubscription,
347347
maxMessages: maxMessages,
348348
};
349-
// `messages` is a dict that stores message ack ids as keys, and message
350-
// data and the processing states (true if done, false if not) as values.
351-
const messages = {};
352349

353-
// The worker function takes a message and starts a long-running process.
350+
let isProcessed = false;
351+
352+
// The worker function is meant to be non-blocking. It starts a long-
353+
// running process, such as writing the message to a table, which may
354+
// take longer than the default 10-sec acknowledge deadline.
354355
function worker(message) {
355-
const target = Math.floor(Math.random() * 1e5);
356-
console.log(`Processing "${message.message.data}" for ${target / 1e3}s...`);
356+
console.log(`Processing "${message.message.data}"...`);
357357

358358
setTimeout(() => {
359359
console.log(`Finished procesing "${message.message.data}".`);
360-
// After the message has been processed, set its processing state to true.
361-
messages[message.ackId][1] = true;
362-
}, target);
360+
isProcessed = true;
361+
}, 30000);
363362
}
364363

365-
// The subscriber pulls a specific number of messages.
364+
// The subscriber pulls a specified number of messages.
366365
client
367366
.pull(request)
368367
.then(responses => {
369368
// The first element of `responses` is a PullResponse object.
370369
const response = responses[0];
370+
// Obtain the first message.
371+
const message = response.receivedMessages[0];
371372

372-
// Initialize `messages` with message ackId, message data and `false` as
373-
// processing state. Then, start each message in a worker function.
374-
response.receivedMessages.forEach(message => {
375-
messages[message.ackId] = [message.message.data, false];
376-
worker(message);
377-
});
378-
379-
let numProcessed = 0;
373+
// Send the message to the worker function.
374+
worker(message);
380375

381-
// setInterval() gets run every 10s.
376+
// setInterval() checks the worker process every 5 sec.
377+
// If the pre-set ack deadline is n sec, it is best to
378+
// set the interval to be every (n/2) sec.
382379
const interval = setInterval(function() {
383-
// Every 10s, we do a check on the processing states of the messages.
384-
Object.keys(messages).forEach(ackId => {
385-
if (messages[ackId][1]) {
386-
// If the processing state for a particular message is true,
387-
// We will ack the message.
388-
const ackRequest = {
389-
subscription: formattedSubscription,
390-
ackIds: [ackId],
391-
};
392-
393-
client.acknowledge(ackRequest).catch(err => {
380+
// If the message has been processed..
381+
if (isProcessed) {
382+
const ackRequest = {
383+
subscription: formattedSubscription,
384+
ackIds: [message.ackId],
385+
};
386+
387+
//..acknowledges the message.
388+
client
389+
.acknowledge(ackRequest)
390+
.then(() => {
391+
console.log(`Acknowledged: "${message.message.data}".`);
392+
// Exit after the message is acknowledged.
393+
clearInterval(interval);
394+
console.log(`Done.`);
395+
})
396+
.catch(err => {
394397
console.error(err);
395398
});
396-
397-
console.log(`Acknowledged: "${messages[ackId][0]}".`);
398-
399-
// Increment numProcessed by 1.
400-
numProcessed += 1;
401-
402-
// Remove this message from `messages`.
403-
delete messages[ackId];
404-
} else {
405-
// If the processing state of a particular message remains false,
406-
// we will modify its ack deadline.
407-
const modifyAckRequest = {
408-
subscription: formattedSubscription,
409-
ackIds: [ackId],
410-
ackDeadlineSeconds: ackDeadlineSeconds,
411-
};
412-
413-
client.modifyAckDeadline(modifyAckRequest).catch(err => {
399+
} else {
400+
// If the message is not yet processed..
401+
const modifyAckRequest = {
402+
subscription: formattedSubscription,
403+
ackIds: [message.ackId],
404+
ackDeadlineSeconds: newAckDeadlineSeconds,
405+
};
406+
407+
//..reset its ack deadline.
408+
client
409+
.modifyAckDeadline(modifyAckRequest)
410+
.then(() => {
411+
console.log(
412+
`Reset ack deadline for "${
413+
message.message.data
414+
}" for ${newAckDeadlineSeconds}s.`
415+
);
416+
})
417+
.catch(err => {
414418
console.error(err);
415419
});
416-
417-
console.log(
418-
`Reset ack deadline for "${
419-
messages[ackId][0]
420-
}" for ${ackDeadlineSeconds}s.`
421-
);
422-
}
423-
424-
// If all messages have been processed, we clear out of the interval.
425-
if (numProcessed === response.receivedMessages.length) {
426-
clearInterval(interval);
427-
console.log(`Done.`);
428-
}
429-
});
430-
}, 10000);
420+
}
421+
}, 5000);
431422
})
432423
.catch(err => {
433424
console.error(err);

0 commit comments

Comments
 (0)