
Commit cb8ec62

Author: mbergeron
Parent: 7aa7c32

    add rejection list using TARGET_REJECTED_DIR

2 files changed: +54, -35 lines


target_postgres/__init__.py (3 additions, 3 deletions)

@@ -10,7 +10,7 @@
 import urllib
 from datetime import datetime
 import collections
-from tempfile import TemporaryFile
+from tempfile import NamedTemporaryFile
 
 import pkg_resources
 from jsonschema.validators import Draft4Validator
@@ -101,7 +101,7 @@ def persist_lines(config, lines):
             stream_to_sync[stream].create_schema_if_not_exists()
             stream_to_sync[stream].sync_table()
             row_count[stream] = 0
-            csv_files_to_load[stream] = TemporaryFile(mode='w+b')
+            csv_files_to_load[stream] = NamedTemporaryFile(mode='w+b')
         elif t == 'ACTIVATE_VERSION':
             logger.debug('ACTIVATE_VERSION message')
         else:
@@ -120,7 +120,7 @@ def flush_records(o, csv_files_to_load, row_count, primary_key_exists, sync):
     sync.load_csv(csv_files_to_load[stream], row_count[stream])
     row_count[stream] = 0
     primary_key_exists[stream] = {}
-    csv_files_to_load[stream] = TemporaryFile(mode='w+b')
+    csv_files_to_load[stream] = NamedTemporaryFile(mode='w+b')
 
 
 def main():
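
Note on the tempfile change: TemporaryFile may have no visible name on the file system, while NamedTemporaryFile guarantees one via its .name attribute, which the reject_file method in db_sync.py relies on when it copies a rejected CSV with shutil.copy(file.name, ...). A minimal sketch of what this enables; the destination path is illustrative, not part of the commit:

    # Sketch: on POSIX systems an open NamedTemporaryFile has a real path in
    # .name, so it can be copied elsewhere while still open; a plain
    # TemporaryFile may expose no path to copy from.
    import shutil
    from tempfile import NamedTemporaryFile

    with NamedTemporaryFile(mode='w+b') as csv_file:
        csv_file.write(b'1,"example"\n')
        csv_file.flush()
        shutil.copy(csv_file.name, '/tmp/example-copy.csv')  # illustrative destination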

target_postgres/db_sync.py (51 additions, 32 deletions)

@@ -6,6 +6,10 @@
 import inflection
 import re
 import itertools
+import os
+import shutil
+
+TARGET_REJECTED_DIR = os.getenv("TARGET_REJECTED_DIR")
 
 logger = singer.get_logger()
 
@@ -43,14 +47,6 @@ def column_clause(name, schema_property):
     return '{} {}'.format(safe_column_name(name), column_type(schema_property))
 
 
-def sanitize(value):
-    if not isinstance(value, str):
-        return value
-
-    # this sequence will cause the CSV load to fail
-    return value.replace("\\u0000", '')
-
-
 def flatten_key(k, parent_key, sep):
     full_key = parent_key + [k]
     inflected_key = [inflect_column_name(n) for n in full_key]
@@ -108,7 +104,10 @@ def flatten_record(d, parent_key=[], sep='__'):
 
 
 def primary_column_names(stream_schema_message):
-    return [safe_column_name(inflect_column_name(p)) for p in stream_schema_message['key_properties']]
+    return [
+        safe_column_name(inflect_column_name(p))
+        for p in stream_schema_message['key_properties']
+    ]
 
 
 class DbSync:
@@ -117,6 +116,7 @@ def __init__(self, connection_config, stream_schema_message):
         self.schema_name = self.connection_config['schema']
         self.stream_schema_message = stream_schema_message
         self.flatten_schema = flatten_schema(stream_schema_message['schema'])
+        self.rejected_count = 0
 
     def open_connection(self):
         conn_string = "host='{}' dbname='{}' user='{}' password='{}' port='{}'".format(
@@ -153,6 +153,21 @@ def table_name(self, table_name, is_temporary):
         else:
             return '{}.{}'.format(self.schema_name, table_name)
 
+    def reject_file(self, file):
+        self.rejected_count += 1
+
+        if not TARGET_REJECTED_DIR:
+            return
+
+        os.makedirs(TARGET_REJECTED_DIR, exist_ok=True)
+        rejected_file_name = "{}-{:04d}.rej.csv".format(self.stream_schema_message['stream'],
+                                                        self.rejected_count)
+        rejected_file_path = os.path.join(TARGET_REJECTED_DIR,
+                                          rejected_file_name)
+
+        shutil.copy(file.name, rejected_file_path)
+        logger.info("Saved rejected entries as {}".format(rejected_file_path))
+
     def record_primary_key_string(self, record):
         if len(self.stream_schema_message['key_properties']) == 0:
             return None
@@ -164,35 +179,39 @@ def record_to_csv_line(self, record):
         flatten = flatten_record(record)
         return ','.join(
             [
-                json.dumps(sanitize(flatten[name])) if name in flatten and flatten[name] else ''
+                json.dumps(flatten[name]) if name in flatten and flatten[name] else ''
                 for name in self.flatten_schema
             ]
         )
 
     def load_csv(self, file, count):
-        file.seek(0)
-        stream_schema_message = self.stream_schema_message
-        stream = stream_schema_message['stream']
-        logger.info("Loading {} rows into '{}'".format(count, stream))
-
-        with self.open_connection() as connection:
-            with connection.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
-                cur.execute(self.create_table_query(True))
-                copy_sql = "COPY {} ({}) FROM STDIN WITH (FORMAT CSV, ESCAPE '\\')".format(
-                    self.table_name(stream, True),
-                    ', '.join(self.column_names())
-                )
-                logger.info(copy_sql)
-                cur.copy_expert(
-                    copy_sql,
-                    file
-                )
-                if len(self.stream_schema_message['key_properties']) > 0:
-                    cur.execute(self.update_from_temp_table())
+        try:
+            file.seek(0)
+            stream_schema_message = self.stream_schema_message
+            stream = stream_schema_message['stream']
+            logger.info("Loading {} rows into '{}'".format(count, stream))
+
+            with self.open_connection() as connection:
+                with connection.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
+                    cur.execute(self.create_table_query(True))
+                    copy_sql = "COPY {} ({}) FROM STDIN WITH (FORMAT CSV, ESCAPE '\\')".format(
+                        self.table_name(stream, True),
+                        ', '.join(self.column_names())
+                    )
+                    logger.info(copy_sql)
+                    cur.copy_expert(
+                        copy_sql,
+                        file
+                    )
+                    if len(self.stream_schema_message['key_properties']) > 0:
+                        cur.execute(self.update_from_temp_table())
+                        logger.info(cur.statusmessage)
+                    cur.execute(self.insert_from_temp_table())
                     logger.info(cur.statusmessage)
-                cur.execute(self.insert_from_temp_table())
-                logger.info(cur.statusmessage)
-                cur.execute(self.drop_temp_table())
+                    cur.execute(self.drop_temp_table())
+        except psycopg2.DataError as err:
+            logger.exception("Failed to load CSV file: {}".format(file.name))
+            self.reject_file(file)
 
     def insert_from_temp_table(self):
         stream_schema_message = self.stream_schema_message
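
Taken together, load_csv now traps psycopg2.DataError instead of letting a bad batch abort the run, and reject_file keeps the offending CSV when TARGET_REJECTED_DIR is set. A rough sketch of the resulting naming and copy step, using an assumed stream name and directory; nothing here beyond the diff above is part of the commit:

    import os
    import shutil
    from tempfile import NamedTemporaryFile

    # Assumed values for illustration only.
    rejected_dir = "/tmp/target-rejected"   # what TARGET_REJECTED_DIR might point at
    stream = "users"                        # hypothetical stream name
    rejected_count = 1                      # first rejection for this stream

    with NamedTemporaryFile(mode='w+b') as csv_file:
        csv_file.write(b'"row that PostgreSQL COPY refused"\n')
        csv_file.flush()

        # Mirrors DbSync.reject_file: build "<stream>-NNNN.rej.csv" under the
        # rejection directory and copy the temp CSV there for later inspection.
        os.makedirs(rejected_dir, exist_ok=True)
        rejected_path = os.path.join(rejected_dir,
                                     "{}-{:04d}.rej.csv".format(stream, rejected_count))
        shutil.copy(csv_file.name, rejected_path)
        print("Saved rejected entries as", rejected_path)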
