From 5ca5eec4abb131df5ac4a3e9db35bfa789060664 Mon Sep 17 00:00:00 2001
From: Frank
Date: Wed, 1 Nov 2023 10:46:43 -0400
Subject: [PATCH 1/5] WIP

---
 target_postgres/denest.py        | 120 ++++-----------------------
 target_postgres/json_schema.py   |   9 ++-
 target_postgres/postgres.py      |  40 +++++++++-
 target_postgres/singer_stream.py |   4 ++
 4 files changed, 61 insertions(+), 112 deletions(-)

diff --git a/target_postgres/denest.py b/target_postgres/denest.py
index 2029a8f2..2486e51b 100644
--- a/target_postgres/denest.py
+++ b/target_postgres/denest.py
@@ -1,7 +1,7 @@
 from copy import deepcopy
-
 from target_postgres import json_schema, singer
-
+import traceback
+import json
 
 def to_table_batches(schema, key_properties, records):
     """
@@ -98,6 +98,10 @@ def _literal_only_schema(schema):
 
 
 def _create_subtable(table_path, table_json_schema, key_prop_schemas, subtables, level):
+    print(f"Creating subtable for {table_path} with schema {table_json_schema}")
+
+    traceback.print_stack()
+
     if json_schema.is_object(table_json_schema['items']):
         new_properties = table_json_schema['items']['properties']
     else:
@@ -156,7 +160,7 @@ def _denest_schema_helper(
                           key_prop_schemas,
                           subtables,
                           level):
-
+
     for prop, item_json_schema in _denest_schema__singular_schemas(table_json_schema):
 
         if json_schema.is_object(item_json_schema):
@@ -196,30 +200,10 @@ def _denest_schema(
     new_properties = {}
     for prop, item_json_schema in _denest_schema__singular_schemas(table_json_schema):
-
-        if json_schema.is_object(item_json_schema):
-            _denest_schema_helper(table_path + (prop,),
-                                  (prop,),
-                                  item_json_schema,
-                                  json_schema.is_nullable(item_json_schema),
-                                  new_properties,
-                                  key_prop_schemas,
-                                  subtables,
-                                  level)
-
-        elif json_schema.is_iterable(item_json_schema):
-            _create_subtable(table_path + (prop,),
-                             item_json_schema,
-                             key_prop_schemas,
-                             subtables,
-                             level + 1)
-
-        elif json_schema.is_literal(item_json_schema):
-            if (prop,) in new_properties:
-                new_properties[(prop,)]['anyOf'].append(item_json_schema)
-            else:
-                new_properties[(prop,)] = {'anyOf': [item_json_schema]}
-
+        if (prop,) in new_properties:
+            new_properties[(prop,)]['anyOf'].append(item_json_schema)
+        else:
+            new_properties[(prop,)] = {'anyOf': [item_json_schema]}
 
     table_json_schema['properties'] = new_properties
 
 
@@ -245,60 +229,6 @@ def _get_streamed_table_records(key_properties, records):
     return records_map
 
 
-def _denest_subrecord(table_path,
-                      prop_path,
-                      parent_record,
-                      record,
-                      records_map,
-                      key_properties,
-                      pk_fks,
-                      level):
-    """"""
-    """
-    {...}
-    """
-    for prop, value in record.items():
-        """
-        str : {...} | [...] | ???None??? | <literal>
-        """
-
-        if isinstance(value, dict):
-            """
-            {...}
-            """
-            _denest_subrecord(table_path + (prop,),
-                              prop_path + (prop,),
-                              parent_record,
-                              value,
-                              records_map,
-                              key_properties,
-                              pk_fks,
-                              level)
-
-        elif isinstance(value, list):
-            """
-            [...]
-            """
-            _denest_records(table_path + (prop,),
-                            value,
-                            records_map,
-                            key_properties,
-                            pk_fks=pk_fks,
-                            level=level + 1)
-
-        elif value is None:
-            """
-            None
-            """
-            continue
-
-        else:
-            """
-            <literal>
-            """
-            parent_record[prop_path + (prop,)] = (json_schema.python_type(value), value)
-
-
 def _denest_record(table_path, record, records_map, key_properties, pk_fks, level):
     """"""
     """
@@ -309,32 +239,7 @@ def _denest_record(table_path, record, records_map, key_properties, pk_fks, leve
         """
         str : {...} | [...] | None | <literal>
         """
-
-        if isinstance(value, dict):
-            """
-            {...}
-            """
-            _denest_subrecord(table_path + (prop,),
-                              (prop,),
-                              denested_record,
-                              value,
-                              records_map,
-                              key_properties,
-                              pk_fks,
-                              level)
-
-        elif isinstance(value, list):
-            """
-            [...]
-            """
-            _denest_records(table_path + (prop,),
-                            value,
-                            records_map,
-                            key_properties,
-                            pk_fks=pk_fks,
-                            level=level + 1)
-
-        elif value is None:
+        if value is None:
             """
             None
             """
@@ -348,6 +253,7 @@ def _denest_record(table_path, record, records_map, key_properties, pk_fks, leve
 
     if table_path not in records_map:
         records_map[table_path] = []
+
     records_map[table_path].append(denested_record)
 
 
diff --git a/target_postgres/json_schema.py b/target_postgres/json_schema.py
index 8e971877..de76e17b 100644
--- a/target_postgres/json_schema.py
+++ b/target_postgres/json_schema.py
@@ -22,10 +22,11 @@
     bool: BOOLEAN,
     str: STRING,
     type(None): NULL,
-    decimal.Decimal: NUMBER
+    decimal.Decimal: NUMBER,
+    dict: OBJECT,
+    list: ARRAY
 }
 
-
 def python_type(x):
     """
     Given a value `x`, return its Python Type as a JSONSchema type.
@@ -559,7 +560,9 @@ def validation_errors(schema):
     'number': 'f',
     'integer': 'i',
     'boolean': 'b',
-    'date-time': 't'
+    'date-time': 't',
+    'object': 'o',
+    'array': 'a'
 }
 
 
diff --git a/target_postgres/postgres.py b/target_postgres/postgres.py
index 223c0d55..cdd7fb3a 100644
--- a/target_postgres/postgres.py
+++ b/target_postgres/postgres.py
@@ -16,6 +16,7 @@
 from target_postgres import json_schema, singer
 from target_postgres.exceptions import PostgresError
 from target_postgres.sql_base import SEPARATOR, SQLInterface
+from decimal import Decimal
 
 
 RESERVED_NULL_DEFAULT = 'NULL'
@@ -427,6 +428,8 @@ def add_table(self, cur, path, name, metadata):
             sql.Identifier(self.postgres_schema),
             sql.Identifier(name))
 
+        print(create_table_sql)
+
         cur.execute(sql.SQL('{} ();').format(create_table_sql))
 
         self._set_table_metadata(cur, name, {'path': path,
@@ -564,11 +567,20 @@ def persist_csv_rows(self,
                          columns,
                          csv_rows):
 
+        print("-------------------------")
+        print(csv_rows)
+        print("-------------------------")
+
         copy = sql.SQL('COPY {}.{} ({}) FROM STDIN WITH CSV NULL AS {}').format(
             sql.Identifier(self.postgres_schema),
             sql.Identifier(temp_table_name),
             sql.SQL(', ').join(map(sql.Identifier, columns)),
             sql.Literal(RESERVED_NULL_DEFAULT))
+
+        print("-------------------------")
+        print(type(cur))
+        print("-------------------------")
+
         cur.copy_expert(copy, csv_rows)
 
         pattern = re.compile(singer.LEVEL_FMT.format('[0-9]+'))
@@ -582,6 +594,11 @@ def persist_csv_rows(self,
                                                        canonicalized_key_properties,
                                                        columns,
                                                        subkeys)
+
+            print("-------------------------")
+            print(update_sql)
+            print("-------------------------")
+
             cur.execute(update_sql)
 
     def write_table_batch(self, cur, table_batch, metadata):
@@ -601,10 +618,20 @@ def write_table_batch(self, cur, table_batch, metadata):
         csv_headers = list(remote_schema['schema']['properties'].keys())
         rows_iter = iter(table_batch['records'])
 
+        def handle_decimal(obj):
+            if isinstance(obj, Decimal):
+                return float(obj)
+            raise TypeError(f"Object of type '{type(obj).__name__}' is not JSON serializable")
+
         def transform():
             try:
                 row = next(rows_iter)
 
+                for header in csv_headers:
+                    if header in row and isinstance(row[header], (dict, list)):
+                        row[header] = json.dumps(row[header], default=handle_decimal)
+                        print(json.dumps(row[header]))
+
                 with io.StringIO() as out:
                     writer = csv.DictWriter(out, csv_headers)
                     writer.writerow(row)
@@ -614,6 +641,8 @@ def transform():
 
         csv_rows = TransformStream(transform)
 
+        print(table_batch['records'])
+
         ## Persist csv rows
         self.persist_csv_rows(cur,
                               remote_schema,
@@ -623,8 +652,7 @@ def transform():
 
         return len(table_batch['records'])
 
-    def add_column(self, cur, table_name, column_name, column_schema):
-
+    def add_column(self, cur, table_name, column_name, column_schema):
         cur.execute(sql.SQL('''
             ALTER TABLE {table_schema}.{table_name}
             ADD COLUMN {column_name} {data_type};
@@ -818,6 +846,8 @@ def sql_type_to_json_schema(self, sql_type, is_nullable):
         :return: JSONSchema
         """
         _format = None
+        print(f"sql type {sql_type}")
+
         if sql_type == 'timestamp with time zone':
             json_type = 'string'
             _format = 'date-time'
@@ -829,6 +859,8 @@ def sql_type_to_json_schema(self, sql_type, is_nullable):
             json_type = 'boolean'
         elif sql_type == 'text':
             json_type = 'string'
+        elif sql_type == 'jsonb':
+            json_type = 'json'
         else:
             raise PostgresError('Unsupported type `{}` in existing target table'.format(sql_type))
 
@@ -869,6 +901,10 @@ def json_schema_to_sql_type(self, schema):
             sql_type = 'bigint'
         elif _type == 'number':
             sql_type = 'double precision'
+        elif _type == 'object':
+            sql_type = 'jsonb'
+        elif _type == 'array':
+            sql_type = 'jsonb'
 
         if not_null:
             sql_type += ' NOT NULL'
diff --git a/target_postgres/singer_stream.py b/target_postgres/singer_stream.py
index a52100b9..408e690b 100644
--- a/target_postgres/singer_stream.py
+++ b/target_postgres/singer_stream.py
@@ -134,6 +134,10 @@ def __update_version(self, version):
         self.__lifetime_max_version = version
 
     def add_record_message(self, record_message):
+        print("----------------")
+        print(record_message)
+        print("----------------")
+
         add_record = True
 
         self.__update_version(record_message.get('version'))

From dd0443c3ca3408d0e41f1fa735d84dec03f44b5d Mon Sep 17 00:00:00 2001
From: Frank
Date: Wed, 1 Nov 2023 10:56:58 -0400
Subject: [PATCH 2/5] Removed print

---
 target_postgres/postgres.py | 20 +-------------------
 1 file changed, 1 insertion(+), 19 deletions(-)

diff --git a/target_postgres/postgres.py b/target_postgres/postgres.py
index cdd7fb3a..29f0fabc 100644
--- a/target_postgres/postgres.py
+++ b/target_postgres/postgres.py
@@ -428,8 +428,6 @@ def add_table(self, cur, path, name, metadata):
             sql.Identifier(self.postgres_schema),
             sql.Identifier(name))
 
-        print(create_table_sql)
-
         cur.execute(sql.SQL('{} ();').format(create_table_sql))
 
         self._set_table_metadata(cur, name, {'path': path,
@@ -567,20 +565,12 @@ def persist_csv_rows(self,
                          columns,
                          csv_rows):
 
-        print("-------------------------")
-        print(csv_rows)
-        print("-------------------------")
-
         copy = sql.SQL('COPY {}.{} ({}) FROM STDIN WITH CSV NULL AS {}').format(
             sql.Identifier(self.postgres_schema),
             sql.Identifier(temp_table_name),
             sql.SQL(', ').join(map(sql.Identifier, columns)),
             sql.Literal(RESERVED_NULL_DEFAULT))
 
-        print("-------------------------")
-        print(type(cur))
-        print("-------------------------")
-
         cur.copy_expert(copy, csv_rows)
 
         pattern = re.compile(singer.LEVEL_FMT.format('[0-9]+'))
@@ -594,11 +584,7 @@ def persist_csv_rows(self,
                                                        canonicalized_key_properties,
                                                        columns,
                                                        subkeys)
-
-            print("-------------------------")
-            print(update_sql)
-            print("-------------------------")
-
+
             cur.execute(update_sql)
 
     def write_table_batch(self, cur, table_batch, metadata):
@@ -630,7 +616,6 @@ def transform():
                 for header in csv_headers:
                     if header in row and isinstance(row[header], (dict, list)):
                         row[header] = json.dumps(row[header], default=handle_decimal)
-                        print(json.dumps(row[header]))
 
                 with io.StringIO() as out:
                     writer = csv.DictWriter(out, csv_headers)
@@ -641,8 +626,6 @@ def transform():
 
         csv_rows = TransformStream(transform)
 
-        print(table_batch['records'])
-
         ## Persist csv rows
         self.persist_csv_rows(cur,
                               remote_schema,
@@ -846,7 +829,6 @@ def sql_type_to_json_schema(self, sql_type, is_nullable):
         :return: JSONSchema
         """
         _format = None
-        print(f"sql type {sql_type}")
 
         if sql_type == 'timestamp with time zone':
             json_type = 'string'
From 41dbef979f633b7e46f4dee7c25aab331e707c7c Mon Sep 17 00:00:00 2001
From: Frank
Date: Wed, 1 Nov 2023 10:57:57 -0400
Subject: [PATCH 3/5] Removed print

---
 target_postgres/denest.py        | 2 --
 target_postgres/singer_stream.py | 4 ----
 2 files changed, 6 deletions(-)

diff --git a/target_postgres/denest.py b/target_postgres/denest.py
index 2486e51b..b4ecf848 100644
--- a/target_postgres/denest.py
+++ b/target_postgres/denest.py
@@ -98,8 +98,6 @@ def _literal_only_schema(schema):
 
 
 def _create_subtable(table_path, table_json_schema, key_prop_schemas, subtables, level):
-    print(f"Creating subtable for {table_path} with schema {table_json_schema}")
-
     traceback.print_stack()
 
     if json_schema.is_object(table_json_schema['items']):
diff --git a/target_postgres/singer_stream.py b/target_postgres/singer_stream.py
index 408e690b..a52100b9 100644
--- a/target_postgres/singer_stream.py
+++ b/target_postgres/singer_stream.py
@@ -134,10 +134,6 @@ def __update_version(self, version):
         self.__lifetime_max_version = version
 
     def add_record_message(self, record_message):
-        print("----------------")
-        print(record_message)
-        print("----------------")
-
         add_record = True
 
         self.__update_version(record_message.get('version'))

From cf6b3237ecd185cf32d4b9b9f05ac61e8ecc4f9c Mon Sep 17 00:00:00 2001
From: Frank
Date: Wed, 1 Nov 2023 13:30:34 -0400
Subject: [PATCH 4/5] Remove additional debugging messages

---
 target_postgres/denest.py | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/target_postgres/denest.py b/target_postgres/denest.py
index b4ecf848..b926c42b 100644
--- a/target_postgres/denest.py
+++ b/target_postgres/denest.py
@@ -1,7 +1,5 @@
 from copy import deepcopy
 from target_postgres import json_schema, singer
-import traceback
-import json
 
 def to_table_batches(schema, key_properties, records):
     """
@@ -98,7 +96,6 @@ def _literal_only_schema(schema):
 
 
 def _create_subtable(table_path, table_json_schema, key_prop_schemas, subtables, level):
-    traceback.print_stack()
 
     if json_schema.is_object(table_json_schema['items']):
         new_properties = table_json_schema['items']['properties']
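
The net effect of patches 1-4: object and array properties now map to jsonb columns (see json_schema_to_sql_type), and write_table_batch serializes such values to JSON strings before the CSV COPY, converting Decimal values through the handle_decimal hook. A minimal standalone sketch of that serialization step; the sample row is invented for illustration:

    import json
    from decimal import Decimal

    def handle_decimal(obj):
        # json.dumps() calls this hook for any value it cannot encode natively.
        if isinstance(obj, Decimal):
            return float(obj)
        raise TypeError(f"Object of type '{type(obj).__name__}' is not JSON serializable")

    row = {'id': 1, 'payload': {'price': Decimal('19.99'), 'tags': ['a', 'b']}}
    for key, value in list(row.items()):
        # Mirror write_table_batch: nested dicts/lists become JSON strings.
        if isinstance(value, (dict, list)):
            row[key] = json.dumps(value, default=handle_decimal)

    print(row)
    # {'id': 1, 'payload': '{"price": 19.99, "tags": ["a", "b"]}'}

Note that float(Decimal(...)) can lose precision for high-precision numerics; returning str(obj) instead would preserve the digits at the cost of emitting JSON strings rather than numbers.
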
From f558b117c6aec4e616f9800b0b3ed37d314be40f Mon Sep 17 00:00:00 2001
From: imran
Date: Wed, 6 Mar 2024 16:14:47 +0500
Subject: [PATCH 5/5] Updated logic to sync all columns into a single JSON
 object

---
 target_postgres/postgres.py | 68 +++++++++++++++++++++++++++++++++++--
 1 file changed, 66 insertions(+), 2 deletions(-)

diff --git a/target_postgres/postgres.py b/target_postgres/postgres.py
index 29f0fabc..cca4b39a 100644
--- a/target_postgres/postgres.py
+++ b/target_postgres/postgres.py
@@ -22,6 +22,70 @@
 
 RESERVED_NULL_DEFAULT = 'NULL'
 
+def transform_dict(input_dict,id_columns):
+    id_columns=eval(id_columns)
+    sdc_columns = [key for key in input_dict['properties'] if key.startswith("_sdc_")]
+    tenant_id = [key for key in input_dict['properties'] if key == "tenant_id"]
+
+    # Create a copy of the original dictionary
+    result_dict = {
+        'type': input_dict['type'],
+        'properties': {}
+    }
+
+    # Move id_columns to the result dictionary
+    for column in id_columns:
+        if column in input_dict['properties']:
+            result_dict['properties'][column] = input_dict['properties'][column]
+
+    # Move sdc_columns to the result dictionary
+    for column in sdc_columns:
+        if column in input_dict['properties']:
+            result_dict['properties'][column] = input_dict['properties'][column]
+
+    # Move tenant_id to the result dictionary
+    for column in tenant_id:
+        if column in input_dict['properties']:
+            result_dict['properties'][column] = input_dict['properties'][column]
+
+    # Move all other columns into the 'record' object
+    result_dict['properties']['record'] = {
+        'type': ['object'],
+        'properties': {
+            column: input_dict['properties'][column]
+            for column in input_dict['properties']
+            if column not in id_columns + sdc_columns + tenant_id
+        }
+    }
+
+    return result_dict
+
+
+def transform_data_dict(input_string,id_columns):
+    id_columns=eval(id_columns)
+    data=eval(input_string)
+    result_list = []
+
+    for entry in data:
+        result_entry = {'record': {}}
+
+        # Separate id_columns and columns starting with "_sdc_"
+        id_columns = {key: entry.pop(key) for key in id_columns}
+        sdc_columns = {key: entry.pop(key) for key in entry.copy() if key.startswith("_sdc_")}
+        tenant_id = {key: entry.pop(key) for key in entry.copy() if key == "tenant_id"}
+
+        # Move the remaining columns into the 'record' object
+        result_entry['record'] = entry
+
+        # Add id_columns and sdc_columns back to the result_entry
+        result_entry.update(id_columns)
+        result_entry.update(sdc_columns)
+        result_entry.update(tenant_id)
+
+        result_list.append(result_entry)
+
+    return result_list
+
 @lru_cache(maxsize=128)
 def _format_datetime(value):
     """
@@ -304,9 +368,9 @@ def write_batch(self, stream_buffer):
 
                     written_batches_details = self.write_batch_helper(cur,
                                                                       root_table_name,
                                                                       transform_dict(stream_buffer.schema,str(stream_buffer.key_properties)),
                                                                       stream_buffer.key_properties,
-                                                                      stream_buffer.get_batch(),
+                                                                      transform_data_dict(str(stream_buffer.get_batch()),str(stream_buffer.key_properties)),
                                                                       {'version': target_table_version})
 
                     cur.execute('COMMIT;')
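
Taken together with patch 1's object-to-jsonb mapping, patch 5 folds everything except the key properties, the _sdc_* metadata columns, and tenant_id into a single 'record' object column. A worked example of the intended reshaping, assuming the two helpers from patch 5 are importable; the schema, records, and the tenant_id/_sdc_received_at columns are invented for illustration:

    schema = {
        'type': 'object',
        'properties': {
            'id': {'type': 'integer'},
            'name': {'type': 'string'},
            '_sdc_received_at': {'type': 'string'},
            'tenant_id': {'type': 'integer'},
        },
    }

    transform_dict(schema, "['id']")
    # {'type': 'object',
    #  'properties': {'id': {'type': 'integer'},
    #                 '_sdc_received_at': {'type': 'string'},
    #                 'tenant_id': {'type': 'integer'},
    #                 'record': {'type': ['object'],
    #                            'properties': {'name': {'type': 'string'}}}}}

    transform_data_dict("[{'id': 1, 'name': 'a', '_sdc_received_at': '2024-03-06', 'tenant_id': 7}]", "['id']")
    # [{'record': {'name': 'a'}, 'id': 1, '_sdc_received_at': '2024-03-06', 'tenant_id': 7}]
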
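One caveat with patch 5 as written: both helpers round-trip their inputs through str() and eval(), which evaluates arbitrary expressions and breaks on any value whose repr is not a valid Python literal (datetimes, for example). The reshaping itself does not need the round-trip, since stream_buffer.get_batch() already returns the records as a list of dicts and key_properties is already a list of column names. A possible eval-free variant, not part of this series; the helper name is hypothetical:

    def transform_data_dict_native(records, id_columns):
        # Same reshaping as transform_data_dict, but over native Python
        # objects instead of eval()'d string representations.
        result = []
        for entry in records:
            entry = dict(entry)  # shallow copy so the caller's batch is not mutated
            kept = {key: entry.pop(key) for key in id_columns if key in entry}
            kept.update({key: entry.pop(key) for key in list(entry)
                         if key.startswith('_sdc_') or key == 'tenant_id'})
            kept['record'] = entry  # everything else folds into the single json object
            result.append(kept)
        return result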