*
a / b / c / d
a — more than 1k commits
100 ≤ b < 1000
10 ≤ c < 100
d — all contributors
What about Throughput? Some benchmarks
time (s)
MB/s
Msgs/s
confluent-kafka
5.4
17
183 000
kafka-python
68
1.4
15 000
confluent-kafka-python
Some preparations
# vars.py -- shared CLI configuration module used by every example script.
# Usage: <script> <kafka-server-url> <schema-registry-url> <topic-name>
import sys
server = sys.argv[1] # <-- kafka server url
registry = sys.argv[2] # <-- schema registry url
topic = sys.argv[3] # <-- topic name
# NOTE(review): sys.argv[0] is the script path, not a CLI argument -- the
# script's own name is being reused as the consumer group id. Confirm this is
# intentional (the next positional argument would be sys.argv[4]).
group = sys.argv[0] # <-- consumer group id
Kafka Producer on Python
# Minimal producer: publish 1000 plain string payloads to the configured topic.
import vars
from confluent_kafka import Producer

producer = Producer(**{'bootstrap.servers': vars.server})
for i in range(1000):
    producer.produce(vars.topic, value=str(i))
# Block until every queued message has been delivered (or failed).
producer.flush()
Kafka Producer on Python
# Keyed producer: same 1000 messages, but each carries its number as the key
# so Kafka partitions them deterministically.
import vars
from confluent_kafka import Producer

producer = Producer(**{'bootstrap.servers': vars.server})
for i in range(1000):
    producer.produce(vars.topic, value=str(i), key=str(i))
# Block until every queued message has been delivered (or failed).
producer.flush()
Kafka Consumer on Python
# Basic consumer: subscribe to the topic and print every message forever.
import vars
from confluent_kafka import Consumer

conf = {'bootstrap.servers': vars.server,
        'group.id': vars.group,
        'enable.auto.commit': 'true'}
consumer = Consumer(conf)
consumer.subscribe([vars.topic])
try:
    while True:
        # BUG FIX: the original called c.poll(), but the consumer is bound to
        # the name `consumer` -- `c` is undefined and raised NameError.
        message = consumer.poll(timeout=1.0)
        if message is None:
            continue
        if message.error():
            # poll() can return an event carrying a broker/partition error;
            # surface it instead of printing it as if it were data.
            print(message.error())
            continue
        print(message)
finally:
    consumer.close()  # ... to commit final offsets.
# Avro producer: dict values are serialized with record_schema registered
# via the configured Schema Registry.
import vars
from confluent_kafka.avro import AvroProducer
from avro_schemas import record_schema

avro_conf = {'bootstrap.servers': vars.server,
             'schema.registry.url': vars.registry}
producer = AvroProducer(avro_conf, default_value_schema=record_schema)
for i in range(1000):
    producer.produce(vars.topic, value={"foo": str(i), "bar": i})
# Block until every queued message has been delivered (or failed).
producer.flush()
Kafka Producer with Avro
# Avro producer, explicit-schema variant: the schema is passed per-call via
# value_schema= instead of relying solely on the constructor default.
import vars
from confluent_kafka.avro import AvroProducer
from avro_schemas import record_schema

avro_conf = {'bootstrap.servers': vars.server,
             'schema.registry.url': vars.registry}
producer = AvroProducer(avro_conf, default_value_schema=record_schema)
for i in range(1000):
    producer.produce(vars.topic,
                     value={"foo": str(i), "bar": i},
                     value_schema=record_schema)
# Block until every queued message has been delivered (or failed).
producer.flush()
Kafka Consumer with Avro
# Avro consumer: messages are deserialized through the Schema Registry;
# print the "bar" field of each decoded value.
import vars
from confluent_kafka.avro import AvroConsumer

conf = {'bootstrap.servers': vars.server, 'group.id': vars.group,
        'schema.registry.url': vars.registry}
consumer = AvroConsumer(conf)
consumer.subscribe([vars.topic])
try:
    while True:
        # BUG FIX: the original called c.poll(), but the consumer is bound to
        # the name `consumer` -- `c` is undefined and raised NameError.
        message = consumer.poll(timeout=1.0)
        if message is None:
            continue
        if message.error():
            # An error event has no decoded Avro value; dereferencing
            # message.value()['bar'] on it would fail.
            print(message.error())
            continue
        print(message.value()['bar'])
finally:
    consumer.close()  # ... to commit final offsets.