2016-08-03 111 views
4

我遇到問題,當我的連接入侵時,mqtt發佈不發送重新連接時,如何解決它?進出口以下this answer但不工作如何在離線時將數據mqtt存儲並在線發送時

我所做的

  • 林已經實現服務MQTT發送GPS定位和照常工作時在線。
  • 將Qos設置爲1.
  • Set ClientId fixed。
  • 集發佈的QoS 1.
  • 一套乾淨的會話虛假

但結果時,我還是重新發布數據時,我在網上&沒有發佈存儲持久性數據。

這裏是我的源代碼:

package id.trustudio.android.mdm.service; 

import android.app.Service; 
import android.content.Context; 
import android.content.Intent; 
import android.content.SharedPreferences; 
import android.content.pm.ApplicationInfo; 
import android.content.pm.PackageManager; 
import android.net.TrafficStats; 
import android.os.Handler; 
import android.os.IBinder; 
import android.support.annotation.Nullable; 
import android.util.Log; 

import org.eclipse.paho.android.service.MqttAndroidClient; 
import org.eclipse.paho.client.mqttv3.IMqttActionListener; 
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; 
import org.eclipse.paho.client.mqttv3.IMqttToken; 
import org.eclipse.paho.client.mqttv3.MqttCallback; 
import org.eclipse.paho.client.mqttv3.MqttClient; 
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; 
import org.eclipse.paho.client.mqttv3.MqttException; 
import org.eclipse.paho.client.mqttv3.MqttMessage; 
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; 

import java.io.UnsupportedEncodingException; 

import id.trustudio.android.mdm.http.DetectConnection; 
import id.trustudio.android.mdm.util.Cons; 
import id.trustudio.android.mdm.util.Debug; 
import id.trustudio.android.mdm.util.GPSTracker; 
import id.trustudio.android.mdm.util.GPSTracker2; 

public class MqttService extends Service implements MqttCallback { 

    public static boolean isStarted = false; 

    private double latitude = 0; 
    private double longitude = 0; 
    private GPSTracker mGPSTracker; 
    private GPSTracker2 mGPSTracker2; 

    boolean isInternetPresent = false; 

    private SharedPreferences mPrivatePref; 
    private SharedPreferences.Editor editor; 

    private DetectConnection mDetectConnection; 
    String deviceID,Name; 
    int totalbyte; 
    String packages; 
    MemoryPersistence persistence; 
    String clientId; 
    MqttAndroidClient client; 

    @Nullable 
    @Override 
    public IBinder onBind(Intent intent) { 
     return null; 
    } 

    @Override 
    public void onCreate() { 
     super.onCreate(); 

     mPrivatePref = this.getSharedPreferences(Cons.PRIVATE_PREF, Context.MODE_PRIVATE); 
     editor = mPrivatePref.edit(); 

     deviceID = mPrivatePref.getString(Cons.APP_PACKAGE + "deviceid", ""); 
     Name = mPrivatePref.getString(Cons.APP_PACKAGE + "user", ""); 

     clientId = MqttClient.generateClientId(); 
     persistence = new MemoryPersistence(); 

     client = 
       new MqttAndroidClient(getApplicationContext(), "tcp://broker.administrasi.id:1883", 
         clientId, persistence); 

     client.setCallback(this); 

     try{ 
      MqttConnectOptions connOpts = new MqttConnectOptions(); 
      connOpts.setCleanSession(false); 
      client.connect(connOpts,null, new IMqttActionListener() { 

         @Override 
         public void onSuccess(IMqttToken asyncActionToken) { 

         } 

         @Override 
         public void onFailure(IMqttToken asyncActionToken, Throwable exception) { 

         } 
        }); 
     }catch (Exception e){ 
      e.printStackTrace(); 
     } 

     mHandler.postDelayed(mUpdateTask, 1000); 
    } 


    public int onStartCommand(Intent intent, int flags, int startId) { 

     int res = super.onStartCommand(intent, flags, startId); 

     //check if your service is already started 
     if (isStarted){  //yes - do nothing 
      return Service.START_STICKY; 
     } else {    //no 
      isStarted = true; 
     } 

     return Service.START_STICKY; 

    } 

