Madogiwa Blog

主に技術系の学習メモに使っていきます。

Sidekiqがどうやって動いているのか、コードを読んで概要を掴めた気がしたのでメモしてみる

みなさん、こんにちは。まどぎわです。
rubyで非同期処理やるときのデファクトスタンダード的なgemsidekiqのコードを読んで、概要が割とつかめた気がしてきたので、どういう感じで動いてるか自分の理解の範囲でメモしてみました🙇

github.com

sidekiqの機能としては大きく分けて、

  • Redisへのqueueのpush
  • Redisからqueueのpopとjobの実行

だと思ったので、それについてsidekiqのコードと合わせて概要を整理してみました。
※記載しているコードについては、読みやすいコードを削除しているので全文が読みたい方は、それぞれのリンク先で確認いただけますと🙏

前提

今回調べたsidekiqのversionは、2019/04/28現在のmasterである、6.0.0.pre1です。

# frozen_string_literal: true

module Sidekiq
  VERSION = "6.0.0.pre1"
end

Redisへのqueueのpush

非同期処理の呼び出しは、perform_asyncによって行われる。 引数にselfargsを与えて、client_pushを呼び出している。selfには、hogeJob等のjob設定される。

def perform_async(*args)
  client_push("class" => self, "args" => args)
end

https://github.com/mperham/sidekiq/blob/b76dfb9e056d6c17d7c2fe66dd9ab3f38aa5423f/lib/sidekiq/worker.rb#L92

client_pushではredisへ接続するためのpoolを取得し、引数を文字列に変換してSidekiq::Client.new(pool).push(item)を呼び出している。

def client_push(item) # :nodoc:
  # redisに接続するためのpoolを取得
  pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options["pool"] || Sidekiq.redis_pool
  # stringify
  item.keys.each do |key|
    item[key.to_s] = item.delete(key)
  end
  
  # redisへのpush処理の呼び出し
  Sidekiq::Client.new(pool).push(item)
end

https://github.com/mperham/sidekiq/blob/b76dfb9e056d6c17d7c2fe66dd9ab3f38aa5423f/lib/sidekiq/worker.rb#L142

pushメソッドでredisへの登録処理を呼び出している👀

module Sidekiq
  class Client
    def push(item)
      # to_sとかしてベーシックなjsonのような形式に変換
      normed = normalize_item(item)
      # middrewareを実行してitemを返却
      payload = process_single(item["class"], normed)
    
      if payload
        # radisのpush処理
        raw_push([payload])
        # jidを返す
        payload["jid"]
      end
    end
    
    private
    
    # radisへのpush処理
    def raw_push(payloads)
      @redis_pool.with do |conn|
        conn.multi do
          atomic_push(conn, payloads)
        end
      end
      true
    end
    
    # jsonに変換してconnectionを使ってradisに登録
    def atomic_push(conn, payloads)
      # scheduledの場合
      if payloads.first["at"]
        conn.zadd("schedule", payloads.map { |hash|
          at = hash.delete("at").to_s
          [at, Sidekiq.dump_json(hash)]
        })
      # 通常の場合
      else
        queue = payloads.first["queue"]
        now = Time.now.to_f
        to_push = payloads.map { |entry|
          entry["enqueued_at"] = now
          Sidekiq.dump_json(entry)
        }
        # ここがredisへの登録処理の本丸
        conn.sadd("queues", queue)
        conn.lpush("queue:#{queue}", to_push)
      end
    end

https://github.com/mperham/sidekiq/blob/d16572bcdd0aa52985f9dc9e79f5179a6c828154/lib/sidekiq/client.rb#L69

queueから取り出して実行

sidekiqの起動

sidekiqの起動はCLI.runで行われる。runの中で、Sidekiq::Launcherインスタンスが作成されてlauncher.runが実行される。

module Sidekiq
  class CLI
   def run
      boot_system
      if environment == "development" && $stdout.tty? && Sidekiq.log_formatter.is_a?(Sidekiq::Logger::Formatters::Pretty)
        print_banner
      end
      # 省略: radisのversionのチェックとかいろいろやる
      launch(self_read)
    end
    
    def launch(self_read)
      # 省略
      @launcher = Sidekiq::Launcher.new(options)
      begin
        launcher.run
        while (readable_io = IO.select([self_read]))
          signal = readable_io.first[0].gets.strip
          handle_signal(signal)
        end
        # 省略
      end
    end

https://github.com/mperham/sidekiq/blob/d16572bcdd0aa52985f9dc9e79f5179a6c828154/lib/sidekiq/cli.rb#L36

Launcher#runの中では、スレッドの作成、ポーリングの開始、Manager#startが呼ばれます。 ※Pollerは、N秒に一回scheduleされたjobがあればradisのqueueに入れるようなことをやってる。

module Sidekiq
  class Launcher
    def initialize(options)
      @manager = Sidekiq::Manager.new(options)
      @poller = Sidekiq::Scheduled::Poller.new
      @done = false
      @options = options
    end

    def run
      @thread = safe_thread("heartbeat", &method(:start_heartbeat))
      @poller.start
      @manager.start
    end
  end
