Skip to content

Replication slots #41

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Mar 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions testgres/backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def spawn_primary(self, name=None, destroy=True):

return node

def spawn_replica(self, name=None, destroy=True):
def spawn_replica(self, name=None, destroy=True, slot_name=None):
"""
Create a replica of the original node from a backup.

Expand All @@ -171,7 +171,7 @@ def spawn_replica(self, name=None, destroy=True):

# Assign it a master and a recovery file (private magic)
node._assign_master(self.original_node)
node._create_recovery_conf(username=self.username)
node._create_recovery_conf(username=self.username, slot_name=slot_name)

return node

Expand Down
3 changes: 3 additions & 0 deletions testgres/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,6 @@
PG_LOG_FILE = "postgresql.log"
UTILS_LOG_FILE = "utils.log"
BACKUP_LOG_FILE = "backup.log"

# default replication slots number
REPLICATION_SLOTS = 10
41 changes: 37 additions & 4 deletions testgres/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
RECOVERY_CONF_FILE, \
PG_LOG_FILE, \
UTILS_LOG_FILE, \
PG_PID_FILE
PG_PID_FILE, \
REPLICATION_SLOTS

from .decorators import \
method_decorator, \
Expand Down Expand Up @@ -277,7 +278,7 @@ def _assign_master(self, master):
# now this node has a master
self._master = master

def _create_recovery_conf(self, username):
def _create_recovery_conf(self, username, slot_name=None):
"""NOTE: this is a private method!"""

# fetch master of this node
Expand Down Expand Up @@ -305,6 +306,9 @@ def _create_recovery_conf(self, username):
"standby_mode=on\n"
).format(conninfo)

if slot_name:
line += "primary_slot_name={}\n".format(slot_name)

self.append_conf(RECOVERY_CONF_FILE, line)

def _maybe_start_logger(self):
Expand Down Expand Up @@ -348,6 +352,28 @@ def _collect_special_files(self):

return result

def _create_replication_slot(self, slot_name, dbname=None, username=None):
"""
Create a physical replication slot.

Args:
slot_name: slot name
dbname: database name
username: database user name
"""
rs = self.execute("select exists (select * from pg_replication_slots "
"where slot_name = '{}')".format(slot_name),
dbname=dbname, username=username)

if rs[0][0]:
raise TestgresException("Slot '{}' already exists".format(slot_name))

query = (
"select pg_create_physical_replication_slot('{}')"
).format(slot_name)

self.execute(query=query, dbname=dbname, username=username)

def init(self, initdb_params=None, **kwargs):
"""
Perform initdb for this node.
Expand Down Expand Up @@ -458,8 +484,10 @@ def get_auth_method(t):
wal_keep_segments = 20 # for convenience
conf.write(u"hot_standby = on\n"
u"max_wal_senders = {}\n"
u"max_replication_slots = {}\n"
u"wal_keep_segments = {}\n"
u"wal_level = {}\n".format(max_wal_senders,
REPLICATION_SLOTS,
wal_keep_segments,
wal_level))

Expand Down Expand Up @@ -941,7 +969,7 @@ def backup(self, **kwargs):

return NodeBackup(node=self, **kwargs)

def replicate(self, name=None, **kwargs):
def replicate(self, name=None, slot_name=None, **kwargs):
"""
Create a binary replica of this node.

Expand All @@ -952,10 +980,15 @@ def replicate(self, name=None, **kwargs):
base_dir: the base directory for data files and logs
"""

if slot_name:
self._create_replication_slot(slot_name, **kwargs)

backup = self.backup(**kwargs)

# transform backup into a replica
return backup.spawn_replica(name=name, destroy=True)
return backup.spawn_replica(name=name,
destroy=True,
slot_name=slot_name)

def catchup(self, dbname=None, username=None):
"""
Expand Down
15 changes: 15 additions & 0 deletions tests/test_simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,21 @@ def test_replicate(self):
res = node.execute('select * from test')
self.assertListEqual(res, [])

def test_replication_slots(self):
query_create = 'create table test as select generate_series(1, 2) as val'

with get_new_node() as node:
node.init(allow_streaming=True).start()
node.execute(query_create)

with node.replicate(slot_name='slot1').start() as replica:
res = replica.execute('select * from test')
self.assertListEqual(res, [(1, ), (2, )])

# cannot create new slot with the same name
with self.assertRaises(TestgresException):
node._create_replication_slot('slot1')

def test_incorrect_catchup(self):
with get_new_node() as node:
node.init(allow_streaming=True).start()
Expand Down