|
14 | 14 |
|
15 | 15 | from data_diff.info_tree import InfoTree, SegmentInfo
|
16 | 16 |
|
17 |
| -from .utils import run_as_daemon, safezip, getLogger, truncate_error |
| 17 | +from .utils import run_as_daemon, safezip, getLogger, truncate_error, Vector |
18 | 18 | from .thread_utils import ThreadedYielder
|
19 |
| -from .table_segment import TableSegment |
| 19 | +from .table_segment import TableSegment, create_mesh_from_points |
20 | 20 | from .tracking import create_end_event_json, create_start_event_json, send_event_json, is_tracking_enabled
|
21 | 21 | from sqeleton.abcs import IKey
|
22 | 22 |
|
@@ -135,7 +135,6 @@ def _get_stats(self, is_dbt: bool = False) -> DiffStats:
|
135 | 135 |
|
136 | 136 | return DiffStats(diff_by_sign, table1_count, table2_count, unchanged, diff_percent, extra_column_diffs)
|
137 | 137 |
|
138 |
| - |
139 | 138 | def get_stats_string(self, is_dbt: bool = False):
|
140 | 139 | diff_stats = self._get_stats(is_dbt)
|
141 | 140 |
|
@@ -271,63 +270,75 @@ def _diff_segments(
|
271 | 270 | ):
|
272 | 271 | ...
|
273 | 272 |
|
274 |
| - def _bisect_and_diff_tables(self, table1, table2, info_tree): |
275 |
| - if len(table1.key_columns) > 1: |
276 |
| - raise NotImplementedError("Composite key not supported yet!") |
277 |
| - if len(table2.key_columns) > 1: |
278 |
| - raise NotImplementedError("Composite key not supported yet!") |
| 273 | + def _bisect_and_diff_tables(self, table1: TableSegment, table2: TableSegment, info_tree): |
279 | 274 | if len(table1.key_columns) != len(table2.key_columns):
|
280 | 275 | raise ValueError("Tables should have an equivalent number of key columns!")
|
281 |
| - (key1,) = table1.key_columns |
282 |
| - (key2,) = table2.key_columns |
283 |
| - |
284 |
| - key_type = table1._schema[key1] |
285 |
| - key_type2 = table2._schema[key2] |
286 |
| - if not isinstance(key_type, IKey): |
287 |
| - raise NotImplementedError(f"Cannot use column of type {key_type} as a key") |
288 |
| - if not isinstance(key_type2, IKey): |
289 |
| - raise NotImplementedError(f"Cannot use column of type {key_type2} as a key") |
290 |
| - if key_type.python_type is not key_type2.python_type: |
291 |
| - raise TypeError(f"Incompatible key types: {key_type} and {key_type2}") |
| 276 | + |
| 277 | + key_types1 = [table1._schema[i] for i in table1.key_columns] |
| 278 | + key_types2 = [table2._schema[i] for i in table2.key_columns] |
| 279 | + |
| 280 | + for kt in key_types1 + key_types2: |
| 281 | + if not isinstance(kt, IKey): |
| 282 | + raise NotImplementedError(f"Cannot use a column of type {kt} as a key") |
| 283 | + |
| 284 | + for kt1, kt2 in safezip(key_types1, key_types2): |
| 285 | + if kt1.python_type is not kt2.python_type: |
| 286 | + raise TypeError(f"Incompatible key types: {kt1} and {kt2}") |
292 | 287 |
|
293 | 288 | # Query min/max values
|
294 | 289 | key_ranges = self._threaded_call_as_completed("query_key_range", [table1, table2])
|
295 | 290 |
|
296 | 291 | # Start with the first completed value, so we don't waste time waiting
|
297 |
| - min_key1, max_key1 = self._parse_key_range_result(key_type, next(key_ranges)) |
| 292 | + min_key1, max_key1 = self._parse_key_range_result(key_types1, next(key_ranges)) |
298 | 293 |
|
299 |
| - table1, table2 = [t.new(min_key=min_key1, max_key=max_key1) for t in (table1, table2)] |
| 294 | + btable1, btable2 = [t.new_key_bounds(min_key=min_key1, max_key=max_key1) for t in (table1, table2)] |
300 | 295 |
|
301 | 296 | logger.info(
|
302 |
| - f"Diffing segments at key-range: {table1.min_key}..{table2.max_key}. " |
303 |
| - f"size: table1 <= {table1.approximate_size()}, table2 <= {table2.approximate_size()}" |
| 297 | + f"Diffing segments at key-range: {btable1.min_key}..{btable2.max_key}. " |
| 298 | + f"size: table1 <= {btable1.approximate_size()}, table2 <= {btable2.approximate_size()}" |
304 | 299 | )
|
305 | 300 |
|
306 | 301 | ti = ThreadedYielder(self.max_threadpool_size)
|
307 | 302 | # Bisect (split) the table into segments, and diff them recursively.
|
308 |
| - ti.submit(self._bisect_and_diff_segments, ti, table1, table2, info_tree) |
| 303 | + ti.submit(self._bisect_and_diff_segments, ti, btable1, btable2, info_tree) |
309 | 304 |
|
310 | 305 | # Now we check for the second min-max, to diff the portions we "missed".
|
311 |
| - min_key2, max_key2 = self._parse_key_range_result(key_type, next(key_ranges)) |
| 306 | + # This is achieved by subtracting the table ranges, and dividing the resulting space into aligned boxes. |
| 307 | + # For example, given tables A & B, and a 2D compound key, where A was queried first for key-range, |
| 308 | + # the regions of B we need to diff in this second pass are marked by B1..8: |
| 309 | + # ┌──┬──────┬──┐ |
| 310 | + # │B1│ B2 │B3│ |
| 311 | + # ├──┼──────┼──┤ |
| 312 | + # │B4│ A │B5│ |
| 313 | + # ├──┼──────┼──┤ |
| 314 | + # │B6│ B7 │B8│ |
| 315 | + # └──┴──────┴──┘ |
| 316 | + # Overall, the max number of new regions in this 2nd pass is 3^|k| - 1, where |k| is the number of key columns |
312 | 317 |
|
313 |
| - if min_key2 < min_key1: |
314 |
| - pre_tables = [t.new(min_key=min_key2, max_key=min_key1) for t in (table1, table2)] |
315 |
| - ti.submit(self._bisect_and_diff_segments, ti, *pre_tables, info_tree) |
| 318 | + min_key2, max_key2 = self._parse_key_range_result(key_types1, next(key_ranges)) |
316 | 319 |
|
317 |
| - if max_key2 > max_key1: |
318 |
| - post_tables = [t.new(min_key=max_key1, max_key=max_key2) for t in (table1, table2)] |
319 |
| - ti.submit(self._bisect_and_diff_segments, ti, *post_tables, info_tree) |
| 320 | + points = [list(sorted(p)) for p in safezip(min_key1, min_key2, max_key1, max_key2)] |
| 321 | + box_mesh = create_mesh_from_points(*points) |
| 322 | + |
| 323 | + new_regions = [(p1, p2) for p1, p2 in box_mesh if p1 < p2 and not (p1 >= min_key1 and p2 <= max_key1)] |
| 324 | + |
| 325 | + for p1, p2 in new_regions: |
| 326 | + extra_tables = [t.new_key_bounds(min_key=p1, max_key=p2) for t in (table1, table2)] |
| 327 | + ti.submit(self._bisect_and_diff_segments, ti, *extra_tables, info_tree) |
320 | 328 |
|
321 | 329 | return ti
|
322 | 330 |
|
323 |
| - def _parse_key_range_result(self, key_type, key_range): |
324 |
| - mn, mx = key_range |
325 |
| - cls = key_type.make_value |
| 331 | + def _parse_key_range_result(self, key_types, key_range) -> Tuple[Vector, Vector]: |
| 332 | + min_key_values, max_key_values = key_range |
| 333 | + |
326 | 334 | # We add 1 to the max key because our ranges are exclusive of the end (like in Python)
|
327 | 335 | try:
|
328 |
| - return cls(mn), cls(mx) + 1 |
| 336 | + min_key = Vector(key_type.make_value(mn) for key_type, mn in safezip(key_types, min_key_values)) |
| 337 | + max_key = Vector(key_type.make_value(mx) + 1 for key_type, mx in safezip(key_types, max_key_values)) |
329 | 338 | except (TypeError, ValueError) as e:
|
330 |
| - raise type(e)(f"Cannot apply {key_type} to '{mn}', '{mx}'.") from e |
| 339 | + raise type(e)(f"Cannot apply {key_types} to '{min_key_values}', '{max_key_values}'.") from e |
| 340 | + |
| 341 | + return min_key, max_key |
331 | 342 |
|
332 | 343 | def _bisect_and_diff_segments(
|
333 | 344 | self,
|
|
0 commit comments