31

我有一個過程,通常需要幾秒鐘才能完成,所以我試圖使用delayed_job異步處理它。這項工作本身運作良好,我的問題是如何去投票工作,以確定它是否完成。輪詢與delayed_job

我可以簡單地把它賦值給一個變量得到的delayed_job的ID:

工作= Available.delay.dosomething(:VAR => 1234)

+------+----------+----------+------------+------------+-------------+-----------+-----------+-----------+------------+-------------+ 
| id | priority | attempts | handler | last_error | run_at  | locked_at | failed_at | locked_by | created_at | updated_at | 
+------+----------+----------+------------+------------+-------------+-----------+-----------+-----------+------------+-------------+ 
| 4037 | 0  | 0  | --- !ru... |   | 2011-04-... |   |   |   | 2011-04... | 2011-04-... | 
+------+----------+----------+------------+------------+-------------+-----------+-----------+-----------+------------+-------------+ 

但只要它完成這項工作它刪除,並查找已完成備案返回一個錯誤:

@job=Delayed::Job.find(4037) 

ActiveRecord::RecordNotFound: Couldn't find Delayed::Backend::ActiveRecord::Job with ID=4037 

@job= Delayed::Job.exists?(params[:id]) 

我應該刻意去改變這一點,也許推遲的完整記錄的刪除?我不知道我還能得到它的狀態通知。或者投票死亡記錄作爲完成證明好嗎?其他人面對類似的東西嗎?

+0

我面對的另一個問題或障礙是,我正在抵消這項工作,因爲它會捆綁我的服務器。我詢問數據庫是否存在某些具有日期的數據,如果他們不存在或過期我獲取新數據,但將它作爲單獨的作業來使用,我使用AJAX,然後輪詢它直到完成...然後運行再次查詢新數據。希望更快,但也更復雜。 – holden 2011-04-13 20:00:36

+0

是否會使用resque&redis之類的東西,並基本緩存返回的對象,爲數據庫節省額外往返時間並使輪詢速度更快?我從來沒有碰過redis或resque,所以我想我會把它扔到那裏。 – holden 2011-04-13 20:06:15

回答

14

我最終使用了Delayed_Job和after(job)回調的組合,它使用與創建的作業相同的ID填充一個memcached對象。通過這種方式,我最小化了我訪問數據庫時詢問作業狀態的次數,而不是輪詢memcached對象。它包含了我完成的工作所需的全部對象,所以我甚至沒有往返請求。我從github的一篇文章中得到了這個想法,他們做了幾乎相同的事情。

https://github.com/blog/467-smart-js-polling

,並用於輪詢一個jQuery插件,該調查不那麼頻繁,並有一定的重試次數

https://github.com/jeremyw/jquery-smart-poll

似乎工作大後放棄。

def after(job) 
    prices = Room.prices.where("space_id = ? AND bookdate BETWEEN ? AND ?", space_id.to_i, date_from, date_to).to_a 
    Rails.cache.fetch(job.id) do 
     bed = Bed.new(:space_id => space_id, :date_from => date_from, :date_to => date_to, :prices => prices) 
    end 
    end 
1

我建議,如果它得到了作業已完成通知是很重要的,然後寫一個自定義的工作對象和隊列而不是依賴於當你調用Available.delay.dosomething是被排隊的默認工作。創建一個對象是這樣的:

class DoSomethingAvailableJob 

    attr_accessor options 

    def initialize(options = {}) 
    @options = options 
    end 

    def perform 
    Available.dosomething(@options) 
    # Do some sort of notification here 
    # ... 
    end 
end 

與排隊吧:

Delayed::Job.enqueue DoSomethingAvailableJob.new(:var => 1234) 
+0

是的,我知道我可以編寫之後,成功或錯誤回調,但工作創建/更新〜50-100成功記錄在數據庫中,因爲我正在輪詢它,我正在尋找的東西來查找,而不是能夠推動某些事物。如果這是有道理的。 – holden 2011-04-13 19:11:52

