-
Notifications
You must be signed in to change notification settings - Fork 35
Logical replication support #42
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 15 commits
ca5b546
bb01c7d
782484b
f8b95c6
b1cba73
954879a
0741c70
f4e0bd0
f48623b
138c6cc
f652bf4
bc1002f
08ed6ef
4b279ef
d60cdcb
50e02ff
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -57,11 +57,14 @@ | |
QueryException, \ | ||
StartNodeException, \ | ||
TimeoutException, \ | ||
InitNodeException, \ | ||
TestgresException, \ | ||
BackupException | ||
|
||
from .logger import TestgresLogger | ||
|
||
from .pubsub import Publication, Subscription | ||
|
||
from .utils import \ | ||
eprint, \ | ||
get_bin_path, \ | ||
|
@@ -70,6 +73,7 @@ | |
reserve_port, \ | ||
release_port, \ | ||
execute_utility, \ | ||
options_string, \ | ||
clean_on_error | ||
|
||
from .backup import NodeBackup | ||
|
@@ -300,24 +304,24 @@ def _create_recovery_conf(self, username, slot=None): | |
master = self.master | ||
assert master is not None | ||
|
||
conninfo = ( | ||
u"application_name={} " | ||
u"port={} " | ||
u"user={} " | ||
).format(self.name, master.port, username) # yapf: disable | ||
conninfo = { | ||
"application_name": self.name, | ||
"port": master.port, | ||
"user": username | ||
} # yapf: disable | ||
|
||
# host is tricky | ||
try: | ||
import ipaddress | ||
ipaddress.ip_address(master.host) | ||
conninfo += u"hostaddr={}".format(master.host) | ||
conninfo["hostaddr"] = master.host | ||
except ValueError: | ||
conninfo += u"host={}".format(master.host) | ||
conninfo["host"] = master.host | ||
|
||
line = ( | ||
"primary_conninfo='{}'\n" | ||
"standby_mode=on\n" | ||
).format(conninfo) # yapf: disable | ||
).format(options_string(**conninfo)) # yapf: disable | ||
|
||
if slot: | ||
# Connect to master for some additional actions | ||
|
@@ -413,6 +417,7 @@ def default_conf(self, | |
fsync=False, | ||
unix_sockets=True, | ||
allow_streaming=True, | ||
allow_logical=False, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why don't we enable this by default? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because it is not supported on postgres versions below 10 and there is specific message when someone's trying to enable this feature on those versions. Besides it produces extra WAL data and hence could work slightly slower. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, i see. |
||
log_statement='all'): | ||
""" | ||
Apply default settings to this node. | ||
|
@@ -421,6 +426,7 @@ def default_conf(self, | |
fsync: should this node use fsync to keep data safe? | ||
unix_sockets: should we enable UNIX sockets? | ||
allow_streaming: should this node add a hba entry for replication? | ||
allow_logical: can this node be used as a logical replication publisher? | ||
log_statement: one of ('all', 'off', 'mod', 'ddl'). | ||
|
||
Returns: | ||
|
@@ -497,6 +503,13 @@ def get_auth_method(t): | |
WAL_KEEP_SEGMENTS, | ||
wal_level)) # yapf: disable | ||
|
||
if allow_logical: | ||
if not pg_version_ge('10'): | ||
raise InitNodeException( | ||
"Logical replication is only available for Postgres 10 " | ||
"and newer") | ||
conf.write(u"wal_level = logical\n") | ||
|
||
# disable UNIX sockets if asked to | ||
if not unix_sockets: | ||
conf.write(u"unix_socket_directories = ''\n") | ||
|
@@ -937,13 +950,14 @@ def poll_query_until(self, | |
if res is None: | ||
raise QueryException('Query returned None', query) | ||
|
||
if len(res) == 0: | ||
raise QueryException('Query returned 0 rows', query) | ||
|
||
if len(res[0]) == 0: | ||
raise QueryException('Query returned 0 columns', query) | ||
|
||
if res[0][0] == expected: | ||
# result set is not empty | ||
if len(res): | ||
if len(res[0]) == 0: | ||
raise QueryException('Query returned 0 columns', query) | ||
if res[0][0] == expected: | ||
return # done | ||
# empty result set is considered as None | ||
elif expected is None: | ||
return # done | ||
|
||
except ProgrammingError as e: | ||
|
@@ -982,13 +996,11 @@ def execute(self, | |
|
||
with self.connect(dbname=dbname, | ||
username=username, | ||
password=password) as node_con: # yapf: disable | ||
password=password, | ||
autocommit=commit) as node_con: # yapf: disable | ||
|
||
res = node_con.execute(query) | ||
|
||
if commit: | ||
node_con.commit() | ||
|
||
return res | ||
|
||
def backup(self, **kwargs): | ||
|
@@ -1052,6 +1064,37 @@ def catchup(self, dbname=None, username=None): | |
except Exception as e: | ||
raise_from(CatchUpException("Failed to catch up", poll_lsn), e) | ||
|
||
def publish(self, name, **kwargs): | ||
""" | ||
Create publication for logical replication | ||
|
||
Args: | ||
pubname: publication name | ||
tables: tables names list | ||
dbname: database name where objects or interest are located | ||
username: replication username | ||
""" | ||
return Publication(name=name, node=self, **kwargs) | ||
|
||
def subscribe(self, publication, name, dbname=None, username=None, | ||
**params): | ||
""" | ||
Create subscription for logical replication | ||
|
||
Args: | ||
name: subscription name | ||
publication: publication object obtained from publish() | ||
dbname: database name | ||
username: replication username | ||
params: subscription parameters (see documentation on `CREATE SUBSCRIPTION | ||
<https://www.postgresql.org/docs/current/static/sql-createsubscription.html>`_ | ||
for details) | ||
""" | ||
# yapf: disable | ||
return Subscription(name=name, node=self, publication=publication, | ||
dbname=dbname, username=username, **params) | ||
# yapf: enable | ||
|
||
def pgbench(self, | ||
dbname=None, | ||
username=None, | ||
|
@@ -1150,14 +1193,21 @@ def pgbench_run(self, dbname=None, username=None, options=[], **kwargs): | |
|
||
return execute_utility(_params, self.utils_log_file) | ||
|
||
def connect(self, dbname=None, username=None, password=None): | ||
def connect(self, | ||
dbname=None, | ||
username=None, | ||
password=None, | ||
autocommit=False): | ||
""" | ||
Connect to a database. | ||
|
||
Args: | ||
dbname: database name to connect to. | ||
username: database user name. | ||
password: user's password. | ||
autocommit: commit each statement automatically. Also it should be | ||
set to `True` for statements requiring to be run outside | ||
a transaction? such as `VACUUM` or `CREATE DATABASE`. | ||
|
||
Returns: | ||
An instance of :class:`.NodeConnection`. | ||
|
@@ -1166,4 +1216,5 @@ def connect(self, dbname=None, username=None, password=None): | |
return NodeConnection(node=self, | ||
dbname=dbname, | ||
username=username, | ||
password=password) # yapf: disable | ||
password=password, | ||
autocommit=autocommit) # yapf: disable |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this necessary?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it was when I tested with doctest. But now when I ran doctest it showed
[(3,)]
so I am confused :) Will change it back then as it shouldn't be part of this pull request anyway.