Debezium and Kafka Connect provide fault-tolerant and scalable solutions to the problems described above.
What is Kafka Connect?
It is a framework for streaming data into and out of Kafka. The Confluent Platform ships with several built-in connectors that can be used to stream data to or from commonly used systems such as relational databases or HDFS.
What is Debezium?
Debezium is a distributed platform that turns your existing databases into event streams, so applications can see and respond immediately to each row-level change in the databases.
Debezium is built on top of Apache Kafka and provides Kafka Connect-compatible connectors that monitor specific database management systems. In other words, Debezium supplies Kafka Connect source connector implementations for various databases such as MySQL and PostgreSQL.
How Debezium works with a database like MySQL
In a master-slave architecture like MySQL's, change events on the master node are synced to slave nodes asynchronously. This syncing is achieved by writing every DB change event, in commit order, to a binary log (binlog) file on the master node.
Each slave node copies the master's binary log into its own relay log and replays those events to update its own DB state.
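As a quick sanity check, you can confirm that binary logging is enabled on the master with standard MySQL statements; a minimal sketch:
-- should return ON if binary logging is enabled
SHOW VARIABLES LIKE 'log_bin';
-- lists the binlog files the server has written so far
SHOW BINARY LOGS;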
Debezium works by connecting to MySQL and pretending to be a replica. MySQL sends its replication data to Debezium, thinking it’s actually funneling data to another downstream MySQL instance.
Types of MySQL Replication
There are two ways of replicating a MySQL database:
- Statement-based replication (SBR)
- Row-based replication (RBR)
Statement-based replication: the SQL statements themselves are written to the master node's binlog, then copied to and re-executed on the slave nodes.
Row-based replication: the binlog contains the actual row data modified in a table, rather than the statements that modified it.
Debezium relies on row-based replication, so make sure row-based binary logging is enabled on the MySQL server that Debezium connects to (whether that is the master or a slave).
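A quick sketch for verifying and switching the binlog format (the runtime change needs admin privileges; persist it in my.cnf as binlog_format=ROW so it survives restarts):
-- check the current binlog format (should be ROW for Debezium)
SHOW VARIABLES LIKE 'binlog_format';
-- switch to row-based logging at runtime
SET GLOBAL binlog_format = 'ROW';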
Types of events generated: Insert, Update, Delete, and Read (Read events are emitted for rows captured during an initial snapshot).
Each event contains the before and after state of the affected row.
What is GTID?
For fail-safe replication across all slave nodes, the database needs a unique identifier associated with every transaction: the Global Transaction Identifier (GTID).
A GTID is identical across all the servers in a given replication setup, which helps in the process of failure recovery. Once a transaction with a given GTID has been committed on a given server, any subsequent transaction having the same GTID is ignored by that server. Thus, a transaction committed on the source can be applied no more than once on the replica, which helps to guarantee consistency.
Make sure that GTIDs are enabled on all MySQL servers when setting up Debezium. This feature was introduced in MySQL 5.6.
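A minimal my.cnf sketch for enabling GTIDs (apply it to every server in the replication setup; enabling GTIDs on a live topology requires the staged procedure described in the MySQL documentation):
# assign a global transaction identifier to every transaction
gtid_mode=ON
# reject statements that cannot be logged consistently with GTIDs
enforce_gtid_consistency=ON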
An example of a Debezium event payload for an update on a MySQL table:
UPDATE user_profile SET first_name = 'Sachin Ramesh' WHERE id = 1005;
{
  "before": {
    "id": 1005,
    "first_name": "Sachin",
    "last_name": "tendulkar",
    "email": "sachin@gmail.org"
  },
  "after": {
    "id": 1005,
    "first_name": "Sachin Ramesh",
    "last_name": "tendulkar",
    "email": "sachin@gmail.org"
  },
  "source": {
    "name": "mysql-server-1",
    "server_id": 226345,
    "ts_sec": 1265881,
    "gtid": null,
    "file": "mysql-bin.000004",
    "pos": 484,
    "row": 0,
    "snapshot": null
  },
  "op": "u",
  "ts_ms": 7462281029529
}
How to create or modify a Debezium connector
Connectors in Kafka Connect define where data should be copied to and from. A connector instance is a logical job responsible for managing the copying of data between Kafka and another system. Connector plugins are JARs that add the classes implementing a connector. In distributed mode, a JSON payload is sent over the REST API to create or modify a connector.
Below are a few Kafka Connect HTTP APIs for configuring connectors:
// get the names of all connectors
curl -X GET http://localhost:8083/connectors
// get the details of a single connector
curl -X GET http://localhost:8083/connectors/my_connector_name
// create a new connector
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '
{
  "name": "my_debezium_mysql_connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "<database_host>",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "<database_password>",
    "database.server.id": "<any_random_number>",
    "database.server.name": "<database_name>",
    "database.whitelist": "<database_name>",
    "table.whitelist": "<database_name>.<table_name>",
    "database.history.kafka.bootstrap.servers": "<kafka_broker_ip>:9092",
    "database.history.kafka.topic": "<topic_name>",
    "database.history.producer.enable.idempotence": "true",
    "database.history.skip.unparseable.ddl": "true",
    "database.history.store.only.monitored.tables.ddl": "true",
    "event.deserialization.failure.handling.mode": "warn",
    "inconsistent.schema.handling.mode": "ignore",
    "time.precision.mode": "connect",
    "include.schema.changes": "true",
    "snapshot.mode": "schema_only"
  }
}
'
Possible values of snapshot.mode include initial (the default, which snapshots both the schema and the existing data), schema_only (captures only the table schemas and then streams changes from the current binlog position), and schema_only_recovery (rebuilds a lost or corrupted database history topic).
Change events for a table are pushed to the Kafka topic named {database.server.name}.{database_name}.{table_name}; with the config above this works out to {database.whitelist} + "." + {table.whitelist}.
To configure the Kafka producer that writes the database history topic, use properties prefixed with database.history.producer.* (as with enable.idempotence in the config above).
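Once the connector is running, you can sanity-check that change events are flowing by tailing the topic with the console consumer that ships with Kafka (the topic name below is illustrative, following the naming rule above):
# read change events from the beginning of the topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mysql-server-1.inventory.user_profile --from-beginning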
// connector status
curl -X GET http://localhost:8083/connectors/my_connector_name/status
// delete connector
curl -X DELETE http://localhost:8083/connectors/my_connector_name
// pause connector
curl -X PUT http://localhost:8083/connectors/my_connector_name/pause
// resume connector
curl -X PUT http://localhost:8083/connectors/my_connector_name/resume
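Two more endpoints from the standard Kafka Connect REST API are worth knowing: PUT /connectors/{name}/config replaces a connector's configuration (the body must be the complete config object, not a partial patch; the connector is created if it does not exist), and POST /connectors/{name}/restart restarts a connector. A sketch, assuming the new configuration is saved in a local file config.json:
// replace the connector configuration with the contents of config.json
curl -X PUT -H "Content-Type: application/json" --data @config.json http://localhost:8083/connectors/my_connector_name/config
// restart the connector, e.g. after a failed task
curl -X POST http://localhost:8083/connectors/my_connector_name/restart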
I hope you found this information helpful. Post your comments for any improvements.