PostgreSQL Real Time Notification
Overview
One of our clients there was a requirement to modify the data streamed into Elasticsearch according to changed network mapping in PostgreSQL.
The customer used Elasticsearch as its big data platform and PostgreSQL to store metadata such as network mapping.
Many Applications which work with databases need to get updates data from the database for their processing. For Example:
- Metadata tables which define application behavior : such as timeout parameters , bulk size when pushing / retrieving data from the database
- Lookup tables which used for data transformation, filtering and enrichment (ETL)
- Displaying / processing real time events which are pushed to the database from external sources like sensors data.
The requirement:
Application clients (Backend server, UI) want to know when there is new data or data is changed in the database and get notified about it in real time.
The Solution:
There are 2 common ways to achieve it:
- Having dedicate process which examine the relevant tables in the database every interval of time and in case of change (By Timestamp / ID) it send notification to the application and the application select the relevant data from the changed tables.
- Using Listening and Notification processes .In one side client side listen for change notifications on the other side data change processes send notification which the client consume and process accordingly.
PostgreSQL Solution
PostgreSQL has a nice NOTIFY and LISTEN feature which enable to send notification in case data is changed and clients which listen to this messages can act accordingly.
The following example demonstrate how to create C++ program which listen to specific message and upon data change , the data is being send as JSON from the database to the client program with no to run query in the database .
- Create the Database Objects:
- Create temp table which will be used for real time notifications:
create table temp_table (a integer , b varchar(40),c varchar(40));
- Create trigger procedure
CREATE OR REPLACE FUNCTION add_notification_json () RETURNS TRIGGER AS $BODY$
DECLARE
v_row json;
v_notify_id integer;
cmd varchar(4000);
BEGIN
–Convert Row Data to JSON;
v_row = row_to_json(new);
RAISE INFO ‘This is %’ , v_row;
cmd=’NOTIFY TEMP_TABLE, ”’ || v_row||””;
execute cmd;
returning notify_id into v_notify_id;
RETURN new;
END;
$BODY$
language plpgsql;
- Create a trigger which call the trigger procedure
create table temp_table (a integer , b varchar(40),c varchar(40));
CREATE TRIGGER temp_table_notify_trg
AFTER INSERT OR UPDATE ON temp_table
FOR EACH ROW
EXECUTE PROCEDURE add_notification_json();
- Create the C++ Program
The C++ database drive for PostgreSQL is libpq
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <sys/time.h>
#include <libpq-fe.h>
/* Define Database Connection Properties */
#define PG_HOST “10.10.10.10”
#define PG_USER “postgres”
#define PG_DB “test”
#define PG_PASS “postgres”
#define PG_PORT 5432
static void
exit_nicely(PGconn *conn)
{
PQfinish(conn);
exit(1);
}
int
main(int argc, char **argv)
{
char *conninfo;
PGconn *conn;
PGresult *res;
PGnotify *notify;
int nnotifies;
sprintf(conninfo,
“user=%s password=%s dbname=%s hostaddr=%s port=%d”,
PG_USER, PG_PASS, PG_DB, PG_HOST, PG_PORT);
/* Make a connection to the database */
conn = PQconnectdb(conninfo);
/* Check to see that the backend connection was successfully made */
if (PQstatus(conn) != CONNECTION_OK)
{
fprintf(stderr, “Connection to database failed: %s”,
PQerrorMessage(conn));
exit_nicely(conn);
}
/*
* Issue LISTEN command to enable notifications from the trigger NOTIFY.
*/
res = PQexec(conn, “LISTEN TEMP_TABLE”);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, “LISTEN command failed: %s”, PQerrorMessage(conn));
PQclear(res);
exit_nicely(conn);
}
/*
* should PQclear PGresult whenever it is no longer needed to avoid memory
* leaks
*/
PQclear(res);
/* Quit after four notifies are received. */
nnotifies = 0;
while (nnotifies < 4)
{
/*
* Sleep until something happens on the connection. We use select(2)
* to wait for input, but you could also use poll() or similar
* facilities.
*/
int sock;
fd_set input_mask;
sock = PQsocket(conn);
if (sock < 0)
break; /* shouldn’t happen */
FD_ZERO(&input_mask);
FD_SET(sock, &input_mask);
if (select(sock + 1, &input_mask, NULL, NULL, NULL) < 0)
{
fprintf(stderr, “select() failed: %s\n”, strerror(errno));
exit_nicely(conn);
}
/* Now check for input , the extra contain the json data */
PQconsumeInput(conn);
while ((notify = PQnotifies(conn)) != NULL)
{
fprintf(stderr,
“ASYNC NOTIFY of ‘%s’ received from backend ,Message %s , PID %d\n”,
notify->relname,notify -> extra, notify->be_pid);
PQfreemem(notify);
nnotifies++;
}
}
fprintf(stderr, “Done.\n”);
/* close the connection to the database and cleanup */
PQfinish(conn);
return 0;
}
- Testing
insert into temp_table values (10,’alon’,’eldi’);
select * from temp_table
Written By Alon Eldi – CEO , Big Data & Cloud Expert in SeaData
SeaData is specialized in Elasticsearch and PostgreSQL
For Database Consulting and Support please Contact:
972-54-4080111
We will be happy to assist in any time.