11

我想爲DataSet中的Row類型編寫一個編碼器,用於我正在執行的映射操作。本質上,我不懂如何編寫編碼器。用於行類型Spark數據集的編碼器

下面是一個地圖操作的示例:

In the example below, instead of returning Dataset<String>, I would like to return Dataset<Row>

Dataset<String> output = dataset1.flatMap(new FlatMapFunction<Row, String>() { 
      @Override 
      public Iterator<String> call(Row row) throws Exception { 

       ArrayList<String> obj = //some map operation 
       return obj.iterator(); 
      } 
     },Encoders.STRING()); 

我理解而不是字符串編碼器,需要如下要被寫入:

Encoder<Row> encoder = new Encoder<Row>() { 
     @Override 
     public StructType schema() { 
      return join.schema(); 
      //return null; 
     } 

     @Override 
     public ClassTag<Row> clsTag() { 
      return null; 
     } 
    }; 

然而,我不理解編碼器中的clsTag(),我試圖找到一個可以解釋類似事物的運行示例(即一個行類型的編碼器)

編輯 - 這不是上述問題的副本:Encoder error while trying to map dataframe row to updated row作爲答案在Spark 2.x中使用Spark 1.x(我沒有這樣做),我也在尋找一個編碼器而不是解決錯誤。最後,我正在尋找Java中的解決方案,而不是在Scala中。

回答

9

答案是使用RowEncoder和使用TypeStruct的數據集的模式。

下面是一個數據集A flatmap操作的工作示例:

StructType structType = new StructType(); 
    structType = structType.add("id1", DataTypes.LongType, false); 
    structType = structType.add("id2", DataTypes.LongType, false); 

    ExpressionEncoder<Row> encoder = RowEncoder.apply(structType); 

    Dataset<Row> output = join.flatMap(new FlatMapFunction<Row, Row>() { 
     @Override 
     public Iterator<Row> call(Row row) throws Exception { 
      // a static map operation to demonstrate 
      List<Object> data = new ArrayList<>(); 
      data.add(1l); 
      data.add(2l); 
      ArrayList<Row> list = new ArrayList<>(); 
      list.add(RowFactory.create(data.toArray())); 
      return list.iterator(); 
     } 
    }, encoder); 
+0

應該不是這個無法在集羣模式,因爲ArrayList的不序列化 – user482963