|
13 | 13 | from tempfile import NamedTemporaryFile
|
14 | 14 |
|
15 | 15 | import pkg_resources
|
16 |
| -from jsonschema.validators import Draft4Validator |
| 16 | +from jsonschema import Draft4Validator, FormatChecker |
| 17 | +from decimal import Decimal |
17 | 18 | import singer
|
18 | 19 | from target_postgres.db_sync import DbSync
|
19 | 20 |
|
20 | 21 | logger = singer.get_logger()
|
21 | 22 |
|
22 | 23 |
|
| 24 | +def float_to_decimal(value): |
| 25 | + '''Walk the given data structure and turn all instances of float into |
| 26 | + double.''' |
| 27 | + if isinstance(value, float): |
| 28 | + return Decimal(str(value)) |
| 29 | + if isinstance(value, list): |
| 30 | + return [float_to_decimal(child) for child in value] |
| 31 | + if isinstance(value, dict): |
| 32 | + return {k: float_to_decimal(v) for k, v in value.items()} |
| 33 | + return value |
| 34 | + |
23 | 35 | def emit_state(state):
|
24 | 36 | if state is not None:
|
25 | 37 | line = json.dumps(state)
|
@@ -65,7 +77,7 @@ def persist_lines(config, lines):
|
65 | 77 | stream = o['stream']
|
66 | 78 |
|
67 | 79 | # Validate record
|
68 |
| - validators[stream].validate(o['record']) |
| 80 | + validators[stream].validate(float_to_decimal(o['record'])) |
69 | 81 |
|
70 | 82 | sync = stream_to_sync[stream]
|
71 | 83 |
|
@@ -93,7 +105,8 @@ def persist_lines(config, lines):
|
93 | 105 | raise Exception("Line is missing required key 'stream': {}".format(line))
|
94 | 106 | stream = o['stream']
|
95 | 107 | schemas[stream] = o
|
96 |
| - validators[stream] = Draft4Validator(o['schema']) |
| 108 | + schema = float_to_decimal(o['schema']) |
| 109 | + validators[stream] = Draft4Validator(schema, format_checker=FormatChecker()) |
97 | 110 | if 'key_properties' not in o:
|
98 | 111 | raise Exception("key_properties field is required")
|
99 | 112 | key_properties[stream] = o['key_properties']
|
|
0 commit comments