
Is there a replacement for the BulkProcessor? #108


Closed
tony-hizzle opened this issue Jan 14, 2022 · 25 comments · Fixed by #474


tony-hizzle commented Jan 14, 2022

With the RHLC (REST High Level Client), one could use the BulkProcessor API to batch IndexRequests and DeleteRequests. Is there a recommended replacement for BulkProcessor when migrating to elasticsearch-java?

https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-bulk-processor.html

swallez added the Category: Enhancement (New feature or request) and v7.16.2 labels on Jan 14, 2022
swallez (Member) commented Jan 14, 2022

There's no equivalent for now, but we're working on a replacement that will land in the near future.

@jerryguowei

Is there any update on this feature?

@jetpack1116

Any update on BulkProcessor for the new ES Java API?


jetpack1116 commented Jul 25, 2022

If there is no equivalent for BulkProcessor in the new Elasticsearch Java API client, how do we migrate BulkProcessor code?
Are the new client's BulkRequest, BulkOperation, and BulkResponse objects enough?

Is there a sample just to point us in the right direction?
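
(For later readers: a minimal bulk call with the new client looks roughly like the sketch below. The Product POJO, its sku() accessor and the "products" index name are illustrative, not from an official sample. Note that this sends a single request with no batching, which is exactly the part BulkProcessor used to handle.)

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import java.io.IOException;
import java.util.List;

static void bulkIndex(ElasticsearchClient esClient, List<Product> products) throws IOException {
	BulkRequest.Builder br = new BulkRequest.Builder();
	for (Product product : products) {
		// One index operation per document.
		br.operations(op -> op
				.index(idx -> idx
						.index("products")
						.id(product.sku())
						.document(product)));
	}
	BulkResponse result = esClient.bulk(br.build());
	if (result.errors()) {
		for (BulkResponseItem item : result.items()) {
			if (item.error() != null) {
				// item.error().reason() explains why this document failed.
			}
		}
	}
}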


divadpoc commented Aug 4, 2022

Any update would be much appreciated, @swallez.

swallez (Member) commented Aug 4, 2022

Sorry, no update yet. I'll raise this internally as a topic that needs to be prioritized.

@antondollmaier

Hi folks, thanks for the already-open issue.

We recently migrated from the now-deprecated ES7 client to the new API client in v8 and "stumbled", the very hard way, over bulk requests:
Elasticsearch rejects our bulk requests because max_content_length is exceeded - which is plausible, as the client's bulk() call no longer does any batching.

Instead of the "old" way (which is also the way of the clients in, e.g., Python), where the ES client splits the bulk request into smaller chunks based on byte size or document count, we now have to resort to splitting the data ourselves. That seems like a step backward to me.
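
In the meantime we chunk manually, roughly like this (a sketch; the thresholds, the Doc type and its accessors are illustrative):

// Manual chunking by document count and approximate byte size;
// maxActions / maxBytes play the role of BulkProcessor's setBulkActions / setBulkSize.
final int maxActions = 1000;
final long maxBytes = 5L * 1024 * 1024;
List<BulkOperation> pending = new ArrayList<>();
long pendingBytes = 0;
for (Doc doc : docs) {
	String json = doc.toJson(); // however you serialize your documents
	pending.add(BulkOperation.of(op -> op
			.index(idx -> idx
					.index(doc.indexName())
					.id(doc.id())
					.document(JsonData.fromJson(json)))));
	pendingBytes += json.getBytes(StandardCharsets.UTF_8).length;
	if (pending.size() >= maxActions || pendingBytes >= maxBytes) {
		esClient.bulk(b -> b.operations(pending)); // flush this chunk
		pending.clear();
		pendingBytes = 0;
	}
}
if (!pending.isEmpty()) {
	esClient.bulk(b -> b.operations(pending)); // flush the tail
}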

Is there a way to speed up the development of a "new" BulkProcessor, aside from "implement it yourselves"? ("buy Enterprise support", "send us chocolate", "talk to and bribe a member of the Evangelist team", ...)

Thanks! :)

Best,
Anton


sat245 commented Aug 11, 2022

Do we have any timeline for an update on this issue?
Is any documentation coming soon?

@frank-montyne

Any reason why this is taking so long? Why even introduce a new API without this core component for ingestion? This issue should be at the top of the list of things to solve, in my opinion. An update, and preferably a timeline, would be greatly appreciated.

Thanks and best regards,
Frank

@fabian-froehlich

It seems that I just opened a duplicate of this here: #425
I also asked for a solution on the official board: https://discuss.elastic.co/t/new-java-client-how-to-estimate-size-of-bulkrequest/316211

It's hard to believe that you should migrate to the new client before switching from ES 7 to ES 8 when no such feature is present.

@frank-montyne

Well, I started migrating to the new Java API client and partway through realised the BulkProcessor was not available there. It has been almost a year now and still nothing seems to be moving on the Elastic front. I guess only paying customers get support these days...

@frank-montyne

I don't understand why this issue is given zero priority. Why introduce a new API if it is not usable in real-life situations for bulk processing?
Any update?
Any deadline?


panthony commented Nov 16, 2022

I stumbled upon this issue yesterday; something like the following seems to work (but I have yet to do extensive testing):

On the classpath:

  • co.elastic.clients:elasticsearch-java:8.5.0
  • org.elasticsearch.client:elasticsearch-rest-high-level-client:7.17.3

Then in my indexing logic that relies on BulkProcessor I have:

        if (esVersion.getMajor() >= 8) {
            // ES 8 only accepts requests from the 7.17 high-level client when
            // these REST API compatibility headers are sent.
            requestOptions = RequestOptions.DEFAULT
                .toBuilder()
                .addHeader("Content-Type", "application/vnd.elasticsearch+json; compatible-with=7")
                .addHeader("Accept", "application/vnd.elasticsearch+json; compatible-with=7")
                .build();
        } else {
            requestOptions = RequestOptions.DEFAULT;
        }

I then use it like this:

        Request request = new Request(HttpPost.METHOD_NAME, "/_bulk");
        request.setOptions(requestOptions);

With that I no longer get an NPE, and indexing seems to work properly on ES6/ES7/ES8.

Hopefully I won't run into classpath issues from having two versions of the Java client.

@frank-montyne

Hi Anthony,

It's not that there is no bulk functionality; it's the data chunking and the listener that are missing.
We used to be able to do things like:

// Create bulk processor.
BulkProcessor.Listener bulkProcessorListener = new BulkProcessor.Listener() {
	@Override
	public void beforeBulk(long executionId, BulkRequest bulkRequest) {
		// Handle stuff before bulk execution...
	}

	@Override
	public void afterBulk(long executionId, BulkRequest bulkRequest, BulkResponse bulkResponse) {
		for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) {
			// Handle item responses...
		}

		if (bulkResponse.hasFailures()) {
			logger.error(bulkResponse.buildFailureMessage());
		}
	}

	@SuppressWarnings("rawtypes")
	@Override
	public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable throwable) {
		List<DocWriteRequest<?>> docWriteRequests = bulkRequest.requests();
		for (DocWriteRequest docWriteRequest : docWriteRequests) {
			// Handle failures...
		}
	}
};

BulkProcessor bulkProcessor = BulkProcessor.builder(
	(request, bulkListener) -> esHighLevelClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
	bulkProcessorListener)
	// Only make use of the byte size to force a flush.
	.setBulkActions(-1)
	.setBulkSize(new ByteSizeValue(10, ByteSizeUnit.MB))
	.setConcurrentRequests(0)
	.build();
...

Below is some code where I actually use the new client's bulk API. The events in the code are documents that need to be stored, updated, and so on. I keep track of the Elasticsearch index in the event itself, because updates could otherwise go horribly wrong when there is an index rollover (not everyone uses Elasticsearch purely for a logging use case). Unlike most other users, we also use Elasticsearch as our database for master data; I've been using Elasticsearch since version 0.1.4...
Hopefully it is of some help to you.

Best regards,
Frank Montyne

// Create the bulk request builder.
BulkRequest.Builder bulkRequestBuilder = new BulkRequest.Builder()
		.waitForActiveShards(asBuilder -> asBuilder.count(1))
		.refresh(Refresh.True);

// Needed as a temporary workaround until withJson() bug is fixed! 
JsonpMapper jsonpMapper = esClient._transport().jsonpMapper();
JsonProvider jsonProvider = jsonpMapper.jsonProvider();

try {
	// Collect a bulk operation for each event.
	List<BulkOperation> bulkOperations = new ArrayList<>();

	for (E event : indexSetEvents) {
		switch (event.action()) {
			case create:
				bulkOperations.add(new CreateOperation.Builder<JsonData>()
						.index(event.esIndex())
						.id(event.id())
						// Temporary workaround until withJson() bug is fixed! 
						.document(JsonData.from(jsonProvider.createParser(event.toESJsonReader()), jsonpMapper))
						//.withJson(event.toESJsonReader())
						.build()
						._toBulkOperation());					
				break;

			case update:
				bulkOperations.add(new IndexOperation.Builder<JsonData>()
						.index(event.esIndex())
						.id(event.id())
						// Temporary workaround until withJson() bug is fixed! 
						.document(JsonData.from(jsonProvider.createParser(event.toESJsonReader()), jsonpMapper))
						//.withJson(event.toESJsonReader())
						.build()
						._toBulkOperation());					
				break;

			case delete:
				// Soft delete event.
				event.deleted(true);
				bulkOperations.add(new IndexOperation.Builder<JsonData>()
						.index(event.esIndex())
						.id(event.id())
						// Temporary workaround until withJson() bug is fixed! 
						.document(JsonData.from(jsonProvider.createParser(event.toESJsonReader()), jsonpMapper))
						//.withJson(event.toESJsonReader())
						.build()
						._toBulkOperation());					
				break;

			case undelete:
				// Soft undelete event.
				event.deleted(false);
				bulkOperations.add(new IndexOperation.Builder<JsonData>()
						.index(event.esIndex())
						.id(event.id())
						// Temporary workaround until withJson() bug is fixed! 
						.document(JsonData.from(jsonProvider.createParser(event.toESJsonReader()), jsonpMapper))
						//.withJson(event.toESJsonReader())
						.build()
						._toBulkOperation());					
				break;

			case purge:
				// Real physical delete.
				bulkOperations.add(new DeleteOperation.Builder()
						.index(event.esIndex())
						.id(event.id())
						.build()
						._toBulkOperation());					
				break;

			default:
				// Should not get here. Log anyway.
				logger.error(String.format("Skipped event with unsupported action '%s' -> %s", event.action().name(), event.toJson()));
				break;
		}
	}

	bulkRequestBuilder.operations(bulkOperations);
	
	// Perform bulk request.
	BulkResponse bulkResponse = esClient.bulk(bulkRequestBuilder.build());
	for (BulkResponseItem bulkResponseItem : bulkResponse.items()) {
		// Bulk request API not fully fleshed out yet. There is no way to get to the source.
		// The bulkResponseItem.get().source() fails since bulkResponseItem.get() returns null!
		// Process response item...
	}
} catch (IOException | ElasticsearchException e) {
	logger.error("Bulk request failed.", e);
}


panthony commented Nov 17, 2022

@frank-montyne In this case I cannot help you.

I still have the listener because I'm still relying on the 7.X client with ES8 in compatibility mode.

What you are using here is just the Bulk API of ES, and it's far (like, very far) from what BulkProcessor actually does.

With BulkProcessor you can configure:

  • the concurrency
  • the max size of bulk requests (by document count and/or by size)
  • the back-off policy

You can then "stream" as many documents as you want through it, and it will:

  • batch documents into bulk requests using the configured thresholds
  • handle N requests in parallel (N being the configured concurrency)
    • each request being a call to .bulk on the ES client - what you are doing in your snippet
  • handle retries of failed documents within a bulk request, following the back-off policy
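
For reference, configuring those knobs on the 7.x BulkProcessor looked like this (a sketch; the values are illustrative, and bulkProcessorListener could be the listener from the snippet above):

// Batching thresholds, concurrency and back-off configured in one place.
BulkProcessor bulkProcessor = BulkProcessor.builder(
		(request, bulkListener) -> esHighLevelClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
		bulkProcessorListener)
	.setBulkActions(1000)                               // flush after 1000 documents...
	.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) // ...or after 5 MB
	.setConcurrentRequests(2)                           // up to 2 bulk requests in flight
	.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
	.build();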


frank-montyne commented Nov 17, 2022 via email


jcr0ss commented Dec 7, 2022

Is a new bulk processor being worked on for the 8+ Java client jars? My application relies on the BulkProcessor's functionality, and trying to write my own bulk processor seems like a big risk not worth taking. I would rather wait for the Elasticsearch dev team to come up with a new one. For now I will continue to use the 7.17.3 jars.


fabian-froehlich commented Dec 13, 2022

That is the point I am interested in as well (see #425).

I would be glad to know the recommended way to write large amounts of data into Elasticsearch. You cannot know the size of your request by only inspecting your POJOs, so there must be something for this in the library, right?

Maybe @swallez can clear things up?


panthony commented Jan 4, 2023

@swallez It does not look like the new BulkIngester retries a failed operation the way BulkProcessor did - am I reading it correctly? It does not seem to look at the actual response at all.

Previously the BulkProcessor would create a new BulkRequest containing only the failed operations for the next batch.
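
Until that exists, a manual version looks roughly like this (a sketch; ops is assumed to be the list of operations that went into the bulk request - response items come back in the same order):

// Collect the operations whose response items failed, to resend in a later batch.
// (The old BulkProcessor only retried rejections, i.e. HTTP 429;
// filter on items.get(i).status() == 429 for parity.)
List<BulkOperation> toRetry = new ArrayList<>();
List<BulkResponseItem> items = response.items();
for (int i = 0; i < items.size(); i++) {
	if (items.get(i).error() != null) {
		toRetry.add(ops.get(i));
	}
}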

swallez (Member) commented Jan 4, 2023

@panthony the new BulkIngester indeed doesn't have retries for now. There are a few shortcomings in the way BulkProcessor handles retries that would need to be ironed out before adding retries to BulkIngester.

I've opened #478 to outline the issues and a way to implement this. Please continue the discussion on retries there.

@frank-montyne

@swallez The BulkIngester helper doesn't seem to be present in the 7.17.8 release. Is that correct? If so, to which 7.17.x release will it be added?

Thanks
Frank

swallez (Member) commented Jan 16, 2023

@frank-montyne that's correct. It will be included in 7.17.9, which should be released at the end of this month, and in 8.7.0, which is currently planned for somewhere in March.


frank-montyne commented Jan 16, 2023 via email

@nicolasm35

I don't know if it can help someone, but I created a class to mimic the BulkProcessor.
I have neither benchmarked it nor tested it extensively.

Function sending the data to an insertion thread:

    // "queue" is an array of BlockingQueue<Object>, one per insertion thread;
    // records are distributed round-robin over "bulkConcurrentRequests" threads.
    public void send(Object record) {
        try {
            queue[currentInsertionThread].put(record);
            currentInsertionThread = (currentInsertionThread + 1) % bulkConcurrentRequests;
        } catch (InterruptedException e) {
            LOG.error("Error while sending records to Elasticsearch: " + e.getMessage());
        }
    }

Class managing insertions:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.ElasticsearchException;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;

public class ElasticsearchInsertionThread implements Runnable {

	private static final Logger LOG = LogManager.getLogger(ElasticsearchInsertionThread.class);
	
	// List containing bulk operations
	List<BulkOperation> listBulkOperation = new ArrayList<>();
		
	// Maximum number of actions per bulk 
	int bulkMaxActions;
	
	// Index name
	String indexName;
	
	// Elasticsearch client
	ElasticsearchClient client;
	
	// Queue used to share records with main thread
	private BlockingQueue<Object> queue;
	
	// State of thread
	boolean running;
	
    public ElasticsearchInsertionThread(ElasticsearchClient client, BlockingQueue<Object> queue) {
    	this.client = client;
    	this.queue = queue;
    	this.indexName = System.getenv().get("es_index_name");
    	this.bulkMaxActions = Integer.valueOf(System.getenv().get("es_bulk_max_actions"));
    	this.running = false;
    }
    
    /**
     * Get state of thread
     * @return true if the thread is running, otherwise false
     */
    public boolean isRunning() {
    	return running;
    }

    @Override
    public void run() {
    	running = true;
    	try {
			Object record;
			while ((record = queue.take()) != null) {
				@SuppressWarnings("unchecked")
				Map<String, Object> esValues = (Map<String, Object>)record;
				
				if (esValues.isEmpty()) {
					// empty value to exit the thread
					// Flush documents not inserted before leaving
					if (listBulkOperation.size() > 0) {
						insertIntoElasticsearch();
					}
					break;
				}
				
				// Create bulk operation. The document id is assumed to be
				// carried in the record itself under an "id" key.
				String docId = String.valueOf(esValues.remove("id"));
				listBulkOperation.add(BulkOperation.of(_0 -> _0
						.create(_1 -> _1
								.index(indexName)
								.id(docId)
								.document(esValues))));
				
				if (listBulkOperation.size() == bulkMaxActions) {
					// Send the bulk operations once maximum size has been reached
					insertIntoElasticsearch();
				}
			}
		} catch (InterruptedException e) {
			LOG.error("Error getting data from queue " + e.getMessage());
		}
    	running = false;
    }
    
    /**
     * Insert documents in the bulk into Elasticsearch
     */
    void insertIntoElasticsearch() {
		try {
			BulkResponse response = client.bulk(_0 -> _0
					.operations(listBulkOperation));
			if (response.errors()) {
				LOG.error("Bulk Response has failures");
			}
		} catch (IOException | ElasticsearchException e) {
			LOG.error("Error while sending records to Elasticsearch: " + e.getMessage());
		}

	    listBulkOperation.clear();
    }
}

swallez (Member) commented Jan 24, 2023

@nicolasm35 as mentioned previously, an implementation will be part of the next release. See PR #474 and https://github.com/elastic/elasticsearch-java/tree/main/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk

If you can't wait for the next release, I suggest you copy that code instead of using this more limited implementation.
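
For later readers, basic usage of the new BulkIngester looks roughly like this (a sketch against the helper linked above; the Product POJO and the values are illustrative - check the released API for exact signatures):

import co.elastic.clients.elasticsearch._helpers.bulk.BulkIngester;
import java.util.concurrent.TimeUnit;

// The ingester batches, flushes and parallelizes bulk requests, much like BulkProcessor did.
BulkIngester<Void> ingester = BulkIngester.of(b -> b
		.client(esClient)
		.maxOperations(1000)                // flush after 1000 operations...
		.maxSize(5L * 1024 * 1024)          // ...or after ~5 MB...
		.flushInterval(1, TimeUnit.SECONDS) // ...or at least once per second
		.maxConcurrentRequests(2));         // up to 2 bulk requests in flight

for (Product product : products) {
	ingester.add(op -> op.index(idx -> idx
			.index("products")
			.id(product.sku())
			.document(product)));
}

ingester.close(); // flush pending operations and release resources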
