みなさん、こんにちは。まどぎわです。
rubyで非同期処理やるときのデファクトスタンダード的なgemsidekiq
のコードを読んで、概要が割とつかめた気がしてきたので、どういう感じで動いてるか自分の理解の範囲でメモしてみました🙇
sidekiqの機能としては大きく分けて、
- Redisへのqueueのpush
- Redisからqueueのpopとjobの実行
だと思ったので、それについてsidekiqのコードと合わせて概要を整理してみました。
※記載しているコードについては、読みやすいコードを削除しているので全文が読みたい方は、それぞれのリンク先で確認いただけますと🙏
前提
今回調べたsidekiqのversionは、2019/04/28現在のmaster
である、6.0.0.pre1
です。
# frozen_string_literal: true module Sidekiq VERSION = "6.0.0.pre1" end
Redisへのqueueのpush
非同期処理の呼び出しは、perform_async
によって行われる。
引数にself
とargs
を与えて、client_push
を呼び出している。self
には、hogeJob
等のjob設定される。
def perform_async(*args) client_push("class" => self, "args" => args) end
client_push
ではredisへ接続するためのpool
を取得し、引数を文字列に変換してSidekiq::Client.new(pool).push(item)
を呼び出している。
def client_push(item) # :nodoc: # redisに接続するためのpoolを取得 pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options["pool"] || Sidekiq.redis_pool # stringify item.keys.each do |key| item[key.to_s] = item.delete(key) end # redisへのpush処理の呼び出し Sidekiq::Client.new(pool).push(item) end
push
メソッドでredisへの登録処理を呼び出している👀
module Sidekiq class Client def push(item) # to_sとかしてベーシックなjsonのような形式に変換 normed = normalize_item(item) # middrewareを実行してitemを返却 payload = process_single(item["class"], normed) if payload # radisのpush処理 raw_push([payload]) # jidを返す payload["jid"] end end private # radisへのpush処理 def raw_push(payloads) @redis_pool.with do |conn| conn.multi do atomic_push(conn, payloads) end end true end # jsonに変換してconnectionを使ってradisに登録 def atomic_push(conn, payloads) # scheduledの場合 if payloads.first["at"] conn.zadd("schedule", payloads.map { |hash| at = hash.delete("at").to_s [at, Sidekiq.dump_json(hash)] }) # 通常の場合 else queue = payloads.first["queue"] now = Time.now.to_f to_push = payloads.map { |entry| entry["enqueued_at"] = now Sidekiq.dump_json(entry) } # ここがredisへの登録処理の本丸 conn.sadd("queues", queue) conn.lpush("queue:#{queue}", to_push) end end
queueから取り出して実行
sidekiqの起動
sidekiqの起動はCLI.run
で行われる。run
の中で、Sidekiq::Launcher
のインスタンスが作成されてlauncher.run
が実行される。
module Sidekiq class CLI def run boot_system if environment == "development" && $stdout.tty? && Sidekiq.log_formatter.is_a?(Sidekiq::Logger::Formatters::Pretty) print_banner end # 省略: radisのversionのチェックとかいろいろやる launch(self_read) end def launch(self_read) # 省略 @launcher = Sidekiq::Launcher.new(options) begin launcher.run while (readable_io = IO.select([self_read])) signal = readable_io.first[0].gets.strip handle_signal(signal) end # 省略 end end
Launcher#run
の中では、スレッドの作成、ポーリングの開始、Manager#start
が呼ばれます。
※Pollerは、N秒に一回scheduleされたjobがあればradisのqueueに入れるようなことをやってる。
module Sidekiq class Launcher def initialize(options) @manager = Sidekiq::Manager.new(options) @poller = Sidekiq::Scheduled::Poller.new @done = false @options = options end def run @thread = safe_thread("heartbeat", &method(:start_heartbeat)) @poller.start @manager.start end end end
Manager#new
の中で並列実行数concurrency
の数だけ、worker(Processor.new(self)
)を作成して、Manager#start
で、全てstart
させる。
module Sidekiq class Manager def initialize(options = {}) # 省略 @count = options[:concurrency] || 10 @count.times do @workers << Processor.new(self) end # 省略 end def start @workers.each do |x| x.start end end
RadisからqueueのpupとJobの実行
Processor
がメインの処理、ここでqueueから取り出したjobを実行している。
ざっくりとした流れは、
run
が実行され、process_one
が終了されるまで実行され続ける。process_one
の中でredis
からdequeueして、引数work
としてprocess
に渡す。process
の中で、work
から各種情報を引き出し、dispath
を呼び出して、worker
(jobのインスタンス)を作成して、execute_job
でworker.perform
を実行し、処理を実行している。
※下記が読みやすいように、いろいろ省略したSidekiq:: Processor
のコードです。
module Sidekiq class Processor def initialize(mgr) # 省略 @strategy = (mgr.options[:fetch] || Sidekiq::BasicFetch).new(mgr.options) @reloader = Sidekiq.options[:reloader] @job_logger = (mgr.options[:job_logger] || Sidekiq::JobLogger).new @retrier = Sidekiq::JobRetry.new end # runが実行される。 def start @thread ||= safe_thread("processor", &method(:run)) end def run # ここがdoneになるまで繰り返されてるので、 # queueから取り出されてjobを実行するという処理が、ずっと続く。 process_one until @done @mgr.processor_stopped(self) # 省略 end # ここで、radisからqueを取り出してjobに入れてprocessで処理を開始 def process_one @job = fetch process(@job) if @job @job = nil end # 取得して、終了してたらもう一回queueに入れ直すような処理 def fetch j = get_one if j && @done j.requeue nil else j end end # BasicFetch#retrieve_workでradisからパースしたオブジェクトを取得 # https://github.com/mperham/sidekiq/blob/d16572bcdd0aa52985f9dc9e79f5179a6c828154/lib/sidekiq/fetch.rb#L36 def get_one work = @strategy.retrieve_work # 省略 work # 省略 end # jobの実行処理 def process(work) # このへんはradisから取り出したオブジェクトをいい感じにする処理 jobstr = work.job queue = work.queue_name job_hash = Sidekiq.load_json(jobstr) ack = true begin # この中でmiddrewareを実行して、workerと引数を渡してjobを実行してる。 dispatch(job_hash, queue) do |worker| Sidekiq.server_middleware.invoke(worker, job_hash, queue) do execute_job(worker, cloned(job_hash["args"])) end end rescue Sidekiq::Shutdown ack = false rescue Sidekiq::JobRetry::Handled => h raise e rescue Exception => ex raise e ensure work.acknowledge if ack end end # worker(jobのインスタンス)のworker.performを呼び出している。 def execute_job(worker, cloned_args) worker.perform(*cloned_args) end # ここでlog出したり、workerをjobのclassのインスタンスに変更してる。 def dispatch(job_hash, queue) pristine = cloned(job_hash) @job_logger.with_job_hash_context(job_hash) do @retrier.global(pristine, queue) do @job_logger.call(job_hash, queue) do stats(pristine, queue) do @reloader.call do klass = constantize(job_hash["class"]) worker = klass.new worker.jid = job_hash["jid"] @retrier.local(worker, pristine, queue) do yield worker end end end end end end end
おわりに
今回はsidekiqがどのように動いているか、コードを読みながら概要をまとめてみました。
Radisからpush/popしているためjson形式のやり取りになり、symbolやmodelのインスタンス等を引数で渡すと、いろいろ問題が起きてしまうんですね。(ActiveJobはこの辺をよしなにやってくれてそうです。)
またコードを読む中で変数名が省略されている箇所があったので、PR送ったらmergeしてもらえたので、sidekiqのコントリビューターになりました🙌
(また地味にwikiも修正してたりします)
https://github.com/mperham/sidekiq/wiki/Best-Practices/_history