wal2json for PostgreSQL

My own tests come first; the original article follows below:

yum install wal2json

create database daftest;
daftest=# CREATE TABLE test_table (
id char(10) NOT NULL,
code char(10),
PRIMARY KEY (id)
);


export PATH="$PATH:/usr/pgsql-12/bin"
pg_recvlogical
pg_recvlogical: error: no slot specified

pg_recvlogical -d daftest --slot test_slot --create-slot -P wal2json
pg_recvlogical -d daftest --slot test_slot --start -o pretty-print=1 -f - ## This will sit there as if waiting for something...

In a second terminal I run:
daftest=# insert into test_table (id,code) select 1, 'Hola';
INSERT 0 1

The first terminal shows:

{
"change": [
{
"kind": "insert",
"schema": "public",
"table": "test_table",
"columnnames": ["id", "code"],
"columntypes": ["character(10)", "character(10)"],
"columnvalues": ["1 ", "Hola "]
}
]
}

In the second terminal: daftest=# update test_table set code = 'Chau' where id = '1';
UPDATE 1

In the first terminal:
{
"change": [
{
"kind": "update",
"schema": "public",
"table": "test_table",
"columnnames": ["id", "code"],
"columntypes": ["character(10)", "character(10)"],
"columnvalues": ["1 ", "Chau "],
"oldkeys": {
"keynames": ["id"],
"keytypes": ["character(10)"],
"keyvalues": ["1 "]
}
}
]
}

In the second terminal: daftest=# delete from test_table where id = '1';
DELETE 1

In the first terminal:
{
"change": [
{
"kind": "delete",
"schema": "public",
"table": "test_table",
"oldkeys": {
"keynames": ["id"],
"keytypes": ["character(10)"],
"keyvalues": ["1 "]
}
}
]
}

ctrl+C
pg_recvlogical -d daftest --slot test_slot --drop-slot

Logical Decoding Output Plug-in Installation for PostgreSQL

This document describes the database setup required for streaming data changes out of PostgreSQL. It covers both the configuration of the database itself and the installation of the wal2json logical decoding output plug-in. The installation and the tests were performed in the following environment/configuration:

Similar steps apply to other PostgreSQL and OS versions, and to the Decoderbufs logical decoding plug-in, which Debezium also supports.

As of Debezium 0.10, the connector supports PostgreSQL 10+ logical replication streaming using pgoutput. This means that a logical decoding output plug-in is no longer necessary and changes can be emitted directly from the replication stream by the connector.

Logical Decoding Plug-ins

Logical decoding is the process of extracting all persistent changes to a database’s tables into a coherent, easy to understand format which can be interpreted without detailed knowledge of the database’s internal state.

As of PostgreSQL 9.4, logical decoding is implemented by decoding the contents of the write-ahead log, which describe changes on a storage level, into an application-specific form such as a stream of tuples or SQL statements. In the context of logical replication, a slot represents a stream of changes that can be replayed to a client in the order they were made on the origin server. Each slot streams a sequence of changes from a single database. The output plug-ins transform the data from the write-ahead log’s internal representation into the format the consumer of a replication slot desires. Plug-ins are written in C, compiled, and installed on the machine which runs the PostgreSQL server, and they use a number of PostgreSQL specific APIs, as described by the PostgreSQL documentation.

Debezium’s PostgreSQL connector works with one of Debezium’s supported logical decoding plug-ins, decoderbufs or wal2json, to encode the changes in either Protobuf format or JSON format.

For simplicity, Debezium also provides a Docker image based on a vanilla PostgreSQL server image on top of which it compiles and installs the plug-ins.

The Debezium logical decoding plug-ins have only been installed and tested on Linux machines. Windows and other platforms may require different installation steps.

Differences between Plug-ins

The plug-ins do not behave identically in all cases. So far the following differences have been identified:

  • wal2json plug-in is not able to process quoted identifiers (issue)
  • wal2json plug-in does not emit events for tables without primary keys
  • wal2json plug-in does not support special values (NaN or infinity) for floating point types

All up-to-date differences are tracked in a test suite Java class.
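
One plausible reason for the last limitation is that strict JSON has no literal for NaN or infinity. The following Python sketch only illustrates that constraint of the JSON format. It is not taken from the wal2json source, and the helper name to_strict_json is hypothetical.

```python
import json
import math


def to_strict_json(value):
    """Serialize a value as strict JSON, or return None if that is impossible."""
    try:
        # allow_nan=False makes json.dumps reject NaN and +/-Infinity,
        # which are not part of the JSON grammar.
        return json.dumps(value, allow_nan=False)
    except ValueError:
        return None


print(to_strict_json(1.5))       # an ordinary float serializes fine
print(to_strict_json(math.nan))  # NaN has no strict JSON representation
print(to_strict_json(math.inf))  # neither does infinity
```

A consumer that needs such values would have to agree on some other encoding (for example a string), which is a design decision outside the scope of this page.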

More information about logical decoding and output plug-ins can be found in the PostgreSQL documentation.

Installation

In this installation example, the wal2json output plug-in for logical decoding is used. It produces one JSON object per transaction, and all of the new/old tuples are available in that object. The plug-in is compiled and installed by running the commands below, which are extracted from the Debezium Docker image file.

Before executing the commands, make sure that the user has write privileges on the PostgreSQL lib directory, where the wal2json library is installed (in the test environment, the directory is /usr/pgsql-9.6/lib/). Also note that the installation process requires the PostgreSQL utility pg_config. Verify that the PATH environment variable is set so that the utility can be found; if not, update PATH appropriately. For example, in the test environment:

export PATH="$PATH:/usr/pgsql-9.6/bin"
wal2json installation commands
$ git clone https://github.com/eulerto/wal2json -b master --single-branch \
&& cd wal2json \
&& git checkout d2b7fef021c46e0d429f2c1768de361069e58696 \
&& make && make install \
&& cd .. \
&& rm -rf wal2json
wal2json installation output
Cloning into 'wal2json'...
remote: Counting objects: 445, done.
remote: Total 445 (delta 0), reused 0 (delta 0), pack-reused 445
Receiving objects: 100% (445/445), 180.70 KiB | 0 bytes/s, done.
Resolving deltas: 100% (317/317), done.
Note: checking out 'd2b7fef021c46e0d429f2c1768de361069e58696'.

You are in 'detached HEAD' state. You can look around, make experimental
changes and commit them, and you can discard any commits you make in this
state without impacting any branches by performing another checkout.

If you want to create a new branch to retain commits you create, you may
do so (now or later) by using -b with the checkout command again. Example:

  git checkout -b new_branch_name

HEAD is now at d2b7fef... Improve style
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -g -pipe -Wall -Wp,-D_FORTIFY_SOURCE=2 -fexceptions -fstack-protector-strong --param=ssp-buffer-size=4 -grecord-gcc-switches -m64 -mtune=generic -fPIC -I. -I./ -I/usr/pgsql-9.6/include/server -I/usr/pgsql-9.6/include/internal -D_GNU_SOURCE -I/usr/include/libxml2  -I/usr/include  -c -o wal2json.o wal2json.c
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -g -pipe -Wall -Wp,-D_FORTIFY_SOURCE=2 -fexceptions -fstack-protector-strong --param=ssp-buffer-size=4 -grecord-gcc-switches -m64 -mtune=generic -fPIC -L/usr/pgsql-9.6/lib -Wl,--as-needed  -L/usr/lib64 -Wl,--as-needed -Wl,-rpath,'/usr/pgsql-9.6/lib',--enable-new-dtags  -shared -o wal2json.so wal2json.o
/usr/bin/mkdir -p '/usr/pgsql-9.6/lib'
/usr/bin/install -c -m 755  wal2json.so '/usr/pgsql-9.6/lib/'

Installation on Fedora 30+

Debezium also provides an RPM package for the Fedora operating system. The package is updated after each final Debezium release. To install it, issue the standard Fedora installation command:

$ sudo dnf -y install postgres-decoderbufs

The rest of the configuration is the same as described below for the wal2json plug-in.

PostgreSQL Server Configuration

Once the wal2json plug-in has been installed, the database server should be configured.

Setting up libraries, WAL and replication parameters

Add the following lines at the end of the postgresql.conf PostgreSQL configuration file to add the plug-in to the shared libraries and to adjust some WAL and streaming replication settings. The configuration is extracted from postgresql.conf.sample. You may need to modify it if, for example, shared_preload_libraries already lists other libraries.

postgresql.conf, configuration file parameter settings
############ REPLICATION ##############
# MODULES
shared_preload_libraries = 'wal2json'   

# REPLICATION
wal_level = logical                     
max_wal_senders = 4                     
max_replication_slots = 4               
  • shared_preload_libraries tells the server to load the wal2json logical decoding plug-in at startup (use decoderbufs for protobuf); the plug-in names are set in the protobuf and wal2json Makefiles
  • wal_level tells the server to use logical decoding with the write-ahead log
  • max_wal_senders tells the server to use a maximum of 4 separate processes for processing WAL changes
  • max_replication_slots tells the server to allow a maximum of 4 replication slots to be created for streaming WAL changes
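
As a quick sanity check before restarting the server, the settings above can be verified with a few lines of Python. This is a minimal sketch under stated assumptions: the helper names parse_conf and check_logical_decoding are hypothetical, the parser handles only simple name = value lines (it treats everything after # as a comment), and a missing setting is reported as a problem even though real server defaults depend on the PostgreSQL version.

```python
def parse_conf(text):
    """Parse simple 'name = value' lines; later lines override earlier ones."""
    settings = {}
    for raw in text.splitlines():
        # Simplification: treat everything after '#' as a comment.
        line = raw.split("#", 1)[0].strip()
        if "=" not in line:
            continue
        name, value = line.split("=", 1)
        settings[name.strip()] = value.strip().strip("'")
    return settings


def check_logical_decoding(settings, plugin="wal2json"):
    """Return a list of human-readable problems (empty if none were found)."""
    problems = []
    libs = [lib.strip() for lib in settings.get("shared_preload_libraries", "").split(",")]
    if plugin not in libs:
        problems.append("shared_preload_libraries does not list " + plugin)
    if settings.get("wal_level") != "logical":
        problems.append("wal_level must be logical")
    for name in ("max_wal_senders", "max_replication_slots"):
        # Assumption of this sketch: a missing value counts as 0.
        if int(settings.get(name, "0")) < 1:
            problems.append(name + " must be at least 1")
    return problems


# The fragment from this page, used as sample input.
SAMPLE = """
# MODULES
shared_preload_libraries = 'wal2json'
# REPLICATION
wal_level = logical
max_wal_senders = 4
max_replication_slots = 4
"""

print(check_logical_decoding(parse_conf(SAMPLE)))
```

On a running server, SHOW wal_level; in psql gives the effective value, which is the more reliable check once the server has been restarted.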

Debezium needs PostgreSQL’s WAL to be kept during Debezium outages. If WAL retention is too short and an outage too long, Debezium will not be able to recover after a restart because it will have missed part of the data changes. The usual indicator is an error similar to this one, thrown during startup: ERROR: requested WAL segment 000000010000000000000001 has already been removed.

When this happens, it is necessary to re-execute the snapshot of the database. We also recommend setting the parameter wal_keep_segments = 0. Please follow the official PostgreSQL documentation for fine-tuning WAL retention.
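
To see which part of the WAL such an error refers to, the 24-character segment name can be split into its three hexadecimal fields. The sketch below is illustrative only: the function name parse_wal_segment_name is hypothetical, and the start LSN is computed under the assumption of the default 16 MB WAL segment size.

```python
def parse_wal_segment_name(name, segment_size=16 * 1024 * 1024):
    """Split a WAL segment file name into timeline, log and segment numbers."""
    if len(name) != 24:
        raise ValueError("expected a 24-character WAL segment name")
    timeline = int(name[0:8], 16)   # first 8 hex digits: timeline ID
    log = int(name[8:16], 16)       # next 8: high 32 bits of the position
    segment = int(name[16:24], 16)  # last 8: segment number within that log
    # Assumption: default segment size of 16 MB.
    start = (log << 32) + segment * segment_size
    start_lsn = "%X/%X" % (start >> 32, start & 0xFFFFFFFF)
    return {"timeline": timeline, "log": log, "segment": segment, "start_lsn": start_lsn}


print(parse_wal_segment_name("000000010000000000000001"))
```

For the name in the error message above, this yields timeline 1, log 0, segment 1, that is, the second segment ever written on the first timeline.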

We strongly recommend reading and understanding the official documentation regarding the mechanics and configuration of the PostgreSQL write-ahead log.

Setting up replication permissions

Replication can only be performed by a database user that has appropriate permissions and only for a configured number of hosts. In order to give a user replication permissions, define a PostgreSQL role that has at least the REPLICATION and LOGIN permissions. For example:

CREATE ROLE name REPLICATION LOGIN;

Superusers have both of the above permissions by default.

Add the following lines at the end of the pg_hba.conf PostgreSQL configuration file, so as to configure the client authentication for the database replication. The PostgreSQL server should allow replication to take place between the server machine and the host on which the Debezium PostgreSQL connector is running.

Note that the authentication refers to the database superuser postgres. You may change this accordingly, if some other user with REPLICATION and LOGIN permissions has been created.

pg_hba.conf, configuration file parameter settings
############ REPLICATION ##############
local   replication     postgres                          trust		
host    replication     postgres  127.0.0.1/32            trust		
host    replication     postgres  ::1/128                 trust		
  • local line: tells the server to allow replication for postgres locally (i.e. on the server machine)
  • host line with 127.0.0.1/32: tells the server to allow postgres on localhost to receive replication changes using IPv4
  • host line with ::1/128: tells the server to allow postgres on localhost to receive replication changes using IPv6

See the PostgreSQL documentation for more information on network masks.
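
To make the masks concrete: /32 and /128 each match exactly one address, the IPv4 and IPv6 loopback respectively. The following sketch uses Python's standard ipaddress module to show which client addresses the two host entries above would cover. It models only the address column of pg_hba.conf, not the connection type, database, user or authentication method, and the function name is_allowed is hypothetical.

```python
import ipaddress

# The address/mask values from the two "host" entries above.
REPLICATION_NETWORKS = ["127.0.0.1/32", "::1/128"]


def is_allowed(client_ip, networks=REPLICATION_NETWORKS):
    """Return True if client_ip falls inside any of the given CIDR networks."""
    addr = ipaddress.ip_address(client_ip)
    for cidr in networks:
        net = ipaddress.ip_network(cidr)
        # Compare only addresses of the same IP version.
        if addr.version == net.version and addr in net:
            return True
    return False


print(is_allowed("127.0.0.1"))     # IPv4 loopback
print(is_allowed("::1"))           # IPv6 loopback
print(is_allowed("192.168.1.10"))  # some other host, covered by neither entry
```

If the connector runs on another machine, an entry with a wider mask is needed; for instance, a hypothetical 192.168.1.0/24 network would cover 192.168.1.10.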

Database Test Environment Set-up

For testing purposes, a database named test and a table named test_table are created with the following DDL commands:

Database SQL commands for test database/table creation
CREATE DATABASE test;

-- psql meta-command: connect to the new database so the table is created in it
\c test

CREATE TABLE test_table (
    id char(10) NOT NULL,
    code        char(10),
    PRIMARY KEY (id)
);

Decoding Output Plug-in Test

Test that the wal2json plug-in is working properly by obtaining the test_table changes with pg_recvlogical, the PostgreSQL client application that controls logical decoding streams.

Before starting, make sure that you are logged in as a user with database replication permissions, as configured in a previous step. Otherwise, slot creation and streaming fail with the following error message:

pg_recvlogical: could not connect to server: FATAL:  no pg_hba.conf entry for replication connection from host "[local]", user "root", SSL off

In the test environment, the user with replication permission is postgres.

Also, make sure that the PATH environment variable is set so that pg_recvlogical can be found. If not, update PATH appropriately. For example, in the test environment:

export PATH="$PATH:/usr/pgsql-9.6/bin"
  • Create a slot named test_slot for the database named test, using the logical output plug-in wal2json
$ pg_recvlogical -d test --slot test_slot --create-slot -P wal2json
  • Begin streaming changes from the logical replication slot test_slot for the database test
$ pg_recvlogical -d test --slot test_slot --start -o pretty-print=1 -f -
  • Perform some basic DML operations on test_table to trigger INSERT/UPDATE/DELETE change events
Interactive PostgreSQL terminal, SQL commands
test=# INSERT INTO test_table (id, code) VALUES('id1', 'code1');
INSERT 0 1
test=# update test_table set code='code2' where id='id1';
UPDATE 1
test=# delete from test_table where id='id1';
DELETE 1

Upon the INSERT, UPDATE and DELETE events, the wal2json plug-in outputs the table changes as captured by pg_recvlogical.

Output for INSERT event
{
  "change": [
    {
      "kind": "insert",
      "schema": "public",
      "table": "test_table",
      "columnnames": ["id", "code"],
      "columntypes": ["character(10)", "character(10)"],
      "columnvalues": ["id1       ", "code1     "]
    }
  ]
}
Output for UPDATE event
{
  "change": [
    {
      "kind": "update",
      "schema": "public",
      "table": "test_table",
      "columnnames": ["id", "code"],
      "columntypes": ["character(10)", "character(10)"],
      "columnvalues": ["id1       ", "code2     "],
      "oldkeys": {
        "keynames": ["id"],
        "keytypes": ["character(10)"],
        "keyvalues": ["id1       "]
      }
    }
  ]
}
Output for DELETE event
{
  "change": [
    {
      "kind": "delete",
      "schema": "public",
      "table": "test_table",
      "oldkeys": {
        "keynames": ["id"],
        "keytypes": ["character(10)"],
        "keyvalues": ["id1       "]
      }
    }
  ]
}

Note that the REPLICA IDENTITY of the table test_table is set to DEFAULT.
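
A consumer of this output typically pairs columnnames with columnvalues. The Python sketch below does that for the INSERT event shown above and also strips the space padding that char(10) columns carry. It is only an illustration of reading this JSON layout: the function name rows_from_change is hypothetical, and trimming the padding is a choice made for this example, not something wal2json does.

```python
import json

# The INSERT event from above, as emitted with pretty-print=1.
INSERT_EVENT = """
{
  "change": [
    {
      "kind": "insert",
      "schema": "public",
      "table": "test_table",
      "columnnames": ["id", "code"],
      "columntypes": ["character(10)", "character(10)"],
      "columnvalues": ["id1       ", "code1     "]
    }
  ]
}
"""


def rows_from_change(document):
    """Turn one wal2json transaction document into (kind, table, row) tuples."""
    rows = []
    for change in json.loads(document)["change"]:
        names = change.get("columnnames", [])
        types = change.get("columntypes", [])
        values = change.get("columnvalues", [])
        row = {}
        for name, col_type, value in zip(names, types, values):
            # char(n) values arrive space-padded; trim them for readability.
            if col_type.startswith("character(") and isinstance(value, str):
                value = value.rstrip(" ")
            row[name] = value
        rows.append((change["kind"], change["table"], row))
    return rows


print(rows_from_change(INSERT_EVENT))
```

DELETE events carry no columnnames, so for them the sketch returns an empty row; the key of the deleted row is found under oldkeys instead.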

When the test is finished, the slot test_slot for the database test can be removed by the following command:

$ pg_recvlogical -d test --slot test_slot --drop-slot

REPLICA IDENTITY is a PostgreSQL-specific table-level setting that determines the amount of information available to logical decoding for UPDATE and DELETE events.

There are 4 possible values for REPLICA IDENTITY:

  • DEFAULT: UPDATE and DELETE events will only contain the previous values for the primary key columns of a table
  • NOTHING: UPDATE and DELETE events will not contain any information about the previous value of any of the table columns
  • FULL: UPDATE and DELETE events will contain the previous values of all the table’s columns
  • INDEX index name: UPDATE and DELETE events will contain the previous values of the columns contained in the index definition named index name

You can modify and check the REPLICA IDENTITY for a table with the following commands:

ALTER TABLE test_table REPLICA IDENTITY FULL;
test=# \d+ test_table
                         Table "public.test_table"
 Column |     Type      | Modifiers | Storage  | Stats target | Description
 -------+---------------+-----------+----------+--------------+------------
 id     | character(10) | not null  | extended |              |
 code   | character(10) |           | extended |              |
Indexes:
    "test_table_pkey" PRIMARY KEY, btree (id)
Replica Identity: FULL

Here is the output of the wal2json plug-in for an UPDATE event with REPLICA IDENTITY set to FULL. Compare it with the respective output when REPLICA IDENTITY is set to DEFAULT.

Output for UPDATE event
{
  "change": [
    {
      "kind": "update",
      "schema": "public",
      "table": "test_table",
      "columnnames": ["id", "code"],
      "columntypes": ["character(10)", "character(10)"],
      "columnvalues": ["id1       ", "code2     "],
      "oldkeys": {
        "keynames": ["id", "code"],
        "keytypes": ["character(10)", "character(10)"],
        "keyvalues": ["id1       ", "code1     "]
      }
    }
  ]
}
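
The practical benefit of FULL is that a consumer can tell which columns actually changed. The sketch below compares oldkeys with the new column values of a single change entry. The function name changed_columns is hypothetical, and the two sample dictionaries are hand-built from the UPDATE outputs shown on this page (with ljust(10) reproducing the char(10) padding).

```python
def changed_columns(change):
    """Map each changed column to (old, new), for columns whose old value is known."""
    new = dict(zip(change["columnnames"], change["columnvalues"]))
    old_keys = change.get("oldkeys", {})
    old = dict(zip(old_keys.get("keynames", []), old_keys.get("keyvalues", [])))
    return {
        name: (old[name], new[name])
        for name in new
        if name in old and old[name] != new[name]
    }


# UPDATE with REPLICA IDENTITY FULL: oldkeys holds every column.
UPDATE_FULL = {
    "kind": "update",
    "columnnames": ["id", "code"],
    "columnvalues": ["id1".ljust(10), "code2".ljust(10)],
    "oldkeys": {
        "keynames": ["id", "code"],
        "keyvalues": ["id1".ljust(10), "code1".ljust(10)],
    },
}

# The same UPDATE with REPLICA IDENTITY DEFAULT: only the primary key is known.
UPDATE_DEFAULT = {
    "kind": "update",
    "columnnames": ["id", "code"],
    "columnvalues": ["id1".ljust(10), "code2".ljust(10)],
    "oldkeys": {"keynames": ["id"], "keyvalues": ["id1".ljust(10)]},
}

print(changed_columns(UPDATE_FULL))     # the code column, old and new value
print(changed_columns(UPDATE_DEFAULT))  # nothing: the old code value is unknown
```

With DEFAULT the result is empty not because nothing changed, but because the previous value of code was never emitted.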


https://debezium.io/documentation/reference/0.10/postgres-plugins.html
