Skip to content

Commit a388f23

Browse files
author
vshepard
committed
Move s3_backup.py
1 parent de4c84e commit a388f23

File tree

3 files changed

+146
-2
lines changed

3 files changed

+146
-2
lines changed

testgres/plugins/__init__.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@
22
from pg_probackup2.app import ProbackupApp, ProbackupException
33
from pg_probackup2.init_helpers import init_params
44
from pg_probackup2.storage.fs_backup import FSTestBackupDir
5+
from pg_probackup2.storage.s3_backup import S3TestBackupDir
56

67

78
__all__ = [
8-
"ProbackupApp", "ProbackupException", "init_params", "FSTestBackupDir", "GDBobj"
9+
"ProbackupApp", "ProbackupException", "init_params", "FSTestBackupDir", "S3TestBackupDir", "GDBobj"
910
]

testgres/plugins/pg_probackup2/pg_probackup2/init_helpers.py

+9-1
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,10 @@ def __init__(self):
4343

4444
self._pg_config = testgres.get_pg_config()
4545
self.is_enterprise = self._pg_config.get('PGPRO_EDITION', None) == 'enterprise'
46+
self.is_shardman = self._pg_config.get('PGPRO_EDITION', None) == 'shardman'
4647
self.is_pgpro = 'PGPRO_EDITION' in self._pg_config
4748
self.is_nls_enabled = 'enable-nls' in self._pg_config['CONFIGURE']
49+
self.is_lz4_enabled = '-llz4' in self._pg_config['LIBS']
4850
version = self._pg_config['VERSION'].rstrip('develalphabetapre')
4951
parts = [*version.split(' ')[1].split('.'), '0', '0'][:3]
5052
parts[0] = re.match(r'\d+', parts[0]).group()
@@ -80,7 +82,7 @@ def __init__(self):
8082
self.tmp_path = tmp_path
8183
else:
8284
self.tmp_path = os.path.abspath(
83-
os.path.join(self.source_path, tmp_path or 'tmp_dirs')
85+
os.path.join(self.source_path, tmp_path or os.path.join('tests', 'tmp_dirs'))
8486
)
8587

8688
os.makedirs(self.tmp_path, exist_ok=True)
@@ -189,6 +191,12 @@ def __init__(self):
189191
self.compress_suffix = ''
190192
self.archive_compress = False
191193

