Skip to content

Commit 0669289

Browse files
move subscription methods up a level
1 parent b157b41 commit 0669289

7 files changed

Lines changed: 540 additions & 309 deletions

File tree

lib/common/util.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ util.arrayize = arrayize;
124124
*/
125125
function format(template, args) {
126126
return template.replace(/{([^}]*)}/g, function(match, key) {
127-
return args[key] || match;
127+
return key in args ? args[key] : match;
128128
});
129129
}
130130

lib/pubsub/index.js

Lines changed: 187 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,138 @@ PubSub.prototype.createTopic = function(name, callback) {
162162
});
163163
};
164164

165+
/**
166+
* Create a subscription to a topic. You may optionally provide an object to
167+
* customize the subscription.
168+
*
169+
* Your provided callback will be invoked with an error object if an API error
170+
* occurred or a {@linkcode module:pubsub/subscription} object.
171+
*
172+
* @throws {Error} If a Topic instance or topic name is not provided.
173+
* @throws {Error} If a subName is not provided.
174+
*
175+
* @param {module:pubsub/topic|string} - topic - The Topic to create a
176+
* subscription to.
177+
* @param {string} subName - The name of the subscription.
178+
* @param {object=} options - Configuration object.
179+
* @param {number} options.ackDeadlineSeconds - The maximum time after receiving
180+
* a message that you must ack a message before it is redelivered.
181+
* @param {boolean} options.autoAck - Automatically acknowledge the message once
182+
* it's pulled. (default: false)
183+
* @param {number} options.interval - Interval in milliseconds to check for new
184+
* messages. (default: 10)
185+
* @param {boolean} options.reuseExisting - If the subscription already exists,
186+
* reuse it. The options of the existing subscription are not changed. If
187+
* false, attempting to create a subscription that already exists will fail.
188+
* (default: false)
189+
* @param {function} callback - The callback function.
190+
*
191+
* @example
192+
* //-
193+
* // Subscribe to a topic. (Also see {module:pubsub/topic#subscribe}).
194+
* //-
195+
* var topic = 'messageCenter';
196+
* var name = 'newMessages';
197+
*
198+
* pubsub.subscribe(topic, name, function(err, subscription, apiResponse) {});
199+
*
200+
* //-
201+
* // Customize the subscription.
202+
* //-
203+
* pubsub.subscribe(topic, name, {
204+
* ackDeadlineSeconds: 90,
205+
* autoAck: true,
206+
* interval: 30
207+
* }, function(err, subscription, apiResponse) {});
208+
*
209+
* //-
210+
* // Create a subscription to a topic from another project.
211+
* //-
212+
* var anotherProject = gcloud.pubsub({
213+
* projectId: 'another-project'
214+
* });
215+
*
216+
* var topic = anotherProject.topic('messageCenter');
217+
*
218+
* pubsub.subscribe(topic, name, function(err, subscription, apiResponse) {});
219+
*/
220+
PubSub.prototype.subscribe = function(topic, subName, options, callback) {
221+
if (!util.is(topic, 'string') && !(topic instanceof Topic)) {
222+
throw new Error('A Topic is required for a new subscription.');
223+
}
224+
225+
if (!util.is(subName, 'string')) {
226+
throw new Error('A subscription name is required for a new subscription.');
227+
}
228+
229+
if (!callback) {
230+
callback = options;
231+
options = {};
232+
}
233+
234+
options = options || {};
235+
236+
if (util.is(topic, 'string')) {
237+
topic = this.topic(topic);
238+
}
239+
240+
var body = {
241+
topic: topic.name
242+
};
243+
244+
if (options.ackDeadlineSeconds) {
245+
body.ackDeadlineSeconds = options.ackDeadlineSeconds;
246+
}
247+
248+
var subscription = this.subscription(subName, options);
249+
250+
this.makeReq_('PUT', subscription.name, null, body, function(err, result) {
251+
if (err && !(err.code === 409 && options.reuseExisting)) {
252+
callback(err, null, result);
253+
return;
254+
}
255+
256+
callback(null, subscription, result);
257+
});
258+
};
259+
260+
/**
261+
* Create a Subscription object in reference to an existing subscription. This
262+
* command by itself will not run any API requests. You will receive a
263+
* {@linkcode module:pubsub/subscription} object, which will allow you to
264+
* interact with your subscription.
265+
*
266+
* @throws {Error} If a name is not provided.
267+
*
268+
* @param {string} name - The name of the subscription.
269+
* @param {object=} options - Configuration object.
270+
* @param {boolean} options.autoAck - Automatically acknowledge the message once
271+
* it's pulled.
272+
* @param {number} options.interval - Interval in milliseconds to check for new
273+
* messages.
274+
* @return {module:pubsub/subscription}
275+
*
276+
* @example
277+
* var subscription = pubsub.subscription('my-existing-subscription');
278+
*
279+
* // Register a listener for `message` events.
280+
* subscription.on('message', function(message) {
281+
* // Called every time a message is received.
282+
* // message.id = ID used to acknowledge its receival.
283+
* // message.data = Contents of the message.
284+
* // message.attributes = Attributes of the message.
285+
* });
286+
*/
287+
PubSub.prototype.subscription = function(name, options) {
288+
if (!name) {
289+
throw new Error('The name of a subscription is required.');
290+
}
291+
292+
options = options || {};
293+
options.name = name;
294+
return new Subscription(this, options);
295+
};
296+
165297
/**
166298
* Create a Topic object to reference an existing topic.
167299
*
@@ -194,9 +326,16 @@ PubSub.prototype.topic = function(name, options) {
194326
* You may optionally provide a query object as the first argument to customize
195327
* the response.
196328
*
197-
* @param {object=} query - Query object.
198-
* @param {string=} query.pageToken - Page token.
199-
* @param {number=} query.pageSize - Maximum number of results to return.
329+
* Your provided callback will be invoked with an error object if an API error
330+
* occurred or an array of {@linkcode module:pubsub/subscription} objects.
331+
*
332+
* To get subscriptions for a topic, see {module:pubsub/topic}.
333+
*
334+
* @param {object=} options - Configuration object.
335+
* @param {string|module:pubsub/topic} options.topic - The name of the topic to
336+
* list subscriptions from.
337+
* @param {number} options.pageSize - Maximum number of results to return.
338+
* @param {string} options.pageToken - Page token.
200339
* @param {function} callback - The callback function.
201340
*
202341
* @example
@@ -205,38 +344,73 @@ PubSub.prototype.topic = function(name, options) {
205344
* // so, run `pubsub.getSubscriptions(nextQuery, callback);`.
206345
* };
207346
*
208-
* // Get all subscriptions.
347+
* //-
348+
* // Get all subscriptions for this project.
349+
* //-
209350
* pubsub.getSubscriptions(callback);
210351
*
352+
* //-
211353
* // Customize the query.
354+
* //-
212355
* pubsub.getSubscriptions({
213-
* pageSize: 10
356+
* pageSize: 3
214357
* }, callback);
215358
*/
216-
PubSub.prototype.getSubscriptions = function(query, callback) {
359+
PubSub.prototype.getSubscriptions = function(options, callback) {
217360
var self = this;
361+
218362
if (!callback) {
219-
callback = query;
220-
query = {};
363+
callback = options;
364+
options = {};
221365
}
222366

223-
var path = this.projectName + '/subscriptions';
224-
this.makeReq_('GET', path, query, true, function(err, result) {
367+
options = options || {};
368+
369+
var topicName;
370+
371+
if (util.is(options.topic, 'string')) {
372+
topicName = options.topic;
373+
} else if (options.topic instanceof Topic) {
374+
topicName = options.topic.unformattedName;
375+
}
376+
377+
var query = {};
378+
379+
if (options.pageSize) {
380+
query.pageSize = options.pageSize;
381+
}
382+
383+
if (options.pageToken) {
384+
query.pageToken = options.pageToken;
385+
}
386+
387+
var apiPath = util.format('{projectPath}{topicPath}/subscriptions', {
388+
projectPath: 'projects/' + this.projectId,
389+
topicPath: topicName ? '/topics/' + topicName : ''
390+
});
391+
392+
this.makeReq_('GET', apiPath, query, null, function(err, result) {
225393
if (err) {
226394
callback(err, null, null, result);
227395
return;
228396
}
229397

230-
var subscriptions = (result.subscriptions || []).map(function(item) {
398+
var subscriptions = (result.subscriptions || []).map(function(sub) {
231399
return new Subscription(self, {
232-
name: item.name
400+
// Depending on if we're using a subscriptions.list or
401+
// topics.subscriptions.list API endpoint, we will get back a
402+
// Subscription resource or just the name of the subscription.
403+
name: sub.name || sub
233404
});
234405
});
406+
235407
var nextQuery = null;
408+
236409
if (result.nextPageToken) {
237-
nextQuery = query;
410+
nextQuery = options;
238411
nextQuery.pageToken = result.nextPageToken;
239412
}
413+
240414
callback(null, subscriptions, nextQuery, result);
241415
});
242416
};

lib/pubsub/subscription.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ function Subscription(pubsub, options) {
117117
events.EventEmitter.call(this);
118118

119119
this.name = Subscription.formatName_(pubsub.projectId, options.name);
120+
120121
this.makeReq_ = pubsub.makeReq_.bind(pubsub);
121122

122123
this.autoAck = util.is(options.autoAck, 'boolean') ? options.autoAck : false;

0 commit comments

Comments
 (0)