diff --git a/docs/source/testgres.rst b/docs/source/testgres.rst index fd9c2d4d..80c86e84 100644 --- a/docs/source/testgres.rst +++ b/docs/source/testgres.rst @@ -59,4 +59,19 @@ testgres.node .. automethod:: __init__ .. autoclass:: testgres.node.ProcessProxy - :members: \ No newline at end of file + :members: + +testgres.pubsub +--------------- + +.. automodule:: testgres.pubsub + +.. autoclass:: testgres.node.Publication + :members: + + .. automethod:: __init__ + +.. autoclass:: testgres.node.Subscription + :members: + + .. automethod:: __init__ diff --git a/testgres/connection.py b/testgres/connection.py index 6447f685..3943a4e2 100644 --- a/testgres/connection.py +++ b/testgres/connection.py @@ -27,7 +27,12 @@ class NodeConnection(object): Transaction wrapper returned by Node """ - def __init__(self, node, dbname=None, username=None, password=None): + def __init__(self, + node, + dbname=None, + username=None, + password=None, + autocommit=False): # Set default arguments dbname = dbname or default_dbname() @@ -42,6 +47,7 @@ def __init__(self, node, dbname=None, username=None, password=None): host=node.host, port=node.port) + self._connection.autocommit = autocommit self._cursor = self.connection.cursor() @property diff --git a/testgres/consts.py b/testgres/consts.py index 123a034c..f7f01d9d 100644 --- a/testgres/consts.py +++ b/testgres/consts.py @@ -29,3 +29,6 @@ MAX_REPLICATION_SLOTS = 10 MAX_WAL_SENDERS = 10 WAL_KEEP_SEGMENTS = 20 + +# logical replication settings +LOGICAL_REPL_MAX_CATCHUP_ATTEMPTS = 60 diff --git a/testgres/node.py b/testgres/node.py index d8ce1f03..54930a88 100644 --- a/testgres/node.py +++ b/testgres/node.py @@ -57,11 +57,14 @@ QueryException, \ StartNodeException, \ TimeoutException, \ + InitNodeException, \ TestgresException, \ BackupException from .logger import TestgresLogger +from .pubsub import Publication, Subscription + from .utils import \ eprint, \ get_bin_path, \ @@ -70,6 +73,7 @@ reserve_port, \ release_port, \ execute_utility, \ + options_string, \ clean_on_error from .backup import NodeBackup @@ -300,24 +304,24 @@ def _create_recovery_conf(self, username, slot=None): master = self.master assert master is not None - conninfo = ( - u"application_name={} " - u"port={} " - u"user={} " - ).format(self.name, master.port, username) # yapf: disable + conninfo = { + "application_name": self.name, + "port": master.port, + "user": username + } # yapf: disable # host is tricky try: import ipaddress ipaddress.ip_address(master.host) - conninfo += u"hostaddr={}".format(master.host) + conninfo["hostaddr"] = master.host except ValueError: - conninfo += u"host={}".format(master.host) + conninfo["host"] = master.host line = ( "primary_conninfo='{}'\n" "standby_mode=on\n" - ).format(conninfo) # yapf: disable + ).format(options_string(**conninfo)) # yapf: disable if slot: # Connect to master for some additional actions @@ -413,6 +417,7 @@ def default_conf(self, fsync=False, unix_sockets=True, allow_streaming=True, + allow_logical=False, log_statement='all'): """ Apply default settings to this node. @@ -421,6 +426,7 @@ def default_conf(self, fsync: should this node use fsync to keep data safe? unix_sockets: should we enable UNIX sockets? allow_streaming: should this node add a hba entry for replication? + allow_logical: can this node be used as a logical replication publisher? log_statement: one of ('all', 'off', 'mod', 'ddl'). Returns: @@ -497,6 +503,13 @@ def get_auth_method(t): WAL_KEEP_SEGMENTS, wal_level)) # yapf: disable + if allow_logical: + if not pg_version_ge('10'): + raise InitNodeException( + "Logical replication is only available for Postgres 10 " + "and newer") + conf.write(u"wal_level = logical\n") + # disable UNIX sockets if asked to if not unix_sockets: conf.write(u"unix_socket_directories = ''\n") @@ -937,13 +950,14 @@ def poll_query_until(self, if res is None: raise QueryException('Query returned None', query) - if len(res) == 0: - raise QueryException('Query returned 0 rows', query) - - if len(res[0]) == 0: - raise QueryException('Query returned 0 columns', query) - - if res[0][0] == expected: + # result set is not empty + if len(res): + if len(res[0]) == 0: + raise QueryException('Query returned 0 columns', query) + if res[0][0] == expected: + return # done + # empty result set is considered as None + elif expected is None: return # done except ProgrammingError as e: @@ -982,13 +996,11 @@ def execute(self, with self.connect(dbname=dbname, username=username, - password=password) as node_con: # yapf: disable + password=password, + autocommit=commit) as node_con: # yapf: disable res = node_con.execute(query) - if commit: - node_con.commit() - return res def backup(self, **kwargs): @@ -1052,6 +1064,37 @@ def catchup(self, dbname=None, username=None): except Exception as e: raise_from(CatchUpException("Failed to catch up", poll_lsn), e) + def publish(self, name, **kwargs): + """ + Create publication for logical replication + + Args: + pubname: publication name + tables: tables names list + dbname: database name where objects or interest are located + username: replication username + """ + return Publication(name=name, node=self, **kwargs) + + def subscribe(self, publication, name, dbname=None, username=None, + **params): + """ + Create subscription for logical replication + + Args: + name: subscription name + publication: publication object obtained from publish() + dbname: database name + username: replication username + params: subscription parameters (see documentation on `CREATE SUBSCRIPTION + `_ + for details) + """ + # yapf: disable + return Subscription(name=name, node=self, publication=publication, + dbname=dbname, username=username, **params) + # yapf: enable + def pgbench(self, dbname=None, username=None, @@ -1150,7 +1193,11 @@ def pgbench_run(self, dbname=None, username=None, options=[], **kwargs): return execute_utility(_params, self.utils_log_file) - def connect(self, dbname=None, username=None, password=None): + def connect(self, + dbname=None, + username=None, + password=None, + autocommit=False): """ Connect to a database. @@ -1158,6 +1205,9 @@ def connect(self, dbname=None, username=None, password=None): dbname: database name to connect to. username: database user name. password: user's password. + autocommit: commit each statement automatically. Also it should be + set to `True` for statements requiring to be run outside + a transaction? such as `VACUUM` or `CREATE DATABASE`. Returns: An instance of :class:`.NodeConnection`. @@ -1166,4 +1216,5 @@ def connect(self, dbname=None, username=None, password=None): return NodeConnection(node=self, dbname=dbname, username=username, - password=password) # yapf: disable + password=password, + autocommit=autocommit) # yapf: disable diff --git a/testgres/pubsub.py b/testgres/pubsub.py new file mode 100644 index 00000000..bb153913 --- /dev/null +++ b/testgres/pubsub.py @@ -0,0 +1,202 @@ +# coding: utf-8 +""" +Unlike physical replication the logical replication allows users replicate only +specified databases and tables. It uses publish-subscribe model with possibly +multiple publishers and multiple subscribers. When initializing publisher's +node ``allow_logical=True`` should be passed to the :meth:`.PostgresNode.init()` +method to enable PostgreSQL to write extra information to the WAL needed by +logical replication. + +To replicate table ``X`` from node A to node B the same table structure should +be defined on the subscriber's node as logical replication don't replicate DDL. +After that :meth:`~.PostgresNode.publish()` and :meth:`~.PostgresNode.subscribe()` +methods may be used to setup replication. Example: + +>>> from testgres import get_new_node +>>> with get_new_node() as nodeA, get_new_node() as nodeB: +... nodeA.init(allow_logical=True).start() +... nodeB.init().start() +... +... # create same table both on publisher and subscriber +... create_table = 'create table test (a int, b int)' +... nodeA.safe_psql(create_table) +... nodeB.safe_psql(create_table) +... +... # create publication +... pub = nodeA.publish('mypub') +... # create subscription +... sub = nodeB.subscribe(pub, 'mysub') +... +... # insert some data to the publisher's node +... nodeA.execute('insert into test values (1, 1), (2, 2)') +... +... # wait until changes apply on subscriber and check them +... sub.catchup() +... +... # read the data from subscriber's node +... nodeB.execute('select * from test') +PostgresNode(name='...', port=..., base_dir='...') +PostgresNode(name='...', port=..., base_dir='...') +'' +'' +[(1, 1), (2, 2)] +""" + +from six import raise_from + +from .consts import LOGICAL_REPL_MAX_CATCHUP_ATTEMPTS +from .defaults import default_dbname, default_username +from .exceptions import CatchUpException +from .utils import options_string + + +class Publication(object): + def __init__(self, name, node, tables=None, dbname=None, username=None): + """ + Constructor. Use :meth:`.PostgresNode.publish()` instead of direct + constructing publication objects. + + Args: + name: publication name. + node: publisher's node. + tables: tables list or None for all tables. + dbname: database name used to connect and perform subscription. + username: username used to connect to the database. + """ + self.name = name + self.node = node + self.dbname = dbname or default_dbname() + self.username = username or default_username() + + # create publication in database + t = "table " + ", ".join(tables) if tables else "all tables" + query = "create publication {} for {}" + node.execute(query.format(name, t), dbname=dbname, username=username) + + def drop(self, dbname=None, username=None): + """ + Drop publication + """ + self.node.execute( + "drop publication {}".format(self.name), + dbname=dbname, + username=username) + + def add_tables(self, tables, dbname=None, username=None): + """ + Add tables to the publication. Cannot be used if publication was + created with empty tables list. + + Args: + tables: a list of tables to be added to the publication. + """ + if not tables: + raise ValueError("Tables list is empty") + + query = "alter publication {} add table {}" + self.node.execute( + query.format(self.name, ", ".join(tables)), + dbname=dbname or self.dbname, + username=username or self.username) + + +class Subscription(object): + def __init__(self, + node, + publication, + name=None, + dbname=None, + username=None, + **params): + """ + Constructor. Use :meth:`.PostgresNode.subscribe()` instead of direct + constructing subscription objects. + + Args: + name: subscription name. + node: subscriber's node. + publication: :class:`.Publication` object we are subscribing to + (see :meth:`.PostgresNode.publish()`). + dbname: database name used to connect and perform subscription. + username: username used to connect to the database. + params: subscription parameters (see documentation on `CREATE SUBSCRIPTION + `_ + for details). + """ + self.name = name + self.node = node + self.pub = publication + + # connection info + conninfo = { + "dbname": self.pub.dbname, + "user": self.pub.username, + "host": self.pub.node.host, + "port": self.pub.node.port + } + + query = ( + "create subscription {} connection '{}' publication {}").format( + name, options_string(**conninfo), self.pub.name) + + # additional parameters + if params: + query += " with ({})".format(options_string(**params)) + + # Note: cannot run 'create subscription' query in transaction mode + node.execute(query, dbname=dbname, username=username) + + def disable(self, dbname=None, username=None): + """ + Disables the running subscription. + """ + query = "alter subscription {} disable" + self.node.execute(query.format(self.name), dbname=None, username=None) + + def enable(self, dbname=None, username=None): + """ + Enables the previously disabled subscription. + """ + query = "alter subscription {} enable" + self.node.execute(query.format(self.name), dbname=None, username=None) + + def refresh(self, copy_data=True, dbname=None, username=None): + """ + Disables the running subscription. + """ + query = "alter subscription {} refresh publication with (copy_data={})" + self.node.execute( + query.format(self.name, copy_data), + dbname=dbname, + username=username) + + def drop(self, dbname=None, username=None): + """ + Drops subscription + """ + self.node.execute( + "drop subscription {}".format(self.name), + dbname=dbname, + username=username) + + def catchup(self, username=None): + """ + Wait until subscription catches up with publication. + + Args: + username: remote node's user name. + """ + query = """ + select pg_current_wal_lsn() - replay_lsn = 0 + from pg_catalog.pg_stat_replication where application_name = '{}' + """.format(self.name) + + try: + # wait until this LSN reaches subscriber + self.pub.node.poll_query_until( + query=query, + dbname=self.pub.dbname, + username=username or self.pub.username, + max_attempts=LOGICAL_REPL_MAX_CATCHUP_ATTEMPTS) + except Exception as e: + raise_from(CatchUpException("Failed to catch up", query), e) diff --git a/testgres/utils.py b/testgres/utils.py index 6704571b..d8eeef53 100644 --- a/testgres/utils.py +++ b/testgres/utils.py @@ -11,6 +11,7 @@ from contextlib import contextmanager from distutils.version import LooseVersion +from six import iteritems from .config import testgres_config from .exceptions import ExecUtilException @@ -225,6 +226,10 @@ def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs) +def options_string(separator=u" ", **kwargs): + return separator.join(u"{}={}".format(k, v) for k, v in iteritems(kwargs)) + + @contextmanager def clean_on_error(node): """ diff --git a/tests/test_simple.py b/tests/test_simple.py index 33defb12..1529c8e0 100755 --- a/tests/test_simple.py +++ b/tests/test_simple.py @@ -402,6 +402,107 @@ def test_replicate(self): res = node.execute('select * from test') self.assertListEqual(res, []) + @unittest.skipUnless(pg_version_ge('10'), 'requires 10+') + def test_logical_replication(self): + with get_new_node() as node1, get_new_node() as node2: + node1.init(allow_logical=True) + node1.start() + node2.init().start() + + create_table = 'create table test (a int, b int)' + node1.safe_psql(create_table) + node2.safe_psql(create_table) + + # create publication / create subscription + pub = node1.publish('mypub') + sub = node2.subscribe(pub, 'mysub') + + node1.safe_psql('insert into test values (1, 1), (2, 2)') + + # wait until changes apply on subscriber and check them + sub.catchup() + res = node2.execute('select * from test') + self.assertListEqual(res, [(1, 1), (2, 2)]) + + # disable and put some new data + sub.disable() + node1.safe_psql('insert into test values (3, 3)') + + # enable and ensure that data successfully transfered + sub.enable() + sub.catchup() + res = node2.execute('select * from test') + self.assertListEqual(res, [(1, 1), (2, 2), (3, 3)]) + + # Add new tables. Since we added "all tables" to publication + # (default behaviour of publish() method) we don't need + # to explicitely perform pub.add_tables() + create_table = 'create table test2 (c char)' + node1.safe_psql(create_table) + node2.safe_psql(create_table) + sub.refresh() + + # put new data + node1.safe_psql('insert into test2 values (\'a\'), (\'b\')') + sub.catchup() + res = node2.execute('select * from test2') + self.assertListEqual(res, [('a', ), ('b', )]) + + # drop subscription + sub.drop() + pub.drop() + + # create new publication and subscription for specific table + # (ommitting copying data as it's already done) + pub = node1.publish('newpub', tables=['test']) + sub = node2.subscribe(pub, 'newsub', copy_data=False) + + node1.safe_psql('insert into test values (4, 4)') + sub.catchup() + res = node2.execute('select * from test') + self.assertListEqual(res, [(1, 1), (2, 2), (3, 3), (4, 4)]) + + # explicitely add table + with self.assertRaises(ValueError): + pub.add_tables([]) # fail + pub.add_tables(['test2']) + node1.safe_psql('insert into test2 values (\'c\')') + sub.catchup() + res = node2.execute('select * from test2') + self.assertListEqual(res, [('a', ), ('b', )]) + + @unittest.skipUnless(pg_version_ge('10'), 'requires 10+') + def test_logical_catchup(self): + """ Runs catchup for 100 times to be sure that it is consistent """ + with get_new_node() as node1, get_new_node() as node2: + node1.init(allow_logical=True) + node1.start() + node2.init().start() + + create_table = 'create table test (key int primary key, val int); ' + node1.safe_psql(create_table) + node1.safe_psql('alter table test replica identity default') + node2.safe_psql(create_table) + + # create publication / create subscription + sub = node2.subscribe(node1.publish('mypub'), 'mysub') + + for i in range(0, 100): + node1.execute('insert into test values ({0}, {0})'.format(i)) + sub.catchup() + res = node2.execute('select * from test') + self.assertListEqual(res, [( + i, + i, + )]) + node1.execute('delete from test') + + @unittest.skipIf(pg_version_ge('10'), 'requires <10') + def test_logical_replication_fail(self): + with get_new_node() as node: + with self.assertRaises(InitNodeException): + node.init(allow_logical=True) + def test_replication_slots(self): with get_new_node() as node: node.init(allow_streaming=True).start() @@ -459,14 +560,10 @@ def test_poll_query_until(self): self.assertTrue(end_time - start_time >= 5) - # check 0 rows - with self.assertRaises(QueryException): - node.poll_query_until( - query='select * from pg_class where true = false') - # check 0 columns with self.assertRaises(QueryException): - node.poll_query_until(query='select from pg_class limit 1') + node.poll_query_until( + query='select from pg_catalog.pg_class limit 1') # check None, fail with self.assertRaises(QueryException): @@ -476,6 +573,11 @@ def test_poll_query_until(self): node.poll_query_until( query='create table def()', expected=None) # returns nothing + # check 0 rows equivalent to expected=None + node.poll_query_until( + query='select * from pg_catalog.pg_class where true = false', + expected=None) + # check arbitrary expected value, fail with self.assertRaises(TimeoutException): node.poll_query_until(