Setup a JDBC Source from PostgreSQL

JDBC source connector is useful to push data from a relational database such as PostgreSQL to Kafka. Data in Kafka can be consumed, transformed and consumed any number of times in interesting ways. This help article illustrates steps to setup JDBC source connector with PostgreSQL database.

Kafka Connect JDBC source settings

Here we illustrate steps required to setup JDBC source connector with Aiven for PostgreSQL service. Before getting started with the steps, the following information regarding the Kafka and PostgreSQL services need to be collected.

Following Aiven for Kafka service details are required (from the Aiven console):

KAFKA_HOST
KAFKA_PORT
KAFKA_CONNECT_SERVICE_URI

Following PostgreSQL service details are required (from the Aiven console):

PG_INSTANCE_NAME
PG_SERVICE_URI
PG_HOST
PG_PORT
PG_USER
PG_PW
PG_DEFAULT_DB

The Kafka Connect JDBC source settings can be done via Aiven web UI or via JSON file. This article will cover the JSON example. Aiven’s web UI allows the copy/paste of JSON files and populates the UI forms appropriately once parsed the JSON file. The Kafka Connect JDBC source settings for a PostgreSQL database are the following

{
    "name": "pg-bulk-source",
    "connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://<PG_HOST>:<PG_PORT>/<PG_DEFAULT_DB>?sslmode=require",
    "connection.user": "<PG_USER>",
    "connection.password": "<PG_PASSWORD>",
    "table.whitelist": "<PREFIX>",
    "mode": "bulk|incrementing|timestamp",
    "poll.interval.ms": "2000",
    "topic.prefix": "<PREFIX>"
}

Parameters Description

avn service get $POSTGRES_NAME --project=$PROJECT_NAME --format '{service_uri_params}'

A specific mention needs to go for the mode parameter.

The Kafka Connect JDBC source allows 4 main modes:

  1. bulk: the connector will query the full table every time retrieving all rows and publishing them to Kafka
  2. incrementing: the connector will query the table appending a WHERE condition based on a Incrementing Column. This requires the presence of a column containing an always incremental number (like a series). This column will be used to check which rows have been added since last query, the column name will be passed via the incrementing.column.name parameter
  3. timestamp: the connector will query the table appending a WHERE condition based on a Timestamp Column(s). This requires the presence of one or more columns containing the row’s creation date and modification date. In cases of two columns (creation_date and modification_date) the polling query will apply the COALESCENCE function, take the value of the second column only when the first column is null. The timestamp column(s) can be passed via the timestamp.column.name parameter
  4. timestamp+incrementing: this last mode uses both the incrementing and timestamp functionalities with incrementing.column.name selecting the incremental column and timestamp.column.name the timestamp one(s).

Scenario 1: Bulk Load

Let’s start with the simple use case, a minimal table, to show the basic Kafka Connect JDBC source functionality.

Let’s Login to PostgreSQL

avn service cli $PG_INSTANCE_NAME

And create a simple accounts table containing username and email columns.

CREATE TABLE accounts (
	username VARCHAR ( 50 ) PRIMARY KEY,
	email VARCHAR ( 255 ) UNIQUE NOT NULL
);

Now let’s add some data

insert into accounts (username, email) values ('francesco','fra@fakecompany.com');
insert into accounts (username, email) values ('ugo','ugo@fakecompany.com');
insert into accounts (username, email) values ('vincenzo','vincenzo@fakecompany.com');

and verify it’s there

 <DATABASE>=> select * from accounts;
 username  |          email           
-----------+--------------------------
 francesco | fra@fakecompany.com
 ugo       | ugo@fakecompany.com
 vincenzo  | vincenzo@fakecompany.com
(3 rows)

Now it’s time to create the first Kafka Connect JDBC source. To do so we need to prepare a JSON file, named kafka_jdbc_simple_source.json, containing the following properties

