-1

我使用批量將數據插入Cassandra。當我運行這項工作時,我得到了異常。Cassandra批量InvalidQueryException - 批量太大

caused by: com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large 
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:136) 
    at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:179)  
    at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:184)  
    at com.datastax.driver.core.RequestHandler.access$2500(RequestHandler.java:43)  
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(RequestHandler.java:798) 
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:617)  
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1005) 
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:928) 
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)  
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)  
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)  
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) 

我已經閱讀了很多關於這個問題的博客。但這沒有幫助。我嘗試在初始化時將spark.cassandra.output.batch.size.bytes設置爲spark conf。仍然這不解決我的問題。我收到了同樣的錯誤。我的批處理有大約1000條插入語句。

請在下面找到我的代碼。

CassandraConnector connector = CassandraConnector.apply(javaSparkContext.getConf()); 
pairRDD.mapToPair(earnCalculatorKeyIterableTuple2 -> { 
      if (condition) { 
       do something...... 
      } 
      else { 
       Session session = connector.openSession(); 
       BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);   batch.setConsistencyLevel(ConsistencyLevel.valueOf(LOCAL_QUOROM)); 
       PreparedStatement statement = session.prepare('my insert query'); 
       for (condition) { 
        if (!condition) { 
         break; 
        } 
        Tuple2._2.forEach(s -> { 
         if (!condition) { 
          LOG.info(message); 
         } 
         else { 
          BoundStatement boundStatement = statement.bind("bind variables"); 
          batch.add(boundStatement); 
         } 
        }); 
        session.execute(batch); 
        batch.clear(); 
       } 
       session.close(); 
      } 
      return Tuple2; 
     }); 
     return s; 
    } 

感謝任何幫助。

+0

你實際使用Spark嗎?我問,因爲您的跟蹤似乎沒有任何Spark Cassandra Connector級別,並且更改batch.size.bytes會更改插入語句的數量。 – RussS

+0

是的,我正在使用spark-cassandra連接器。我試着給batch.size.bytes = auto。仍然沒有解決這個問題。 – sandy

+2

你真的可以提供一個代碼示例嗎? – RussS

回答

1

您正在手動創建批次,批次太大。將更少的行添加到每個batche。有很多方法可以手動執行此操作,但最簡單的方法就是添加一個每次添加X語句都會提交一個批次的計數器。

您正在更改的參數僅與saveToCassandra完成的自動批量有關。