+0

我正在使用ajax請求來判斷它是否完成,這就是爲什麼我不能推動它,除非我用套接字做了一些瘋狂的事情。 – holden 2011-04-13 19:44:08

+0

我在想更多的「更新某處的標誌」,您的AJAX請求可以進行輪詢。 – nickgrim 2011-04-13 19:53:22

1

的delayed_jobs表在您的應用程序旨在提供運行的狀態,只有排隊的作業。它不是一張永久性表格,出於性能原因,它應該儘可能小。這就是爲什麼完成後立即刪除工作。

相反,您應該將字段添加到您的Available模型中,表示該作業已完成。由於我通常對工作需要處理的時間感興趣,因此我添加了start_time和end_time字段。然後我的dosomething方法看起來像這樣:

def self.dosomething(model_id) 

model = Model.find(model_id) 

    begin 
    model.start! 

    # do some long work ... 

    rescue Exception => e 
     # ... 
    ensure 
     model.finish! 
    end 
end 

開始!並完成!方法只需記錄當前時間並保存模型。然後我會有一個completed?方法,您的AJAX可以輪詢以查看該作業是否完成。

def completed? 
    return true if start_time and end_time 
    return false 
end 

有很多方法可以做到這一點,但我覺得這種方法很簡單,適用於我。

44

讓我們從API開始。我想有以下的東西。

@available.working? # => true or false, so we know it's running 
@available.finished? # => true or false, so we know it's finished (already ran) 

現在讓我們來寫這份工作。

class AwesomeJob < Struct.new(:options) 

    def perform 
    do_something_with(options[:var]) 
    end 

end 

到目前爲止好。我們有一份工作。現在讓我們編寫將它排入隊伍的邏輯。由於Available是負責這項工作的模型,讓我們教它如何開始這項工作。

class Available < ActiveRecord::Base 

    def start_working! 
    Delayed::Job.enqueue(AwesomeJob.new(options)) 
    end 

    def working? 
    # not sure what to put here yet 
    end 

    def finished? 
    # not sure what to put here yet 
    end 

end 

那麼我們如何知道工作是否正常工作?有幾種方法,但在rails中,當我的模型創建某些東西時,它通常與此相關。我們如何聯繫?在數據庫中使用ID。讓我們在Available模型上添加一個job_id

雖然我們處於這個狀態,但我們如何知道這項工作是因爲工作已經完成或者尚未開工?一種方法是實際檢查工作實際上做了什麼。如果它創建了一個文件,請檢查文件是否存在。如果它計算出一個值,檢查結果是否寫入。有些工作不容易檢查,因爲他們的工作可能沒有明確的可證實的結果。對於這種情況,您可以在模型中使用標誌或時間戳。假設這是我們的情況,讓我們添加一個job_finished_at時間戳來區分尚未運行作業從已完成一。

class AddJobIdToAvailable < ActiveRecord::Migration 
    def self.up 
    add_column :available, :job_id, :integer 
    add_column :available, :job_finished_at, :datetime 
    end 

    def self.down 
    remove_column :available, :job_id 
    remove_column :available, :job_finished_at 
    end 
end 

好的。所以,現在讓我們通過修改start_working!方法,在我們排隊工作後立即將Available與其工作聯繫起來。

def start_working! 
    job = Delayed::Job.enqueue(AwesomeJob.new(options)) 
    update_attribute(:job_id, job.id) 
end 

太好了。在這一點上,我可以寫belongs_to :job,但我們並不真的需要。

所以現在我們知道如何編寫working?方法,這麼簡單。

def working? 
    job_id.present? 
end 

但是,我們如何標記工作完成?沒有人知道工作比工作本身更好。因此,讓我們將available_id傳入作業(作爲選項之一)並將其用於作業。爲此,我們需要修改start_working!方法來傳遞id。

