2016-09-16 704 views
0

我試圖從TOPICS取消訂閱持久訂閱者。使用ActiveMQ取消訂閱持久訂閱者

我的應用程序是一種社交網絡:每個用戶都是其他用戶的話題。所以,每次用戶做某事時,他的朋友都會收到通知。當然,訂閱者可以取消訂閱某個主題,不想接收關於用戶的通知。

每次我試圖退訂主題的訂戶,我有一個錯誤告訴我:「的javax.jms.JMSException:耐用消費品在使用

這裏是我的2類,SENDER一個和RECEIVER一個。有人能告訴我我做錯了什麼嗎?

SENDER類:

package com.citizenweb.classes; 

import java.util.Date; 
import javax.jms.Connection; 
import javax.jms.ConnectionFactory; 
import javax.jms.Destination; 
import javax.jms.JMSException; 
import javax.jms.MessageFormatException; 
import javax.jms.MessageProducer; 
import javax.jms.Session; 
import javax.jms.TextMessage; 
import javax.jms.Topic; 
import javax.jms.ObjectMessage; 
import org.apache.activemq.ActiveMQConnection; 
import org.apache.activemq.ActiveMQConnectionFactory; 
import org.apache.activemq.ActiveMQSession; 

import com.citizenweb.interfaces.PostIF; 
import com.citizenweb.interfaces.UserIF; 

public class Sender { 

    private ActiveMQConnectionFactory factory = null; 
    private ActiveMQConnection connection = null; 
    private ActiveMQSession session = null; 
    private Destination destination = null; 
    private MessageProducer producer = null; 

    public Sender() { 
    } 

    public void connect(){ 
     try{ 
      factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL); 
      // TODO Mécanisme de sécurité d'ActiveMQ à rétablir en production 
      factory.setTrustAllPackages(true); 
      connection = (ActiveMQConnection) factory.createConnection(); 
      connection.start(); 
      session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     } catch (JMSException e){ 
      e.printStackTrace(); 
     } 
    } 

    public void sendPost(UserIF user,PostIF post) { 
     if(session==null){connect();} 
     try { 
      destination = session.createTopic(user.toString()); 
      producer = session.createProducer(destination); 
      ObjectMessage postMessage = session.createObjectMessage(); 
      postMessage.setObject(post); 
      producer.send(postMessage); 
      System.out.println("\n SENDER Object message sent"); 



     } catch (MessageFormatException e) { 
      e.printStackTrace(); 
     } catch (JMSException e) { 
      e.printStackTrace(); 
     } 
    } 

    public void sendInformation(UserIF user,String info){ 
     if(session==null){connect();} 
     try { 
      destination = session.createTopic(user.toString()); 
      producer = session.createProducer(destination); 
      TextMessage infoMessage = session.createTextMessage(); 
      infoMessage.setText(info); 
      producer.send(infoMessage); 
      System.out.println("\n SENDER Information message sent"); 
     } catch (JMSException e) { 
      e.printStackTrace(); 
     } 
    } 

    /** 
    * @param args 
    * @throws Exception 
    */ 
    public static void main(String[] args) throws Exception { 

     UserIF u1, u2, u3; 
     String[] nom = new String[5]; 
     String[] prenom = new String[5]; 
     String[] login = new String[5]; 
     String[] password = new String[5]; 
     Date[] naiss = new Date[5]; 
     String[] mail = new String[5]; 
     for (int i = 0; i < 5; i++) { 
      nom[i] = "nom_" + i; 
      prenom[i] = "prenom_" + i; 
      login[i] = "login_" + i; 
      password[i] = "password_" + i; 
      naiss[i] = new Date(); 
      mail[i] = "mail_" + i; 
     } 

     System.out.println("\n SENDER AFFECTATION DES NOMS"); 
     u1 = new User(nom[0], prenom[0], login[0], password[0], naiss[0], mail[0]); 
     u2 = new User(nom[1], prenom[1], login[1], password[1], naiss[1], mail[1]); 
     u3 = new User(nom[2], prenom[2], login[2], password[2], naiss[2], mail[2]); 


     Sender sender = new Sender(); 

     sender.sendInformation(u1, "U1 notification"); 
     sender.sendInformation(u2, "U2 notification"); 
     sender.sendInformation(u3, "U3 notification"); 
     //PostIF post = new Post("Mon Post","Ceci est mon message",u1,u1,"Classe Sender",((User) u1).getIdUser(),0); 
     //sender.sendPost(user, post); 
     sender.session.close(); 
     sender.connection.close(); 

    } 

} 

