Using HarperDB with NATS and Kafka for Change Data Capture
Change Data Capture (CDC) is a popular design pattern used to track changes in data from a source database and stream those changes to downstream processes. HarperDB, a distributed database with built-in clustering and custom functions, can be used to implement CDC. In this tutorial, we'll see how to use the internal NATS streaming service and the Fastify Kafka plugin to publish new records to Kafka.
HarperDB Setup

To start, spin up HarperDB with custom functions enabled alongside Kafka and Zookeeper. Add the following contents to `docker-compose.yml`:
```yaml
version: "3.7"
services:
  harperdb:
    image: harperdb/hdb:v2.3.3
    ports:
      - "9925:9925"
      - "9926:9926"
    volumes:
      - ./harperdb:/opt/hdb-data
      - ./harperdb/custom_functions:/opt/hdb/custom_functions
    environment:
      - HDB_LICENSE_KEY=YOUR_LICENSE_KEY
      - HDB_CUSTOM_FUNCTIONS_ENABLED=true
      - HDB_KAFKA_ENABLED=true
      - HDB_KAFKA_BOOTSTRAP_SERVERS=kafka:9092
      - HDB_KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - HDB_NATS_ENABLED=true
      - HDB_NATS_URL=nats://nats:4222
      - HDB_NATS_CLUSTER_ID=test-cluster
      - HDB_NATS_CLIENT_ID=test-client
    depends_on:
      - kafka
      - zookeeper
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      - KAFKA_ADVERTISED_HOST_NAME=kafka
      - KAFKA_CREATE_TOPICS=test:1:1
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
```

Note that `depends_on` no longer lists `nats`: the compose file defines no `nats` service, since NATS runs inside HarperDB itself.
For this example, we'll store the database contents locally in the `./harperdb` directory. Also note that we are not setting `CLUSTERING_ENABLED=true` in docker-compose; enabling it here would break the initial startup, so we'll configure clustering through the HarperDB Studio console instead.

Start the services with `docker-compose up -d`.
Now we need to connect our local instance to HarperDB Studio. Specify the username and password from the docker-compose file, as well as the host and port.

After we log in, we can create a cluster user:

```sql
CREATE USER cluster_user WITH PASSWORD 'password';
GRANT CLUSTER_ADMIN TO cluster_user;
```
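The same user can also be created through HarperDB's operations API by POSTing a request body like the following to port 9925 with Basic auth. This is a sketch: the `role` field must reference a role that actually exists in your instance.

```json
{
  "operation": "add_user",
  "role": "cluster_user_role",
  "username": "cluster_user",
  "password": "password",
  "active": true
}
```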
Finally, let's create a schema and a table. We'll use `dev` for the schema and `dog` for the table.
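If you prefer the operations API over the Studio UI, the schema and table can be created by POSTing request bodies like these to port 9925 (a sketch; `hash_attribute` names the table's primary key):

```json
{ "operation": "create_schema", "schema": "dev" }

{ "operation": "create_table", "schema": "dev", "table": "dog", "hash_attribute": "id" }
```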
Custom Functions Setup

HarperDB has an existing custom functions template that subscribes to the internal NATS stream and publishes changes to WebSockets: https://github.com/HarperDB-Add-Ons/cf-template-websockets

We will modify this setup to publish to Kafka. But first, clone the repo into the `custom_functions` directory of your HarperDB instance:
```shell
cd harperdb/custom_functions
git clone https://github.com/HarperDB-Add-Ons/cf-template-websockets.git
```
To get this working, rename `config.json.example` to `config.json` and update the NATS username and password to the cluster user we created via HarperDB Studio. Finally, run `npm i` to install the dependencies.

NOTE: HarperDB Studio cannot parse file names with multiple periods, so it may report "File does not exist". Simply rename the files if you want to view their contents in the console.
Now restart HarperDB, and we can use the example client file (`client.example.js`) to test the WebSocket connection. Once we run this client, we should see the message "open!", and adding new records to the `dog` table will print out those records.
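For reference, a record can be added through the operations API as well; an insert like this (request body POSTed to port 9925, sample fields assumed) is what triggers the CDC flow:

```json
{
  "operation": "insert",
  "schema": "dev",
  "table": "dog",
  "records": [{ "id": 1, "dog_name": "Harper", "age": 5 }]
}
```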
Modifying to Publish to Kafka

Instead of publishing messages back to the WebSocket client, let's now publish JSON messages to Kafka. To do so, install the Fastify KafkaJS plugin: `npm i fastify-kafkajs`.

Then we can register the Kafka client:
```javascript
const fastify = require('fastify')({ logger: true })

fastify.register(require('fastify-kafkajs'), {
  clientId: 'my-app',
  // Use the Kafka service name from docker-compose, since the custom
  // function runs inside the HarperDB container, not on the host.
  brokers: ['kafka:9092'],
})
```
We can now modify the `onPublishedMessage` function to publish to Kafka instead of writing back to the socket:

```javascript
// The plugin decorates fastify with a connected KafkaJS producer.
const producer = fastify.kafka.producer

async function onPublishedMessage(message) {
  const { operation, table, record } = message
  // Key messages by table and record id so changes to the same
  // record land in the same partition.
  const key = `${table}_${record.id}`
  const value = JSON.stringify(record)
  await producer.send({
    topic: table,
    messages: [{ key, value }],
  })
}
```
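The key/value construction is easy to verify in isolation. Here is a minimal sketch, where `toKafkaPayload` is a hypothetical helper mirroring the logic inside `onPublishedMessage`:

```javascript
// Hypothetical helper: builds the Kafka producer payload for a CDC
// message, mirroring the key/value logic in onPublishedMessage.
function toKafkaPayload(message) {
  const { table, record } = message
  return {
    topic: table,
    messages: [
      { key: `${table}_${record.id}`, value: JSON.stringify(record) },
    ],
  }
}

const payload = toKafkaPayload({
  operation: 'insert',
  table: 'dog',
  record: { id: 1, dog_name: 'Harper' },
})
console.log(payload.topic)           // dog
console.log(payload.messages[0].key) // dog_1
```

Keeping this logic in a pure function makes it testable without a running broker; `producer.send(payload)` is then the only side-effecting call.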
Now restart the server and connect our WebSocket client again. Publish another record to HarperDB, and we can confirm it reached Kafka by running the `kafka-console-consumer` script inside the Kafka container:

```shell
docker exec -it kafka /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic dog \
  --from-beginning
```
Wrapping Up

In this tutorial, we saw how to use the internal NATS stream to listen for changes to data in HarperDB. We then created a Fastify route to subscribe to those tables and publish new messages to WebSockets and Kafka. You can modify the `onPublishedMessage` method to publish to multiple topics, and run this process in the background to emulate a Debezium-like experience.
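To sketch the multi-topic idea, the routing can be expressed as a pure function that fans each change out to its per-table topic plus a single global changelog topic (`hdb-changelog` is an assumed topic name, and the envelope shape is an illustration, not part of the template):

```javascript
// Fan a CDC message out to a per-table topic and a global changelog
// topic. Each returned batch can be passed to producer.send().
function routeMessage(message) {
  const { operation, table, record } = message
  const key = `${table}_${record.id}`
  // Include the operation in the envelope so consumers can
  // distinguish inserts from updates and deletes.
  const value = JSON.stringify({ operation, record })
  return [
    { topic: table, messages: [{ key, value }] },
    { topic: 'hdb-changelog', messages: [{ key, value }] },
  ]
}

const batches = routeMessage({
  operation: 'insert',
  table: 'dog',
  record: { id: 2, dog_name: 'Penny' },
})
console.log(batches.map((b) => b.topic)) // [ 'dog', 'hdb-changelog' ]
```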
Get Started With HarperDB

HarperDB is a powerful database that can be used for a variety of use cases. Whether you need to implement CDC or build a real-time application, HarperDB has the features you need. To learn more, check out the HarperDB documentation and start building today.