
Encoding/formatting issue with the python kafka library: I have been trying to use the python kafka library and cannot get the producer to work.

After some research I found out that Kafka sends an extra 5-byte header to consumers (one zero byte, followed by an integer holding the Schema Registry schema id). I managed to get the consumer working by simply stripping off those first bytes.
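
For reference, a minimal sketch of how that header can be split off, assuming the Confluent wire format (a zero "magic" byte followed by a 4-byte big-endian schema id; the helper name below is just illustrative):

import struct

def split_confluent_header(raw_bytes):
    # byte 0: magic byte (always 0); bytes 1-4: schema id; the rest: Avro payload
    magic_byte, schema_id = struct.unpack(">bI", raw_bytes[:5])
    return magic_byte, schema_id, raw_bytes[5:]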

Should I be prepending a similar header when producing messages?

The exception that gets thrown is below:

[2016-09-14 13:32:48,684] ERROR Task hdfs-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142) 
org.apache.kafka.connect.errors.DataException: Failed to deserialize data to Avro: 
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:109) 
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:357) 
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:226) 
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170) 
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142) 
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140) 
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:262) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
    Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1 
    Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte! 

I am using the latest stable versions of both Kafka and python-kafka.

Edit

Consumer

from kafka import KafkaConsumer 
import avro.io 
import avro.schema 
import io 
import requests 
import struct 

# To consume messages 
consumer = KafkaConsumer('hadoop_00', 
         group_id='my_group', 
         bootstrap_servers=['hadoop-master:9092']) 

schema_path = "resources/f1.avsc" 
for msg in consumer: 
    value = bytearray(msg.value) 
    # Skip the magic byte (offset 0) and read the 4-byte big-endian schema id
    schema_id = struct.unpack(">L", value[1:5])[0] 
    # Fetch the writer schema for that id from the Schema Registry
    response = requests.get("http://hadoop-master:8081/schemas/ids/" + str(schema_id)) 
    schema = response.json()["schema"] 
    schema = avro.schema.parse(schema) 
    # The Avro payload starts after the 5-byte header
    bytes_reader = io.BytesIO(value[5:]) 
    # bytes_reader = io.BytesIO(msg.value) 
    decoder = avro.io.BinaryDecoder(bytes_reader) 
    reader = avro.io.DatumReader(schema) 
    temp = reader.read(decoder) 
    print(temp) 

Producer

from kafka import KafkaProducer 
import avro.schema 
import io 
from avro.io import DatumWriter 

producer = KafkaProducer(bootstrap_servers="hadoop-master") 

# Kafka topic 
topic = "hadoop_00" 

# Path to user.avsc avro schema 
schema_path = "resources/f1.avsc" 
schema = avro.schema.parse(open(schema_path).read()) 
range = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] 
for i in range: 
    producer.send(topic, b'{"f1":"value_' + str(i)) 

Your producer and consumer code, please. That would help put everything together. – thiruvenkadam

@thiruvenkadam there you go –

Answers

1

I was able to have my Python producer send messages to Kafka Connect with the Schema Registry:

... 
import avro.datafile 
import avro.io 
import avro.schema 
import io 
from kafka import KafkaProducer 

producer = KafkaProducer(bootstrap_servers='kafka:9092') 
with open('schema.avsc') as f: 
    schema = avro.schema.Parse(f.read()) 

def post_message(): 
    bytes_writer = io.BytesIO() 
    # Write the Confluent "Magic Byte" 
    bytes_writer.write(bytes([0])) 
    # Should get or create the schema version with Schema-Registry 
    ... 
    schema_version = 1 
    bytes_writer.write(
     int.to_bytes(schema_version, 4, byteorder='big')) 

    # and then the standard Avro bytes serialization 
    writer = avro.io.DatumWriter(schema) 
    encoder = avro.io.BinaryEncoder(bytes_writer) 
    writer.write({'key': 'value'}, encoder) 
    producer.send('topic', value=bytes_writer.getvalue()) 

Documentation about the "magic byte": https://github.com/confluentinc/schema-registry/blob/master/docs/serializer-formatter.rst
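
For the step the code above leaves elided (getting or creating the schema id), one option is the Schema Registry REST API, e.g. via requests. This is only a sketch, and the registry URL and subject name are assumptions for this setup:

import requests

SCHEMA_REGISTRY_URL = "http://hadoop-master:8081"  # assumed registry address
SUBJECT = "hadoop_00-value"                        # assumed subject name

def get_or_register_schema_id(schema_str):
    # Registering a schema that already exists under the subject simply returns its id
    response = requests.post(
        SCHEMA_REGISTRY_URL + "/subjects/" + SUBJECT + "/versions",
        json={"schema": schema_str},
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    )
    response.raise_for_status()
    return response.json()["id"]

The returned id is what goes into the four bytes after the magic byte; the consumer in the question resolves it back with a GET on /schemas/ids/<id>.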

Had to update to Python 3.5 and use the avro-python3 library to get it running, thanks! –

0

Since you are reading with BinaryDecoder and DatumReader, I suppose your messages will come through fine if you write the data the opposite way on the producer side (a DatumWriter with a BinaryEncoder as the encoder).

Something like this:

Producer

from kafka import KafkaProducer 
import avro.schema 
import io 
from avro.io import DatumWriter, BinaryEncoder 
producer = KafkaProducer(bootstrap_servers="hadoop-master") 

# Kafka topic 
topic = "hadoop_00" 

# Path to user.avsc avro schema 
schema_path = "resources/f1.avsc" 
schema = avro.schema.parse(open(schema_path).read()) 
# range is a bad variable name. I changed it here 
value_range = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] 
for i in value_range: 
    datum_writer = DatumWriter(schema) 
    byte_writer = io.BytesIO() 
    datum_encoder = BinaryEncoder(byte_writer) 
    datum_writer.write({"f1" : "value_%d" % (i)}, datum_encoder) 
    producer.send(topic, byte_writer.getvalue()) 

Some of the changes I made:

  • Using DatumWriter and BinaryEncoder
  • Instead of JSON, I send a dictionary in the byte stream (you may want to check whether your code also works with a plain dict; I am not sure)
  • Using a byte stream to send the message to the Kafka topic (for me this failed at times, in which case I assigned the result of the .getvalue method to a variable and used that variable in producer.send; I don't know why it failed, but assigning to a variable always worked)

I was not able to test the code I added, but it is a snippet I wrote earlier using avro. If it doesn't work for you, please let me know in the comments; it may just be my rusty memory. I will test the code once I get home and update this answer with a working version.

Hey! Thanks for the help. Sadly I tested it and got the same exception about that "magic byte". –

Also, I have now written a small producer in Java using the Kafka Java API and I get exactly the same error. –