# Sets up the aggregate, the POSIX message queue writer and a scratch
# file with separate write/read handles, then calls +flush_master+.
#
# +params+ is a hash that overrides any of the following defaults:
#
# * :queue - name handed to POSIX_MQ.new
#   (default: ENV["RAINDROPS_MQUEUE"], falling back to "/raindrops")
# * :worker_interval - stored in @worker_interval; @worker_queue is an
#   empty Array when this is truthy and nil otherwise (default: 10)
# * :master_interval - stored in @master_interval (default: 5)
# * :lossy - when truthy, the queue is switched to non-blocking mode,
#   messages are sent with +trysend+ instead of +send+, and
#   @nr_dropped starts at 0 (default: false)
# * :mq_attr - fourth argument to POSIX_MQ.new (default: nil)
# * :mq_umask - third argument to POSIX_MQ.new (default: 0666)
# * :aggregate - object stored in @aggregate (default: Aggregate.new)
def initialize(params = {})
  opts = {
    :queue => ENV["RAINDROPS_MQUEUE"] || "/raindrops",
    :worker_interval => 10,
    :master_interval => 5,
    :lossy => false,
    :mq_attr => nil,
    :mq_umask => 0666,
    :aggregate => Aggregate.new,
  }.merge!(params)
  @master_interval = opts[:master_interval]
  @worker_interval = opts[:worker_interval]
  @aggregate = opts[:aggregate]
  @worker_queue = @worker_interval ? [] : nil
  @mutex = Mutex.new

  @mq_name = opts[:queue]
  mq = POSIX_MQ.new(@mq_name, :w, opts[:mq_umask], opts[:mq_attr])

  # Open independent write and read handles on one scratch file. They
  # stay open after the Tempfile block closes the Tempfile's own handle.
  Tempfile.open("raindrops_pmq") do |t|
    @wr = File.open(t.path, "wb")
    @rd = File.open(t.path, "rb")
  end
  # Disable Ruby-level buffering on the writer so that data written via
  # @wr reaches the file immediately and can be read back through @rd.
  @wr.sync = true

  @cached_aggregate = @aggregate
  flush_master

  # Bind the send method once; @nr_dropped is only set in lossy mode.
  @mq_send =
    if opts[:lossy]
      @nr_dropped = 0
      mq.nonblock = true
      mq.method(:trysend)
    else
      mq.method(:send)
    end
end