Parent

Delayed::Worker

Attributes

name_prefix[RW]

name_prefix is ignored if name is set directly

Public Class Methods

backend=(backend) click to toggle source
# File lib/delayed/worker.rb, line 32
# Selects the job backend used by the worker.
#
# backend - either a Symbol naming a bundled backend, or an object to be
#           used as the job class directly.
#
# When a Symbol is given, the matching serialization and backend files are
# required (in that order) and the Symbol is resolved to the constant
# Delayed::Backend::<Classified name>::Job. The resulting object is stored
# in @@backend and also published as ::Delayed::Job.
def self.backend=(backend)
  if backend.is_a?(Symbol)
    # Serialization support must be loaded before the backend itself.
    %w[serialization backend].each do |component|
      require "delayed/#{component}/#{backend}"
    end
    backend = "Delayed::Backend::#{backend.to_s.classify}::Job".constantize
  end

  @@backend = backend
  # Re-assigning the Job constant would otherwise emit an
  # "already initialized constant" warning.
  silence_warnings { ::Delayed.const_set(:Job, backend) }
end
guess_backend() click to toggle source
# File lib/delayed/worker.rb, line 42
# Defaults the backend to :active_record when ActiveRecord is loaded and no
# backend has been chosen yet. Does nothing (and returns nil) when
# ActiveRecord is not defined.
def self.guess_backend
  return unless defined?(ActiveRecord)

  self.backend ||= :active_record
end
new(options={}) click to toggle source
# File lib/delayed/worker.rb, line 46
# Builds a worker.
#
# options - Hash of optional settings:
#           :quiet        - stored in @quiet; defaults to true when the key
#                           is absent (an explicit nil/false is kept as-is).
#           :min_priority,
#           :max_priority,
#           :sleep_delay  - when the key is present, its value is assigned
#                           to the class-level setting of the same name.
def initialize(options={})
  @quiet = options.fetch(:quiet, true)

  # These settings live on the class, so they are only overwritten when the
  # caller explicitly supplied the key.
  [:min_priority, :max_priority, :sleep_delay].each do |setting|
    next unless options.key?(setting)
    self.class.public_send("#{setting}=", options[setting])
  end
end

Public Instance Methods

failed(job) click to toggle source
# File lib/delayed/worker.rb, line 147
# Handles a job that has failed for good.
#
# Fires the job's :failure hook, prints a deprecation warning if the job
# still responds to #on_permanent_failure, and then either destroys the job
# (when the class-level destroy_failed_jobs setting is truthy) or stamps it
# with failed_at set to Delayed::Job.db_time_now.
#
# Returns the result of job.destroy or job.update_attributes.
def failed(job)
  job.hook(:failure)

  warn "[DEPRECATION] The #on_permanent_failure hook has been renamed to #failure." if job.respond_to?(:on_permanent_failure)

  if self.class.destroy_failed_jobs
    job.destroy
  else
    job.update_attributes(:failed_at => Delayed::Job.db_time_now)
  end
end
max_attempts(job) click to toggle source
# File lib/delayed/worker.rb, line 161
# Returns the attempt limit that applies to the given job: the job's own
# max_attempts when it is set (truthy), otherwise the class-level default.
def max_attempts(job)
  per_job_limit = job.max_attempts
  return per_job_limit if per_job_limit

  self.class.max_attempts
end
name() click to toggle source

Every worker has a unique name which by default is the pid of the process. There are some advantages to overriding this with something which survives worker restarts: workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.

# File lib/delayed/worker.rb, line 57
# Returns the worker's name.
#
# An explicitly assigned @name (anything other than nil) is returned as-is.
# Otherwise a name is generated from @name_prefix, the host name and the
# process id; if building that string raises a StandardError (for example
# the host name lookup fails), the name falls back to prefix + pid only.
def name
  return @name unless @name.nil?

  begin
    "#{@name_prefix}host:#{Socket.gethostname} pid:#{Process.pid}"
  rescue StandardError
    "#{@name_prefix}pid:#{Process.pid}"
  end
end
name=(val) click to toggle source

Sets the name of the worker. Setting the name to nil will reset it to the default worker name.

# File lib/delayed/worker.rb, line 64
# Stores val as the worker's explicit name in @name.
# The #name reader returns @name whenever it is not nil, so assigning nil
# makes #name fall back to its generated default.
def name=(val)
  @name = val
end
reschedule(job, time = nil) click to toggle source

Reschedule the job in the future (when a job fails). Uses an exponential scale depending on the number of failed attempts.

# File lib/delayed/worker.rb, line 135
# Records one more failed attempt on the job and decides what happens next.
#
# job  - the job that just failed; its attempts counter is incremented.
# time - optional run time for the retry; when nil/false the job's own
#        reschedule_at value is used.
#
# If the incremented attempt count is still below max_attempts(job), the job
# gets the new run_at, is unlocked and saved (returns the result of save!).
# Otherwise the removal is logged and the job is handed to #failed (returns
# that method's result).
def reschedule(job, time = nil)
  attempts_so_far = (job.attempts += 1)

  unless attempts_so_far < max_attempts(job)
    say "PERMANENTLY removing #{job.name} because of #{job.attempts} consecutive failures.", Logger::INFO
    return failed(job)
  end

  job.run_at = time || job.reschedule_at
  job.unlock
  job.save!
