2016-11-23 672 views
0

我在單元測試中使用以下工作將Avro/Parquet中的單個對象寫入Cloudera/HDFS羣集中的文件。如何使用Avro/Parquet將實時數據寫入HDFS?

這就是說,考慮到Parquet是一種列式格式,它似乎只能在批處理模式下寫出整個文件(不支持更新)。

那麼,什麼是實時寫入數據文件(通過ActiveMQ/Camel)的最佳實踐(1k msg /秒的小信息等)?

我想我可以聚合我的消息(緩衝區在內存或其他臨時存儲),並使用動態文件名在批處理模式下寫出來,但我覺得我錯過了手動分區/文件命名的東西,等等......

Configuration conf = new Configuration(false); 
conf.set("fs.defaultFS", "hdfs://cloudera-test:8020/cm/user/hive/warehouse"); 

conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false); 
AvroReadSupport.setAvroDataSupplier(conf, ReflectDataSupplier.class); 

Path path = new Path("/cm/user/hive/warehouse/test1.data"); 

MyObject object = new MyObject("test"); 

Schema schema = ReflectData.get().getSchema(object.getClass()); 

ParquetWriter<InboundWirelessMessageForHDFS> parquetWriter = AvroParquetWriter.<MyObject>builder(path) 
    .withSchema(schema) 
    .withCompressionCodec(CompressionCodecName.UNCOMPRESSED) 
    .withDataModel(ReflectData.get()) 
    .withDictionaryEncoding(false) 
    .withConf(conf) 
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) //required because the filename doesn't change for this test 
    .build(); 

parquetWriter.write(object); 
parquetWriter.close(); 

回答

0

基於我的(有限的)研究...我假設文件不能被附加到(設計)...所以我只是必須批量實時數據(在內存或其他地方)在寫出實木複合地板上的文件之前...

How to append data to an existing parquet file