Post

Docker Kafka & Debezium

Rodando Kafka com Debezium

Muitas vezes, estamos lidando com um legado e, de alguma forma, precisamos escutar ou saber das operações que estão ocorrendo em determinadas tabelas. Uma forma de fazer isso é trabalhar com o Debezium, que irá ler todos os logs que ocorrerem em seu banco de dados, claro, apenas nas tabelas listadas em sua configuração.

Como subir o Kafka com Debezium localmente?

Para configurar toda a infraestrutura necessária, vamos criar um docker-compose.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
version: "3.7"

services:
  mysql-kafka:
    image: mysql:5.7
    environment:
      - MYSQL_DATABASE=kafka-test
      - MYSQL_ROOT_PASSWORD=root
    ports:
      - 3306:3306
    volumes:
      - .data/mysql:/var/lib/mysql
    networks:
      - kafka-test-network

  connect:
    image: debezium/connect:latest
    depends_on:
      - kafka
      - mysql-kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
    ports:
      - 8083:8083
      - 5005:5005
    networks:
      - kafka-test-network

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - kafka-test-network
    deploy:
      resources:
        limits:
          cpus: "0.8"
          memory: 800M
        reservations:
          memory: 100M

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9094:9094"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_NUM_PARTITIONS: 1
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_LISTENERS: INTERNAL://:9092,OUTSIDE://:9094
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://localhost:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
    networks:
      - kafka-test-network
    deploy:
      resources:
        limits:
          cpus: "0.8"
          memory: 800M
        reservations:
          memory: 100M

networks:
  kafka-test-network:
    driver: bridge
    name: kafka-test-network

A primeira imagem que precisamos é do próprio MySQL. Nele, vamos criar um banco para posteriormente listar os logs.

Em seguida, iniciamos a imagem do Debezium e adicionamos as dependências do Kafka e do Zookeeper.

Obs.: Este docker-compose é apenas um exemplo. O recomendado é usar um arquivo .env para adicionar os parâmetros e configurações de nossas imagens.

Subindo o kafka

Agora que já preparamos nossas imagens, vamos subir o Kafka. Para isso:

1
2
3
$docker-compose up

A partir desse momento, se tudo der certo, teremos toda a nossa infraestrutura rodando localmente para testarmos.

Para ver o estado dos containers ativos, você pode executar:

1
docker ps

Se tudo ocorreu bem, vamos ver uma saída parecida com esta em nosso terminal:

image

Script SQL

Agora vamos criar uma tabela no banco para posteriormente inserirmos dados e ler como tópicos de criação no Debezium.

1
2
3
4
5
6
7
8
9
use `kafka-test`;
drop table if exists users;
CREATE table users(
    id int not null auto_increment,
    name varchar(50) not null,
    age int default null,
    surname varchar(40) default null,
    primary key (id)
);

Connector

O Debezium possui sua própria API REST e diversos plugins. O que vamos precisar para ele começar a ler as alterações dessa tabela é:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
curl --location 'localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data '{
    "name": "database-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql-kafka",
        "database.port": "3306",
        "database.user": "root",
        "database.password": "root",
        "database.server.id": "184054",
        "topic.prefix": "connector",
        "database.include.list": "kafka-test.users",
        "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
        "schema.history.internal.kafka.topic": "schema-changes.inventory"
    }
}'

Nesse ponto perceba que adicionamos como connector o prefixo de nossos tópicos gerados pelo debezium, outro ponto é que apontamos todas as configurações de conexão com banco e kafka diretamente pela rede interna que criamos no nosso docker-compose.

Inserindo dados

