2016-03-01 49 views
1

當delayed_job將新作業從隊列中拉出時,它是否先按優先級對隊列進行排序?如果沒有,那麼我猜測低優先級的作業可以在高優先級之前運行,因爲「read_ahead」。在delayed_job中,優先級如何與read_ahead進行交互?

從delayed_job的文檔:

的默認行爲是要找到一個 可用作業時,從隊列中讀取晚五點的工作。您可以通過設置 Delayed :: Worker.read_ahead來進行配置。

示例:我添加100個優先級爲10的作業(優先級較低的優先執行)。然後我添加1個優先級爲0的作業。如果我使用5的默認read_ahead,在找到我的高優先級作業之前,delayed_job是否需要先處理96個作業?

回答

2

我有一個類似的問題,挖入源代碼找到答案 - 這是假設你使用delayed_job_active_record。在後端/ active_record.rb:

class Job < ::ActiveRecord::Base 
    scope :by_priority, lambda { order("priority ASC, run_at ASC") } 

    def self.reserve(worker, max_run_time = Worker.max_run_time) # rubocop:disable CyclomaticComplexity 
     # scope to filter to records that are "ready to run" 
     ready_scope = ready_to_run(worker.name, max_run_time) 

     # scope to filter to the single next eligible job 
     ready_scope = ready_scope.where("priority >= ?", Worker.min_priority) if Worker.min_priority 
     ready_scope = ready_scope.where("priority <= ?", Worker.max_priority) if Worker.max_priority 
     ready_scope = ready_scope.where(queue: Worker.queues) if Worker.queues.any? 
     ready_scope = ready_scope.by_priority 

     reserve_with_scope(ready_scope, worker, db_time_now) 
    end 

    def self.reserve_with_scope(ready_scope, worker, now) 
     # Optimizations for faster lookups on some common databases 
     case connection.adapter_name 
     when "PostgreSQL" 
     quoted_table_name = connection.quote_table_name(table_name) 
     subquery_sql  = ready_scope.limit(1).lock(true).select("id").to_sql 
     reserved   = find_by_sql(["UPDATE #{quoted_table_name} SET locked_at = ?, locked_by = ? WHERE id IN (#{subquery_sql}) RETURNING *", now, worker.name]) 
     reserved[0] 
     when "MySQL", "Mysql2" 
     now = now.change(usec: 0) 
     count = ready_scope.limit(1).update_all(locked_at: now, locked_by: worker.name) 
     return nil if count == 0 
     where(locked_at: now, locked_by: worker.name, failed_at: nil).first 
     when "MSSQL", "Teradata" 
     subsubquery_sql = ready_scope.limit(1).to_sql 
     subquery_sql = "SELECT id FROM (#{subsubquery_sql}) AS x" 
     quoted_table_name = connection.quote_table_name(table_name) 
     sql = ["UPDATE #{quoted_table_name} SET locked_at = ?, locked_by = ? WHERE id IN (#{subquery_sql})", now, worker.name] 
     count = connection.execute(sanitize_sql(sql)) 
     return nil if count == 0 
     where(locked_at: now, locked_by: worker.name, failed_at: nil).first 
     else 
     reserve_with_scope_using_default_sql(ready_scope, worker, now) 
     end 
    end 

    def self.reserve_with_scope_using_default_sql(ready_scope, worker, now) 
     # This is our old fashion, tried and true, but slower lookup 
     ready_scope.limit(worker.read_ahead).detect do |job| 
     count = ready_scope.where(id: job.id).update_all(locked_at: now, locked_by: worker.name) 
     count == 1 && job.reload 
     end 
    end 

所以看起來優先級是否優先 - 當DelayedJob是找到下一個可用的工作保留並運行,它通過排名第一優先試圖限制由「read_ahead」結果前。

事實上,最後一個方法reserve_with_scope_using_default_sql是唯一提到「read_ahead」的地方,所以如果你使用的是Postgres或MySQL,它會自動選擇最高優先級的作業(限制1)並忽略「read_ahead」 。