2016-09-21 82 views
1

我目前正在研究需要實現定製處理器以在csv記錄上應用轉換的NiFi流。Apache NiFi - NullPointerException在自定義處理器上設置多個線程

我注意到這種行爲,在我正在執行的一些基準測試中:如果只有一個線程分配給每個自定義處理器,一切正常。由於java.lang.NullPointerException,將更多線程分配給定製處理器導致未能處理會話。

由於錯誤不能用單個線程複製,所以我正在考慮處理流程文件中的一些問題,或者我不知道的NiFi的某些方面。

執行訪問流文件屬性的處理。從不讀取流文件內容,並在添加一些屬性後返回輸出流文件。以下是處理器代碼的相關部分的一個片段:

@Override 
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { 

    // Will hold all the processed attributes 
    Map<String, String> processedAttributes = new HashMap<>(); 
    FlowFile flowfile = session.get(); 
    ... 
    // Adds the attributes to the flowfile 
    flowfile = session.putAllAttributes(flowfile, processedAttributes); 
    session.transfer(flowfile, PROCESSED); 
    } 

我在m4.4xlarge Amazon EC2實例運行NiFi 0.7。由於我正在尋求高性能(誰沒有),我正在尋找一種安全的方式來增加線程數量。任何建議真的很感激。

預先感謝您。

+1

您能否提供關聯的堆棧跟蹤並確保提供了引用的代碼? – apiri

回答

2

對於某些線程,session.get()方法將返回null是可能的(特別是對於多線程/併發任務)。如果有兩個或多個線程因爲有可用的流文件而被調度,則會發生這種情況,然後一個線程獲取流文件(通過session.get()),並且下一個線程將變爲空。

在你的情況下,你可能沒有讀或寫流文件,但其他方法(如session.putAllAttributes())在流文件上調用方法。許多處理器添加一個檢查來查看flowfile == null是否會返回(因爲它需要一個流文件來操作),也許這也會解決您的問題。

+0

僅在一個定製處理器上單獨嘗試,並沒有拋出任何NullPointerException。更改了所有這些代碼,不會顯示任何異常。出於好奇,NiFi如何處理隊列中線程的併發性?感謝您的幫助 – riccamini

相關問題