Skip to content

Commit 08ed5d4

Browse files
move subscription methods up a level
1 parent 5aff671 commit 08ed5d4

7 files changed

Lines changed: 511 additions & 303 deletions

File tree

lib/common/util.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ util.arrayize = arrayize;
124124
*/
125125
function format(template, args) {
126126
return template.replace(/{([^}]*)}/g, function(match, key) {
127-
return args[key] || match;
127+
return key in args ? args[key] : match;
128128
});
129129
}
130130

lib/pubsub/index.js

Lines changed: 163 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,123 @@ PubSub.prototype.createTopic = function(name, callback) {
162162
});
163163
};
164164

165+
/**
166+
* Create a subscription to a topic. You may optionally provide an object to
167+
* customize the subscription.
168+
*
169+
* Your provided callback will either be invoked with an error object, if an API
170+
* error occurred, or a {@linkcode module:pubsub/subscription} object.
171+
*
172+
* @throws {Error} If a Topic instance or topic name is not provided.
173+
* @throws {Error} If a subName is not provided.
174+
*
175+
* @param {module:pubsub/topic|string} - topic - The Topic to create a
176+
* subscription to.
177+
* @param {string} subName - The name of the subscription.
178+
* @param {object=} options - Configuration object.
179+
* @param {number} options.ackDeadlineSeconds - The maximum time after receiving
180+
* a message that you must ack a message before it is redelivered.
181+
* @param {boolean} options.autoAck - Automatically acknowledge the message once
182+
* it's pulled. (default: false)
183+
* @param {number} options.interval - Interval in milliseconds to check for new
184+
* messages. (default: 10)
185+
* @param {boolean} options.reuseExisting - If the subscription already exists,
186+
* reuse it. The options of the existing subscription are not changed. If
187+
* false, attempting to create a subscription that already exists will fail.
188+
* (default: false)
189+
* @param {function} callback - The callback function.
190+
*
191+
* @example
192+
* // Without specifying any options.
193+
* var topic = pubsub.topic('messageCenter');
194+
* var name = 'newMessages';
195+
*
196+
* pubsub.subscribe(topic, name, function(err, subscription, apiResponse) {});
197+
*
198+
* // With options.
199+
* pubsub.subscribe(topic, name, {
200+
* ackDeadlineSeconds: 90,
201+
* autoAck: true,
202+
* interval: 30
203+
* }, function(err, subscription, apiResponse) {});
204+
*/
205+
PubSub.prototype.subscribe = function(topic, subName, options, callback) {
206+
if (!util.is(topic, 'string') && !(topic instanceof Topic)) {
207+
throw new Error('A Topic is required for a new subscription.');
208+
}
209+
210+
if (!subName || !util.is(subName, 'string')) {
211+
throw new Error('A subscription name is required for a new subscription.');
212+
}
213+
214+
if (!callback) {
215+
callback = options;
216+
options = {};
217+
}
218+
219+
options = options || {};
220+
221+
if (util.is(topic, 'string')) {
222+
topic = this.topic(topic);
223+
}
224+
225+
var body = {
226+
topic: topic.name
227+
};
228+
229+
if (options.ackDeadlineSeconds) {
230+
body.ackDeadlineSeconds = options.ackDeadlineSeconds;
231+
}
232+
233+
var subscription = this.subscription(subName, options);
234+
235+
this.makeReq_('PUT', subscription.name, null, body, function(err, result) {
236+
if (err && !(err.code === 409 && options.reuseExisting)) {
237+
callback(err, null, result);
238+
return;
239+
}
240+
241+
callback(null, subscription, result);
242+
});
243+
};
244+
245+
/**
246+
* Create a Subscription object in reference to an existing subscription. This
247+
* command by itself will not run any API requests. You will receive a
248+
* {@linkcode module:pubsub/subscription} object, which will allow you to
249+
* interact with your subscription.
250+
*
251+
* @throws {Error} If a name is not provided.
252+
*
253+
* @param {string} name - The name of the subscription.
254+
* @param {object=} options - Configuration object.
255+
* @param {boolean} options.autoAck - Automatically acknowledge the message once
256+
* it's pulled.
257+
* @param {number} options.interval - Interval in milliseconds to check for new
258+
* messages.
259+
* @return {module:pubsub/subscription}
260+
*
261+
* @example
262+
* var subscription = pubsub.subscription('my-existing-subscription');
263+
*
264+
* // Register a listener for `message` events.
265+
* subscription.on('message', function(message) {
266+
* // Called every time a message is received.
267+
* // message.id = ID used to acknowledge its receival.
268+
* // message.data = Contents of the message.
269+
* // message.attributes = Attributes of the message.
270+
* });
271+
*/
272+
PubSub.prototype.subscription = function(name, options) {
273+
if (!name) {
274+
throw new Error('The name of a subscription is required.');
275+
}
276+
277+
options = options || {};
278+
options.name = name;
279+
return new Subscription(this, options);
280+
};
281+
165282
/**
166283
* Create a Topic object to reference an existing topic.
167284
*
@@ -194,9 +311,11 @@ PubSub.prototype.topic = function(name, options) {
194311
* You may optionally provide a query object as the first argument to customize
195312
* the response.
196313
*
197-
* @param {object=} query - Query object.
198-
* @param {string=} query.pageToken - Page token.
199-
* @param {number=} query.pageSize - Maximum number of results to return.
314+
* @param {object=} options - Configuration object.
315+
* @param {string} options.topic - The name of the topic to list subscriptions
316+
* from.
317+
* @param {number} options.pageSize - Maximum number of results to return.
318+
* @param {string} options.pageToken - Page token.
200319
* @param {function} callback - The callback function.
201320
*
202321
* @example
@@ -205,38 +324,69 @@ PubSub.prototype.topic = function(name, options) {
205324
* // so, run `pubsub.getSubscriptions(nextQuery, callback);`.
206325
* };
207326
*
208-
* // Get all subscriptions.
327+
* // Get all subscriptions for this project.
209328
* pubsub.getSubscriptions(callback);
210329
*
211330
* // Customize the query.
212331
* pubsub.getSubscriptions({
213-
* pageSize: 10
332+
* pageSize: 3
214333
* }, callback);
215334
*/
216-
PubSub.prototype.getSubscriptions = function(query, callback) {
335+
PubSub.prototype.getSubscriptions = function(options, callback) {
217336
var self = this;
337+
218338
if (!callback) {
219-
callback = query;
220-
query = {};
339+
callback = options;
340+
options = {};
221341
}
222342

223-
var path = this.projectName + '/subscriptions';
224-
this.makeReq_('GET', path, query, true, function(err, result) {
343+
options = options || {};
344+
345+
var topicName;
346+
347+
if (util.is(options.topic, 'string')) {
348+
topicName = options.topic;
349+
} else if (options.topic instanceof Topic) {
350+
topicName = options.topic.unformattedName;
351+
}
352+
353+
var query = {};
354+
355+
if (options.pageSize) {
356+
query.pageSize = options.pageSize;
357+
}
358+
359+
if (options.pageToken) {
360+
query.pageToken = options.pageToken;
361+
}
362+
363+
var apiPath = util.format('{projectPath}{topicPath}/subscriptions', {
364+
projectPath: 'projects/' + this.projectId,
365+
topicPath: topicName ? '/topics/' + topicName : ''
366+
});
367+
368+
this.makeReq_('GET', apiPath, query, null, function(err, result) {
225369
if (err) {
226370
callback(err, null, null, result);
227371
return;
228372
}
229373

230-
var subscriptions = (result.subscriptions || []).map(function(item) {
374+
var subscriptions = (result.subscriptions || []).map(function(sub) {
231375
return new Subscription(self, {
232-
name: item.name
376+
// Depending on if we're using a subscriptions.list or
377+
// topics.subscriptions.list API endpoint, we will get back a
378+
// Subscription resource or just the name of the subscription.
379+
name: sub.name || sub
233380
});
234381
});
382+
235383
var nextQuery = null;
384+
236385
if (result.nextPageToken) {
237-
nextQuery = query;
386+
nextQuery = options;
238387
nextQuery.pageToken = result.nextPageToken;
239388
}
389+
240390
callback(null, subscriptions, nextQuery, result);
241391
});
242392
};

lib/pubsub/subscription.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ function Subscription(pubsub, options) {
117117
events.EventEmitter.call(this);
118118

119119
this.name = Subscription.formatName_(pubsub.projectId, options.name);
120+
120121
this.makeReq_ = pubsub.makeReq_.bind(pubsub);
121122

122123
this.autoAck = util.is(options.autoAck, 'boolean') ? options.autoAck : false;

0 commit comments

Comments
 (0)