end

https://github.com/mperham/sidekiq/blob/d16572bcdd0aa52985f9dc9e79f5179a6c828154/lib/sidekiq/launcher.rb#L34

Manager#newの中で並列実行数concurrencyの数だけ、worker(Processor.new(self))を作成して、Manager#startで、全てstartさせる。

module Sidekiq
  class Manager
    def initialize(options = {})
      # 省略
      @count = options[:concurrency] || 10
      @count.times do
        @workers << Processor.new(self)
      end
      # 省略
    end

    def start
      @workers.each do |x|
        x.start
      end
    end

https://github.com/mperham/sidekiq/blob/d16572bcdd0aa52985f9dc9e79f5179a6c828154/lib/sidekiq/manager.rb#L43

RadisからqueueのpupとJobの実行

Processorがメインの処理、ここでqueueから取り出したjobを実行している。
ざっくりとした流れは、

  1. runが実行され、process_oneが終了されるまで実行され続ける。
  2. process_oneの中でredisからdequeueして、引数workとしてprocessに渡す。
  3. processの中で、workから各種情報を引き出し、dispathを呼び出して、worker(jobのインスタンス)を作成して、execute_jobworker.performを実行し、処理を実行している。

※下記が読みやすいように、いろいろ省略したSidekiq:: Processorのコードです。

module Sidekiq
  class Processor
    def initialize(mgr)
      # 省略
      @strategy = (mgr.options[:fetch] || Sidekiq::BasicFetch).new(mgr.options)
      @reloader = Sidekiq.options[:reloader]
      @job_logger = (mgr.options[:job_logger] || Sidekiq::JobLogger).new
      @retrier = Sidekiq::JobRetry.new
    end
    
    # runが実行される。
    def start
      @thread ||= safe_thread("processor", &method(:run))
    end
    
    def run
      # ここがdoneになるまで繰り返されてるので、
      # queueから取り出されてjobを実行するという処理が、ずっと続く。
      process_one until @done
      @mgr.processor_stopped(self)
      # 省略
    end
    
    # ここで、radisからqueを取り出してjobに入れてprocessで処理を開始
    def process_one
      @job = fetch
      process(@job) if @job
      @job = nil
    end
    
    # 取得して、終了してたらもう一回queueに入れ直すような処理
    def fetch
      j = get_one
      if j && @done
        j.requeue
        nil
      else
        j
      end
    end
    
    # BasicFetch#retrieve_workでradisからパースしたオブジェクトを取得
    # https://github.com/mperham/sidekiq/blob/d16572bcdd0aa52985f9dc9e79f5179a6c828154/lib/sidekiq/fetch.rb#L36
    def get_one
      work = @strategy.retrieve_work
      # 省略
      work
      # 省略
    end

    # jobの実行処理
    def process(work)
      # このへんはradisから取り出したオブジェクトをいい感じにする処理
      jobstr = work.job
      queue = work.queue_name
     job_hash = Sidekiq.load_json(jobstr)

      ack = true
      begin
        # この中でmiddrewareを実行して、workerと引数を渡してjobを実行してる。
        dispatch(job_hash, queue) do |worker|
          Sidekiq.server_middleware.invoke(worker, job_hash, queue) do
            execute_job(worker, cloned(job_hash["args"]))
          end
        end
      rescue Sidekiq::Shutdown
        ack = false
      rescue Sidekiq::JobRetry::Handled => h
        raise e
      rescue Exception => ex
        raise e
      ensure
        work.acknowledge if ack
      end
    end
    
    # worker(jobのインスタンス)のworker.performを呼び出している。
    def execute_job(worker, cloned_args)
      worker.perform(*cloned_args)
    end
    
    # ここでlog出したり、workerをjobのclassのインスタンスに変更してる。
    def dispatch(job_hash, queue)
      pristine = cloned(job_hash)

      @job_logger.with_job_hash_context(job_hash) do
        @retrier.global(pristine, queue) do
          @job_logger.call(job_hash, queue) do
            stats(pristine, queue) do
              @reloader.call do
                klass  = constantize(job_hash["class"])
                worker = klass.new
                worker.jid = job_hash["jid"]
                @retrier.local(worker, pristine, queue) do
                  yield worker
                end
              end
            end
          end
        end
      end
    end

https://github.com/mperham/sidekiq/blob/d16572bcdd0aa52985f9dc9e79f5179a6c828154/lib/sidekiq/processor.rb#L60

おわりに

今回はsidekiqがどのように動いているか、コードを読みながら概要をまとめてみました。

Radisからpush/popしているためjson形式のやり取りになり、symbolやmodelのインスタンス等を引数で渡すと、いろいろ問題が起きてしまうんですね。(ActiveJobはこの辺をよしなにやってくれてそうです。)

またコードを読む中で変数名が省略されている箇所があったので、PR送ったらmergeしてもらえたので、sidekiqのコントリビューターになりました🙌

github.com

(また地味にwikiも修正してたりします)
https://github.com/mperham/sidekiq/wiki/Best-Practices/_history