Configure Debezium Server
This document closely follows Debezium’s documentation. It explains how to configure Debezium Server using its application.properties configuration file.
You will find similar explanations inside the application.properties file generated by the scaffold command:
redis-di scaffold --db-type cassandra|mysql|oracle|postgresql|sqlserver --dir <PATH_TO_DIR>
Target (sink) connector
The target connector is Redis.
Basic configuration
debezium.sink.type=redis
debezium.sink.redis.address=<REDIS_DI_BDB_HOST>:12001
debezium.sink.redis.password=<REDIS_DI_PASSWORD>
Preventing data loss
To prevent data loss, the Debezium Redis sink connector needs to be configured to wait for write acknowledgment from the RDI database replica shard. Use the following properties to achieve this:
debezium.sink.redis.wait.enabled=true
debezium.sink.redis.wait.retry.enabled=true
Additionally, you can configure the wait timeout for replica shard acknowledgment and the delay between write retries (both 1000 milliseconds by default). See the reference table below for the full list of properties.
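For example, a minimal sketch of a sink configuration that waits for replica acknowledgment; the timeout and retry values are illustrative, and the address and password placeholders are the same as in the basic configuration above:
debezium.sink.type=redis
debezium.sink.redis.address=<REDIS_DI_BDB_HOST>:12001
debezium.sink.redis.password=<REDIS_DI_PASSWORD>
debezium.sink.redis.wait.enabled=true
debezium.sink.redis.wait.timeout.ms=2000
debezium.sink.redis.wait.retry.enabled=true
debezium.sink.redis.wait.retry.delay.ms=1000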
Redis data integration configuration reference
Property | Default | Description |
---|---|---|
debezium.sink.type | | Must be set to `redis`. |
debezium.sink.redis.address | | An address, formatted as `host:port`, at which the Redis target streams are provided. |
debezium.sink.redis.user | | (Optional) A username used to communicate with Redis. |
debezium.sink.redis.password | | (Optional) A user password used to communicate with Redis. A password must be set if a user is set. |
debezium.sink.redis.ssl.enabled | false | (Optional) Use SSL to communicate with Redis. |
debezium.sink.redis.null.key | | Redis does not support the notion of data without keys, so this string will be used as the key name for records without a primary key. |
debezium.sink.redis.null.value | | Redis does not support the notion of null payloads, as is the case with tombstone events. This string will be used as the value for records without a payload. |
debezium.sink.redis.batch.size | 500 | The number of change records to insert into a single batch write (pipelined transaction). |
debezium.sink.redis.retry.initial.delay.ms | 300 | The initial retry delay when encountering Redis connection or OOM issues. This value is doubled upon every retry but will not exceed `debezium.sink.redis.retry.max.delay.ms`. |
debezium.sink.redis.retry.max.delay.ms | 10000 | The maximum delay when encountering Redis connection or OOM issues. |
debezium.sink.redis.memory.limit.mb | 80 | Debezium stops sending events when Redis size exceeds this threshold. |
debezium.sink.redis.wait.enabled | false | If Redis is configured with a replica shard, this setting allows Debezium to verify that the data has been written to the replica. |
debezium.sink.redis.wait.timeout.ms | 1000 | Defines the timeout in milliseconds when waiting for replica writes. |
debezium.sink.redis.wait.retry.enabled | false | Enables retry on wait for replica failure. |
debezium.sink.redis.wait.retry.delay.ms | 1000 | Defines the delay of retry on wait for replica failure. |
Notes
- When using Redis to store schema history and offsets, the values of the properties `debezium.source.offset.storage.redis.*` and `debezium.source.schema.history.internal.redis.*` are inherited from the corresponding `debezium.sink.redis.*` properties.
- If you want to override any of these inherited defaults, add them explicitly as `debezium.source.offset.storage.redis.*` and/or `debezium.source.schema.history.internal.redis.*` properties.
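For example, a minimal sketch of an explicit override, assuming (hypothetically) that you want offsets and schema history written to a different Redis database than the sink target; the address and password placeholders are illustrative:
debezium.source.offset.storage.redis.address=<OTHER_REDIS_HOST>:12002
debezium.source.offset.storage.redis.password=<OTHER_REDIS_PASSWORD>
debezium.source.schema.history.internal.redis.address=<OTHER_REDIS_HOST>:12002
debezium.source.schema.history.internal.redis.password=<OTHER_REDIS_PASSWORD>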
Source (database) connector
The source connector depends on the type of database used as the source. The basic configuration is the same for all database types except for the connector class.
Essential source properties
Note: Add the `debezium.source.` prefix to the listed properties when used in `application.properties`.
Property | Default | Source Databases | Description |
---|---|---|---|
connector.class | | | Choose from the following: `io.debezium.connector.postgresql.PostgresConnector`, `io.debezium.connector.mongodb.MongoDbConnector`, `io.debezium.connector.mysql.MySqlConnector`, `io.debezium.connector.oracle.OracleConnector`, `io.debezium.connector.sqlserver.SqlServerConnector`, or `io.debezium.connector.cassandra.Cassandra4Connector`. |
database.hostname | | MySQL, Oracle, PostgreSQL, SQLServer | The address of the database instance. |
database.port | | MySQL, Oracle, PostgreSQL, SQLServer | The port number of the database instance. |
database.user | | MySQL, Oracle, PostgreSQL, SQLServer | Username to use when connecting to the database server. |
database.password | | MySQL, Oracle, PostgreSQL, SQLServer | Password to use when connecting to the database server. |
database.dbname | | Oracle, PostgreSQL | The name of the database from which to stream the changes. |
database.names | | SQLServer | The comma-separated list of the SQL Server database names from which to stream the changes. |
database.pdb.name | ORCLPDB1 | Oracle | The name of the Oracle Pluggable Database that the connector captures changes from. For non-CDB installations, do not specify this property. |
lob.enabled | false | Oracle | Enables capturing and serialization of large object (CLOB, NCLOB, and BLOB) column values in change events. |
unavailable.value.placeholder | __debezium_unavailable_value | Oracle | Specifies the constant that the connector provides to indicate that the original value is unchanged and not provided by the database. |
database.encrypt | false | SQLServer | If SSL is enabled for a SQL Server database, set this property to true. |
database.server.id | 1 | MySQL | A numeric ID of this database client, which must be unique across all currently-running database processes in the MySQL cluster. |
schema.include.list | | Oracle, PostgreSQL, SQLServer | An optional, comma-separated list of regular expressions that match names of schemas for which you want to capture changes. Any schema name not included in `schema.include.list` is excluded from having its changes captured. By default, all non-system schemas have their changes captured. If you include this property in the configuration, do not also set the `schema.exclude.list` property. |
schema.exclude.list | | Oracle, PostgreSQL, SQLServer | An optional, comma-separated list of regular expressions that match names of schemas for which you do not want to capture changes. Any schema whose name is not included in `schema.exclude.list` has its changes captured, with the exception of system schemas. If you include this property in the configuration, do not also set the `schema.include.list` property. |
database.include.list | | MongoDB, MySQL | An optional, comma-separated list of regular expressions that match the names of the databases for which to capture changes. The connector does not capture changes in any database whose name is not in `database.include.list`. By default, the connector captures changes in all databases. If you include this property in the configuration, do not also set the `database.exclude.list` property. |
database.exclude.list | | MongoDB, MySQL | An optional, comma-separated list of regular expressions that match the names of databases for which you do not want to capture changes. The connector captures changes in any database whose name is not in `database.exclude.list`. If you include this property in the configuration, do not also set the `database.include.list` property. |
table.include.list | | MySQL, Oracle, PostgreSQL, SQLServer | An optional, comma-separated list of regular expressions that match fully-qualified table identifiers for tables that you want Debezium to capture. Any table not included in `table.include.list` is excluded from capture. Each identifier is of the form `schemaName.tableName`. By default, the connector captures all non-system tables for the designated schemas. Must not be used with `table.exclude.list`. |
table.exclude.list | | MySQL, Oracle, PostgreSQL, SQLServer | An optional, comma-separated list of regular expressions that match fully-qualified table identifiers for the tables that you want to exclude from being captured; Debezium captures all tables not included in `table.exclude.list`. Each identifier is of the form `schemaName.tableName`. Must not be used with `table.include.list`. |
column.include.list | empty string | MySQL, Oracle, PostgreSQL, SQLServer | An optional, comma-separated list of regular expressions that match the fully-qualified names of columns that should be included in the change event message values. Fully-qualified names for columns are of the form `schemaName.tableName.columnName`. Note that primary key columns are always included in the event’s key, even if not included in the value. Do not also set the `column.exclude.list` property. |
column.exclude.list | empty string | MySQL, Oracle, PostgreSQL, SQLServer | An optional, comma-separated list of regular expressions that match the fully-qualified names of columns that should be excluded from change event message values. Fully-qualified names for columns are of the form `schemaName.tableName.columnName`. Note that primary key columns are always included in the event’s key, even if excluded from the value. Do not also set the `column.include.list` property. |
topic.prefix | | Cassandra, MySQL, Oracle, PostgreSQL, SQLServer | A prefix for all topic names that receive events emitted by this connector. |
cassandra.node.id | | Cassandra | The name of the Cassandra node. |
cassandra.hosts | localhost | Cassandra | One or more addresses of Cassandra nodes, separated by “,”. |
cassandra.port | 9042 | Cassandra | The port used to connect to the Cassandra host(s). |
cassandra.config | | Cassandra | The absolute path of the YAML config file used by a Cassandra node. |
http.port | 8000 | Cassandra | The port used by the HTTP server for ping, health check, and build information. |
commit.log.relocation.dir | | Cassandra | The local directory to which commit logs are relocated from the `cdc_raw` directory after processing. |
commit.log.real.time.processing.enabled | false | Cassandra | Only applicable in Cassandra 4. If set to true, the Cassandra connector agent reads commit logs incrementally by watching for updates in commit log index files and streams data in real time, at the frequency determined by `commit.log.marked.complete.poll.interval.ms`. If set to false, the Cassandra 4 connector waits for commit log files to be marked Completed before processing them. |
commit.log.marked.complete.poll.interval.ms | 10000 | Cassandra | Only applicable in Cassandra 4 and when real-time streaming is enabled by `commit.log.real.time.processing.enabled`. This config determines the frequency at which the commit log index file is polled for updates in offset value. |
mongodb.hosts | | MongoDB | The connection string to use to connect to the MongoDB replica set. |
mongodb.connection.mode | replica_set | MongoDB | Specifies the connectivity mode of the MongoDB database. If you are using a replica set, set the mode to `replica_set`. If you are using a sharded cluster, set the mode to `sharded`. |
collection.include.list | | MongoDB | An optional, comma-separated list of regular expressions that match fully-qualified namespaces for MongoDB collections to be monitored. By default, the connector monitors all collections except those in the local and admin databases. Fully-qualified names are of the form `databaseName.collectionName`. |
collection.exclude.list | | MongoDB | An optional, comma-separated list of regular expressions that match fully-qualified namespaces for MongoDB collections to be excluded from monitoring. Fully-qualified names are of the form `databaseName.collectionName`. |
field.exclude.list | | MongoDB | An optional, comma-separated list of the fully-qualified names of fields that should be excluded from change event message values. Fully-qualified names for fields are of the form `databaseName.collectionName.fieldName`. |
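For example, a minimal sketch of a source configuration for a PostgreSQL database; the host, credentials, database name, topic prefix, and table names are illustrative placeholders:
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.database.hostname=<POSTGRES_HOST>
debezium.source.database.port=5432
debezium.source.database.user=<POSTGRES_USER>
debezium.source.database.password=<POSTGRES_PASSWORD>
debezium.source.database.dbname=chinook
debezium.source.topic.prefix=chinook
debezium.source.table.include.list=public.customer,public.invoice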
Advanced source properties
Note: Add the `debezium.source.` prefix to the listed properties when used in `application.properties`.
Property | Default | Source Databases | Description |
---|---|---|---|
value.converter.schemas.enable | true | Cassandra, MongoDB, MySQL, Oracle, PostgreSQL, SQLServer | If set to false, the schema payload will be excluded from each change event record. |
key.converter.schemas.enable | true | Cassandra, MongoDB, MySQL, Oracle, PostgreSQL, SQLServer | If set to false, the key payload will be excluded from each change event record. |
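For example, to set these converter properties explicitly in `application.properties` (shown here with their default value of true):
debezium.source.value.converter.schemas.enable=true
debezium.source.key.converter.schemas.enable=true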
Debezium connectors for various databases
For additional properties, consult Debezium’s documentation for the specific connector.
Configure an initial snapshot without filtering queries (relevant for MySQL, Oracle, PostgreSQL and SQLServer)
Tables to be included in the initial snapshot must be listed in the property `debezium.source.table.include.list` as a comma-separated list of fully-qualified table names.
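For example, assuming a MySQL source database named chinook that contains the tables customer and invoice (the names are illustrative):
debezium.source.table.include.list = chinook.customer,chinook.invoice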
Use queries in initial snapshot (relevant for MySQL, Oracle, PostgreSQL and SQLServer)
In case you want a snapshot to include only a subset of the rows in a table, follow these steps:

1. Add the property `debezium.source.snapshot.select.statement.overrides` and set it to a comma-separated list of fully-qualified table names. The list should include every table for which you want to add a SELECT statement.
2. For each table in the list above, add a further configuration property that specifies the SELECT statement for the connector to run on the table when it takes a snapshot. The specified SELECT statement determines the subset of table rows to include in the snapshot. Use the following format to specify the name of this SELECT statement property:
   - MySQL: `snapshot.select.statement.overrides.<databaseName>.<tableName>`
   - Oracle, SQLServer, PostgreSQL: `snapshot.select.statement.overrides.<schemaName>.<tableName>`
3. Add a comma-separated list of fully-qualified column names that are included in the SELECT statement:
   `debezium.source.column.include.list = <databaseName>.<tableName>.<columnName1>,<databaseName>.<tableName>.<columnName2>,<databaseName>.<tableName>.<columnName3>...`
   In case you want to include all the table columns in the SELECT statement, you can use a regular expression in the form of `<databaseName>.<tableName>.*` instead of adding each one of the table columns to the `debezium.source.column.include.list` property.
NOTE: Add all tables as a comma-separated list of fully-qualified table names to the property `debezium.source.table.include.list`.
Example
To select the columns `CustomerId`, `FirstName`, and `LastName` from the `customer` table, joined with the `invoice` table to get customers whose total invoices are greater than 8000, add the following properties to the `application.properties` file:
debezium.source.table.include.list = chinook.customer
debezium.source.column.include.list = chinook.customer.CustomerId,chinook.customer.FirstName,chinook.customer.LastName
debezium.source.snapshot.select.statement.overrides=chinook.customer
debezium.source.snapshot.select.statement.overrides.chinook.customer = SELECT c.CustomerId, c.FirstName, c.LastName \
FROM chinook.customer c INNER JOIN chinook.invoice inv \
ON c.CustomerId = inv.CustomerId \
WHERE inv.total > 8000
Form custom message key(s) for change event records
By default, Debezium uses the primary key column(s) of a table as the message key for records that it emits. In place of the default, or to specify a key for tables that lack a primary key, you can configure custom message keys based on one or more columns.
To establish a custom message key for a table, list the table followed by the column to use as the message key. Each list entry takes the following format:
debezium.source.message.key.columns=<databaseName>.<tableName>:<columnName>
To base a table key on multiple column names, insert commas between the column names:
debezium.source.message.key.columns=<databaseName>.<tableName>:<columnName1>,<columnName2>...
The property can include entries for multiple tables. Use a semicolon to separate table entries in the list:
debezium.source.message.key.columns=<databaseName>.<tableName1>:<columnName1>,<columnName2>;<databaseName>.<tableName2>:<columnName1>,<columnName2>
Note: In case the property `column.include.list` is defined in your `application.properties` file, make sure it includes all the column names that are specified in the property `message.key.columns`.
Fully-qualified table name
In this document we refer to the fully-qualified table name as `<databaseName>.<tableName>`. This format applies to MySQL databases. For Oracle, SQLServer, and PostgreSQL databases, use `<schemaName>.<tableName>` instead.
Database Type | Fully-qualified Table Name |
---|---|
Oracle, SQLServer, PostgreSQL | <schemaName>.<tableName> |
MySQL | <databaseName>.<tableName> |
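For example, the same customer table would appear as follows in `debezium.source.table.include.list` depending on the source database type (the database name chinook and schema name public are illustrative):
# MySQL: <databaseName>.<tableName>
debezium.source.table.include.list = chinook.customer
# Oracle, SQLServer, PostgreSQL: <schemaName>.<tableName>
debezium.source.table.include.list = public.customer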
Notes
- You can specify the fully-qualified table name `<databaseName>.<tableName>` as a regular expression instead of providing the full name of the `databaseName` and `tableName`.
- There is no limit to the number of columns that can be used to create custom message keys. However, it’s best to use the minimum required number of columns to specify a unique key.
Examples
The primary key of the tables `customer` and `employee` is `CustomerID`. To establish custom message keys based on `FirstName` and `LastName` for the tables `customer` and `employee`, add the following property to the `application.properties` file:
debezium.source.message.key.columns=chinook.customer:FirstName,LastName;chinook.employee:FirstName,LastName
To specify the columns `FirstName` and `LastName` as the message keys for the table `chinook.customer` and for the `employee` table in any database, add the following property to the `application.properties` file:
debezium.source.message.key.columns=chinook.customer:FirstName,LastName;(.*).employee:FirstName,LastName
Configure the Debezium connector to fetch source database secrets from a secret store
Providing the source database password in clear text is not an acceptable option. Fortunately, Debezium supports fetching secrets from environment variables or from a file. This is done using a Quarkus feature that provides property expression expansion on configuration values.
An expression string is a mix of plain strings and expression segments, which are wrapped by the sequence `${...}`.
Using environment variables
Property expressions also work with environment variables, for example:
...
database.password="${MYSQL_PASSWORD}"
...
Getting secrets from a file
In addition, Debezium `application.properties` supports the `${file:path:key}` variable syntax, where `path` is the absolute path to the file and `key` is the property key. The file storing the secrets should contain a single `key=value` entry per line. For example, to read the database password from such a file, add the following to `application.properties`:
...
debezium.source.config.providers=file
debezium.source.config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
database.password="${file:/tmp/debezium/secrets.txt:db-password}"
...
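The secrets file referenced above (the path /tmp/debezium/secrets.txt and key db-password match the snippet; the value is an illustrative placeholder) would then contain:
db-password=<DB_PASSWORD>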