最近Sidekiqを扱うことが割とあり、ActiveJob経由で使ったときの挙動をいまいち把握出来てなかったのでコードを読んでみて、内容とかをざっくりメモしておきます📝
ちなみに読んだのはrailsの6-stableブランチのコードです。
rails/activejob at 6-0-stable · rails/rails · GitHub
perform_laterの実行
まずperform_later
の定義されているところを見ていきます。job_or_instantiate
で引数を元に自分自身(Job)をオブジェクト化して、enqueue
を呼び出している。
module ActiveJob module Enqueuing extend ActiveSupport::Concern module ClassMethods def perform_later(*args) job_or_instantiate(*args).enqueue end
https://github.com/rails/rails/blob/6-0-stable/activejob/lib/active_job/enqueuing.rb#L21
enqueue
はqueue_adapter.enqueue self
を実行して、自分自身(Jobのオブジェクト)を引数にadapter
のenqueue
を呼び出している👀
module ActiveJob module Enqueuing extend ActiveSupport::Concern module ClassMethods def enqueue(options = {}) self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait] self.scheduled_at = options[:wait_until].to_f if options[:wait_until] self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue] self.priority = options[:priority].to_i if options[:priority] successfully_enqueued = false run_callbacks :enqueue do if scheduled_at self.class.queue_adapter.enqueue_at self, scheduled_at else self.class.queue_adapter.enqueue self end successfully_enqueued = true end if successfully_enqueued self else if self.class.return_false_on_aborted_enqueue false else ActiveSupport::Deprecation.warn( "Rails 6.1 will return false when the enqueuing is aborted. Make sure your code doesn't depend on it" \ " returning the instance of the job and set `config.active_job.return_false_on_aborted_enqueue = true`" \ " to remove the deprecations." ) self end end end
https://github.com/rails/rails/blob/6-0-stable/activejob/lib/active_job/enqueuing.rb#L48
adapterによるenqueueの実行
adapter
は下記のように色々あるが、
https://github.com/rails/rails/tree/6-0-stable/activejob/lib/active_job/queue_adapters
今回はsidekiqの場合を読んでいく。ActiveJobを使っても結局はSidekiq::Client.push
が実行されてRedisへのenqueueされている。引数に関しては、そのまま渡すのではなくserialize
を呼び出してシリアライズをかけている。
module ActiveJob module QueueAdapters class SidekiqAdapter def enqueue(job) #:nodoc: # Sidekiq::Client does not support symbols as keys job.provider_job_id = Sidekiq::Client.push \ "class" => JobWrapper, "wrapped" => job.class, "queue" => job.queue_name, "args" => [ job.serialize ] end
Sidekiq::Client.push
の挙動を知りたいかたはこちら
引数のシリアライズ
serialize
内では引数で渡す値を色々シリアライズをかけている。気になるarguments
のシリアライズはArguments.serialize
を呼び出している。
module ActiveJob module Core extend ActiveSupport::Concern def serialize { "job_class" => self.class.name, "job_id" => job_id, "provider_job_id" => provider_job_id, "queue_name" => queue_name, "priority" => priority, "arguments" => serialize_arguments_if_needed(arguments), "executions" => executions, "exception_executions" => exception_executions, "locale" => I18n.locale.to_s, "timezone" => Time.zone&.name, "enqueued_at" => Time.now.utc.iso8601 } end private def serialize_arguments_if_needed(arguments) if arguments_serialized? @serialized_arguments else serialize_arguments(arguments) end end def serialize_arguments(arguments) Arguments.serialize(arguments) end
https://github.com/rails/rails/blob/6-0-stable/activejob/lib/active_job/core.rb#L93
Arguments.serialize
の中で引数をserializer
でいろいろなシリアライズをかけている。だからActiveJobを使うとオブジェクトを引数に渡してもよしなにやってくれるんですね👀
module Arguments extend self def serialize(arguments) arguments.map { |argument| serialize_argument(argument) } end private def serialize_argument(argument) case argument when *PERMITTED_TYPES argument when GlobalID::Identification convert_to_global_id_hash(argument) when Array argument.map { |arg| serialize_argument(arg) } when ActiveSupport::HashWithIndifferentAccess serialize_indifferent_hash(argument) when Hash symbol_keys = argument.each_key.grep(Symbol).map!(&:to_s) aj_hash_key = if Hash.ruby2_keywords_hash?(argument) RUBY2_KEYWORDS_KEY else SYMBOL_KEYS_KEY end result = serialize_hash(argument) result[aj_hash_key] = symbol_keys result when -> (arg) { arg.respond_to?(:permitted?) } serialize_indifferent_hash(argument.to_h) else Serializers.serialize(argument) end end
https://github.com/rails/rails/blob/6-0-stable/activejob/lib/active_job/arguments.rb#L33
※適用されるserializerは下記にまとめられている。 https://github.com/rails/rails/tree/6-0-stable/activejob/lib/active_job/serializers
おわりに
今回は、ActiveJobのperform_later
がどのようにadapter(Sidekiq等)を使ってenqueueしているのかを見てみました。 流れとしては、引数のシリアライズを行って、adapterを使ってenqueueしているようか動きなんですね。(Sidekiqの場合はSidekiq::Client.push
を呼び出している。
Sidekiqだと通常、シンボルやオブジェクトといった値を引数として指定できないのですが、ActiveJobを使っていると事前にシリアライズ処理が走るのでよしなにやってくれて上手くいくようになっているのは便利ですねー✨