接收機類:

package com.citizenweb.classes; 

import java.io.Serializable; 
import java.util.ArrayList; 
import java.util.Date; 
import java.util.List; 
import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageConsumer; 
import javax.jms.MessageListener; 
import javax.jms.ObjectMessage; 
import javax.jms.Session; 
import javax.jms.TextMessage; 
import javax.jms.Topic; 

import org.apache.activemq.ActiveMQConnection; 
import org.apache.activemq.ActiveMQConnectionFactory; 
import org.apache.activemq.ActiveMQSession; 
import org.apache.activemq.broker.region.Destination; 
import com.citizenweb.interfaces.PostIF; 
import com.citizenweb.interfaces.UserIF; 
import com.citizenweb.classes.Post; 

public class Receiver implements MessageListener, Serializable { 

    private static final long serialVersionUID = 1L; 
    private ActiveMQConnectionFactory factory = null; 
    private ActiveMQConnection connection = null; 
    private ActiveMQSession session = null; 
    private Topic destination = null; 
    private MessageConsumer consumer = null; 

    UserIF userTopic = new User(); 
    UserIF userSubscriber = new User(); 
    List<Message> listeMsg = new ArrayList<Message>(); 

    public Receiver(UserIF subscriber) { 
     this.userSubscriber = subscriber; 
    } 

    public void connect() { 
     try { 
      factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL); 
      // TODO Mécanisme de sécurité d'ActiveMQ à rétablir en production 
      factory.setTrustAllPackages(true); 
      connection = (ActiveMQConnection) factory.createConnection(); 
      // ClientID : 
      // https://qnalist.com/questions/2068823/create-durable-topic-subscriber 
      connection.setClientID(userSubscriber.toString()); 
      connection.start(); 
      session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     } catch (JMSException e) { 
      e.printStackTrace(); 
     } 
    } 

    public void receiveMessage(UserIF topic) { 
     try { 
      if (session == null) { 
       connect(); 
      } 
      destination = session.createTopic(topic.toString()); 
      String nomAbonnement = topic.toString() + "->" + userSubscriber.toString(); 
      //String nomAbonnement = userSubscriber.toString(); 
      consumer = session.createDurableSubscriber(destination, nomAbonnement); 
      consumer.setMessageListener(this); 
     } catch (JMSException e) { 
      e.printStackTrace(); 
     } 
    } 

    public void unsubscribe(UserIF topic) { 
     try { 
      if (session == null) { 
       connect(); 
      } 
      System.out.println("\n RECEIVER Désinscription du topic " + topic.toString()); 
      //consumer.close(); 
      String nomAbonnement = topic.toString() + "->" + userSubscriber.toString(); 
      //String nomAbonnement = userSubscriber.toString(); 
      System.out.println("\n RECEIVER Abonnement à clore = " + nomAbonnement); 
      session.unsubscribe(nomAbonnement); 
      System.out.println("\n RECEIVER " + userSubscriber.toString() + " s'est désinscrit de " + nomAbonnement); 
     } catch (JMSException e) { 
      e.printStackTrace(); 
     } 
    } 

    @Override 
    public void onMessage(Message message) { 
     System.out.println("\n RECEIVER OnMessage triggered for " + userSubscriber.toString()); 
     listeMsg.add(message); 
     System.out.println("\n RECEIVER Nombre de messages reçus par " + userSubscriber + " = " + listeMsg.size()); 
     String classe = message.getClass().getSimpleName(); 
     System.out.println("\n RECEIVER Classe de message : " + classe); 
     try { 
      if (message instanceof TextMessage) { 
       TextMessage text = (TextMessage) message; 
       System.out.println("\n RECEIVER Information : " + text.getText()); 
      } 
      if (message instanceof ObjectMessage) { 
       System.out.println("\n RECEIVER ObjectMessage"); 
       ObjectMessage oMessage = (ObjectMessage) message; 
       if (oMessage.getObject() instanceof PostIF) { 
        PostIF post = (PostIF) oMessage.getObject(); 
        String s = ((Post) post).getCorpsMessage(); 
        System.out.println("\n RECEIVER Post : " + s); 
       } 
      } 
     } catch (JMSException e) { 
      e.printStackTrace(); 
     } 
    } 

    public static void main(String[] args) throws JMSException { 

     /* 
     * EACH USER IS A TOPIC FOR OTHER USERS 
     * WHATEVER A USER DOES RESULTS IN A NOTIFICATION TO SUBSCRIBERS 
     */ 

     //CREATE USER 
     UserIF u1, u2, u3; 
     String[] nom = new String[5]; 
     String[] prenom = new String[5]; 
     String[] login = new String[5]; 
     String[] password = new String[5]; 
     Date[] naiss = new Date[5]; 
     String[] mail = new String[5]; 
     for (int i = 0; i < 5; i++) { 
      nom[i] = "nom_" + i; 
      prenom[i] = "prenom_" + i; 
      login[i] = "login_" + i; 
      password[i] = "password_" + i; 
      naiss[i] = new Date(); 
      mail[i] = "mail_" + i; 
     } 

     u1 = new User(nom[0], prenom[0], login[0], password[0], naiss[0], mail[0]); 
     u2 = new User(nom[1], prenom[1], login[1], password[1], naiss[1], mail[1]); 
     u3 = new User(nom[2], prenom[2], login[2], password[2], naiss[2], mail[2]); 

     /* 
     * MAKE EACH USER A SUBSCRIBER 
     */ 
     Receiver receiver1 = new Receiver(u1); 
     Receiver receiver2 = new Receiver(u2); 
     Receiver receiver3 = new Receiver(u3); 

     /* 
     * PUT A MESSAGE LISTENER FOR EACH USER 
     */ 
     receiver1.receiveMessage(u2); 
     receiver1.receiveMessage(u3); 
     receiver2.receiveMessage(u1); 
     receiver2.receiveMessage(u3); 
     receiver3.receiveMessage(u1); 
     receiver3.receiveMessage(u2); 

     /* 
     * CALL THE SENDER CLASS TO SEND MESSAGES 
     */ 
     try { 
      Sender.main(args); 
     } catch (Exception e1) { 
      e1.printStackTrace(); 
     } 

     /* 
     * A SLEEP TO HAVE ENOUGH TIME TO LOOK AT THE ACTIVEMQ CONSOLE 
     * CAN BE REMOVE 
     */ 
     try { 
      Thread.sleep(10000); 
     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
      return; 
     } 

     /* 
     * UNSUBSCRIBE SUBSCRIBERS FROM TOPICS 
     */ 
     receiver1.unsubscribe(u2); 
     receiver1.unsubscribe(u3); 
     receiver2.unsubscribe(u1); 
     receiver2.unsubscribe(u3); 
     receiver3.unsubscribe(u1); 
     receiver3.unsubscribe(u2); 
    } 

} 

