2015-03-03 76 views
3

我已經在MySql Table和Java程序的幫助下實現了一個隊列。我想用Apache ActiveMQ實現以下程序,任何建議都非常感謝。如何在Java中使用Apache ActiveMQ實現基於條件的隊列

表名:XXXXX

Col's : id | Msg | key_id |Status 
----------------------------|------ 
     | 1 | Msg1| 1  | 1 
     | 2 | Msg2| 2  | 1 
     | 3 | Msg3| 1  | 0 
     | 4 | Msg4| 1  | 0 
     | 5 | Msg5| 4  | 0 
     | 6 | Msg6| 3  | 0 


while (true) { 

     try { 
      Thread.sleep(5000); 
      Fetch only one record from table XXXX whose key_id not in list and status is 0. Note list contains zero elements . 
      add key_id in list. 
      update status of fetched record to 1 
      process the fetched record ... some business logic 
      set status to 2 if business logic works as expected,else status as 3 if exception occured 


      } catch (Exception ex) { 

} 

如何與Apache ActiveMQ的實施呢?
我多努力,這

 package com.jms.activemq.examples; 

    import javax.jms.Connection; 
    import javax.jms.ConnectionFactory; 
    import javax.jms.Destination; 
    import javax.jms.JMSException; 
    import javax.jms.MessageProducer; 
    import javax.jms.Session; 
    import javax.jms.TextMessage; 
    import org.apache.activemq.ActiveMQConnection; 
    import org.apache.activemq.ActiveMQConnectionFactory; 

public class Sender { 

private ConnectionFactory factory = null; 
private Connection connection = null; 
private Session session = null; 
private Destination destination = null; 
private MessageProducer producer = null; 

public Sender() { 

} 

public void sendMessage() { 

    try { 
     factory = new ActiveMQConnectionFactory(
       ActiveMQConnection.DEFAULT_BROKER_URL); 
     connection = factory.createConnection(); 
     connection.start(); 
     session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 

     destination = session.createQueue("SAMPLEQUEUE"); 
     producer = session.createProducer(destination); 
     TextMessage message = session.createTextMessage(); 

     while (true) { 

      try { 
       Thread.sleep(5000); 
      Fetch only one record from table XXXX whose key_id not in list and status is 0. Note list contains zero elements . 
      add key_id in list. 
      update status of fetched record to 1 
       message.setText("some data from Database"); 
       producer.send(message); 

       System.out.println("Sent: " + message.getText()); 
      } catch (Exception ex) { 

      } 
     } 


     }catch (JMSException e) { 
     e.printStackTrace(); 
    } 
    } 



    public static void main(String[] args) { 
     Sender sender = new Sender(); 
     sender.sendMessage(); 

    } 

    } 

器和接收器

 package com.jms.activemq.examples; 

    import javax.jms.Connection; 
    import javax.jms.ConnectionFactory; 
    import javax.jms.Destination; 
    import javax.jms.JMSException; 
    import javax.jms.Message; 
    import javax.jms.MessageConsumer; 
    import javax.jms.Session; 
    import javax.jms.TextMessage; 
    import org.apache.activemq.ActiveMQConnection; 
    import org.apache.activemq.ActiveMQConnectionFactory; 

    public class Receiver { 

     private ConnectionFactory factory = null; 
     private Connection connection = null; 
     private Session session = null; 
     private Destination destination = null; 
     private MessageConsumer consumer = null; 

     public Receiver() { 

     } 

     public void receiveMessage() { 
      try { 
       factory = new ActiveMQConnectionFactory(
         ActiveMQConnection.DEFAULT_BROKER_URL); 
       connection = factory.createConnection(); 
       connection = factory.createConnection(); 
       connection.start(); 
       session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
       destination = session.createQueue("SAMPLEQUEUE"); 
       consumer = session.createConsumer(destination); 
       Message message = consumer.receive(); 

       if (message instanceof TextMessage) { 
      process the fetched record ... some business logic 
      set status to 2 if business logic works as expected,else status as 3 if exception occured 

        TextMessage text = (TextMessage) message; 
        System.out.println("Message is : " + text.getText()); 

       } 
      } catch (JMSException e) { 
       e.printStackTrace(); 
      } 
     } 

     public static void main(String[] args) { 
      Receiver receiver = new Receiver(); 
      receiver.receiveMessage(); 
     } 
    } 

任何建議非常讚賞。

+0

而不是-ve投票plz建議的東西 – LMK 2015-03-03 07:39:52

+0

@mins謝謝,我有e dited – LMK 2015-03-03 10:13:21

回答

1

如果我正確地瞭解您的需求,您可以使用簡單的JDBC連接器,如:

public class SimpleJDBCConnector { 
    static final String JDBC_DRIVER = "org.postgresql.Driver"; 
    static final String DB_URL = "jdbc:postgresql://localhost:5432/postgres"; 
    static final String USER = "postgres"; 
    static final String PASS = "admin"; 

    Connection conn = null; 
    Statement stmt = null; 
    ResultSet rs = null; 

    public ResultSet executeQuery(String sql) throws SQLException { 
     stmt = conn.createStatement(); 
     return stmt.executeQuery(sql); 
    } 

    public void closeConnection() throws SQLException { 
     conn.close(); 
    } 

    public boolean connect() throws ClassNotFoundException, SQLException { 
      Class.forName(JDBC_DRIVER); 
      conn = DriverManager.getConnection(DB_URL, USER, PASS); 
     return true; 
    } 
} 

而下面的SQL查詢:

Set<Integer> set = new HashSet<Integer>(); 
int status=0; 
int id=0; 
int key_id=0; 

String sql1 = "SELECT id,msg,key_id,status FROM xxxx WHERE key_id NOT IN (" 
     +set.toString().replaceAll("[\\]\\[\\ ]","")+") AND status = "+status+ + " LIMIT 1;"; 

String sql2 = "UPDATE xxxx SET status="+status+" WHERE id="+id; 

爲了從響應中提取數據,你可以這樣做如下:

ResultSet rs = db.executeQuery(sql1); 
     while(rs.next()){ 
      id = rs.getInt("id"); 
      String msg = rs.getString("msg"); 
      key_id = rs.getInt("key_id"); 
      status = rs.getInt("status"); 
     }