def start_working! 
    job = Delayed::Job.enqueue(AwesomeJob.new(options.merge(:available_id => id)) 
    update_attribute(:job_id, job.id) 
end 

而且我們應該添加邏輯到作業當它完成更新我們job_finished_at時間戳。

class AwesomeJob < Struct.new(:options) 

    def perform 
    available = Available.find(options[:available_id]) 
    do_something_with(options[:var]) 

    # Depending on whether you consider an error'ed job to be finished 
    # you may want to put this under an ensure. This way the job 
    # will be deemed finished even if it error'ed out. 
    available.update_attribute(:job_finished_at, Time.current) 
    end 

end 

有了這個代碼在地方,我們知道該怎麼寫我們finished?方法。

def finished? 
    job_finished_at.present? 
end 

我們完成了。現在我們只需輪詢@available.working?@available.finished?此外,您還可以通過檢查@available.job_id來獲知爲您的可用創建的確切工作的便利性。通過說belongs_to :job可以很容易地將它變成真正的關聯。

13

我認爲最好的方法是使用delayed_job中可用的回調函數。 這些是: :成功,:錯誤和:之後。 所以你可以把一些代碼在你的模型後:

class ToBeDelayed 
    def perform 
    # do something 
    end 

    def after(job) 
    # do something 
    end 
end 

因爲如果你堅持使用obj.delayed.method的,那麼你就必須猴補丁延遲:: PerformableMethod並添加after方法。恕我直言,它遠遠好於輪詢一些可能甚至是後端特定的值(例如ActiveRecord與Mongoid)。

+0

回調工作很好。當作業完成或失敗時,現在我可以做我自己的邏輯。謝謝。 – 2011-12-20 06:30:17

+0

這些服務器端回調將如何幫助客戶嘗試確定作業何時完成? – Yarin 2014-02-02 17:44:51

+1

後回調可以:1.設置一些標誌,2.客戶端將輪詢此標誌變爲true,或者使用消息系統作爲faye通過websocket通知客戶端該進程已完成。 – Roman 2014-02-04 06:57:26

5

解決這個問題的最簡單的方法是改變你的投票行動,類似於下面的內容:

def poll 
    @job = Delayed::Job.find_by_id(params[:job_id]) 

    if @job.nil? 
    # The job has completed and is no longer in the database. 
    else 
    if @job.last_error.nil? 
     # The job is still in the queue and has not been run. 
    else 
     # The job has encountered an error. 
    end 
    end 
end 

爲什麼這項工作?當Delayed::Job從隊列中運行作業時,如果成功,則將其從數據庫中刪除。如果作業失敗,記錄將保留在隊列中以便稍後再次運行,並且last_error屬性設置爲遇到的錯誤。使用上述兩項功能,您可以可以檢查已刪除的記錄以查看它們是否成功。

上述方法的好處是:

  • 你得到你要找的人在你原來的職位
  • 使用一個簡單的邏輯分支輪詢效果,可以向用戶提供的反饋,如果有在處理作業錯誤

你可以做一些類似以下封裝在一個模型的方法這個功能:

# Include this in your initializers somewhere 
class Queue < Delayed::Job 
    def self.status(id) 
    self.find_by_id(id).nil? ? "success" : (job.last_error.nil? ? "queued" : "failure") 
    end 
end 

# Use this method in your poll method like so: 
def poll 
    status = Queue.status(params[:id]) 
    if status == "success" 
     # Success, notify the user! 
    elsif status == "failure" 
     # Failure, notify the user! 
    end 
end 
+0

非常簡單,看似有效。這種技術有哪些缺點或缺點? – Leopd 2012-05-30 15:50:49

+1

唯一的缺點是,檢查已刪除的記錄似乎有些「黑客」,而不是使用回調/觀察者系統,這在Rails中是首選。 – 2012-05-30 18:45:40

+1

+1,因爲它很簡單。 – 2012-07-11 16:29:51