    private Handler mHandler = new Handler(); 
    private Runnable mUpdateTask = new Runnable() { 
     public void run() { 

      getLatLng(); 
      if (latitude == 0.0 || longitude == 0.0) getLatLngWifi(); 

         Debug.e("MQTT","Connect"); 
         String topic = "gps/kodeupi/kodeap/kodeup/" + deviceID; 
         Debug.e("MQTT CLIENT", clientId); 
         int qos = 1; 
         try { 
          IMqttToken subToken = client.subscribe(topic, qos); 
          subToken.setActionCallback(new IMqttActionListener() { 
           @Override 
           public void onSuccess(IMqttToken asyncActionToken) { 
            // The message was published 

            String topic = "gps/kodeupi/kodeap/kodeup/" + deviceID; 
            long CurrentTime = System.currentTimeMillis(); 

            String payload = deviceID + "|" + latitude + "|" + longitude + "|" + CurrentTime; 

            byte[] encodedPayload = new byte[0]; 
            try { 
             encodedPayload = payload.getBytes("UTF-8"); 
             MqttMessage message = new MqttMessage(encodedPayload); 
             client.publish(topic, message); 
             message.setRetained(true); 
             // set quality of service 
             message.setQos(1); 
             Log.d("TAG", "onSuccess"); 
            } catch (UnsupportedEncodingException | MqttException e) { 
             e.printStackTrace(); 
            } 
           } 

           @Override 
           public void onFailure(IMqttToken asyncActionToken, 
                 Throwable exception) { 
            // The subscription could not be performed, maybe the user was not 
            // authorized to subscribe on the specified topic e.g. using wildcards 

           } 
          }); 
         } catch (MqttException e) { 
          e.printStackTrace(); 
         } 

      mHandler.postDelayed(this, 20000); 
     } 
    }; 

    private void getLatLng() { 
     mGPSTracker2  = new GPSTracker2(this); 
     isInternetPresent = mDetectConnection.isConnectingToInternet(); 
     if (isInternetPresent == true) { 
      if (mGPSTracker2.canGetLocation()) { 
       latitude = mGPSTracker2.getLatitude(); 
       longitude = mGPSTracker2.getLongitude(); 

       if(latitude != 0.0 && longitude != 0.0) { 
        editor.putString(Cons.APP_LATITUDE, latitude+""); 
        editor.putString(Cons.APP_LONGITUDE, longitude+""); 
        editor.commit(); 
       } 
      } else { 
//    getLatLngWifi(); 
       Debug.i(Cons.TAG, "on gps failed, please check"); 

      } 
     } else { 
      Debug.i(Cons.TAG, "no connection"); 

      if(mGPSTracker2 != null) 
       mGPSTracker2.stopUsingGPS(); 
     } 
    } 

    private void getLatLngWifi() { 
     mGPSTracker   = new GPSTracker(this); 
     isInternetPresent = mDetectConnection.isConnectingToInternet(); 
     if (isInternetPresent == true) { 
      if (mGPSTracker.canGetLocation()) { 
       latitude = mGPSTracker.getLatitude(); 
       longitude = mGPSTracker.getLongitude(); 

       if(latitude != 0.0 && longitude != 0.0) { 
        editor.putString(Cons.APP_LATITUDE, latitude+""); 
        editor.putString(Cons.APP_LONGITUDE, longitude+""); 
        editor.commit(); 
       } 

      } else { 
       Debug.i(Cons.TAG, "wifi " + "on gps failed, please check"); 

      } 
     } else { 
      Debug.i(Cons.TAG, "wifi " + "no connection"); 

      if(mGPSTracker != null) 
       mGPSTracker.stopUsingGPS(); 
     } 
    } 

    @Override 
    public void connectionLost(Throwable cause) { 

    } 

    @Override 
    public void messageArrived(String topic, MqttMessage message) throws Exception { 

    } 

    @Override 
    public void deliveryComplete(IMqttDeliveryToken token) { 

    } 
} 

對不起,我英文不好

+0

在鏈接到的問題是如何獲得的信息傳遞給客戶的回答已斷開連接,而不是在重新連接時如何從斷開連接的客戶端發送消息。如果您嘗試在客戶端斷開連接時發佈消息,則它將引發異常,取決於您的異常並將消息存儲在重新連接後重新發送。 – hardillb

+0

另外,您在發佈消息後設置了QOS並保留了標誌,這將不起作用,因爲消息已經消失。 – hardillb

+0

有沒有對圖書館的任何回調這樣做,或者我必須執行我自己的腳本來做到這一點? –

回答

1

正如評論佈局。

這是你必須自己編碼的東西,不支持存儲尚未發送的消息,因爲客戶端在框架中斷開連接。 MQTT持久性僅用於確保在QOS握手完成之前與代理的連接關閉時,具有QOS 1/2的消息在運行中不會丟失。

如果嘗試在斷開連接時發佈消息,client.publish(topic, message)調用將引發異常,則需要捕獲此異常,然後安排在重新建立連接時存儲消息的內容。一旦連接恢復運行,您需要遍歷存儲的詳細信息並再次嘗試發送。

1

所以這裏的樣本,作爲hardillb回答我做我自己的工具來存儲數據到本地數據庫,併發送所有連接重新建立。

這裏我的源代碼

