Description
- Elasticsearch server version: 7.5.2
- Elasticsearch Python client (elasticsearch-py) version: 7.5.1
I am trying to parse files containing millions of lines, and I am using the helpers.parallel_bulk function for indexing data.
However, parallel_bulk does not seem to respect the chunk_size parameter; instead it fills up my memory with all the data before insertion starts.
Code excerpt (full script can be found here):
```python
# Generate data for Elasticsearch to index
def generator(self):
    yields = 0
    with open(self.file_name, encoding="utf-8") as fp:
        for line in fp:
            match = self.regex.search( line.strip() )
            if match:
                if not self.dry_run: # expensive to have dry run check here?
                    entry = match.groupdict()
                    entry['leak'] = self.leak_name # use _type instead? save space... / bad for search performance?
                    doc = dict()
                    doc['_index'] = self.index
                    doc['_type'] = self.doc_type # or should this be self.leak_name maybe?
                    doc['_id'] = hashlib.md5( json.dumps(entry, sort_keys=True).encode("utf-8") ).hexdigest() # Create md5sum based on the entry, and add it to the document we want to index # expensive operation
                    doc['_source'] = entry
                    #pprint(doc)
                    yields += 1
                    print('Yields: ', yields)
                    yield doc
                self.good += 1
            else:
                self.bad += 1
                self.ignored.write("%s" % line)
            status = "[Good: {}, bad: {}]".format(self.good, self.bad)
            Utils.progressbar( (self.good + self.bad), self.total, suffix=status )

# Initialize
def run(self):
    # Very good documentation here: https://bluesock.org/~willkg/blog/dev/elasticsearch_part1_index.html
    # and here: https://elasticsearch-py.readthedocs.io/en/master/helpers.html

    # Create Elasticsearch connection
    es = Elasticsearch(
        host = self.host,
        port = self.port,
        timeout = 10,
        request_timeout = 10,
        max_retries = 10,
        retry_on_timeout = True
    )
    es.cluster.health(wait_for_status='yellow')

    try:
        for success, info in helpers.parallel_bulk(
            client = es,
            actions = self.generator(),
            #thread_count = 2,
            chunk_size = self.chunk_size#,
            #max_chunk_bytes = 1 * 1024 * 1024,
            #queue_size = 2,
            #raise_on_exception = True
        ):
            #print(success)
            print('INSERTED!')
            #es.cluster.health(wait_for_status='yellow') # During insertion, wait for cluster to be in good state?
            if not success:
                #print('[INDEXER]: A document failed:', info)
                self.failedfile.write("A document failed: %s" % info) # TODO: replace stuff like this with logger
    except Exception as error:
        raise error
```

The yields variable counts how many documents the generator has yielded. My understanding was that with a chunk_size of 500, parallel_bulk would start indexing as soon as 500 documents had been pulled from the generator. Instead it keeps reading without inserting until all input has been consumed (or until around 400,000 chunks); I have confirmed this by printing on each success.
Perhaps I am missing something here, or is this expected behavior?
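
For reference, below is a stripped-down sketch that isolates parallel_bulk from the file parsing, so the read-ahead behaviour can be observed on its own. The index name parallel-bulk-test, the document count, and the connection settings are placeholders; otherwise the structure mirrors the excerpt above.

```python
# Minimal sketch that isolates parallel_bulk from the file parsing.
# The index name "parallel-bulk-test", the document count, and the
# connection settings are placeholders for illustration.
from elasticsearch import Elasticsearch, helpers

def counting_generator(n):
    # Print every time parallel_bulk pulls a document from the generator.
    for i in range(n):
        print('pulled from generator:', i + 1)
        yield {
            '_index': 'parallel-bulk-test',
            '_id': str(i),
            '_source': {'value': i},
        }

es = Elasticsearch(host='localhost', port=9200)

returned = 0
for ok, info in helpers.parallel_bulk(
    client=es,
    actions=counting_generator(10000),
    chunk_size=500,
    thread_count=2,
):
    returned += 1
    if not ok:
        print('document failed:', info)
    if returned % 500 == 0:
        print('results returned so far:', returned)
```

If chunk_size alone governed the read-ahead, I would expect the "pulled from generator" and "results returned" counters to stay roughly in step, 500 documents at a time.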