2017-03-03 209 views
0

我有一個文件夾包含了很多gzip文件。每個gzip文件都包含xml文件。我曾使用flume將文件流式傳輸到HDFS。下面是我的配置文件:Flume流gz文件

agent1.sources = src 
agent1.channels = ch 
agent1.sinks = sink 

agent1.sources.src.type = spooldir 
agent1.sources.src.spoolDir = /home/tester/datafiles 
agent1.sources.src.channels = ch 
agent1.sources.src.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder 

agent1.channels.ch.type = memory 
agent1.channels.ch.capacity = 1000 
agent1.channels.ch.transactionCapacity = 1000 

agent1.sinks.sink.type = hdfs 
agent1.sinks.sink.channel = ch 
agent1.sinks.sink.hdfs.path = /user/tester/datafiles 
agent1.sinks.sink.hdfs.fileType = CompressedStream 
agent1.sinks.sink.hdfs.codeC = gzip 
agent1.sinks.sink.hdfs.fileSuffix = .gz 
agent1.sinks.sink.hdfs.rollInterval = 0 
agent1.sinks.sink.hdfs.rollSize = 122000000 
agent1.sinks.sink.hdfs.rollCount = 0 
agent1.sinks.sink.hdfs.idleTimeout = 1 
agent1.sinks.sink.hdfs.batchSize = 1000 

後,我流的文件到HDFS,我使用星火使用下面的代碼來閱讀:

df = sparkSession.read.format('com.databricks.spark.xml').options(rowTag='Panel', compression='gzip').load('/user/tester/datafiles') 

,但我有問題,讀它。如果我手動將一個gzip文件上傳到HDFS文件夾並重新運行上面的Spark代碼,它就能夠無任何問題地讀取它。我不確定它是由於水槽。

我試圖通過flume下載流文件並將其解壓縮,當我查看內容時,它不再顯示xml格式,它是一些不可讀的字符。任何人都可以點亮這個燈光嗎?謝謝。

+0

如何解壓縮文件?使用'gunzip'?你用什麼插件讀取文件時遇到什麼問題?您是否嘗試過手動specyfing模式spark-xml? – Mariusz

+0

Mariusz,我沒有解壓縮文件。我正在試圖在gz文件中進行流式處理並使用spark來讀取它。我沒有手動指定模式。當我讀取流入的gzip文件並顯示它的內容時,它會顯示一些特殊字符。但是當我嘗試手動上傳HDFS中的gzip文件時,我可以毫無問題地閱讀它,它可以顯示內容和模式,而不會有任何問題。我認爲這是由於Flume? – kcyea

回答

0

我認爲你在做錯!!!爲什麼?

看到你有一個來源是「不可拆分」郵政編碼。你可以通過記錄將它們部分讀作記錄,如果你沒有解壓縮,你會得到一個GZIPInputStream,你正在使用這個GZIPInputStream。

當您讀取GZIP輸入流作爲輸入記錄後,您將已經ziped的流保存到另一個GZIP流中,因爲您選擇壓縮的接收器類型。

所以你已經在HDFS中的Gzip中進行了Zipped Streamed。 :)

我建議在cron中安排一個腳本來執行從本地到HDFS的複製將解決您的問題。

+0

噢,我明白了......所以Flume沒有辦法讓gzip文件取代gzip文件並將其以原始格式存入HDFS中?我在考慮如果我要在cron中安排一個腳本來執行從本地到HDFS的複製,那麼它將在HDFS中擁有許多小文件。這將最終在HDFS中佔用很多塊。這是因爲每個gzip文件大小都只有20kb左右。 – kcyea

+0

你可以做的是,你可以使用spark來讀取這些zip文件,並在處理後將它寫入hadoop。 – RBanerjee