Skip to content

Commit c1ea301

Browse files
Merge branch 'main' into feature/clear-ar-connections-flag
2 parents fbeb3e5 + 176721e commit c1ea301

13 files changed

Lines changed: 377 additions & 30 deletions

File tree

Gemfile.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
PATH
22
remote: .
33
specs:
4-
solid_queue (1.3.2)
4+
solid_queue (1.4.0)
55
activejob (>= 7.1)
66
activerecord (>= 7.1)
77
concurrent-ruby (>= 1.3.1)

README.md

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ Solid Queue can be used with SQL databases such as MySQL, PostgreSQL, or SQLite,
1717
- [Workers, dispatchers, and scheduler](#workers-dispatchers-and-scheduler)
1818
- [Fork vs. async mode](#fork-vs-async-mode)
1919
- [Configuration](#configuration)
20+
- [Optional scheduler configuration](#optional-scheduler-configuration)
2021
- [Queue order and priorities](#queue-order-and-priorities)
2122
- [Queues specification and performance](#queues-specification-and-performance)
2223
- [Threads, processes, and signals](#threads-processes-and-signals)
@@ -31,6 +32,7 @@ Solid Queue can be used with SQL databases such as MySQL, PostgreSQL, or SQLite,
3132
- [Puma plugin](#puma-plugin)
3233
- [Jobs and transactional integrity](#jobs-and-transactional-integrity)
3334
- [Recurring tasks](#recurring-tasks)
35+
- [Scheduling and unscheduling recurring tasks dynamically](#scheduling-and-unscheduling-recurring-tasks-dynamically)
3436
- [Inspiration](#inspiration)
3537
- [License](#license)
3638

@@ -209,7 +211,7 @@ By default, Solid Queue will try to find your configuration under `config/queue.
209211
bin/jobs -c config/calendar.yml
210212
```
211213

212-
You can also skip all recurring tasks by setting the environment variable `SOLID_QUEUE_SKIP_RECURRING=true`. This is useful for environments like staging, review apps, or development where you don't want any recurring jobs to run. This is equivalent to using the `--skip-recurring` option with `bin/jobs`.
214+
You can also skip the scheduler process by setting the environment variable `SOLID_QUEUE_SKIP_RECURRING=true`. This is useful for environments like staging, review apps, or development where you don't want any recurring jobs to run. This is equivalent to using the `--skip-recurring` option with `bin/jobs`.
213215

214216
This is what this configuration looks like:
215217

@@ -227,6 +229,10 @@ production:
227229
threads: 5
228230
polling_interval: 0.1
229231
processes: 3
232+
scheduler:
233+
dynamic_tasks_enabled: true
234+
polling_interval: 5
235+
230236
```
231237

232238
Everything is optional. If no configuration at all is provided, Solid Queue will run with one dispatcher and one worker with default settings. If you want to run only dispatchers or workers, you just need to include that section alone in the configuration. For example, with the following configuration:
@@ -271,6 +277,19 @@ It is recommended to set this value less than or equal to the queue database's c
271277
- `concurrency_maintenance`: whether the dispatcher will perform the concurrency maintenance work. This is `true` by default, and it's useful if you don't use any [concurrency controls](#concurrency-controls) and want to disable it or if you run multiple dispatchers and want some of them to just dispatch jobs without doing anything else.
272278

273279

280+
### Optional scheduler configuration
281+
282+
Optionally, you can configure the scheduler process under the `scheduler` section in your `config/queue.yml` if you'd like to [schedule recurring tasks dynamically](#scheduling-and-unscheduling-recurring-tasks-dynamically).
283+
284+
```yaml
285+
scheduler:
286+
dynamic_tasks_enabled: true
287+
polling_interval: 5
288+
```
289+
290+
- `dynamic_tasks_enabled`: whether the scheduler should poll for [dynamically scheduled recurring tasks](#scheduling-and-unscheduling-recurring-tasks-dynamically). This is `false` by default. When enabled, the scheduler will poll the database at the given `polling_interval` to pick up tasks scheduled via `SolidQueue.schedule_recurring_task`.
291+
- `polling_interval`: how frequently (in seconds) the scheduler checks for dynamic task changes. Defaults to `5`.
292+
274293
### Queue order and priorities
275294

276295
As mentioned above, if you specify a list of queues for a worker, these will be polled in the order given, such as for the list `real_time,background`, no jobs will be taken from `background` unless there aren't any more jobs waiting in `real_time`.
@@ -462,7 +481,7 @@ class MyJob < ApplicationJob
462481
- `group` is used to control the concurrency of different job classes together. It defaults to the job class name.
463482
- `on_conflict` controls behaviour when enqueuing a job that conflicts with the concurrency limits configured. It can be set to one of the following:
464483
- (default) `:block`: the job is blocked and is dispatched when another job completes and unblocks it, or when the duration expires.
465-
- `:discard`: the job is discarded. When you choose this option, bear in mind that if a job runs and fails to remove the concurrency lock (or _semaphore_, read below to know more about this), all jobs conflicting with it will be discarded up to the interval defined by `duration` has elapsed.
484+
- `:discard`: the job is discarded. When you choose this option, bear in mind that if a job runs and fails to remove the concurrency lock (or _semaphore_, read below to know more about this), all jobs conflicting with it will be discarded until the interval defined by `duration` has elapsed.
466485

467486
When a job includes these controls, we'll ensure that, at most, the number of jobs (indicated as `to`) that yield the same `key` will be performed concurrently, and this guarantee will last for `duration` for each job enqueued. Note that there's no guarantee about _the order of execution_, only about jobs being performed at the same time (overlapping).
468487

@@ -472,7 +491,7 @@ Since something can happen that prevents the first job from releasing the semaph
472491

473492
It's important to note that after one or more candidate jobs are unblocked (either because a job finishes or because `duration` expires and a semaphore is released), the `duration` timer for the still blocked jobs is reset. This happens indirectly via the expiration time of the semaphore, which is updated.
474493

475-
When using `discard` as the behaviour to handle conflicts, you might have jobs discarded for up to the `duration` interval if something happens and a running job fails to release the semaphore.
494+
When using `discard` as the behaviour to handle conflicts, you might have jobs discarded for until the `duration` interval if something happens and a running job fails to release the semaphore.
476495

477496

478497
For example:
@@ -732,6 +751,38 @@ my_periodic_resque_job:
732751

733752
and the job will be enqueued via `perform_later` so it'll run in Resque. However, in this case we won't track any `solid_queue_recurring_execution` record for it and there won't be any guarantees that the job is enqueued only once each time.
734753

754+
### Scheduling and unscheduling recurring tasks dynamically
755+
756+
You can schedule and unschedule recurring tasks at runtime, without editing the configuration file. To enable this, you need to set `dynamic_tasks_enabled: true` in the `scheduler` section of your `config/queue.yml`, [as explained earlier](#optional-scheduler-configuration).
757+
758+
```yaml
759+
scheduler:
760+
dynamic_tasks_enabled: true
761+
```
762+
763+
Then you can use the following methods to add recurring tasks dynamically:
764+
765+
```ruby
766+
SolidQueue.schedule_recurring_task(
767+
"my_dynamic_task",
768+
class: "MyJob",
769+
args: [1, 2],
770+
schedule: "every 10 minutes"
771+
)
772+
```
773+
774+
This accepts the same options as the YAML configuration: `class`, `args`, `command`, `schedule`, `queue`, `priority`, and `description`.
775+
776+
To remove a dynamically scheduled task:
777+
778+
```ruby
779+
SolidQueue.unschedule_recurring_task("my_dynamic_task")
780+
```
781+
782+
Only dynamic tasks can be unscheduled at runtime. Attempting to unschedule a static task (defined in `config/recurring.yml`) will raise an `ActiveRecord::RecordNotFound` error.
783+
784+
Tasks scheduled like this persist between Solid Queue's restarts and won't stop running until you manually unschedule them.
785+
735786
## Inspiration
736787

737788
Solid Queue has been inspired by [resque](https://github.com/resque/resque) and [GoodJob](https://github.com/bensheldon/good_job). We recommend checking out these projects as they're great examples from which we've learnt a lot.

app/models/solid_queue/ready_execution.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ def select_and_lock(queue_relation, process_id, limit)
3030
end
3131

3232
def select_candidates(queue_relation, limit)
33-
queue_relation.ordered.limit(limit).non_blocking_lock.select(:id, :job_id)
33+
# Force query execution here with #to_a to avoid unintended FOR UPDATE query executions
34+
queue_relation.ordered.limit(limit).non_blocking_lock.select(:id, :job_id).to_a
3435
end
3536

3637
def lock_candidates(executions, process_id)

app/models/solid_queue/recurring_task.rb

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ class RecurringTask < Record
1111
validate :ensure_existing_job_class
1212

1313
scope :static, -> { where(static: true) }
14+
scope :dynamic, -> { where(static: false) }
1415

1516
has_many :recurring_executions, foreign_key: :task_key, primary_key: :key
1617

@@ -32,7 +33,15 @@ def from_configuration(key, **options)
3233
queue_name: options[:queue].presence,
3334
priority: options[:priority].presence,
3435
description: options[:description],
35-
static: true
36+
static: options.fetch(:static, true)
37+
end
38+
39+
def create_dynamic_task(key, **options)
40+
from_configuration(key, **options.merge(static: false)).save!
41+
end
42+
43+
def delete_dynamic_task(key)
44+
RecurringTask.dynamic.find_by!(key: key).destroy
3645
end
3746

3847
def create_or_update_all(tasks)

lib/solid_queue.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,14 @@ module SolidQueue
4444

4545
delegate :on_start, :on_stop, :on_exit, to: Supervisor
4646

47+
def schedule_recurring_task(key, **options)
48+
RecurringTask.create_dynamic_task(key, **options)
49+
end
50+
51+
def unschedule_recurring_task(key)
52+
RecurringTask.delete_dynamic_task(key)
53+
end
54+
4755
[ Dispatcher, Scheduler, Worker ].each do |process|
4856
define_singleton_method(:"on_#{process.name.demodulize.downcase}_start") do |&block|
4957
process.on_start(&block)

lib/solid_queue/configuration.rb

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ def instantiate
2828
concurrency_maintenance_interval: 600
2929
}
3030

31+
SCHEDULER_DEFAULTS = {
32+
polling_interval: 5,
33+
dynamic_tasks_enabled: false
34+
}
35+
3136
DEFAULT_CONFIG_FILE_PATH = "config/queue.yml"
3237
DEFAULT_RECURRING_SCHEDULE_FILE_PATH = "config/recurring.yml"
3338

@@ -137,8 +142,10 @@ def dispatchers
137142
end
138143

139144
def schedulers
140-
if !skip_recurring_tasks? && recurring_tasks.any?
141-
[ Process.new(:scheduler, recurring_tasks: recurring_tasks) ]
145+
return [] if skip_recurring_tasks?
146+
147+
if recurring_tasks.any? || dynamic_recurring_tasks_enabled?
148+
[ Process.new(:scheduler, { recurring_tasks: recurring_tasks, **scheduler_options.with_defaults(SCHEDULER_DEFAULTS) }) ]
142149
else
143150
[]
144151
end
@@ -154,17 +161,29 @@ def dispatchers_options
154161
.map { |options| options.dup.symbolize_keys }
155162
end
156163

164+
def scheduler_options
165+
@scheduler_options ||= processes_config.fetch(:scheduler, {}).dup.symbolize_keys
166+
end
167+
168+
def dynamic_recurring_tasks_enabled?
169+
scheduler_options.fetch(:dynamic_tasks_enabled, SCHEDULER_DEFAULTS[:dynamic_tasks_enabled])
170+
end
171+
157172
def recurring_tasks
158173
@recurring_tasks ||= recurring_tasks_config.map do |id, options|
159-
RecurringTask.from_configuration(id, **options) if options&.has_key?(:schedule)
174+
RecurringTask.from_configuration(id, **options.merge(static: true)) if options&.has_key?(:schedule)
160175
end.compact
161176
end
162177

163178
def processes_config
164179
@processes_config ||= config_from \
165-
options.slice(:workers, :dispatchers).presence || options[:config_file],
166-
keys: [ :workers, :dispatchers ],
167-
fallback: { workers: [ WORKER_DEFAULTS ], dispatchers: [ DISPATCHER_DEFAULTS ] }
180+
options.slice(:workers, :dispatchers, :scheduler).presence || options[:config_file],
181+
keys: [ :workers, :dispatchers, :scheduler ],
182+
fallback: {
183+
workers: [ WORKER_DEFAULTS ],
184+
dispatchers: [ DISPATCHER_DEFAULTS ],
185+
scheduler: SCHEDULER_DEFAULTS
186+
}
168187
end
169188

170189
def recurring_tasks_config
@@ -173,7 +192,6 @@ def recurring_tasks_config
173192
end
174193
end
175194

176-
177195
def config_from(file_or_hash, keys: [], fallback: {}, env: Rails.env)
178196
load_config_from(file_or_hash).then do |config|
179197
config = config[env.to_sym] ? config[env.to_sym] : config

lib/solid_queue/processes/registrable.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,5 +59,9 @@ def heartbeat
5959
self.process = nil
6060
wake_up
6161
end
62+
63+
def reload_metadata
64+
wrap_in_app_executor { process&.update(metadata: metadata.compact) }
65+
end
6266
end
6367
end

lib/solid_queue/scheduler.rb

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ class Scheduler < Processes::Base
55
include Processes::Runnable
66
include LifecycleHooks
77

8-
attr_reader :recurring_schedule
8+
attr_reader :recurring_schedule, :polling_interval
99

1010
after_boot :run_start_hooks
1111
after_boot :schedule_recurring_tasks
@@ -14,7 +14,10 @@ class Scheduler < Processes::Base
1414
after_shutdown :run_exit_hooks
1515

1616
def initialize(recurring_tasks:, **options)
17-
@recurring_schedule = RecurringSchedule.new(recurring_tasks)
17+
options = options.dup.with_defaults(SolidQueue::Configuration::SCHEDULER_DEFAULTS)
18+
@dynamic_tasks_enabled = options[:dynamic_tasks_enabled]
19+
@polling_interval = options[:polling_interval]
20+
@recurring_schedule = RecurringSchedule.new(recurring_tasks, dynamic_tasks_enabled: @dynamic_tasks_enabled)
1821

1922
super(**options)
2023
end
@@ -24,13 +27,16 @@ def metadata
2427
end
2528

2629
private
27-
SLEEP_INTERVAL = 60 # Right now it doesn't matter, can be set to 1 in the future for dynamic tasks
30+
31+
STATIC_SLEEP_INTERVAL = 60
2832

2933
def run
3034
loop do
3135
break if shutting_down?
3236

33-
interruptible_sleep(SLEEP_INTERVAL)
37+
reload_dynamic_schedule if dynamic_tasks_enabled?
38+
39+
interruptible_sleep(sleep_interval)
3440
end
3541
ensure
3642
SolidQueue.instrument(:shutdown_process, process: self) do
@@ -46,10 +52,23 @@ def unschedule_recurring_tasks
4652
recurring_schedule.unschedule_tasks
4753
end
4854

55+
def reload_dynamic_schedule
56+
recurring_schedule.reschedule_dynamic_tasks
57+
reload_metadata
58+
end
59+
60+
def dynamic_tasks_enabled?
61+
@dynamic_tasks_enabled
62+
end
63+
4964
def all_work_completed?
5065
recurring_schedule.empty?
5166
end
5267

68+
def sleep_interval
69+
dynamic_tasks_enabled? ? polling_interval : STATIC_SLEEP_INTERVAL
70+
end
71+
5372
def set_procline
5473
procline "scheduling #{recurring_schedule.task_keys.join(",")}"
5574
end

0 commit comments

Comments
 (0)