-1
我是Flink的新手。其實我試圖通過flink java api讀取文件和csv轉換。如何使用flink java api讀取目錄下的文件名(本地文件系統/ hdfs)
根據我們的要求。 一)需要通過文件夾作爲輸入參數,輸出參數爲CSV文件名 二)需要讀取從本地文件系統/ HDFS)文件 C寫的同一數據爲CSV
我的代碼:
public class WriteToCSV {
public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(params);
List<String> paths = new ArrayList<String>();
File dir = new File("C://");
for (File f : dir.listFiles()) {
paths.add(f.getName());
}
DataSet<String> data = env.fromCollection(paths).rebalance();
DataSet<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
data.flatMap(new MySplitter()).groupBy(0).sum(1);
System.out.println(" data -:"+data);
data.print();
counts.writeAsCsv("C://new.csv");
}
}
class MySplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// normalize and split the line into words
String[] tokens = value.split("\\W+");
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
}
我能夠得到文件名(data.print())。但csv不創建,在服務器控制檯中也不例外。
DataSet和DataStream程序的print()行爲不同。調用print()時,DataSet程序觸發執行,並將結果寫入提交程序的客戶端的stdout。 DataStream程序不啓動程序(這需要'execute()')並打印到工作的stdout。 –
是的,我知道,但在提供的例子中'print()'在'writeAsCsv'之前被調用,所以我相信它打印得很好,但輸出不會寫入csv。 –
啊,是的。對不起,我沒有看到你回答正確。謝謝! –