2013-01-11 74 views
3

我是Hive和MapReduce的新手,非常感謝您的回答,並且提供了正確的方法。從MapReduce作業向Hive添加分區

我已經定義了一個外部表logs在hive分區日期和原始服務器上的外部位置hdfs /data/logs/。我有一個MapReduce作業,它獲取這些日誌文件並將它們拆分並存儲在上述文件夾下。像

"/data/logs/dt=2012-10-01/server01/" 
"/data/logs/dt=2012-10-01/server02/" 
... 
... 

從MapReduce工作,我想補充分區蜂巢中的表格日誌。我知道這兩種方法

  1. alter table命令 - 太多的ALTER TABLE命令
  2. 添加動態分區

對於方法有兩個我只看到INSERT OVERWRITE例子這是不是我的選擇。有沒有辦法在作業結束後將這些新分區添加到表中?

回答

3

要在Map/Reduce作業中執行此操作,我建議使用Apache HCatalog,這是一個在Hadoop下加蓋的新項目。

HCatalog確實是HDFS上的一個抽象層,因此您可以用標準方式編寫輸出,無論是Hive,Pig還是M/R。如果這是你的圖片,你可以使用輸出格式HCatOutputFormat從你的Map/Reduce作業直接將數據加載到Hive中。以下是取自the official website的示例。

寫了一個特定的分區(A = 1,B = 1)會去像這樣的電流代碼示例:

Map<String, String> partitionValues = new HashMap<String, String>(); 
partitionValues.put("a", "1"); 
partitionValues.put("b", "1"); 
HCatTableInfo info = HCatTableInfo.getOutputTableInfo(dbName, tblName, partitionValues); 
HCatOutputFormat.setOutput(job, info); 

和寫入到多個分區,單獨的作業將不得不與上述每個開始。

您也可以在HCatalog中使用動態分區,在這種情況下,您可以在同一個作業中加載任意多個分區!

我建議您在上面提供的網站上進一步閱讀HCatalog,如果需要,應該會提供更多詳細信息。

+0

我使用Cloudera的分佈,它不具有與捆綁HCatalog。 Oozie可以成爲一種選擇嗎?如果是這樣的話,有什麼想法? – user1971133

+0

如果你不想要很多alter table語句,不想做一個插入覆蓋,也不能使用HCatalog,這在我看來會變得很複雜。 Oozie只是一個工作流程調度器,你仍然需要在某個地方定義一個工作。 –

3

事實上,事情比這更復雜一點,這很不幸,因爲它在官方消息中是沒有文檔的(截至目前),並且需要花費幾天的時間才能弄清楚。

我發現,我需要做下面讓HCatalog MapReduce作業與寫入動態分區工作:

在我的工作(通常是減速)的我的記錄寫入階段,我必須要手動將我的動態分區(HCatFieldSchema)添加到我的HCatSchema對象。

麻煩的是,HCatOutputFormat.getTableSchema(config)實際上並不返回分區字段。他們需要手動添加

HCatFieldSchema hfs1 = new HCatFieldSchema("date", Type.STRING, null); 
HCatFieldSchema hfs2 = new HCatFieldSchema("some_partition", Type.STRING, null); 
schema.append(hfs1); 
schema.append(hfs2); 
0

下面是寫與動態劃分多個表中使用HCatalog一個工作的代碼,代碼已經在Hadoop 2.5.0,蜂巢0.13測試。1:

// ... Job setup, InputFormatClass, etc ... 
String dbName = null; 
String[] tables = {"table0", "table1"}; 

job.setOutputFormatClass(MultiOutputFormat.class); 
MultiOutputFormat.JobConfigurer configurer = MultiOutputFormat.createConfigurer(job); 

List<String> partitions = new ArrayList<String>(); 
partitions.add(0, "partition0"); 
partitions.add(1, "partition1"); 

HCatFieldSchema partition0 = new HCatFieldSchema("partition0", TypeInfoFactory.stringTypeInfo, null); 
HCatFieldSchema partition1 = new HCatFieldSchema("partition1", TypeInfoFactory.stringTypeInfo, null); 

for (String table : tables) { 
    configurer.addOutputFormat(table, HCatOutputFormat.class, BytesWritable.class, CatRecord.class); 

    OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, table, null); 
    outputJobInfo.setDynamicPartitioningKeys(partitions); 

    HCatOutputFormat.setOutput(
     configurer.getJob(table), outputJobInfo 
    ); 

    HCatSchema schema = HCatOutputFormat.getTableSchema(configurer.getJob(table).getConfiguration()); 
    schema.append(partition0); 
    schema.append(partition1); 

    HCatOutputFormat.setSchema(
     configurer.getJob(table), 
     schema 
    ); 
} 
configurer.configure(); 

return job.waitForCompletion(true) ? 0 : 1; 

映射:

public static class MyMapper extends Mapper<LongWritable, Text, BytesWritable, HCatRecord> { 
    @Override 
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 
     HCatRecord record = new DefaultHCatRecord(3); // Including partitions 
     record.set(0, value.toString()); 

     // partitions must be set after non-partition fields 
     record.set(1, "0"); // partition0=0 
     record.set(2, "1"); // partition1=1 

     MultiOutputFormat.write("table0", null, record, context); 
     MultiOutputFormat.write("table1", null, record, context); 
    } 
}