Skip to content

Commit d6ccbc0

Browse files
committed
fix: write to file in CSV format, not in JSON
writing in JSON format does not make sense. The records in Postgres should arrive exactly as they were scraped by a Tap at the source. E.g. if a string in the sources has control characters like LF (0x0a), the same string should also have LF in the postgres target and not '\n' like JSON format uses. Fixes transferwise#131
1 parent bef5a27 commit d6ccbc0

File tree

2 files changed

+11
-15
lines changed

2 files changed

+11
-15
lines changed

target_postgres/__init__.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#!/usr/bin/env python3
22

33
import argparse
4+
import csv
45
import io
56
import json
67
import os
@@ -344,10 +345,10 @@ def flush_records(stream, records_to_load, row_count, db_sync, temp_dir=None):
344345

345346
size_bytes = 0
346347
csv_fd, csv_file = mkstemp(suffix='.csv', prefix=f'{stream}_', dir=temp_dir)
347-
with open(csv_fd, 'w+b') as f:
348+
with open(csv_fd, 'w') as csvfile:
349+
writer = csv.DictWriter(csvfile, fieldnames=list(db_sync.flatten_schema.keys()))
348350
for record in records_to_load.values():
349-
csv_line = db_sync.record_to_csv_line(record)
350-
f.write(bytes(csv_line + '\n', 'UTF-8'))
351+
writer.writerow(db_sync.record_to_flattened_record(record))
351352

352353
size_bytes = os.path.getsize(csv_file)
353354
db_sync.load_csv(csv_file, row_count, size_bytes)

target_postgres/db_sync.py

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
import json
22
import sys
3-
import psycopg2
4-
import psycopg2.extras
5-
import inflection
63
import re
74
import uuid
85
import itertools
96
import time
107
from collections.abc import MutableMapping
8+
from typing import Dict
9+
10+
import psycopg2
11+
import psycopg2.extras
12+
import inflection
1113
from singer import get_logger
1214

1315

@@ -344,15 +346,8 @@ def record_primary_key_string(self, record):
344346
raise exc
345347
return ','.join(key_props)
346348

347-
def record_to_csv_line(self, record):
348-
flatten = flatten_record(record, self.flatten_schema, max_level=self.data_flattening_max_level)
349-
return ','.join(
350-
[
351-
json.dumps(flatten[name], ensure_ascii=False)
352-
if name in flatten and (flatten[name] == 0 or flatten[name]) else ''
353-
for name in self.flatten_schema
354-
]
355-
)
349+
def record_to_flattened_record(self, record: Dict) -> Dict:
350+
return flatten_record(record, self.flatten_schema, max_level=self.data_flattening_max_level)
356351

357352
def load_csv(self, file, count, size_bytes):
358353
stream_schema_message = self.stream_schema_message

0 commit comments

Comments
 (0)