考慮多個閱讀器,有必要對其中記錄每個讀者已經接受了控制。
而且,它已經說的順序發送記錄讀者以及一個條件。因此,如果在更早的事務之前進行了一些進一步的事務處理,我們必須「停止」,並在事件發生時再次發送記錄,以維護髮送給讀者的記錄順序。
這就是說,檢查落實:
-- lets create our queue table
drop table if exists queue_records cascade;
create table if not exists queue_records
(
cod serial primary key,
date_posted timestamp default timeofday()::timestamp,
message text
);
-- lets create a table to save "checkpoints" per reader_id
drop table if exists queue_reader_checkpoint cascade;
create table if not exists queue_reader_checkpoint
(
reader_id text primary key,
last_checkpoint numeric
);
CREATE OR REPLACE FUNCTION get_queue_records(pREADER_ID text)
RETURNS SETOF queue_records AS
$BODY$
DECLARE
vLAST_CHECKPOINT numeric;
vCHECKPOINT_EXISTS integer;
vRECORD queue_records%rowtype;
BEGIN
-- let's get the last record sent to the reader
SELECT last_checkpoint
INTO vLAST_CHECKPOINT
FROM queue_reader_checkpoint
WHERE reader_id = pREADER_ID;
-- if vLAST_CHECKPOINT is null (this is the very first time of reader_id),
-- sets it to the last cod from queue. It means that reader will get records from now on.
if (vLAST_CHECKPOINT is null) then
-- sets the flag indicating the reader does not have any checkpoint recorded
vCHECKPOINT_EXISTS = 0;
-- gets the very last commited record
SELECT MAX(cod)
INTO vLAST_CHECKPOINT
FROM queue_records;
else
-- sets the flag indicating the reader already have a checkpoint recorded
vCHECKPOINT_EXISTS = 1;
end if;
-- now let's get the records from the queue one-by-one
FOR vRECORD IN
SELECT *
FROM queue_records
WHERE COD > vLAST_CHECKPOINT
ORDER BY COD
LOOP
-- if next record IS EQUALS to (vLAST_CHECKPOINT+1), the record is in the expected order
if (vRECORD.COD = (vLAST_CHECKPOINT+1)) then
-- let's save the last record read
vLAST_CHECKPOINT = vRECORD.COD;
-- and return it
RETURN NEXT vRECORD;
-- but, if it is not, then is out of order
else
-- the reason is some transaction did not commit yet, but there's another further transaction that alread did.
-- so we must stop sending records to the reader. And probably next time he calls, the transaction will have committed already;
exit;
end if;
END LOOP;
-- now we have to persist the last record read to be retrieved on next call
if (vCHECKPOINT_EXISTS = 0) then
INSERT INTO queue_reader_checkpoint (reader_id, last_checkpoint) values (pREADER_ID, vLAST_CHECKPOINT);
else
UPDATE queue_reader_checkpoint SET last_checkpoint = vLAST_CHECKPOINT where reader_id = pREADER_ID;
end if;
end;
$BODY$ LANGUAGE plpgsql VOLATILE;
http://stackoverflow.com/q/6507475/330315和http://stackoverflow.com/q/22765054/330315可能的幫助。你可能也想看看這個:http://pgxn.org/dist/pg_message_queue/ –
至關重要的是所有的行都是按*順序處理的?那麼如何定義訂單*精確*?或者你只是想避免丟失行?然後這裏提出類似的解決方案應該工作:[Postgres的更新,限制1](http://dba.stackexchange.com/a/69497/3684) –
是否真的需要您這樣做PostgreSQL的?這是可以非常容易地通過redis完成的那種要求 – e4c5