2017-09-20 96 views
1

我想使用java 8 CompletionStages連續執行2個異步方法,以便第二個在第一個失敗時不會執行。但是,當我打電話thenCompose,傳遞的功能似乎開始之前,前一個功能齊全(如:兩個功能並行錯誤地執行以下是代碼:CompletionStage.thenCompose沒有連續執行

public CompletionStage<Graph> create(Payload payload) { 
    CompletionStage<BlobInfo> fileFuture = createFile(payload); 
    CompletionStage<Entity> metadataFuture = createMetadata(payload); 
    return fileFuture 
     .thenCompose(ignore -> metadataFuture) 
     .thenApply(entity -> 
      buildFromEntity(objectMapper, entity)); 
    } 

    public CompletionStage<BlobInfo> createFile(Payload payload) { 
    return CompletableFuture.supplyAsync(() -> { 
     try { 
     return 
      storage.create(
       BlobInfo 
        .newBuilder(payload.bucket, payload.name) 
        .build(), 
       payload.data.getBytes()); 
     } catch (StorageException e) { 
     LOG.error("Failed to write to storage: " + e); 
     throw new RequestHandlerException(StatusCode.SERVER_ERROR, 
      "Failed to write to storage."); 
     } 
    }); 
    } 


    public CompletionStage<Entity> createMetadata(Payload payload) { 
    return CompletableFuture.supplyAsync(() -> createSync(payload)); 
    } 

    private Entity createMetadataSync(Payload payload) { 
    Key key = keyFactory.newKey(payload.id); 
    Entity.Builder entityBuilder = GraphPayload.buildEntityFromGraph(payload, key); 
    Entity entity = entityBuilder.build(); 
    LOG.error("Metadata.createSync"); 

    try { 
     datastore.add(entity); 
    } catch (DatastoreException e) { 
     LOG.error("Failed to write initial metadata: " + e); 
     throw new RequestHandlerException(StatusCode.SERVER_ERROR, 
      "Failed to write initial metadata."); 
    } 
    return entity; 
    } 

OUTPUT:

16:57:47.530 [ForkJoinPool.commonPool-worker-3] ERROR com.spotify.nfgraphstore.store.FileStore - CreateFile 
16:57:47.530 [ForkJoinPool.commonPool-worker-2] ERROR com.spotify.nfgraphstore.store.MetadataStore - Metadata.createSync 
16:57:47.530 [ForkJoinPool.commonPool-worker-3] ERROR com.spotify.nfgraphstore.store.FileStore - Failed to write initial graph to storage: com.google.cloud.storage.StorageException: X 

記錄的輸出表明Metadata.createSync是越來越執行存儲異常得到投擲之前,這一結論也被測試(未顯示),這是爲了顯示與元數據DB零個互動脫胎於如果寫入到文件存儲數據庫失敗,該測試有時會失敗,提示競爭狀況。

所以我留下來想想thenCompose不保證串行執行。但是我在java文檔中讀到的所有內容都暗示執行應該是串行的:https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionStage.html#thenCompose-java.util.function.Function-

有誰知道爲什麼執行不能保證是串行的,或者推薦其他可能更符合我的意圖的函數?

回答

5

createMetadata的調用立即啓動任務,因爲它不作爲傳遞給thenCompose的lambda表達式的一部分進行調用。

也許你的意思是這樣:

.thenCompose(ignore -> createMetadata(payload)) 
+3

...或只是'thenApplyAsync(忽略 - > createSync(有效載荷));',因爲沒有理由來包裝操作到另一個'CompletableFuture'。 – Holger

+0

謝謝。這個答案似乎解決了這個問題。我不明白爲什麼。我添加了更多的調試語句,並且創建元數據未來的行絕對不會導致createMetadataSync被調用。所以它必須被thenCompose立即調用。那麼爲什麼在上一階段完成之前調用完成階段呢? – jsarma

+0

感謝您的額外解答Holger。我已經證實了這一點。 – jsarma