Madogiwa Blog

主に技術系の学習メモに使っていきます。

RubyonRails: ActiveJobのperform_laterが実行されてadapterがenqueueするまでの挙動を追ってみた。

最近Sidekiqを扱うことが割とあり、ActiveJob経由で使ったときの挙動をいまいち把握出来てなかったのでコードを読んでみて、内容とかをざっくりメモしておきます📝

ちなみに読んだのはrailsの6-stableブランチのコードです。

rails/activejob at 6-0-stable · rails/rails · GitHub

perform_laterの実行

まずperform_laterの定義されているところを見ていきます。job_or_instantiateで引数を元に自分自身(Job)をオブジェクト化して、enqueueを呼び出している。

module ActiveJob
  module Enqueuing
    extend ActiveSupport::Concern

    module ClassMethods
      def perform_later(*args)
        job_or_instantiate(*args).enqueue
      end

https://github.com/rails/rails/blob/6-0-stable/activejob/lib/active_job/enqueuing.rb#L21

enqueuequeue_adapter.enqueue selfを実行して、自分自身(Jobのオブジェクト)を引数にadapterenqueueを呼び出している👀

module ActiveJob
  module Enqueuing
    extend ActiveSupport::Concern
    module ClassMethods

    def enqueue(options = {})
      self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait]
      self.scheduled_at = options[:wait_until].to_f if options[:wait_until]
      self.queue_name   = self.class.queue_name_from_part(options[:queue]) if options[:queue]
      self.priority     = options[:priority].to_i if options[:priority]
      successfully_enqueued = false

      run_callbacks :enqueue do
        if scheduled_at
          self.class.queue_adapter.enqueue_at self, scheduled_at
        else
          self.class.queue_adapter.enqueue self
        end

        successfully_enqueued = true
      end

      if successfully_enqueued
        self
      else
        if self.class.return_false_on_aborted_enqueue
          false
        else
          ActiveSupport::Deprecation.warn(
            "Rails 6.1 will return false when the enqueuing is aborted. Make sure your code doesn't depend on it" \
            " returning the instance of the job and set `config.active_job.return_false_on_aborted_enqueue = true`" \
            " to remove the deprecations."
          )

          self
        end
      end
    end

https://github.com/rails/rails/blob/6-0-stable/activejob/lib/active_job/enqueuing.rb#L48

adapterによるenqueueの実行

adapterは下記のように色々あるが、

https://github.com/rails/rails/tree/6-0-stable/activejob/lib/active_job/queue_adapters

今回はsidekiqの場合を読んでいく。ActiveJobを使っても結局はSidekiq::Client.pushが実行されてRedisへのenqueueされている。引数に関しては、そのまま渡すのではなくserializeを呼び出してシリアライズをかけている。

module ActiveJob
  module QueueAdapters
    class SidekiqAdapter
      def enqueue(job) #:nodoc:
        # Sidekiq::Client does not support symbols as keys
        job.provider_job_id = Sidekiq::Client.push \
          "class"   => JobWrapper,
          "wrapped" => job.class,
          "queue"   => job.queue_name,
          "args"    => [ job.serialize ]
      end

https://github.com/rails/rails/blob/6-0-stable/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb#L20

Sidekiq::Client.pushの挙動を知りたいかたはこちら

madogiwa0124.hatenablog.com

引数のシリアライズ

serialize内では引数で渡す値を色々シリアライズをかけている。気になるargumentsシリアライズArguments.serializeを呼び出している。

module ActiveJob
  module Core
    extend ActiveSupport::Concern
    def serialize
      {
        "job_class"  => self.class.name,
        "job_id"     => job_id,
        "provider_job_id" => provider_job_id,
        "queue_name" => queue_name,
        "priority"   => priority,
        "arguments"  => serialize_arguments_if_needed(arguments),
        "executions" => executions,
        "exception_executions" => exception_executions,
        "locale"     => I18n.locale.to_s,
        "timezone"   => Time.zone&.name,
        "enqueued_at" => Time.now.utc.iso8601
      }
    end

    private
      def serialize_arguments_if_needed(arguments)
        if arguments_serialized?
          @serialized_arguments
        else
          serialize_arguments(arguments)
        end
      end

      def serialize_arguments(arguments)
        Arguments.serialize(arguments)
      end

https://github.com/rails/rails/blob/6-0-stable/activejob/lib/active_job/core.rb#L93

Arguments.serializeの中で引数をserializerでいろいろなシリアライズをかけている。だからActiveJobを使うとオブジェクトを引数に渡してもよしなにやってくれるんですね👀

  module Arguments
    extend self
    def serialize(arguments)
      arguments.map { |argument| serialize_argument(argument) }
    end

    private

      def serialize_argument(argument)
        case argument
        when *PERMITTED_TYPES
          argument
        when GlobalID::Identification
          convert_to_global_id_hash(argument)
        when Array
          argument.map { |arg| serialize_argument(arg) }
        when ActiveSupport::HashWithIndifferentAccess
          serialize_indifferent_hash(argument)
        when Hash
          symbol_keys = argument.each_key.grep(Symbol).map!(&:to_s)
          aj_hash_key = if Hash.ruby2_keywords_hash?(argument)
            RUBY2_KEYWORDS_KEY
          else
            SYMBOL_KEYS_KEY
          end
          result = serialize_hash(argument)
          result[aj_hash_key] = symbol_keys
          result
        when -> (arg) { arg.respond_to?(:permitted?) }
          serialize_indifferent_hash(argument.to_h)
        else
          Serializers.serialize(argument)
        end
      end

https://github.com/rails/rails/blob/6-0-stable/activejob/lib/active_job/arguments.rb#L33

※適用されるserializerは下記にまとめられている。 https://github.com/rails/rails/tree/6-0-stable/activejob/lib/active_job/serializers

おわりに

今回は、ActiveJobのperform_laterがどのようにadapter(Sidekiq等)を使ってenqueueしているのかを見てみました。 流れとしては、引数のシリアライズを行って、adapterを使ってenqueueしているようか動きなんですね。(Sidekiqの場合はSidekiq::Client.pushを呼び出している。

Sidekiqだと通常、シンボルやオブジェクトといった値を引数として指定できないのですが、ActiveJobを使っていると事前にシリアライズ処理が走るのでよしなにやってくれて上手くいくようになっているのは便利ですねー✨