2016-03-03 152 views
0

我正在向HDFS發送流並嘗試使用spark讀取文本文件。Apache Spark將文件讀取爲正則表達式

JavaStreamingContext jssc = new JavaStreamingContext(jsc, new  
Duration(1000)); 
JavaPairInputDStream<LongWritable, Text> textStream = 
jssc.fileStream("hdfs://myip:9000/travel/FlumeData.[0-9]*", 
LongWritable.class, Text.class, TextInputFormat.class); 

在發送流HDFS一些FlumeData.1234.tmp文件被創建,一旦接收到完整的數據文件被轉換成適當的文件如。 FlumeData.1234

我想忽略這個.tmp文件,以便從spark中讀取。我嘗試使用正則表達式

HDFS:// MYIP:9000 /旅遊/ FlumeData [0-9] * HDFS:// MYIP:9000 /旅遊/ FlumeData .// d *

但他們不工作。我正在尋找這樣的東西 jssc.fileStream(「hdfs:// myip:9000/travel/FlumeData。[0-9] *」, LongWritable.class,Text.class,TextInputFormat.class);

fileStream不應從文件擴展名中讀取.tmp文件。

我也試過以下Hadoop的代碼來檢索蒼蠅

private String pathValue(String PathVariable) throws IOException{ 



     Configuration conf = new Configuration(); 
     Path path = new Path(PathVariable); 
     FileSystem fs = FileSystem.get(path.toUri(), conf); 
     System.out.println("PathVariable" + fs.getWorkingDirectory()); 

     return fs.getName(); 
    } 

的列表中,但文件系統對象FS不要有文件名()。由於新文件是在運行時創建的。我需要閱讀他們創造的。

回答

0

JavaPairInputDStream重載的fileStream方法需要過濾函數,我們可以編寫一個過濾函數來過濾掉目錄中的文件。

fileStream(directory, kClass, vClass, fClass, filter, newFilesOnly) 

JavaPairInputDStream<LongWritable, Text> lines = jssc.fileStream("hdfs://myip:9000/travel/", LongWritable.class, Text.class, TextInputFormat.class, new Function<Path,Boolean>() { 
     public Boolean call(Path path) throws Exception { 
      System.out.println("Is path :"+path.getName()); 
      Pattern pattern = Pattern.compile("FlumeData.[0-9]*"); 
      Matcher m = pattern.matcher(path.getName()); 
      System.out.println("Is path : " + path.getName().toString() + " matching " 
       + " ? , " + m.matches()); 
      return m.matches(); 
     }}, true); 

請使用上面的代碼運行,我希望能解決這個問題。

0

您需要使用()選擇器來選擇可以從匹配中保留的部分。如果您沒有指定任何部分,則返回整個匹配。

在你的情況,如果我不missunderstanding你要選擇在你的榜樣:

FlumeData.1234 from FlumeData.1234.tmp 

要做到這一點,你需要簡單的正則表達式是:

(.*).tmp 

,如果你想選擇.tmp擴展名前的所有內容。

+0

我試過使用選擇器,但得到了這個異常java.io.FileNotFoundException:文件hdfs:// myIP:9000/FlumeData。([0-9])*不存在。它讀作文本而不是正則表達式。我也更新了我的實際查詢。 – sangita

+0

也許它不接受正則表達式作爲param的一部分,我假設你的問題基本上是你不知道你想要流的文件的名稱,對吧?也許你可以在hdfs:// myip:9000/travel /中列出這些文件,並使用正則表達式過濾它們的名字,並檢索你想要的文件名? – spaniard

+0

我已經添加了一些代碼。請幫助獲取文件名。 – sangita