2015-12-14

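Apache Pig UDF and outputSchema customization

I am trying to implement a UDF to process various source/input files. The input files differ in their number of columns, and my intention is to have one generic UDF. Each Pig script processes one type of input file (every record has the same number of fields, delimited by '|').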

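The UDF should read each input record, whose fields are separated by the delimiter (|), and, depending on some condition, generate a bag containing two tuples. For example, for the input (1,2,3,4,5,6) the output could be a) {(1,3),(2,4,5,6)} or b) {(2,3,4),(1,5,6)}.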
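
The per-record split itself can be sketched in plain Java, without the Pig API, assuming the grouping is given as zero-based column indexes. `RecordSplitter` and `split` are hypothetical names; in an actual EvalFunc each inner list would become one Pig tuple of the returned bag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper: not Pig API, only the splitting logic.
public class RecordSplitter {

    // Splits one '|'-delimited record into groups of fields, one group per
    // index array. Pattern.quote is required because '|' is a regex metacharacter.
    public static List<List<String>> split(String record, int[]... groups) {
        String[] fields = record.split(Pattern.quote("|"), -1);
        List<List<String>> result = new ArrayList<>();
        for (int[] group : groups) {
            List<String> tuple = new ArrayList<>();
            for (int idx : group) {
                tuple.add(fields[idx]);
            }
            result.add(tuple);
        }
        return result;
    }

    public static void main(String[] args) {
        // Case a): prints [[1, 3], [2, 4, 5, 6]]
        System.out.println(split("1|2|3|4|5|6", new int[]{0, 2}, new int[]{1, 3, 4, 5}));
        // Case b): prints [[2, 3, 4], [1, 5, 6]]
        System.out.println(split("1|2|3|4|5|6", new int[]{1, 2, 3}, new int[]{0, 4, 5}));
    }
}
```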

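I cannot extend the outputSchema method to create tuples of different sizes. There is no way to pass extra parameters to outputSchema, and I cannot use member variables defined in my EvalFunc subclass either, because their values are null on every run.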
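
The null member variables are consistent with Pig creating more than one instance of a UDF: outputSchema runs on an instance on the front end, while exec runs on other instances on the back end, so a value assigned in one instance is not seen by another. A commonly used workaround is to pass the layout as a constructor argument, e.g. `DEFINE SplitData com.pig.Data('0,2;1,3,4,5');`, because Pig passes DEFINE arguments to every instance it constructs. The plain-Java sketch below shows what such a String constructor might parse; the alias `SplitData`, the class name `SplitLayout` and the `;`/`,` spec format are all assumptions, not part of Pig.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical config holder. In a Pig UDF this parsing would sit in the
// EvalFunc's String constructor, so every instance (front end and back end)
// rebuilds the same layout from the same DEFINE argument.
public class SplitLayout {

    private final List<int[]> groups = new ArrayList<>();

    // Assumed format: groups separated by ';', zero-based indexes by ','.
    public SplitLayout(String spec) {
        for (String group : spec.split(";")) {
            groups.add(Arrays.stream(group.split(","))
                    .map(String::trim)
                    .mapToInt(Integer::parseInt)
                    .toArray());
        }
    }

    public int groupCount() {
        return groups.size();
    }

    // Number of fields in one output tuple, i.e. what outputSchema needs.
    public int groupSize(int group) {
        return groups.get(group).length;
    }

    // Defensive copy of the indexes, i.e. what exec needs.
    public int[] indexes(int group) {
        return groups.get(group).clone();
    }

    public static void main(String[] args) {
        SplitLayout layout = new SplitLayout("0,2;1,3,4,5");
        System.out.println(layout.groupCount());  // 2
        System.out.println(layout.groupSize(0));  // 2
        System.out.println(layout.groupSize(1));  // 4
    }
}
```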

Any hints? Thanks.

UPDATE:

I ran the following command in Grunt, providing the input schema with AS:

sourceData = foreach sourceData generate com.pig.Data('test.json', *) as (t:(s:(VIN: chararray,Birthdate: chararray), n:(name: chararray,customerId: chararray,Mileage: chararray,Fuel_Consumption: chararray))); 

The UDF code is shown here:

public Schema outputSchema(Schema input) {

    System.out.println("----------------------" + input.getFields().size()); // line 233
    // ...
}

Error:

Pig Stack Trace 
--------------- 
ERROR 1200: java.lang.NullPointerException 

Failed to parse: java.lang.NullPointerException 
     at org.apache.pig.parser.QueryParserDriver.parse(QueryParserDriver.java:201) 
     at org.apache.pig.PigServer$Graph.validateQuery(PigServer.java:1707) 
     at org.apache.pig.PigServer$Graph.registerQuery(PigServer.java:1680) 
     at org.apache.pig.PigServer.registerQuery(PigServer.java:623) 
     at org.apache.pig.tools.grunt.GruntParser.processPig(GruntParser.java:1082) 
     at org.apache.pig.tools.pigscript.parser.PigScriptParser.parse(PigScriptParser.java:505) 
     at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:230) 
     at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:205) 
     at org.apache.pig.tools.grunt.Grunt.run(Grunt.java:66) 
     at org.apache.pig.Main.run(Main.java:565) 
     at org.apache.pig.Main.main(Main.java:177) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:606) 
     at org.apache.hadoop.util.RunJar.run(RunJar.java:221) 
     at org.apache.hadoop.util.RunJar.main(RunJar.java:136) 
Caused by: java.lang.RuntimeException: java.lang.NullPointerException 
     at com.mortardata.pig.DataSpliter.outputSchema(DataSpliter.java:306) 
     at org.apache.pig.newplan.logical.expression.UserFuncExpression.getFieldSchema(UserFuncExpression.java:244) 
     at org.apache.pig.newplan.logical.optimizer.FieldSchemaResetter.execute(SchemaResetter.java:264) 
     at org.apache.pig.newplan.logical.expression.AllSameExpressionVisitor.visit(AllSameExpressionVisitor.java:143) 
     at org.apache.pig.newplan.logical.expression.UserFuncExpression.accept(UserFuncExpression.java:113) 
     at org.apache.pig.newplan.ReverseDependencyOrderWalker.walk(ReverseDependencyOrderWalker.java:70) 
     at org.apache.pig.newplan.PlanVisitor.visit(PlanVisitor.java:52) 
     at org.apache.pig.newplan.logical.optimizer.SchemaResetter.visitAll(SchemaResetter.java:67) 
     at org.apache.pig.newplan.logical.optimizer.SchemaResetter.visit(SchemaResetter.java:122) 
     at org.apache.pig.newplan.logical.relational.LOGenerate.accept(LOGenerate.java:245) 
     at org.apache.pig.newplan.DependencyOrderWalker.walk(DependencyOrderWalker.java:75) 
     at org.apache.pig.newplan.logical.optimizer.SchemaResetter.visit(SchemaResetter.java:114) 
     at org.apache.pig.parser.LogicalPlanBuilder.buildForeachOp(LogicalPlanBuilder.java:1055) 
     at org.apache.pig.parser.LogicalPlanGenerator.foreach_clause(LogicalPlanGenerator.java:15896) 
     at org.apache.pig.parser.LogicalPlanGenerator.op_clause(LogicalPlanGenerator.java:1933) 
     at org.apache.pig.parser.LogicalPlanGenerator.statement(LogicalPlanGenerator.java:560) 
     at org.apache.pig.parser.LogicalPlanGenerator.query(LogicalPlanGenerator.java:421) 
     at org.apache.pig.parser.QueryParserDriver.parse(QueryParserDriver.java:191) 
     ... 16 more 
Caused by: java.lang.NullPointerException 
     at com.mortardata.pig.DataSpliter.outputSchema(DataSpliter.java:233) 
     ... 34 more 
================================================================================ 

UPDATE2:

OK, the input schema is propagated from the previous Pig command:

sourceData = load 'test.csv' using PigStorage(',') as (VIN:chararray, Birthdate:chararray, name:chararray, customerId:chararray, Mileage:chararray, Fuel_Consumption:chararray);
(t:(s:(VIN:chararray,Birthdate:chararray),n:(name:chararray,customerId:chararray,Mileage:chararray,Fuel_Consumption:chararray)));

This doesn't help, because it is not possible to propagate any extra attributes, or to create them inside the outputSchema method ;-(

Answer


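Setting aside more complex logic, inside outputSchema you have access to the input schema, so you can use that information to generate your output schema dynamically based on the input (provided the input somehow reflects the expected output). Example: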

public Schema outputSchema(Schema input) {
    Schema mySchema = new Schema();
    // Choose the output fields based on how many fields the input has
    if (input.getFields().size() == 3) {
        mySchema.add(new Schema.FieldSchema("data1", DataType.DOUBLE));
        mySchema.add(new Schema.FieldSchema("data2", DataType.DOUBLE));
        mySchema.add(new Schema.FieldSchema("data3", DataType.DOUBLE));
    } else {
        mySchema.add(new Schema.FieldSchema("data", DataType.CHARARRAY));
    }
    return mySchema;
}
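
Applied to the question's data, the same idea means reading the field names from the input schema and grouping them into the inner tuples. The plain-Java sketch below stands in for Pig's Schema classes by building a string shaped like the AS clause in the question; `OutputSchemaSketch`, `describe`, the index layout and the all-chararray typing are assumptions. It also returns early when no input fields are known, since outputSchema may receive a null input when Pig has no schema for the upstream relation, which would fit the NullPointerException at line 233.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical stand-in for building a Pig Schema: produces a string only.
public class OutputSchemaSketch {

    // Builds "t:(name1:(...),name2:(...))", naming each inner field after the
    // input field it comes from. Returns null when the input field names are
    // unknown, mirroring a null input Schema in outputSchema.
    public static String describe(List<String> inputFields, String[] tupleNames, int[][] groups) {
        if (inputFields == null) {
            return null; // input schema unknown: nothing to derive from
        }
        StringJoiner outer = new StringJoiner(",", "t:(", ")");
        for (int g = 0; g < groups.length; g++) {
            StringJoiner inner = new StringJoiner(",", tupleNames[g] + ":(", ")");
            for (int idx : groups[g]) {
                inner.add(inputFields.get(idx) + ":chararray");
            }
            outer.add(inner.toString());
        }
        return outer.toString();
    }

    public static void main(String[] args) {
        List<String> fields = Arrays.asList("VIN", "Birthdate", "name",
                "customerId", "Mileage", "Fuel_Consumption");
        // Prints t:(s:(VIN:chararray,Birthdate:chararray),n:(name:chararray,...))
        System.out.println(describe(fields, new String[]{"s", "n"},
                new int[][]{{0, 1}, {2, 3, 4, 5}}));
    }
}
```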

I hope this helps.


Hi, this doesn't work as expected ;-( See the updates... – heap