end
run(job) click to toggle source
# File lib/delayed/worker.rb, line 118
# Runs a single job and reports whether work was done.
#
# The job is invoked under a timeout of the class-level max_run_time
# (converted with to_i) and destroyed once it finishes; the elapsed time is
# logged.
#
# Returns true when the job completed, false when it raised and was passed
# to #handle_failed_job. When the job cannot be deserialized it is failed
# permanently and the result of #failed is returned.
def run(job)
  runtime =  Benchmark.realtime do
    Timeout.timeout(self.class.max_run_time.to_i) { job.invoke_job }
    job.destroy
  end
  say "#{job.name} completed after %.4f" % runtime
  return true  # did work
rescue DeserializationError => error
  # Message first, then one backtrace frame per line. The separator must be
  # a double-quoted "\n": a single-quoted '\n' is a literal backslash + n.
  job.last_error = "#{error.message}\n#{Array(error.backtrace).join("\n")}"
  failed(job)
rescue Exception => error
  # Deliberately broad: any failure inside the job must be recorded against
  # the job rather than take the worker down.
  handle_failed_job(job, error)
  return false  # work failed
end
say(text, level = Logger::INFO) click to toggle source
# File lib/delayed/worker.rb, line 155
# Emits a worker-tagged message.
#
# text  - the message; it is prefixed with "[Worker(<name>)] ".
# level - severity passed to the logger (defaults to Logger::INFO).
#
# The tagged line is printed to stdout unless @quiet is set, and, when a
# logger is available, also logged with a leading timestamp.
def say(text, level = Logger::INFO)
  tagged = "[Worker(#{name})] #{text}"
  puts tagged unless @quiet

  return unless logger
  logger.add level, "#{Time.now.strftime('%FT%T%z')}: #{tagged}"
end
start() click to toggle source
# File lib/delayed/worker.rb, line 68
# Main worker loop.
#
# Installs TERM and INT handlers that log 'Exiting...' and set the global
# $exit flag, then repeatedly calls #work_off. After each batch the loop
# either sleeps for the class-level sleep_delay (nothing was processed) or
# logs throughput for the batch. The $exit flag is checked both right after
# the batch and after the sleep/log step.
#
# On the way out (including via an exception) the locks held under this
# worker's name are cleared.
def start
  say "Starting job worker"

  %w[TERM INT].each do |signal|
    trap(signal) { say 'Exiting...'; $exit = true }
  end

  loop do
    stats = nil
    elapsed = Benchmark.realtime { stats = work_off }
    processed = stats.sum # successes + failures

    break if $exit

    if processed.zero?
      sleep(self.class.sleep_delay)
    else
      say "#{processed} jobs processed at %.4f j/s, %d failed ..." % [processed / elapsed, stats.last]
    end

    break if $exit
  end

ensure
  Delayed::Job.clear_locks!(name)
end
work_off(num = 100) click to toggle source

Do num jobs and return stats on success/failure. Exit early if interrupted.

# File lib/delayed/worker.rb, line 100
# Runs up to num jobs and reports how they went.
#
# num - maximum number of jobs to attempt (default 100).
#
# Stops early when #reserve_and_run_one_job returns anything other than
# true or false (no work could be done), or when the global $exit flag is
# set after a job.
#
# Returns a two-element Array: [successes, failures].
def work_off(num = 100)
  tally = [0, 0] # [successes, failures]

  num.times do
    case reserve_and_run_one_job
    when true  then tally[0] += 1
    when false then tally[1] += 1
    else break # leave if no work could be done
    end

    break if $exit # leave if we're exiting
  end

  tally
end

Protected Instance Methods

handle_failed_job(job, error) click to toggle source
# File lib/delayed/worker.rb, line 167
# Records a failure on the job, logs it at ERROR level and reschedules it.
#
# job   - the job whose run raised.
# error - the exception; its message and backtrace are stored in
#         job.last_error as the message followed by one frame per line.
#
# Returns the result of #reschedule.
def handle_failed_job(job, error)
  # Array() guards against a nil backtrace (an exception that was never
  # raised); the separator must be a double-quoted "\n", since '\n' in
  # single quotes is a literal backslash + n.
  job.last_error = "#{error.message}\n#{Array(error.backtrace).join("\n")}"
  say "#{job.name} failed with #{error.class.name}: #{error.message} - #{job.attempts} failed attempts", Logger::ERROR
  reschedule(job)
end
reserve_and_run_one_job() click to toggle source

Run the next job we can get an exclusive lock on. If no jobs are left we return nil

# File lib/delayed/worker.rb, line 175
# Asks Delayed::Job to reserve a job for this worker and runs it.
#
# Returns the result of #run, or nil when no job could be reserved.
def reserve_and_run_one_job
  job = Delayed::Job.reserve(self)
  return unless job

  run(job)
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.