Set up a JDBC Source from PostgreSQL
The JDBC source connector is useful for pushing data from a relational database such as PostgreSQL to Kafka. Once in Kafka, the data can be consumed, transformed and reused any number of times in interesting ways. This help article illustrates the steps required to set up the JDBC source connector with a PostgreSQL database.
Kafka Connect JDBC source settings
Here we illustrate the steps required to set up the JDBC source connector with an Aiven for PostgreSQL service. Before getting started, the following information regarding the Kafka and PostgreSQL services needs to be collected.
The following Aiven for Kafka service details are required (from the Aiven console):
KAFKA_HOST
KAFKA_PORT
KAFKA_CONNECT_SERVICE_URI
The following PostgreSQL service details are required (from the Aiven console):
PG_INSTANCE_NAME
PG_SERVICE_URI
PG_HOST
PG_PORT
PG_USER
PG_PASSWORD
PG_DEFAULT_DB
The Kafka Connect JDBC source settings can be defined via the Aiven web UI or via a JSON file; this article covers the JSON approach. Aiven’s web UI allows you to copy/paste a JSON file and populates the UI form fields accordingly once the JSON is parsed. The Kafka Connect JDBC source settings for a PostgreSQL database are the following:
{
"name": "pg-bulk-source",
"connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://<PG_HOST>:<PG_PORT>/<PG_DEFAULT_DB>?sslmode=require",
"connection.user": "<PG_USER>",
"connection.password": "<PG_PASSWORD>",
"table.whitelist": "<PREFIX>",
"mode": "bulk|incrementing|timestamp",
"poll.interval.ms": "2000",
"topic.prefix": "<PREFIX>"
}
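If you prefer to skip both the web UI and the Aiven CLI, the same settings can also be pushed to the Kafka Connect REST API exposed at the KAFKA_CONNECT_SERVICE_URI collected above (assuming Kafka Connect is enabled on the service and the URI includes the credentials, as the Aiven console provides it). Note that, unlike the flat JSON used in the rest of this article, the REST API expects the settings wrapped in a config object; a sketch:

curl -s -X POST "$KAFKA_CONNECT_SERVICE_URI/connectors" \
  -H "Content-Type: application/json" \
  -d '{
        "name": "pg-bulk-source",
        "config": {
          "connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
          "connection.url": "jdbc:postgresql://<PG_HOST>:<PG_PORT>/<PG_DEFAULT_DB>?sslmode=require",
          "connection.user": "<PG_USER>",
          "connection.password": "<PG_PASSWORD>",
          "table.whitelist": "<TABLE_NAMES>",
          "mode": "bulk",
          "poll.interval.ms": "2000",
          "topic.prefix": "<PREFIX>"
        }
      }'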
Parameters Description
- name: the name of the connector; must be unique per Kafka instance
- connection.url: the JDBC URL to PostgreSQL; the hostname, port and database name are the ones noted above, or can be extracted with the following Aiven client command
avn service get $POSTGRES_NAME --project=$PROJECT_NAME --format '{service_uri_params}'
- connection.user and connection.password: the user credentials needed to connect to PostgreSQL. If using the default avnadmin user, the credentials can be found with the same command used above
- table.whitelist: name(s) of the tables to ingest
- poll.interval.ms: interval in ms between database polls
- topic.prefix: Kafka topic prefix. Kafka Connect will create a topic for each table mentioned in the table.whitelist parameter, named <topic.prefix><table_name>
The mode parameter deserves a specific mention.
The Kafka Connect JDBC source allows 4 main modes:
- bulk: the connector queries the full table every time, retrieving all rows and publishing them to Kafka
- incrementing: the connector queries the table appending a WHERE condition based on an incrementing column. This requires a column containing an always-incrementing number (like a series); this column is used to check which rows have been added since the last query, and its name is passed via the incrementing.column.name parameter (a conceptual sketch of the generated queries is shown after this list)
- timestamp: the connector queries the table appending a WHERE condition based on one or more timestamp columns. This requires one or more columns containing the row’s creation date and modification date. With two columns (creation_date and modification_date) the polling query applies the COALESCE function, taking the value of the second column only when the first column is null. The timestamp column(s) are passed via the timestamp.column.name parameter
- timestamp+incrementing: this last mode combines the incrementing and timestamp functionalities, with incrementing.column.name selecting the incremental column and timestamp.column.name the timestamp one(s).
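Here is that sketch: a rough, conceptual rendering of the queries issued by the incrementing and timestamp modes (not the exact SQL generated by the connector; table and column names are placeholders):

-- incrementing mode: fetch only rows above the last stored offset
SELECT * FROM <table>
WHERE <incrementing_column> > ?        -- last value stored in the Kafka Connect offsets
ORDER BY <incrementing_column> ASC;

-- timestamp mode with two columns: COALESCE falls back to the second column when the first is null
SELECT * FROM <table>
WHERE COALESCE(<modified_column>, <created_column>) > ?   -- last processed timestamp
ORDER BY COALESCE(<modified_column>, <created_column>) ASC;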
Scenario 1: Bulk Load
Let’s start with the simple use case, a minimal table, to show the basic Kafka Connect JDBC source functionality.
Let’s log in to PostgreSQL
avn service cli $PG_INSTANCE_NAME
And create a simple accounts table containing username and email columns.
CREATE TABLE accounts (
username VARCHAR ( 50 ) PRIMARY KEY,
email VARCHAR ( 255 ) UNIQUE NOT NULL
);
Now let’s add some data
insert into accounts (username, email) values ('francesco','fra@fakecompany.com');
insert into accounts (username, email) values ('ugo','ugo@fakecompany.com');
insert into accounts (username, email) values ('vincenzo','vincenzo@fakecompany.com');
and verify it’s there
<DATABASE>=> select * from accounts;
username | email
-----------+--------------------------
francesco | fra@fakecompany.com
ugo | ugo@fakecompany.com
vincenzo | vincenzo@fakecompany.com
(3 rows)
Now it’s time to create the first Kafka Connect JDBC source. To do so we need to prepare a JSON file, named kafka_jdbc_simple_source.json, containing the following properties
{
"name": "pg-bulk-source",
"connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://<HOSTNAME>:<PORT>/ <DATABASE>?sslmode=require",
"connection.user": "<PG_USER>",
"connection.password": "<PG_PASSWORD>",
"table.whitelist": "accounts",
"mode": "bulk",
"poll.interval.ms": "2000",
"topic.prefix": "pg_source_"
}
Note the table.whitelist setting pointing to the accounts table just created, and the mode set to bulk.
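We can then create the connector with the Aiven client, following the same pattern used for the other scenarios later in this article (assuming $KAFKA_NAME and $PROJECT_NAME hold the Kafka service and project names):

avn service connector create $KAFKA_NAME @kafka_jdbc_simple_source.json --project $PROJECT_NAME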
We can check the output with Kafkacat (see the dedicated help article), after downloading the appropriate SSL certificate files and setting up the kafkacat.config configuration file.
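A minimal kafkacat.config could look like the following sketch (it assumes the downloaded certificate files are saved as service.key, service.cert and ca.pem in the working directory):

bootstrap.servers=<KAFKA_HOST>:<KAFKA_PORT>
security.protocol=ssl
ssl.key.location=service.key
ssl.certificate.location=service.cert
ssl.ca.location=ca.pem

With the configuration file in place, we can consume the topic with: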
kafkacat -F kafkacat.config -C -t pg_source_accounts
The output will be
{"username":"francesco","email":"fra@fakecompany.com"}
{"username":"ugo","email":"ugo@fakecompany.com"}
{"username":"vincenzo","email":"vincenzo@fakecompany.com"}
% Reached end of topic pg_source_accounts [0] at offset 3
{"username":"francesco","email":"fra@fakecompany.com"}
{"username":"ugo","email":"ugo@fakecompany.com"}
{"username":"vincenzo","email":"vincenzo@fakecompany.com"}
% Reached end of topic pg_source_accounts [0] at offset 6
{"username":"francesco","email":"fra@fakecompany.com"}
{"username":"ugo","email":"ugo@fakecompany.com"}
{"username":"vincenzo","email":"vincenzo@fakecompany.com"}
% Reached end of topic pg_source_accounts [0] at offset 9
As we can see, the Kafka Connect JDBC connector is extracting the whole accounts table every 2 seconds (as per our poll.interval.ms definition).
If we now add a new row to accounts from the PostgreSQL client
insert into accounts (username, email) values ('laura','laura@fakecompany.com');
we can see the new row appearing in our topic at the following poll
{"username":"francesco","email":"fra@fakecompany.com"}
{"username":"ugo","email":"ugo@fakecompany.com"}
{"username":"vincenzo","email":"vincenzo@fakecompany.com"}
{"username":"laura","email":"laura@fakecompany.com"}
% Reached end of topic pg_source_accounts [0] at offset 10
Scenario 2: Incremental Column
Obviously, querying and extracting the whole table at every poll is not ideal: especially when the table is large, this can quickly become a bottleneck in any data pipeline. But, as mentioned before, we can use one of the incremental modes to solve the issue.
Let’s create a new accounts_serial table with an always-incrementing SERIAL column. The following command can be used in the PostgreSQL client
CREATE TABLE accounts_serial (
user_id serial PRIMARY KEY,
username VARCHAR ( 50 ) UNIQUE NOT NULL,
email VARCHAR ( 255 ) UNIQUE NOT NULL
)
;
And insert the same rows as before
insert into accounts_serial (username, email) values ('francesco','fra@fakecompany.com');
insert into accounts_serial (username, email) values ('ugo','ugo@fakecompany.com');
insert into accounts_serial (username, email) values ('vincenzo','vincenzo@fakecompany.com');
We can verify the correctness of the SERIAL column with
<DATABASE>=> select * from accounts_serial;
user_id | username | email
---------+-----------+--------------------------
1 | francesco | fra@fakecompany.com
2 | ugo | ugo@fakecompany.com
3 | vincenzo | vincenzo@fakecompany.com
(3 rows)
We can now start our incremental Kafka Connect JDBC source by copying the kafka_jdbc_simple_source.json file into a new kafka_jdbc_incremental_source.json and amending the name, mode and table.whitelist, plus setting the newly created user_id column as the incrementing.column.name entry. The new file will look like the following
{
"name": "pg-incremental-source",
"connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://<HOSTNAME>:<PORT>/ <DATABASE>?sslmode=require",
"connection.user": "<PG_USER>",
"connection.password": "<PG_PASSWORD>",
"table.whitelist": "accounts_serial",
"mode": "incrementing",
"incrementing.column.name":"user_id",
"poll.interval.ms": "2000",
"topic.prefix": "pg_source_"
}
Again, we can start the connector with
avn service connector create $KAFKA_NAME @kafka_jdbc_incremental_source.json --project $PROJECT_NAME
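Optionally, we can verify that the connector is up and running with the Aiven client (pg-incremental-source is the connector name defined in the JSON file):

avn service connector status $KAFKA_NAME pg-incremental-source --project $PROJECT_NAME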
and read the newly created topic with Kafkacat
kafkacat -F kafkacerts/kafkacat.config -C -t pg_source_accounts_serial
As expected, the topic will contain only the 3 records
{"user_id":1,"username":"francesco","email":"fra@fakecompany.com"}
{"user_id":2,"username":"ugo","email":"ugo@fakecompany.com"}
{"user_id":3,"username":"vincenzo","email":"vincenzo@fakecompany.com"}
% Reached end of topic pg_source_accounts_serial [0] at offset 3
If we now insert an additional 4th record from the PostgreSQL client
insert into accounts_serial (username, email) values ('laura','laura@fakecompany.com');
Kafkacat’s output will be
{"user_id":4,"username":"laura","email":"laura@fakecompany.com"}
% Reached end of topic pg_source_accounts_serial [0] at offset 4
Showing only the additional row, as expected.
Now, what happens if we update a row?
update accounts_serial set email='francesco@fakecompany.com' where user_id=1;
Well, the update will NOT be pushed to Kafka, since the user_id column wasn’t updated and all the records with user_id <= 4 were already parsed. This can represent a problem if we’re keen on retrieving not only the new records but also the changed ones.
Scenario 3: Timestamp column
How can we know which rows have been created or updated since the last query? There is no out-of-the-box solution in PostgreSQL: we need to create dedicated columns and triggers.
Let’s create a new accounts_timestamp table with created_at and modified_at columns. The following command can be used in the PostgreSQL client
CREATE TABLE accounts_timestamp (
username VARCHAR ( 50 ) PRIMARY KEY,
email VARCHAR ( 255 ) UNIQUE NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
modified_at TIMESTAMP
)
;
The created_at field will work as expected immediately, with the DEFAULT NOW() definition.
The modified_at column, on the other hand, requires a bit more work to be usable: we need to create a trigger that sets the current timestamp on updates. The following SQL can be executed from the PostgreSQL client.
CREATE OR REPLACE FUNCTION change_modified_at()
RETURNS TRIGGER
LANGUAGE PLPGSQL
AS
$$
BEGIN
NEW.modified_at := NOW();
RETURN NEW;
END;
$$
;
CREATE TRIGGER modified_at_updates
BEFORE UPDATE
ON accounts_timestamp
FOR EACH ROW
EXECUTE PROCEDURE change_modified_at();
The first statement will create the change_modified_at function that will later be used by the modified_at_updates trigger.
If we now insert the same rows as before
insert into accounts_timestamp (username, email) values ('francesco','fra@fakecompany.com');
insert into accounts_timestamp (username, email) values ('ugo','ugo@fakecompany.com');
insert into accounts_timestamp (username, email) values ('vincenzo','vincenzo@fakecompany.com');
We can verify that the created_at column is successfully populated with
<DATABASE>=> select * from accounts_timestamp;
username | email | created_at | modified_at
-----------+--------------------------+----------------------------+-------------
francesco | fra@fakecompany.com | 2021-03-01 11:58:27.429172 |
ugo | ugo@fakecompany.com | 2021-03-01 11:58:27.438725 |
vincenzo | vincenzo@fakecompany.com | 2021-03-01 11:58:27.448099 |
(3 rows)
Now let’s create a timestamp-based Kafka Connect JDBC source by copying the kafka_jdbc_incremental_source.json file into a new kafka_jdbc_timestamp_source.json and amending the name, mode and table.whitelist.
We also need to set the newly created modified_at and created_at columns as the timestamp.column.name entry. The value should be modified_at,created_at: modified_at will contain the most recent update timestamp, and when it is null we can rely on the created_at column. The file will look like the following
{
"name": "pg-timestamp-source",
"connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://<HOSTNAME>:<PORT>/ <DATABASE>?sslmode=require",
"connection.user": "<PG_USER>",
"connection.password": "<PG_PASSWORD>",
"table.whitelist": "accounts_timestamp",
"mode": "timestamp",
"timestamp.column.name":"modified_at,created_at",
"poll.interval.ms": "2000",
"topic.prefix": "pg_source_"
}
Again, we can start the timestamp-based connector with
avn service connector create $KAFKA_NAME @kafka_jdbc_timestamp_source.json --project $PROJECT_NAME
Checking the pg_source_accounts_timestamp topic with Kafkacat will show the three rows, as expected
{"username":"francesco","email":"fra@fakecompany.com","created_at":1614599907429,"modified_at":null}
{"username":"ugo","email":"ugo@fakecompany.com","created_at":1614599907438,"modified_at":null}
{"username":"vincenzo","email":"vincenzo@fakecompany.com","created_at":1614599907448,"modified_at":null}
% Reached end of topic pg_source_accounts_timestamp [0] at offset 3
If we now update an existing record with
update accounts_timestamp set email='francesco@fakecompany.com' where username='francesco';
We can verify that the modified_at column is correctly updated in PostgreSQL
<DATABASE>=> select * from accounts_timestamp;
username | email | created_at | modified_at
-----------+---------------------------+----------------------------+----------------------------
ugo | ugo@fakecompany.com | 2021-03-01 11:58:27.438725 |
vincenzo | vincenzo@fakecompany.com | 2021-03-01 11:58:27.448099 |
francesco | francesco@fakecompany.com | 2021-03-01 11:58:27.429172 | 2021-03-01 12:00:37.084052
(3 rows)
And in Kafkacat we receive the update
{"username":"francesco","email":"francesco@fakecompany.com","created_at":1614599907429,"modified_at":1614600037084}
What about Deletions?
What happens if we delete one row? Let’s say our account ugo is gone
delete from accounts_timestamp where username='ugo';
We can verify it within PostgreSQL
<DATABASE>=> select * from accounts_timestamp;
username | email | created_at | modified_at
-----------+---------------------------+----------------------------+----------------------------
vincenzo | vincenzo@fakecompany.com | 2021-03-01 11:58:27.448099 |
francesco | francesco@fakecompany.com | 2021-03-01 11:58:27.429172 | 2021-03-01 12:00:37.084052
(2 rows)
We won’t see any new message in either the timestamp-based or the incrementing-based solution. This is because a deletion doesn’t create a new id or a newer timestamp. The bulk connector, on the other hand, will correctly output only the rows present in the table after the delete.
How to solve the deletion problem? We have two choices:
- We apply soft deletes by adding a boolean column IS_DELETED and setting it to true when the record should be deleted. This action, which is now an update rather than a delete, will change the modified_at column, so the timestamp-based connector will propagate the change (a minimal sketch follows below).
- We switch from the JDBC to a CDC connector for PostgreSQL. The PostgreSQL CDC connector, using the database’s logical decoding feature, listens to and propagates any insert, update, and delete statements.
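A minimal sketch of the soft-delete option applied to the accounts_timestamp table (the is_deleted column name is just an example, and downstream consumers need to filter on it):

-- add the flag once
alter table accounts_timestamp add column is_deleted boolean not null default false;
-- "delete" a record: being an UPDATE, it fires the modified_at trigger,
-- so the timestamp-based connector picks up the change
update accounts_timestamp set is_deleted = true where username = 'vincenzo';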
Scenario 4: Complex queries
What if we don’t want to extract data from a single table, but join various tables and take the output?
Let’s reuse the accounts_serial example and create an additional table accounts_working_hours
create table accounts_working_hours
(
user_id integer REFERENCES accounts_serial (user_id),
work_date DATE,
nr_hours INT,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);
And insert a few rows
insert into accounts_working_hours (user_id, work_date, nr_hours) values (1, '2021-01-01',8);
insert into accounts_working_hours (user_id, work_date, nr_hours) values (1, '2021-01-02',7);
insert into accounts_working_hours (user_id, work_date, nr_hours) values (1, '2021-01-03',5);
insert into accounts_working_hours (user_id, work_date, nr_hours) values (2, '2021-01-01',7);
insert into accounts_working_hours (user_id, work_date, nr_hours) values (2, '2021-01-02',3);
The table accounts_working_hours contains, for each day, the number of hours worked by an account. Let’s say we want to feed Kafka with a stream containing the updated total hours per account. The basic query will be
with total_hours as (
select username,
sum(nr_hours) nr_hours,
max(created_at) last_created_at
from
accounts_serial serial
inner join
accounts_working_hours awh
on serial.user_id=awh.user_id
group by username)
select * from total_hours;
That will output
username | nr_hours | last_created_at
-----------+----------+----------------------------
francesco | 20 | 2021-03-01 12:41:41.657594
ugo | 10 | 2021-03-01 12:41:41.676173
(2 rows)
Now let’s create a timestamp-based connector, using the last_created_at field as the timestamp column and the query above as the query field. First we need to define the connector configuration in a new file, kafka_jdbc_timestamp_agg_source.json
{
"name": "pg-timestamp-agg-source",
"connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://<HOSTNAME>:<PORT>/ <DATABASE>?sslmode=require",
"connection.user": "<PG_USER>",
"connection.password": "<PG_PASSWORD>",
"query": "with total_hours as (select username, sum(nr_hours) nr_hours, max(created_at) last_created_at from accounts_serial serial inner join accounts_working_hours awh on serial.user_id=awh.user_id group by username) select * from total_hours",
"mode": "timestamp",
"timestamp.column.name":"last_created_at",
"poll.interval.ms": "2000",
"topic.prefix": "pg_source_agg_hours",
"timestamp.delay.interval.ms": "1000"
}
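As before, we can create the connector with the Aiven client:

avn service connector create $KAFKA_NAME @kafka_jdbc_timestamp_agg_source.json --project $PROJECT_NAME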
Now let’s read the pg_source_agg_hours topic with Kafkacat (with a custom query, topic.prefix is used as the full topic name).
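Following the same pattern used above (the configuration file is the one set up earlier):

kafkacat -F kafkacat.config -C -t pg_source_agg_hours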
{"username":"francesco","nr_hours":20,"last_created_at":1614602501657}
{"username":"ugo","nr_hours":10,"last_created_at":1614602501676}
% Reached end of topic pg_source_agg_hours [0] at offset 2
If we now add a new working day
insert into accounts_working_hours (user_id, work_date, nr_hours) values (3, '2021-01-01',12);
We can see the updated count in Kafkacat for vincenzo (user_id=3)
{"username":"vincenzo","nr_hours":12,"last_created_at":1614604142898}
% Reached end of topic pg_source_agg_hours [0] at offset 3
Schema Changes
What happens if we change a table’s schema? Is the change propagated by the connector?
Let’s check what happens if we add a column to the accounts_serial table and insert a new row
alter table accounts_serial add column company_name VARCHAR ( 50 );
insert into accounts_serial (username, email, company_name) values ('carlo','carlo@fakecompany.com','fakecompany');
Reading the pg_source_accounts_serial topic from the beginning with Kafkacat
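One way to do this is with the -o beginning option, which asks Kafkacat to start from the earliest offset (the configuration file is the one set up earlier):

kafkacat -F kafkacat.config -C -t pg_source_accounts_serial -o beginning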
{"user_id":1,"username":"francesco","email":"fra@fakecompany.com"}
{"user_id":2,"username":"ugo","email":"ugo@fakecompany.com"}
{"user_id":3,"username":"vincenzo","email":"vincenzo@fakecompany.com"}
{"user_id":4,"username":"laura","email":"laura@fakecompany.com"}
{"user_id":5,"username":"carlo","email":"carlo@fakecompany.com","company_name":"fakecompany"}
% Reached end of topic pg_source_accounts_serial [0] at offset 5
As we would expect, only the new record has the updated company_name field, while we didn’t receive any updates for the other records since the other ids were already parsed.
As long as the schema change doesn’t impact the columns defined as incrementing (or timestamp, for timestamp-based connectors), we can expect new columns to be propagated correctly, since the connector executes a SELECT * FROM <table>. The same of course applies to the bulk and timestamp-based connectors.
The query-based connector, on the other hand, has its set of fields defined in the query field of the config file. The query will thus need to be amended to reflect any changes in the source table fields that we want to propagate to Kafka, as in the sketch below.
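For instance, to also propagate the company_name column added earlier, the query in kafka_jdbc_timestamp_agg_source.json would need to select it explicitly, along these lines (a sketch; the connector then needs to be recreated with the updated configuration):

with total_hours as (
    select username,
           company_name,
           sum(nr_hours) nr_hours,
           max(created_at) last_created_at
    from accounts_serial serial
        inner join accounts_working_hours awh
            on serial.user_id = awh.user_id
    group by username, company_name)
select * from total_hours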
Summary
Kafka Connect JDBC is a valid option if you’re planning to source events from an existing database (in this example PostgreSQL). The various optimisation methods, such as incrementing and timestamp, require special columns to be present in the source tables, but make it possible to transmit only the changed rows. The query mode enables more freedom in the query definition, but doesn’t propagate schema changes. The following is an overview of the various methods and capabilities.
| Connect Type | Inserts | Updates | Deletion | Schema Change | Comments |
|---|---|---|---|---|---|
| bulk | ✅ | ❌ | ✅ | ✅ | All rows are extracted EVERY TIME |
| incremental | ✅ | ❌ | ❌ | ✅ | Doesn’t support updates/deletions if the incremental column doesn’t change |
| timestamp | ✅ | ✅ | ⚠️ | ✅ | Requires additional logic in the Database to work. Deletions work only if soft-deletions |
| query | ✅ | ✅ | ⚠️ | ❌ | Query fields are static, defined in Kafka Connect config file |
For more complex scenarios, CDC-based options provide a scalable solution with minimal impact on the source database.
If you want to know more about Aiven, Kafka and PostgreSQL, check out the Aiven documentation and the other help articles.