我是HornetQ的新手,請耐心等待。我首先要告訴你我的要求:HornetQ消息在使用核心API後仍然在隊列中
我需要一個消息隊列中間件,它可以在不同的進程之間傳遞大小約爲1k的消息,具有低延遲和持久性(即應該能夠在系統崩潰時倖存)。我將有多個進程寫入相同的隊列,同樣的多個進程從同一個隊列讀取。
爲此,我選擇了HornetQ,因爲它具有持久性消息傳遞的最佳評分。
我目前usung HornetQ的v2.2.2Final爲獨立服務器。
我能夠成功地創建使用核心API(ClientSession中)持久/非持久隊列,併成功發佈消息排隊(ClientProducer)。
同樣,我可以使用核心API (ClientConsumer)從隊列中讀取消息。
問題出現在客戶端讀取完消息後,消息仍然保留在隊列中,即隊列中的消息數保持不變。也許我得到了這個錯誤,但我的印象是,一旦這條消息被消耗,就會從隊列中移除。但是這並不是發生在我的情況中,同樣的消息正在一遍又一遍地閱讀。
此外,我想告訴我,我已經嘗試使用非持久消息的非持久隊列。但問題依然存在。
規範生產,我現在用:
public class HQProducer implements Runnable {
private ClientProducer producer;
private boolean killme;
private ClientSession session;
private boolean durableMsg;
public HQProducer(String host, int port, String address, String queueName,
boolean deleteQ, boolean durable, boolean durableMsg, int pRate) {
this.durableMsg = durableMsg;
try {
HashMap map = new HashMap();
map.put("host", host);
map.put("port", port);
TransportConfiguration config = new TransportConfiguration(NettyConnectorFactory.class.getName(), map);
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(config);
ClientSessionFactory factory = locator.createSessionFactory();
session = factory.createSession();
if (queueExists(queueName)) {
if (deleteQ) {
System.out.println("Deleting existing queue :: " + queueName);
session.deleteQueue(queueName);
System.out.println("Creating queue :: " + queueName);
session.createQueue(address, queueName, true);
}
} else {
System.out.println("Creating new queue :: " + queueName);
session.createQueue(address, queueName, durable);
}
producer = session.createProducer(SimpleString.toSimpleString(address), pRate);
killme = false;
} catch (Exception ex) {
Logger.getLogger(HQTestProducer.class.getName()).log(Level.SEVERE, null, ex);
}
}
@Override
public void run() {
long time = System.currentTimeMillis();
int cnt = 0;
long timediff;
while (!killme) {
try {
ClientMessage message = session.createMessage(durableMsg);
message.getBodyBuffer().writeString("Hello world");
producer.send(message);
cnt++;
timediff = ((System.currentTimeMillis() - time)/1000);
if (timediff >= 1) {
System.out.println("Producer tps :: " + cnt);
cnt = 0;
time = System.currentTimeMillis();
}
} catch (HornetQException ex) {
Logger.getLogger(HQProducer.class.getName()).log(Level.SEVERE, null, ex);
}
}
try {
session.close();
} catch (HornetQException ex) {
Logger.getLogger(HQProducer.class.getName()).log(Level.SEVERE, null, ex);
}
}
public void setKillMe(boolean killme) {
this.killme = killme;
}
private boolean queueExists(String qname) {
boolean res = false;
try {
//ClientSession.BindingQuery bq = session.bindingQuery(SimpleString.toSimpleString(qname));
QueueQuery queueQuery = session.queueQuery(SimpleString.toSimpleString(qname));
if (queueQuery.isExists()) {
res = true;
}
} catch (HornetQException ex) {
res = false;
}
return res;
}
}
也爲消費者的代碼是:
public class HQConsumer implements Runnable {
private ClientSession session;
private ClientConsumer consumer;
private boolean killMe;
public HQConsumer(String host, int port, String queueName, boolean browseOnly) {
try {
HashMap map = new HashMap();
map.put("host", host);
map.put("port", port);
TransportConfiguration config = new TransportConfiguration(NettyConnectorFactory.class.getName(), map);
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(config);
ClientSessionFactory factory = locator.createSessionFactory();
session = factory.createSession();
session.start();
consumer = session.createConsumer(queueName, "",0,-1,browseOnly);
killMe = false;
} catch (Exception ex) {
Logger.getLogger(HQTestProducer.class.getName()).log(Level.SEVERE, null, ex);
}
}
@Override
public void run() {
long time = System.currentTimeMillis();
int cnt = 0;
long timediff;
while (!killMe) {
try {
ClientMessage msgReceived = consumer.receive();
msgReceived.acknowledge();
//System.out.println("message = " + msgReceived.getBodyBuffer().readString());
cnt++;
timediff = ((System.currentTimeMillis() - time)/1000);
if (timediff >= 1) {
System.out.println("ConSumer tps :: " + cnt);
cnt = 0;
time = System.currentTimeMillis();
}
} catch (HornetQException ex) {
Logger.getLogger(HQConsumer.class.getName()).log(Level.SEVERE, null, ex);
}
}
try {
session.close();
} catch (HornetQException ex) {
Logger.getLogger(HQConsumer.class.getName()).log(Level.SEVERE, null, ex);
}
}
public void setKillMe(boolean killMe) {
this.killMe = killMe;
}
}
HornetQ的服務器配置::
<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
<paging-directory>${data.dir:../data}/paging</paging-directory>
<bindings-directory>${data.dir:../data}/bindings</bindings-directory>
<journal-directory>${data.dir:../data}/journal</journal-directory>
<journal-min-files>10</journal-min-files>
<large-messages-directory>${data.dir:../data}/large-messages</large-messages-directory>
<connectors>
<connector name="netty">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.port:5445}"/>
</connector>
<connector name="netty-throughput">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.batch.port:5455}"/>
<param key="batch-delay" value="50"/>
</connector>
</connectors>
<acceptors>
<acceptor name="netty">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.port:5445}"/>
</acceptor>
<acceptor name="netty-throughput">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.batch.port:5455}"/>
<param key="batch-delay" value="50"/>
<param key="direct-deliver" value="false"/>
</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="guest"/>
<permission type="deleteNonDurableQueue" roles="guest"/>
<permission type="createDurableQueue" roles="guest"/>
<permission type="deleteDurableQueue" roles="guest"/>
<permission type="consume" roles="guest"/>
<permission type="send" roles="guest"/>
</security-setting>
</security-settings>
<address-settings>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
</address-setting>
</address-settings>
</configuration>
的A/C [本](HTTP:// docs.jboss.org/hornetq/2.2.2.Final/user-manual/en/html/messaging-concepts.html#d0e354)你需要在處理完畢後確認消息,你是否也這樣做? –