2017-08-30

I am working on a Kafka use case where I need transactional semantics on both the producer and the consumer side. Using the Kafka 0.11 transactions API I can publish transactional messages to the Kafka cluster, but on the consumer side I am now facing a problem: I have isolation.level=read_committed in my properties file, yet the consumer receives nothing. With isolation.level=read_uncommitted the messages are consumed, but that is not what I want.

Producer code

package com.org.kafkaPro;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.net.URL;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

public class ProducerWithTx {

    public static void main(String[] args) throws FileNotFoundException {
        URL in = ProducerWithTx.class.getResource("producertx.properties");

        Properties props = new Properties();
        try {
            props.load(new FileReader(new File(in.getFile())));
        } catch (IOException e1) {
            e1.printStackTrace();
        }

        Paymnt pay1 = new Paymnt();
        pay1.setAccid(1);
        pay1.setAccountstate("y");
        pay1.setAccountzipcode(111);
        pay1.setBankid(12);
        pay1.setCreditcardtype(15);
        pay1.setCustomerid("2");
        SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd");
        Date t = null;
        try {
            t = ft.parse("2017-11-10");
        } catch (ParseException e) {
            e.printStackTrace();
        }
        pay1.setPeriodid(t);

        // The transactional id is set at runtime, before the producer is created.
        String timeStamp = new SimpleDateFormat("yyyy.MM.dd:HH:mm:ss").format(new Date());
        props.put("transactional.id", "accid=" + pay1.getAccid() + " custid=" + pay1.getCustomerid() + " timestmp=" + timeStamp);

        KafkaProducer<String, Paymnt> producer = new KafkaProducer<String, Paymnt>(props);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<String, Paymnt>("test", pay1));
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // We can't recover from these exceptions, so our only option is to close the producer and exit.
            producer.close();
        } catch (KafkaException e) {
            // For all other exceptions, just abort the transaction and try again.
            producer.abortTransaction();
        }
        producer.close();
    }
}

producertx.properties

# Note: metadata.broker.list is an old Scala-client setting and is ignored by
# the Java KafkaProducer; bootstrap.servers below is the one that matters.
metadata.broker.list=localhost:9092
bootstrap.servers=localhost:9092 
acks=all 
retries=1 
batch.size=16384 
linger.ms=1 
buffer.memory=33554432 
key.serializer=org.apache.kafka.common.serialization.StringSerializer 
value.serializer=com.org.kafkaPro.PaySerializer 
#transactional.id=1 
enable.idempotence=true 
# Note: num.partitions is a broker-side setting and is ignored by the producer.
num.partitions=3

Consumer

package com.org.kafkaPro;

import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class Consumer {

    private static List<ConsumerMultiThreaded> consumersGroup;

    public static void main(String[] args) throws IOException {

        URL in = Consumer.class.getResource("consumer.properties");

        Properties props = new Properties();
        try {
            props.load(new FileReader(new File(in.getFile())));
        } catch (IOException e1) {
            e1.printStackTrace();
        }

        consumersGroup = new ArrayList<ConsumerMultiThreaded>();
        ConsumerMultiThreaded con1 = new ConsumerMultiThreaded(props);
        ConsumerMultiThreaded con2 = new ConsumerMultiThreaded(props);
        ConsumerMultiThreaded con3 = new ConsumerMultiThreaded(props);
        ConsumerMultiThreaded con4 = new ConsumerMultiThreaded(props);

        consumersGroup.add(con1);
        consumersGroup.add(con2);
        consumersGroup.add(con3);
        consumersGroup.add(con4);

        for (ConsumerMultiThreaded consumer : consumersGroup) {
            Thread t = new Thread(consumer);
            t.start();
        }

        // Keep the main thread alive while the consumer threads run.
        while (true) {
            try {
                Thread.sleep(100000);
            } catch (InterruptedException ie) {
            }
        }
    }
}

Consumer Runnable

package com.org.kafkaPro;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

public class ConsumerMultiThreaded implements Runnable {

    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer<String, Paymnt> consumer;
    private final int minBatchSize = 3;
    private final List<ConsumerRecord<String, Paymnt>> buffer;

    public ConsumerMultiThreaded(Properties props) {
        this.consumer = new KafkaConsumer<String, Paymnt>(props);
        buffer = new ArrayList<ConsumerRecord<String, Paymnt>>(minBatchSize);
    }

    @Override
    public void run() {
        try {
            consumer.subscribe(Arrays.asList("test"));
            while (!closed.get()) {
                ConsumerRecords<String, Paymnt> records = consumer.poll(10000);

                for (ConsumerRecord<String, Paymnt> record : records) {
                    buffer.add(record);
                }

                // Once enough records have accumulated, print them and commit
                // the offset after the last record of each partition.
                if (buffer.size() >= minBatchSize) {
                    for (TopicPartition partition : records.partitions()) {
                        List<ConsumerRecord<String, Paymnt>> partitionRecords = records.records(partition);
                        for (ConsumerRecord<String, Paymnt> record : partitionRecords) {
                            System.out.println("record consumed by Thread " + Thread.currentThread().getId()
                                    + " from partition " + record.partition()
                                    + " offset " + record.offset()
                                    + " value: " + record.value().toString());
                        }
                        long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                        consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                    }
                    // Clear the buffer only after every partition has been committed.
                    buffer.clear();
                }
            }
        } catch (WakeupException e) {
            // Ignore the exception if we are closing.
            if (!closed.get()) throw e;
        } finally {
            consumer.close();
        }
    }

    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}

consumer.properties

bootstrap.servers=localhost:9092 
session.timeout.ms=30000 
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer 
value.deserializer=com.org.kafkaPro.PayDeserializer 
enable.auto.commit=false 
auto.offset.reset=earliest 
group.id=test 
isolation.level=read_committed 

Thanks for your help.

Answer


In the producer properties you are using, there is #transactional.id=1 (as you mentioned in the question), where the line starts with a # symbol, i.e. the property is commented out. That could be causing a problem.
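For illustration, a minimal sketch of the two equivalent ways to supply the transactional id (the id demo-tx-1, the servers, and the String serializers are placeholders, not taken from the question):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;

public class TxIdSketch {
    public static void main(String[] args) {
        // Option 1: in the properties file, without the leading '#'
        // (a line starting with '#' is a comment, so the setting is never applied):
        //   transactional.id=demo-tx-1

        // Option 2: set it programmatically before creating the producer,
        // as the question's ProducerWithTx does.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("enable.idempotence", "true");    // required for transactions
        props.put("transactional.id", "demo-tx-1"); // must be set one way or the other

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        producer.initTransactions(); // fails if no transactional.id is configured
        producer.close();
    }
}

Either way, note that a transactional id is meant to stay stable across producer restarts so the broker can fence zombie instances; deriving it from a timestamp, as the question's code does, gives up that fencing guarantee.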

If that is not it, you can dump the log segments of your topic and of the __transaction_state topic; from there you can easily debug what went wrong.
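For example, such a dump could look like the sketch below (the log directory /tmp/kafka-logs, the partition directories, and the segment file names are hypothetical placeholders; substitute the values from your broker's log.dirs):

# Dump a segment of the data topic:
bin/kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration \
    --files /tmp/kafka-logs/test-0/00000000000000000000.log

# Dump a segment of the __transaction_state topic, decoding its entries:
bin/kafka-run-class.sh kafka.tools.DumpLogSegments --transaction-log-decoder \
    --files /tmp/kafka-logs/__transaction_state-0/00000000000000000000.log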


If you look at my code, I set the transactional id at runtime when initializing the Kafka producer. It would in fact complain that no transactional id was found when I call producer.beginTransaction() if it were missing. The problem is something else. –


May I know which OS your Kafka brokers run on? If they are on Windows, that is problematic. Could you share your producer's console debug logs, and also share a dump of the log segments, using this command: 'kafka-run-class.bat kafka.tools.DumpLogSegments --files <.index, .log and .timeindex files>'. Add the '--transaction-log-decoder' flag if you are dumping the __transaction_state topic logs. That would help me a lot in telling you the real problem. –


And the consumer's debug logs as well. –