2017-08-24 87 views
0

我寫了一個簡單的範例,想把表寫入(sink)自訂的 TableSink。即使已經實現了 AppendStreamTableSink 接口,在 Apache Flink 中執行時仍然得到下面的異常。

package com.cc.flink.functionUtils; 

    import java.io.IOException; 
    import java.util.ArrayList; 
    import java.util.Collection; 
    import java.util.Iterator; 

    import org.apache.flink.api.common.functions.IterationRuntimeContext; 
    import org.apache.flink.api.common.functions.MapFunction; 
    import org.apache.flink.api.common.functions.RichFunction; 
    import org.apache.flink.api.common.io.OutputFormat; 
    import org.apache.flink.api.common.typeinfo.TypeInformation; 
    import org.apache.flink.api.java.io.LocalCollectionOutputFormat; 
    import org.apache.flink.api.java.tuple.Tuple2; 
    import org.apache.flink.api.java.typeutils.TupleTypeInfo; 
    import org.apache.flink.configuration.Configuration; 
    import org.apache.flink.contrib.streaming.DataStreamUtils; 
    import org.apache.flink.streaming.api.datastream.DataStream; 
    import org.apache.flink.streaming.api.datastream.DataStreamSink; 
    import org.apache.flink.streaming.api.datastream.DataStreamSource; 
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
    import org.apache.flink.streaming.api.functions.sink.SinkFunction; 
    import org.apache.flink.streaming.connectors.rabbitmq.RMQSource; 
    import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; 
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema; 
    import org.apache.flink.table.api.Table; 
    import org.apache.flink.table.api.TableEnvironment; 
    import org.apache.flink.table.api.java.StreamTableEnvironment; 
    import org.apache.flink.table.sinks.AppendStreamTableSink; 
    import org.apache.flink.table.sinks.RetractStreamTableSink; 
    import org.apache.flink.table.sinks.TableSink; 
    import org.apache.flink.types.Row; 




    public class MyTable implements AppendStreamTableSink<Row>{ 



     @Override 
     public TableSink<Row> configure(String[] arg0, TypeInformation<?>[] arg1) { 
      // TODO Auto-generated method stub 
      return null; 
     } 

     @Override 
     public String[] getFieldNames() { 
      // TODO Auto-generated method stub 
      return null; 
     } 

     @Override 
     public TypeInformation<?>[] getFieldTypes() { 
      // TODO Auto-generated method stub 
      return null; 
     } 

     @Override 
     public TypeInformation<Row> getOutputType() { 
      // TODO Auto-generated method stub 
      return null; 
     } 

     @Override 
     public void emitDataStream(DataStream<Row> arg0) { 
      // TODO Auto-generated method stub 
      arg0.print(); 

     } 




     public static void main(String[] args) throws Exception { 

      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
      final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() 
        .setHost("localhost") 
        .setVirtualHost("/") 
        .setUserName("guest") 
        .setPassword("guest") 
        .setPort(5672) 
        .build(); 


      final DataStream<String> stream = env 
        .addSource(new RMQSource<String>(
          connectionConfig,   // config for the RabbitMQ connection 
          "test",     // name of the RabbitMQ queue to consume 
          true,      // use correlation ids; can be false if only at-least-once is required 
          new SimpleStringSchema())) // deserialization schema to turn messages into Java objects 
        .setParallelism(1); 



      final ArrayList<String> values = new ArrayList<>(); 
      StreamTableEnvironment StreamTableEnv = TableEnvironment.getTableEnvironment(env); 
      Table fromDataStream = StreamTableEnv.fromDataStream(stream, 
        "member_id"); 
      StreamTableEnv.registerTable("emp1",fromDataStream); 
      Table output =StreamTableEnv.sql("select count(*) from emp1 where member_id Like '%test%'"); 
      fromDataStream.writeToSink(new MyTable()); 
      env.execute(); 

     } 

    }  

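log4j:WARN No appenders could be found for logger (org.apache.calcite.sql.parser). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

Exception in thread "main" org.apache.flink.table.api.TableException: Stream Tables can only be emitted by AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink.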

at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:219)

at org.apache.flink.table.api.Table.writeToSink(Table.scala:800)

at org.apache.flink.table.api.Table.writeToSink(Table.scala:773)

at com.cc.flink.functionUtils.MyTable.main(MyTable.java:103)

回答

0

你的範例的問題在於:你使用的是 AppendStreamTableSink,但你的查詢會產生撤回(retraction)。這是因爲語句中的 COUNT(*):每當有新行到達時,先前發出的計數就不再有效,需要被撤回。

如果查詢只是單純的 SELECT *,那麼每個傳入的行只會產生一個輸出行,不會影響先前已發出的行。

+0

嗨 twalthr,感謝您的答覆。如果你看代碼,我要寫入 sink 的是名爲「fromDataStream」的表,而不是名爲「output」的表。我先嘗試實現 RetractStreamTableSink,但行不通;然後我嘗試實現 AppendStreamTableSink 並寫入 fromDataStream,但兩種情況下都得到相同的錯誤 –