{
    "name": "pg-bulk-source",
    "connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://<HOSTNAME>:<PORT>/ <DATABASE>?sslmode=require",
    "connection.user": "<PG_USER>",
    "connection.password": "<PG_PASSWORD>",
    "table.whitelist": "accounts",
    "mode": "bulk",
    "poll.interval.ms": "2000",
    "topic.prefix": "pg_source_"
}

Please check the table.whitelist setting pointing to the accounts table just created and the mode equal to bulk.

We can check out the output with [Kafkacat](help article, after downloading the appropriate SSL certificate files and setup the kafkacat.config configuration file with

kafkacat -F kafkacat.config -C -t pg_source_accounts

The output will be

{"username":"francesco","email":"fra@fakecompany.com"}
{"username":"ugo","email":"ugo@fakecompany.com"}
{"username":"vincenzo","email":"vincenzo@fakecompany.com"}
% Reached end of topic pg_source_accounts [0] at offset 3
{"username":"francesco","email":"fra@fakecompany.com"}
{"username":"ugo","email":"ugo@fakecompany.com"}
{"username":"vincenzo","email":"vincenzo@fakecompany.com"}
% Reached end of topic pg_source_accounts [0] at offset 6
{"username":"francesco","email":"fra@fakecompany.com"}
{"username":"ugo","email":"ugo@fakecompany.com"}
{"username":"vincenzo","email":"vincenzo@fakecompany.com"}
% Reached end of topic pg_source_accounts [0] at offset 9

As we can notice, the Kafka Connect JDBC connector, is extracting the whole accounts table every 2 seconds (as per our poll.interval.ms) definition.

If now we add a new row to accounts from the postgreSQL client

insert into accounts (username, email) values ('laura','laura@fakecompany.com');

we can see the new row appearing in our topic at the following poll

{"username":"francesco","email":"fra@fakecompany.com"}
{"username":"ugo","email":"ugo@fakecompany.com"}
{"username":"vincenzo","email":"vincenzo@fakecompany.com"}
{"username":"laura","email":"laura@fakecompany.com"}
% Reached end of topic pg_source_accounts [0] at offset 10

Scenario 2: Incremental Column

Obviously querying and extracting the whole table at every poll is not ideal. Especially in cases where the table size is relevant, this can quickly become a bottleneck of any data pipeline solution. But, as mentioned before, we can use one of the incremental modes to solve the issue.

Let’s create a new accounts_serial table with an always incrementing SERIAL column. The following command can be used in the postgreSQL client

CREATE TABLE accounts_serial (
  user_id serial PRIMARY KEY,
  username VARCHAR ( 50 ) UNIQUE NOT NULL,
  email VARCHAR ( 255 ) UNIQUE NOT NULL
  )
;

And insert the same rows as before

insert into accounts_serial (username, email) values ('francesco','fra@fakecompany.com');
insert into accounts_serial (username, email) values ('ugo','ugo@fakecompany.com');
insert into accounts_serial (username, email) values ('vincenzo','vincenzo@fakecompany.com');

We can verify the correctness of the SERIAL column with

 <DATABASE>=> select * from accounts_serial;
 user_id | username  |          email           
---------+-----------+--------------------------
       1 | francesco | fra@fakecompany.com
       2 | ugo       | ugo@fakecompany.com
       3 | vincenzo  | vincenzo@fakecompany.com
(3 rows)

We can now start our incremental Kafka Connect JDBC source by copying the kafka_jdbc_simple_source.json file into a new kafka_jdbc_incremental_source.json and amending the name, mode and table.whitelist plus setting the newly created user_id column as incrementing.column.name entry. The new file will look like the following

{
    "name": "pg-incremental-source",
    "connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://<HOSTNAME>:<PORT>/ <DATABASE>?sslmode=require",
    "connection.user": "<PG_USER>",
    "connection.password": "<PG_PASSWORD>",
    "table.whitelist": "accounts_serial",
    "mode": "incrementing",
    "incrementing.column.name":"user_id",
    "poll.interval.ms": "2000",
    "topic.prefix": "pg_source_"
}

Again, we can start the connect with

avn service connector create $KAFKA_NAME @kafka_jdbc_incremental_source.json --project $PROJECT_NAME

and read the newly created topic with Kafkacat

kafkacat -F kafkacerts/kafkacat.config -C -t pg_source_accounts_serial

as expected the topic will contains only the 3 records

{"user_id":1,"username":"francesco","email":"fra@fakecompany.com"}
{"user_id":2,"username":"ugo","email":"ugo@fakecompany.com"}
{"user_id":3,"username":"vincenzo","email":"vincenzo@fakecompany.com"}
% Reached end of topic pg_source_accounts_serial [0] at offset 3

If we now insert the additional 4th record within the postgreSQL client

insert into accounts_serial (username, email) values ('laura','laura@fakecompany.com');

Kafkacat’s output will be

{"user_id":4,"username":"laura","email":"laura@fakecompany.com"}
% Reached end of topic pg_source_accounts_serial [0] at offset 4

Showing only the additional row as expected

Now, what does it happen if I update a row?

update accounts_serial set email='francesco@fakecompany.com' where user_id=1;

Well, the update will NOT be pushed to Kafka, since the user_id column wasn’t updated and all the records with user_id <= 4 were already parsed. This can represent a problem if we’re keen on retrieving not only the new records but also the changed ones.

Scenario 3: Timestamp column

How can we know which rows have been created or updated since the last query? There is no out-of-the-box solution with PostgreSQL, we need to create dedicated columns and triggers.

Let’s create a new accounts_timestamp table with an creation_at and modified_at columns. The following command can be used in the postgreSQL client

CREATE TABLE accounts_timestamp (
  username VARCHAR ( 50 ) PRIMARY KEY,
  email VARCHAR ( 255 ) UNIQUE NOT NULL,
  created_at TIMESTAMP NOT NULL DEFAULT NOW(),
  modified_at TIMESTAMP
  )
;

The created_at field will work as expected immediately, with the DEFAULT NOW() definition. The modified_at on the other side, requires a bit more tuning to be usable: we’ll need to create a trigger, that will insert the current timestamp in case of updates. The following SQL can be executed from the postgreSQL client.

CREATE OR REPLACE FUNCTION change_modified_at()
  RETURNS TRIGGER
  LANGUAGE PLPGSQL
  AS
$$
BEGIN
	NEW.modified_at := NOW();
	RETURN NEW;
END;
$$
;
CREATE TRIGGER modified_at_updates
  BEFORE UPDATE
  ON accounts_timestamp
  FOR EACH ROW
  EXECUTE PROCEDURE change_modified_at();

The first statement will create the change_modified_at function that will later be used by the modified_at_updates trigger. If we now insert the same rows as before

insert into accounts_timestamp (username, email) values ('francesco','fra@fakecompany.com');
insert into accounts_timestamp (username, email) values ('ugo','ugo@fakecompany.com');
insert into accounts_timestamp (username, email) values ('vincenzo','vincenzo@fakecompany.com');

We can verify that the created_at column is successfully populated with

 <DATABASE>=> select * from accounts_timestamp;                                   
 username  |          email           |         created_at         | modified_at
-----------+--------------------------+----------------------------+-------------
 francesco | fra@fakecompany.com      | 2021-03-01 11:58:27.429172 |
 ugo       | ugo@fakecompany.com      | 2021-03-01 11:58:27.438725 |
 vincenzo  | vincenzo@fakecompany.com | 2021-03-01 11:58:27.448099 |
(3 rows)

Now let’s create a timestamp-based Kafka Connect JDBC sink by copying the kafka_jdbc_incremental_source.json file into a new kafka_jdbc_timestamp_source.json and amending the name, mode and table.whitelist.

We also need to set the newly created columns modified_at and created_at columns as timestamp.column.name entry. The value for this setting should be modified_at,created_at since modified_at will contain the most recent update timestamp, and in case of null value, we can rely on the created_at column. The file will look like the following

{
    "name": "pg-timestamp-source",
    "connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://<HOSTNAME>:<PORT>/ <DATABASE>?sslmode=require",
    "connection.user": "<PG_USER>",
    "connection.password": "<PG_PASSWORD>",
    "table.whitelist": "accounts_timestamp",
    "mode": "timestamp",
    "timestamp.column.name":"modified_at,created_at",
    "poll.interval.ms": "2000",
    "topic.prefix": "pg_source_"
}

Again, we can start the timestamp-based connector with

avn service connector create $KAFKA_NAME @kafka_jdbc_timestamp_source.json --project $PROJECT_NAME

Checking the output pg_source_accounts_timestamp topic from Kafkacat will output the three rows as expected

{"username":"francesco","email":"fra@fakecompany.com","created_at":1614599907429,"modified_at":null}
{"username":"ugo","email":"ugo@fakecompany.com","created_at":1614599907438,"modified_at":null}
{"username":"vincenzo","email":"vincenzo@fakecompany.com","created_at":1614599907448,"modified_at":null}
% Reached end of topic pg_source_accounts_timestamp [0] at offset 3

If we now update an existing record with

update accounts_timestamp set email='francesco@fakecompany.com' where username='francesco';

We can verify that the modified_at column is correctly updated in PostgreSQL

 <DATABASE>=> select * from accounts_timestamp;                                     username  |           email           |         created_at         |        modified_at         
-----------+---------------------------+----------------------------+----------------------------
 ugo       | ugo@fakecompany.com       | 2021-03-01 11:58:27.438725 |
 vincenzo  | vincenzo@fakecompany.com  | 2021-03-01 11:58:27.448099 |
 francesco | francesco@fakecompany.com | 2021-03-01 11:58:27.429172 | 2021-03-01 12:00:37.084052
(3 rows)

And in Kafkacat we receive the update

{"username":"francesco","email":"francesco@fakecompany.com","created_at":1614599907429,"modified_at":1614600037084}

What about Deletions?

What happens if we delete one row? Let’s say our account ugo is gone

delete from accounts_timestamp where username='ugo';

We can verify it within postgreSQL

 <DATABASE>=> select * from accounts_timestamp;
 username  |           email           |         created_at         |        modified_at         
-----------+---------------------------+----------------------------+----------------------------
 vincenzo  | vincenzo@fakecompany.com  | 2021-03-01 11:58:27.448099 |
 francesco | francesco@fakecompany.com | 2021-03-01 11:58:27.429172 | 2021-03-01 12:00:37.084052
(2 rows)

We wont see any new row in both timestamp and incremental based solution. This is due to the fact that such a deletion doesn’t create a new id or a newer timestamp. On the other side, the bulk connector will correctly output only the rows present in the table after the delete.

How to solve the deletion problem? We have two choices:

Scenario 4: Complex queries

What if we don’t want to extract data from a single table, but join various tables and take the output?

Let’s reuse the accounts_serial example and create an additional table accounts_working_hours

create table accounts_working_hours
(
  user_id integer REFERENCES accounts_serial (user_id),
  work_date DATE,
  nr_hours INT,
  created_at TIMESTAMP NOT NULL DEFAULT NOW()
  );

And insert few rows

insert into accounts_working_hours (user_id, work_date, nr_hours) values (1, '2021-01-01',8);
insert into accounts_working_hours (user_id, work_date, nr_hours) values (1, '2021-01-02',7);
insert into accounts_working_hours (user_id, work_date, nr_hours) values (1, '2021-01-03',5);
insert into accounts_working_hours (user_id, work_date, nr_hours) values (2, '2021-01-01',7);
insert into accounts_working_hours (user_id, work_date, nr_hours) values (2, '2021-01-02',3);

The table accounts_working_hours contains for each day the number of hours worked by an account. Let’s say we want to feed Kafka with a stream having the updated total hours per account. The basic query will be

with total_hours as (
  select username,
    sum(nr_hours) nr_hours,
    max(created_at) last_created_at
  from
    accounts_serial serial  
      inner join
    accounts_working_hours awh
      on serial.user_id=awh.user_id
  group by username)
select * from total_hours;

That will output

username  | nr_hours |      last_created_at       
-----------+----------+----------------------------
francesco |       20 | 2021-03-01 12:41:41.657594
ugo       |       10 | 2021-03-01 12:41:41.676173
(2 rows)

Now let’s create a timestamp-based connector, using the last_created_at field as timestamp and the query above as query field. First we need to define the connector configurations in a new file kafka_jdbc_timestamp_agg_source.json

{
    "name": "pg-timestamp-agg-source",
    "connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://<HOSTNAME>:<PORT>/ <DATABASE>?sslmode=require",
    "connection.user": "<PG_USER>",
    "connection.password": "<PG_PASSWORD>",
    "query": "with total_hours as (select username, sum(nr_hours) nr_hours, max(created_at) last_created_at from accounts_serial serial   inner join  accounts_working_hours awh on serial.user_id=awh.user_id group by username) select * from total_hours",
    "mode": "timestamp",
    "timestamp.column.name":"last_created_at",
    "poll.interval.ms": "2000",
    "topic.prefix": "pg_source_agg_hours",
    "timestamp.delay.interval.ms": "1000"
}

Now let’s query with Kafkacat the topic pg_source_agg_hours

{"username":"francesco","nr_hours":20,"last_created_at":1614602501657}
{"username":"ugo","nr_hours":10,"last_created_at":1614602501676}
% Reached end of topic pg_sourge_agg_hours [0] at offset 2

if we add a new working day

insert into accounts_working_hours (user_id, work_date, nr_hours) values (3, '2021-01-01',12);

We can see the updated count in Kafkacat for vincenzo (user_id=3)

{"username":"vincenzo","nr_hours":12,"last_created_at":1614604142898}
% Reached end of topic pg_sourge_agg_hours [0] at offset 3

Schema Changes

What happens if I do a table’s schema change? Is that propagated by the connector?

Let’s check what happens if I add a column to the table and insert a new row

alter table accounts_serial add column company_name VARCHAR ( 50 );
insert into accounts_serial (username, email, company_name) values ('carlo','carlo@fakecompany.com','fakecompany');

Reading the pg_source_accounts_serial topic from the beginning with Kafkacat

{"user_id":1,"username":"francesco","email":"fra@fakecompany.com"}
{"user_id":2,"username":"ugo","email":"ugo@fakecompany.com"}
{"user_id":3,"username":"vincenzo","email":"vincenzo@fakecompany.com"}
{"user_id":4,"username":"laura","email":"laura@fakecompany.com"}
{"user_id":5,"username":"carlo","email":"carlo@fakecompany.com","company_name":"fakecompany"}
% Reached end of topic pg_source_accounts_serial [0] at offset 5

As we would expect, only the new record has the updated company_name field, while we didn’t receive any updates for the other records since the other ids were already parsed. As long as the schema change doesn’t impact the incremental (or timestamp in case of timestamp-based connectors) defined columns we can expect the new columns to be propagated correctly since the connector is executing a SELECT * FROM TABLE. The same will of course apply in the bulk and timestamp-based connectors.

The query-based connector on the other side, has the set of fields defined in the query field of the config file. Thus it will need to be amended to reflect the changes in the source table fields that we want to propagate to Kafka.

Summary

Kafka Connect JDBC is a valid option if you’re planning to source events from an existing database (in this example PostgreSQL). The various optimisation methods such as incremental and timestamp need special columns to be present in the source tables but allow to focus the transmission only on the changed rows. The query mode enables more freedom in query definition, but doesn’t propagate schema changes. The following is an overview of the various methods and capabilities.

Connect Type Inserts Updates Deletion Schema Change Comments
bulk All rows are extracted EVERY TIME
incremental Doesn’t support updates/deletions if the incremental column doesn’t change
timestamp ⚠️ Requires additional logic in the Database to work. Deletions work only if soft-deletions
query ⚠️ Query fields are static, defined in Kafka Connect config file

For more complex scenarios, CDC-based options provide a scalable solution with minimal impact on the source database.

If you want to know more about Aiven, Kafka and PostgreSQL