Building a microservices-based application using Kafka and Django
PiterPy 2019
Microservices: Coupling and Cohesion?
Distributed Monolith?
Microservices: Synchronous Communication (REST API)
Microservices: Async Communication (Celery)
Should we use Kafka here?
What is Kafka?
Distributed Streaming Platform
Initially conceived as a messaging queue, Apache Kafka is now a full-fledged distributed event streaming platform capable of handling trillions of events a day.
What is Kafka used for?
Messaging
Metrics
Website Activity Tracking
Log Aggregation
Stream Processing
Event Sourcing
Commit Log
Queue
Alternatives to Kafka
RabbitMQ
ActiveMQ
NATS
Apache Pulsar
NSQ
AWS Kinesis
Kafka: Overview
Kafka: Partitions for Topics
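Records with the same key always land in the same partition, which is what gives Kafka its per-key ordering. A simplified model of key-based partitioning follows. It assumes a CRC32 hash (`zlib.crc32`) modulo the partition count purely for illustration: the Java client's default partitioner uses murmur2 and librdkafka (behind confluent-kafka) defaults to a CRC32-based strategy, so real partition numbers will differ. The round-robin handling of keyless records is also an assumption of this sketch.

```python
import itertools
import zlib
from collections import defaultdict
from typing import Dict, Iterable, List, Optional, Tuple

Record = Tuple[Optional[bytes], bytes]  # (key, value)
NUM_PARTITIONS = 3

# Keyless records are spread round-robin in this model; real clients may
# pick a random or "sticky" partition instead.
_next_slot = itertools.count()


def choose_partition(key: Optional[bytes], num_partitions: int = NUM_PARTITIONS) -> int:
    """Simplified partitioner: CRC32(key) % num_partitions (hash function assumed)."""
    if key is None:
        return next(_next_slot) % num_partitions
    return zlib.crc32(key) % num_partitions


def produce_all(records: Iterable[Record]) -> Dict[int, List[Record]]:
    """Append (key, value) records to per-partition logs in send order."""
    logs: Dict[int, List[Record]] = defaultdict(list)
    for key, value in records:
        logs[choose_partition(key)].append((key, value))
    return logs


if __name__ == '__main__':
    events = [(b'user-1', b'login'), (b'user-2', b'login'),
              (b'user-1', b'click'), (b'user-1', b'logout')]
    # All user-1 events end up in one partition, in the order they were sent.
    for partition, log in sorted(produce_all(events).items()):
        print(partition, log)
```

Ordering is guaranteed only within a partition, so choosing the key (for example a user or order id) is effectively choosing the unit of ordering.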
Kafka: Partitions with several consumer groups
Kafka: Consumer Offsets Example for 1 Partition
┌───[Consumer Group 1]
│
1|2|3|4|5|6|7|8|9|10|11|12|13|14|15|16|17|18
│
└───[Consumer Group 2]
Kafka: Consumer Offsets Example for 1 Partition
┌───[Consumer Group 1]
│
1|2|3|4|5|6|7|8|9|10|11|12|13|14|15|16|17|18|19|20
│
└───[Consumer Group 2]
Kafka: Consumer Offsets Example for 1 Partition
┌───[Consumer Group 1]
│
1|2|3|4|5|6|7|8|9|10|11|12|13|14|15|16|17|18|19|20|21|22|23
│ │
│ └───[Consumer Group 2]
│
└───[Consumer Group 3]
Kafka: Consumer Offsets Example for 1 Partition
[Consumer Group 4]───┐
│
┌───[Consumer Group 1] │
│ │
1|2|3|4|5|6|7|8|9|10|11|12|13|14|15|16|17|18|19|20|21|22|23
│ │
│ └───[Consumer Group 2]
│
└───[Consumer Group 3]
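The diagrams above show one log read by several consumer groups, each at its own position. The in-memory model below is a sketch of that idea, not Kafka's implementation: the `Partition` and `ConsumerGroup` classes are hypothetical, and "committing" is reduced to advancing an integer. Note that Kafka offsets start at 0, so the slides' message 1 sits at offset 0 here.

```python
from dataclasses import dataclass, field
from typing import Any, List


@dataclass
class Partition:
    """Append-only log: a record's offset is its index in `records`."""
    records: List[Any] = field(default_factory=list)

    def append(self, value: Any) -> int:
        self.records.append(value)
        return len(self.records) - 1  # offset assigned to the new record

    @property
    def end_offset(self) -> int:
        return len(self.records)  # offset the NEXT record will get


@dataclass
class ConsumerGroup:
    """A group's only state on a partition is its committed offset."""
    name: str
    committed: int = 0  # next offset this group will read

    def poll(self, partition: Partition, max_records: int = 5) -> List[Any]:
        batch = partition.records[self.committed:self.committed + max_records]
        self.committed += len(batch)  # "commit" once the batch is processed
        return batch

    def lag(self, partition: Partition) -> int:
        return partition.end_offset - self.committed


if __name__ == '__main__':
    log = Partition()
    for n in range(1, 24):  # messages 1..23 sit at offsets 0..22
        log.append(n)
    group1, group2 = ConsumerGroup('group-1'), ConsumerGroup('group-2')
    group1.poll(log, max_records=20)
    group2.poll(log, max_records=5)
    print(group1.lag(log), group2.lag(log))  # prints: 3 18
    # A group joining later with "latest" semantics starts at the end of the log.
    group4 = ConsumerGroup('group-4', committed=log.end_offset)
```

Reading never removes a record, so a slow group does not hold back a fast one, and a new group can start from either end of the log.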
Kafka: Tooling to Reset Consumer Group Offsets
Reset to Datetime
Reset by Duration
Reset to Earliest
Reset to Latest
Reset to Current Offset
Reset to Offset
Shift Offset by 'n'
Reset from file
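These options correspond to the `--reset-offsets` flags of `kafka-consumer-groups.sh` (`--to-datetime`, `--by-duration`, `--to-earliest`, `--to-latest`, `--to-current`, `--to-offset`, `--shift-by`, `--from-file`). The function below is a hypothetical model of what each strategy computes for one partition of one group, not the tool's code. It assumes record timestamps never decrease and that retention has deleted nothing (so the earliest offset is 0); "from file" is left out because it simply reads offsets from a CSV.

```python
from bisect import bisect_left
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Union


def reset_offset(strategy: str, timestamps: List[datetime], current: int,
                 value: Union[int, datetime, timedelta, None] = None,
                 now: Optional[datetime] = None) -> int:
    """Model of the reset strategies for ONE partition of ONE consumer group.

    timestamps: record timestamps ordered by offset (assumed non-decreasing).
    current: the group's committed offset (next record to read).
    """
    earliest, latest = 0, len(timestamps)  # assumes nothing deleted by retention
    if strategy == 'to-earliest':
        new = earliest
    elif strategy == 'to-latest':
        new = latest
    elif strategy == 'to-current':
        new = current
    elif strategy == 'to-offset':
        new = value
    elif strategy == 'shift-by':
        new = current + value  # value may be negative
    elif strategy == 'to-datetime':
        new = bisect_left(timestamps, value)  # first record at or after `value`
    elif strategy == 'by-duration':
        since = (now or datetime.now(timezone.utc)) - value
        new = bisect_left(timestamps, since)
    else:
        raise ValueError(f'unknown strategy: {strategy}')
    # Clamp into [earliest, latest] so the result is always a valid position.
    return max(earliest, min(new, latest))


if __name__ == '__main__':
    start = datetime(2019, 7, 5, tzinfo=timezone.utc)
    ts = [start + timedelta(minutes=i) for i in range(10)]
    print(reset_offset('shift-by', ts, current=5, value=-2))  # prints: 3
```

Because a group's position is just a number per partition, "replaying" events after a bug fix is a matter of moving that number back.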
Kafka: Consumer Offsets Example for 3 Partitions
┌───[CG 1 C 1]
│
Partition 1 >> 11|42|73|104|135|166|197|228|259
│
└───[CG 2 C 1]
┌───[CG 1 C 2]
│
Partition 2 >> 21|52|83|114|145|176|207|238
│
└───[CG 2 C 2]
┌───[CG 1 C 3]
│
Partition 3 >> 31|62|93|124|155|186|217|248
│
└───[CG 2 C 3]
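In the diagram every group (CG) reads all three partitions, but inside a group each partition goes to exactly one consumer (C). The sketch below models that with a plain round-robin split; it is a simplification of one of several assignment strategies Kafka clients offer, and the function name is hypothetical.

```python
from typing import Dict, List


def assign_round_robin(partitions: List[int], consumers: List[str]) -> Dict[str, List[int]]:
    """Spread a topic's partitions over the members of ONE consumer group."""
    members = sorted(consumers)
    assignment: Dict[str, List[int]] = {m: [] for m in members}
    if not members:
        return assignment
    for i, partition in enumerate(sorted(partitions)):
        assignment[members[i % len(members)]].append(partition)
    return assignment


if __name__ == '__main__':
    partitions = [1, 2, 3]
    groups = {'CG 1': ['C 1', 'C 2', 'C 3'], 'CG 2': ['C 1', 'C 2', 'C 3']}
    # Every group independently receives ALL partitions of the topic.
    for group, members in groups.items():
        print(group, assign_round_robin(partitions, members))
    # More consumers than partitions: the extra consumer sits idle.
    print(assign_round_robin(partitions, ['C 1', 'C 2', 'C 3', 'C 4']))
```

The partition count therefore caps the useful parallelism of a single group: a fourth consumer on a three-partition topic receives nothing.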
Kafka: Overview, again :-)
Kafka: Cluster
Kafka: Partition Leaders
Kafka: Replication
Kafka: Message structure
key: byte[], value: byte[]
Kafka: Serializers
JSON, Protobuf, Thrift, Avro
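Since Kafka only stores bytes, a serializer is just a pair of functions between your objects and `bytes`. A minimal JSON sketch using only the standard library follows; the function names are illustrative. With confluent-kafka, the resulting bytes can be passed directly as the `key` and `value` arguments of `Producer.produce`.

```python
import json
from typing import Any, Optional


def serialize_key(key: Optional[str]) -> Optional[bytes]:
    # Keys are typically short identifiers: UTF-8 is enough.
    return None if key is None else key.encode('utf-8')


def serialize_value(value: Any) -> bytes:
    # Compact, key-sorted JSON so equal objects produce equal bytes.
    return json.dumps(value, separators=(',', ':'), sort_keys=True).encode('utf-8')


def deserialize_value(data: Optional[bytes]) -> Any:
    # A None value is a tombstone (marks a key as deleted in compacted topics).
    return None if data is None else json.loads(data.decode('utf-8'))


if __name__ == '__main__':
    key = serialize_key('order-42')
    value = serialize_value({'status': 'paid', 'amount': 10})
    print(key, value)  # prints: b'order-42' b'{"amount":10,"status":"paid"}'
```

Note that every JSON message repeats its field names, which is exactly the overhead that binary formats with a separate schema avoid.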
Kafka: Schema overhead
Kafka: Schema Registry Concept
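The idea of a schema registry is to send a small schema id with each message instead of the schema itself. Confluent's serializers frame each message as one magic byte (0), a 4-byte big-endian schema id, and then the encoded payload. The sketch below shows only that framing; `InMemorySchemaRegistry` is a hypothetical stand-in for the real registry service, and the payload is a placeholder rather than real Avro encoding.

```python
import json
import struct
from typing import Dict, Tuple

MAGIC_BYTE = 0
HEADER = struct.Struct('>bi')  # magic byte + 4-byte big-endian schema id = 5 bytes


class InMemorySchemaRegistry:
    """Hypothetical stand-in for a Schema Registry: schema <-> integer id."""

    def __init__(self) -> None:
        self._ids: Dict[str, int] = {}
        self._schemas: Dict[int, dict] = {}

    def register(self, schema: dict) -> int:
        canonical = json.dumps(schema, sort_keys=True)
        if canonical not in self._ids:  # same schema -> same id
            schema_id = len(self._ids) + 1
            self._ids[canonical] = schema_id
            self._schemas[schema_id] = schema
        return self._ids[canonical]

    def get(self, schema_id: int) -> dict:
        return self._schemas[schema_id]


def frame(schema_id: int, payload: bytes) -> bytes:
    """Prefix an already-serialized payload with the 5-byte header."""
    return HEADER.pack(MAGIC_BYTE, schema_id) + payload


def unframe(message: bytes) -> Tuple[int, bytes]:
    """Split a framed message back into (schema_id, payload)."""
    magic, schema_id = HEADER.unpack_from(message)
    if magic != MAGIC_BYTE:
        raise ValueError(f'unexpected magic byte: {magic}')
    return schema_id, message[HEADER.size:]


if __name__ == '__main__':
    registry = InMemorySchemaRegistry()
    user_schema = {'type': 'record', 'name': 'User',
                   'fields': [{'name': 'id', 'type': 'int'},
                              {'name': 'name', 'type': 'string'}]}
    schema_id = registry.register(user_schema)
    message = frame(schema_id, b'<avro-encoded bytes>')  # placeholder payload
    print(unframe(message))  # prints: (1, b'<avro-encoded bytes>')
```

The per-message cost of the schema drops to a fixed 5 bytes, and consumers fetch (and cache) the full schema by id only when they see a new one.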
Microservices: Async Communication (Celery)
Microservices: Async Communication (Kafka)
Microservices: Async. What about databases?
Relational Database → Kafka
Stream data from RDBMS to Kafka
Stream data from PostgreSQL to Kafka
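One common way to stream PostgreSQL changes into Kafka is to register a Debezium connector with Kafka Connect through its REST API (`POST /connectors`). The sketch below builds such a request; the host names, credentials, table list and the `localhost:8083` endpoint are assumptions for illustration. Config keys vary by Debezium version: `topic.prefix` is the 2.x name (1.x used `database.server.name`), and `table.include.list` replaced the older `table.whitelist`.

```python
import json
import urllib.request
from typing import List

CONNECT_URL = 'http://localhost:8083/connectors'  # assumed Kafka Connect endpoint


def debezium_postgres_connector(name: str, host: str, port: int, user: str,
                                password: str, dbname: str, tables: List[str],
                                topic_prefix: str) -> dict:
    """Build a Kafka Connect payload for the Debezium PostgreSQL connector."""
    return {
        'name': name,
        'config': {
            'connector.class': 'io.debezium.connector.postgresql.PostgresConnector',
            'plugin.name': 'pgoutput',  # logical decoding plugin built into PostgreSQL 10+
            'database.hostname': host,
            'database.port': str(port),
            'database.user': user,
            'database.password': password,
            'database.dbname': dbname,
            'topic.prefix': topic_prefix,  # Debezium 2.x; 1.x: 'database.server.name'
            'table.include.list': ','.join(tables),  # "schema.table" entries
        },
    }


def register(payload: dict, url: str = CONNECT_URL) -> dict:
    """POST the payload to Kafka Connect (needs a running Connect worker)."""
    request = urllib.request.Request(
        url, data=json.dumps(payload).encode('utf-8'),
        headers={'Content-Type': 'application/json'}, method='POST')
    with urllib.request.urlopen(request) as response:
        return json.load(response)


if __name__ == '__main__':
    payload = debezium_postgres_connector(
        'shop-cdc', 'db', 5432, 'debezium', 'secret', 'shop',
        ['public.orders', 'public.customers'], 'shop')
    print(json.dumps(payload, indent=2))
```

With this setup, changes to `public.orders` would be published to a topic named `shop.public.orders`.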
Microservices: Async. What about databases?
Microservices: Async. Debezium
Microservices: Async. Debezium. Full
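On the consuming side, each Debezium change event carries an envelope with `before`, `after`, `source`, `op` (`c` create, `u` update, `d` delete, `r` snapshot read) and `ts_ms`. A minimal sketch of a service keeping a local copy of a table from those events follows; the in-memory dict and the `id` primary-key column are assumptions, and a Django service would write to its own models instead.

```python
import json
from typing import Any, Dict, Optional


def apply_change_event(view: Dict[Any, dict], raw_value: Optional[bytes],
                       pk: str = 'id') -> None:
    """Apply one Debezium change event to an in-memory replica keyed by primary key."""
    if raw_value is None:
        return  # tombstone emitted after a delete; nothing left to apply
    event = json.loads(raw_value)
    # With the JSON converter's schemas enabled, the envelope sits under "payload".
    event = event.get('payload', event)
    op = event['op']
    if op in ('c', 'r', 'u'):  # create, snapshot read, update
        row = event['after']
        view[row[pk]] = row
    elif op == 'd':
        view.pop(event['before'][pk], None)
    # Other operation types (e.g. truncate in newer versions) are ignored here.


if __name__ == '__main__':
    replica: Dict[Any, dict] = {}
    created = {'op': 'c', 'before': None, 'after': {'id': 1, 'status': 'new'}}
    apply_change_event(replica, json.dumps(created).encode('utf-8'))
    print(replica)  # prints: {1: {'id': 1, 'status': 'new'}}
```

Because the topic is an ordered log per key, replaying it from the earliest offset rebuilds the same replica from scratch.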
Application Diagram
Application Diagram. External Data Handling
kafka-python
pykafka
confluent-kafka-python
aiokafka
What to choose?
                  | kafka-python     | pykafka          | confluent-kafka
Stars             | ~3,400           | ~1,000           | ~1,500
Contributors*     | 0 / 4 / 13 / 186 | 1 / 5 / 10 / 77  | 0 / 1 / 6 / 57
Release track     | fine             | hm… (2018?)      | fine
Development track | fine             | hm…              | fine
What about Throughput?
Role     | Client                   | Time (s) | MB/s | Msgs/s
producer | confluent_kafka_producer |      5.4 |   17 | 183,000
producer | pykafka_producer_rdkafka |       16 |  6.1 |  64,000
producer | pykafka_producer         |       57 |  1.7 |  17,000
producer | python_kafka_producer    |       68 |  1.4 |  15,000
consumer | confluent_kafka_producer |      5.4 |   17 | 183,000
consumer | pykafka_producer_rdkafka |       16 |  6.1 |  64,000
consumer | pykafka_producer         |       57 |  1.7 |  17,000
consumer | python_kafka_producer    |       68 |  1.4 |  15,000
Let's code!
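from confluent_kafka import Producer
import sys


def delivery_callback(err, msg):
    # Called once per message (from poll()/flush()) with the delivery result.
    if err:
        print(f'Delivery failed: {err}', file=sys.stderr)
    else:
        print(f'Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')


if __name__ == '__main__':
    broker, topic = sys.argv[1], sys.argv[2]
    producer = Producer({'bootstrap.servers': broker})

    for something in range(1000):
        while True:
            try:
                # produce() is asynchronous: it only enqueues the message locally.
                producer.produce(topic, str(something), callback=delivery_callback)
                break
            except BufferError:
                # Local queue is full: serve delivery reports to free space, then retry.
                print('Local producer queue is full, waiting...', file=sys.stderr)
                producer.poll(1)
        # Serve the delivery callback queue without blocking.
        producer.poll(0)

    # Block until all outstanding messages are delivered (or have failed).
    producer.flush()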
Let's code!
from confluent_kafka import Consumer, KafkaException
import sys


def print_assignment(consumer, partitions):
    # Called after a rebalance hands partitions to this consumer.
    print('Assigned partitions:', partitions)


if __name__ == '__main__':
    broker, group, topics = sys.argv[1], sys.argv[2], sys.argv[3:]
    conf = {'bootstrap.servers': broker,
            'group.id': group,
            # Start from the beginning if the group has no committed offset yet.
            'auto.offset.reset': 'earliest'}
    consumer = Consumer(conf)
    consumer.subscribe(topics, on_assign=print_assignment)
    try:
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue  # No message within the timeout.
            if msg.error():
                raise KafkaException(msg.error())
            print(msg.topic(), msg.partition(), msg.offset(),
                  msg.key(), msg.value())
    except KeyboardInterrupt:
        pass
    finally:
        # Leave the group cleanly and commit final offsets.
        consumer.close()