diff --git a/lib/resque/scheduler/delaying_extensions.rb b/lib/resque/scheduler/delaying_extensions.rb index 4658ed06..a7e84898 100644 --- a/lib/resque/scheduler/delaying_extensions.rb +++ b/lib/resque/scheduler/delaying_extensions.rb @@ -68,16 +68,20 @@ def enqueue_in_with_queue(queue, number_of_seconds_from_now, # insertion time complexity is O(log(n)). Returns true if it's # the first job to be scheduled at that time, else false. def delayed_push(timestamp, item) - # First add this item to the list for this timestamp - redis.rpush("delayed:#{timestamp.to_i}", encode(item)) - - # Store the timestamps at with this item occurs - redis.sadd("timestamps:#{encode(item)}", "delayed:#{timestamp.to_i}") - - # Now, add this timestamp to the zsets. The score and the value are - # the same since we'll be querying by timestamp, and we don't have - # anything else to store. - redis.zadd :delayed_queue_schedule, timestamp.to_i, timestamp.to_i + redis.pipelined do + redis.multi do + # First add this item to the list for this timestamp + redis.rpush("delayed:#{timestamp.to_i}", encode(item)) + + # Store the timestamps at with this item occurs + redis.sadd("timestamps:#{encode(item)}", "delayed:#{timestamp.to_i}") + + # Now, add this timestamp to the zsets. The score and the value are + # the same since we'll be querying by timestamp, and we don't have + # anything else to store. + redis.zadd :delayed_queue_schedule, timestamp.to_i, timestamp.to_i + end + end end # Returns an array of timestamps based on start and count @@ -318,10 +322,10 @@ def clean_up_timestamp(key, timestamp) # queue while we're removing it. redis.watch(key) do if redis.llen(key).to_i == 0 - # If the list is empty, remove it. - redis.multi do - redis.del(key) - redis.zrem(:delayed_queue_schedule, timestamp.to_i) + redis.pipelined do + redis.multi do + redis.zrem(:delayed_queue_schedule, timestamp.to_i) + end end else redis.redis.unwatch diff --git a/lib/resque/scheduler/scheduling_extensions.rb b/lib/resque/scheduler/scheduling_extensions.rb index f5aa8157..6af42f94 100644 --- a/lib/resque/scheduler/scheduling_extensions.rb +++ b/lib/resque/scheduler/scheduling_extensions.rb @@ -85,13 +85,17 @@ def all_schedules def set_schedule(name, config, reload = true) persist = config.delete(:persist) || config.delete('persist') - if persist - redis.hset(:persistent_schedules, name, encode(config)) - else - non_persistent_schedules[name] = decode(encode(config)) + redis.pipelined do + redis.multi do + if persist + redis.hset(:persistent_schedules, name, encode(config)) + else + non_persistent_schedules[name] = decode(encode(config)) + end + + redis.sadd(:schedules_changed, name) + end end - - redis.sadd(:schedules_changed, name) reload_schedule! if reload end @@ -104,8 +108,12 @@ def fetch_schedule(name) # Preventing a reload is optional and available to batch operations def remove_schedule(name, reload = true) non_persistent_schedules.delete(name) - redis.hdel(:persistent_schedules, name) - redis.sadd(:schedules_changed, name) + redis.pipelined do + redis.multi do + redis.hdel(:persistent_schedules, name) + redis.sadd(:schedules_changed, name) + end + end reload_schedule! if reload end