diff --git a/setup.py b/setup.py index e0287659..b006c8bf 100755 --- a/setup.py +++ b/setup.py @@ -27,7 +27,7 @@ readme = f.read() setup( - version='1.9.3', + version='1.10.0', name='testgres', packages=['testgres', 'testgres.operations', 'testgres.helpers'], description='Testing utility for PostgreSQL and its extensions', diff --git a/testgres/plugins/__init__.py b/testgres/plugins/__init__.py new file mode 100644 index 00000000..e60331f0 --- /dev/null +++ b/testgres/plugins/__init__.py @@ -0,0 +1,9 @@ +from pg_probackup2.gdb import GDBobj +from pg_probackup2.app import ProbackupApp, ProbackupException +from pg_probackup2.init_helpers import init_params +from pg_probackup2.storage.fs_backup import FSTestBackupDir +from pg_probackup2.storage.s3_backup import S3TestBackupDir + +__all__ = [ + "ProbackupApp", "ProbackupException", "init_params", "FSTestBackupDir", "S3TestBackupDir", "GDBobj" +] diff --git a/testgres/plugins/pg_probackup2/README.md b/testgres/plugins/pg_probackup2/README.md new file mode 100644 index 00000000..b62bf24b --- /dev/null +++ b/testgres/plugins/pg_probackup2/README.md @@ -0,0 +1,65 @@ +# testgres - pg_probackup2 + +Ccontrol and testing utility for [pg_probackup2](https://github.com/postgrespro/pg_probackup). Python 3.5+ is supported. + + +## Installation + +To install `testgres`, run: + +``` +pip install testgres-pg_probackup +``` + +We encourage you to use `virtualenv` for your testing environment. +The package requires testgres~=1.9.3. + +## Usage + +### Environment + +> Note: by default testgres runs `initdb`, `pg_ctl`, `psql` provided by `PATH`. + +There are several ways to specify a custom postgres installation: + +* export `PG_CONFIG` environment variable pointing to the `pg_config` executable; +* export `PG_BIN` environment variable pointing to the directory with executable files. + +Example: + +```bash +export PG_BIN=$HOME/pg/bin +python my_tests.py +``` + + +### Examples + +Here is an example of what you can do with `testgres-pg_probackup2`: + +```python +# You can see full script here plugins/pg_probackup2/pg_probackup2/tests/basic_test.py +def test_full_backup(self): + # Setting up a simple test node + node = self.pg_node.make_simple('node', pg_options={"fsync": "off", "synchronous_commit": "off"}) + + # Initialize and configure Probackup + self.pb.init() + self.pb.add_instance('node', node) + self.pb.set_archiving('node', node) + + # Start the node and initialize pgbench + node.slow_start() + node.pgbench_init(scale=100, no_vacuum=True) + + # Perform backup and validation + backup_id = self.pb.backup_node('node', node) + out = self.pb.validate('node', backup_id) + + # Check if the backup is valid + self.assertIn(f"INFO: Backup {backup_id} is valid", out) +``` + +## Authors + +[Postgres Professional](https://postgrespro.ru/about) diff --git a/testgres/plugins/pg_probackup2/__init__.py b/testgres/plugins/pg_probackup2/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/testgres/plugins/pg_probackup2/pg_probackup2/__init__.py b/testgres/plugins/pg_probackup2/pg_probackup2/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/testgres/plugins/pg_probackup2/pg_probackup2/app.py b/testgres/plugins/pg_probackup2/pg_probackup2/app.py new file mode 100644 index 00000000..2c31de51 --- /dev/null +++ b/testgres/plugins/pg_probackup2/pg_probackup2/app.py @@ -0,0 +1,762 @@ +import contextlib +import importlib +import json +import os +import re +import subprocess +import sys +import threading +import time +import unittest + +import testgres + +from .storage.fs_backup import TestBackupDir, FSTestBackupDir +from .gdb import GDBobj +from .init_helpers import init_params + +warning = """ +Wrong splint in show_pb +Original Header:f +{header} +Original Body: +{body} +Splitted Header +{header_split} +Splitted Body +{body_split} +""" + + +class ProbackupException(Exception): + def __init__(self, message, cmd): + self.message = message + self.cmd = cmd + + def __str__(self): + return '\n ERROR: {0}\n CMD: {1}'.format(repr(self.message), self.cmd) + + +class ProbackupApp: + + def __init__(self, test_class: unittest.TestCase, + pg_node, pb_log_path, test_env, auto_compress_alg, backup_dir): + self.test_class = test_class + self.pg_node = pg_node + self.pb_log_path = pb_log_path + self.test_env = test_env + self.auto_compress_alg = auto_compress_alg + self.backup_dir = backup_dir + self.probackup_path = init_params.probackup_path + self.probackup_old_path = init_params.probackup_old_path + self.remote = init_params.remote + self.verbose = init_params.verbose + self.archive_compress = init_params.archive_compress + self.test_class.output = None + + def run(self, command, gdb=False, old_binary=False, return_id=True, env=None, + skip_log_directory=False, expect_error=False, use_backup_dir=True): + """ + Run pg_probackup + backup_dir: target directory for making backup + command: commandline options + expect_error: option for ignoring errors and getting error message as a result of running the function + gdb: when True it returns GDBObj(), when tuple('suspend', port) it runs probackup + in suspended gdb mode with attachable gdb port, for local debugging + """ + if isinstance(use_backup_dir, TestBackupDir): + command = [command[0], *use_backup_dir.pb_args, *command[1:]] + elif use_backup_dir: + command = [command[0], *self.backup_dir.pb_args, *command[1:]] + + if not self.probackup_old_path and old_binary: + print('PGPROBACKUPBIN_OLD is not set') + exit(1) + + if old_binary: + binary_path = self.probackup_old_path + else: + binary_path = self.probackup_path + + if not env: + env = self.test_env + + strcommand = ' '.join(str(p) for p in command) + if '--log-level-file' in strcommand and \ + '--log-directory' not in strcommand and \ + not skip_log_directory: + command += ['--log-directory=' + self.pb_log_path] + strcommand += ' ' + command[-1] + + if 'pglz' in strcommand and \ + ' -j' not in strcommand and '--thread' not in strcommand: + command += ['-j', '1'] + strcommand += ' -j 1' + + self.test_class.cmd = binary_path + ' ' + strcommand + if self.verbose: + print(self.test_class.cmd) + + cmdline = [binary_path, *command] + if gdb is True: + # general test flow for using GDBObj + return GDBobj(cmdline, self.test_class) + + try: + result = None + if type(gdb) is tuple and gdb[0] == 'suspend': + # special test flow for manually debug probackup + gdb_port = gdb[1] + cmdline = ['gdbserver'] + ['localhost:' + str(gdb_port)] + cmdline + print("pg_probackup gdb suspended, waiting gdb connection on localhost:{0}".format(gdb_port)) + + self.test_class.output = subprocess.check_output( + cmdline, + stderr=subprocess.STDOUT, + env=env + ).decode('utf-8', errors='replace') + if command[0] == 'backup' and return_id: + # return backup ID + for line in self.test_class.output.splitlines(): + if 'INFO: Backup' and 'completed' in line: + result = line.split()[2] + else: + result = self.test_class.output + if expect_error is True: + assert False, f"Exception was expected, but run finished successful with result: `{result}`\n" \ + f"CMD: {self.test_class.cmd}" + elif expect_error: + assert False, f"Exception was expected {expect_error}, but run finished successful with result: `{result}`\n" \ + f"CMD: {self.test_class.cmd}" + return result + except subprocess.CalledProcessError as e: + self.test_class.output = e.output.decode('utf-8').replace("\r", "") + if expect_error: + return self.test_class.output + else: + raise ProbackupException(self.test_class.output, self.test_class.cmd) + + def init(self, options=None, old_binary=False, skip_log_directory=False, expect_error=False, use_backup_dir=True): + if options is None: + options = [] + return self.run(['init'] + options, + old_binary=old_binary, + skip_log_directory=skip_log_directory, + expect_error=expect_error, + use_backup_dir=use_backup_dir + ) + + def add_instance(self, instance, node, old_binary=False, options=None, expect_error=False): + if options is None: + options = [] + cmd = [ + 'add-instance', + '--instance={0}'.format(instance), + '-D', node.data_dir + ] + + # don`t forget to kill old_binary after remote ssh release + if self.remote and not old_binary: + options = options + [ + '--remote-proto=ssh', + '--remote-host=localhost'] + + return self.run(cmd + options, old_binary=old_binary, expect_error=expect_error) + + def set_config(self, instance, old_binary=False, options=None, expect_error=False): + if options is None: + options = [] + cmd = [ + 'set-config', + '--instance={0}'.format(instance), + ] + + return self.run(cmd + options, old_binary=old_binary, expect_error=expect_error) + + def set_backup(self, instance, backup_id=False, + old_binary=False, options=None, expect_error=False): + if options is None: + options = [] + cmd = [ + 'set-backup', + ] + + if instance: + cmd = cmd + ['--instance={0}'.format(instance)] + + if backup_id: + cmd = cmd + ['-i', backup_id] + + return self.run(cmd + options, old_binary=old_binary, expect_error=expect_error) + + def del_instance(self, instance, old_binary=False, expect_error=False): + + return self.run([ + 'del-instance', + '--instance={0}'.format(instance), + ], + old_binary=old_binary, + expect_error=expect_error + ) + + def backup_node( + self, instance, node, data_dir=False, + backup_type='full', datname=False, options=None, + gdb=False, + old_binary=False, return_id=True, no_remote=False, + env=None, + expect_error=False, + sync=False + ): + if options is None: + options = [] + if not node and not data_dir: + print('You must provide ether node or data_dir for backup') + exit(1) + + if not datname: + datname = 'postgres' + + cmd_list = [ + 'backup', + '--instance={0}'.format(instance), + # "-D", pgdata, + '-p', '%i' % node.port, + '-d', datname + ] + + if data_dir: + cmd_list += ['-D', self._node_dir(data_dir)] + + # don`t forget to kill old_binary after remote ssh release + if self.remote and not old_binary and not no_remote: + options = options + [ + '--remote-proto=ssh', + '--remote-host=localhost'] + + if self.auto_compress_alg and '--compress' in options and \ + self.archive_compress and self.archive_compress != 'zlib': + options = [o if o != '--compress' else f'--compress-algorithm={self.archive_compress}' + for o in options] + + if backup_type: + cmd_list += ['-b', backup_type] + + if not (old_binary or sync): + cmd_list += ['--no-sync'] + + return self.run(cmd_list + options, gdb, old_binary, return_id, env=env, + expect_error=expect_error) + + def backup_replica_node(self, instance, node, data_dir=False, *, + master, backup_type='full', datname=False, + options=None, env=None): + """ + Try to reliably run backup on replica by switching wal at master + at the moment pg_probackup is waiting for archived wal segment + """ + if options is None: + options = [] + assert '--stream' not in options or backup_type == 'page', \ + "backup_replica_node should be used with one of archive-mode or " \ + "page-stream mode" + + options = options.copy() + if not any('--log-level-file' in x for x in options): + options.append('--log-level-file=INFO') + + gdb = self.backup_node( + instance, node, data_dir, + backup_type=backup_type, + datname=datname, + options=options, + env=env, + gdb=True) + gdb.set_breakpoint('wait_wal_lsn') + # we need to break on wait_wal_lsn in pg_stop_backup + gdb.run_until_break() + if backup_type == 'page': + self.switch_wal_segment(master) + if '--stream' not in options: + gdb.continue_execution_until_break() + self.switch_wal_segment(master) + gdb.continue_execution_until_exit() + + output = self.read_pb_log() + self.unlink_pg_log() + parsed_output = re.compile(r'Backup \S+ completed').search(output) + assert parsed_output, f"Expected: `Backup 'backup_id' completed`, but found `{output}`" + backup_id = parsed_output[0].split(' ')[1] + return (backup_id, output) + + def checkdb_node( + self, use_backup_dir=False, instance=False, data_dir=False, + options=None, gdb=False, old_binary=False, + skip_log_directory=False, + expect_error=False + ): + if options is None: + options = [] + cmd_list = ["checkdb"] + + if instance: + cmd_list += ["--instance={0}".format(instance)] + + if data_dir: + cmd_list += ["-D", self._node_dir(data_dir)] + + return self.run(cmd_list + options, gdb, old_binary, + skip_log_directory=skip_log_directory, expect_error=expect_error, + use_backup_dir=use_backup_dir) + + def merge_backup( + self, instance, backup_id, + gdb=False, old_binary=False, options=None, expect_error=False): + if options is None: + options = [] + cmd_list = [ + 'merge', + '--instance={0}'.format(instance), + '-i', backup_id + ] + + return self.run(cmd_list + options, gdb, old_binary, expect_error=expect_error) + + def restore_node( + self, instance, node=None, restore_dir=None, + backup_id=None, old_binary=False, options=None, + gdb=False, + expect_error=False, + sync=False + ): + if options is None: + options = [] + if node: + if isinstance(node, str): + data_dir = node + else: + data_dir = node.data_dir + elif restore_dir: + data_dir = self._node_dir(restore_dir) + else: + raise ValueError("You must provide ether node or base_dir for backup") + + cmd_list = [ + 'restore', + '-D', data_dir, + '--instance={0}'.format(instance) + ] + + # don`t forget to kill old_binary after remote ssh release + if self.remote and not old_binary: + options = options + [ + '--remote-proto=ssh', + '--remote-host=localhost'] + + if backup_id: + cmd_list += ['-i', backup_id] + + if not (old_binary or sync): + cmd_list += ['--no-sync'] + + return self.run(cmd_list + options, gdb=gdb, old_binary=old_binary, expect_error=expect_error) + + def catchup_node( + self, + backup_mode, source_pgdata, destination_node, + options=None, + remote_host='localhost', + expect_error=False, + gdb=False + ): + + if options is None: + options = [] + cmd_list = [ + 'catchup', + '--backup-mode={0}'.format(backup_mode), + '--source-pgdata={0}'.format(source_pgdata), + '--destination-pgdata={0}'.format(destination_node.data_dir) + ] + if self.remote: + cmd_list += ['--remote-proto=ssh', '--remote-host=%s' % remote_host] + if self.verbose: + cmd_list += [ + '--log-level-file=VERBOSE', + '--log-directory={0}'.format(destination_node.logs_dir) + ] + + return self.run(cmd_list + options, gdb=gdb, expect_error=expect_error, use_backup_dir=False) + + def show( + self, instance=None, backup_id=None, + options=None, as_text=False, as_json=True, old_binary=False, + env=None, + expect_error=False, + gdb=False + ): + + if options is None: + options = [] + backup_list = [] + specific_record = {} + cmd_list = [ + 'show', + ] + if instance: + cmd_list += ['--instance={0}'.format(instance)] + + if backup_id: + cmd_list += ['-i', backup_id] + + # AHTUNG, WARNING will break json parsing + if as_json: + cmd_list += ['--format=json', '--log-level-console=error'] + + if as_text: + # You should print it when calling as_text=true + return self.run(cmd_list + options, old_binary=old_binary, env=env, + expect_error=expect_error, gdb=gdb) + + # get show result as list of lines + if as_json: + text_json = str(self.run(cmd_list + options, old_binary=old_binary, env=env, + expect_error=expect_error, gdb=gdb)) + try: + if expect_error: + return text_json + data = json.loads(text_json) + except ValueError: + assert False, f"Couldn't parse {text_json} as json. " \ + f"Check that you don't have additional messages inside the log or use 'as_text=True'" + + for instance_data in data: + # find specific instance if requested + if instance and instance_data['instance'] != instance: + continue + + for backup in reversed(instance_data['backups']): + # find specific backup if requested + if backup_id: + if backup['id'] == backup_id: + return backup + else: + backup_list.append(backup) + + if backup_id is not None: + assert False, "Failed to find backup with ID: {0}".format(backup_id) + + return backup_list + else: + show_splitted = self.run(cmd_list + options, old_binary=old_binary, env=env, + expect_error=expect_error).splitlines() + if instance is not None and backup_id is None: + # cut header(ID, Mode, etc) from show as single string + header = show_splitted[1:2][0] + # cut backup records from show as single list + # with string for every backup record + body = show_splitted[3:] + # inverse list so oldest record come first + body = body[::-1] + # split string in list with string for every header element + header_split = re.split(' +', header) + # Remove empty items + for i in header_split: + if i == '': + header_split.remove(i) + continue + header_split = [ + header_element.rstrip() for header_element in header_split + ] + for backup_record in body: + backup_record = backup_record.rstrip() + # split list with str for every backup record element + backup_record_split = re.split(' +', backup_record) + # Remove empty items + for i in backup_record_split: + if i == '': + backup_record_split.remove(i) + if len(header_split) != len(backup_record_split): + print(warning.format( + header=header, body=body, + header_split=header_split, + body_split=backup_record_split) + ) + exit(1) + new_dict = dict(zip(header_split, backup_record_split)) + backup_list.append(new_dict) + return backup_list + else: + # cut out empty lines and lines started with # + # and other garbage then reconstruct it as dictionary + # print show_splitted + sanitized_show = [item for item in show_splitted if item] + sanitized_show = [ + item for item in sanitized_show if not item.startswith('#') + ] + # print sanitized_show + for line in sanitized_show: + name, var = line.partition(' = ')[::2] + var = var.strip('"') + var = var.strip("'") + specific_record[name.strip()] = var + + if not specific_record: + assert False, "Failed to find backup with ID: {0}".format(backup_id) + + return specific_record + + def show_archive( + self, instance=None, options=None, + as_text=False, as_json=True, old_binary=False, + tli=0, + expect_error=False + ): + if options is None: + options = [] + cmd_list = [ + 'show', + '--archive', + ] + if instance: + cmd_list += ['--instance={0}'.format(instance)] + + # AHTUNG, WARNING will break json parsing + if as_json: + cmd_list += ['--format=json', '--log-level-console=error'] + + if as_text: + # You should print it when calling as_text=true + return self.run(cmd_list + options, old_binary=old_binary, expect_error=expect_error) + + if as_json: + if as_text: + data = self.run(cmd_list + options, old_binary=old_binary, expect_error=expect_error) + else: + data = json.loads(self.run(cmd_list + options, old_binary=old_binary, expect_error=expect_error)) + + if instance: + instance_timelines = None + for instance_name in data: + if instance_name['instance'] == instance: + instance_timelines = instance_name['timelines'] + break + + if tli > 0: + for timeline in instance_timelines: + if timeline['tli'] == tli: + return timeline + + return {} + + if instance_timelines: + return instance_timelines + + return data + else: + show_splitted = self.run(cmd_list + options, old_binary=old_binary, + expect_error=expect_error).splitlines() + print(show_splitted) + exit(1) + + def validate( + self, instance=None, backup_id=None, + options=None, old_binary=False, gdb=False, expect_error=False + ): + if options is None: + options = [] + cmd_list = [ + 'validate', + ] + if instance: + cmd_list += ['--instance={0}'.format(instance)] + if backup_id: + cmd_list += ['-i', backup_id] + + return self.run(cmd_list + options, old_binary=old_binary, gdb=gdb, + expect_error=expect_error) + + def delete( + self, instance, backup_id=None, + options=None, old_binary=False, gdb=False, expect_error=False): + if options is None: + options = [] + cmd_list = [ + 'delete', + ] + + cmd_list += ['--instance={0}'.format(instance)] + if backup_id: + cmd_list += ['-i', backup_id] + + return self.run(cmd_list + options, old_binary=old_binary, gdb=gdb, + expect_error=expect_error) + + def delete_expired( + self, instance, options=None, old_binary=False, expect_error=False): + if options is None: + options = [] + cmd_list = [ + 'delete', + '--instance={0}'.format(instance) + ] + return self.run(cmd_list + options, old_binary=old_binary, expect_error=expect_error) + + def show_config(self, instance, old_binary=False, expect_error=False, gdb=False): + out_dict = {} + cmd_list = [ + 'show-config', + '--instance={0}'.format(instance) + ] + + res = self.run(cmd_list, old_binary=old_binary, expect_error=expect_error, gdb=gdb).splitlines() + for line in res: + if not line.startswith('#'): + name, var = line.partition(' = ')[::2] + out_dict[name] = var + return out_dict + + def run_binary(self, command, asynchronous=False, env=None): + + if not env: + env = self.test_env + + if self.verbose: + print([' '.join(map(str, command))]) + try: + if asynchronous: + return subprocess.Popen( + command, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env=env + ) + else: + self.test_class.output = subprocess.check_output( + command, + stderr=subprocess.STDOUT, + env=env + ).decode('utf-8') + return self.test_class.output + except subprocess.CalledProcessError as e: + raise ProbackupException(e.output.decode('utf-8'), command) + + def _node_dir(self, base_dir): + return os.path.join(self.pg_node.test_path, base_dir) + + def set_archiving( + self, instance, node, replica=False, + overwrite=False, compress=True, old_binary=False, + log_level=False, archive_timeout=False, + custom_archive_command=None): + + # parse postgresql.auto.conf + options = {} + if replica: + options['archive_mode'] = 'always' + options['hot_standby'] = 'on' + else: + options['archive_mode'] = 'on' + + if custom_archive_command is None: + archive_command = " ".join([f'"{init_params.probackup_path}"', + 'archive-push', *self.backup_dir.pb_args]) + if os.name == "nt": + archive_command = archive_command.replace("\\", "\\\\") + archive_command += f' --instance={instance}' + + # don`t forget to kill old_binary after remote ssh release + if init_params.remote and not old_binary: + archive_command += ' --remote-proto=ssh --remote-host=localhost' + + if init_params.archive_compress and compress: + archive_command += ' --compress-algorithm=' + init_params.archive_compress + + if overwrite: + archive_command += ' --overwrite' + + archive_command += ' --log-level-console=VERBOSE' + archive_command += ' -j 5' + archive_command += ' --batch-size 10' + archive_command += ' --no-sync' + + if archive_timeout: + archive_command += f' --archive-timeout={archive_timeout}' + + if os.name == 'posix': + archive_command += ' --wal-file-path=%p --wal-file-name=%f' + + elif os.name == 'nt': + archive_command += ' --wal-file-path="%p" --wal-file-name="%f"' + + if log_level: + archive_command += f' --log-level-console={log_level}' + else: # custom_archive_command is not None + archive_command = custom_archive_command + options['archive_command'] = archive_command + + node.set_auto_conf(options) + + def switch_wal_segment(self, node, sleep_seconds=1, and_tx=False): + """ + Execute pg_switch_wal() in given node + + Args: + node: an instance of PostgresNode or NodeConnection class + """ + if isinstance(node, testgres.PostgresNode): + with node.connect('postgres') as con: + if and_tx: + con.execute('select txid_current()') + lsn = con.execute('select pg_switch_wal()')[0][0] + else: + lsn = node.execute('select pg_switch_wal()')[0][0] + + if sleep_seconds > 0: + time.sleep(sleep_seconds) + return lsn + + @contextlib.contextmanager + def switch_wal_after(self, node, seconds, and_tx=True): + tm = threading.Timer(seconds, self.switch_wal_segment, [node, 0, and_tx]) + tm.start() + try: + yield + finally: + tm.cancel() + tm.join() + + def read_pb_log(self): + with open(os.path.join(self.pb_log_path, 'pg_probackup.log')) as fl: + return fl.read() + + def unlink_pg_log(self): + os.unlink(os.path.join(self.pb_log_path, 'pg_probackup.log')) + + def load_backup_class(fs_type): + fs_type = os.environ.get('PROBACKUP_FS_TYPE') + implementation = f"{__package__}.fs_backup.FSTestBackupDir" + if fs_type: + implementation = fs_type + + print("Using ", implementation) + module_name, class_name = implementation.rsplit(sep='.', maxsplit=1) + + module = importlib.import_module(module_name) + + return getattr(module, class_name) + + +# Local or S3 backup +fs_backup_class = FSTestBackupDir +if os.environ.get('PG_PROBACKUP_S3_TEST', os.environ.get('PROBACKUP_S3_TYPE_FULL_TEST')): + root = os.path.realpath(os.path.join(os.path.dirname(__file__), '../..')) + if root not in sys.path: + sys.path.append(root) + from pg_probackup2.storage.s3_backup import S3TestBackupDir + + fs_backup_class = S3TestBackupDir + + def build_backup_dir(self, backup='backup'): + return fs_backup_class(rel_path=self.rel_path, backup=backup) diff --git a/testgres/plugins/pg_probackup2/pg_probackup2/gdb.py b/testgres/plugins/pg_probackup2/pg_probackup2/gdb.py new file mode 100644 index 00000000..0b61da65 --- /dev/null +++ b/testgres/plugins/pg_probackup2/pg_probackup2/gdb.py @@ -0,0 +1,346 @@ +import functools +import os +import re +import subprocess +import sys +import unittest +from time import sleep + + +class GdbException(Exception): + def __init__(self, message="False"): + self.message = message + + def __str__(self): + return '\n ERROR: {0}\n'.format(repr(self.message)) + + +class GDBobj: + _gdb_enabled = False + _gdb_ok = False + _gdb_ptrace_ok = False + + def __init__(self, cmd, env, attach=False): + self.verbose = env.verbose + self.output = '' + self._did_quit = False + self.has_breakpoint = False + + # Check gdb flag is set up + if not hasattr(env, "_gdb_decorated") or not env._gdb_decorated: + raise GdbException("Test should be decorated with @needs_gdb") + if not self._gdb_enabled: + raise GdbException("No `PGPROBACKUP_GDB=on` is set.") + if not self._gdb_ok: + if not self._gdb_ptrace_ok: + raise GdbException("set /proc/sys/kernel/yama/ptrace_scope to 0" + " to run GDB tests") + raise GdbException("No gdb usage possible.") + + # Check gdb presense + try: + gdb_version, _ = subprocess.Popen( + ['gdb', '--version'], + stdout=subprocess.PIPE + ).communicate() + except OSError: + raise GdbException("Couldn't find gdb on the path") + + self.base_cmd = [ + 'gdb', + '--interpreter', + 'mi2', + ] + + if attach: + self.cmd = self.base_cmd + ['--pid'] + cmd + else: + self.cmd = self.base_cmd + ['--args'] + cmd + + # Get version + gdb_version_number = re.search( + br"^GNU gdb [^\d]*(\d+)\.(\d)", + gdb_version) + self.major_version = int(gdb_version_number.group(1)) + self.minor_version = int(gdb_version_number.group(2)) + + if self.verbose: + print([' '.join(map(str, self.cmd))]) + + self.proc = subprocess.Popen( + self.cmd, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + bufsize=0, + text=True, + errors='replace', + ) + self.gdb_pid = self.proc.pid + + while True: + line = self.get_line() + + if 'No such process' in line: + raise GdbException(line) + + if not line.startswith('(gdb)'): + pass + else: + break + + def __del__(self): + if not self._did_quit and hasattr(self, "proc"): + try: + self.quit() + except subprocess.TimeoutExpired: + self.kill() + + def get_line(self): + line = self.proc.stdout.readline() + self.output += line + return line + + def kill(self): + self._did_quit = True + self.proc.kill() + self.proc.wait(3) + self.proc.stdin.close() + self.proc.stdout.close() + + def set_breakpoint(self, location): + + result = self._execute('break ' + location) + self.has_breakpoint = True + for line in result: + if line.startswith('~"Breakpoint'): + return + + elif line.startswith('=breakpoint-created'): + return + + elif line.startswith('^error'): # or line.startswith('(gdb)'): + break + + elif line.startswith('&"break'): + pass + + elif line.startswith('&"Function'): + raise GdbException(line) + + elif line.startswith('&"No line'): + raise GdbException(line) + + elif line.startswith('~"Make breakpoint pending on future shared'): + raise GdbException(line) + + raise GdbException( + 'Failed to set breakpoint.\n Output:\n {0}'.format(result) + ) + + def remove_all_breakpoints(self): + if not self.has_breakpoint: + return + + result = self._execute('delete') + self.has_breakpoint = False + for line in result: + + if line.startswith('^done'): + return + + raise GdbException( + 'Failed to remove breakpoints.\n Output:\n {0}'.format(result) + ) + + def run_until_break(self): + result = self._execute('run', False) + for line in result: + if line.startswith('*stopped,reason="breakpoint-hit"'): + return + raise GdbException( + 'Failed to run until breakpoint.\n' + ) + + def continue_execution_until_running(self): + result = self._execute('continue') + + for line in result: + if line.startswith('*running') or line.startswith('^running'): + return + if line.startswith('*stopped,reason="breakpoint-hit"'): + continue + if line.startswith('*stopped,reason="exited-normally"'): + continue + + raise GdbException( + 'Failed to continue execution until running.\n' + ) + + def signal(self, sig): + if 'KILL' in sig: + self.remove_all_breakpoints() + self._execute(f'signal {sig}') + + def continue_execution_until_exit(self): + self.remove_all_breakpoints() + result = self._execute('continue', False) + + for line in result: + if line.startswith('*running'): + continue + if line.startswith('*stopped,reason="breakpoint-hit"'): + continue + if line.startswith('*stopped,reason="exited') or line == '*stopped\n': + self.quit() + return + + raise GdbException( + 'Failed to continue execution until exit.\n' + ) + + def continue_execution_until_error(self): + self.remove_all_breakpoints() + result = self._execute('continue', False) + + for line in result: + if line.startswith('^error'): + return + if line.startswith('*stopped,reason="exited'): + return + if line.startswith( + '*stopped,reason="signal-received",signal-name="SIGABRT"'): + return + + raise GdbException( + 'Failed to continue execution until error.\n') + + def continue_execution_until_break(self, ignore_count=0): + if ignore_count > 0: + result = self._execute( + 'continue ' + str(ignore_count), + False + ) + else: + result = self._execute('continue', False) + + for line in result: + if line.startswith('*stopped,reason="breakpoint-hit"'): + return + if line.startswith('*stopped,reason="exited-normally"'): + break + + raise GdbException( + 'Failed to continue execution until break.\n') + + def show_backtrace(self): + return self._execute("backtrace", running=False) + + def stopped_in_breakpoint(self): + while True: + line = self.get_line() + if self.verbose: + print(line) + if line.startswith('*stopped,reason="breakpoint-hit"'): + return True + + def detach(self): + if not self._did_quit: + self._execute('detach') + + def quit(self): + if not self._did_quit: + self._did_quit = True + self.proc.terminate() + self.proc.wait(3) + self.proc.stdin.close() + self.proc.stdout.close() + + # use for breakpoint, run, continue + def _execute(self, cmd, running=True): + output = [] + self.proc.stdin.flush() + self.proc.stdin.write(cmd + '\n') + self.proc.stdin.flush() + sleep(1) + + # look for command we just send + while True: + line = self.get_line() + if self.verbose: + print(repr(line)) + + if cmd not in line: + continue + else: + break + + while True: + line = self.get_line() + output += [line] + if self.verbose: + print(repr(line)) + if line.startswith('^done') or line.startswith('*stopped'): + break + if line.startswith('^error'): + break + if running and (line.startswith('*running') or line.startswith('^running')): + # if running and line.startswith('*running'): + break + return output + + +def _set_gdb(self): + test_env = os.environ.copy() + self._gdb_enabled = test_env.get('PGPROBACKUP_GDB') == 'ON' + self._gdb_ok = self._gdb_enabled + if not self._gdb_enabled or sys.platform != 'linux': + return + try: + with open('/proc/sys/kernel/yama/ptrace_scope') as f: + ptrace = f.read() + except FileNotFoundError: + self._gdb_ptrace_ok = True + return + self._gdb_ptrace_ok = int(ptrace) == 0 + self._gdb_ok = self._gdb_ok and self._gdb_ptrace_ok + + +def _check_gdb_flag_or_skip_test(): + if not GDBobj._gdb_enabled: + return ("skip", + "Specify PGPROBACKUP_GDB and build without " + "optimizations for run this test" + ) + if GDBobj._gdb_ok: + return None + if not GDBobj._gdb_ptrace_ok: + return ("fail", "set /proc/sys/kernel/yama/ptrace_scope to 0" + " to run GDB tests") + else: + return ("fail", "use of gdb is not possible") + + +def needs_gdb(func): + check = _check_gdb_flag_or_skip_test() + if not check: + @functools.wraps(func) + def ok_wrapped(self): + self._gdb_decorated = True + func(self) + + return ok_wrapped + reason = check[1] + if check[0] == "skip": + return unittest.skip(reason)(func) + elif check[0] == "fail": + @functools.wraps(func) + def fail_wrapper(self): + self.fail(reason) + + return fail_wrapper + else: + raise "Wrong action {0}".format(check) + + +_set_gdb(GDBobj) diff --git a/testgres/plugins/pg_probackup2/pg_probackup2/init_helpers.py b/testgres/plugins/pg_probackup2/pg_probackup2/init_helpers.py new file mode 100644 index 00000000..7af21eb6 --- /dev/null +++ b/testgres/plugins/pg_probackup2/pg_probackup2/init_helpers.py @@ -0,0 +1,207 @@ +from functools import reduce +import getpass +import os +import re +import shutil +import subprocess +import sys +import testgres + +try: + import lz4.frame # noqa: F401 + + HAVE_LZ4 = True +except ImportError as e: + HAVE_LZ4 = False + LZ4_error = e + +try: + import zstd # noqa: F401 + + HAVE_ZSTD = True +except ImportError as e: + HAVE_ZSTD = False + ZSTD_error = e + +delete_logs = os.getenv('KEEP_LOGS') not in ['1', 'y', 'Y'] + +try: + testgres.configure_testgres( + cache_initdb=False, + cached_initdb_dir=False, + node_cleanup_full=delete_logs) +except Exception as e: + print("Can't configure testgres: {0}".format(e)) + + +class Init(object): + def __init__(self): + if '-v' in sys.argv or '--verbose' in sys.argv: + self.verbose = True + else: + self.verbose = False + + self._pg_config = testgres.get_pg_config() + self.is_enterprise = self._pg_config.get('PGPRO_EDITION', None) == 'enterprise' + self.is_shardman = self._pg_config.get('PGPRO_EDITION', None) == 'shardman' + self.is_pgpro = 'PGPRO_EDITION' in self._pg_config + self.is_nls_enabled = 'enable-nls' in self._pg_config['CONFIGURE'] + self.is_lz4_enabled = '-llz4' in self._pg_config['LIBS'] + version = self._pg_config['VERSION'].rstrip('develalphabetapre') + parts = [*version.split(' ')[1].split('.'), '0', '0'][:3] + parts[0] = re.match(r'\d+', parts[0]).group() + self.pg_config_version = reduce(lambda v, x: v * 100 + int(x), parts, 0) + + test_env = os.environ.copy() + envs_list = [ + 'LANGUAGE', + 'LC_ALL', + 'PGCONNECT_TIMEOUT', + 'PGDATA', + 'PGDATABASE', + 'PGHOSTADDR', + 'PGREQUIRESSL', + 'PGSERVICE', + 'PGSSLMODE', + 'PGUSER', + 'PGPORT', + 'PGHOST' + ] + + for e in envs_list: + test_env.pop(e, None) + + test_env['LC_MESSAGES'] = 'C' + test_env['LC_TIME'] = 'C' + self._test_env = test_env + + # Get the directory from which the script was executed + self.source_path = os.getcwd() + tmp_path = test_env.get('PGPROBACKUP_TMP_DIR') + if tmp_path and os.path.isabs(tmp_path): + self.tmp_path = tmp_path + else: + self.tmp_path = os.path.abspath( + os.path.join(self.source_path, tmp_path or os.path.join('tests', 'tmp_dirs')) + ) + + os.makedirs(self.tmp_path, exist_ok=True) + + self.username = getpass.getuser() + + self.probackup_path = None + if 'PGPROBACKUPBIN' in test_env: + if shutil.which(test_env["PGPROBACKUPBIN"]): + self.probackup_path = test_env["PGPROBACKUPBIN"] + else: + if self.verbose: + print('PGPROBACKUPBIN is not an executable file') + + if not self.probackup_path: + probackup_path_tmp = os.path.join( + testgres.get_pg_config()['BINDIR'], 'pg_probackup') + + if os.path.isfile(probackup_path_tmp): + if not os.access(probackup_path_tmp, os.X_OK): + print('{0} is not an executable file'.format( + probackup_path_tmp)) + else: + self.probackup_path = probackup_path_tmp + + if not self.probackup_path: + probackup_path_tmp = self.source_path + + if os.path.isfile(probackup_path_tmp): + if not os.access(probackup_path_tmp, os.X_OK): + print('{0} is not an executable file'.format( + probackup_path_tmp)) + else: + self.probackup_path = probackup_path_tmp + + if not self.probackup_path: + print('pg_probackup binary is not found') + exit(1) + + if os.name == 'posix': + self.EXTERNAL_DIRECTORY_DELIMITER = ':' + os.environ['PATH'] = os.path.dirname( + self.probackup_path) + ':' + os.environ['PATH'] + + elif os.name == 'nt': + self.EXTERNAL_DIRECTORY_DELIMITER = ';' + os.environ['PATH'] = os.path.dirname( + self.probackup_path) + ';' + os.environ['PATH'] + + self.probackup_old_path = None + if 'PGPROBACKUPBIN_OLD' in test_env: + if (os.path.isfile(test_env['PGPROBACKUPBIN_OLD']) and os.access(test_env['PGPROBACKUPBIN_OLD'], os.X_OK)): + self.probackup_old_path = test_env['PGPROBACKUPBIN_OLD'] + else: + if self.verbose: + print('PGPROBACKUPBIN_OLD is not an executable file') + + self.probackup_version = None + self.old_probackup_version = None + + probackup_version_output = subprocess.check_output( + [self.probackup_path, "--version"], + stderr=subprocess.STDOUT, + ).decode('utf-8') + self.probackup_version = re.search(r"\d+\.\d+\.\d+", + probackup_version_output + ).group(0) + compressions = re.search(r"\(compressions: ([^)]*)\)", + probackup_version_output).group(1) + self.probackup_compressions = {s.strip() for s in compressions.split(',')} + + if self.probackup_old_path: + old_probackup_version_output = subprocess.check_output( + [self.probackup_old_path, "--version"], + stderr=subprocess.STDOUT, + ).decode('utf-8') + self.old_probackup_version = re.search(r"\d+\.\d+\.\d+", + old_probackup_version_output + ).group(0) + + self.remote = test_env.get('PGPROBACKUP_SSH_REMOTE', None) == 'ON' + self.ptrack = test_env.get('PG_PROBACKUP_PTRACK', None) == 'ON' and self.pg_config_version >= 110000 + + self.paranoia = test_env.get('PG_PROBACKUP_PARANOIA', None) == 'ON' + env_compress = test_env.get('ARCHIVE_COMPRESSION', None) + if env_compress: + env_compress = env_compress.lower() + if env_compress in ('on', 'zlib'): + self.compress_suffix = '.gz' + self.archive_compress = 'zlib' + elif env_compress == 'lz4': + if not HAVE_LZ4: + raise LZ4_error + if 'lz4' not in self.probackup_compressions: + raise Exception("pg_probackup is not compiled with lz4 support") + self.compress_suffix = '.lz4' + self.archive_compress = 'lz4' + elif env_compress == 'zstd': + if not HAVE_ZSTD: + raise ZSTD_error + if 'zstd' not in self.probackup_compressions: + raise Exception("pg_probackup is not compiled with zstd support") + self.compress_suffix = '.zst' + self.archive_compress = 'zstd' + else: + self.compress_suffix = '' + self.archive_compress = False + + cfs_compress = test_env.get('PG_PROBACKUP_CFS_COMPRESS', None) + if cfs_compress: + self.cfs_compress = cfs_compress.lower() + else: + self.cfs_compress = self.archive_compress + + os.environ["PGAPPNAME"] = "pg_probackup" + self.delete_logs = delete_logs + + def test_env(self): + return self._test_env.copy() + + +init_params = Init() diff --git a/testgres/plugins/pg_probackup2/pg_probackup2/storage/__init__.py b/testgres/plugins/pg_probackup2/pg_probackup2/storage/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/testgres/plugins/pg_probackup2/pg_probackup2/storage/fs_backup.py b/testgres/plugins/pg_probackup2/pg_probackup2/storage/fs_backup.py new file mode 100644 index 00000000..d076432a --- /dev/null +++ b/testgres/plugins/pg_probackup2/pg_probackup2/storage/fs_backup.py @@ -0,0 +1,101 @@ +""" +Utilities for accessing pg_probackup backup data on file system. +""" +import os +import shutil + +from ..init_helpers import init_params + + +class TestBackupDir: + + def list_instance_backups(self, instance): + raise NotImplementedError() + + def list_files(self, sub_dir, recursive=False): + raise NotImplementedError() + + def list_dirs(self, sub_dir): + raise NotImplementedError() + + def read_file(self, sub_path, *, text=True): + raise NotImplementedError() + + def write_file(self, sub_path, data, *, text=True): + raise NotImplementedError() + + def cleanup(self): + raise NotImplementedError() + + def remove_file(self, sub_path): + raise NotImplementedError() + + def remove_dir(self, sub_path): + raise NotImplementedError() + + def exists(self, sub_path): + raise NotImplementedError() + + +class FSTestBackupDir(TestBackupDir): + is_file_based = True + + """ Backup directory. Usually created by running pg_probackup init -B """ + + def __init__(self, *, rel_path, backup): + self.path = os.path.join(init_params.tmp_path, rel_path, backup) + self.pb_args = ('-B', self.path) + + def list_instance_backups(self, instance): + full_path = os.path.join(self.path, 'backups', instance) + return sorted((x for x in os.listdir(full_path) + if os.path.isfile(os.path.join(full_path, x, 'backup.control')))) + + def list_files(self, sub_dir, recursive=False): + full_path = os.path.join(self.path, sub_dir) + if not recursive: + return [f for f in os.listdir(full_path) + if os.path.isfile(os.path.join(full_path, f))] + files = [] + for rootdir, dirs, files_in_dir in os.walk(full_path): + rootdir = rootdir[len(self.path) + 1:] + files.extend(os.path.join(rootdir, file) for file in files_in_dir) + return files + + def list_dirs(self, sub_dir): + full_path = os.path.join(self.path, sub_dir) + return [f for f in os.listdir(full_path) + if os.path.isdir(os.path.join(full_path, f))] + + def read_file(self, sub_path, *, text=True): + full_path = os.path.join(self.path, sub_path) + with open(full_path, 'r' if text else 'rb') as fin: + return fin.read() + + def write_file(self, sub_path, data, *, text=True): + full_path = os.path.join(self.path, sub_path) + with open(full_path, 'w' if text else 'wb') as fout: + fout.write(data) + + def cleanup(self): + shutil.rmtree(self.path, ignore_errors=True) + + def remove_file(self, sub_path): + os.remove(os.path.join(self.path, sub_path)) + + def remove_dir(self, sub_path): + full_path = os.path.join(self.path, sub_path) + shutil.rmtree(full_path, ignore_errors=True) + + def exists(self, sub_path): + full_path = os.path.join(self.path, sub_path) + return os.path.exists(full_path) + + def __str__(self): + return self.path + + def __repr__(self): + return "FSTestBackupDir" + str(self.path) + + def __fspath__(self): + return self.path diff --git a/testgres/plugins/pg_probackup2/pg_probackup2/storage/s3_backup.py b/testgres/plugins/pg_probackup2/pg_probackup2/storage/s3_backup.py new file mode 100644 index 00000000..c6b764fb --- /dev/null +++ b/testgres/plugins/pg_probackup2/pg_probackup2/storage/s3_backup.py @@ -0,0 +1,134 @@ +import os +import io +import sys + +import minio +from minio import Minio +from minio.deleteobjects import DeleteObject +import urllib3 +from .fs_backup import TestBackupDir + +root = os.path.realpath(os.path.join(os.path.dirname(__file__), '../..')) +if root not in sys.path: + sys.path.append(root) + +# Should fail if either of env vars does not exist +host = os.environ['PG_PROBACKUP_S3_HOST'] +port = os.environ['PG_PROBACKUP_S3_PORT'] +access = os.environ['PG_PROBACKUP_S3_ACCESS_KEY'] +secret = os.environ['PG_PROBACKUP_S3_SECRET_ACCESS_KEY'] +bucket = os.environ['PG_PROBACKUP_S3_BUCKET_NAME'] +path_suffix = os.environ.get("PG_PROBACKUP_TEST_TMP_SUFFIX") +https = os.environ.get("PG_PROBACKUP_S3_HTTPS") + +s3_type = os.environ.get('PG_PROBACKUP_S3_TEST', os.environ.get('PROBACKUP_S3_TYPE_FULL_TEST')) +tmp_path = os.environ.get('PGPROBACKUP_TMP_DIR', default='') + +status_forcelist = [413, # RequestBodyTooLarge + 429, # TooManyRequests + 500, # InternalError + 503, # ServerBusy + ] + + +class S3TestBackupDir(TestBackupDir): + is_file_based = False + + def __init__(self, *, rel_path, backup): + path = "pg_probackup" + if path_suffix: + path += "_" + path_suffix + if tmp_path == '' or os.path.isabs(tmp_path): + self.path = f"{path}{tmp_path}/{rel_path}/{backup}" + else: + self.path = f"{path}/{tmp_path}/{rel_path}/{backup}" + + secure: bool = False + if https in ['ON', 'HTTPS']: + secure = True + self.conn = Minio(host + ":" + port, secure=secure, access_key=access, + secret_key=secret, http_client=urllib3.PoolManager(retries=urllib3.Retry(total=5, + backoff_factor=1, + status_forcelist=status_forcelist))) + if not self.conn.bucket_exists(bucket): + raise Exception(f"Test bucket {bucket} does not exist.") + self.pb_args = ('-B', '/' + self.path, f'--s3={s3_type}') + return + + def list_instance_backups(self, instance): + full_path = os.path.join(self.path, 'backups', instance) + candidates = self.conn.list_objects(bucket, prefix=full_path, recursive=True) + return [os.path.basename(os.path.dirname(x.object_name)) + for x in candidates if x.object_name.endswith('backup.control')] + + def list_files(self, sub_dir, recursive=False): + full_path = os.path.join(self.path, sub_dir) + # Need '/' in the end to find inside the folder + full_path_dir = full_path if full_path[-1] == '/' else full_path + '/' + object_list = self.conn.list_objects(bucket, prefix=full_path_dir, recursive=recursive) + return [obj.object_name.replace(full_path_dir, '', 1) + for obj in object_list + if not obj.is_dir] + + def list_dirs(self, sub_dir): + full_path = os.path.join(self.path, sub_dir) + # Need '/' in the end to find inside the folder + full_path_dir = full_path if full_path[-1] == '/' else full_path + '/' + object_list = self.conn.list_objects(bucket, prefix=full_path_dir, recursive=False) + return [obj.object_name.replace(full_path_dir, '', 1).rstrip('\\/') + for obj in object_list + if obj.is_dir] + + def read_file(self, sub_path, *, text=True): + full_path = os.path.join(self.path, sub_path) + bytes = self.conn.get_object(bucket, full_path).read() + if not text: + return bytes + return bytes.decode('utf-8') + + def write_file(self, sub_path, data, *, text=True): + full_path = os.path.join(self.path, sub_path) + if text: + data = data.encode('utf-8') + self.conn.put_object(bucket, full_path, io.BytesIO(data), length=len(data)) + + def cleanup(self): + self.remove_dir('') + + def remove_file(self, sub_path): + full_path = os.path.join(self.path, sub_path) + self.conn.remove_object(bucket, full_path) + + def remove_dir(self, sub_path): + if sub_path: + full_path = os.path.join(self.path, sub_path) + else: + full_path = self.path + objs = self.conn.list_objects(bucket, prefix=full_path, recursive=True, + include_version=True) + delobjs = (DeleteObject(o.object_name, o.version_id) for o in objs) + errs = list(self.conn.remove_objects(bucket, delobjs)) + if errs: + strerrs = "; ".join(str(err) for err in errs) + raise Exception("There were errors: {0}".format(strerrs)) + + def exists(self, sub_path): + full_path = os.path.join(self.path, sub_path) + try: + self.conn.stat_object(bucket, full_path) + return True + except minio.error.S3Error as s3err: + if s3err.code == 'NoSuchKey': + return False + raise s3err + except Exception as err: + raise err + + def __str__(self): + return '/' + self.path + + def __repr__(self): + return "S3TestBackupDir" + str(self.path) + + def __fspath__(self): + return self.path diff --git a/testgres/plugins/pg_probackup2/pg_probackup2/tests/basic_test.py b/testgres/plugins/pg_probackup2/pg_probackup2/tests/basic_test.py new file mode 100644 index 00000000..f5a82d38 --- /dev/null +++ b/testgres/plugins/pg_probackup2/pg_probackup2/tests/basic_test.py @@ -0,0 +1,79 @@ +import os +import shutil +import unittest +import testgres +from pg_probackup2.app import ProbackupApp +from pg_probackup2.init_helpers import Init, init_params +from pg_probackup2.app import build_backup_dir + + +class TestUtils: + @staticmethod + def get_module_and_function_name(test_id): + try: + module_name = test_id.split('.')[-2] + fname = test_id.split('.')[-1] + except IndexError: + print(f"Couldn't get module name and function name from test_id: `{test_id}`") + module_name, fname = test_id.split('(')[1].split('.')[1], test_id.split('(')[0] + return module_name, fname + + +class ProbackupTest(unittest.TestCase): + def setUp(self): + self.setup_test_environment() + self.setup_test_paths() + self.setup_backup_dir() + self.setup_probackup() + + def setup_test_environment(self): + self.output = None + self.cmd = None + self.nodes_to_cleanup = [] + self.module_name, self.fname = TestUtils.get_module_and_function_name(self.id()) + self.test_env = Init().test_env() + + def setup_test_paths(self): + self.rel_path = os.path.join(self.module_name, self.fname) + self.test_path = os.path.join(init_params.tmp_path, self.rel_path) + os.makedirs(self.test_path) + self.pb_log_path = os.path.join(self.test_path, "pb_log") + + def setup_backup_dir(self): + self.backup_dir = build_backup_dir(self, 'backup') + self.backup_dir.cleanup() + + def setup_probackup(self): + self.pg_node = testgres.NodeApp(self.test_path, self.nodes_to_cleanup) + self.pb = ProbackupApp(self, self.pg_node, self.pb_log_path, self.test_env, + auto_compress_alg='zlib', backup_dir=self.backup_dir) + + def tearDown(self): + if os.path.exists(self.test_path): + shutil.rmtree(self.test_path) + + +class BasicTest(ProbackupTest): + def test_full_backup(self): + # Setting up a simple test node + node = self.pg_node.make_simple('node', pg_options={"fsync": "off", "synchronous_commit": "off"}) + + # Initialize and configure Probackup + self.pb.init() + self.pb.add_instance('node', node) + self.pb.set_archiving('node', node) + + # Start the node and initialize pgbench + node.slow_start() + node.pgbench_init(scale=100, no_vacuum=True) + + # Perform backup and validation + backup_id = self.pb.backup_node('node', node) + out = self.pb.validate('node', backup_id) + + # Check if the backup is valid + self.assertIn(f"INFO: Backup {backup_id} is valid", out) + + +if __name__ == "__main__": + unittest.main() diff --git a/testgres/plugins/pg_probackup2/setup.py b/testgres/plugins/pg_probackup2/setup.py new file mode 100644 index 00000000..371eb078 --- /dev/null +++ b/testgres/plugins/pg_probackup2/setup.py @@ -0,0 +1,18 @@ +try: + from setuptools import setup +except ImportError: + from distutils.core import setup + +setup( + version='0.0.1', + name='testgres_pg_probackup2', + packages=['pg_probackup2', 'pg_probackup2.storage'], + description='Plugin for testgres that manages pg_probackup2', + url='https://github.com/postgrespro/testgres', + long_description_content_type='text/markdown', + license='PostgreSQL', + author='Postgres Professional', + author_email='testgres@postgrespro.ru', + keywords=['pg_probackup', 'testing', 'testgres'], + install_requires=['testgres>=1.9.2'] +)