194+
cfs_compress = test_env.get('PG_PROBACKUP_CFS_COMPRESS', None)
195+
if cfs_compress:
196+
self.cfs_compress = cfs_compress.lower()
197+
else:
198+
self.cfs_compress = self.archive_compress
199+
192200
os.environ["PGAPPNAME"] = "pg_probackup"
193201
self.delete_logs = delete_logs
194202

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
import os
2+
import io
3+
import sys
4+
5+
import minio
6+
from minio import Minio
7+
from minio.deleteobjects import DeleteObject
8+
import urllib3
9+
10+
root = os.path.realpath(os.path.join(os.path.dirname(__file__), '../..'))
11+
if root not in sys.path:
12+
sys.path.append(root)
13+
14+
from .fs_backup import TestBackupDir
15+
16+
# Should fail if either of env vars does not exist
17+
host = os.environ['PG_PROBACKUP_S3_HOST']
18+
port = os.environ['PG_PROBACKUP_S3_PORT']
19+
access = os.environ['PG_PROBACKUP_S3_ACCESS_KEY']
20+
secret = os.environ['PG_PROBACKUP_S3_SECRET_ACCESS_KEY']
21+
bucket = os.environ['PG_PROBACKUP_S3_BUCKET_NAME']
22+
path_suffix = os.environ.get("PG_PROBACKUP_TEST_TMP_SUFFIX")
23+
https = os.environ.get("PG_PROBACKUP_S3_HTTPS")
24+
25+
s3_type = os.environ.get('PG_PROBACKUP_S3_TEST', os.environ.get('PROBACKUP_S3_TYPE_FULL_TEST'))
26+
tmp_path = os.environ.get('PGPROBACKUP_TMP_DIR', default='')
27+
28+
status_forcelist = [413, # RequestBodyTooLarge
29+
429, # TooManyRequests
30+
500, # InternalError
31+
503, # ServerBusy
32+
]
33+
34+
35+
class S3TestBackupDir(TestBackupDir):
36+
is_file_based = False
37+
38+
def __init__(self, *, rel_path, backup):
39+
path = "pg_probackup"
40+
if path_suffix:
41+
path += "_" + path_suffix
42+
if tmp_path == '' or os.path.isabs(tmp_path):
43+
self.path = f"{path}{tmp_path}/{rel_path}/{backup}"
44+
else:
45+
self.path = f"{path}/{tmp_path}/{rel_path}/{backup}"
46+
47+
secure: bool = False
48+
if https in ['ON', 'HTTPS']:
49+
secure = True
50+
self.conn = Minio(host + ":" + port, secure=secure, access_key=access,
51+
secret_key=secret, http_client=urllib3.PoolManager(retries=urllib3.Retry(total=5,
52+
backoff_factor=1,
53+
status_forcelist=status_forcelist)))
54+
if not self.conn.bucket_exists(bucket):
55+
raise Exception(f"Test bucket {bucket} does not exist.")
56+
self.pb_args = ('-B', '/' + self.path, f'--s3={s3_type}')
57+
return
58+
59+
def list_instance_backups(self, instance):
60+
full_path = os.path.join(self.path, 'backups', instance)
61+
candidates = self.conn.list_objects(bucket, prefix=full_path, recursive=True)
62+
return [os.path.basename(os.path.dirname(x.object_name))
63+
for x in candidates if x.object_name.endswith('backup.control')]
64+
65+
def list_files(self, sub_dir, recursive=False):
66+
full_path = os.path.join(self.path, sub_dir)
67+
# Need '/' in the end to find inside the folder
68+
full_path_dir = full_path if full_path[-1] == '/' else full_path + '/'
69+
object_list = self.conn.list_objects(bucket, prefix=full_path_dir, recursive=recursive)
70+
return [obj.object_name.replace(full_path_dir, '', 1)
71+
for obj in object_list
72+
if not obj.is_dir]
73+
74+
def list_dirs(self, sub_dir):
75+
full_path = os.path.join(self.path, sub_dir)
76+
# Need '/' in the end to find inside the folder
77+
full_path_dir = full_path if full_path[-1] == '/' else full_path + '/'
78+
object_list = self.conn.list_objects(bucket, prefix=full_path_dir, recursive=False)
79+
return [obj.object_name.replace(full_path_dir, '', 1).rstrip('\\/')
80+
for obj in object_list
81+
if obj.is_dir]
82+
83+
def read_file(self, sub_path, *, text=True):
84+
full_path = os.path.join(self.path, sub_path)
85+
bytes = self.conn.get_object(bucket, full_path).read()
86+
if not text:
87+
return bytes
88+
return bytes.decode('utf-8')
89+
90+
def write_file(self, sub_path, data, *, text=True):
91+
full_path = os.path.join(self.path, sub_path)
92+
if text:
93+
data = data.encode('utf-8')
94+
self.conn.put_object(bucket, full_path, io.BytesIO(data), length=len(data))
95+
96+
def cleanup(self):
97+
self.remove_dir('')
98+
99+
def remove_file(self, sub_path):
100+
full_path = os.path.join(self.path, sub_path)
101+
self.conn.remove_object(bucket, full_path)
102+
103+
def remove_dir(self, sub_path):
104+
if sub_path:
105+
full_path = os.path.join(self.path, sub_path)
106+
else:
107+
full_path = self.path
108+
objs = self.conn.list_objects(bucket, prefix=full_path, recursive=True,
109+
include_version=True)
110+
delobjs = (DeleteObject(o.object_name, o.version_id) for o in objs)
111+
errs = list(self.conn.remove_objects(bucket, delobjs))
112+
if errs:
113+
strerrs = "; ".join(str(err) for err in errs)
114+
raise Exception("There were errors: {0}".format(strerrs))
115+
116+
def exists(self, sub_path):
117+
full_path = os.path.join(self.path, sub_path)
118+
try:
119+
self.conn.stat_object(bucket, full_path)
120+
return True
121+
except minio.error.S3Error as s3err:
122+
if s3err.code == 'NoSuchKey':
123+
return False
124+
raise s3err
125+
except Exception as err:
126+
raise err
127+
128+
def __str__(self):
129+
return '/' + self.path
130+
131+
def __repr__(self):
132+
return "S3TestBackupDir" + str(self.path)
133+
134+
def __fspath__(self):
135+
return self.path

0 commit comments

Comments
 (0)