class Delayed::Command
Constants
- DIR_PWD
Attributes
worker_count[RW]
worker_pools[RW]
Public Class Methods
new(args)
click to toggle source
# File lib/delayed/command.rb, line 18 def initialize(args) # rubocop:disable MethodLength @options = { :quiet => true, :pid_dir => "#{root}/tmp/pids", :log_dir => "#{root}/log" } @worker_count = 1 @monitor = false opts = OptionParser.new do |opt| opt.banner = "Usage: #{File.basename($PROGRAM_NAME)} [options] start|stop|restart|run" opt.on('-h', '--help', 'Show this message') do puts opt exit 1 end opt.on('-e', '--environment=NAME', 'Specifies the environment to run this delayed jobs under (test/development/production).') do |_e| STDERR.puts 'The -e/--environment option has been deprecated and has no effect. Use RAILS_ENV and see http://github.com/collectiveidea/delayed_job/issues/7' end opt.on('--min-priority N', 'Minimum priority of jobs to run.') do |n| @options[:min_priority] = n end opt.on('--max-priority N', 'Maximum priority of jobs to run.') do |n| @options[:max_priority] = n end opt.on('-n', '--number_of_workers=workers', 'Number of unique workers to spawn') do |worker_count| @worker_count = worker_count.to_i rescue 1 end opt.on('--pid-dir=DIR', 'Specifies an alternate directory in which to store the process ids.') do |dir| @options[:pid_dir] = dir end opt.on('--log-dir=DIR', 'Specifies an alternate directory in which to store the delayed_job log.') do |dir| @options[:log_dir] = dir end opt.on('-i', '--identifier=n', 'A numeric identifier for the worker.') do |n| @options[:identifier] = n end opt.on('-m', '--monitor', 'Start monitor process.') do @monitor = true end opt.on('--sleep-delay N', 'Amount of time to sleep when no jobs are found') do |n| @options[:sleep_delay] = n.to_i end opt.on('--read-ahead N', 'Number of jobs from the queue to consider') do |n| @options[:read_ahead] = n end opt.on('-p', '--prefix NAME', 'String to be prefixed to worker process names') do |prefix| @options[:prefix] = prefix end opt.on('--queues=queues', 'Specify which queue DJ must look up for jobs') do |queues| @options[:queues] = queues.split(',') end opt.on('--queue=queue', 'Specify which queue DJ must look up for jobs') do |queue| @options[:queues] = queue.split(',') end opt.on('--pool=queue1[,queue2][:worker_count]', 'Specify queues and number of workers for a worker pool') do |pool| parse_worker_pool(pool) end opt.on('--exit-on-complete', 'Exit when no more jobs are available to run. This will exit if all jobs are scheduled to run in the future.') do @options[:exit_on_complete] = true end end @args = opts.parse!(args) end
Public Instance Methods
daemonize()
click to toggle source
# File lib/delayed/command.rb, line 84 def daemonize # rubocop:disable PerceivedComplexity dir = @options[:pid_dir] FileUtils.mkdir_p(dir) unless File.exist?(dir) if worker_pools setup_pools elsif @options[:identifier] if worker_count > 1 raise ArgumentError, 'Cannot specify both --number-of-workers and --identifier' else run_process("delayed_job.#{@options[:identifier]}", @options) end else worker_count.times do |worker_index| process_name = worker_count == 1 ? 'delayed_job' : "delayed_job.#{worker_index}" run_process(process_name, @options) end end end
run(worker_name = nil, options = {})
click to toggle source
# File lib/delayed/command.rb, line 124 def run(worker_name = nil, options = {}) Dir.chdir(root) Delayed::Worker.after_fork Delayed::Worker.logger ||= Logger.new(File.join(@options[:log_dir], 'delayed_job.log')) worker = Delayed::Worker.new(options) worker.name_prefix = "#{worker_name} " worker.start rescue => e STDERR.puts e.message STDERR.puts e.backtrace ::Rails.logger.fatal(e) if rails_logger_defined? exit_with_error_status end
run_process(process_name, options = {})
click to toggle source
# File lib/delayed/command.rb, line 116 def run_process(process_name, options = {}) Delayed::Worker.before_fork Daemons.run_proc(process_name, :dir => options[:pid_dir], :dir_mode => :normal, :monitor => @monitor, :ARGV => @args) do |*_args| $0 = File.join(options[:prefix], process_name) if @options[:prefix] run process_name, options end end
setup_pools()
click to toggle source
# File lib/delayed/command.rb, line 104 def setup_pools worker_index = 0 @worker_pools.each do |queues, worker_count| options = @options.merge(:queues => queues) worker_count.times do process_name = "delayed_job.#{worker_index}" run_process(process_name, options) worker_index += 1 end end end
Private Instance Methods
exit_with_error_status()
click to toggle source
# File lib/delayed/command.rb, line 163 def exit_with_error_status exit 1 end
parse_worker_pool(pool)
click to toggle source
# File lib/delayed/command.rb, line 142 def parse_worker_pool(pool) @worker_pools ||= [] queues, worker_count = pool.split(':') queues = ['*', '', nil].include?(queues) ? [] : queues.split(',') worker_count = (worker_count || 1).to_i rescue 1 @worker_pools << [queues, worker_count] end
rails_logger_defined?()
click to toggle source
# File lib/delayed/command.rb, line 159 def rails_logger_defined? defined?(::Rails.logger) end
rails_root_defined?()
click to toggle source
# File lib/delayed/command.rb, line 155 def rails_root_defined? defined?(::Rails.root) end
root()
click to toggle source
# File lib/delayed/command.rb, line 151 def root @root ||= rails_root_defined? ? ::Rails.root : DIR_PWD end