This project demonstrates how to build a robust real-time data pipeline that captures changes from PostgreSQL and streams them to Elasticsearch using Debezium, Kafka, and Go.
Architecture Overview
The application follows a Domain-Driven Design (DDD) architecture with clean separation of concerns:
PostgreSQL (Upsert) --> Debezium --> Kafka --> Kafka Connect --> Elasticsearch (GET)
     ▲                                                                |
     |                                                                v
     +--------------------------- Go App <---------------------------+
Key Features
- Change Data Capture (CDC): Capture database changes in real-time using Debezium
- Event Streaming: Process and route events through Kafka
- Search and Analytics: Index data in Elasticsearch for powerful search capabilities
- RESTful API: Expose data through a clean API built with Fiber
- Domain-Driven Design: Well-structured codebase following DDD principles
- Repository Pattern: Clean separation between domain and data access layers
- Configuration Management: Flexible configuration using Viper
Technology Stack
- Go: Core application language (v1.24.1+)
- Fiber: Fast HTTP web framework
- GORM: Powerful ORM for database operations
- Viper: Configuration management
- PostgreSQL: Source database
- Debezium: Change Data Capture platform
- Kafka: Event streaming platform
- Elasticsearch: Search and analytics engine
- Docker: Containerization
Detailed Setup Guide
Prerequisites
- Go 1.24.1 or higher
- Docker and Docker Compose
- Git
Installation
1. Clone the repository:

   git clone https://github.com/mehmetymw/debezium-postgres-es.git
   cd debezium-postgres-es

2. Install dependencies:

   go mod tidy

3. Configure the application: Create a config.yaml file in the root directory or set environment variables (see the Configuration section below for an example).
Docker Compose Setup
Our docker-compose.yml file sets up the entire infrastructure needed for the CDC pipeline:
version: '3'

services:
  # PostgreSQL database with Debezium support
  postgres:
    image: debezium/postgres:13
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=inventory
    volumes:
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql

  # Zookeeper for Kafka coordination
  zookeeper:
    image: debezium/zookeeper:1.9
    ports:
      - "2181:2181"
      - "2888:2888"
      - "3888:3888"

  # Kafka message broker
  kafka:
    image: debezium/kafka:1.9
    ports:
      - "9092:9092"
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
    depends_on:
      - zookeeper

  # Kafka Connect with Debezium connectors
  connect:
    build:
      context: .
      dockerfile: Dockerfile
    ports:
      - "8083:8083"
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=connect_configs
      - OFFSET_STORAGE_TOPIC=connect_offsets
      - STATUS_STORAGE_TOPIC=connect_statuses
    depends_on:
      - kafka
      - postgres

  # Elasticsearch for data indexing and search
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.0
    ports:
      - "9200:9200"
      - "9300:9300"
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false

  # Our Go application
  app:
    build:
      context: ./app
      dockerfile: Dockerfile
    ports:
      - "8080:8080"
    environment:
      - POSTGRES_HOST=postgres
      - POSTGRES_PORT=5432
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=inventory
      - ELASTICSEARCH_URL=http://elasticsearch:9200
    depends_on:
      - postgres
      - elasticsearch
      - connect
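Before registering any connectors, it can help to confirm that PostgreSQL and Elasticsearch are actually reachable. The following Go check is illustrative only (it is not part of the repository) and assumes the host ports published above:

package main

import (
	"database/sql"
	"fmt"
	"log"
	"net/http"

	_ "github.com/lib/pq" // PostgreSQL driver
)

func main() {
	// Ping PostgreSQL using the credentials from docker-compose.yml.
	db, err := sql.Open("postgres",
		"host=localhost port=5432 user=postgres password=postgres dbname=inventory sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	if err := db.Ping(); err != nil {
		log.Fatalf("postgres not ready: %v", err)
	}
	fmt.Println("postgres: OK")

	// Any 200 response from Elasticsearch's root endpoint means it is up.
	resp, err := http.Get("http://localhost:9200")
	if err != nil {
		log.Fatalf("elasticsearch not ready: %v", err)
	}
	defer resp.Body.Close()
	fmt.Println("elasticsearch:", resp.Status)
}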
Connector Configuration
PostgreSQL Source Connector
The PostgreSQL source connector (postgres-source-all.json) captures changes from our database:
{
  "name": "postgres-source-all",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "inventory",
    "database.server.name": "dbserver1",
    "table.include.list": "public.orders",
    "plugin.name": "pgoutput",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.unwrap.add.fields": "op,table,lsn,source.ts_ms"
  }
}
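With the unwrap transform applied, each Kafka record carries the flattened row plus the metadata fields requested in add.fields. As a rough illustration (not code from the repository), a Go consumer could decode such a record as follows, assuming Debezium's default "__" prefix for the added fields:

package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// OrderEvent mirrors a flattened record produced by the ExtractNewRecordState
// transform configured above. The column names are lowercase because
// PostgreSQL folds the unquoted identifiers used in steps.sh.
type OrderEvent struct {
	ID         string `json:"id"`
	OrderID    string `json:"orderid"`
	CustomerID string `json:"customerid"`
	Status     string `json:"status"`
	Op         string `json:"__op"` // c = create, u = update, d = delete, r = snapshot read
	Table      string `json:"__table"`
	LSN        int64  `json:"__lsn"`
	SourceTsMs int64  `json:"__source_ts_ms"`
	Deleted    string `json:"__deleted"` // "true" when delete.handling.mode=rewrite marks a delete
}

func main() {
	// A hypothetical record as it might appear on dbserver1.public.orders.
	raw := `{"id":"2","orderid":"102","customerid":"502","status":"SHIPPED",
	         "__op":"u","__table":"orders","__lsn":12345,
	         "__source_ts_ms":1700000000000,"__deleted":"false"}`

	var ev OrderEvent
	if err := json.Unmarshal([]byte(raw), &ev); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%s on %s: order %s -> %s\n", ev.Op, ev.Table, ev.ID, ev.Status)
}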
Elasticsearch Sink Connector
The Elasticsearch sink connector (elastic-sink-all.json) streams the data to Elasticsearch:
{
  "name": "elastic-sink-all",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "dbserver1.public.orders",
    "connection.url": "http://elasticsearch:9200",
    "transforms": "KeyToValue",
    "transforms.KeyToValue.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.KeyToValue.fields": "id",
    "key.ignore": "false",
    "type.name": "_doc",
    "behavior.on.null.values": "delete",
    "schema.ignore": "true",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}
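Both connector configs are registered through Kafka Connect's REST API on port 8083; the steps.sh script below does this with curl. A rough Go equivalent, assuming the two JSON files sit in the working directory, would look like this:

package main

import (
	"bytes"
	"fmt"
	"log"
	"net/http"
	"os"
)

// registerConnector POSTs a connector config file to the Kafka Connect
// REST API (http://localhost:8083 in this setup).
func registerConnector(path string) error {
	body, err := os.ReadFile(path)
	if err != nil {
		return err
	}
	resp, err := http.Post("http://localhost:8083/connectors",
		"application/json", bytes.NewReader(body))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	fmt.Printf("%s -> %s\n", path, resp.Status)
	return nil
}

func main() {
	for _, f := range []string{"postgres-source-all.json", "elastic-sink-all.json"} {
		if err := registerConnector(f); err != nil {
			log.Fatal(err)
		}
	}
}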
Automated Setup with steps.sh
I provide a convenient shell script (steps.sh) that automates the entire setup process. Here’s what it does:
#!/bin/bash
# Step 1: Start all services with Docker Compose
docker-compose down && docker-compose up -d --build
# Step 2: Wait for services to start
sleep 10
# Step 3: Create orders table in PostgreSQL
docker exec -i debezium-postgres-1 psql -U postgres inventory -c "CREATE TABLE IF NOT EXISTS public.orders (
  id VARCHAR(255) PRIMARY KEY,
  orderId VARCHAR(255),
  customerId VARCHAR(255),
  status VARCHAR(50)
);"
# Step 4: Insert sample data
docker exec -i debezium-postgres-1 psql -U postgres inventory -c "
INSERT INTO public.orders (id, orderId, customerId, status) VALUES
('1', '101', '501', 'NEW'),
('2', '102', '502', 'PROCESSING'),
('3', '103', '503', 'COMPLETED'),
('4', '104', '504', 'SHIPPED'),
('5', '105', '505', 'DELIVERED'),
('6', '106', '506', 'CANCELLED'),
('7', '107', '507', 'RETURNED'),
('8', '108', '508', 'PENDING'),
('9', '109', '509', 'ON_HOLD'),
('10', '110', '510', 'BACKORDERED');"
# Step 5: Delete connectors if they exist
curl -X DELETE http://localhost:8083/connectors/postgres-source-all || echo "Connector not found"
curl -X DELETE http://localhost:8083/connectors/elastic-sink-all || echo "Connector not found"
# Step 6: Configure connectors
curl -X POST -H "Content-Type: application/json" --data @postgres-source-all.json http://localhost:8083/connectors
curl -X POST -H "Content-Type: application/json" --data @elastic-sink-all.json http://localhost:8083/connectors
# Step 7: Check Elasticsearch data
echo "Data in Elasticsearch after initial sync:"
curl -X GET "http://localhost:9200/dbserver1.public.orders/_search?pretty"
# Step 8: Check connector status
echo "PostgreSQL connector status:"
curl -X GET http://localhost:8083/connectors/postgres-source-all/status
echo "Elasticsearch connector status:"
curl -X GET http://localhost:8083/connectors/elastic-sink-all/status
How to Use steps.sh
1. Make the script executable:

   chmod +x steps.sh

2. Run the script:

   ./steps.sh
The script performs the following actions:
- Starts all services defined in docker-compose.yml
- Creates the orders table in PostgreSQL
- Inserts sample data into the orders table
- Configures the Debezium PostgreSQL source connector
- Configures the Elasticsearch sink connector
- Verifies the data in Elasticsearch
- Checks the status of both connectors
Running the Application
Using Go
go run main.go
Using Docker
docker-compose up -d
Testing the Pipeline
Once the pipeline is running, you can test it by making changes to the PostgreSQL database and observing how they propagate to Elasticsearch:
- Insert a new order:
docker exec -i debezium-postgres-1 psql -U postgres inventory -c "
INSERT INTO public.orders (id, orderId, customerId, status) VALUES ('11', '111', '511', 'NEW');"
- Update an existing order:
docker exec -i debezium-postgres-1 psql -U postgres inventory -c "
UPDATE public.orders SET status = 'SHIPPED' WHERE id = '2';"
- Delete an order:
docker exec -i debezium-postgres-1 psql -U postgres inventory -c "
DELETE FROM public.orders WHERE id = '10';"
- Check Elasticsearch to see the changes:
curl -X GET "http://localhost:9200/dbserver1.public.orders/_search?pretty"
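You can also verify the result programmatically. The following illustrative Go snippet (not part of the repository) queries the index written by the sink connector and prints the hit count:

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

func main() {
	// Search the index that the sink connector writes to.
	resp, err := http.Get("http://localhost:9200/dbserver1.public.orders/_search")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	// Decode just enough of the Elasticsearch 7.x search response
	// to count and print the hits.
	var result struct {
		Hits struct {
			Total struct {
				Value int `json:"value"`
			} `json:"total"`
			Hits []struct {
				Source map[string]any `json:"_source"`
			} `json:"hits"`
		} `json:"hits"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		log.Fatal(err)
	}

	fmt.Printf("indexed orders: %d\n", result.Hits.Total.Value)
	for _, h := range result.Hits.Hits {
		fmt.Println(h.Source["id"], h.Source["status"])
	}
}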
API Endpoints
The application exposes the following RESTful API endpoints:
- GET /api/orders - Get all orders
- GET /api/orders/:id - Get a specific order
- POST /api/orders - Create a new order
- PUT /api/orders/:id - Update an existing order
- DELETE /api/orders/:id - Delete an order
- GET /api/orders/status/:status - Get orders by status
- GET /health - Health check endpoint
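For illustration, here is roughly how these routes could be registered with Fiber; the handler names are placeholders, not the project's actual handlers:

package main

import (
	"log"

	"github.com/gofiber/fiber/v2"
)

// placeholder stands in for the project's real handlers in
// interfaces/api/handlers.
func placeholder(c *fiber.Ctx) error {
	return c.SendStatus(fiber.StatusNotImplemented)
}

func main() {
	app := fiber.New()

	api := app.Group("/api")
	api.Get("/orders", placeholder)                // Get all orders
	api.Get("/orders/:id", placeholder)            // Get a specific order
	api.Post("/orders", placeholder)               // Create a new order
	api.Put("/orders/:id", placeholder)            // Update an existing order
	api.Delete("/orders/:id", placeholder)         // Delete an order
	api.Get("/orders/status/:status", placeholder) // Get orders by status

	app.Get("/health", func(c *fiber.Ctx) error {
		return c.SendString("ok")
	})

	log.Fatal(app.Listen(":8080"))
}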
Project Structure
The project follows a clean architecture with DDD principles:
app/
├── domain/              # Domain layer (entities, repository interfaces)
│   ├── entity/          # Domain entities
│   └── repository/      # Repository interfaces
├── application/         # Application layer (services, use cases)
│   └── service/         # Application services
├── infrastructure/      # Infrastructure layer (DB, external services)
│   └── persistence/     # Database related code
│       ├── models/      # Database models
│       ├── repository/  # Repository implementations
│       └── migrations/  # Database migrations
├── interfaces/          # Interface layer (API, CLI)
│   └── api/             # API related code
│       ├── handlers/    # HTTP handlers
│       └── routes/      # Route definitions
├── config/              # Configuration
├── main.go              # Application entry point
└── docker-compose.yml   # Docker Compose configuration
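To make the layering concrete, here is a minimal sketch of the domain layer, collapsed into one file for brevity; the actual types in domain/entity and domain/repository may differ:

package domain

import "context"

// Order is the core domain entity, mirroring the public.orders table.
type Order struct {
	ID         string
	OrderID    string
	CustomerID string
	Status     string
}

// OrderRepository is the domain-layer contract; implementations live in
// infrastructure/persistence/repository (e.g. a GORM-backed PostgreSQL
// repository for writes and an Elasticsearch-backed repository for reads).
type OrderRepository interface {
	FindAll(ctx context.Context) ([]Order, error)
	FindByID(ctx context.Context, id string) (*Order, error)
	FindByStatus(ctx context.Context, status string) ([]Order, error)
	Save(ctx context.Context, order *Order) error
	Delete(ctx context.Context, id string) error
}

Because the application layer depends only on this interface, the storage backend can change without touching the domain or API code.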
How It Works
- Change Capture: Debezium monitors PostgreSQL’s write-ahead log (transaction log) for row-level changes
- Event Publishing: Changes are published to Kafka topics (here, dbserver1.public.orders)
- Event Consumption: The Elasticsearch sink connector consumes events from those topics
- Data Indexing: Events are flattened by the unwrap transform and indexed in Elasticsearch
- Data Access: The API writes to PostgreSQL and serves reads from Elasticsearch (see the sketch below)
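The write path that kicks off the pipeline is an ordinary database write. A minimal sketch using GORM follows; the DSN and model here are illustrative, and the real models live in infrastructure/persistence/models:

package main

import (
	"log"

	"gorm.io/driver/postgres"
	"gorm.io/gorm"
	"gorm.io/gorm/clause"
)

// Order maps to the public.orders table created by steps.sh.
type Order struct {
	ID         string `gorm:"primaryKey"`
	OrderID    string `gorm:"column:orderid"`
	CustomerID string `gorm:"column:customerid"`
	Status     string
}

func (Order) TableName() string { return "orders" }

func main() {
	dsn := "host=localhost port=5432 user=postgres password=postgres dbname=inventory sslmode=disable"
	db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
	if err != nil {
		log.Fatal(err)
	}

	// Upsert: insert the row, or update it if the primary key already
	// exists. Debezium picks the change up from the WAL either way.
	order := Order{ID: "12", OrderID: "112", CustomerID: "512", Status: "NEW"}
	if err := db.Clauses(clause.OnConflict{UpdateAll: true}).Create(&order).Error; err != nil {
		log.Fatal(err)
	}
}

If the pipeline is healthy, the resulting change should appear in the dbserver1.public.orders index within a few seconds.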
Troubleshooting
Common Issues
- Connectors not starting: Check Kafka Connect logs:

  docker logs debezium-connect-1

- Data not appearing in Elasticsearch: Verify the connector status:

  curl -X GET "http://localhost:8083/connectors/elastic-sink-all/status"

- PostgreSQL connection issues: Ensure PostgreSQL is running:

  docker ps | grep postgres

- Elasticsearch not indexing data: Check Elasticsearch logs:

  docker logs debezium-elasticsearch-1
Configuration
The application uses Viper for configuration management. You can configure it using:
- Environment variables
- Configuration files (YAML, JSON, etc.)
- Command-line flags
Example configuration:
postgres:
  host: localhost
  port: 5432
  user: postgres
  password: postgres
  dbname: inventory

elasticsearch:
  url: http://localhost:9200
  username: ""
  password: ""

server:
  port: 8080
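A minimal sketch of loading this file with Viper, with environment variables such as POSTGRES_HOST overriding file values (the repository's actual loading code may differ):

package config

import (
	"strings"

	"github.com/spf13/viper"
)

// Load reads config.yaml from the working directory and lets environment
// variables override file values.
func Load() error {
	viper.SetConfigName("config") // config.yaml
	viper.SetConfigType("yaml")
	viper.AddConfigPath(".")

	// Map nested keys to env vars: postgres.host -> POSTGRES_HOST.
	viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
	viper.AutomaticEnv()

	return viper.ReadInConfig()
}

Once Load() succeeds, values are read with calls like viper.GetString("postgres.host") or viper.GetInt("server.port").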
Source Code
The complete source code for this project is available on GitHub:
github.com/Mehmetymw/debezium-postgres-es
Feel free to:
- Star the repository if you find it useful
- Open issues for any bugs or feature requests
- Submit pull requests to contribute improvements
- Fork it to use as a starting point for your own projects
The repository includes:
- Detailed setup instructions
- Docker Compose configuration
- Example configurations
- API documentation
- Common troubleshooting tips