2016-09-15 137 views
0

我需要在我的項目中將一些調用分組到我的服務器,因爲有一些「批處理」路由接受一個id列表,而不是一個簡單的ID。併發訪問列表

我的想法是建立一個有時會被清空的池。

所以我創建這個AbstractRepository(不知道這是很命名):

public abstract class AbstractRepository<T>{ 

    protected Context c; 

    //interval between 2 queue emptying 
    private final int POOL_DELAY = 200; 
    protected int downloadingTaskCount = 0; 

    final protected ArrayMap<String, T> memCache = new ArrayMap<>(); 
    final protected HashSet<String> queue = new HashSet<>(); 

    final protected ArrayMap<String, List<FetchedInterface<T>>> callbackMap = new ArrayMap<>(); 

    final protected List<PoolEmptiedInterface> emptinessWatchers = new ArrayList<>(); 


    protected AbstractRepository(Context c) { 
     handler.postDelayed(downloadRoutine, POOL_DELAY); 
     this.c = c; 
    } 

    public void cache(String id) { 
     if (!memCache.containsKey(memCache)) { 
      synchronized (queue) { 
       queue.add(id); 
      } 
     } 
    } 

    public void getCache(String id, FetchedInterface<T> callback) { 

     if (memCache.containsKey(id)) { 
      callback.fetched(memCache.get(id)); 
     } else { 

      synchronized (callbackMap) { 
       if (!callbackMap.containsKey(id)) { 
        callbackMap.put(id, new ArrayList<FetchedInterface<T>>()); 
       } 
       callbackMap.get(id).add(callback); 
      } 

      synchronized (queue) { 
       queue.add(id); 
      } 

     } 
    } 

    public void getCacheIdObj(List<IdObject> idsObj, final ListFetchedInterface<T> callback) { 
     ArrayList<String> ids = new ArrayList<>(); 
     for (IdObject idObj : idsObj) { 
      ids.add(idObj.getId()); 
     } 
     getCache(ids, callback); 
    } 

    public void getCache(List<String> ids, final ListFetchedInterface<T> callback) { 
     final CountDownLatch countDownLatch = new CountDownLatch(ids.size()); 
     final ArrayList<T> array = new ArrayList<>(); 
     for (String id : ids) { 
      getCache(id, new FetchedInterface<T>() { 
       @Override 
       public void fetched(T item) { 
        array.add(item); 
        countDownLatch.countDown(); 
       } 
      }); 

     } 

     new Thread(new Runnable() { 
      @Override 
      public void run() { 
       try { 
        countDownLatch.await(); 
        callback.fetched(array); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
      } 
     }).start(); 

    } 


    /** 
    * Exists for threads that want to be notified that the user queue has been flushed. 
    */ 
    public void getNotifiedWhenQueueIsEmptied(PoolEmptiedInterface<T> callback) { 
     if (downloadingTaskCount == 0 && queue.isEmpty()) { 
      callback.poolEmpty(); 
     } else { 
      synchronized (emptinessWatchers) { 
       emptinessWatchers.add(callback); 
      } 
     } 
    } 

    protected void doIt(
      final HashSet<String> processingQueue) { 
    } 

    /** 
    * Pool Loop 
    */ 
    Handler handler = new Handler(); 

    private Runnable downloadRoutine = new Runnable() { 
     @Override 
     public void run() { 

      if (!queue.isEmpty()) { 
       final HashSet<String> processingQueue = new HashSet<>(); 

       synchronized (queue) { 
        processingQueue.addAll(queue); 
        queue.clear(); 
       } 
       downloadingTaskCount++; 
       doIt(processingQueue); 
      } 

      handler.postDelayed(downloadRoutine, POOL_DELAY); 
     } 
    }; 

而且它的一個子UserRepository

public class UserRepository extends AbstractRepository<UserCache> { 

    private static volatile UserRepository instance; 

    public static UserRepository getInstance(Context c) { 
     synchronized (UserRepository.class) { 
      if (instance == null) { 
       instance = new UserRepository(c); 
      } 
      return instance; 
     } 
    } 

    private UserRepository(Context c) { 
     super(c); 
    } 

    @Override 
    protected void doIt(final HashSet<String> processingQueue) { 
     Api.getInstance().backend.getUsersCache(new IdListArguments(new ArrayList<>(processingQueue))) 
       .enqueue(new Callback<Map<String, UserCache>>() { 
        @Override 
        public void onResponse(Call<Map<String, UserCache>> call, Response<Map<String, UserCache>> responseParent) { 
         Map<String, UserCache> response = responseParent.body(); 
         Iterator<String> it = processingQueue.iterator(); 
         while (it.hasNext()) { 

          String id = it.next(); 
          if (response.containsKey(id)) { 
           memCache.put(id, response.get(id)); 
           if (callbackMap.containsKey(id)) { 
            for (FetchedInterface callback : callbackMap.get(id)) { 
             callback.fetched(response.get(id)); 
            } 
           } 
           it.remove(); 
          } 
         } 

         for (PoolEmptiedInterface watcher : emptinessWatchers) { 
          watcher.poolEmpty(); 
         } 
         downloadingTaskCount--; 
         queue.addAll(processingQueue); 
        } 

        @Override 
        public void onFailure(Call<Map<String, UserCache>> call, Throwable t) { 
         queue.addAll(processingQueue); 
        } 
       }); 
    } 
} 

我的例外:

Java.util.ConcurrentModificationException 
    at java.util.ArrayList$ArrayListIterator.next(ArrayList.java:573) 
    at com.m360.android.domain_layer.interactors.MemberInteractor.constructPagerMemberUsers(MemberInteractor.java:116) 
    at com.m360.android.domain_layer.interactors.MemberInteractor.access$000(MemberInteractor.java:29) 
    at com.m360.android.domain_layer.interactors.MemberInteractor$2$1.fetched(MemberInteractor.java:64) 
    at com.m360.android.datalayer.repositories.AbstractRepository$2.run(AbstractRepository.java:99) 

而且MemberInteractor只包含靜態方法,崩潰出現使用foll欠款2:

public static void getGroupUsers(String id, final Context c, final ErrorDisplayerInterface i, final MemberInteractorCallback callback) { 
    GroupRepository.getInstance(c).getCache(id, new AbstractRepository.FetchedInterface<Group>() { 
     @Override 
     public void fetched(Group item) { 
      UserRepository userRepositoryNew = UserRepository.getInstance(c); 
      userRepositoryNew.getCache(new ArrayList<>(item.getUsers()), new AbstractRepository.ListFetchedInterface<UserCache>() { 
       @Override 
       public void fetched(List<UserCache> items) { 
        callback.onFinish(constructPagerMemberUsers(items)); 
       } 
      }); 
     } 
    }); 
} 


private static List<PagerMemberUser> constructPagerMemberUsers(final List<UserCache> items) { 

    final List<PagerMemberUser> users = new ArrayList<>(); 
    for (UserCache item : items) { 
     users.add(new PagerMemberUser(item)); 
    } 
    return users; 
} 

對不起有很多的代碼,但我認爲所有這些都與我的問題有關。

那麼發生了什麼?我沒有看到問題。

回答

2

您得到一個ConcurrentModificationException用於更改您迭代的列表(通過for each loop)。

所以,簡單的辦法是在這種情況下,改變的foreach使列表的副本你迭代它之前:

for (UserCache item : new LinkedList<UserCache>(items)) { 
    users.add(new PagerMemberUser(item)); 
} 
+0

赫姆您的修復工作的確,但我會曏者優先同步修改方法。但我不明白它可能是什麼。 –