-
Notifications
You must be signed in to change notification settings - Fork 35
Logical replication support #42
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 13 commits
ca5b546
bb01c7d
782484b
f8b95c6
b1cba73
954879a
0741c70
f4e0bd0
f48623b
138c6cc
f652bf4
bc1002f
08ed6ef
4b279ef
d60cdcb
50e02ff
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -54,10 +54,13 @@ | |
QueryException, \ | ||
StartNodeException, \ | ||
TimeoutException, \ | ||
InitNodeException, \ | ||
TestgresException | ||
|
||
from .logger import TestgresLogger | ||
|
||
from .pubsub import Publication, Subscription | ||
|
||
from .utils import \ | ||
eprint, \ | ||
get_bin_path, \ | ||
|
@@ -66,6 +69,7 @@ | |
reserve_port, \ | ||
release_port, \ | ||
execute_utility, \ | ||
options_string, \ | ||
clean_on_error | ||
|
||
from .backup import NodeBackup | ||
|
@@ -296,24 +300,24 @@ def _create_recovery_conf(self, username, slot=None): | |
master = self.master | ||
assert master is not None | ||
|
||
conninfo = ( | ||
u"application_name={} " | ||
u"port={} " | ||
u"user={} " | ||
).format(self.name, master.port, username) # yapf: disable | ||
conninfo = { | ||
"application_name": self.name, | ||
"port": master.port, | ||
"user": username | ||
} # yapf: disable | ||
|
||
# host is tricky | ||
try: | ||
import ipaddress | ||
ipaddress.ip_address(master.host) | ||
conninfo += u"hostaddr={}".format(master.host) | ||
conninfo["hostaddr"] = master.host | ||
except ValueError: | ||
conninfo += u"host={}".format(master.host) | ||
conninfo["host"] = master.host | ||
|
||
line = ( | ||
"primary_conninfo='{}'\n" | ||
"standby_mode=on\n" | ||
).format(conninfo) # yapf: disable | ||
).format(options_string(**conninfo)) # yapf: disable | ||
|
||
if slot: | ||
# Connect to master for some additional actions | ||
|
@@ -409,6 +413,7 @@ def default_conf(self, | |
fsync=False, | ||
unix_sockets=True, | ||
allow_streaming=True, | ||
allow_logical=False, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why don't we enable this by default? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because it is not supported on postgres versions below 10 and there is specific message when someone's trying to enable this feature on those versions. Besides it produces extra WAL data and hence could work slightly slower. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, i see. |
||
log_statement='all'): | ||
""" | ||
Apply default settings to this node. | ||
|
@@ -417,6 +422,7 @@ def default_conf(self, | |
fsync: should this node use fsync to keep data safe? | ||
unix_sockets: should we enable UNIX sockets? | ||
allow_streaming: should this node add a hba entry for replication? | ||
allow_logical: can this node be used as a logical replication publisher? | ||
log_statement: one of ('all', 'off', 'mod', 'ddl'). | ||
|
||
Returns: | ||
|
@@ -493,6 +499,13 @@ def get_auth_method(t): | |
WAL_KEEP_SEGMENTS, | ||
wal_level)) # yapf: disable | ||
|
||
if allow_logical: | ||
if not pg_version_ge('10'): | ||
raise InitNodeException( | ||
"Logical replication is only available for Postgres 10 " | ||
"and newer") | ||
conf.write(u"wal_level = logical\n") | ||
|
||
# disable UNIX sockets if asked to | ||
if not unix_sockets: | ||
conf.write(u"unix_socket_directories = ''\n") | ||
|
@@ -894,13 +907,14 @@ def poll_query_until(self, | |
if res is None: | ||
raise QueryException('Query returned None', query) | ||
|
||
if len(res) == 0: | ||
raise QueryException('Query returned 0 rows', query) | ||
|
||
if len(res[0]) == 0: | ||
raise QueryException('Query returned 0 columns', query) | ||
|
||
if res[0][0] == expected: | ||
# result set is not empty | ||
if len(res): | ||
if len(res[0]) == 0: | ||
raise QueryException('Query returned 0 columns', query) | ||
if res[0][0] == expected: | ||
return # done | ||
# empty result set is considered as None | ||
elif expected is None: | ||
return # done | ||
|
||
except ProgrammingError as e: | ||
|
@@ -1009,6 +1023,37 @@ def catchup(self, dbname=None, username=None): | |
except Exception as e: | ||
raise_from(CatchUpException("Failed to catch up", poll_lsn), e) | ||
|
||
def publish(self, name, **kwargs):
    """
    Create publication for logical replication

    Args:
        name: publication name
        tables: tables names list
        dbname: database name where objects of interest are located
        username: replication username

    Returns:
        The newly constructed :class:`.Publication` object.
    """
    # everything except the name is forwarded untouched to Publication
    return Publication(name=name, node=self, **kwargs)
|
||
def subscribe(self, publication, name, dbname=None, username=None,
              **params):
    """
    Subscribe this node to a publication (logical replication).

    Args:
        publication: publication object obtained from publish()
        name: subscription name
        dbname: database name
        username: replication username
        params: subscription parameters (see documentation on `CREATE SUBSCRIPTION
            <https://www.postgresql.org/docs/current/static/sql-createsubscription.html>`_
            for details)

    Returns:
        The newly constructed Subscription object.
    """
    # Subscription takes (name, node, publication, ...) in this order
    new_subscription = Subscription(name, self, publication,
                                    dbname=dbname,
                                    username=username,
                                    **params)
    return new_subscription
|
||
def pgbench(self, | ||
dbname=None, | ||
username=None, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,200 @@ | ||
# coding: utf-8 | ||
""" | ||
Unlike physical replication, logical replication allows users to replicate only | ||
specified databases and tables. It uses a publish-subscribe model with possibly | ||
multiple publishers and multiple subscribers. When initializing the publisher's | ||
node, ``allow_logical=True`` should be passed to the :meth:`.PostgresNode.init()` | ||
method to enable PostgreSQL to write the extra information needed by | ||
logical replication to the WAL. | ||
|
||
To replicate table ``X`` from node A to node B, the same table structure should | ||
be defined on the subscriber's node because logical replication does not replicate DDL. | ||
After that, the :meth:`~.PostgresNode.publish()` and :meth:`~.PostgresNode.subscribe()` | ||
methods may be used to set up replication. Example: | ||
|
||
>>> from .api import get_new_node | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be changed to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agree |
||
>>> with get_new_node() as nodeA, get_new_node() as nodeB: | ||
... nodeA.init(allow_logical=True).start() | ||
... nodeB.init().start() | ||
... | ||
... # create same table both on publisher and subscriber | ||
... create_table = 'create table test (a int, b int)' | ||
... nodeA.safe_psql(create_table) | ||
... nodeB.safe_psql(create_table) | ||
... | ||
... # create publication | ||
... pub = nodeA.publish('mypub') | ||
... # create subscription | ||
... sub = nodeB.subscribe(pub, 'mysub') | ||
... | ||
... # insert some data to the publisher's node | ||
... nodeA.execute('insert into test values (1, 1), (2, 2)') | ||
... | ||
... # wait until changes apply on subscriber and check them | ||
... sub.catchup() | ||
... | ||
... # read the data from subscriber's node | ||
... nodeB.execute('select * from test') | ||
PostgresNode(name='...', port=..., base_dir='...') | ||
PostgresNode(name='...', port=..., base_dir='...') | ||
'' | ||
'' | ||
[(1, 1), (2, 2)] | ||
""" | ||
|
||
from six import raise_from | ||
|
||
from .defaults import default_dbname, default_username | ||
from .exceptions import CatchUpException | ||
from .utils import options_string | ||
|
||
|
||
class Publication(object): | ||
def __init__(self, name, node, tables=None, dbname=None, username=None):
    """
    Constructor. Use :meth:`.PostgresNode.publish()` instead of direct
    constructing publication objects.

    Args:
        name: publication name
        node: publisher's node
        tables: tables list or None for all tables
        dbname: database name used to connect and create the publication
        username: username used to connect to the database
    """
    self.name = name
    self.node = node
    self.dbname = dbname or default_dbname()
    self.username = username or default_username()

    # create publication in database
    t = "table " + ", ".join(tables) if tables else "all tables"
    query = "create publication {} for {}"
    # use the resolved values stored on the object, so the publication is
    # created with the same database/user that are later read back from
    # this object
    node.safe_psql(
        query.format(name, t),
        dbname=self.dbname,
        username=self.username)
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All occurrences of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. However I had to refactor |
||
|
||
def drop(self, dbname=None, username=None): | ||
""" | ||
Drop publication | ||
""" | ||
self.node.safe_psql( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we replace There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep, I overlooked it |
||
"drop publication {}".format(self.name), | ||
dbname=dbname, | ||
username=username) | ||
|
||
def add_tables(self, tables, dbname=None, username=None):
    """
    Extend the publication with more tables. Cannot be used if publication
    was created with empty tables list.

    Args:
        tables: a list of tables to be added to the publication
        dbname: database name; falls back to the publication's own one
        username: user name; falls back to the publication's own one

    Raises:
        ValueError: if ``tables`` is empty
    """
    if not tables:
        raise ValueError("Tables list is empty")

    run_sql = self.node.safe_psql
    statement = "alter publication {} add table {}".format(
        self.name, ", ".join(tables))

    # explicit arguments win over the values stored on the publication
    target_db = dbname or self.dbname
    target_user = username or self.username
    run_sql(statement, dbname=target_db, username=target_user)
|
||
|
||
class Subscription(object): | ||
def __init__(self, | ||
name, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's better to place |
||
node, | ||
publication, | ||
dbname=None, | ||
username=None, | ||
**params): | ||
""" | ||
Constructor. Use :meth:`.PostgresNode.subscribe()` instead of direct | ||
constructing subscription objects. | ||
|
||
Args: | ||
name: subscription name | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've also noticed that some doc strings end with commas while others don't. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed |
||
node: subscriber's node | ||
publication: :class:`.Publication` object we are subscribing to | ||
(see :meth:`.PostgresNode.publish()`) | ||
dbname: database name used to connect and perform subscription | ||
username: username used to connect to the database | ||
params: subscription parameters (see documentation on `CREATE SUBSCRIPTION | ||
<https://www.postgresql.org/docs/current/static/sql-createsubscription.html>`_ | ||
for details) | ||
""" | ||
self.name = name | ||
self.node = node | ||
self.pub = publication | ||
|
||
# connection info | ||
conninfo = { | ||
"dbname": self.pub.dbname, | ||
"user": self.pub.username, | ||
"host": self.pub.node.host, | ||
"port": self.pub.node.port | ||
} | ||
|
||
query = ( | ||
"create subscription {} connection '{}' publication {}").format( | ||
name, options_string(**conninfo), self.pub.name) | ||
|
||
# additional parameters | ||
if params: | ||
query += " with ({})".format(options_string(**params)) | ||
|
||
node.safe_psql(query, dbname=dbname, username=username) | ||
|
||
def disable(self, dbname=None, username=None):
    """
    Disables the running subscription.

    Args:
        dbname: database name used to connect to this subscription's node
        username: username used to connect to the database
    """
    query = "alter subscription {} disable"
    # pass the caller's connection settings through instead of dropping them
    self.node.safe_psql(
        query.format(self.name), dbname=dbname, username=username)
|
||
def enable(self, dbname=None, username=None):
    """
    Enables the previously disabled subscription.

    Args:
        dbname: database name used to connect to this subscription's node
        username: username used to connect to the database
    """
    query = "alter subscription {} enable"
    # pass the caller's connection settings through instead of dropping them
    self.node.safe_psql(
        query.format(self.name), dbname=dbname, username=username)
|
||
def refresh(self, copy_data=True, dbname=None, username=None):
    """
    Refreshes the subscription by issuing
    ``ALTER SUBSCRIPTION ... REFRESH PUBLICATION``.

    Args:
        copy_data: value substituted into the ``copy_data`` option of the
            statement
        dbname: database name used to connect to this subscription's node
        username: username used to connect to the database
    """
    query = "alter subscription {} refresh publication with (copy_data={})"
    self.node.safe_psql(
        query.format(self.name, copy_data),
        dbname=dbname,
        username=username)
|
||
def drop(self, dbname=None, username=None):
    """
    Remove this subscription by running ``drop subscription`` on its node.

    Args:
        dbname: database name passed through to the node
        username: user name passed through to the node
    """
    run_sql = self.node.safe_psql
    statement = "drop subscription {}".format(self.name)
    run_sql(statement, dbname=dbname, username=username)
|
||
def catchup(self, username=None): | ||
""" | ||
Wait until subscription catches up with publication. | ||
|
||
Args: | ||
username: remote node's user name | ||
""" | ||
query = ( | ||
"select pg_current_wal_lsn() - replay_lsn = 0 " | ||
"from pg_stat_replication where application_name = '{}'").format( | ||
self.name) | ||
|
||
try: | ||
# wait until this LSN reaches subscriber | ||
self.pub.node.poll_query_until( | ||
query=query, | ||
dbname=self.pub.dbname, | ||
username=username or self.pub.username, | ||
max_attempts=60) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Personally, I don't like the hard-coded number. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it should be a parameter here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, added |
||
except Exception as e: | ||
raise_from(CatchUpException("Failed to catch up", query), e) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this necessary?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it was when I tested with doctest. But now when I ran doctest it showed
[(3,)]
so I am confused :) Will change it back then as it shouldn't be part of this pull request anyway.