PostgreSQL 14 Streaming Replication on Oracle Cloud Infrastructure (OCI) VM

Introduction

PostgreSQL 14 is a powerful and feature-rich open-source relational database management system.

In this guide, we’ll walk through the process of installing and configuring PostgreSQL 14 on Oracle Cloud Infrastructure (OCI) with Oracle Linux 8. The setup includes one master node and two slave nodes, forming a streaming replication setup.

Note: The words slave and replica are used interchangeably in this article when referring to any node that is not the master node.

OS – Oracle Linux 8

PostgreSQL Version – 14.10

1 Master Node – IP: 10.180.2.102

2 Slave Nodes – IPs: 10.180.2.152, 10.180.2.58

3 Node PostgreSQL 14 Cluster on OCI

You can create a DR architecture using streaming replication: put one replica in the same region and two additional replicas in another OCI region. The VCNs in both OCI regions have to be remotely peered using a DRG, and all routes should permit traffic between the different subnets and allow communication over port 5432. You can refer to this article on how to configure VCN remote peering on OCI: https://docs.oracle.com/en-us/iaas/Content/Network/Tasks/scenario_e.htm

4-Node Cross-Region PostgreSQL 14 Cluster on OCI
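Before setting up cross-region replicas, it is worth confirming that the peered VCNs really do allow traffic on port 5432 between the subnets. A minimal check from a replica host, assuming a placeholder address of 10.190.2.102 for the primary in the other region:

# Placeholder IP for the primary in the peered region; replace with your own
timeout 3 bash -c '</dev/tcp/10.190.2.102/5432' && echo "port 5432 reachable" || echo "port 5432 blocked"

# Once PostgreSQL is running on the remote host, a full connection test
psql -h 10.190.2.102 -p 5432 -U postgres -d postgres -c "SELECT 1;"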

Step 1: Installing PostgreSQL 14 on Master and Slave Nodes

Start by updating the system and installing necessary dependencies on both the master and slave nodes:

sudo dnf update -y

sudo dnf module list postgresql

sudo yum -y install gnupg2 wget vim tar zlib openssl

sudo dnf install https://download.postgresql.org/pub/repos/yum/reporpms/EL-8-x86_64/pgdg-redhat-repo-latest.noarch.rpm

sudo yum -qy module disable postgresql

sudo yum install postgresql14-server -y

sudo yum install postgresql14-contrib -y

sudo systemctl enable postgresql-14

sudo postgresql-14-setup initdb

sudo systemctl start postgresql-14

sudo systemctl status postgresql-14
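To confirm the installed server matches the version this guide targets (14.10), a quick optional check:

/usr/pgsql-14/bin/psql -V                           # client version from the PGDG packages
sudo -i -u postgres psql -c "SELECT version();"     # server version, once the service is running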

Step 2: Enabling Postgres User and Streaming Replication

Enable the Postgres user and configure streaming replication on both the master and slave nodes:

sudo -iu postgres

psql -c "ALTER USER postgres WITH PASSWORD 'RAbbithole1234#_';"

tree -L 1 /var/lib/pgsql/14/data

psql -U postgres -c 'SHOW config_file'

              config_file
----------------------------------------
 /var/lib/pgsql/14/data/postgresql.conf
(1 row)

Step 3: Configuring pg_hba.conf and Firewall Settings

Update the pg_hba.conf file on both the master and slave nodes to allow connections and adjust firewall settings:

sudo -iu postgres

vim /var/lib/pgsql/14/data/pg_hba.conf

# If 'ident' is present in this file, replace it with 'md5' or 'scram-sha-256'

# Change this line to allow all hosts 0.0.0.0/0 

# IPv4 local connections:
host    all             all             0.0.0.0/0               scram-sha-256

exit

sudo systemctl restart postgresql-14

#Whitelist Ports on Instance

sudo firewall-cmd --list-ports

sudo firewall-cmd --zone=public --permanent --add-port=5432/tcp

sudo firewall-cmd --reload

sudo firewall-cmd --list-ports

Step 4: Configuring Master Node for Streaming Replication

On the master node (10.180.2.102), configure streaming replication:

sudo -iu postgres

mkdir -p /var/lib/pgsql/14/data/archive

vim /var/lib/pgsql/14/data/postgresql.conf

## Uncomment and set below parameters
listen_addresses = '*'
archive_mode = on    # enables archiving; off, on, or always
archive_command = 'cp %p /var/lib/pgsql/14/data/archive/%f' 
max_wal_senders = 10            # max number of walsender processes
max_replication_slots = 10      # max number of replication slots
wal_keep_size = 50000           # Size of WAL in megabytes; 0 disables
wal_level = replica             # minimal, replica, or logical
wal_log_hints = on               # also do full page writes of non-critical updates

## Only set below if you want to create synchronous replication##
synchronous_commit = remote_apply
synchronous_standby_names = '*'

exit

sudo systemctl restart postgresql-14

netstat -an | grep 5432

Update pg_hba.conf on the master node:

sudo -iu postgres

vim /var/lib/pgsql/14/data/pg_hba.conf

#Add below entry to end of file

host    replication     all             10.180.2.152/32         scram-sha-256
host    replication     all             10.180.2.58/32         scram-sha-256

exit

sudo systemctl restart postgresql-14

Step 5: Configuring Slave Nodes for Streaming Replication

On the slave nodes (10.180.2.152 and 10.180.2.58), configure streaming replication:

sudo -iu postgres

mkdir -p /var/lib/pgsql/14/data/backup

vim /var/lib/pgsql/14/data/pg_hba.conf

exit

sudo systemctl restart postgresql-14

sudo chmod 0700 /var/lib/pgsql/14/data/backup

sudo -iu postgres

#Backup and Clone Database from Slave Node using IP of Master Node

pg_basebackup -D /var/lib/pgsql/14/data/backup -X fetch -p 5432 -U postgres -h 10.180.2.102 -R

cd /var/lib/pgsql/14/data/backup

cat postgresql.auto.conf
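The -R flag passed to pg_basebackup writes the standby connection settings into postgresql.auto.conf and creates a standby.signal file in the backup directory; both can be verified before starting the server:

grep primary_conninfo /var/lib/pgsql/14/data/backup/postgresql.auto.conf
ls -l /var/lib/pgsql/14/data/backup/standby.signal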

#Stop the Instance and Restart using Data in New location

/usr/pgsql-14/bin/pg_ctl stop

/usr/pgsql-14/bin/pg_ctl start -D /var/lib/pgsql/14/data/backup

waiting for server to start....2023-11-27 03:36:48.205 GMT [169621] LOG:  redirecting log output to logging collector process
2023-11-27 03:36:48.205 GMT [169621] HINT:  Future log output will appear in directory "log".
 done
server started

Step 6: Checking Replication Status from Slave Nodes

Check the status of streaming replication from slave nodes using psql:

psql -h localhost -p 5432 -U postgres -d postgres

postgres# select pg_is_wal_replay_paused();

 pg_is_wal_replay_paused
-------------------------
 f
(1 row)

Note: 'f' means recovery is running fine; 't' means WAL replay is paused.
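Another useful health check on a replica is the replay lag, i.e. how far the standby is behind the primary. A minimal sketch using the standard recovery functions:

# Run on a replica: last WAL position received/replayed and the approximate replay delay
psql -h localhost -p 5432 -U postgres -d postgres -c "SELECT pg_last_wal_receive_lsn(), pg_last_wal_replay_lsn(), now() - pg_last_xact_replay_timestamp() AS replay_delay;"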



postgres# select * from pg_stat_wal_receiver;

-[ RECORD 1 ]---------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
pid                   | 414090
status                | streaming
receive_start_lsn     | 0/A000000
receive_start_tli     | 1
written_lsn           | 0/A002240
flushed_lsn           | 0/A002240
received_tli          | 1
last_msg_send_time    | 2023-12-04 11:40:51.853918+00
last_msg_receipt_time | 2023-12-04 11:40:51.853988+00
latest_end_lsn        | 0/A002240
latest_end_time       | 2023-11-30 08:16:43.217865+00
slot_name             |
sender_host           | 10.180.2.102
sender_port           | 5432
conninfo              | user=postgres password=******** channel_binding=prefer dbname=replication host=10.180.2.102 port=5432 fallback_application_name=walreceiver sslmode=prefer sslcompression=0 sslsni=1 ssl_min_protocol_version=TLSv1.2 gssencmode=prefer krbsrvname=postgres target_session_attrs=any

Step 7: Checking Replication Status from Master Node

On the master node, check the status of replication:

psql -h localhost -p 5432 -U postgres -d postgres

postgres# select * from pg_stat_replication;

-[ RECORD 1 ]----+------------------------------
pid              | 382513
usesysid         | 10
usename          | postgres
application_name | walreceiver
client_addr      | 10.180.2.152
client_hostname  |
client_port      | 47312
backend_start    | 2023-11-30 08:11:42.536364+00
backend_xmin     |
state            | streaming
sent_lsn         | 0/A002240
write_lsn        | 0/A002240
flush_lsn        | 0/A002240
replay_lsn       | 0/A002240
write_lag        |
flush_lag        |
replay_lag       |
sync_priority    | 0
sync_state       | async
reply_time       | 2023-12-04 11:43:12.033364+00

-[ RECORD 2 ]----+------------------------------
pid              | 382514
usesysid         | 10
usename          | postgres
application_name | walreceiver
client_addr      | 10.180.2.58
client_hostname  |
client_port      | 35294
backend_start    | 2023-11-30 08:11:42.542539+00
backend_xmin     |
state            | streaming
sent_lsn         | 0/A002240
write_lsn        | 0/A002240
flush_lsn        | 0/A002240
replay_lsn       | 0/A002240
write_lag        |
flush_lag        |
replay_lag       |
sync_priority    | 0
sync_state       | async
reply_time       | 2023-12-04 11:43:10.113253+00

Step 8: Additional Notes

To restart slave nodes, use the following commands:

/usr/pgsql-14/bin/pg_ctl stop

sudo rm -rf /var/lib/pgsql/14/data/backup/postmaster.pid

/usr/pgsql-14/bin/pg_ctl start -D /var/lib/pgsql/14/data/backup


Follow this comprehensive guide to set up PostgreSQL 14 streaming replication on Oracle Cloud Infrastructure with Oracle Linux 8, and ensure high availability and robust backup capabilities for your PostgreSQL database.

Amazon DynamoDB using awscli


Install latest version of aws-cli

sudo yum remove awscli

curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"

unzip awscliv2.zip

sudo ./aws/install

/usr/local/bin/aws --version

Add /usr/local/bin to the PATH in your Bash profile

vim ~/.bash_profile

aws --version

aws configure

Create DynamoDB Table

aws dynamodb create-table \
--table-name CustomerRecords \
--attribute-definitions \
AttributeName=CustomerID,AttributeType=S \
AttributeName=RecordDate,AttributeType=S \
--key-schema \
AttributeName=CustomerID,KeyType=HASH \
AttributeName=RecordDate,KeyType=RANGE \
--billing-mode PAY_PER_REQUEST

# Delete DynamoDB Table
aws dynamodb delete-table --table-name CustomerRecords

# Enable Point-in-Time-Recovery
aws dynamodb update-continuous-backups --table-name CustomerRecords --point-in-time-recovery-specification PointInTimeRecoveryEnabled=True
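To confirm that point-in-time recovery is actually enabled, the table's continuous backup settings can be queried (assumes the same AWS CLI credentials and region configured above):

# Check PITR status for the table
aws dynamodb describe-continuous-backups --table-name CustomerRecords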

Load Records

import boto3
import faker
import sys

# Generate fake data
def generate_data(size):
    fake = faker.Faker()
    records = []
    for _ in range(size):
        record = {
            'CustomerID': fake.uuid4(),
            'RecordDate': fake.date(),
            'Name': fake.name(),
            'Age': fake.random_int(min=0, max=100),
            'Gender': fake.random_element(elements=('Male', 'Female', 'Other')),
            'Address': fake.sentence(),
            'Description': fake.sentence(),
            'OrderID': fake.uuid4()
        }
        records.append(record)
    return records

def write_data_in_chunks(table_name, data, chunk_size):
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table(table_name)
    for i in range(0, len(data), chunk_size):
        with table.batch_writer() as batch:
            for record in data[i:i+chunk_size]:
                batch.put_item(Item=record)
    print(f"Successfully wrote {len(data)} records to {table_name} in chunks of {chunk_size}.")

if __name__ == "__main__":
    table_name = 'CustomerRecords'
    chunk_size = int(sys.argv[1]) if len(sys.argv) > 1 else 1000
    data = generate_data(chunk_size)
    write_data_in_chunks(table_name, data, chunk_size)
$ python3 load_to_dynamodb.py 1000

Calculate the Unix epoch time in seconds

date +%s
1710374718

Full export

aws dynamodb export-table-to-point-in-time \
--table-arn arn:aws:dynamodb:ap-southeast-2:11111111:table/CustomerRecords \
--s3-bucket customerrecords-dynamodb \
--s3-prefix exports/ \
--s3-sse-algorithm AES256 \
--export-time 1710374718

Incremental export, starting at the end time of the full export

aws dynamodb export-table-to-point-in-time \
--table-arn arn:aws:dynamodb:ap-southeast-2:11111111:table/CustomerRecords \
--s3-bucket customerrecords-dynamodb \
--s3-prefix exports_incremental/ \
--incremental-export-specification ExportFromTime=1710374718,ExportToTime=1710375760,ExportViewType=NEW_IMAGE \
--export-type INCREMENTAL_EXPORT

Important Note :

  1. ExportFromTime is the finish time of the full export, and ExportToTime is the current date/time calculated using the date +%s command (see the sketch below)
  2. The difference between ExportFromTime and ExportToTime cannot be less than 15 minutes
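A minimal sketch of capturing those two boundaries, reusing the placeholder epoch value from the full export above:

# Window boundaries as Unix epoch seconds
EXPORT_FROM=1710374718        # finish time of the earlier full export (placeholder value from above)
EXPORT_TO=$(date +%s)         # current time
echo "Incremental export window: $EXPORT_FROM -> $EXPORT_TO"

# These values plug into --incremental-export-specification as
# ExportFromTime=$EXPORT_FROM,ExportToTime=$EXPORT_TO in the command shown above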

Postgres 14 Sharding with Citus

Postgres sharding with Citus is designed to horizontally scale PostgreSQL across multiple nodes. Citus extends PostgreSQL by adding the ability to distribute tables and queries across a cluster of servers.

Tables are horizontally partitioned into smaller, manageable shards that reside on different nodes. Each node contains a subset of the data and Citus intelligently routes queries to the appropriate nodes.

This sharding architecture enhances both read and write scalability, making it well-suited for applications with growing data volumes and demanding workloads.

________________________ Step by Step Instructions to Setup Postgres Sharding ______________________________

  1. Create OL8 or RHEL8 Instance and Run the below commands on all Nodes :

a. SSH into all the Instances and configure it as below :

sudo dnf module list postgresql

sudo yum -y install gnupg2 wget vim tar zlib openssl

sudo dnf install https://download.postgresql.org/pub/repos/yum/reporpms/EL-8-x86_64/pgdg-redhat-repo-latest.noarch.rpm

sudo yum -qy module disable postgresql

sudo yum install postgresql14-server -y

sudo yum install postgresql14-contrib -y

## Due to policies for Red Hat family distributions, the PostgreSQL installation will not be enabled for automatic start or have the database initialized automatically

sudo systemctl enable postgresql-14

sudo postgresql-14-setup initdb

sudo systemctl start postgresql-14

sudo systemctl status postgresql-14 

b. Enable Postgres user and set Super user password

sudo -iu postgres

psql -c "ALTER USER postgres WITH PASSWORD 'RAbbithole1234#_';"

exit

c. Install Citus community edition binary and Create the Extension

# Add Citus repository for package manager

curl https://install.citusdata.com/community/rpm.sh | sudo bash

sudo yum install -y citus121_14
#Preload Citus and pg_stat_statements extensions on all Nodes

sudo -iu postgres

psql -U postgres -c 'SHOW config_file'

              config_file
----------------------------------------
 /var/lib/pgsql/14/data/postgresql.conf
(1 row)

vim /var/lib/pgsql/14/data/postgresql.conf

## Add below entry and uncomment 'shared_preload_libraries'

shared_preload_libraries = 'citus,pg_stat_statements'

## Note that “citus” has to be the first extension in the list. Otherwise, the server won’t start.

exit

sudo systemctl restart postgresql-14

sudo systemctl status postgresql-14


# Enable auto-start of Postgres 14 server when the server reboots

sudo chkconfig postgresql-14 on

sudo -i -u postgres psql -c "CREATE EXTENSION citus;"
sudo -i -u postgres psql -c "CREATE EXTENSION pg_stat_statements;"

d. Configure connection and authentication

sudo -iu postgres

vim /var/lib/pgsql/14/data/postgresql.conf

# Uncomment listen_addresses and set it as below
listen_addresses = '*'

# Uncomment and change wal_level = 'logical'
wal_level = 'logical'
vim /var/lib/pgsql/14/data/pg_hba.conf

# Change this line to allow all hosts 10.180.2.0/24 with trust
## Important Note : 10.180.2.0/24 is the subnet in which the instances reside. The subnet should have egress and ingress for the Postgres port. Alternately instead of doing a password-less setup, you can also use pgpass file to store the password on all nodes and use the normal authentication method. ##

# IPv4 local connections:
host    all             all             10.180.2.0/24           trust

exit

sudo systemctl restart postgresql-14

sudo systemctl status postgresql-14

## Whitelist Postgres Port##

sudo firewall-cmd --list-ports
sudo firewall-cmd --zone=public --permanent --add-port=5432/tcp
sudo firewall-cmd --reload
sudo firewall-cmd --list-ports

I’ve created a small automation script to perform the above steps. Save it as a .sh file, change the parameters according to your Postgres and Citus versions, and simply execute it on all the nodes:

#!/bin/bash

# Function to print commands and exit on failure
function run_command() {
    echo "$ $1"
    eval $1
    if [ $? -ne 0 ]; then
        echo "Error executing command. Exiting."
        exit 1
    fi
}

# Step 1: Install Postgres 14 Server on all Nodes
run_command "sudo dnf module list postgresql"
run_command "sudo yum -y install gnupg2 wget vim tar zlib openssl"
run_command "sudo dnf install -y https://download.postgresql.org/pub/repos/yum/reporpms/EL-8-x86_64/pgdg-redhat-repo-latest.noarch.rpm"
run_command "sudo yum -qy module disable postgresql"
run_command "sudo yum install postgresql14-server -y"
run_command "sudo yum install postgresql14-contrib -y"
run_command "sudo systemctl enable postgresql-14"

# Check if the data directory is empty
if [ -z "$(sudo -i -u postgres ls -A /var/lib/pgsql/14/data)" ]; then
    run_command "sudo postgresql-14-setup initdb"
else
    echo "Data directory is not empty. Skipping initialization."
fi

run_command "sudo systemctl start postgresql-14"
run_command "sudo chkconfig postgresql-14 on"

# Step 2: Enable Postgres user on all Nodes and set superuser password
run_command "sudo -i -u postgres psql -c \"ALTER USER postgres WITH PASSWORD 'YOurPassword1234#_';\""

# Step 3: Install Citus on all Nodes
run_command "curl https://install.citusdata.com/community/rpm.sh | sudo bash"
run_command "sudo yum install -y citus121_14"

# Step 4: Preload Citus and pg_stat_statements extensions on all Nodes
run_command "sudo -i -u postgres psql -U postgres -c 'SHOW config_file'"
run_command "sudo -i -u postgres sed -i -E 's/^#?(listen_addresses[ \t]*=[ \t]*).*/\1'\''*'\''/' /var/lib/pgsql/14/data/postgresql.conf"
run_command "sudo -i -u postgres sed -i -E 's/^#?(shared_preload_libraries[ \t]*=[ \t]*).*/\1'\''citus,pg_stat_statements'\''/' /var/lib/pgsql/14/data/postgresql.conf"
run_command "sudo -i -u postgres sed -i -E 's/^#?(wal_level[ \t]*=[ \t]*).*/\1'\''logical'\''/' /var/lib/pgsql/14/data/postgresql.conf"
run_command "sudo -i -u postgres sed -i -E '/^# IPv4 local connections:/ { n; s/^(host[ \t]*all[ \t]*all[ \t]*)127.0.0.1\/32[ \t]*scram-sha-256$/\10.0.0.0\/0           trust/ }' /var/lib/pgsql/14/data/pg_hba.conf"

# Step 5: Configure connection and authentication on all Nodes
run_command "sudo systemctl restart postgresql-14"
run_command "sudo firewall-cmd --list-ports"
run_command "sudo firewall-cmd --zone=public --permanent --add-port=5432/tcp"
run_command "sudo firewall-cmd --reload"
run_command "sudo firewall-cmd --list-ports"

# Step 6: Create Citus extension on all Nodes
run_command "sudo -i -u postgres psql -c \"CREATE EXTENSION citus;\""
run_command "sudo -i -u postgres psql -c \"CREATE EXTENSION pg_stat_statements;\""

echo "Script execution completed successfully."



2. Create Co-ordinator and Worker nodes

We have now prepared 3 instances for sharding in total. Step 1 should have been performed on all the instances below:

IP              HOSTNAME                        ROLE
10.180.2.45     Postgres-Citus-Coordinator      Coordinator Node
10.180.2.198    Postgres-Citus-Worker-Node-1    Worker Node
10.180.2.86     Postgres-Citus-Worker-Node-2    Worker Node

SSH into the co-ordinator node and run the below commands on it

ssh opc@10.180.2.222

# Add co-ordinator node
sudo -i -u postgres psql -c "SELECT citus_set_coordinator_host('10.180.2.45', 5432);"
 
# Add Worker Nodes
sudo -i -u postgres psql -c "SELECT * from citus_add_node('10.180.2.198', 5432);"
sudo -i -u postgres psql -c "SELECT * from citus_add_node('10.180.2.86', 5432);"


# Check Active Worker Nodes 
sudo -i -u postgres psql -c "SELECT * FROM citus_get_active_worker_nodes();"

  node_name   | node_port
--------------+-----------
 10.180.2.198 |      5432
 10.180.2.86  |      5432



3. Create a Distributed table

All steps below to be executed from Co-ordinator node :

CREATE TABLE orders (
order_id    bigserial, 
shard_key   int PRIMARY KEY, 
n           int, 
description char(100) DEFAULT 'x');

# Create Index to further optimize the SQL performance 
CREATE UNIQUE INDEX shard_key_idx on orders (shard_key);

# Add Distributed table
SELECT create_distributed_table('orders', 'shard_key');

\timing

# Generate 5 Million rows
INSERT INTO orders (shard_key, n, description)
SELECT 
    id AS shard_key,
    (random() * 1000000)::int AS n,
    'x' AS description
FROM generate_series(1, 5000000) AS id
ON CONFLICT DO NOTHING;


#Check the size of the table using the Citus citus_tables view and not the standard Postgres command
\x
SELECT * FROM citus_tables ;

# Check Explain plan of Query
\x
explain (analyze, buffers, timing) SELECT count(*) from orders;
explain (analyze, buffers, timing) SELECT count(*) from orders where shard_key=2 ;

4. Add another node by performing all commands in Step 1. and add it to the cluster

IP : 10.180.2.17

Run from the Co-ordinator node

sudo -i -u postgres psql -c "SELECT * from citus_add_node('10.180.2.17', 5432);"

sudo -i -u postgres psql -c "SELECT * FROM citus_get_active_worker_nodes();"

  node_name   | node_port
--------------+-----------
 10.180.2.198 |      5432
 10.180.2.86  |      5432
 10.180.2.17  |      5432
(3 rows)

# Add .pgpass file on co-ordinator node and add the DB details >> hostname:port:database:username:password
vim /var/lib/pgsql/.pgpass

localhost:5432:postgres:postgres:YOurPassword1234#_

chmod 600 .pgpass

# Re-balance the shards without downtime
psql -U postgres -h localhost

ALTER SYSTEM SET citus.max_background_task_executors_per_node = 2;
SELECT pg_reload_conf();
SELECT citus_rebalance_start();

NOTICE:  Scheduled 10 moves as job 1
DETAIL:  Rebalance scheduled as background job
HINT:  To monitor progress, run: SELECT * FROM citus_rebalance_status();
 citus_rebalance_start
-----------------------
                     1

#Check Status of rebalancing
SELECT * FROM citus_rebalance_status();

      1 | running | rebalance | Rebalance all colocation groups | 2023-12-24 09:44:16.813663+00 |             | {"tasks": [{"LSN": {"lag": null, "source": "0/371A5128", "target": null}, "size": {"source": "29 MB", "target": "26 MB"}, "hosts": {"source": "10.180.2.198:5432", "target": "10.180.2.17:5432"}, "phase": "Catching Up", "state": "running", "command": "SELECT pg_catalog.citus_move_shard_placement(102012,2,4,'auto')", "message": "", "retried": 0, "task_id": 4}], "task_state_counts": {"done": 3, "blocked": 6, "running": 1}}
(1 row)

#Once completed the output will be as below :

SELECT * FROM citus_rebalance_status();

 job_id |  state   | job_type  |           description           |          started_at           |          finished_at          |                     details
--------+----------+-----------+---------------------------------+-------------------------------+-------------------------------+--------------------------------------------------
      1 | finished | rebalance | Rebalance all colocation groups | 2023-12-24 09:44:16.813663+00 | 2023-12-24 10:18:24.886163+00 | {"tasks": [], "task_state_counts": {"done": 10}}

# Check the Shard views
SELECT * from pg_dist_shard;
SELECT * FROM citus_shards;

#Misc rebalancing SQL queries
select get_rebalance_table_shards_plan();
SELECT citus_set_default_rebalance_strategy('by_disk_size');
SELECT * from citus_remote_connection_stats();


Enable pg_stat_statements extension on Postgres 14


postgres=# SELECT * FROM pg_stat_statements;


postgres=# select * From pg_available_extensions where name ilike 'pg_stat_statements';

        name        | default_version | installed_version |                         comment
--------------------+-----------------+-------------------+-------------------------------------------------------------------------
 pg_stat_statements | 1.9             |                   | track planning and execution statistics of all SQL statements executed
(1 row)

    
postgres=# SHOW shared_preload_libraries;

shared_preload_libraries
--------------------------

(1 row)


postgres=# CREATE EXTENSION pg_stat_statements;
CREATE EXTENSION


postgres=# \d pg_stat_statements



postgres=# SELECT *
           FROM pg_available_extensions
           WHERE name = 'pg_stat_statements'
             AND installed_version IS NOT NULL;

        name        | default_version | installed_version |                         comment
--------------------+-----------------+-------------------+-------------------------------------------------------------------------
 pg_stat_statements | 1.9             | 1.9               | track planning and execution statistics of all SQL statements executed
(1 row)


postgres=# alter system set shared_preload_libraries='pg_stat_statements';
ALTER SYSTEM



postgres=# select * from pg_file_Settings where name='shared_preload_libraries';


                 sourcefile                  | sourceline | seqno |           name           |      setting       | applied |            error
---------------------------------------------+------------+-------+--------------------------+--------------------+---------+------------------------------
 /var/lib/pgsql/14/data/postgresql.auto.conf |          3 |    30 | shared_preload_libraries | pg_stat_statements | f       | setting could not be applied
(1 row)



postgres=# exit

##Restart the Instance

sudo systemctl restart postgresql-14

sudo -iu postgres
 
 
psql -h localhost -p 5432 -U postgres -d postgres


postgres=# SELECT * FROM pg_stat_statements;

 userid | dbid | toplevel | queryid | query | plans | total_plan_time | min_plan_time | max_plan_time | mean_plan_time | stddev_plan_time | calls | total_exec_time | min_exec_time | max_exec_time | mean_exec_time | stddev_exec_time | rows | shared_blks_hit | shared_blks_read | shared_blks_dirtied | shared_blks_written | local_blks_hit | local_blks_read | local_blks_dirtied | local_blks_written | temp_blks_read | temp_blks_written | blk_read_time | blk_write_time | wal_records | wal_fpi | wal_bytes
(0 rows)
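With the extension now collecting data, a typical first query is the top statements by total execution time (column names as per pg_stat_statements 1.9 on PostgreSQL 14):

psql -h localhost -p 5432 -U postgres -d postgres -c "SELECT calls, round(total_exec_time::numeric, 2) AS total_ms, round(mean_exec_time::numeric, 2) AS mean_ms, left(query, 60) AS query FROM pg_stat_statements ORDER BY total_exec_time DESC LIMIT 10;"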

Tracking Changes in Your PostgreSQL Tables: Implementing a Custom Change Data Capture (CDC)

Introduction:
Change Data Capture (CDC) is a technique used to track changes in a database, such as inserts, updates, and deletes. In this blog post, we will show you how to implement a custom CDC in PostgreSQL to track changes in your database. By using a custom CDC, you can keep a record of changes in your database and use that information in your applications, such as to provide a history of changes, track auditing information, or trigger updates in other systems

Implementing a Custom CDC in PostgreSQL:
To implement a custom CDC in PostgreSQL, you will need to create a new table to store the change information, create a trigger function that will be executed whenever a change is made in the target table, and create a trigger that will call the trigger function. The trigger function will insert a new row into the change table with the relevant information, such as the old and new values of the record, the time of the change, and any other relevant information.

To demonstrate this, we will show you an example of a custom CDC for a table called “employee”. The change table will be called “employee_cdc” and will contain columns for the timestamp, employee ID, old values, and new values of the employee record. The trigger function will be executed after an update on the “employee” table and will insert a new row into the “employee_cdc” table with the relevant information. Finally, we will show you how to query the “employee_cdc” table to retrieve a list of all changes that have occurred in the “employee” table since a certain timestamp.

  1. Create the Employee and CDC table

    To store the CDC information, you need to create a new table in your PostgreSQL database. In this example, we will create a table called “employee”, “employee_cdc”, “employee_audit” with the following columns:

CREATE TABLE employee (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
department VARCHAR(50) NOT NULL,
salary NUMERIC(10,2) NOT NULL,
hire_date DATE NOT NULL
);

CREATE TABLE employee_cdc (
timestamp TIMESTAMP DEFAULT now(),
employee_id INTEGER,
old_values JSONB,
new_values JSONB
);

In the employee_cdc table, "timestamp" stores the time of the change, "employee_id" is the primary key of the employee record that was changed, and "old_values" and "new_values" are JSONB columns that store the old and new values of the employee record, respectively.

2. Create the Audit table

CREATE TABLE employee_audit (
audit_timestamp TIMESTAMP DEFAULT now(),
employee_id INTEGER,
old_values JSONB,
new_values JSONB
);

3. Create the trigger function

To capture changes to the employee table, you will need to create a trigger function that is executed whenever a record changes. The trigger function inserts a new row into the "employee_cdc" table with the relevant information. The example below handles updates; a sketch that also covers inserts and deletes follows the trigger definition further down. Here is an example trigger function:

CREATE OR REPLACE FUNCTION employee_cdc() RETURNS TRIGGER AS $$
BEGIN
IF (TG_OP = 'UPDATE') THEN
INSERT INTO employee_cdc (timestamp, employee_id, old_values, new_values)
VALUES (now(), NEW.id, row_to_json(OLD), row_to_json(NEW));
INSERT INTO employee_audit (employee_id, old_values, new_values)
VALUES (NEW.id, row_to_json(OLD), row_to_json(NEW));
END IF;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;

This trigger function uses the “row_to_json” function to convert the old and new values of the employee record into JSON strings, which are then stored in the “old_values” and “new_values” columns of the “employee_cdc” table. The “NOW()” function is used to get the current timestamp.

4. Create the trigger

Now that the trigger function has been created, you need to create the trigger on the “employee” table that will call the function whenever a record is updated. You can create the trigger with the following command:

CREATE TRIGGER employee_cdc_trigger
AFTER UPDATE ON employee
FOR EACH ROW
EXECUTE FUNCTION employee_cdc();
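The trigger above only fires on updates. If inserts and deletes should also be captured, one possible extension is sketched below (this is a sketch, not part of the original example). It uses to_jsonb() so the values land directly in the JSONB columns, and since OLD does not exist for inserts and NEW does not exist for deletes, NULL is stored for the missing side. Run it in the database that holds the employee table; postgres is assumed here:

sudo -i -u postgres psql -d postgres <<'SQL'
CREATE OR REPLACE FUNCTION employee_cdc() RETURNS TRIGGER AS $$
BEGIN
  IF (TG_OP = 'INSERT') THEN
    INSERT INTO employee_cdc (timestamp, employee_id, old_values, new_values)
    VALUES (now(), NEW.id, NULL, to_jsonb(NEW));
  ELSIF (TG_OP = 'UPDATE') THEN
    INSERT INTO employee_cdc (timestamp, employee_id, old_values, new_values)
    VALUES (now(), NEW.id, to_jsonb(OLD), to_jsonb(NEW));
  ELSIF (TG_OP = 'DELETE') THEN
    INSERT INTO employee_cdc (timestamp, employee_id, old_values, new_values)
    VALUES (now(), OLD.id, to_jsonb(OLD), NULL);
  END IF;
  RETURN NULL;
END;
$$ LANGUAGE plpgsql;

-- Recreate the trigger so it fires on all three operations
DROP TRIGGER IF EXISTS employee_cdc_trigger ON employee;
CREATE TRIGGER employee_cdc_trigger
AFTER INSERT OR UPDATE OR DELETE ON employee
FOR EACH ROW
EXECUTE FUNCTION employee_cdc();
SQL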

5. Query the CDC table

In your application code, you can query the “employee_cdc” table to get a list of all changes that have occurred since a certain timestamp. For example, to get all changes since January 1st, 2023, you can use the following SQL query:

SELECT * FROM employee_cdc
WHERE timestamp >= '2023-01-01 00:00:00';

You can then process these changes as needed in your application code.

Conclusion:
In this blog post, we have shown you how to implement a custom Change Data Capture (CDC) in PostgreSQL to track changes in your database. By using a custom CDC, you can keep a record of changes in your database and use that information in your applications. Whether you are tracking changes for auditing purposes, providing a history of changes, or triggering updates in other systems, a custom CDC is a useful tool to have in your PostgreSQL toolkit.

Real-time Data Replication from Amazon RDS to Oracle Autonomous Database using OCI GoldenGate

Article first appeared here

Introduction

Goldengate Microservices 21c is the latest version of the microservices architecture, which makes creating a data mesh and data fabric across different public clouds as easy as a few clicks. Goldengate is available on OCI as a fully managed service with auto-scaling. It does not require installation of Goldengate software on either the source or target DB instances. Goldengate uses a capture-and-apply mechanism for replication using trail files. Both the extract (capture) and replicat (apply) processes run on the Goldengate replication instance, which acts as a hub.

Let us go ahead and create a data pipeline for replicating Data in real-time using Oracle Cloud Infrastructure (OCI) Goldengate 21c from Amazon RDS Oracle Instance to an Oracle Autonomous database in OCI. Below are some of the common use cases for this solution :

Use Cases

  1. Cross-cloud replication of Oracle Database from AWS RDS to OCI
  2. Migration of Oracle Database with Zero Downtime from AWS RDS to OCI
  3. Creating Multi-Cloud Microservices Application with Oracle database as the persistent data store
  4. Creating a Multi-cloud Data Mesh for Oracle Database

Architecture

Source : Amazon RDS Oracle 19c EE

Target : OCI Autonomous Transaction Processing 19c

Replication Hub : OCI Goldengate 21c Microservices

Network : Site-to Site IPsec VPN or Fastconnect (Direct Connect on AWS)

The solution is broadly divided into four phases :

  1. Setup of RDS Instance and Preparing Source for Goldengate replication
  2. Setup of OCI Autonomous Database and Preparing Target for Goldengate Replication
  3. Deployment of OCI Goldengate and Creation of Deployment and Register Source and Target Databases
  4. Create Extract (Capture) and Replicate (Apply) process on OCI Goldengate

Phase 1 — AWS Setup : RDS Source and Enable Goldengate Capture

The first part of the setup requires us to provision a VPC, a subnet group and an Oracle 19c RDS instance on AWS. Please ensure all the requisite network constructs, like security groups, are in place for connectivity from OCI Goldengate to RDS. In a production scenario it would be better to have the RDS instance without a public endpoint and have a FastConnect setup from AWS to OCI.

  1. Create a VPC and RDS Subnet Group

2. Create RDS Oracle Instance 19.1 EE with super user as ‘admin’

3. Create a new DB Parameter Group for 19.1 EE with parameter ENABLE_GOLDENGATE_REPLICATION set to TRUE

4. Change the parameter group of the RDS instance and reboot the RDS Oracle instance once the parameter group has been applied. Double-check to confirm the parameter ENABLE_GOLDENGATE_REPLICATION is set to TRUE and the correct parameter group is applied to the RDS instance

5. Set the log retention period on the source DB with ‘admin’ user

exec rdsadmin.rdsadmin_util.set_configuration('archivelog retention hours',24);commit;





6. Create a new user account to be used for Goldengate on the RDS instance with the 'admin' user

CREATE TABLESPACE administrator;

CREATE USER oggadm1 IDENTIFIED BY '*********' DEFAULT TABLESPACE ADMINISTRATOR TEMPORARY TABLESPACE TEMP;

commit;

7. Grant account privileges on the source RDS instance with ‘admin’ user

GRANT CREATE SESSION, ALTER SESSION TO oggadm1;

GRANT RESOURCE TO oggadm1;

GRANT SELECT ANY DICTIONARY TO oggadm1;

GRANT FLASHBACK ANY TABLE TO oggadm1;

GRANT SELECT ANY TABLE TO oggadm1;

GRANT SELECT_CATALOG_ROLE TO admin WITH ADMIN OPTION;

exec rdsadmin.rdsadmin_util.grant_sys_object ('DBA_CLUSTERS', 'OGGADM1');

exec rdsadmin.rdsadmin_util.grant_sys_object ('DBA_CLUSTERS', 'ADMIN');

GRANT EXECUTE ON DBMS_FLASHBACK TO oggadm1;

GRANT SELECT ON SYS.V_$DATABASE TO oggadm1;

GRANT ALTER ANY TABLE TO oggadm1;

grant unlimited tablespace TO oggadm1;

grant EXECUTE_CATALOG_ROLE to admin WITH ADMIN OPTION;

commit;

8. Finally, grant the privileges needed by a user account to be a GoldenGate administrator. The package that you use to perform the grant, dbms_goldengate_auth or rdsadmin_dbms_goldengate_auth, depends on the Oracle DB engine version.

-- With admin user on RDS Oracle instance for Oracle Database versions lower than 12.2 --

exec dbms_goldengate_auth.grant_admin_privilege (grantee=>'OGGADM1',privilege_type=>'capture',grant_select_privileges=>true, do_grants=>TRUE);

exec dbms_goldengate_auth.grant_admin_privilege('OGGADM1',container=>'all');

exec dbms_goldengate_auth.grant_admin_privilege('OGGADM1');

commit;

-- For Oracle DB versions later than or equal to Oracle Database 12c Release 2 (12.2), which requires patch level 12.2.0.1.ru-2019-04.rur-2019-04.r1 or later, run the following PL/SQL program.

exec rdsadmin.rdsadmin_dbms_goldengate_auth.grant_admin_privilege (grantee=>'OGGADM1', privilege_type=>'capture',grant_select_privileges=>true, do_grants=>TRUE);

commit;

To revoke privileges, use the procedure revoke_admin_privilege in the same package.

9. TNS entry for AWS RDS Instance

OGGTARGET=(DESCRIPTION=(ENABLE=BROKEN)(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=orcl.*****.ap-southeast-2.rds.amazonaws.com)(PORT=1521)))(CONNECT_DATA=(SID=ORCL)))

-- To be added to the Registered Database in OCI --
(DESCRIPTION=(ENABLE=BROKEN)(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=orcl.****.ap-southeast-2.rds.amazonaws.com)(PORT=1521)))(CONNECT_DATA=(SID=ORCL)))

Alias (to be used later in OCI GG configuration) : ORCLAWS

10. Create Test Table in RDS Oracle Instance

CREATE TABLE oggadm1.test (id number,name varchar2(100));

insert into oggadm1.test values (1,'Shadab');

insert into oggadm1.test values (2,'Mohammad');

commit;

11. Enable supplemental logging on with Admin user

Ref :https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Appendix.Oracle.CommonDBATasks.Log.html#Appendix.Oracle.CommonDBATasks.SettingForceLogging

-- Enable Force logging --

EXEC rdsadmin.rdsadmin_util.force_logging(p_enable => true);

-- Enable Supplemental logging --

begin
   rdsadmin.rdsadmin_util.alter_supplemental_logging(p_action => 'ADD');
end;
/


Phase 2 — OCI Setup : Autonomous Database

We will provision the VCN, Autonomous Database on OCI and enable the goldengate replication user

  1. Create VCN

2. Create Autonomous Transaction Processing Database with Network Options and mTLS not required

3. Unlock ggadmin user in the ATP

                           alter user ggadmin identified by ****** account unlock;

4. Create Table ‘test’ in admin schema and do initial load (Normally this has to be done using data pump but it is beyond the scope of this article)

CREATE TABLE test (id number,name varchar2(100));

insert into test values (1,'Shadab');

insert into test values (2,'Mohammad');

commit;

select * from test;

Phase 3 — OCI Setup : Goldengate

  1. Go to OCI Console Go to Oracle Database > Goldengate > Deployments > Create Deployment

2. Go to Oracle Database > Goldengate > Registered Databases

a. Add the ATP database created above with the ggadmin user

b. Add the RDS instance database using oggadm1 user

3. Test the connectivity to both databases; they should show in the console as Active

4. Go to the launch URL for the Goldengate deployment and log in with the username and password as per step 1.

                         eg : https://e*******q.deployment.goldengate.ap-sydney-1.oci.oraclecloud.com/

Phase 4 — Create , Extract (Capture) and Replicat (Apply) and Start the Replication

1. Create an Integrated Extract from the Administration Service; click on the plus symbol next to the Extract section

Go to Main Page > Configuration > Login to AWS RDS instance

a. Create Checkpoint table oggadm1.ckpt

b. Add Tran Data for Schema oggadm1

EXTRACT AWSEXT

USERIDALIAS ORCLAWS DOMAIN OracleGoldenGate

EXTTRAIL AW

TABLE OGGADM1.*;

2. Create a Non-integrated Replicat for the ADB on trail file 'aw'. Click on the plus symbol next to the Replicat section

Go to Main Page > Configuration > Login to ATP instance

a. Create Checkpoint table admin.ckpt

b. Add Tran Data for Schema admin

c. Add heartbeat table

REPLICAT adbrep

USERIDALIAS FundsInsight DOMAIN OracleGoldenGate

MAP OGGADM1.TEST, TARGET ADMIN.TEST;

The status should be green on the OCI Goldengate Administration Dashboard

3. Insert transaction at RDS source

                            insert into oggadm1.test values(3,'Utuhengal');commit;

4. Check at ADB Target

                            select * from test;

Conclusion:

We have created cross-cloud replication from an Oracle Database sitting inside AWS to an Oracle Autonomous Database running on OCI. The idea was to demonstrate the capability and ease of Goldengate Microservices to run as a replication hub on OCI and let you create real-time change data capture across two different public clouds. Every component used in this architecture is a fully managed cloud service, with no need to manage any servers or install any agents on either source or target, and without access to the underlying hosts.

References:

  1. Setup of Goldengate for RDS : https://jinyuwang.weebly.com/cloud-service/how-to-capture-data-from-oracle-database-on-aws-rds-with-oracle-goldengate
  2. Goldengate Setup for RDS Source :https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Appendix.OracleGoldenGate.html#Appendix.OracleGoldenGate.rds-source-ec2-hub
  3. RDS Common Tasks :https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Appendix.Oracle.CommonDBATasks.Log.html
  4. OCI Goldengate Database Registration : https://docs.oracle.com/en/cloud/paas/goldengate-service/using/database-registrations.html#GUID-899B90FF-DF9A-481D-A531-BB9D25005EB9
  5. Apex Livelab for OCI Goldengate Microservices 21c :https://apexapps.oracle.com/pls/apex/dbpm/r/livelabs/workshop-attendee-2?p210_workshop_id=797&p210_type=3&session=113817274271778
  6. OCI Goldengate Blog : https://blogs.oracle.com/dataintegration/post/new-oci-goldengate-service-is-first-of-any-major-cloud-provider-to-deliver-operational-and-analytic-integration-into-a-single-data-fabric
  7. Getting Started with Goldengate : https://docs.oracle.com/goldengate/c1230/gg-winux/GGCON/getting-started-oracle-goldengate.htm#GGCON-GUID-61088509-F951-4737-AE06-29DAEAD01C0C

Backup and Restore PostgreSQL with Few Easy Shell Scripts

PostgreSQL is the most popular open-source database, and there is a lot of information available when it comes to backing up and restoring it. I have used these scripts to back up production databases and restore them to new Postgres servers. So here it goes.

Backup PostgreSQL Database – Backup_Pgsql.sh

#!/bin/bash
hostname=`hostname`
# Dump DBs
date=`date +"%Y%m%d_%H%M%N"`
backupdir='/home/opc'
dbname='demo'
filename="$backupdir/${hostname}_${dbname}_${date}"
pg_dump -U postgres --encoding utf8 -F c -f $filename.dump $dbname
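To run this nightly, a simple crontab entry on the database host is enough. The paths below assume the script is saved as /home/opc/Backup_Pgsql.sh and that the user running it can authenticate without a prompt (peer auth or a ~/.pgpass entry):

# crontab -e   (add the line below to run the backup daily at 01:30)
30 1 * * * /home/opc/Backup_Pgsql.sh >> /home/opc/backup_pgsql.log 2>&1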

Restore PostgreSQL Database – Restore_Pgsql.sh

#!/bin/bash
# Restore DB
# Usage: ./Restore_Pgsql.sh <dump_file>
filename="${1:-/home/opc/pgimportmaster-demo-20211129_1013.dump}"
pg_restore -U postgres -d demo -c "$filename"
exit 0

Usage for Restore

$ ./Restore_Pgsql.sh pgimportmaster-demo-20211129_1013.dump

AWS Lambda Function to Load Data from ‘Requestor Pays’ S3 bucket in One Account to Redshift Cluster in Another Account

Part A : Create Redshift Spectrum Cross-Account Access for S3 


Company Account A: Redshift Cluster Account: 24xxxxxx16
Role: RoleA

Company Account B: S3 Bucket Account: 8xxxxxxxx11
Role: RoleB
Bucket Name (Create with Option "Requestor Pays"): s3://shadmha-us-east-2

Use Case:  1. Read Data from S3 Bucket in different account into Spectrum Table
                   2. Unload Data from Redshift Cluster to S3 bucket in different account

======================================================

Step 1: In Redshift Cluster Account 24xxxxxx16, do this

a)    Go to IAM > Roles > Create Role
b)    Create Role > Redshift > Redshift – Customizable.
c)    No need to add policies or tags, go ahead and save this role as “RoleA”
d)    Add this role to your Redshift cluster. Goto Redshift Console > Select Cluster > Manage IAM > Add “RoleA” to Cluster

Step 2: In account which has the S3 Bucket Account 8xxxxxxxx11, do this: 

a)    Go to IAM > Policies > Create policy
b)    Select the JSON tab and add below IAM policy, replace my bucket name ‘shadmha-us-east-2’ with your bucket name

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:PutObject",
                "s3:ListMultipartUploadParts",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads"
            ],
            "Resource": [
                "arn:aws:s3:::shadmha-us-east-2",
                "arn:aws:s3:::shadmha-us-east-2/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "glue:CreateDatabase",
                "glue:DeleteDatabase",
                "glue:GetDatabase",
                "glue:GetDatabases",
                "glue:UpdateDatabase",
                "glue:CreateTable",
                "glue:DeleteTable",
                "glue:BatchDeleteTable",
                "glue:UpdateTable",
                "glue:GetTable",
                "glue:GetTables",
                "glue:BatchCreatePartition",
                "glue:CreatePartition",
                "glue:DeletePartition",
                "glue:BatchDeletePartition",
                "glue:UpdatePartition",
                "glue:GetPartition",
                "glue:GetPartitions",
                "glue:BatchGetPartition"
            ],
            "Resource": "*"
        }
    ]
}

Choose Review Policy and save the policy as, let's say, 's3-cross-account-policy'

c)    Go to Roles > Create Role > Select type of trusted entity as 'Another AWS Account' tab
d)    Enter Account ID of Redshift Cluster Account '24xxxxxx16' > Permissions > Search for the policy created in b), 's3-cross-account-policy'
e)    Go next > create role > save it as "RoleB"
f)    Go to Roles > Select "RoleB" > "Trust Relationships" tab > Edit trust relationships. Add the below policy:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::24xxxxxx16:root"
      },
      "Action": "sts:AssumeRole",
      "Condition": {}
    }
  ]
}

Update the trust policy

Step 3: Go back to Account under which Redshift Cluster is created

a)    Go to IAM > Roles > Select role which you created earlier “RoleA”
b)    Add inline policy to this role and add the below policy and save it

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Stmt1487639602000",
            "Effect": "Allow",
            "Action": [
                "sts:AssumeRole"
            ],
            "Resource": "arn:aws:iam::80xxxxx11:role/RoleB"
        }
    ]
}
c)    Create policy and Save it to role
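With RoleA now able to assume RoleB, Redshift Spectrum in Account A can reach the bucket in Account B by chaining the two roles. A sketch of creating an external schema this way; the schema and Glue database names are made up for illustration, and the ARNs are the masked ones used above:

psql -h <your-cluster-endpoint> -p 5439 -U awsuser -d testdb -c "
CREATE EXTERNAL SCHEMA spectrum_xacct
FROM DATA CATALOG
DATABASE 'spectrum_xacct_db'
IAM_ROLE 'arn:aws:iam::24xxxxxx16:role/RoleA,arn:aws:iam::8xxxxxxxx11:role/RoleB'
CREATE EXTERNAL DATABASE IF NOT EXISTS;"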

Part B: Deploy a Lambda Function Using Attached Code(S3-to-Redshift.zip). And Change Your Cluster and Bucket Details Accordingly

Add a Cloud Watch Event Trigger with Cron Expression : cron(0 2 ? * FRI *)

Increase Timeout & Memory of Lambda Function

Configure Test Event

Execute the Lambda Function to Test
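The console steps above can also be scripted with the AWS CLI; a sketch is below, where the function name S3-to-Redshift, the rule name, and the account/region values are placeholders to adjust:

# EventBridge (CloudWatch Events) rule firing every Friday at 02:00 UTC
aws events put-rule --name s3-to-redshift-weekly --schedule-expression "cron(0 2 ? * FRI *)"

# Allow the rule to invoke the function, then attach the function as the rule target
aws lambda add-permission --function-name S3-to-Redshift --statement-id s3-to-redshift-weekly \
  --action lambda:InvokeFunction --principal events.amazonaws.com \
  --source-arn arn:aws:events:us-east-2:241135536116:rule/s3-to-redshift-weekly

aws events put-targets --rule s3-to-redshift-weekly \
  --targets "Id"="1","Arn"="arn:aws:lambda:us-east-2:241135536116:function:S3-to-Redshift"

# Increase timeout and memory for the COPY workload
aws lambda update-function-configuration --function-name S3-to-Redshift --timeout 300 --memory-size 512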

Python Code for Lambda Function

#######################################################################################
# Author         :      Shadab Mohammad
# Create Date    :      13-05-2019
# Modified Date  :      26-09-2019
# Name           :      Load Dataset from AWS S3 bucket to your Redshift Cluster
# Dependencies   :      Requires Python 3.6+. Python Libraries required 'psycopg2'
#######################################################################################
import psycopg2
import csv
import time
import sys
import os
import datetime
from datetime import date
datetime_object = datetime.datetime.now()


print("###### Load Data From S3 to Redshift ######")
print("")
print("Start TimeStamp")
print("---------------")
print(datetime_object)
print("")

def lambda_handler(event, context):
    # Obtaining the connection to Redshift
    con = psycopg2.connect(dbname='testdb', host='shadmha-us-east-2.crhzd8dtwytq.us-east-2.redshift.amazonaws.com', port='5439', user='awsuser', password='SomeP@ssword')

    copy_command_1 = "copy connection_log from 's3://shadmha-us-east-2/cross-acct-test/connection_events.csv' delimiter ',' csv iam_role 'arn:aws:iam::241135536116:role/RoleA,arn:aws:iam::804739925711:role/RoleB' ignoreheader 1;"

    # Opening a cursor and run truncate query
    cur = con.cursor()
    query = f'''
    DROP TABLE IF EXISTS connection_log CASCADE;

    CREATE TABLE connection_log(
    username varchar(50),
    event varchar(50),
    count int8);
    COMMIT;'''
    cur.execute(query)
    con.commit()

    # Opening a cursor and run copy query
    cur.execute(copy_command_1)
    con.commit()
    # Close the cursor and the connection
    cur.close()
    con.close()

    # Progress Bar Code Ends here

    datetime_object_2 = datetime.datetime.now()
    print("End TimeStamp")
    print("-------------")
    print(datetime_object_2)
    print("")

Lambda Function Code : https://github.com/shadabshaukat/serverless/blob/98f42c7867d6eb4d9e602d2b703764ad891fdfed/S3-to-Redshift.zip

Redshift Health-Check SQL Queries

-- Query Performance Review --

$ psql -h redshift-private-2a.c2nh0wlf4z7g.ap-southeast-2.redshift.amazonaws.com -p 5439 -U awsuser -f review_query_pf.sql testdb

$ vim review_query_pf.sql


\o redshiftxxx.txt
\set vpattern 1678
\qecho -- Query Text - stl_querytext
select * from stl_querytext where query = :vpattern;
\qecho -- Explain plan - stl_explain
select userid,query,nodeid,parentid,trim(plannode) plannode,trim(info) info from stl_explain where query = :vpattern;
\qecho --Review WLM Queuing for above queries - stl_wlm_query
SELECT TRIM(DATABASE) AS DB,
       w.query,
       SUBSTRING(q.querytxt,1,100) AS querytxt,
       w.queue_start_time,
       w.service_class AS class,
       w.slot_count AS slots,
       w.total_queue_time / 1000000 AS queue_seconds,
       w.total_exec_time / 1000000 exec_seconds,
       (w.total_queue_time + w.total_exec_time) / 1000000 AS total_seconds
FROM stl_wlm_query w
  LEFT JOIN stl_query q
         ON q.query = w.query
        AND q.userid = w.userid
WHERE w.query = :vpattern
--AND w.total_queue_time > 0
ORDER BY w.total_queue_time DESC,
         w.queue_start_time DESC;
\qecho --Get information about commit stats - stl_commit_stats
select startqueue,node, datediff(ms,startqueue,startwork) as queue_time, datediff(ms, startwork, endtime) as commit_time, queuelen
from stl_commit_stats
where xid in (select xid from stl_querytext where query = :vpattern)
order by queuelen desc , queue_time desc;
\qecho --Compile Time
select userid, xid,  pid, query, segment, locus,
datediff(ms, starttime, endtime) as duration, compile
from svl_compile
where query = :vpattern;
--\qecho --Understand other operations within the same PID - svl_statementtext
--select userid,xid,pid,label,starttime,endtime,sequence,type,trim(text) from svl_statementtext where pid in (select pid from stl_querytext where query = :vpattern);
\qecho --Review query work - STL_PLAN_INFO
select * from STL_PLAN_INFO where query = :vpattern;
\qecho --Review query work - svl_query_report
select * from svl_query_report where query = :vpattern order by segment,step,slice;
\qecho --Review query work - svl_query_summary
select * from svl_query_summary where query = :vpattern order by seg,step;
\qecho -- Review alert
select * from stl_alert_event_log where query = :vpattern;
\qecho -- Review STL_ERROR
select userid,process,recordtime,pid,errcode,trim(file),linenum,trim(context),trim(error) from stl_error where recordtime between (select starttime from stl_query where query = :vpattern) and (select endtime from stl_query where query = :vpattern);
\q

Create DMS Replication From MongoDB 4.2 on EC2 Linux to Redshift

Create DMS Replication From MongoDB 4.2 on EC2 Linux to Redshift

Summary

We will create a hub-to-spoke replication from a MongoDB 4.2 database to a Redshift schema. MongoDB is installed in the same VPC as Redshift and the DMS replication instance.

Main Text

MongoDB is a NoSQL datastore where data is inserted and read in JSON format. Internally, a MongoDB document is stored as a binary JSON (BSON) file in a compressed format. Terminology for database, schema and tables in MongoDB is a bit different when compared to relational databases. Here's how each term in MongoDB compares to its equivalent in Redshift/PostgreSQL:

MongoDB        Redshift/PostgreSQL
Database       Schema
Collection     Table
Documents      Records/Rows

MongoDB does not have a proper structure for a schema; it is essentially a document store, so data can be inserted without defining a proper schema or structure. This gives MongoDB a lot of flexibility, and hence it is the preferred choice for modern applications, as you can start developing your application without defining a data model first. It is a schema-less approach to software architecture, which has its own pros and cons.

In our example we will work through the default database in MongoDB called “admin” and create collections(tables) in it and those tables will be replicated to an equivalent schema in Redshift called “admin” which will hold the different tables. We will go about this in 3 stages:

Stage 1 : Create MongoDB on EC2 AMZN Linux, create the collections and connect to MongoDB from your client machine and check the configuration.

Stage 2: Create the Redshift cluster and Aurora PostgreSQL in private and public subnets, and connect to all the database instances and check.

Stage 3: Create the DMS replication instance, DMS replication endpoints and DMS replication tasks. And finally we will check if all data is being replicated by DMS to all the targets.

Architecture : It is a Hub-to-Spoke Architecture with MongoDB Source Being Replicated to Multiple Heterogeneous Targets. However in this Article we will only configure Replication from MongoDB to Redshift.

Stage 1: Create a EC2 Amazon Linux in a Public Subnet and Install MongoDB in it

You can use AWS ‘MongoDB Quick Start’ guide on AWS to deploy MongoDB into a new VPC or deploy into an existing VPC. The guide has two cloud formation templates which can create a new VPC under your account, configure the public & private subnets and launch the EC2 instances with latest version of MongoDB installed. Check this link for the quick deployment options for MongoDB on AWS : https://docs.aws.amazon.com/quickstart/latest/mongodb/step2.html

OR

If you want to install MongoDB manually then you can follow the below procedure :

1. Create Amazon Linux EC2 Instance

2. Add MongoDB Repo and Install MongoDB on AMZN Linux

$ sudo vi /etc/yum.repos.d/mongodb-org-4.0.repo


[mongodb-org-4.0]
name=MongoDB Repository
baseurl=https://repo.mongodb.org/yum/amazon/2013.03/mongodb-org/testing/x86_64/
gpgcheck=1
enabled=1
gpgkey=https://www.mongodb.org/static/pgp/server-4.2.asc

$ sudo yum -y install mongodb-org
$ sudo service mongod start
$ sudo cat /var/log/mongodb/mongod.log


3 . Create Root login

 $ mongo localhost/admin
 > use admin
 > db.createUser( { user:"root", pwd:"root123", roles: [ { role: "root", db: "admin" } ] } )
 > exit

 
4. Modify mongod configuration file /etc/mongod.conf using vi editor
 
     Change below lines from

    # network interfaces
    net:
      port: 27017
      bindIp: 127.0.0.1  # Listen to local interface only, comment to listen on all interfaces.
    #security:

    To

# network interfaces
net:
  port: 27017
  bindIp: 0.0.0.0  # Listen on all interfaces
security:
  authorization: enabled
 
5. Restart mongod service
 

 $ sudo service mongod restart

Use admin DB as authentication database and ‘root’ user can be used for CDC full load task.

6. For CDC replication, a MongoDB replica set needs to be set up and permissions need to be modified/added as below:

Modify mongod.conf using vi editor

$ sudo vi /etc/mongod.conf


replication:
  replSetName: rs0

$ sudo service mongod restart


7. Initiate Replica Set for CDC

$ mongo localhost/admin -u root -p
> rs.status()


{
    "ok" : 0,
    "errmsg" : "no replset config has been received",
    "code" : 94,
    "codeName" : "NotYetInitialized"
}

> rs.initiate()


{
    "info2" : "no configuration specified. Using a default configuration for the set",
    "me" : "ip-10-0-137-99.ap-southeast-2.compute.internal:27017",
    "ok" : 1
}

rs0:SECONDARY> rs.status()

{
    "set" : "rs0",
    "date" : ISODate("2019-12-31T05:02:22.431Z"),
    "myState" : 1,
    "term" : NumberLong(1),
    "syncingTo" : "",
    "syncSourceHost" : "",
    "syncSourceId" : -1,
    "heartbeatIntervalMillis" : NumberLong(2000),
    "majorityVoteCount" : 1,
    "writeMajorityCount" : 1,
    "optimes" : {
        "lastCommittedOpTime" : {
            "ts" : Timestamp(1577768528, 5),
            "t" : NumberLong(1)
        },
        "lastCommittedWallTime" : ISODate("2019-12-31T05:02:08.362Z"),
        "readConcernMajorityOpTime" : {
            "ts" : Timestamp(1577768528, 5),
            "t" : NumberLong(1)
        },
        "readConcernMajorityWallTime" : ISODate("2019-12-31T05:02:08.362Z"),
        "appliedOpTime" : {
            "ts" : Timestamp(1577768528, 5),
            "t" : NumberLong(1)
        },
        "durableOpTime" : {
            "ts" : Timestamp(1577768528, 5),
            "t" : NumberLong(1)
        },
        "lastAppliedWallTime" : ISODate("2019-12-31T05:02:08.362Z"),
        "lastDurableWallTime" : ISODate("2019-12-31T05:02:08.362Z")
    },
    "lastStableRecoveryTimestamp" : Timestamp(1577768528, 4),
    "lastStableCheckpointTimestamp" : Timestamp(1577768528, 4),
    "electionCandidateMetrics" : {
        "lastElectionReason" : "electionTimeout",
        "lastElectionDate" : ISODate("2019-12-31T05:02:07.346Z"),
        "electionTerm" : NumberLong(1),
        "lastCommittedOpTimeAtElection" : {
            "ts" : Timestamp(0, 0),
            "t" : NumberLong(-1)
        },
        "lastSeenOpTimeAtElection" : {
            "ts" : Timestamp(1577768527, 1),
            "t" : NumberLong(-1)
        },
        "numVotesNeeded" : 1,
        "priorityAtElection" : 1,
        "electionTimeoutMillis" : NumberLong(10000),
        "newTermStartDate" : ISODate("2019-12-31T05:02:08.354Z"),
        "wMajorityWriteAvailabilityDate" : ISODate("2019-12-31T05:02:08.362Z")
    },
    "members" : [
        {
            "_id" : 0,
            "name" : "ip-10-0-137-99.ap-southeast-2.compute.internal:27017",
            "ip" : "10.0.137.99",
            "health" : 1,
            "state" : 1,
            "stateStr" : "PRIMARY",
            "uptime" : 80,
            "optime" : {
                "ts" : Timestamp(1577768528, 5),
                "t" : NumberLong(1)
            },
            "optimeDate" : ISODate("2019-12-31T05:02:08Z"),
            "syncingTo" : "",
            "syncSourceHost" : "",
            "syncSourceId" : -1,
            "infoMessage" : "could not find member to sync from",
            "electionTime" : Timestamp(1577768527, 2),
            "electionDate" : ISODate("2019-12-31T05:02:07Z"),
            "configVersion" : 1,
            "self" : true,
            "lastHeartbeatMessage" : ""
        }
    ],
    "ok" : 1,
    "$clusterTime" : {
        "clusterTime" : Timestamp(1577768528, 5),
        "signature" : {
            "hash" : BinData(0,"nDISrR4afyRUVEQVntFkkVpTJKY="),
            "keyId" : NumberLong("6776464228418060290")
        }
    },
    "operationTime" : Timestamp(1577768528, 5)
}
rs0:PRIMARY>



8. Make sure the security group of the EC2 instance where MongoDB is running allows inbound traffic from the DMS replication instance on the MongoDB port (the default MongoDB port is 27017).
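If you prefer to script this instead of using the console, the ingress rule can be added with the AWS CLI as sketched below; both security group IDs are placeholders for your MongoDB EC2 instance's security group and your DMS replication instance's security group respectively.

# Allow the DMS replication instance's security group to reach MongoDB on 27017
# (both sg- IDs below are placeholders)
$ aws ec2 authorize-security-group-ingress \
    --group-id sg-0aaaaaaaaaaaaaaa1 \
    --ip-permissions 'IpProtocol=tcp,FromPort=27017,ToPort=27017,UserIdGroupPairs=[{GroupId=sg-0bbbbbbbbbbbbbbb2}]'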



9. Add a collection (table) with some data to the ‘admin’ database in the MongoDB installation

> show collections
db.createCollection("accounts", { capped : true, autoIndexId : true, size : 
   6142800, max : 10000 } )
db.accounts.insert({"company": "Booth-Wade",
    "location": "5681 Mitchell Heights\nFort Adamstad, UT 8019B",
    "ip_address": "192.168.110.4B",
    "name": "Mark Becker",
    "eid": 27561})
   
db.accounts.insert({"company": "Myers,  Smith and Turner",
      "location": "USS BenjaminNlinFP0 AA 40236",
      "ip_address": "172.26.254.156",
      "name": "Tyler clark",
      "eid": 87662})

db.accounts.insert({"company": "Bowen-Harris",
      "location": "Tracey Plaza East Katietown,Sc74695",
      "ip_address": "172.28.45.209",
      "name": "Veronica Gomez",
      "eid": 772122})
     
> db.accounts.find( {} )

— Insert Many —
      
      db.accounts.insertMany(
      [
    {"company": "Booth-Wade",
      "location": "5681 Mitchell Heights\nFort Adamstad, UT 8019B",
      "ip_address": "192.168.110.4B",
      "name": "Mark Becker",
      "eid": 27561},
    {"company": "Myers,  Smith and Turner",
      "location": "USS BenjaminNlinFP0 AA 40236",
      "ip_address": "172.26.254.156",
      "name": "Tyler clark",
      "eid": 87662},
    {"company": "Bowen-Harris",
      "location": "Tracey Plaza East Katietown,Sc74695",
      "ip_address": "172.28.45.209",
      "name": "Veronica Gomez",
      "eid": 772122}
      ]);
     

— Array Insert —

  db.accounts.insertMany( [
{
  "_id": "5e037719f45Btodlcdb492464",
  "accounts": [
    {
      "company": "Booth-Wade",
      "location": "5681 Mitchell Heights\nFort Adamstad, UT 8019B",
      "ip_address": "192.168.110.4B",
      "name": "Mark Becker",
      "eid": 27561
    },
    {
      "company": "Myers,  Smith and Turner",
      "location": "USS BenjaminNlinFP0 AA 40236",
      "ip_address": "172.26.254.156",
      "name": "Tyler clark",
      "eid": 87662
    },
    {
      "company": "Bowen-Harris",
      "location": "Tracey Plaza East Katietown,Sc74695",
      "ip_address": "172.28.45.209",
      "name": "Veronica Gomez",
      "eid": 772122
    }
  ]
}
]);

— Inventory Nested Array Collection —
db.createCollection("inventory", { capped : true, autoIndexId : true, size :
   6142800, max : 10000 } )

   db.createCollection("inventory_new", { capped : true, autoIndexId : true, size :
   6142800, max : 10000 } )
   
db.inventory.insertMany( [
   { item: "journal", instock: [ { warehouse: "A", qty: 5 }, { warehouse: "C", qty: 15 } ] },
   { item: "notebook", instock: [ { warehouse: "C", qty: 5 } ] },
   { item: "paper", instock: [ { warehouse: "A", qty: 60 }, { warehouse: "B", qty: 15 } ] },
   { item: "planner", instock: [ { warehouse: "A", qty: 40 }, { warehouse: "B", qty: 5 } ] },
   { item: "postcard", instock: [ { warehouse: "B", qty: 15 }, { warehouse: "C", qty: 35 } ] }
]);


db.inventory_new.insertMany([
   { item: "journal", qty: 25, tags: ["blank", "red"], size: { h: 14, w: 21, uom: "cm" } },
   { item: "mat", qty: 85, tags: ["gray"], size: { h: 27.9, w: 35.5, uom: "cm" } },
   { item: "mousepad", qty: 25, tags: ["gel", "blue"], size: { h: 19, w: 22.85, uom: "cm" } }
])

> db.inventory.find( {} )

> show collections

accounts
inventory
inventory_new
system.keys
system.users
system.version

We have created three user collections: ‘accounts’, ‘inventory’, and ‘inventory_new’. These three collections (tables) will be replicated to our targets. Connect from MongoDB Compass on your client machine and verify that they exist.
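A connection string along the following lines should work for Compass (and can also be tested with the mongo shell from the client machine), assuming you substitute the EC2 instance's public DNS name or IP and the root password set earlier; authSource=admin and replicaSet=rs0 match the configuration from the previous steps.

# <password> and <ec2-public-dns> are placeholders; paste the same URI into Compass
$ mongo "mongodb://root:<password>@<ec2-public-dns>:27017/admin?authSource=admin&replicaSet=rs0"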

Stage 2: Install Redshift Cluster

Create a VPC with public and private subnets. In a real-world production scenario, it is always recommended to place your databases in a private subnet.

1. Create Public and Private Subnet

https://docs.aws.amazon.com/AmazonECS/latest/developerguide/create-public-private-vpc.html#run-VPC-wizard

2. Install Redshift in Public Subnet

3. Install Redshift in Private Subnet
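The console is the easiest way to create the cluster, but if you want to script it, a minimal AWS CLI sketch is shown below. The subnet group name, security group ID, and password are placeholders; for the private-subnet variant, use a private subnet group and --no-publicly-accessible instead.

$ aws redshift create-cluster \
    --cluster-identifier dms-target-cluster \
    --db-name testdb \
    --node-type dc2.large \
    --number-of-nodes 2 \
    --master-username awsuser \
    --master-user-password 'Placeholder1234' \
    --cluster-subnet-group-name my-redshift-subnet-group \
    --vpc-security-group-ids sg-0ccccccccccccccc3 \
    --publicly-accessible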

If you have a different scenario, such as the DMS replication instance in one VPC and the databases in another VPC, or replication from on-premises to an AWS VPC, then you can refer to this link : https://docs.aws.amazon.com/dms/latest/userguide/CHAP_ReplicationInstance.VPC.html

Stage 3: Create DMS Replication Instance, DMS Replication Endpoints & DMS Replication Tasks for MongoDB

Steps : Create Replication Instance > Create Endpoints > Create DMS Tasks

1. Create Replication instance

Go to AWS Console > Database Migration Service  > Replication Instance > Create Replication Instance
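Equivalently, the replication instance can be created from the AWS CLI. The sketch below assumes a DMS replication subnet group already exists in the same VPC as MongoDB and Redshift (its name here is a placeholder), and reuses the replication instance security group referenced later in this article.

$ aws dms create-replication-instance \
    --replication-instance-identifier mongodb-redshift-repl \
    --replication-instance-class dms.t3.medium \
    --allocated-storage 50 \
    --replication-subnet-group-identifier my-dms-subnet-group \
    --vpc-security-group-ids sg-0a695ef98b6e39963 \
    --no-publicly-accessible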

2. Create Replication Endpoints

a) MongoDB Replication Endpoint

Go to DMS Console > Endpoints > Create Endpoint. Use this link for guidance on configuring your endpoint : https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Source.MongoDB.html

With MongoDB as a source you have two modes available: document mode and table mode. Some important points to note in this regard are :

  • A record in MongoDB is a document, which is a data structure composed of field and value pairs. The value of a field can include other documents, arrays, and arrays of documents. A document is roughly equivalent to a row in a relational database table.
  • A collection in MongoDB is a group of documents, and is roughly equivalent to a relational database table.
  • Internally, a MongoDB document is stored as a binary JSON (BSON) file in a compressed format that includes a type for each field in the document. Each document has a unique ID.

MongoDB versions 2.6.x and 3.x are officially supported as a database source only. I have tested it with MongoDB 4.2, the latest community version at the time of writing, and it works without any issues; however, I would advise sticking with the officially certified versions. AWS DMS supports two migration modes when using MongoDB as a source. You specify the migration mode using the Metadata mode parameter in the AWS Management Console, or the extra connection attribute nestingLevel when you create the MongoDB endpoint.

Document mode

In document mode, the MongoDB document is migrated as is, meaning that the document data is consolidated into a single column named _doc in a target table.

Table mode

In table mode, AWS DMS transforms each top-level field in a MongoDB document into a column in the target table. If a field is nested, AWS DMS flattens the nested values into a single column. AWS DMS then adds a key field and data types to the target table’s column set.

Connection Attributes

nestingLevel
Values      : NONE | ONE
Description : Specify NONE to use document mode and ONE to use table mode. The default is NONE.

extractDocID
Values      : true | false
Description : Use this attribute when nestingLevel is set to NONE. When set to true, AWS DMS extracts the document ID into a separate "_id" column on the target. The default is false.

Test the Endpoint
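For reference, the same endpoint can be created and tested from the AWS CLI. In the sketch below the server name and ARNs are placeholders, NestingLevel=none selects document mode as described above, and the credentials are the ‘root’ user created in Stage 1.

$ aws dms create-endpoint \
    --endpoint-identifier mongodb-source \
    --endpoint-type source \
    --engine-name mongodb \
    --mongo-db-settings 'ServerName=<mongodb-private-ip>,Port=27017,DatabaseName=admin,AuthType=password,AuthMechanism=scram_sha_1,AuthSource=admin,Username=root,Password=root123,NestingLevel=none,ExtractDocId=false'

# Verify connectivity from the replication instance
$ aws dms test-connection \
    --replication-instance-arn <replication-instance-arn> \
    --endpoint-arn <mongodb-endpoint-arn>

$ aws dms describe-connections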

b) Create Redshift Replication Endpoint

Test Redshift Endpoint

Once you create the endpoint for Redshift, DMS automatically creates a dms-access-for-endpoint IAM role and associates it with the Redshift endpoint. Further down, when we create S3 as a target endpoint, we will need to add the S3 permissions via a managed policy to this same role.

dms-access-for-endpoint : arn:aws:iam::775867435088:role/dms-access-for-endpoint
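The Redshift target endpoint can likewise be created and tested from the CLI; in the sketch below the cluster endpoint address, database credentials, and ARNs are placeholders, and the database name matches the 'testdb' database queried later in this article.

$ aws dms create-endpoint \
    --endpoint-identifier redshift-target \
    --endpoint-type target \
    --engine-name redshift \
    --server-name <redshift-cluster-endpoint> \
    --port 5439 \
    --database-name testdb \
    --username awsuser \
    --password '<redshift-password>'

$ aws dms test-connection \
    --replication-instance-arn <replication-instance-arn> \
    --endpoint-arn <redshift-endpoint-arn>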

c) Create MongoDB-Redshift Database Migration Task

Go to DMS Console > Conversion & Migration > Database Migration Tasks > Create Task

Before moving ahead, ensure that the security group of Redshift allows ingress on port 5439 from 0.0.0.0/0, or preferably that the security group ID of your replication instance is added to the ingress rules of the Redshift security group on port 5439. Check this link for more information : https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.Redshift.html

In our case the DMS replication instance security group ID is ‘sg-0a695ef98b6e39963’, so that is the ID added to the Redshift security group's ingress rule on port 5439.

Refer to this documentation for more complex VPC setups, which are beyond the scope of this article : https://docs.aws.amazon.com/dms/latest/userguide/CHAP_ReplicationInstance.VPC.html
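If you are scripting the setup end to end, a minimal CLI sketch for the task is shown below. The ARNs are placeholders, the selection rule includes every collection under the ‘admin’ schema created in Stage 1, and the full-load-and-cdc migration type relies on the replica set configured earlier.

$ aws dms create-replication-task \
    --replication-task-identifier mongodb-to-redshift \
    --source-endpoint-arn <mongodb-endpoint-arn> \
    --target-endpoint-arn <redshift-endpoint-arn> \
    --replication-instance-arn <replication-instance-arn> \
    --migration-type full-load-and-cdc \
    --table-mappings '{"rules":[{"rule-type":"selection","rule-id":"1","rule-name":"1","object-locator":{"schema-name":"admin","table-name":"%"},"rule-action":"include"}]}'

$ aws dms start-replication-task \
    --replication-task-arn <replication-task-arn> \
    --start-replication-task-type start-replication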

Checking from Redshift, we can see that all three tables from MongoDB (‘accounts’, ‘inventory’, and ‘inventory_new’) have been created, and that the ‘admin’ schema was created automatically by DMS.

Query all the tables to confirm data is replicated

testdb=# select * from admin.accounts;

            _id             |   array_accounts
----------------------------+-----------------------------------------------------------------------------------------------
 5e037719f45Btodlcdb492464  | [ { "company" : "Booth-Wade", "location" : "5681 Mitchell Heights\nFort Adamstad, UT 8019B", "ip_address" : "192.168.110.4B", "name" : "Mark Becker", "eid" : 27561.0 }, { "company" : "Myers,  Smith and Turner", "location" : "USS BenjaminNlinFP0 AA 40236", "ip_address" : "172.26.254.156", "name" : "Tyler clark", "eid" : 87662.0 }, { "company" : "Bowen-Harris", "location" : "Tracey Plaza East Katietown,Sc74695", "ip_address" : "172.28.45.209", "name" : "Veronica Gomez", "eid" : 772122.0 } ]
 9772sjs19f45Btodlcdbk49fk4 | [ { "company" : "Trust Co", "location" : "Zetland Inc.", "ip_address" : "12.168.210.2B", "name" : "Mert Cliff", "eid" : 4343.0 }, { "company" : "Mist Ltd.", "location" : "Cliffstone yard", "ip_address" : "72.32.254.156", "name" : "Kris Loff", "eid" : 76343.0 }, { "company" : "Coles Supermarket", "location" : "Randwich St", "ip_address" : "22.28.45.110", "name" : "Will Markbaeur", "eid" : 13455.0 } ]
(2 rows)

testdb=# select * from admin.inventory;


         oid__id          |   item   |                                array_instock
--------------------------+----------+------------------------------------------------------------------------------
 5e0bd854fd4602c4b6926d68 | journal  | [ { "warehouse" : "A", "qty" : 5.0 }, { "warehouse" : "C", "qty" : 15.0 } ]
 5e0bd854fd4602c4b6926d69 | notebook | [ { "warehouse" : "C", "qty" : 5.0 } ]
 5e0bd854fd4602c4b6926d6a | paper    | [ { "warehouse" : "A", "qty" : 60.0 }, { "warehouse" : "B", "qty" : 15.0 } ]
 5e0bd854fd4602c4b6926d6b | planner  | [ { "warehouse" : "A", "qty" : 40.0 }, { "warehouse" : "B", "qty" : 5.0 } ]
 5e0bd854fd4602c4b6926d6c | postcard | [ { "warehouse" : "B", "qty" : 15.0 }, { "warehouse" : "C", "qty" : 35.0 } ]
(5 rows)


testdb=# select * from admin.inventory_new;


         oid__id          |   item   | qty |     array_tags     | size.h | size.w | size.uom
--------------------------+----------+-----+--------------------+--------+--------+----------
 5e0bef1775d0b39f2ef66923 | journal  |  25 | [ "blank", "red" ] |     14 |     21 | cm
 5e0bef1775d0b39f2ef66924 | mat      |  85 | [ "gray" ]         |   27.9 |   35.5 | cm
 5e0bef1775d0b39f2ef66925 | mousepad |  25 | [ "gel", "blue" ]  |     19 |  22.85 | cm
(3 rows)