PostgreSQL database programming with Ruby and pgx

January 10, 2016

Abstract

If you use Ruby with PostgreSQL, you probably use the excellent pg gem. This article documents pgx, a pure Ruby “thin” extension of pg that provides logging, transaction retrying, and database reconnecting functionality.

Summary

pgx injects additional higher-level functionality into the pg API without changing its method signatures, except for new(). Specifically, it adds the following:

  • Logging
  • Retrying
  • Reconnecting

These features are only available for the synchronous pg methods.

Installation

Install pgx with:

gem install pgx

Logging

Upon connecting, you can pass a Ruby Logger object, or a lambda returning a Logger object. You can also pass an array of Logger objects in case you want to log to multiple logs (e.g. stdout and a log file).

To log everything to a file you can do:

log = Logger.new('/tmp/debug.log')
log.level = Logger::DEBUG
con = PGx::Connection.new(logger: log, dbname: 'test')

You can also use a lambda to return the logger object, which makes the logger argument itself easier to read when it appears in the log:

log_l = lambda {
    log = Logger.new('/tmp/debug.log')
    log.level = Logger::DEBUG
    return log
}
con = PGx::Connection.new(logger: log_l, dbname: 'test')

To log everything to a file, and only warnings and errors to stdout, you can do:

log_l = lambda {
    log_d = Logger.new('/tmp/debug.log')
    log_d.level = Logger::DEBUG
    log_w = Logger.new($stdout)
    log_w.level = Logger::WARN
    return [log_d, log_w]
}
con = PGx::Connection.new(logger: log_l, dbname: 'test')

Retrying

Any transaction that fails because of a “retriable” error is automatically retried. You can define which errors are “retriable” upon connecting by passing method_retriable_error_states with an array of retriable error states. The maximum number of retries is controlled by the method_try_count_max parameter. Example:

con = PGx::Connection.new(
    method_try_count_max: 3,
    # retry deadlocks and serialization failures
    method_retriable_error_states: ['40P01', '40001'],
    host: 'somehost',
    dbname: 'test'
)
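
The retry behavior described above can be pictured with a small standalone sketch. It does not use pgx or pg and is not pgx's actual implementation: FakeDbError and with_retries are hypothetical names made up for this illustration, and the sketch assumes the maximum counts the total number of attempts.

```ruby
# Hypothetical stand-in for a database error that carries an SQLSTATE code.
class FakeDbError < StandardError
  attr_reader :sqlstate

  def initialize(sqlstate)
    super("database error #{sqlstate}")
    @sqlstate = sqlstate
  end
end

# Run the block, retrying when it raises an error whose SQLSTATE is listed
# in retriable_states, up to try_count_max attempts in total (assumption).
def with_retries(try_count_max, retriable_states)
  try_count = 0
  begin
    try_count += 1
    yield try_count
  rescue FakeDbError => e
    retry if retriable_states.include?(e.sqlstate) && try_count < try_count_max
    raise # not retriable, or out of attempts: re-raise the original error
  end
end

attempts = []
result = with_retries(3, ['40P01', '40001']) do |n|
  attempts << n
  # simulate a deadlock (40P01) on the first two attempts
  raise FakeDbError.new('40P01') if n < 3
  :committed
end

puts "result: #{result}, attempts: #{attempts.inspect}"
```

An error whose state is not in the list, such as a unique violation (23505), is re-raised on the first attempt instead of being retried.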

Connecting and Re-connecting

If a transaction fails because of a bad database connection, pgx goes into a loop trying to re-establish the connection, and when it does successfully reconnect, it retries the failed transaction. There are two arguments related to establishing and re-establishing database connections: connect_init and connect_retry_sleep.

connect_init can be passed a lambda that contains code that should be executed immediately after establishing (or re-establishing) a connection to the database. The lambda is called with one argument, the database connection object. This can be useful for setting the search_path.

connect_retry_sleep can be passed a lambda, an integer, or nil. If it's passed a lambda, the lambda is called with one argument (the try counter), which starts at 1 and increments by one with every unsuccessful connection attempt. The lambda should implement an algorithm for sleeping between connection attempts. If it's passed an integer, it indicates how long to sleep between connection attempts, in seconds. If nil is passed, no attempt to reconnect is made and an exception is raised.
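
One way to picture the three kinds of value is the following standalone sketch. It is an illustration only, not pgx's actual code: pause_before_reconnect is a hypothetical helper, and returning false to signal "give up" is an assumption made for the example.

```ruby
# Hypothetical helper, not part of pgx: pause before the next connection
# attempt according to the kind of value given for connect_retry_sleep.
# Returns false when the caller should give up, true otherwise.
def pause_before_reconnect(connect_retry_sleep, try_count)
  case connect_retry_sleep
  when nil
    false                                # no reconnect; the caller raises
  when Proc
    connect_retry_sleep.call(try_count)  # the lambda does its own sleeping
    true
  when Integer
    sleep(connect_retry_sleep)           # fixed pause, in seconds
    true
  else
    raise ArgumentError, 'expected a lambda, an integer, or nil'
  end
end

# Record the try counter instead of sleeping, so the sketch runs instantly.
seen = []
recorder = lambda { |try_count| seen << try_count }
(1..3).each { |n| pause_before_reconnect(recorder, n) }

puts seen.inspect
```

Passing the try counter to the lambda is what lets it implement a growing delay, while the integer form always pauses for the same amount of time.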

Here is a complete program that illustrates all of the above:

#! /usr/bin/ruby

require 'logger'
require 'pgx'

con = PGx::Connection.new(
    logger: lambda {
        log = Logger.new($stdout)
        log.level = Logger::DEBUG
        return log
    },
    method_try_count_max: 3,
    method_retriable_error_states: ['40P01', '40001'],
    connect_init: lambda { |con|
        con.exec %q{
            select set_search_path('mydb', 'development')
        }
    },
    connect_retry_sleep: lambda { |try_count|
        sleep([try_count ** 2, 120].min)
    },
    host: 'somehost',
    port: 5432,
    dbname: 'test',
    user: 'wizkid'
)

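# return the backend process id of the current connection
def pid(con)
    # the expression must start on the same line as "return";
    # a bare "return" followed by a newline returns nil
    return con.exec(
        'select pg_backend_pid()'
    ).first['pg_backend_pid']
end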

pid = pid(con)

con.prepare 'sql1', 'select ($1)::int + ($1)::int as x'
con.prepare 'sql2', 'select ($1)::int * ($1)::int as x'

con.transaction do

    # simulate losing the database connection in the middle
    # of a transaction
    con.reset if pid == pid(con)

    #
    # Once the connection is reset, the prepared statements
    # from above are lost! Thus, after each exec_prepared
    # below, there is a failure which causes the prepared
    # statement to be automatically reprepared before retrying
    # the transaction; this causes the transaction to be
    # retried twice before it finally succeeds.
    #

    # should output 6
    con.exec_prepared('sql1', [3]) { |rs| puts rs.first['x'] }

    # should output 9
    con.exec_prepared('sql2', [3]) { |rs| puts rs.first['x'] }
end

In the above example, the first five arguments to PGx::Connection.new() are pgx specific; the rest are PG::Connection.new() arguments. Note that the connect_retry_sleep lambda sleeps for the square of the try counter (1, 4, 9, 16, ... seconds) until the delay is capped at 120 seconds on the eleventh attempt; from that point on it tries to connect to the database every two minutes.
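
The delay schedule can be checked without connecting to a database or actually sleeping. The sketch below evaluates the expression from the example's connect_retry_sleep lambda and, for comparison, a doubling variant; the names square_delay and doubling_delay are made up for this illustration, and the doubling formula is an assumed alternative, not something the example program uses.

```ruby
# Delay formula from the example program, with the sleep call removed.
square_delay = lambda { |try_count| [try_count ** 2, 120].min }

# Assumed alternative (not in the example): double the delay on each try.
doubling_delay = lambda { |try_count| [2 ** (try_count - 1), 120].min }

squares = (1..12).map { |n| square_delay.call(n) }
doubles = (1..9).map { |n| doubling_delay.call(n) }

puts "squares: #{squares.inspect}"
puts "doubles: #{doubles.inspect}"
```

Either formula can be wrapped in sleep() and passed as connect_retry_sleep; the 120 second cap keeps the wait bounded during a long outage.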

Examples

Below are examples that illustrate a few of the ways you can iterate over result sets. The example snippets assume a database connection has been established in the variable con. Refer to pg (mostly the PG::Connection and PG::Result classes) for documentation on the methods that are used.

Display the contents of the products table:

con.exec 'select * from products' do |rs|
    # iterate rows
    rs.values.each do |row|
        # iterate row columns
        row.each_with_index do |col, i|
            # print column name
            print rs.fname(i) + ': '
            # print column value
            puts col.nil? ? 'NULL' : col
        end
    end
end

Same as above, but uses the each_row() method which is slightly more efficient for large result sets:

con.exec 'select * from products' do |rs|
    # iterate result set rows
    rs.each_row do |row|
        # iterate row columns
        row.each_with_index do |col, i|
            # print column name
            print rs.fname(i) + ': '
            # print column value
            puts col.nil? ? 'NULL' : col
        end
    end
end

Same as above, but uses the [] operator on the result set, which returns a hash containing the row:

con.exec 'select * from products' do |rs|
    # iterate result set rows
    rs.ntuples.times do |n|
        # iterate row columns
        rs[n].each do |key, value|
            # print column name
            print key + ': '
            # print column value
            puts value.nil? ? 'NULL' : value
        end
    end
end