2013-11-04 35 views
0

我需要將MR作業的結果輸出到多個CQL3列族。使用CQL3的Hadoop多輸出

在我的reducer中,我使用MultipleOutputs指定CF,但所有結果都寫入作業的OutputCQL語句中定義的一個CF中。

工作definiton:

... 
job.setOutputFormatClass(CqlOutputFormat.class); 
ConfigHelper.setOutputKeyspace(job.getConfiguration(), "keyspace1"); 
MultipleOutputs.addNamedOutput(job, "CF1", CqlOutputFormat.class, Map.class, List.class); 
MultipleOutputs.addNamedOutput(job, "CF2", CqlOutputFormat.class, Map.class, List.class); 
CqlConfigHelper.setOutputCql(job.getConfiguration(), "UPDATE keyspace1.CF1 SET value = ? "); 
... 

減速類設置:

mos = new MultipleOutputs(context); 

減少方法(psudo代碼):

keys = new LinkedHashMap<>(); 
keys.put("key", ByteBufferUtil.bytes("rowKey")); 
keys.put("name", ByteBufferUtil.bytes("columnName")); 

List<ByteBuffer> variables = new ArrayList<>(); 
variables.add(ByteBufferUtil.bytes("columnValue")); 

mos.write("CF2", keys, variables); 

的問題是,我減速忽略CF I指定在mos.write()中,而是必須運行outputCQL。所以在上面的例子中,所有內容都寫入CF1。

我試着使用準備好的語句將CF插入outputCQL,沿着「UPDATE keyspace1。?SET value =?」的行,但我不認爲它可以像這樣使用CF的佔位符。

有沒有什麼辦法可以覆蓋reducer類內的outputCQL?

回答

0

所以簡單的答案是,你不能將結果從一個mr作業輸出到多個CFs。但是,有必要這樣做實際上突出了方法中的缺陷,而不是Hadoop中缺少的功能。

而不是處理一堆記錄,並試圖在一個通道中產生2個不同的結果集,更好的方法是迭代地得到所需的結果集。基本上,這意味着有多個工作迭代以前的工作結果,直到達到預期的結果。