Postgres CDC with Airbyte on AWS RDS

Before Starting: What You Need to Understand

What is CDC?

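CDC (change data capture) is a technique for syncing only the changes instead of doing a full sync. Imagine you have 1 TB of DB data, and you want to sync it to another destination like BigQuery, S3, or anywhere else.

If you do a full sync, you need to copy the whole 1 TB each time, which is very slow and inefficient. With CDC, you copy it once and afterwards only read the inserted, updated, and deleted rows.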

Physical Replication & Logical Replication

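Physical replication copies the database's WAL byte-for-byte to a standby server, while logical replication decodes the WAL into row-level changes (INSERT/UPDATE/DELETE) that other systems can consume. CDC uses logical replication, so you need to enable it in PostgreSQL: set wal_level to logical (on AWS RDS, set the rds.logical_replication parameter to 1).

After that, you can create a logical replication slot like this: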

-- Each database needs its own slot (a logical slot belongs to exactly one database)
-- The slot is created in the database you are currently connected to
SELECT pg_create_logical_replication_slot('my_cdc_slot', 'pgoutput');

-- pg_create_logical_replication_slot has no database-name argument,
-- so connect to the other database first (psql meta-command), then create the slot
\c your_database_name
SELECT pg_create_logical_replication_slot('airbyte_cdc_slot', 'pgoutput');

Write-Ahead Log

THIS IS THE MOST IMPORTANT PART. IF YOU DO NOT READ THIS CAREFULLY, YOUR DB COULD SHUT DOWN.

After you create the CDC slot (and set up the replica identity and the publication), the DB engine retains WAL (write-ahead log) for that slot UNTIL YOU CONSUME IT.

WHETHER THE DB RESTARTS OR NOT, THE SLOT STAYS AND THE DB KEEPS RETAINING WAL FOR IT.

So if you are not consuming a slot, you should drop it.

Otherwise, the DB disk will fill up with WAL, and the DB will shut down soon.
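Because a forgotten slot can fill the disk, it is worth checking how much WAL each slot is holding back. Here is a minimal sketch, assuming psql can reach the DB. SLOT_SQL uses pg_wal_lsn_diff to measure the bytes between the current WAL position and each logical slot's restart_lsn. check_slots is a hypothetical helper (not part of Postgres or Airbyte) that prints a warning for every slot above a limit you choose.

```shell
#!/usr/bin/env bash
# Sketch: report logical replication slots that retain more WAL than a limit.

# Query: slot name, whether a consumer is connected, and retained WAL in bytes.
# restart_lsn can be NULL for a slot that has not reserved WAL yet, hence COALESCE.
SLOT_SQL="SELECT slot_name, active,
       COALESCE(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn), 0)::bigint
FROM pg_replication_slots
WHERE slot_type = 'logical';"

# Hypothetical helper. Reads psql -At output ("slot|active|bytes" per line) on stdin,
# prints a warning for each slot above the limit, and returns 1 if any slot is over it.
check_slots() {
  local limit_bytes="$1" status=0
  local name active bytes
  while IFS='|' read -r name active bytes; do
    [ -z "$name" ] && continue
    if [ "$bytes" -gt "$limit_bytes" ]; then
      echo "WARN: slot $name (active=$active) retains $bytes bytes of WAL"
      status=1
    fi
  done
  return "$status"
}
```

For example, with a 10 GB limit: psql -h "$HOST" -U "$USER" -d "$DBNAME" -At -c "$SLOT_SQL" | check_slots $((10 * 1024 * 1024 * 1024)). If a slot is no longer used, drop it with SELECT pg_drop_replication_slot('my_cdc_slot'); (Postgres refuses to drop a slot while it is active).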

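Max WAL Senders

max_wal_senders is also a PostgreSQL configuration parameter.

It sets the maximum number of concurrent replication connections (WAL sender processes), that is, how many replication consumers can read from the DB at the same time. The default on RDS is 35, so you may not need to modify it.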

Identity, Publication, Slot

Identity

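You should set the replica identity on each table so that updated and deleted rows can be identified. It decides which old-row values are written to the WAL for UPDATE and DELETE, so the consumer knows which row changed (inserts do not need it).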

Publication

A publication is a set of tables whose changes are published. You have to specify which tables you will publish.

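Slot

The slot is like a ticket for accessing the publication. It also remembers how far the consumer has read, which is why the DB keeps WAL for it.

CDC clients (such as Airbyte) decide which database (through the slot) and which publication they subscribe to.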

Airbyte & BigQuery setup tip

When you sync multiple databases to BigQuery, I recommend creating a separate connection for each database (N connections), each with its own prefix name.

Because if you just connect databases A, B, and C to the same BigQuery destination, names from different databases could conflict.

So create N connections according to how many slots you have.

Also, create N destinations with different prefixes (if you want to distinguish them).

Check List

# Install Postgres on Mac

  1. Add this to your .zprofile (it puts the Postgres.app command-line tools, such as psql, on your PATH)

 # Postgres
 export PATH=/Applications/Postgres.app/Contents/Versions/latest/bin:$PATH
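If you re-run your setup, appending the line blindly can leave duplicates in .zprofile. Here is a minimal sketch of an idempotent append. add_line_once is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# Sketch: append a line to a shell profile only if it is not already there,
# so re-running the setup does not duplicate the PATH entry.
add_line_once() {
  local file="$1" line="$2"
  touch "$file"
  # -q quiet, -x whole-line match, -F fixed string (no regex)
  if ! grep -qxF -- "$line" "$file"; then
    printf '%s\n' "$line" >> "$file"
  fi
}
```

Usage: add_line_once "$HOME/.zprofile" 'export PATH=/Applications/Postgres.app/Contents/Versions/latest/bin:$PATH'. The single quotes keep $PATH literal in the file. Then open a new terminal and check with psql --version.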


Airbyte is an open-source data movement infrastructure for building extract and load (EL) data pipelines. It is designed for versatility, scalability, and ease-of-use.

Many common databases support writing all record changes to log files for the purpose of replication. A consumer of these log files (such as Airbyte) can read these logs while keeping track of the current position within the logs in order to read all record changes coming from DELETE/INSERT/UPDATE statements.

Quick Start

Here is an outline of the minimum required steps to configure a Postgres connector:

  1. Create a dedicated read-only Postgres user with permissions for replicating data

  2. Create a new Postgres source in the Airbyte UI using xmin system column

  3. (Airbyte Cloud Only) Allow inbound traffic from Airbyte IPs

These are the additional steps required (after following the quick start) to configure your Postgres source using CDC:

  1. Provide additional REPLICATION permissions to read-only user

  2. Enable logical replication on your Postgres database

  3. Create a replication slot on your Postgres database

  4. Create publication and replication identities for each Postgres table

  5. Enable CDC replication in the Airbyte UI

Step 1: Create a dedicated read-only Postgres user

  1. Connect to the DB as the RDS master user (in this setup, master_user_do_not_use).

  2. Run:

    CREATE USER airbyte_cdc PASSWORD 'YOUR_PASSWORD';
  3. Check the schemas: SELECT schema_name FROM information_schema.schemata;

  4. Grant read-only access on every schema:

GRANT USAGE ON SCHEMA <schema_name> TO <user_name>;
GRANT SELECT ON ALL TABLES IN SCHEMA <schema_name> TO <user_name>;
ALTER DEFAULT PRIVILEGES IN SCHEMA <schema_name> GRANT SELECT ON TABLES TO <user_name>;

> You have to repeat this for every schema. Some of these GRANTs may not be needed, because the default settings may already give you USAGE and SELECT.
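Repeating the three statements per schema by hand is error-prone, so here is a minimal sketch that prints them for a list of schemas. grant_readonly_sql is a hypothetical helper, and it assumes plain lowercase schema and user names that need no quoting.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the three read-only GRANT statements for every schema given,
# so they can be piped into psql instead of being typed once per schema.
# Assumes schema and user names are plain lowercase identifiers (no quoting needed).
grant_readonly_sql() {
  local user="$1"; shift
  local schema
  for schema in "$@"; do
    printf 'GRANT USAGE ON SCHEMA %s TO %s;\n' "$schema" "$user"
    printf 'GRANT SELECT ON ALL TABLES IN SCHEMA %s TO %s;\n' "$schema" "$user"
    printf 'ALTER DEFAULT PRIVILEGES IN SCHEMA %s GRANT SELECT ON TABLES TO %s;\n' "$schema" "$user"
  done
}
```

Usage: grant_readonly_sql airbyte_cdc public sales | psql -h your_host -U master_user_do_not_use -d your_database -v ON_ERROR_STOP=1 (sales is just an example schema name). ALTER DEFAULT PRIVILEGES without FOR ROLE only affects tables created later by the role that runs it, so run it as the role that creates your tables.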

Step 2: Provide additional permissions to the read-only user

ALTER USER <user_name> REPLICATION;

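On RDS, ALTER USER ... REPLICATION does not work, because the RDS master user is not a real superuser. On RDS, replication permission comes from the rds_replication role, so find the roles that are already members of it: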

SELECT r.rolname AS role_with_replication
FROM pg_roles r
         JOIN pg_auth_members m ON r.oid = m.member
         JOIN pg_roles g ON g.oid = m.roleid
WHERE g.rolname = 'rds_replication';

  • Grant one of those roles (or rds_replication itself, e.g. GRANT rds_replication TO <user_name>;) to the user:

GRANT <role_name> TO <user_name>;
-- example: custom_admin_role is a role that has rds_replication
GRANT custom_admin_role TO airbyte_cdc;
  • Verify:

SELECT r.rolname AS role_name, 
       ARRAY_AGG(m.rolname) AS members 
FROM pg_roles r
JOIN pg_auth_members a ON r.oid = a.roleid
JOIN pg_roles m ON a.member = m.oid
WHERE r.rolname = 'custom_admin_role'
GROUP BY r.rolname;

Step 3: Enable logical replication on your Postgres database

If you use self-managed PostgreSQL

To enable logical replication on bare metal, VMs (EC2/GCE/etc), or Docker, configure these parameters in the postgresql.conf file for your Postgres database and restart Postgres: wal_level = logical, and max_wal_senders and max_replication_slots greater than 0 (wal_level only changes on restart).

If you use AWS RDS

On AWS RDS you cannot access postgresql.conf, so you have to use a DB parameter group instead.

Reference: postgres AWS RDS - failed to create replication users

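  1. Create a custom parameter group (the default parameter group cannot be modified), then change your DB's parameter group from default -> <YOUR_PARAMETER_GROUP>.

  2. In that group, search for rds.logical_replication. The default value is 0; change it to 1.

  3. With rds.logical_replication = 1, RDS sets wal_level to logical for you.

Reference: debezium document - PostgreSQL on Amazon RDS

You do not need to set max_wal_senders and max_replication_slots; the RDS defaults are enough.

  • Apply the parameter group to your RDS instance and reboot it. rds.logical_replication is a static parameter, so it only takes effect after the reboot.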
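The console steps above can also be scripted. Here is a sketch with the AWS CLI, assuming a custom parameter group already exists (for example, created with aws rds create-db-parameter-group and the postgres family that matches your engine version). my-cdc-params and my-db are placeholder names. DRY_RUN=1 prints the commands instead of running them.

```shell
#!/usr/bin/env bash
# Sketch: enable logical replication on RDS with the AWS CLI.
# Set DRY_RUN=1 to print the commands instead of running them.
run() {
  if [ -n "${DRY_RUN:-}" ]; then
    echo "$*"
  else
    "$@"
  fi
}

enable_logical_replication() {
  local param_group="$1" db_instance="$2"
  # rds.logical_replication is static, so it is applied at the next reboot
  run aws rds modify-db-parameter-group \
    --db-parameter-group-name "$param_group" \
    --parameters "ParameterName=rds.logical_replication,ParameterValue=1,ApplyMethod=pending-reboot"
  # Attach the custom parameter group to the instance
  run aws rds modify-db-instance \
    --db-instance-identifier "$db_instance" \
    --db-parameter-group-name "$param_group" \
    --apply-immediately
  # Wait until the modification finishes, then reboot so the static parameter takes effect
  run aws rds wait db-instance-available --db-instance-identifier "$db_instance"
  run aws rds reboot-db-instance --db-instance-identifier "$db_instance"
}
```

Usage: enable_logical_replication my-cdc-params my-db. After the reboot, connect with psql and run SHOW wal_level; it should return logical.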

Step 4: Create a replication slot on your Postgres database

SELECT pg_create_logical_replication_slot('airbyte_cdc_slot', 'pgoutput');

Verify:

SELECT slot_name, plugin, slot_type, database, active, restart_lsn
FROM pg_replication_slots
WHERE slot_type = 'logical';

Step 5: Create publication and replication identities for each Postgres table (requires the master user)

1. Add the replication identity (the method of distinguishing between rows) for each table you want to replicate:

-- manual SQL (one table)
ALTER TABLE tbl1 REPLICA IDENTITY DEFAULT;
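REPLICA IDENTITY DEFAULT uses the table's primary key. On a table without a primary key, UPDATE and DELETE changes cannot be matched to a row. Once such a table is in a publication that publishes updates and deletes, Postgres rejects UPDATE and DELETE on it. Here is a minimal sketch that picks FULL for those tables instead. PK_SQL lists each table in public with a has-primary-key flag, and identity_sql (a hypothetical helper) turns that list into ALTER TABLE statements.

```shell
#!/usr/bin/env bash
# Sketch: DEFAULT for tables with a primary key, FULL (whole old row logged) otherwise.
PK_SQL="SELECT n.nspname, c.relname,
       EXISTS (SELECT 1 FROM pg_index i WHERE i.indrelid = c.oid AND i.indisprimary)
FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind = 'r' AND n.nspname = 'public';"

# Hypothetical helper. Reads "schema|table|has_pk" lines (psql -At output, has_pk is t/f)
# and prints one ALTER TABLE per table. Assumes names contain no double quotes.
identity_sql() {
  local schema table has_pk mode
  while IFS='|' read -r schema table has_pk; do
    [ -z "$table" ] && continue
    if [ "$has_pk" = "t" ]; then mode="DEFAULT"; else mode="FULL"; fi
    printf 'ALTER TABLE "%s"."%s" REPLICA IDENTITY %s;\n' "$schema" "$table" "$mode"
  done
}
```

Usage: psql -h "$HOST" -U "$USER" -d "$DBNAME" -At -c "$PK_SQL" | identity_sql | psql -h "$HOST" -U "$USER" -d "$DBNAME" -v ON_ERROR_STOP=1. FULL writes the whole old row to WAL, so it produces more WAL than DEFAULT. Use it only where there is no primary key.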
  • Create a shell script (e.g., set_replica_identity.sh):

#!/bin/bash
set -euo pipefail

# Database connection details
HOST="your_host"
PORT="your_port"
USER="your_user"
DBNAME="your_dbname"

# SQL that generates one ALTER TABLE statement per table in the public schema
# (format('%I.%I', ...) schema-qualifies and quotes each table name)
SQL="SELECT format('ALTER TABLE %I.%I REPLICA IDENTITY DEFAULT;', schemaname, tablename) FROM pg_tables WHERE schemaname = 'public';"

# The first psql prints the statements (-A unaligned, -t tuples only); the second executes them
psql -h "$HOST" -p "$PORT" -U "$USER" -d "$DBNAME" -Atc "$SQL" | psql -h "$HOST" -p "$PORT" -U "$USER" -d "$DBNAME" -v ON_ERROR_STOP=1
  • Make the script executable:

    chmod +x set_replica_identity.sh

  • Run the Script:

./set_replica_identity.sh

You will be asked for the password twice when you run the script, because it calls psql twice: once to generate the ALTER TABLE statements and once to execute them.
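To stop psql from prompting on every call, you can store the password in ~/.pgpass, which psql reads automatically. Each line has the form hostname:port:database:username:password. A colon or backslash inside a field must be escaped with a backslash. Here is a sketch. pgpass_escape and pgpass_line are hypothetical helper names.

```shell
#!/usr/bin/env bash
# Sketch: build a ~/.pgpass line so psql authenticates without prompting.

# Escape backslashes first, then colons, as the .pgpass format requires
pgpass_escape() {
  printf '%s' "$1" | sed -e 's/\\/\\\\/g' -e 's/:/\\:/g'
}

# Print one hostname:port:database:username:password line
pgpass_line() {
  local host="$1" port="$2" db="$3" user="$4" password="$5"
  printf '%s:%s:%s:%s:%s\n' "$(pgpass_escape "$host")" "$(pgpass_escape "$port")" \
    "$(pgpass_escape "$db")" "$(pgpass_escape "$user")" "$(pgpass_escape "$password")"
}
```

Usage: pgpass_line your_host 5432 your_dbname your_user 'YOUR_PASSWORD' >> ~/.pgpass, then chmod 600 ~/.pgpass. psql ignores the file if group or others can read it.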

2. Create the Postgres publication. You should include all tables you want to replicate as part of the publication:

-- all tables, including tables created later
CREATE PUBLICATION airbyte_publication FOR ALL TABLES;

-- or only the tables you list (manual SQL)
CREATE PUBLICATION airbyte_publication FOR TABLE <tbl1, tbl2, tbl3>;

Create a shell script (e.g., create_publication.sh):

#!/bin/bash

# Database connection details
HOST="your_host"
PORT="your_port"
USER="your_user"
DBNAME="your_database"

# Generate the list of tables and create the publication
SQL=$(psql -h "$HOST" -p "$PORT" -U "$USER" -d "$DBNAME" -Atc "SELECT 'CREATE PUBLICATION airbyte_publication FOR TABLE ' || string_agg(quote_ident(schemaname) || '.' || quote_ident(tablename), ', ') || ';' FROM pg_tables WHERE schemaname = 'public';")

# Execute the generated SQL statement
psql -h "$HOST" -p "$PORT" -U "$USER" -d "$DBNAME" -c "$SQL"

To remove the publication (for example, to recreate it with a different table list):

DROP PUBLICATION airbyte_publication;

Verify:

SELECT pubname, puballtables, pubinsert, pubupdate, pubdelete
FROM pg_publication;
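For a publication created with FOR TABLE, new tables have to be added explicitly with ALTER PUBLICATION ... ADD TABLE. Here is a minimal sketch that builds that statement. add_tables_sql is a hypothetical helper. The table names (public.orders and public.payments in the check) are placeholders that are assumed to need no quoting.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print an ALTER PUBLICATION statement that adds tables to an
# existing FOR TABLE publication. Tables are passed as schema.table.
add_tables_sql() {
  local pub="$1"; shift
  local IFS=','
  # "$*" joins the remaining arguments with the first character of IFS (a comma)
  printf 'ALTER PUBLICATION %s ADD TABLE %s;\n' "$pub" "$*"
}
```

Usage: add_tables_sql airbyte_publication public.new_table | psql -h "$HOST" -U "$USER" -d "$DBNAME" -v ON_ERROR_STOP=1. Then check the result with SELECT schemaname, tablename FROM pg_publication_tables WHERE pubname = 'airbyte_publication';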

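When you want to add CDC tables automatically

(This does not work for newly created databases.)

A publication created with FOR ALL TABLES already includes tables created later. Otherwise, first grant the extra permissions below.

Step 1: Grant more permissions (run this SQL!)

This script assumes the role name custom_admin_role and the user name airbyte_cdc. It grants rds_superuser to custom_admin_role, and therefore to airbyte_cdc.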

-- Connect to the postgres database as the RDS master user

-- Check if the role exists and create it if it doesn't
DO
$$
    BEGIN
        IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname = 'custom_admin_role') THEN
            CREATE ROLE custom_admin_role;
        END IF;
    END
$$;

-- Grant necessary permissions to custom_admin_role
GRANT rds_superuser TO custom_admin_role; -- This is the closest to superuser in RDS
GRANT rds_replication TO custom_admin_role;

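-- Grant permissions for event triggers (each GRANT runs only if the function exists;
-- pg_create_event_trigger is not a built-in PostgreSQL function, so that branch is normally skipped)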
DO
$$
    BEGIN
        IF EXISTS (SELECT 1 FROM pg_proc WHERE proname = 'pg_create_event_trigger') THEN
            GRANT EXECUTE ON FUNCTION pg_create_event_trigger(text, text, text, text) TO custom_admin_role;
        END IF;
        IF EXISTS (SELECT 1 FROM pg_proc WHERE proname = 'pg_event_trigger_ddl_commands') THEN
            GRANT EXECUTE ON FUNCTION pg_event_trigger_ddl_commands() TO custom_admin_role;
        END IF;
    END
$$;

-- Grant permissions for dblink
CREATE EXTENSION IF NOT EXISTS dblink;
GRANT EXECUTE ON FUNCTION dblink_connect(text) TO custom_admin_role;
GRANT EXECUTE ON FUNCTION dblink_exec(text, text) TO custom_admin_role;

-- Grant additional useful permissions
-- (replace postgres with your database name if you use a different database)
GRANT CREATE ON DATABASE postgres TO custom_admin_role;
GRANT USAGE ON SCHEMA public TO custom_admin_role;
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO custom_admin_role;
GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO custom_admin_role;
GRANT ALL PRIVILEGES ON ALL FUNCTIONS IN SCHEMA public TO custom_admin_role;

-- Allow custom_admin_role to create roles and databases
ALTER ROLE custom_admin_role WITH CREATEROLE CREATEDB;

-- Grant the role to airbyte_cdc if not already done
GRANT custom_admin_role TO airbyte_cdc;

-- Ensure airbyte_cdc can set its role to custom_admin_role
ALTER ROLE airbyte_cdc SET role TO custom_admin_role;