回答

1

如果目前從它不消耗活躍用戶只能取消持久訂閱。它看起來像你的代碼創建了幾個訂閱,並沒有阻止消費者,所以當然退訂會失敗,如果你關閉消費者,然後做退訂,你應該得到你正在尋找的結果。

持久訂閱取消訂閱的示例是here

+0

您好蒂姆和thanx爲您提供幫助。 當你說「如果沒有活動用戶正在使用持久訂閱,你只能取消訂閱」,那麼你似乎在討論**主題**。 我的問題是**一個主題**可能有**許多持久訂閱者**。其中一位用戶可能想要刪除此主題,並且沒有理由停止爲同一主題訂閱其他訂閱者。 我想停止訂閱某個主題,而不必去除主題本身 這就是我想要做的用我的Receiver類的unsubscribe()方法 難道不可能嗎? – Lovegiver

+0

這不是我說的,仔細閱讀。處於活動狀態的主題訂閱無法取消訂閱,您需要關閉正在使用您要取消訂閱的訂閱的使用者。 –

+0

行Tim 所以如果我做我的代碼中顯示的session.unsubscribe(durableID),這應該停止給定訂閱的訂閱,但只有當我做了consumer.close()之前? 我試了很多,但總是得到相同的錯誤信息:耐用消費者正在使用 我該怎麼做比這更多: 'consumer.close();' 'session.unsubscribe(nomAbonnement);' – Lovegiver

1

解決方案

你好,

我已經得到了解決方案及其說明。

每個連接都需要一個唯一的ClientIDconnection.setClientID(「clientID」);

我的錯誤在於瞭解給定客戶端的這個唯一性。

當客戶端訂閱主題時,此主題有一個連接。因此,對於訂閱了3個主題的給定客戶端(例如),需要3個ClientID,因爲需要3個連接。 ClientID必須是唯一的,因爲它標識一個主題的一個客戶端的一個連接。

這就是爲什麼我有很多JMSExceptions告訴Durable Consumer在我想結束它的訂閱時正在使用。

向所有給我時間和支持的人表示感謝。