Django in the World of Microservices and Kafka

Mikalai Saskavets

the App

Django is Awesome for Rapid Development

art by René Aigner

Rapidly Developed Monolithic MVP

Microservices: Coupling and Cohesion?

Distributed Monolith?

Here we are

Synchronous Communication

Synchronous Communication

How to bring a pinch of async to the Django world?



Celery

Microservices

Introduce Celery

Synchronous Communication

~Async Communication (Celery)

~Async Communication (Celery)
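
As a reminder of what this pattern looks like in code: a minimal Celery sketch (module name, broker URL, and the task itself are illustrative assumptions):

# tasks.py -- a minimal sketch; app name, broker URL and task are assumptions
from celery import Celery

app = Celery('demo', broker='amqp://localhost')

@app.task
def notify_user(user_id):
    # runs later, in a worker process, outside the request/response cycle
    print(f'notifying user {user_id}')

# the caller only enqueues the task and returns immediately:
# notify_user.delay(42)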


Shall we use Kafka here???

What is Kafka?

Released in 2011 by LinkedIn

Distributed Streaming Platform

Kafka for what??

Kafka for what??

Alternatives for Kafka


Some throughput comparison

Some throughput comparison

Why Kafka???

Why Kafka???

Kafka: Overview

Kafka: Overview

Kafka Cluster

Let's start with something simple

1 topic
2 partitions per topic
1 consumers group
1 consumer in group
1 producer

Something simple ;-)
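
To reproduce this setup locally, the topic can be created with confluent-kafka's admin client; a sketch (broker address and topic name are assumptions):

# a sketch: create the demo topic (1 topic, 2 partitions) via the admin API
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({'bootstrap.servers': 'localhost:9092'})

futures = admin.create_topics(
    [NewTopic('demo-topic', num_partitions=2, replication_factor=1)])
for topic, future in futures.items():
    future.result()  # raises if creation failed
    print(f'created {topic}')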

Messages flow

Strategies to put a message to a specific partition
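
In confluent-kafka these strategies look roughly as follows (broker address and topic name are assumptions):

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})

# 1. no key: the producer itself spreads messages across partitions
producer.produce('demo-topic', value=b'no key')

# 2. with a key: the partition is derived from a hash of the key,
#    so the same key always lands in the same partition
producer.produce('demo-topic', value=b'keyed', key=b'user-42')

# 3. explicit partition: bypass the partitioner entirely
producer.produce('demo-topic', value=b'pinned', partition=0)

producer.flush()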

Kafka: Message (Record) Format

length: varint
attributes: int8
    bit 0~7: unused
timestampDelta: varlong
offsetDelta: varint
keyLength: varint
key: byte[]
valueLen: varint
value: byte[]
Headers => [Header]

Kafka: Message


  key: byte[]
value: byte[]

Kafka: Message: Serialization

# pseudocode: the application decides how key and value become bytes
msg = Message()
msg['key'] = custom.serialize("some key value")
msg['value'] = custom.serialize({"foo": "1",
                                 "bar": 1})

Kafka: Serializers

JSON, Protobuf, Thrift, Avro

Avro: Pros

  1. platform independent
  2. compact binary encoding (see the sketch below)
  3. schema described in JSON
  4. schema evolution / migration flow
  5. schema embedded in the message or stored somewhere outside
  6. ...
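
A quick illustration of points 2 and 3: a sketch using the fastavro library (an assumption; the talk itself uses confluent-kafka's Avro helpers) to compare the binary Avro encoding of a record against its JSON form:

# a sketch, assuming fastavro is available; any Avro library would do
import io
import json
from fastavro import parse_schema, schemaless_writer

schema = parse_schema({
    "name": "FooBar",
    "type": "record",
    "fields": [{"name": "foo", "type": "string"},
               {"name": "bar", "type": "int"}],
})

record = {"foo": "1", "bar": 1}

buf = io.BytesIO()
schemaless_writer(buf, schema, record)  # binary body only, schema not attached

print(len(buf.getvalue()))       # a few bytes for the Avro encoding
print(len(json.dumps(record)))   # noticeably more for the JSON encoding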

Kafka: Schema overhead

Fig. from confluent.io

Kafka: Schema overhead

Approach                   Schema (bytes)  Payload (bytes)  Total (bytes)  Overhead
JSON (schemaless)                       0               74             74       ~2x
Schema + Avro payload                 204               34            238       ~6x
Schema ID + Avro payload                4               34             38        1x
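
The last row works because Confluent's serializers prepend only a small binary header to each message: a 1-byte magic marker plus a 4-byte schema id pointing into the Schema Registry. A sketch of splitting such a message (the function name is made up):

import struct

def split_confluent_message(raw: bytes):
    # 1-byte magic marker (0) followed by a 4-byte big-endian schema id
    magic, schema_id = struct.unpack('>bI', raw[:5])
    assert magic == 0, 'not a Confluent-framed message'
    return schema_id, raw[5:]  # (schema id, Avro-encoded payload)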

Where to store Schema?

Kafka: Schema Registry Concept

Messages flow

Messages flow (with Schema Registry)

Messages flow. Consumer Side. Part 1

Messages flow. Consumer Side. Part 2

Messages flow. Consumer Side. Part 3

Messages flow. Consumer Side. Part 4

Messages flow. Consumer Side. Part 5

Messages flow. Consumer Side. Part 6

Let's see some code

Do it with Python!

Python libraries to work with Kafka

What to choose?

                     kafka-python      pykafka           confluent-kafka
Stars on GitHub      ~ 3 400           ~ 1 000           ~ 1 500
Contributors*        0 / 4 / 13 / 186  1 / 5 / 10 / 77   0 / 1 / 6 / 57
Releases track       fine              hm… (2018???)     fine
Development track    fine              hm…               fine

* a / b / c / d, where:
  a — contributors with more than 1 000 commits
  b — contributors with 100 ≤ commits < 1 000
  c — contributors with 10 ≤ commits < 100
  d — all contributors

What about Throughput? Some benchmarks

                   time (s)   MB/s    msgs/s
confluent-kafka        5.4     17     183 000
kafka-python          68        1.4    15 000

confluent-kafka-python

Some preparations



# vars.py
import sys

server = sys.argv[1]    # <-- Kafka broker URL
registry = sys.argv[2]  # <-- Schema Registry URL
topic = sys.argv[3]     # <-- topic name
group = sys.argv[0]     # <-- consumer group id (the script name doubles as the group)

Kafka Producer on Python


import vars
from confluent_kafka import Producer

conf = {'bootstrap.servers': vars.server}

producer = Producer(**conf)

for num in range(1000):
    # no key: the producer spreads messages across partitions itself
    producer.produce(vars.topic, value=str(num))

# Wait until all messages have been delivered
producer.flush()

Kafka Producer on Python


import vars
from confluent_kafka import Producer

conf = {'bootstrap.servers': vars.server}

producer = Producer(**conf)

for num in range(1000):
    # with a key: the same key always lands in the same partition
    producer.produce(vars.topic, value=str(num),
                                 key=str(num))

# Wait until all messages have been delivered
producer.flush()

Kafka Consumer on Python


import vars
from confluent_kafka import Consumer

conf = {'bootstrap.servers': vars.server,
        'group.id': vars.group,
        'enable.auto.commit': 'true'}

consumer = Consumer(conf)
consumer.subscribe([vars.topic])
try:
    while True:
        message = consumer.poll(timeout=1.0)
        if message is None:
            continue
        if message.error():
            print(message.error())
        else:
            print(message)
finally:
    consumer.close()  # ... to commit final offsets

Avro Schema for messages


# avro_schemas.py
from confluent_kafka import avro

record_schema = avro.loads("""
    {
        "namespace": "example.net.foobar.serialization.avro",
        "name": "FooBar",
        "type": "record",
        "fields": [
            {"name": "foo", "type": "string"},
            {"name": "bar", "type": "int"}
        ]
    }
""")

Kafka Producer with Avro


import vars
from confluent_kafka.avro import AvroProducer
from avro_schemas import record_schema

conf = {'bootstrap.servers': vars.server,
        'schema.registry.url': vars.registry}

producer = AvroProducer(conf, default_value_schema=record_schema)

for num in range(1000):
    producer.produce(vars.topic, 
                     value={"foo": str(num), "bar": num})


# Wait until all messages have been delivered
producer.flush()
    

Kafka Producer with Avro


import vars
from confluent_kafka.avro import AvroProducer
from avro_schemas import record_schema

conf = {'bootstrap.servers': vars.server,
        'schema.registry.url': vars.registry}

producer = AvroProducer(conf, default_value_schema=record_schema)

for num in range(1000):
    producer.produce(vars.topic, 
                     value={"foo": str(num), "bar": num},
                     value_schema=record_schema)

# Wait until all messages have been delivered
producer.flush()
    

Kafka Consumer with Avro


import vars
from confluent_kafka.avro import AvroConsumer

conf = {'bootstrap.servers': vars.server, 'group.id': vars.group,
        'schema.registry.url': vars.registry}

consumer = AvroConsumer(conf)
consumer.subscribe([vars.topic])
try:
    while True:
        message = consumer.poll(timeout=1.0)
        if message is None:
            continue
        if message.error():
            print(message.error())
        else:
            print(message.value()['bar'])
finally:
    consumer.close()  # ... to commit final offsets

Launch Kafka Consumer with Django

./manage.py run_kafka_consumer
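
Under the hood this is an ordinary Django management command wrapping the consumer loop. A minimal sketch, assuming KAFKA_* settings and this app layout (both are assumptions, not from the talk):

# myapp/management/commands/run_kafka_consumer.py -- a minimal sketch
from confluent_kafka import Consumer
from django.conf import settings
from django.core.management.base import BaseCommand

class Command(BaseCommand):
    help = 'Consume Kafka messages in a long-running worker process'

    def handle(self, *args, **options):
        consumer = Consumer({'bootstrap.servers': settings.KAFKA_SERVER,
                             'group.id': settings.KAFKA_GROUP})
        consumer.subscribe([settings.KAFKA_TOPIC])
        try:
            while True:
                message = consumer.poll(timeout=1.0)
                if message is None or message.error():
                    continue
                self.stdout.write(str(message.value()))
        finally:
            consumer.close()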
        

Microservices!
Go Ahead!

With Celery

With Kafka

~Async Communication (Celery)

~Async Communication (Kafka)

With Kafka

With Kafka. Don't forget about databases

~Async Communication (Kafka)

What if we have several databases?

Relational Database → Kafka

Stream data from an RDBMS to Kafka

Fig. from debezium.io

Stream data from PostgreSQL to Kafka

Fig. from debezium.io
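
Debezium connectors are registered through the Kafka Connect REST API. A sketch of registering the PostgreSQL connector from Python; every host name and credential below is a placeholder, and the property keys follow Debezium's documented PostgreSQL connector config:

import json
import requests

connector = {
    "name": "app-db-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "debezium",
        "database.password": "secret",
        "database.dbname": "appdb",
        "database.server.name": "appdb",  # becomes the topic name prefix
    },
}

resp = requests.post("http://connect:8083/connectors",
                     data=json.dumps(connector),
                     headers={"Content-Type": "application/json"})
resp.raise_for_status()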

With Kafka and databases

Introduce Debezium

What if we have several databases?

How Debezium helps us

Application Diagram

Application Diagram. Rearrange a bit

Application Diagram + External Data Handling

Application Diagram + Marketing Website

Summary?

Thanks!

Questions?

Kafka: Replication

some details on dzone.com

Kafka: Reset Consumer Group Offsets tooling


  Reset to Datetime
  Reset from Duration
  Reset to Earliest
  Reset to Latest
  Reset to Current Time
  Reset to Offset
  Shift Offset by 'n'
  Reset from file
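
Each strategy maps to a flag of the kafka-consumer-groups tool that ships with Kafka, e.g. (broker, group, and topic are placeholders):

kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
    --group my-group --topic my-topic \
    --reset-offsets --to-earliest --execute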

            

More strategies to put a message to a specific partition
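
Besides keys and explicit partition numbers, librdkafka (and therefore confluent-kafka) lets you swap the hashing scheme itself via the partitioner setting; a sketch (broker and topic are assumptions):

from confluent_kafka import Producer

# 'murmur2_random' mirrors the Java client's default keyed partitioning
producer = Producer({'bootstrap.servers': 'localhost:9092',
                     'partitioner': 'murmur2_random'})
producer.produce('demo-topic', value=b'v', key=b'user-42')
producer.flush()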