Skip to content
This repository was archived by the owner on May 17, 2024. It is now read-only.

redshift: also try to get schema from svv_columns #803

Merged
merged 1 commit into from
Dec 16, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion data_diff/databases/redshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,38 @@ def query_pg_get_cols(self, path: DbPath) -> Dict[str, tuple]:

return schema_dict

def select_svv_columns_schema(self, path: DbPath) -> Dict[str, tuple]:
database, schema, table = self._normalize_table_path(path)

db_clause = ""
if database:
db_clause = f" AND table_catalog = '{database.lower()}'"

return (
f"""
select
distinct
column_name,
data_type,
datetime_precision,
numeric_precision,
numeric_scale
from
svv_columns
where table_name = '{table.lower()}' and table_schema = '{schema.lower()}'
"""
+ db_clause
)

def query_svv_columns(self, path: DbPath) -> Dict[str, tuple]:
rows = self.query(self.select_svv_columns_schema(path), list)
if not rows:
raise RuntimeError(f"{self.name}: Table '{'.'.join(path)}' does not exist, or has no columns")

d = {r[0]: r for r in rows}
assert len(d) == len(rows)
return d

# when using a non-information_schema source, strip (N) from type(N) etc. to match
# typical information_schema output
def _normalize_schema_info(self, rows) -> Dict[str, tuple]:
Expand Down Expand Up @@ -150,7 +182,10 @@ def query_table_schema(self, path: DbPath) -> Dict[str, tuple]:
try:
return self.query_external_table_schema(path)
except RuntimeError:
return self.query_pg_get_cols(path)
try:
return self.query_pg_get_cols(path)
except Exception:
return self.query_svv_columns(path)

def _normalize_table_path(self, path: DbPath) -> DbPath:
if len(path) == 1:
Expand Down