Skip to content

Commit 3c59635

Browse files
move subscription methods up a level
1 parent b157b41 commit 3c59635

7 files changed

Lines changed: 537 additions & 309 deletions

File tree

lib/common/util.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ util.arrayize = arrayize;
124124
*/
125125
function format(template, args) {
126126
return template.replace(/{([^}]*)}/g, function(match, key) {
127-
return args[key] || match;
127+
return key in args ? args[key] : match;
128128
});
129129
}
130130

lib/pubsub/index.js

Lines changed: 184 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,135 @@ PubSub.prototype.createTopic = function(name, callback) {
162162
});
163163
};
164164

165+
/**
166+
* Create a subscription to a topic. You may optionally provide an object to
167+
* customize the subscription.
168+
*
169+
* Your provided callback will be invoked with an error object if an API error
170+
* occurred or a {@linkcode module:pubsub/subscription} object.
171+
*
172+
* @throws {Error} If a Topic instance or topic name is not provided.
173+
* @throws {Error} If a subName is not provided.
174+
*
175+
* @param {module:pubsub/topic|string} - topic - The Topic to create a
176+
* subscription to.
177+
* @param {string} subName - The name of the subscription.
178+
* @param {object=} options - Configuration object.
179+
* @param {number} options.ackDeadlineSeconds - The maximum time after receiving
180+
* a message that you must ack a message before it is redelivered.
181+
* @param {boolean} options.autoAck - Automatically acknowledge the message once
182+
* it's pulled. (default: false)
183+
* @param {number} options.interval - Interval in milliseconds to check for new
184+
* messages. (default: 10)
185+
* @param {boolean} options.reuseExisting - If the subscription already exists,
186+
* reuse it. The options of the existing subscription are not changed. If
187+
* false, attempting to create a subscription that already exists will fail.
188+
* (default: false)
189+
* @param {function} callback - The callback function.
190+
*
191+
* @example
192+
* var topic = pubsub.topic('messageCenter');
193+
* var name = 'newMessages';
194+
*
195+
* pubsub.subscribe(topic, name, function(err, subscription, apiResponse) {});
196+
*
197+
* //-
198+
* // Customize the subscription.
199+
* //-
200+
* pubsub.subscribe(topic, name, {
201+
* ackDeadlineSeconds: 90,
202+
* autoAck: true,
203+
* interval: 30
204+
* }, function(err, subscription, apiResponse) {});
205+
*
206+
* //-
207+
* // Create a subscription to a topic from another project.
208+
* //-
209+
* var anotherProject = gcloud.pubsub({
210+
* projectId: 'another-project'
211+
* });
212+
*
213+
* var topic = anotherProject.topic('messageCenter');
214+
*
215+
* pubsub.subscribe(topic, name, function(err, subscription, apiResponse) {});
216+
*/
217+
PubSub.prototype.subscribe = function(topic, subName, options, callback) {
218+
if (!util.is(topic, 'string') && !(topic instanceof Topic)) {
219+
throw new Error('A Topic is required for a new subscription.');
220+
}
221+
222+
if (!subName || !util.is(subName, 'string')) {
223+
throw new Error('A subscription name is required for a new subscription.');
224+
}
225+
226+
if (!callback) {
227+
callback = options;
228+
options = {};
229+
}
230+
231+
options = options || {};
232+
233+
if (util.is(topic, 'string')) {
234+
topic = this.topic(topic);
235+
}
236+
237+
var body = {
238+
topic: topic.name
239+
};
240+
241+
if (options.ackDeadlineSeconds) {
242+
body.ackDeadlineSeconds = options.ackDeadlineSeconds;
243+
}
244+
245+
var subscription = this.subscription(subName, options);
246+
247+
this.makeReq_('PUT', subscription.name, null, body, function(err, result) {
248+
if (err && !(err.code === 409 && options.reuseExisting)) {
249+
callback(err, null, result);
250+
return;
251+
}
252+
253+
callback(null, subscription, result);
254+
});
255+
};
256+
257+
/**
258+
* Create a Subscription object in reference to an existing subscription. This
259+
* command by itself will not run any API requests. You will receive a
260+
* {@linkcode module:pubsub/subscription} object, which will allow you to
261+
* interact with your subscription.
262+
*
263+
* @throws {Error} If a name is not provided.
264+
*
265+
* @param {string} name - The name of the subscription.
266+
* @param {object=} options - Configuration object.
267+
* @param {boolean} options.autoAck - Automatically acknowledge the message once
268+
* it's pulled.
269+
* @param {number} options.interval - Interval in milliseconds to check for new
270+
* messages.
271+
* @return {module:pubsub/subscription}
272+
*
273+
* @example
274+
* var subscription = pubsub.subscription('my-existing-subscription');
275+
*
276+
* // Register a listener for `message` events.
277+
* subscription.on('message', function(message) {
278+
* // Called every time a message is received.
279+
* // message.id = ID used to acknowledge its receival.
280+
* // message.data = Contents of the message.
281+
* // message.attributes = Attributes of the message.
282+
* });
283+
*/
284+
PubSub.prototype.subscription = function(name, options) {
285+
if (!name) {
286+
throw new Error('The name of a subscription is required.');
287+
}
288+
289+
options = options || {};
290+
options.name = name;
291+
return new Subscription(this, options);
292+
};
293+
165294
/**
166295
* Create a Topic object to reference an existing topic.
167296
*
@@ -194,9 +323,16 @@ PubSub.prototype.topic = function(name, options) {
194323
* You may optionally provide a query object as the first argument to customize
195324
* the response.
196325
*
197-
* @param {object=} query - Query object.
198-
* @param {string=} query.pageToken - Page token.
199-
* @param {number=} query.pageSize - Maximum number of results to return.
326+
* Your provided callback will be invoked with an error object if an API error
327+
* occurred or an array of {@linkcode module:pubsub/subscription} objects.
328+
*
329+
* To get subscriptions for a topic, see {module:pubsub/topic}.
330+
*
331+
* @param {object=} options - Configuration object.
332+
* @param {string|module:pubsub/topic} options.topic - The name of the topic to
333+
* list subscriptions from.
334+
* @param {number} options.pageSize - Maximum number of results to return.
335+
* @param {string} options.pageToken - Page token.
200336
* @param {function} callback - The callback function.
201337
*
202338
* @example
@@ -205,38 +341,73 @@ PubSub.prototype.topic = function(name, options) {
205341
* // so, run `pubsub.getSubscriptions(nextQuery, callback);`.
206342
* };
207343
*
208-
* // Get all subscriptions.
344+
* //-
345+
* // Get all subscriptions for this project.
346+
* //-
209347
* pubsub.getSubscriptions(callback);
210348
*
349+
* //-
211350
* // Customize the query.
351+
* //-
212352
* pubsub.getSubscriptions({
213-
* pageSize: 10
353+
* pageSize: 3
214354
* }, callback);
215355
*/
216-
PubSub.prototype.getSubscriptions = function(query, callback) {
356+
PubSub.prototype.getSubscriptions = function(options, callback) {
217357
var self = this;
358+
218359
if (!callback) {
219-
callback = query;
220-
query = {};
360+
callback = options;
361+
options = {};
221362
}
222363

223-
var path = this.projectName + '/subscriptions';
224-
this.makeReq_('GET', path, query, true, function(err, result) {
364+
options = options || {};
365+
366+
var topicName;
367+
368+
if (util.is(options.topic, 'string')) {
369+
topicName = options.topic;
370+
} else if (options.topic instanceof Topic) {
371+
topicName = options.topic.unformattedName;
372+
}
373+
374+
var query = {};
375+
376+
if (options.pageSize) {
377+
query.pageSize = options.pageSize;
378+
}
379+
380+
if (options.pageToken) {
381+
query.pageToken = options.pageToken;
382+
}
383+
384+
var apiPath = util.format('{projectPath}{topicPath}/subscriptions', {
385+
projectPath: 'projects/' + this.projectId,
386+
topicPath: topicName ? '/topics/' + topicName : ''
387+
});
388+
389+
this.makeReq_('GET', apiPath, query, null, function(err, result) {
225390
if (err) {
226391
callback(err, null, null, result);
227392
return;
228393
}
229394

230-
var subscriptions = (result.subscriptions || []).map(function(item) {
395+
var subscriptions = (result.subscriptions || []).map(function(sub) {
231396
return new Subscription(self, {
232-
name: item.name
397+
// Depending on if we're using a subscriptions.list or
398+
// topics.subscriptions.list API endpoint, we will get back a
399+
// Subscription resource or just the name of the subscription.
400+
name: sub.name || sub
233401
});
234402
});
403+
235404
var nextQuery = null;
405+
236406
if (result.nextPageToken) {
237-
nextQuery = query;
407+
nextQuery = options;
238408
nextQuery.pageToken = result.nextPageToken;
239409
}
410+
240411
callback(null, subscriptions, nextQuery, result);
241412
});
242413
};

lib/pubsub/subscription.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ function Subscription(pubsub, options) {
117117
events.EventEmitter.call(this);
118118

119119
this.name = Subscription.formatName_(pubsub.projectId, options.name);
120+
120121
this.makeReq_ = pubsub.makeReq_.bind(pubsub);
121122

122123
this.autoAck = util.is(options.autoAck, 'boolean') ? options.autoAck : false;

0 commit comments

Comments
 (0)