Tuesday 20 July 2021

Streaming NRT data with Kafka Connect and Debezium

Event-driven architecture has been very popular in microservice systems for the past few years. Providing Near Real Time (NRT) data to other services as soon as it changes is a core problem for many organisations.
This is also known as the Change Data Capture (CDC) design pattern: each database change event is captured, wrapped, and pushed to other pipelines or services.
It is an instance of the duality of streams and tables: table change events are pushed over a stream to other systems in order, so that those systems can rebuild a snapshot of the database.
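
To make the stream/table duality concrete, here is a minimal sketch that rebuilds a table snapshot by replaying change events in order. The event shape (a dict with "before" and "after" row images keyed by "id") and the function names are assumptions for illustration only, not Debezium's API.

```python
from typing import Any, Dict, List, Optional

Row = Dict[str, Any]
Event = Dict[str, Optional[Row]]


def apply_change(table: Dict[int, Row], event: Event) -> None:
    """Apply one change event (hypothetical shape: 'before'/'after' row images)."""
    before, after = event.get("before"), event.get("after")
    if after is None:
        # Delete: the row existed before the change and is gone after it.
        if before is not None:
            table.pop(before["id"], None)
    else:
        # Insert or update: the 'after' image is the new state of the row.
        table[after["id"]] = after


def replay(events: List[Event]) -> Dict[int, Row]:
    """Rebuild a table snapshot by applying the events in stream order."""
    table: Dict[int, Row] = {}
    for event in events:
        apply_change(table, event)
    return table


events: List[Event] = [
    {"before": None, "after": {"id": 1, "name": "a"}},                    # insert
    {"before": None, "after": {"id": 2, "name": "b"}},                    # insert
    {"before": {"id": 1, "name": "a"}, "after": {"id": 1, "name": "c"}},  # update
    {"before": {"id": 2, "name": "b"}, "after": None},                    # delete
]
snapshot = replay(events)
```

Replaying the same events in the same order always gives the same table, which is why ordering of the stream matters.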

Debezium and Kafka Connect together provide a fault-tolerant and scalable solution to the above problem.

What is Kafka Connect

It is a framework for streaming data into Kafka and out of Kafka. The Confluent Platform ships with several built-in connectors that can be used to stream data to or from commonly used systems such as relational databases or HDFS.

What is Debezium

Debezium is a distributed platform that turns your existing databases into event streams, so applications can see and respond immediately to each row-level change in the databases.

Debezium is built on top of Apache Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. In other words, Debezium provides Kafka Connect source connector implementations for various databases such as MySQL and PostgreSQL.



How Debezium works with a database like MySQL

In a master-slave architecture like MySQL's, change events on the master node are synced to the slave nodes asynchronously. This syncing is achieved by writing every DB change event to a binary log (binlog) file on the master node, in the order in which the changes were committed.

Each slave node copies the events from the master's binary log into its own relay log and then applies them to update its own DB state.

Debezium works by connecting to MySQL and pretending to be a replica. MySQL sends its replication data to Debezium, thinking it’s actually funneling data to another downstream MySQL instance.


Types of MySQL Replication

There are two main ways of replicating a MySQL database (MySQL can also mix the two):

  1. Statement-based replication (SBR)
  2. Row-based replication (RBR)

Statement-based replication: In this type of replication, the SQL statements themselves are written to the binlog files of the master node. The slave node copies these statements and re-executes them.

Row-based replication: In this type of replication, the binlog contains the actual data of the rows that were modified in a table, rather than the statements that modified them.

Debezium needs row-based replication, so make sure row-based binary logging (binlog_format=ROW) is enabled on the MySQL server that Debezium connects to.
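
The difference between the two modes can be illustrated with a toy model. This is only a sketch with plain Python dicts standing in for tables; make_token_source is a hypothetical stand-in for a non-deterministic SQL function such as UUID(), and nothing here talks to MySQL.

```python
import itertools
from typing import Callable, Dict

Table = Dict[int, str]


def make_token_source(server: str) -> Callable[[], str]:
    """Hypothetical stand-in for a non-deterministic SQL function such as UUID()."""
    counter = itertools.count(1)
    return lambda: f"{server}-{next(counter)}"


def run_statement(table: Table, row_id: int, token: Callable[[], str]) -> None:
    """Models: UPDATE t SET token = <non-deterministic function> WHERE id = row_id."""
    table[row_id] = token()


master: Table = {1: "old"}
replica_sbr: Table = {1: "old"}
replica_rbr: Table = {1: "old"}

# On the master, the statement is executed once.
run_statement(master, 1, make_token_source("master"))

# Statement-based: the replica re-executes the same statement with its own
# non-deterministic function, so the stored value can differ from the master.
run_statement(replica_sbr, 1, make_token_source("replica"))

# Row-based: the log carries the resulting row image, so the replica copies it.
row_event = {"id": 1, "after": master[1]}
replica_rbr[row_event["id"]] = row_event["after"]
```

Because a row-based log carries the resulting row values, a consumer such as Debezium can emit the changed data without re-running any SQL.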

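Types of events generated: create/insert (c), update (u), delete (d) and read (r). Read events are emitted for rows that are read while taking a snapshot.

These events contain the row data before and after the database operation. For an insert the before part is null, and for a delete the after part is null.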


What is GTID

A global transaction identifier (GTID) is a unique identifier associated with every transaction committed on the source server. The database needs it for fail-safe replication across all slave nodes.

It is identical across all the servers in a given replication setup, and it helps with recovery after a failure. Once a transaction with a given GTID is committed on a given server, any subsequent transaction having the same GTID is ignored by that server. Thus, a transaction committed on the source can be applied no more than once on the replica, which helps to guarantee consistency.
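
The "applied no more than once" behaviour can be sketched as follows. The Replica class is a hypothetical in-memory model, not MySQL code; the GTID strings only follow the source_uuid:transaction_id shape.

```python
from typing import Dict, Set


class Replica:
    """Toy replica that remembers which GTIDs it has already executed."""

    def __init__(self) -> None:
        self.data: Dict[str, int] = {}
        self.executed_gtids: Set[str] = set()

    def apply(self, gtid: str, key: str, delta: int) -> bool:
        """Apply a transaction once; return False if its GTID was seen before."""
        if gtid in self.executed_gtids:
            return False  # duplicate delivery, ignored
        self.data[key] = self.data.get(key, 0) + delta
        self.executed_gtids.add(gtid)
        return True


replica = Replica()
gtid = "3E11FA47-71CA-11E1-9E33-C80AA9429562:23"  # example value, format only

first = replica.apply(gtid, "balance", 100)
# The same transaction delivered again, for example after a reconnect.
second = replica.apply(gtid, "balance", 100)
```

Without the GTID check, the second delivery would have added 100 again and the replica would have drifted from the source.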

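Debezium does not strictly require GTIDs, but enabling them on all MySQL servers is recommended when setting up Debezium, because it makes it easier to move the connector to another server after a failure. This feature was introduced in MySQL 5.6.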

An example of the Debezium event payload for an update on a MySQL table:

UPDATE user_profile SET first_name = 'Sachin Ramesh' WHERE id = 1005;

       
{
  "before": {
    "id": 1005,
    "first_name": "Sachin",
    "last_name": "tendulkar",
    "email": "sachin@gmail.org"
  },
  "after": {
    "id": 1005,
    "first_name": "Sachin Ramesh",
    "last_name": "tendulkar",
    "email": "sachin@gmail.org"
  },
  "source": {
    "name": "mysql-server-1",
    "server_id": 226345,
    "ts_sec": 1265881,
    "gtid": null,
    "file": "mysql-bin.000004",
    "pos": 484,
    "row": 0,
    "snapshot": null
  },
  "op": "u",
  "ts_ms": 7462281029529
}
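
A consumer usually wants to know which columns actually changed. The following is a minimal sketch, assuming the event value has already been extracted as JSON with the before/after/op fields shown above; changed_columns is a hypothetical helper name, and the sample below is a trimmed copy of the event (the source block is omitted).

```python
import json
from typing import Any, Dict, Tuple


def changed_columns(event: Dict[str, Any]) -> Dict[str, Tuple[Any, Any]]:
    """Return {column: (old_value, new_value)} for columns whose value differs."""
    before = event.get("before") or {}  # null for inserts
    after = event.get("after") or {}    # null for deletes
    return {
        col: (before.get(col), after.get(col))
        for col in sorted(set(before) | set(after))
        if before.get(col) != after.get(col)
    }


raw_event = """
{
  "before": {"id": 1005, "first_name": "Sachin", "last_name": "tendulkar", "email": "sachin@gmail.org"},
  "after": {"id": 1005, "first_name": "Sachin Ramesh", "last_name": "tendulkar", "email": "sachin@gmail.org"},
  "op": "u"
}
"""
event = json.loads(raw_event)
changes = changed_columns(event)
```

For the update above, only first_name differs between the two images, so that is the only key in changes.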


How to create or modify a Debezium connector

Connectors in Kafka Connect define where data should be copied to and from. A connector instance is a logical job that is responsible for managing the copying of data between Kafka and another system. Connector plugins are jars that add the classes that implement a connector. In distributed mode, a JSON payload is sent over the REST API to create or modify a connector.

Below are a few Kafka Connect HTTP APIs for configuring connectors:

// get all connectors name

curl -X GET http://localhost:8083/connectors

// get kafka connector configuration

curl -X GET http://localhost:8083/connectors/my_connector_name

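// create a connector (to edit an existing one, PUT only the "config" map to /connectors/my_connector_name/config)
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d'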
{
   "name":"my_debizium_mysql_connector",
   "config":{
      "connector.class":"io.debezium.connector.mysql.MySqlConnector",
      "database.user":"debezium",
      "database.server.name":"<database_name>",
      "database.port":"3306",
      "database.server.id":"<any_random_number>",
      "database.hostname":"<database_host>",
      "database.password":"<database_password>",

      "database.history.kafka.topic":"<topic_name>",
      "database.history.kafka.bootstrap.servers":"<kafka_broker_ip>:9092",
      "database.history.producer.enable.idempotence":"true",
      "event.deserialization.failure.handling.mode":"warn",
      "time.precision.mode":"connect",
      
      "include.schema.changes":"true",
      "table.whitelist":"<database_name>.<table_name>",
      "inconsistent.schema.handling.mode":"ignore",
      
      "name":"my_debizium_mysql_connector",
      "database.history.skip.unparseable.ddl":"true",
      "database.history.store.only.monitored.tables.ddl":"true",
      "database.whitelist":"<database_name>",
      "snapshot.mode":"schema_only"
   }
}
'
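
The same call can be scripted. Below is a minimal sketch that builds a request for Kafka Connect's PUT /connectors/{name}/config endpoint, which creates the connector if it does not exist and updates it otherwise. The helper names build_upsert_request and send are hypothetical, the base URL is the one used in the curl examples, and the config is a shortened copy of the one above with its placeholders left in.

```python
import json
import urllib.request
from typing import Any, Dict


def build_upsert_request(base_url: str, name: str,
                         config: Dict[str, str]) -> urllib.request.Request:
    """Build a PUT /connectors/{name}/config request (create or update)."""
    # This endpoint takes the flat config map, not the {"name", "config"} wrapper.
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/connectors/{name}/config",
        data=body,
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


def send(request: urllib.request.Request) -> Dict[str, Any]:
    """Send the request to a running Connect worker and decode the JSON reply."""
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))


config = {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "<database_host>",  # placeholder, as in the curl example
    "database.port": "3306",
    "database.user": "debezium",
    "snapshot.mode": "schema_only",
}
request = build_upsert_request(
    "http://localhost:8083", "my_debizium_mysql_connector", config
)
```

Calling send(request) against a running worker would submit the configuration; it is left uncalled here so the sketch runs without a Kafka Connect cluster.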

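Possible values of snapshot.mode include: initial (the default), when_needed, never, schema_only and schema_only_recovery.

Kafka topic name where database table change events will be pushed: {database.server.name} + "." + {database name} + "." + {table name}. With the configuration above, where database.server.name is set to the database name, this is the same as {database.whitelist} + "." + {table.whitelist}.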
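
As a small sketch of that naming rule, assuming the Debezium MySQL convention of serverName.databaseName.tableName: the helper names and the "inventory" database are hypothetical, while "mysql-server-1" and "user_profile" come from the sample event earlier.

```python
from typing import Tuple


def split_table_id(table_id: str) -> Tuple[str, str]:
    """Split a '<database>.<table>' entry, as used in table.whitelist."""
    database, table = table_id.split(".", 1)
    return database, table


def change_topic(server_name: str, table_id: str) -> str:
    """Topic for a table's change events: serverName.databaseName.tableName."""
    database, table = split_table_id(table_id)
    return f"{server_name}.{database}.{table}"


topic = change_topic("mysql-server-1", "inventory.user_profile")
```

A consumer service would subscribe to this topic to receive the change events of that one table.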

To configure the Kafka producer that writes to the database history topic, use the database.history.producer.* properties.

// connector status

curl -X GET http://localhost:8083/connectors/my_connector_name/status
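
The status response is JSON with a "connector" object and a "tasks" list, each carrying a "state" such as RUNNING or FAILED. Here is a minimal sketch for checking it in a script; the helper names are hypothetical and the sample response is hand-written for illustration, not captured from a real worker.

```python
from typing import Any, Dict, List


def failed_task_ids(status: Dict[str, Any]) -> List[int]:
    """Return the ids of tasks whose state is FAILED."""
    return [t["id"] for t in status.get("tasks", []) if t.get("state") == "FAILED"]


def is_healthy(status: Dict[str, Any]) -> bool:
    """True when the connector and every one of its tasks report RUNNING."""
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    tasks_ok = all(t.get("state") == "RUNNING" for t in status.get("tasks", []))
    return connector_ok and tasks_ok


# Hand-written sample in the shape of a /status response.
status = {
    "name": "my_connector_name",
    "connector": {"state": "RUNNING", "worker_id": "10.0.0.1:8083"},
    "tasks": [
        {"id": 0, "state": "FAILED", "worker_id": "10.0.0.1:8083",
         "trace": "example stack trace"},
    ],
    "type": "source",
}
failed = failed_task_ids(status)
healthy = is_healthy(status)
```

Note that a connector can report RUNNING while one of its tasks has failed, so checking the task states matters. A failed task can then be restarted with a POST to /connectors/my_connector_name/tasks/0/restart.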

// delete connector

curl -X DELETE http://localhost:8083/connectors/my_connector_name

// pause connector

curl -X PUT http://localhost:8083/connectors/my_connector_name/pause

// resume connector

curl -X PUT http://localhost:8083/connectors/my_connector_name/resume




I hope you find this information very helpful. Post your comments for any improvements.
