2017-04-05 548 views
-1

我是Flink的新手。其實我試圖通過flink java api讀取文件和csv轉換。如何使用flink java api讀取目錄下的文件名(本地文件系統/ hdfs)

根據我們的要求。 一)需要通過文件夾作爲輸入參數,輸出參數爲CSV文件名 二)需要讀取從本地文件系統/ HDFS)文件 C寫的同一數據爲CSV

我的代碼:

public class WriteToCSV { 

    public static void main(String[] args) throws Exception { 
     final ParameterTool params = ParameterTool.fromArgs(args); 
     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
     env.getConfig().setGlobalJobParameters(params); 
     List<String> paths = new ArrayList<String>(); 
     File dir = new File("C://"); 
     for (File f : dir.listFiles()) { 
       paths.add(f.getName()); 
     } 
     DataSet<String> data = env.fromCollection(paths).rebalance(); 

     DataSet<Tuple2<String, Integer>> counts = 
        // split up the lines in pairs (2-tuples) containing: (word,1) 
        data.flatMap(new MySplitter()).groupBy(0).sum(1); 

     System.out.println(" data -:"+data); 
     data.print(); 
     counts.writeAsCsv("C://new.csv"); 
    } 
} 


class MySplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { 

    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { 
     // normalize and split the line into words 
     String[] tokens = value.split("\\W+"); 

     // emit the pairs 
     for (String token : tokens) { 
      if (token.length() > 0) { 
       out.collect(new Tuple2<String, Integer>(token, 1)); 
      } 
     } 
    } 
} 

我能夠得到文件名(data.print())。但csv不創建,在服務器控制檯中也不例外。

回答

1

爲什麼沒有寫在代碼的CSV的原因是,你不叫env.execute()counts.writeAsCsv("C://new.csv");

後,爲了進一步您可以使用env.readTextFile(path)它接受一個目錄路徑和讀取所有文件改進代碼該目錄爲每一行生成記錄。

+0

DataSet和DataStream程序的print()行爲不同。調用print()時,DataSet程序觸發執行,並將結果寫入提交程序的客戶端的stdout。 DataStream程序不啓動程序(這需要'execute()')並打印到工作的stdout。 –

+0

是的,我知道,但在提供的例子中'print()'在'writeAsCsv'之前被調用,所以我相信它打印得很好,但輸出不會寫入csv。 –

+0

啊,是的。對不起,我沒有看到你回答正確。謝謝! –

相關問題