Debezium を使用して MySQL から Elasticsearch にデータ変更をストリーミングする方法
リズキ・ヌグロホ · 従う
に発表されました · 10 分で読めます · 2020年9月4日
現在、電子カタログや電子商取引などを構築するには、高速検索機能、リアルタイムの統合データが不可欠です。ユーザーがポータルから情報を取得するためだけに多くの時間を費やしているために怒られることは望ましくありません。また、製品チームがさまざまなアプリケーションに入力した製品情報を、ユーザーや顧客がすぐに検索できるようにしたいと考えています。
たとえば、製品チームが MySQL をメイン データ ソースとして使用しているとします。そして、ポータルの検索エンジン サービスとして ElasticSearch を使用します。 MySQL の変更はすべて、ElasticSearch Index に即座に影響する必要があります。その要件を達成するにはどうすればよいでしょうか?
この最初の記事では、上記のユースケース要件を達成するために、Debezium、Kafka、および Confluent JDBC シンク コネクタを使用して MySQL のデータ変更を ElasticSearch にストリーミングする方法を説明します。
デベジウム
Debezium は、既存のデータベースをイベント ストリームに変換する分散プラットフォームであるため、アプリケーションはデータベース内の各行レベルの変更に迅速に対応できます。 Debezium は Kafka 上に構築されており、特定のデータベース管理システムを監視する Kafka Connect 互換コネクタを提供します。 Debezium は、データ変更の履歴を Kafka ログに記録するため、アプリケーションをいつでも停止および再起動でき、実行していない間に見逃したすべてのイベントを簡単に消費して、すべてのイベントが正しく完全に処理されることを保証します。
Debezium はオープンソースです。Apache ライセンス、バージョン 2.0
カフカ
Apache Kafka は、オープンソース ストリーム処理が開発したソフトウェアプラットフォームリンクトインに寄付しましたApache ソフトウェア財団、 で書かれているスカラ座とジャワ。このプロジェクトは、リアルタイム データ フィードを処理するための、高スループット、低遅延の統合プラットフォームを提供することを目的としています。
Confluent JDBC シンク コネクタ
Kafka Connect Elasticsearch シンク コネクタを使用すると、Apache Kafka® から Elasticsearch にデータを移動できます。 Apache Kafka® のトピックからトピックにデータを書き込みます。索引Elasticsearch では、トピックのすべてのデータは同じです
このチュートリアルでは、サービスごとに個別のコンテナを使用し、永続ボリュームは使用しません。 ZooKeeper と Kafka は通常、データをコンテナー内にローカルに保存するため、ホスト マシンにディレクトリをボリュームとしてマウントする必要があります。したがって、このチュートリアルでは、コンテナーが停止されると、永続化されたデータはすべて失われます。
ステップ 1 Zookeeper を実行する
debezium/zookeeper イメージを使用してコンテナ内でzookeeperを起動します。コンテナは次の名前で実行されます。動物園の番人
> docker run -it — namezookeeperdbz -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:1.1
実行ログをチェックして、Zookeeper が正常に実行され、ポートでリッスンしていることを確認します。2181
スタンドアロンモードで起動するZooKeeper JMX がデフォルトで有効化構成の使用: /zookeeper/conf/zoo.cfg
。
。
。
020-05-13 17:18:28,564 - 情報 [main:NIOServerCnxnFactory@686] - ポート 0.0.0.0/0.0.0.0:2181 へのバインド
ステップ 2 Kafka を実行する
debezium/kafka docker イメージを使用してコンテナーで Kafka を開始します。コンテナは次の名前で実行されます。カフカブズ
> docker run -it — name kafkadbz -p 9092:9092 --linkzookeeperdbz:zookeeperdbz debezium/kafka
Kafkaサーバーが起動したことを確認する
ステップ 3 MySQL を実行する
このデモでは、Debezium が提供するサンプル データも含まれる事前設定された Docker イメージを使用します。
debezium/example-mysql イメージを使用してコンテナ内で MySQL を起動します。コンテナは名前で実行されますmysqldbz
> docker run -it -d --name mysqldbz -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql
上記のコマンドは、という名前のコンテナを作成します。mysqldbz。
次に、コンテナを実行して、コンテナ上で対話型の bash シェルに入りましょう。
> docker exec -it mysqldbz /bin/bash
MySQL で CDC のキャプチャを使用するには、Debezium で MySQL を有効にした bin_log 設定が必要です。 Debezium のおかげで、事前構成された MySQL Docker イメージを使用しているため、それを構成する必要がなくなりました。構成を確認してみましょう。
# もっと /etc/mysql/conf.d/mysql.cnf
ご覧のとおり、bin_log は有効になっていますが、デフォルトでは無効になっています。
サンプルDBを確認する
# mysql -u root -p
パスワードを入力してください: <パスワードを入力してください>mysql> インベントリを使用する
データベースが変更されましたmysql> テーブルを表示;
+---------------------+
|在庫内のテーブル |
+---------------------+
|住所 |
|顧客 |
|ジオム |
|注文 |
|製品 |
|手持ちの製品 |
+---------------------+6 行セット (0.00 秒)mysql> select * from顧客;+------+-----------+-----------+------ ------+
| ID |名 |姓 |メール |
+------+-----------+-----------+------ ------+
| 1001 |サリー |トーマス | sally.thomas@acme.com |
| 1002 |ジョージ |ベイリー | gbailey@foobar.com |
| 1003 |エドワード |ウォーカー | ed@walker.com |
| 1004 |アン |クレッチマー | annek@noanswer.org |
+------+-----------+-----------+------ ------+4 行セット (0.00 秒)
ステップ 4 Elastic Search サービスを開始する
ここでは、単一ノードのエラスティックとエラスティック バージョン 7.7 を使用します。コンテナは次の名前で実行されます。弾性dbz。
> docker run -it --name elasticdbz -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:7.7.0
ステップ 5 Debezium Kafka Connect サービスを開始します
このサービスは、Debezium MySQL コネクタを管理するための REST API を公開します。コンテナは次の名前で実行されます。コネクトデータベース。
> docker run -it --name connectdbz -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses--linkzookeeperdbz:zookeeperdbz --linkkafkadbz:kafkadbz --linkmysqldbz:mysqldbz --link elasticdbz:elasticdbzデベジウム/接続
このコンテナをkafkadbzにリンクすることを忘れないでください、このサービスは kafkadbz と通信する必要があるため、zookeeperdbz、elasticdbz、Zookeeperdbz、elasticdbz サービス。
CURL を使用して Debezium Kafka Connect Service のステータスを確認します。応答から、バージョン 2.4.0 を使用していることがわかります。
>curl -H "Accept:application/json" localhost:8083/
{"バージョン":"2.4.0","コミット":"77a89fcf8d7fa018","kafka_cluster_id":"XcbUOTN_TNG4hCftkY_j3w"}
CDCを監視するためのMySQLコネクタをインベントリDBに登録してみましょう
>curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '
{
"名前": "インベントリコネクタ",
"構成": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"データベース.ホスト名": "mysqldbz"、
"データベース.ポート": "3306",
"database.user": "debezium",
"データベース.パスワード": "dbz",
"データベース.サーバーID": "184054",
"データベース.サーバー名": "dbserver1",
"database.whitelist": "インベントリ",
"database.history.kafka.bootstrap.servers": "kafkadbz:9092",
"database.history.kafka.topic": "schema-changes.inventory"
}
}'
コネクタがコネクタのリストに登録されていることを確認します
>curl -H "Accept:application/json" localhost:8083/connectors/
["インベントリコネクタ"]
これで、inventory-connector がコネクタのリストに登録されました
ステップ 6 Kafka コンソール コンシューマを起動して DB の変更を監視する
このステップは、DB 上の変更を監視するための単なる例です。トピックを使用したい場合は、Kafka Consumer を作成する必要があります。
Debezium MySQL コネクタをデプロイした後、監視を開始します在庫
データ変更イベント用のデータベース。
を視聴するにはdbserver1.inventory.customers
このトピックでは、Kafka コンソール コンシューマーを開始する必要があります。コンテナは次の名前で実行されます。監視者。
> docker run -it --rm --name watcher --linkzookeeperdbz:zookeeperdbz --link kafkadbz:kafkadbz debezium/kafka watch-topic -a -k dbserver1.inventory.customers
ウォッチャーを実行すると、Debezium がインベントリ データベースの監視を開始し、結果を次のように出力することがわかります。dbserver1.inventory.customers
トピック。
"ペイロード":{「前」:null、
"後":{"id":1004、
"first_name":"アン",
"last_name":"クレッチマー",
"電子メール":"annek@noanswer.org"
}、
"ソース":{"バージョン":"1.1.1.最終版",
"コネクタ":"mysql",
"名前":"データベースサーバー1",
"ts_ms":0、
"スナップショット":"true",
"db":"インベントリ",
"テーブル":"顧客",
"サーバーID":0、
"gtid":null、
"ファイル":"mysql-bin.000003",
「位置」:154、
「行」:0、
「スレッド」:null、
「クエリ」:null
}、
"で":"c"、
"ts_ms":1589504913171、
「トランザクション」:null}
テーブルinventory.customersと比較してみましょう
mysql> select * from顧客;+------+-----------+-----------+------ ------+
| ID |名 |姓 |メール |
+------+-----------+-----------+------ ------+
| 1001 |サリー |トーマス | sally.thomas@acme.com |
| 1002 |ジョージ |ベイリー | gbailey@foobar.com |
| 1003 |エドワード |ウォーカー | ed@walker.com |
|1004 |アン |クレッチマー | annek@noanswer.org|
+------+-----------+-----------+------ ------+
これは、顧客インベントリーテーブルのレコードと一致する Kafka トピックの最後のイベントのように見えます。
customer テーブルを更新してみましょう。
mysql > UPDATE `inventory`.`customers` SET `last_name` = 'Kretchmar クレッチマー' WHERE `id` = 1004;
そして、これがウォッチャーでの結果です
...
"ペイロード":{
"前":{
"id":1004、
"first_name":"アン",
"last_name":"クレッチマー",
"電子メール":"annek@noanswer.org"
}、
"後":{
"id":1004、
"first_name":"アン",
"last_name":"クレッチマー クレッチマー",
"電子メール":"annek@noanswer.org"
}、
...
このステップまでに何を達成したか?
このステップまでで、MySQL-Debezium-Kafka の統合が完了しました。MySQL に新しいデータまたは変更されたデータがあるときに、Kafka のトピックからストリーミング データを取得します。
次は何ですか?
Elastic Search と統合するには、Kafka Connect Elastic Sink Connector を Debezium Kafka 接続コンテナーにインストールする必要があります。
ステップ 7 Kafka Connect Elastic Sink コネクタをダウンロードするhttps://www.confluent.io/hub/confluentinc/kafka-connect-elasticsearch
ステップ 8 ダウンロードした zip ファイルを解凍します。
ステップ 9 lib フォルダーの名前を kafka-connect-jdbc に変更します
ステップ 10 kafka-connect-jdbc を kafka-connect のコンテナである debezium にコピーします
> ドッカー cp /ファイルへのパス/confluentinc-kafka-connect-elasticsearch-5.5.0/kafka-connect-jdbc/* connectdbz:/kafka/connect/
ステップ 11 すべての依存関係がコピーされていることを確認します
> docker exec -it connectdbz /bin/bash
$ cd 接続/kafka-connect-jdbc/
$ ls -すべて
ステップ 12 Debezium Kafka Connect コンテナを再起動します
Kafka Connect が新しくインストールされたコネクタ プラグインを検出できるようにするには、Kafka Connect サービスを再起動する必要があります
> docker stop connectdbz
> docker start connectdbz
ステップ 13 ElasticsearchSinkConnector を登録する
>curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '
{
"名前": "弾性シンク",
"構成": {
"コネクタ.クラス":
"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"トピック": "dbserver1.inventory.customers"、
"接続.url": "http://弾性dbz:9200",
"変換": "ラップ解除、キー",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.key.field": "id",
"key.ignore": "false",
"type.name": "顧客"
}
}'
ElasticsearchSinkConnector コネクタがコネクタのリストに登録されていることを確認します
>curl -H "Accept:application/json" localhost:8083/connectors/
["elastic-sink","inventory-connector"]
ステップ 14 MySQL ElasticSearch の同期を確認する
データベースと検索サーバーが同期されているかどうかを確認してみましょう。
> カール 'http://ローカルホスト:9200/dbserver1.inventory.customers/_検索?かなり'{
「取った」:12、
"タイムアウト" : false、
"_shards" : {
「合計」: 1、
「成功」: 1、
「スキップ」: 0、
「失敗」:0
}、
「ヒット」: {
"合計" : {
「値」: 4、
"関係" : "等価"
}、
「最大スコア」: 1.0、
「ヒット」:[
{
"_index" : "dbserver1.inventory.customers",
"_type" : "顧客",
"_id" : "1001",
"_スコア" : 1.0、
"_ソース" : {
「id」:1001、
"first_name" : "サリー",
"last_name" : "トーマス",
"メール" : "sally.thomas@acme.com"
}
}、
{
"_index" : "dbserver1.inventory.customers",
"_type" : "顧客",
"_id" : "1004",
"_スコア" : 1.0、
"_ソース" : {
「id」:1004、
"first_name" : "アン",
"last_name" : "クレッチマー・クレッチミー",
"メール" : "annek@noanswer.org"
}
}、
{
"_index" : "dbserver1.inventory.customers",
"_type" : "顧客",
"_id" : "1002",
"_スコア" : 1.0、
"_ソース" : {
「id」:1002、
"first_name" : "ジョージ",
"last_name" : "ベイリー",
"メール" : "gbailey@foobar.com"
}
}、
{
"_index" : "dbserver1.inventory.customers",
"_type" : "顧客",
"_id" : "1003",
"_スコア" : 1.0、
"_ソース" : {
「id」:1003、
"first_name" : "エドワード",
"last_name" : "ウォーカー",
「メールアドレス」:「ed@walker.com」
}
}
]
}
}
ご覧のとおり、これで MySQL 内のすべてのデータが同期されました。 MySQL のすべてのデータは、上記のエラスティック インデックスで見つけることができます。
新しいデータを Customers テーブルに挿入して、エラスティック インデックスで何が起こるかを見てみましょう。
mysql> 顧客の値に挿入 (デフォルト、'Rizqi'、'Nugrohon'、'rizqi.nugroho@example.com');
クエリは OK、1 行が影響を受ける (0.05 秒)mysql> select * from顧客;
+------+-----------+--------+-------- ---------------+| ID |名 |姓 |メール |+------+-----------+----------------------+------ ------------------+|| 1001 |サリー |トーマス | sally.thomas@acme.com |
| 1002 |ジョージ |ベイリー | gbailey@foobar.com |
| 1003 |エドワード |ウォーカー | ed@walker.com |
| 1004 |アンナ |クレッチマーannek@noanswer.org |
| 1005 |リズキ |ヌグロホン | rizqi.nugroho@example.com |+------+------------+---------+ ------------------------+
弾性指数をチェックする
> カール 'http://ローカルホスト:9200/dbserver1.inventory.customers/_検索?かなり'{
「取った」:1476年、
"タイムアウト" : false、
"_shards" : {
「合計」: 1、
「成功」: 1、
「スキップ」: 0、
「失敗」:0
}、
「ヒット」: {
"合計" : {
「値」: 5、
"関係" : "等価"
}、
「最大スコア」: 1.0、
「ヒット」:[
{
"_index" : "dbserver1.inventory.customers",
"_type" : "顧客",
"_id" : "1001",
"_スコア" : 1.0、
"_ソース" : {
「id」:1001、
"first_name" : "サリー",
"last_name" : "トーマス",
"メール" : "sally.thomas@acme.com"
}
}、
...
{
"_index" : "dbserver1.inventory.customers",
"_type" : "顧客",
"_id" : "1005",
"_スコア" : 1.0、
"_ソース" : {
「id」:1005、
"first_name" : "リズキ",
"last_name" : "ヌグロホン",
"メール" : "rizqi.nugroho@example.com"
}
}
]
}
}
Viola first_name Rizqi の新しいデータが挿入されます
UPDATE文はどうでしょうか
mysql> UPDATE `inventory`.`customers` SET `last_name` = 'Adhi Nugroho' WHERE `id` = 1005;
クエリは OK、1 行が影響を受ける (0.05 秒)mysql> select * from顧客;
+------+-----------+--------+-------- ---------------+| ID |名 |姓 |メール |+------+-----------+----------------------+------ ------------------+|| 1001 |サリー |トーマス | sally.thomas@acme.com |
| 1002 |ジョージ |ベイリー | gbailey@foobar.com |
| 1003 |エドワード |ウォーカー | ed@walker.com |
| 1004 |アンナ |クレッチマーannek@noanswer.org |
| 1005 |リズキ |アディ・ヌグロホ | rizqi.nugroho@example.com |+------+------------+---------+ ------------------------+
もう一度弾性指数を確認してください
> カール 'http://ローカルホスト:9200/dbserver1.inventory.customers/_検索?かなり'...
{
"_index" : "dbserver1.inventory.customers",
"_type" : "顧客",
"_id" : "1005",
"_スコア" : 1.0、
"_ソース" : {
「id」:1005、
"first_name" : "リズキ",
"last_name" : "アディ ヌグロホ",
"メール" : "rizqi.nugroho@example.com"
}
}
]
}
}
マンマ・ミーア、データ更新されました!!!
最後に、Debezium を使用して MySQL と ElasticSearch を統合しました。このデモが、MySQL DB と ElasticSearch の間のデータ遅延の問題の解決に役立つことを願っています。これで、MySQL DB で変更された内容はすべて、Elastic Index に即座に影響されるようになります。 PostgreSQL、Oracle、DB2、MSSQL などの別の DB を使用してみることができます。
参照 :
https://medium.com/@erfin.feluzy/tutorial-streaming-cdc-mysql-ke-kafka-dengan-debezium-3a1ec9150cf8
https://debezium.io/documentation/reference/1.1/tutorial.html
https://docs.confluent.io/current/connect/kafka-connect-elasticsearch/index.html