diff --git a/target_postgres/denest.py b/target_postgres/denest.py index 2029a8f..b926c42 100644 --- a/target_postgres/denest.py +++ b/target_postgres/denest.py @@ -1,8 +1,6 @@ from copy import deepcopy - from target_postgres import json_schema, singer - def to_table_batches(schema, key_properties, records): """ Given a schema, and records, get all table schemas and records and prep them @@ -98,6 +96,7 @@ def _literal_only_schema(schema): def _create_subtable(table_path, table_json_schema, key_prop_schemas, subtables, level): + if json_schema.is_object(table_json_schema['items']): new_properties = table_json_schema['items']['properties'] else: @@ -156,7 +155,7 @@ def _denest_schema_helper( key_prop_schemas, subtables, level): - + for prop, item_json_schema in _denest_schema__singular_schemas(table_json_schema): if json_schema.is_object(item_json_schema): @@ -196,30 +195,10 @@ def _denest_schema( new_properties = {} for prop, item_json_schema in _denest_schema__singular_schemas(table_json_schema): - - if json_schema.is_object(item_json_schema): - _denest_schema_helper(table_path + (prop,), - (prop,), - item_json_schema, - json_schema.is_nullable(item_json_schema), - new_properties, - key_prop_schemas, - subtables, - level) - - elif json_schema.is_iterable(item_json_schema): - _create_subtable(table_path + (prop,), - item_json_schema, - key_prop_schemas, - subtables, - level + 1) - - elif json_schema.is_literal(item_json_schema): - if (prop,) in new_properties: - new_properties[(prop,)]['anyOf'].append(item_json_schema) - else: - new_properties[(prop,)] = {'anyOf': [item_json_schema]} - + if (prop,) in new_properties: + new_properties[(prop,)]['anyOf'].append(item_json_schema) + else: + new_properties[(prop,)] = {'anyOf': [item_json_schema]} table_json_schema['properties'] = new_properties @@ -245,60 +224,6 @@ def _get_streamed_table_records(key_properties, records): return records_map -def _denest_subrecord(table_path, - prop_path, - parent_record, - record, - records_map, - key_properties, - pk_fks, - level): - """""" - """ - {...} - """ - for prop, value in record.items(): - """ - str : {...} | [...] | ???None??? | - """ - - if isinstance(value, dict): - """ - {...} - """ - _denest_subrecord(table_path + (prop,), - prop_path + (prop,), - parent_record, - value, - records_map, - key_properties, - pk_fks, - level) - - elif isinstance(value, list): - """ - [...] - """ - _denest_records(table_path + (prop,), - value, - records_map, - key_properties, - pk_fks=pk_fks, - level=level + 1) - - elif value is None: - """ - None - """ - continue - - else: - """ - - """ - parent_record[prop_path + (prop,)] = (json_schema.python_type(value), value) - - def _denest_record(table_path, record, records_map, key_properties, pk_fks, level): """""" """ @@ -309,32 +234,7 @@ def _denest_record(table_path, record, records_map, key_properties, pk_fks, leve """ str : {...} | [...] | None | """ - - if isinstance(value, dict): - """ - {...} - """ - _denest_subrecord(table_path + (prop,), - (prop,), - denested_record, - value, - records_map, - key_properties, - pk_fks, - level) - - elif isinstance(value, list): - """ - [...] - """ - _denest_records(table_path + (prop,), - value, - records_map, - key_properties, - pk_fks=pk_fks, - level=level + 1) - - elif value is None: + if value is None: """ None """ @@ -348,6 +248,7 @@ def _denest_record(table_path, record, records_map, key_properties, pk_fks, leve if table_path not in records_map: records_map[table_path] = [] + records_map[table_path].append(denested_record) diff --git a/target_postgres/json_schema.py b/target_postgres/json_schema.py index 8e97187..de76e17 100644 --- a/target_postgres/json_schema.py +++ b/target_postgres/json_schema.py @@ -22,10 +22,11 @@ bool: BOOLEAN, str: STRING, type(None): NULL, - decimal.Decimal: NUMBER + decimal.Decimal: NUMBER, + dict: OBJECT, + list: ARRAY } - def python_type(x): """ Given a value `x`, return its Python Type as a JSONSchema type. @@ -559,7 +560,9 @@ def validation_errors(schema): 'number': 'f', 'integer': 'i', 'boolean': 'b', - 'date-time': 't' + 'date-time': 't', + 'object': 'o', + 'array': 'a' } diff --git a/target_postgres/postgres.py b/target_postgres/postgres.py index 223c0d5..29f0fab 100644 --- a/target_postgres/postgres.py +++ b/target_postgres/postgres.py @@ -16,6 +16,7 @@ from target_postgres import json_schema, singer from target_postgres.exceptions import PostgresError from target_postgres.sql_base import SEPARATOR, SQLInterface +from decimal import Decimal RESERVED_NULL_DEFAULT = 'NULL' @@ -569,6 +570,7 @@ def persist_csv_rows(self, sql.Identifier(temp_table_name), sql.SQL(', ').join(map(sql.Identifier, columns)), sql.Literal(RESERVED_NULL_DEFAULT)) + cur.copy_expert(copy, csv_rows) pattern = re.compile(singer.LEVEL_FMT.format('[0-9]+')) @@ -582,6 +584,7 @@ def persist_csv_rows(self, canonicalized_key_properties, columns, subkeys) + cur.execute(update_sql) def write_table_batch(self, cur, table_batch, metadata): @@ -601,10 +604,19 @@ def write_table_batch(self, cur, table_batch, metadata): csv_headers = list(remote_schema['schema']['properties'].keys()) rows_iter = iter(table_batch['records']) + def handle_decimal(obj): + if isinstance(obj, Decimal): + return float(obj) + raise TypeError(f"Object of type '{type(obj).__name__}' is not JSON serializable") + def transform(): try: row = next(rows_iter) + for header in csv_headers: + if header in row and isinstance(row[header], (dict, list)): + row[header] = json.dumps(row[header], default=handle_decimal) + with io.StringIO() as out: writer = csv.DictWriter(out, csv_headers) writer.writerow(row) @@ -623,8 +635,7 @@ def transform(): return len(table_batch['records']) - def add_column(self, cur, table_name, column_name, column_schema): - + def add_column(self, cur, table_name, column_name, column_schema): cur.execute(sql.SQL(''' ALTER TABLE {table_schema}.{table_name} ADD COLUMN {column_name} {data_type}; @@ -818,6 +829,7 @@ def sql_type_to_json_schema(self, sql_type, is_nullable): :return: JSONSchema """ _format = None + if sql_type == 'timestamp with time zone': json_type = 'string' _format = 'date-time' @@ -829,6 +841,8 @@ def sql_type_to_json_schema(self, sql_type, is_nullable): json_type = 'boolean' elif sql_type == 'text': json_type = 'string' + elif sql_type == 'jsonb': + json_type = 'json' else: raise PostgresError('Unsupported type `{}` in existing target table'.format(sql_type)) @@ -869,6 +883,10 @@ def json_schema_to_sql_type(self, schema): sql_type = 'bigint' elif _type == 'number': sql_type = 'double precision' + elif _type == 'object': + sql_type = 'jsonb' + elif _type == 'array': + sql_type = 'jsonb' if not_null: sql_type += ' NOT NULL'