Skip to content

Commit dfe8556

Browse files
author
kgpayne
authored
Added time_extracted to batch message type. (#29)
1 parent ffc13bb commit dfe8556

File tree

2 files changed

+44
-10
lines changed

2 files changed

+44
-10
lines changed

singer/messages.py

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ class BatchMessage(Message):
178178
If none is provided, 'jsonl' will be assumed. e.g. 'csv'.
179179
* compression (string, optional) - An indication of file compression format. e.g. 'gzip'.
180180
* batch_size (int, optional) - Number of records in this batch. e.g. 100000.
181+
* time_extracted (datetime, optional) - TZ-aware datetime with batch extraction time.
181182
182183
If file_properties are not provided, uncompressed jsonl files are assumed.
183184
@@ -192,12 +193,19 @@ class BatchMessage(Message):
192193
193194
"""
194195

195-
def __init__(self, stream, filepath, file_format=None, compression=None, batch_size=None):
196+
def __init__(
    self, stream, filepath, file_format=None, compression=None,
    batch_size=None, time_extracted=None
):
    """Create a BATCH message.

    Args:
        stream: Stored unchanged on ``self.stream``.
        filepath: Stored unchanged on ``self.filepath``.
        file_format: File format indicator; falls back to ``'jsonl'``
            when omitted or falsy.
        compression: Optional indication of file compression format,
            e.g. ``'gzip'``.
        batch_size: Optional number of records in this batch.
        time_extracted: Optional TZ-aware datetime with the batch
            extraction time.

    Raises:
        ValueError: If ``time_extracted`` is given but is a naive
            datetime (no usable UTC offset).
    """
    self.stream = stream
    self.filepath = filepath
    self.format = file_format or 'jsonl'
    self.compression = compression
    self.batch_size = batch_size
    self.time_extracted = time_extracted
    # A datetime is only "aware" when its UTC offset is defined; a tzinfo
    # whose utcoffset() returns None is still naive. utcoffset() also
    # returns None when tzinfo itself is None, so this single check covers
    # both cases and keeps naive values from reaching the UTC conversion
    # done when the message is serialised.
    if time_extracted and time_extracted.utcoffset() is None:
        raise ValueError("'time_extracted' must be either None " +
                         "or an aware datetime (with a time zone)")
201209

202210
def asdict(self):
203211
result = {
@@ -210,6 +218,9 @@ def asdict(self):
210218
result['compression'] = self.compression
211219
if self.batch_size is not None:
212220
result['batch_size'] = self.batch_size
221+
if self.time_extracted:
222+
as_utc = self.time_extracted.astimezone(pytz.utc)
223+
result['time_extracted'] = u.strftime(as_utc)
213224
return result
214225

215226

@@ -262,11 +273,22 @@ def parse_message(msg):
262273
version=_required_key(obj, 'version'))
263274

264275
elif msg_type == 'BATCH':
265-
return BatchMessage(stream=_required_key(obj, 'stream'),
266-
filepath=_required_key(obj, 'filepath'),
267-
file_format=_required_key(obj, 'format'),
268-
compression=obj.get('compression'),
269-
batch_size=obj.get('batch_size'))
276+
time_extracted = obj.get('time_extracted')
277+
if time_extracted:
278+
try:
279+
time_extracted = ciso8601.parse_datetime(time_extracted)
280+
except:
281+
LOGGER.warning("unable to parse time_extracted with ciso8601 library")
282+
time_extracted = None
283+
284+
return BatchMessage(
285+
stream=_required_key(obj, 'stream'),
286+
filepath=_required_key(obj, 'filepath'),
287+
file_format=_required_key(obj, 'format'),
288+
compression=obj.get('compression'),
289+
batch_size=obj.get('batch_size'),
290+
time_extracted=time_extracted
291+
)
270292

271293
else:
272294
return None
@@ -342,7 +364,7 @@ def write_version(stream_name, version):
342364

343365
def write_batch(
344366
stream_name, filepath, file_format=None,
345-
compression=None, batch_size=None
367+
compression=None, batch_size=None, time_extracted=None
346368
):
347369
"""Write a batch message.
348370
@@ -352,4 +374,13 @@ def write_batch(
352374
compression = None
353375
batch_size = 100000
354376
"""
355-
write_message(BatchMessage(stream_name, filepath, file_format, compression, batch_size))
377+
write_message(
378+
BatchMessage(
379+
stream=stream_name,
380+
filepath=filepath,
381+
file_format=file_format,
382+
compression=compression,
383+
batch_size=batch_size,
384+
time_extracted=time_extracted
385+
)
386+
)

tests/test_singer.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,11 @@ def test_parse_message_state_missing_value(self):
9292

9393
def test_parse_message_batch_good(self):
9494
message = singer.parse_message(
95-
'{"type": "BATCH", "stream": "users", "filepath": "/tmp/users0001.jsonl", "format": "jsonl"}')
96-
self.assertEqual(message, singer.BatchMessage(stream='users', filepath='/tmp/users0001.jsonl'))
95+
'{"type": "BATCH", "stream": "users", "filepath": "/tmp/users0001.jsonl", "format": "jsonl", "time_extracted": "1970-01-02T00:00:00.000Z"}')
96+
self.assertEqual(
97+
message,
98+
singer.BatchMessage(stream='users', filepath='/tmp/users0001.jsonl', time_extracted=dateutil.parser.parse("1970-01-02T00:00:00.000Z"))
99+
)
97100

98101
def test_parse_message_batch_missing_value(self):
99102
with self.assertRaises(Exception):

0 commit comments

Comments
 (0)