2015-10-05

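PostgreSQL - implementing a reliable queue

I want to implement a reliable queue with multiple writers and multiple readers using a Postgres database. How can I avoid missing rows when a queue reader scans the table and an in-progress transaction commits after the read?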

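We have a reader that selects rows in batches using a "checkpoint": each batch fetches the rows that come after the last timestamp of the previous batch, and we are missing rows. (Reason: the timestamp value is based on when the INSERT happens (00.00.00). Under heavy load, if a transaction takes longer, say 10 seconds to insert (00.00.10), the reader will miss that row (ROW1) if it reads during those 10 seconds and finds a row whose insert time (00.00.05) is later than ROW1's.) A full description of the problem is similar to the one written in this blog post: http://blog.thefourthparty.com/stopping-time-in-postgresql/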
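
To make the failure concrete, here is a small model in plain Python rather than PostgreSQL (the `Row` class, the `read_batches` function and the integer clock are hypothetical stand-ins, not part of any real API). It assumes a reader only sees committed rows and advances its checkpoint to the last timestamp in each batch. ROW1 is stamped at 0 but commits at 10; ROW2 is stamped at 5 and commits immediately.

```python
# A pure-Python model (not PostgreSQL code) of the timestamp-checkpoint reader
# described in the question. Times are plain integers standing in for seconds.
from dataclasses import dataclass


@dataclass(frozen=True)
class Row:
    name: str
    inserted_at: int   # value of the timestamp column, set when the INSERT runs
    committed_at: int  # moment the writer's transaction commits


def visible(rows, now):
    """Rows a reader can see at time `now`: only committed ones."""
    return [r for r in rows if r.committed_at <= now]


def read_batches(rows, poll_times):
    """Poll at each time, fetching visible rows stamped after the checkpoint."""
    checkpoint = -1
    delivered = []
    for now in poll_times:
        batch = sorted(
            (r for r in visible(rows, now) if r.inserted_at > checkpoint),
            key=lambda r: r.inserted_at,
        )
        for r in batch:
            delivered.append(r.name)
            checkpoint = r.inserted_at  # advance to the last timestamp in the batch
    return delivered


rows = [
    Row("ROW1", inserted_at=0, committed_at=10),  # slow transaction
    Row("ROW2", inserted_at=5, committed_at=5),   # fast transaction
]
print(read_batches(rows, poll_times=[6, 20]))  # ['ROW2']
```

Once the reader has moved its checkpoint to 5, ROW1 (stamped 0) can never satisfy `inserted_at > checkpoint`, which is exactly the lost row the question describes. Polling only after both commits (`poll_times=[20]`) delivers both rows.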

Related existing question for context: Postgres LISTEN/NOTIFY - low latency, realtime?

Update: I have changed the question from a single reader to multiple readers. The order in which the readers read matters.

http://stackoverflow.com/q/6507475/330315 and http://stackoverflow.com/q/22765054/330315 might help. You might also want to look at this: http://pgxn.org/dist/pg_message_queue/ –

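Is it crucial that all rows are processed *in order*? Then how exactly do you define the order? Or do you just want to avoid missing rows? Then a solution similar to the one proposed here should work: [Postgres UPDATE ... LIMIT 1](http://dba.stackexchange.com/a/69497/3684) –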

Do you really need to do this in PostgreSQL? This is the kind of requirement that can be handled very easily with redis. – e4c5

Answer

With multiple readers, you need to keep track of which records each reader has already received.

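You also said that the order in which records are sent to the readers is a requirement. So if a later transaction has committed before an earlier one, we have to "stop" and send the records on a later call, once the earlier transaction has committed, to preserve the order of the records sent to the readers.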
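
The bookkeeping described above can be sketched in a few lines of Python. This is a hypothetical model, not the PL/pgSQL function that follows: `committed_cods` stands for the `cod` values a reader can currently see, the `checkpoints` dict plays the role of the `queue_reader_checkpoint` table, and a gap in `cod` is assumed to be an insert that has not committed yet.

```python
# Hypothetical pure-Python model of per-reader checkpoints with gap detection.
# It mirrors the idea of the answer; it is not the PL/pgSQL function itself.


def read_in_order(committed_cods, checkpoints, reader_id):
    """Return the next contiguous run of cods after this reader's checkpoint."""
    last = checkpoints.get(reader_id)
    if last is None:
        # First call for this reader: start from the highest visible cod,
        # i.e. the reader only receives records from now on.
        last = max(committed_cods, default=0)
    batch = []
    for cod in sorted(c for c in committed_cods if c > last):
        if cod != last + 1:
            break  # gap: an earlier insert is still in flight, so stop here
        batch.append(cod)
        last = cod
    checkpoints[reader_id] = last  # persist the reader's new position
    return batch


# Both readers start at checkpoint 0 (nothing read yet).
checkpoints = {"a": 0, "b": 0}
committed = {1, 2, 4}  # cod 3 belongs to a transaction that has not committed
print(read_in_order(committed, checkpoints, "a"))  # [1, 2]
committed.add(3)       # the slow transaction commits
print(read_in_order(committed, checkpoints, "a"))  # [3, 4]
print(read_in_order(committed, checkpoints, "b"))  # [1, 2, 3, 4]
```

Because each reader has its own checkpoint entry, a slow reader and a fast one never move each other's position, and neither ever receives cod 4 before cod 3.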

That said, here is the implementation:

-- let's create our queue table
drop table if exists queue_records cascade; 
create table if not exists queue_records 
(
    cod serial primary key, 
    date_posted timestamp default clock_timestamp(), -- wall-clock time of the INSERT (informational only)
    message text 
); 


-- let's create a table to save "checkpoints" per reader_id
drop table if exists queue_reader_checkpoint cascade; 
create table if not exists queue_reader_checkpoint 
(
    reader_id text primary key, 
    last_checkpoint numeric 
); 



CREATE OR REPLACE FUNCTION get_queue_records(pREADER_ID text) 
RETURNS SETOF queue_records AS 
$BODY$ 
DECLARE 
    vLAST_CHECKPOINT numeric; 
    vCHECKPOINT_EXISTS integer; 
    vRECORD   queue_records%rowtype; 
BEGIN 

    -- let's get the last record sent to the reader
    -- (FOR UPDATE makes concurrent calls for the same reader_id wait for each other)
    SELECT last_checkpoint
    INTO vLAST_CHECKPOINT
    FROM queue_reader_checkpoint
    WHERE reader_id = pREADER_ID
    FOR UPDATE;

    -- if no checkpoint row was found (this is the very first call for reader_id),
    -- start from the last cod in the queue. It means the reader will get records from now on.
    if not found then
     -- sets the flag indicating the reader does not have any checkpoint recorded
     vCHECKPOINT_EXISTS = 0;
     -- gets the very last committed record; coalesce covers an empty queue,
     -- which would otherwise store a NULL checkpoint and make the next call try to INSERT again
     SELECT coalesce(MAX(cod), 0)
     INTO vLAST_CHECKPOINT
     FROM queue_records;
    else
     -- sets the flag indicating the reader already has a checkpoint recorded
     vCHECKPOINT_EXISTS = 1;
    end if;

    -- now let's get the records from the queue one-by-one 
    FOR vRECORD IN 
      SELECT * 
      FROM queue_records 
      WHERE COD > vLAST_CHECKPOINT 
      ORDER BY COD 
    LOOP 

     -- if the next record is equal to (vLAST_CHECKPOINT+1), it is in the expected order
     if (vRECORD.COD = (vLAST_CHECKPOINT+1)) then 

      -- let's save the last record read 
      vLAST_CHECKPOINT = vRECORD.COD; 

      -- and return it 
      RETURN NEXT vRECORD; 

     -- otherwise, it is out of order
     else
      -- the reason is that some earlier transaction has not committed yet, while a later one already has,
      -- so we must stop sending records to the reader. By its next call the earlier transaction will probably have committed.
      exit;
     end if; 
    END LOOP; 


    -- now we have to persist the last record read to be retrieved on next call 
    if (vCHECKPOINT_EXISTS = 0) then 
     INSERT INTO queue_reader_checkpoint (reader_id, last_checkpoint) values (pREADER_ID, vLAST_CHECKPOINT); 
    else   
     UPDATE queue_reader_checkpoint SET last_checkpoint = vLAST_CHECKPOINT where reader_id = pREADER_ID; 
    end if; 
end; 
$BODY$ LANGUAGE plpgsql VOLATILE; 
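
One caveat worth checking before relying on the gap test: `serial` values come from a sequence, and `nextval()` is never rolled back when a transaction aborts, so a failed INSERT leaves a permanent hole in `cod`. The plain-Python model below (a hypothetical stand-in for the gap check in `get_queue_records`, not real PostgreSQL code) shows that a reader then stops at that hole on every call. Some rule for skipping a gap that stays open too long would be needed in practice; this sketch does not implement one.

```python
# Hypothetical model of the gap check in get_queue_records (not PostgreSQL code).


def read_in_order(committed_cods, last):
    """Return (batch, new_checkpoint), stopping at the first gap after `last`."""
    batch = []
    for cod in sorted(c for c in committed_cods if c > last):
        if cod != last + 1:
            break  # gap: treated as an insert that has not committed yet
        batch.append(cod)
        last = cod
    return batch, last


# cod 3 was taken by an INSERT that rolled back, so it will never appear.
committed = {1, 2, 4, 5}
checkpoint = 0
batch, checkpoint = read_in_order(committed, checkpoint)
print(batch)  # [1, 2]
batch, checkpoint = read_in_order(committed, checkpoint)
print(batch)  # [] (stuck behind the hole left by cod 3)
```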

I do need to have some kind of "checkpoint". Are you suggesting I use "cod" for that? Also, the order will be completely messed up. – Chandra

Is order a condition for reading further records? – Christian

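I think in @Chandra's use case there are multiple readers all reading the same table, but possibly at different times and at different speeds. It's not clear to me how cod helps his use case. –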