2014-11-06 71 views
0

我正在編寫一個程序來管理文件,即將文件(如對象)從directory1移動到2.我在級聯中寫了一段代碼,我在考慮如何編寫「管道」來創建流。如何創建管道?

有人可以幫我嗎?

謝謝。

// access path to the file input and output and archiving 
String inputPath = args[0];   //Directory in HDFS of the Input 
String outputPath = args[1];  //Directory in HDFS of the Input 

File inFile = new File(inputPath); 
File outFile = new File(outputPath); 

FileUtils.moveFile(inFile, outFile); 

//Set up the configuration Properties 
Properties properties = new Properties(); 
AppProps.setApplicationJarClass(properties, MoveToArchive.class); 
FlowConnector flowConnector = new Hadoop2MR1FlowConnector(properties); 

//Create Sources and Sinks Taps 
Tap inputTap = new Hfs(new TextLine(), inputPath); 
Tap outputTap = new Hfs(new TextLine(), outputPath); 

Pipe copyPipe = new Pipe("copy"); 
copyPipe = new Each(copyPipe, SelectFileFunction(inFile)); 

FlowDef flowDef = FlowDef.flowDef() 
      .setName("archive") 
      .addSource("input", inputTap) 
      .addSource("output", outputTap); 

flowConnector.connect(flowDef).complete(); 

try{ 
    if(outFile.exists()){ 
     System.err.println("file has been moved successfully!"); 
    } 
} catch(Exception e){ 
     System.err.println("file not found in Archive Directory"); 
} 

回答

0

您的管道未連接到任何源或接收器。認爲管道。

更換

 .addSource("input", inputTap) 
     .addSource("output", outputTap); 

 .addSource(copyPipe, inputTap) 
     .addTailSink(copyPipe, outputTap);