|
13 | 13 | from tempfile import TemporaryFile
|
14 | 14 |
|
15 | 15 | import pkg_resources
|
16 |
| -from jsonschema.validators import Draft4Validator |
| 16 | +from jsonschema.validators import Draft4Validator, FormatChecker |
17 | 17 | import singer
|
18 | 18 | from target_postgres.db_sync import DbSync
|
19 | 19 |
|
20 | 20 | logger = singer.get_logger()
|
21 | 21 |
|
22 | 22 |
|
| 23 | +def float_to_decimal(value): |
| 24 | + '''Walk the given data structure and turn all instances of float into |
| 25 | + double.''' |
| 26 | + if isinstance(value, float): |
| 27 | + return Decimal(str(value)) |
| 28 | + if isinstance(value, list): |
| 29 | + return [float_to_decimal(child) for child in value] |
| 30 | + if isinstance(value, dict): |
| 31 | + return {k: float_to_decimal(v) for k, v in value.items()} |
| 32 | + return value |
| 33 | + |
23 | 34 | def emit_state(state):
|
24 | 35 | if state is not None:
|
25 | 36 | line = json.dumps(state)
|
@@ -65,7 +76,7 @@ def persist_lines(config, lines):
|
65 | 76 | stream = o['stream']
|
66 | 77 |
|
67 | 78 | # Validate record
|
68 |
| - validators[stream].validate(o['record']) |
| 79 | + validators[stream].validate(float_to_decimal(o['record'])) |
69 | 80 |
|
70 | 81 | sync = stream_to_sync[stream]
|
71 | 82 |
|
@@ -93,7 +104,8 @@ def persist_lines(config, lines):
|
93 | 104 | raise Exception("Line is missing required key 'stream': {}".format(line))
|
94 | 105 | stream = o['stream']
|
95 | 106 | schemas[stream] = o
|
96 |
| - validators[stream] = Draft4Validator(o['schema']) |
| 107 | + schema = float_to_decimal(o['schema']) |
| 108 | + validators[stream] = Draft4Validator(schema, format_checker=FormatChecker()) |
97 | 109 | if 'key_properties' not in o:
|
98 | 110 | raise Exception("key_properties field is required")
|
99 | 111 | key_properties[stream] = o['key_properties']
|
|
0 commit comments