{"id":2834,"date":"2025-03-13T15:44:30","date_gmt":"2025-03-13T15:44:30","guid":{"rendered":"https:\/\/bynatree.com\/?p=2834"},"modified":"2025-03-13T15:44:30","modified_gmt":"2025-03-13T15:44:30","slug":"real-time-data-replication-postgresql-16-%e2%86%92-kafka-%e2%86%92-sql-server-2022-express","status":"publish","type":"post","link":"https:\/\/divaind.com\/ie1\/2025\/03\/13\/real-time-data-replication-postgresql-16-%e2%86%92-kafka-%e2%86%92-sql-server-2022-express\/","title":{"rendered":"Real-Time Data Replication: PostgreSQL 16 \u2192 Kafka \u2192 SQL Server 2022 Express"},"content":{"rendered":"<blockquote><p>A few days ago, we were approached by a client with a clear objective but a difficult barrier. Their goal? <strong>Real-Time Data Replication: PostgreSQL 16 \u2192 Kafka \u2192 SQL Server 2022 Express.<\/strong> They needed their data to be transferred effortlessly from PostgreSQL 16 to SQL Server 2022 Express in real time.<\/p><\/blockquote>\n<p>Their expanding operations demanded tighter integration between their systems, and delays caused by outdated data workflows could no longer be tolerated. 
This challenge set the stage for an innovative and efficient solution.<\/p>\n<h3>System Configuration<\/h3>\n<h3>Linux VM (CentOS 8.5)<\/h3>\n<ul>\n<li>PostgreSQL 16 (Source Database)<\/li>\n<li>Kafka 3.9.0 (Message Broker)<\/li>\n<li>Debezium Connector for PostgreSQL 3.0.6<\/li>\n<li>Kafka Connect (Handles Source and Sink Connectors)<\/li>\n<li>Kafka JDBC Sink Connector (For SQL Server Integration)<\/li>\n<\/ul>\n<h3>Windows Machine<\/h3>\n<ul>\n<li>SQL Server 2022 Express (Target Database)<\/li>\n<\/ul>\n<h3>Steps to Configure Real-Time Replication<\/h3>\n<p><strong>Install and Configure PostgreSQL 16<\/strong><br \/>\n<strong>Enable Logical Replication in PostgreSQL<\/strong><br \/>\nEdit <strong>postgresql.conf<\/strong>:<\/p>\n<pre class=\"theme:solarized-light lang:default decode:true\">sudo vi \/var\/lib\/pgsql\/16\/data\/postgresql.conf<\/pre>\n<p>Ensure these settings:<\/p>\n<pre class=\"theme:solarized-light lang:default decode:true \">wal_level = logical\nmax_replication_slots = 10\nmax_wal_senders = 10<\/pre>\n<p>Restart PostgreSQL:<\/p>\n<pre class=\"theme:solarized-light lang:default decode:true\">sudo systemctl restart postgresql-16<\/pre>\n<h3>Create Replication Slot &amp;amp; Publication<\/h3>\n<p>Create a logical replication slot to read the transaction log, and create a publication containing the tables that need to be replicated.<\/p>\n<pre class=\"theme:solarized-light lang:default decode:true \">SELECT pg_create_logical_replication_slot(&amp;#39;debezium_slot&amp;#39;, &amp;#39;pgoutput&amp;#39;);\nCREATE PUBLICATION dbz_publication FOR TABLE public.employee;<\/pre>\n<h3>Install and Configure Kafka 3.9.0<br \/>\nDownload &amp;amp; Extract Kafka<\/h3>\n<pre class=\"theme:solarized-light lang:default decode:true \">wget https:\/\/downloads.apache.org\/kafka\/3.9.0\/kafka_2.13-3.9.0.tgz\ntar -xvzf kafka_2.13-3.9.0.tgz\ncd kafka_2.13-3.9.0<\/pre>\n<h3>Start 
Zookeeper &amp;amp; Kafka Broker<\/h3>\n<p>Started both <strong>Zookeeper<\/strong> and the <strong>Kafka broker<\/strong> to handle the data pipeline.<\/p>\n<pre class=\"theme:solarized-light lang:default decode:true \">bin\/zookeeper-server-start.sh config\/zookeeper.properties &amp;amp;\nbin\/kafka-server-start.sh config\/server.properties &amp;amp;<\/pre>\n<h3>Install Debezium PostgreSQL Connector<br \/>\nDownload and Place in Plugins Directory<\/h3>\n<pre class=\"theme:solarized-light lang:default decode:true \">mkdir -p kafka_2.13-3.9.0\/plugins\/debezium-postgres\ncd kafka_2.13-3.9.0\/plugins\/debezium-postgres\nwget https:\/\/repo1.maven.org\/maven2\/io\/debezium\/debezium-connector-postgres\/3.0.6.Final\/debezium-connector-postgres-3.0.6.Final.jar<\/pre>\n<h3>Register Debezium PostgreSQL Source Connector<\/h3>\n<p>Registered the Debezium PostgreSQL <strong>source connector<\/strong>, linking PostgreSQL to Kafka.<\/p>\n<p>Create <strong>register-postgres.json<\/strong>:<\/p>\n<pre class=\"theme:solarized-light lang:default decode:true \">{\n&amp;quot;name&amp;quot;: &amp;quot;postgres-connector&amp;quot;,\n&amp;quot;config&amp;quot;: {\n&amp;quot;connector.class&amp;quot;: &amp;quot;io.debezium.connector.postgresql.PostgresConnector&amp;quot;,\n&amp;quot;database.hostname&amp;quot;: &amp;quot;localhost&amp;quot;,\n&amp;quot;database.port&amp;quot;: &amp;quot;5432&amp;quot;,\n&amp;quot;database.user&amp;quot;: &amp;quot;postgres&amp;quot;,\n&amp;quot;database.password&amp;quot;: &amp;quot;postgres&amp;quot;,\n&amp;quot;database.dbname&amp;quot;: &amp;quot;postgres&amp;quot;,\n&amp;quot;plugin.name&amp;quot;: &amp;quot;pgoutput&amp;quot;,\n&amp;quot;slot.name&amp;quot;: &amp;quot;debezium_slot&amp;quot;,\n&amp;quot;publication.name&amp;quot;: &amp;quot;dbz_publication&amp;quot;,\n&amp;quot;table.include.list&amp;quot;: &amp;quot;public.employee&amp;quot;,\n&amp;quot;topic.prefix&amp;quot;: 
&amp;quot;postgres&amp;quot;\n}\n}<\/pre>\n<p>Register the connector:<\/p>\n<pre class=\"theme:solarized-light lang:default decode:true \">curl -X POST -H &amp;quot;Accept:application\/json&amp;quot; -H &amp;quot;Content-Type:application\/json&amp;quot; \\\nhttp:\/\/localhost:8083\/connectors\/ -d @register-postgres.json<\/pre>\n<h2>Setup SQL Server 2022 Express<\/h2>\n<h3>Enable SQL Server Authentication<\/h3>\n<p>1. Open SQL Server Management Studio (SSMS).<br \/>\n2. Right-click Server Name \u2192 Properties \u2192 Security.<br \/>\n3. Change Server Authentication to SQL Server and Windows Authentication mode.<br \/>\n4. Restart SQL Server.<\/p>\n<h3>Enable sa Login<\/h3>\n<p>Enabled the <strong>sa login<\/strong> to provide full access.<\/p>\n<pre class=\"theme:solarized-light lang:pgsql decode:true\">ALTER LOGIN sa ENABLE;\nALTER LOGIN sa WITH PASSWORD = &amp;#39;YourNewStrongPassword&amp;#39;;<\/pre>\n<h3>Install and Configure Kafka JDBC Sink Connector for SQL Server<br \/>\nDownload JDBC Connector<\/h3>\n<p>Downloaded and installed the <strong>JDBC connector<\/strong>.<\/p>\n<pre class=\"theme:solarized-light lang:sh decode:true \">mkdir -p kafka_2.13-3.9.0\/plugins\/jdbc-sink\ncd kafka_2.13-3.9.0\/plugins\/jdbc-sink\nwget https:\/\/packages.confluent.io\/maven\/io\/confluent\/kafka-connect-jdbc\/10.8.0\/kafka-connect-jdbc-10.8.0.jar<\/pre>\n<h3>Register Kafka JDBC Sink Connector<\/h3>\n<p>Create a table called staff in the destination SQL Server database and set \u201ctable.name.format\u201d to that table name. 
This will replicate the data from the employee table in PostgreSQL to the staff table in SQL Server. Because delete.enabled is set to true, pk.mode must be record_key so that deletes can be keyed off the Kafka record key.<\/p>\n<p>Create <strong>register-mssql-sink.json<\/strong>:<\/p>\n<pre class=\"theme:solarized-light lang:plsql decode:true \">{\n&amp;quot;name&amp;quot;: &amp;quot;mssql-sink-connector&amp;quot;,\n&amp;quot;config&amp;quot;: {\n&amp;quot;connector.class&amp;quot;: &amp;quot;io.confluent.connect.jdbc.JdbcSinkConnector&amp;quot;,\n&amp;quot;tasks.max&amp;quot;: &amp;quot;1&amp;quot;,\n&amp;quot;topics&amp;quot;: &amp;quot;postgres.public.employee&amp;quot;,\n&amp;quot;connection.url&amp;quot;: &amp;quot;jdbc:sqlserver:\/\/192.168.0.102:1433;databaseName=kafka_db;encrypt=false&amp;quot;,\n&amp;quot;connection.user&amp;quot;: &amp;quot;sa&amp;quot;,\n&amp;quot;connection.password&amp;quot;: &amp;quot;YourNewStrongPassword&amp;quot;,\n&amp;quot;auto.create&amp;quot;: &amp;quot;true&amp;quot;,\n&amp;quot;auto.evolve&amp;quot;: &amp;quot;true&amp;quot;,\n&amp;quot;insert.mode&amp;quot;: &amp;quot;upsert&amp;quot;,\n&amp;quot;pk.fields&amp;quot;: &amp;quot;id&amp;quot;,\n&amp;quot;pk.mode&amp;quot;: &amp;quot;record_key&amp;quot;,\n&amp;quot;table.name.format&amp;quot;: &amp;quot;staff&amp;quot;,\n&amp;quot;dialect.name&amp;quot;: &amp;quot;SqlServerDatabaseDialect&amp;quot;,\n&amp;quot;batch.size&amp;quot;: &amp;quot;1000&amp;quot;,\n&amp;quot;delete.enabled&amp;quot;: &amp;quot;true&amp;quot;,\n&amp;quot;transforms&amp;quot;: &amp;quot;unwrap&amp;quot;,\n&amp;quot;transforms.unwrap.type&amp;quot;: &amp;quot;io.debezium.transforms.ExtractNewRecordState&amp;quot;,\n&amp;quot;transforms.unwrap.drop.tombstones&amp;quot;: &amp;quot;false&amp;quot;,\n&amp;quot;transforms.unwrap.delete.handling.mode&amp;quot;: &amp;quot;drop&amp;quot;\n}\n}<\/pre>\n<h3>Register the connector:<\/h3>\n<p>Registered the Kafka JDBC <strong>sink connector<\/strong>, mapping the data flow to SQL Server.<\/p>\n<pre class=\"theme:solarized-light lang:default decode:true \">curl -X POST -H &amp;quot;Accept:application\/json&amp;quot; -H 
&amp;quot;Content-Type:application\/json&amp;quot; \\\nhttp:\/\/localhost:8083\/connectors\/ -d @register-mssql-sink.json<\/pre>\n<h3>Start Zookeeper, Kafka Broker, and Kafka Connect<\/h3>\n<p>Launched <strong>Zookeeper<\/strong>, the <strong>Kafka Broker<\/strong>, and <strong>Kafka Connect<\/strong>, ensuring all components worked in harmony.<\/p>\n<pre class=\"theme:solarized-light lang:default decode:true \">bin\/zookeeper-server-start.sh config\/zookeeper.properties &amp;amp;\nbin\/kafka-server-start.sh config\/server.properties &amp;amp;\nbin\/connect-distributed.sh config\/connect-distributed.properties &amp;amp;<\/pre>\n<p>The result was a real-time replication system that seamlessly integrated PostgreSQL and SQL Server. The once-tedious manual operations were automated and streamlined. By connecting these technologies, we delivered a solution that not only met but exceeded the client&#8217;s expectations.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>A few days ago, we were approached by a client with a clear objective but a difficult barrier. Their goal? Real-Time Data Replication: PostgreSQL 16 \u2192 Kafka \u2192 SQL Server 2022 Express. They needed their data to be transferred effortlessly from PostgreSQL 16 to SQL Server 2022 Express in real time. 
Their expanding operations demanded&hellip;<\/p>\n","protected":false},"author":1,"featured_media":2835,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[28],"tags":[82,96,133,135,136,141,170,187,266,302,310,322,336],"class_list":["post-2834","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-postgresql","tag-bigdata","tag-cloudcomputing","tag-dataintegration","tag-datapipeline","tag-datareplication","tag-debezium","tag-innovation","tag-kafka","tag-postgresql","tag-realtimedata","tag-scalablesystems","tag-sqlserver","tag-techsolutions","category-28","description-off"],"aioseo_notices":[],"_links":{"self":[{"href":"https:\/\/divaind.com\/ie1\/wp-json\/wp\/v2\/posts\/2834","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/divaind.com\/ie1\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/divaind.com\/ie1\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/divaind.com\/ie1\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/divaind.com\/ie1\/wp-json\/wp\/v2\/comments?post=2834"}],"version-history":[{"count":0,"href":"https:\/\/divaind.com\/ie1\/wp-json\/wp\/v2\/posts\/2834\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/divaind.com\/ie1\/wp-json\/wp\/v2\/media\/2835"}],"wp:attachment":[{"href":"https:\/\/divaind.com\/ie1\/wp-json\/wp\/v2\/media?parent=2834"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/divaind.com\/ie1\/wp-json\/wp\/v2\/categories?post=2834"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/divaind.com\/ie1\/wp-json\/wp\/v2\/tags?post=2834"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}