Ruby Sidekiq的使用

Sidekiq 作为后台的一种异步定时任务队列处理服务,是通过什么方式去处理队列任务的,和Redis是怎么配合的?涉及到的一些技术原理一直比较模糊,有空总结了一下查看Sidekiq源码学到的东西。

Sidekiq启动时做了什么事情?

  1. require Rails APP上面的environment.rb里面的东西,可以让sidekiq中有rails中的执行环境。
  2. 新建默认的concurrency的数目,每个对应一个线程。
  3. 启动28个线程,一个是主线程,一个健康检查线程,一个scheduler线程,25个processor线程

什么时候去执行任务?

在manager start的时候去start processor,那些线程会有个循环在那里不间断的去队列里取任务出来执行。

从任务进入redis到任务执行都经过了哪些步骤?

  1. 开始时都是生成一些参数,其中参数会添加一些字段上去,有class, queue, jid, created_at等字段。
  2. 调用栈会调用到atomic_push这个方法,这个方法会区分开字段中是否有’at’那个字段,如果有那个字段,则把payloads批量的任务按[at, hash_json] 的格式加入到scheduled队列中去。否则的话,就把队列的名字加入到queues队列中去,同时把类参数那些东西加入到 queue:#{q}" 这个名字的队列中去。
  3. 有定时和立即两种情况,过段时间执行和立即执行,过段时间执行是按队列schedule权重去排序加入的,而立即执行时把队列加入到queues这个队列中,然后任务加入到 “queue:#{q}”中
  4. 至此任务加入队列就比较清晰了。

重试队列和定时队列是怎么样的?

在任务加入队列之前会先判断任务执行的时间,如果是调用perform_async方法的就是立即执行任务,任务的队列会被安排在queues队列中,同时队列的hash会被加入到名字加queue:#{q} 的队列中去,如果是调用perform_in 或者 perform_at 的,则会通过时间戳的方式去判断是否需要在item中加入 at 字段,如果是要立即执行就不需要加,如果是后面执行的,则会在把item先加入到scheduled队列中去。

scheduled队列中的任务什么时候加入到具体队列中去让线程执行?

这部分就是poller的工作了。poller单独开了个线程,不断的进行循环,遍历retry和schedule两个队列,这个类的工作是循环,然后取出两个队列中可以执行的任务,把那个任务加入到执行队列中,然后那25个线程遍历后就会把那些任务取出来执行,至此从任务的进入到安排执行的整个过程就可以走通了

重试的原理

在processor执行dispatch方法的block中拦截错误,看是什么情况的错误,如果重试的开关没有关闭而且可以重试就吧任务加入到retry队列中去。

为什么执行立即执行任务和有时间点执行的任务加入队列的方式不一样?

当时定时任务时,就按照时间的顺序排序加入到集合当中,如果是立即任务,则立即加入到任务队列中去,而这时候调用的sadd方法其实是在队列queues 加入不重复的元素。所有会有两种不一样的处理方式,而把执行队列加入到 queues 中是为了给api使用,入web界面。

定时任务执行的原理

其实是把sidekiq的laucher类的加入队列的方法给重写了,另外起了一个poller实例去循环监控定时任务队列的任务,如果有可以执行的,就加入到队列中去。

ActiveJob和sidekiq

  • 通过求得adapter,然后:

    def enqueue(job) #:nodoc:
      # Sidekiq::Client does not support symbols as keys
      job.provider_job_id = Sidekiq::Client.push \
        "class"   => JobWrapper,
        "wrapped" => job.class.to_s,
        "queue"   => job.queue_name,
        "args"    => [ job.serialize ]
    end
    
    

    加入到队列中去,连接起了activejob和sidekiq

activejob是sidekiq和requeue等其它后台服务的中间适配器,异步任务的参数可以用object是在activejob中用globalid来处理过的,在从队列中取出来后再反序列化处理把object拿出来使用就可以了。但是sidekiq中的使用确实通过 perform_asyncperform_inperform_at 等方法来进行加入队列的,所以这些对参数的处理不一样,如果是通过activejob来处理,则是上面的处理方法,如果是通过sidekiq来出来,却是通过上面的那三中方法来出来的。

总结

在调用代码是先把任务加入到队列中,这是需要判断任务是哪种类型,如果是立即的,那就直接放入到队列中,如果是定时的,则先把任务放入到scheduled队列中,然后再由poller按时间顺序轮询安排,如果有需要执行的任务就放入队列中去。然后开启的25个workers就会轮询队列中的任务,不间断的取出来执行。到此,整个流程就走通了。