Agora que temos nosso plugin ativo, podemos adicionar dados no banco. Esses dados vão produzir tópicos que vamos poder consumir e posteriormente gerar eventos ou mesmo trabalhar com streams.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
INSERT INTO users (name, age, surname) VALUES
('Alice', 25, 'Smith'),
('Bob', 30, 'Johnson'),
('Charlie', 35, 'Williams'),
('David', 40, 'Jones'),
('Emma', 22, 'Brown'),
('Frank', 28, 'Davis'),
('Grace', 33, 'Miller'),
('Henry', 45, 'Wilson'),
('Isabella', 27, 'Moore'),
('Jack', 32, 'Taylor'),
('Kate', 29, 'Anderson'),
('Liam', 38, 'Thomas'),
('Mia', 26, 'Jackson'),
('Noah', 31, 'White'),
('Olivia', 34, 'Harris'),
('Patrick', 37, 'Clark'),
('Quinn', 41, 'Allen'),
('Riley', 36, 'Scott'),
('Sophia', 39, 'Adams'),
('Thomas', 42, 'Lee'),
('Uma', 23, 'Parker'),
('Vincent', 44, 'Robinson'),
('Willow', 43, 'Nelson'),
('Xander', 24, 'Evans'),
('Yasmine', 21, 'Murphy'),
('Zoe', 46, 'Carter'),
('Aaron', 20, 'Baker'),
('Bella', 47, 'Wright'),
('Caleb', 19, 'Garcia'),
('Diana', 48, 'King'),
('Ethan', 50, 'Hall'),
('Fiona', 49, 'Adams'),
('Gabriel', 51, 'Mitchell'),
('Hannah', 52, 'Thompson'),
('Ian', 53, 'Green'),
('Julia', 54, 'Phillips'),
('Kevin', 55, 'Cook'),
('Laura', 56, 'Rivera'),
('Michael', 57, 'Perez'),
('Nora', 58, 'Torres'),
('Oscar', 59, 'Hill'),
('Penelope', 60, 'Young'),
('Quentin', 61, 'Gonzalez'),
('Rachel', 62, 'Hughes'),
('Samuel', 63, 'Turner'),
('Tiffany', 64, 'Parker'),
('Ulysses', 65, 'Ward'),
('Victoria', 66, 'Foster'),
('Walter', 67, 'Stewart'),
('Xavier', 68, 'Wells'),
('Yolanda', 69, 'Bennett'),
('Zachary', 70, 'Gray'),
('Arthur', 71, 'Sullivan'),
('Beatrice', 72, 'Cole'),
('Christopher', 73, 'Morris'),
('Danielle', 74, 'Gutierrez'),
('Edward', 75, 'Perry'),
('Felicity', 76, 'Barnes'),
('George', 77, 'Knight'),
('Holly', 78, 'Bryant'),
('Ivan', 79, 'Reed'),
('Jessica', 80, 'Banks'),
('Kenneth', 81, 'Lopez'),
('Lily', 82, 'Harrison'),
('Matthew', 83, 'Gomez'),
('Natalie', 84, 'Fisher'),
('Oliver', 85, 'Murray'),
('Patricia', 86, 'Marshall'),
('Quincy', 87, 'Dean'),
('Rebecca', 88, 'Gilbert'),
('Simon', 89, 'Wagner'),
('Tara', 90, 'Dunn'),
('Ursula', 91, 'Mcdonald'),
('Victor', 92, 'Pierce'),
('Wendy', 93, 'Gordon'),
('Xena', 94, 'Holmes'),
('Yvonne', 95, 'Henry'),
('Zelda', 96, 'Hunt'),
('Andrew', 97, 'Rose'),
('Barbara', 98, 'Owens'),
('Carl', 99, 'Hudson'),
('Daisy', 100, 'Bishop');

E assim concluímos o processo de configuração do Kafka com Debezium, estabelecendo uma estrutura local para monitoramento de operações em bancos de dados legados. Com este ambiente configurado, podemos gerar eventos a partir das alterações realizadas nas tabelas do banco de dados, facilitando a integração e o processamento de dados em tempo real. Se precisar de mais alguma informação ou tiver alguma dúvida, estou à disposição para ajudar!

This post is licensed under CC BY 4.0 by the author.