private Handler mHandler = new Handler(); 
     private Runnable mUpdateTask = new Runnable() { 
     public void run() { 

      getLatLng(); 
      if (latitude == 0.0 || longitude == 0.0) getLatLngWifi(); 

         Debug.e("MQTT","Connect"); 
         String topic = "gps/kodeupi/kodeap/kodeup/" + deviceID; 
         Debug.e("MQTT CLIENT", clientId); 
         int qos = 1; 
         try { 
          IMqttToken subToken = client.subscribe(topic, qos); 
          subToken.setActionCallback(new IMqttActionListener() { 
           @Override 
           public void onSuccess(IMqttToken asyncActionToken) { 
            // The message was published 
            mList = getLocationAll();//call all data stored on sqlite 
         Debug.e("MQTT","Connected. Size list = "+mList.size()); 

         if(mList.size() > 0){//if data found then send in looping 
            for (int i = 0; i < mList.size() ; i++) { 
            final String Latitude = mList.get(i).latitude; 
            final String Longitude = mList.get(i).longitude; 
            final String timestamps = mList.get(i).CurrentTimes; 

            String payload = deviceID + "|" + timestamps + "|" + Name + "|" + Latitude + "|" + Longitude; 

           byte[] encodedPayload = new byte[0]; 
           try { 
            encodedPayload = payload.getBytes("UTF-8"); 
            MqttMessage message = new MqttMessage(encodedPayload); 
            // set quality of service 
            client.publish(topic, message); 
           } catch (UnsupportedEncodingException | MqttException e) { 
            e.printStackTrace(); 
           } 
           } 

           DeleteAllLocation(); 

          } 
            String topic = "gps/kodeupi/kodeap/kodeup/" + deviceID; 
            long CurrentTime = System.currentTimeMillis(); 

            String payload = deviceID + "|" + latitude + "|" + longitude + "|" + CurrentTime; 

            byte[] encodedPayload = new byte[0]; 
            try { 
             encodedPayload = payload.getBytes("UTF-8"); 
             MqttMessage message = new MqttMessage(encodedPayload); 
             client.publish(topic, message); 
             message.setRetained(true); 
             // set quality of service 
             message.setQos(1); 
             Log.d("TAG", "onSuccess"); 
            } catch (UnsupportedEncodingException | MqttException e) { 
             e.printStackTrace(); 
            } 
           } 

           @Override 
           public void onFailure(IMqttToken asyncActionToken, 
                 Throwable exception) { 
            // The subscription could not be performed, maybe the user was not 
            // authorized to subscribe on the specified topic e.g. using wildcards 
            long CurrentTime = System.currentTimeMillis(); 
            addLocation(deviceID, CurrentTime+"", Name,  latitude+"" , longitude+""); //add data to sqlite when offline 
            Debug.e("MQTT","Failure"); 

           } 
          }); 
         } catch (MqttException e) { 
          e.printStackTrace(); 
         } 

      mHandler.postDelayed(this, 20000); 
     } 
    }; 

這裏我的SQLite存儲&刪除數據

public void addLocation(String device_id, String timestamp, String user_id, String latitude, String longitude) { 
    if (sqLite == null) { 
     Debug.i(Cons.TAG, "null database"); 
     return; 
    } 

    ContentValues values = new ContentValues(); 

    values.put("device_id", device_id); 
    values.put("timestamp", timestamp); 
    values.put("user_id", user_id); 
    values.put("latitude", latitude); 
    values.put("longitude", longitude); 

    Debug.i(Cons.TAG, "Insert location : title = " + device_id); 

    sqLite.insert("tbl_location", null, values); 
} 

public ArrayList<LocationModel> getLocationAll() { 
    ArrayList<LocationModel> result = new ArrayList<LocationModel>(); 
    if (sqLite == null || result == null) { 
     return result; 
    } 

    String sql = "SELECT * FROM tbl_location ORDER BY timestamp ASC"; 

    Cursor c = sqLite.rawQuery(sql, null); 

    int device_id  = c.getColumnIndex("device_id"); 
    int timestamp  = c.getColumnIndex("timestamp"); 
    int userid   = c.getColumnIndex("user_id"); 
    int latitude  = c.getColumnIndex("latitude"); 
    int longitude   = c.getColumnIndex("longitude"); 

    if (c != null) { 
     if (c.moveToFirst()) { 

      while (c.isAfterLast() == false) { 
       LocationModel mApps = new LocationModel(); 

       mApps.DeviceId  = c.getInt(device_id); 
       mApps.CurrentTimes = c.getString(timestamp); 
       mApps.UserId  = c.getString(userid); 
       mApps.latitude  = c.getString(latitude); 
       mApps.longitude  = c.getString(longitude); 

       result.add(mApps); 

       c.moveToNext(); 
      } 

      c.close(); 
     } 
    } 

    return result; 
} 

public void DeleteAllLocation() { 
    if (sqLite == null) 
     return; 

    sqLite.delete("tbl_location", null, null); 
}