43
43
44
44
import com .fasterxml .jackson .core .JsonFactory ;
45
45
import com .fasterxml .jackson .core .JsonGenerator ;
46
- import org .elasticsearch .action .DocWriteRequest .OpType ;
47
- import org .elasticsearch .action .bulk .BulkRequest ;
48
- import org .elasticsearch .action .bulk .BulkResponse ;
49
- import org .elasticsearch .action .index .IndexRequest ;
46
+ import com .fasterxml .jackson .databind .ObjectMapper ;
47
+ import org .apache .http .StatusLine ;
50
48
import org .elasticsearch .action .search .SearchRequest ;
51
49
import org .elasticsearch .action .search .SearchResponse ;
52
50
import org .elasticsearch .action .support .WriteRequest .RefreshPolicy ;
51
+ import org .elasticsearch .client .Request ;
53
52
import org .elasticsearch .client .RequestOptions ;
54
53
import org .elasticsearch .client .RestHighLevelClient ;
55
54
import org .elasticsearch .index .query .TermQueryBuilder ;
62
61
import org .elasticsearch .search .aggregations .bucket .composite .CompositeValuesSourceBuilder ;
63
62
import org .elasticsearch .search .aggregations .bucket .composite .TermsValuesSourceBuilder ;
64
63
import org .elasticsearch .search .builder .SearchSourceBuilder ;
65
- import org .elasticsearch .xcontent .XContentType ;
66
64
import org .xml .sax .Attributes ;
67
65
import org .xml .sax .Locator ;
68
66
import org .xml .sax .SAXException ;
69
67
70
68
import java .io .ByteArrayOutputStream ;
71
69
import java .io .IOException ;
72
70
import java .security .InvalidParameterException ;
71
+ import java .time .Duration ;
72
+ import java .time .Instant ;
73
73
import java .time .ZonedDateTime ;
74
74
import java .time .format .DateTimeFormatter ;
75
75
import java .util .ArrayList ;
76
+ import java .util .HashMap ;
76
77
import java .util .List ;
77
78
import java .util .Map ;
78
79
import javax .inject .Inject ;
@@ -108,8 +109,9 @@ class ElasticIndexingFilter extends AbstractXMLFilter {
108
109
private String indexNameDateFormat ;
109
110
private String indexNameDateFieldName = "@timestamp" ;
110
111
111
- private final List <IndexRequest > indexRequests ;
112
- private final ByteArrayOutputStream currentDocument ;
112
+ private final List <String > indexRequests = new ArrayList <>();
113
+ private long indexRequestsDocCount = 0 ;
114
+ private final ByteArrayOutputStream currentDocument = new ByteArrayOutputStream (INITIAL_JSON_STREAM_SIZE_BYTES );
113
115
private final StringBuilder valueBuffer = new StringBuilder ();
114
116
private String currentDocFieldName = null ;
115
117
private int currentDocPropertyCount = 0 ;
@@ -136,9 +138,6 @@ class ElasticIndexingFilter extends AbstractXMLFilter {
136
138
this .elasticClusterStore = elasticClusterStore ;
137
139
this .streamProcessorHolder = streamProcessorHolder ;
138
140
this .metaHolder = metaHolder ;
139
-
140
- indexRequests = new ArrayList <>();
141
- currentDocument = new ByteArrayOutputStream (INITIAL_JSON_STREAM_SIZE_BYTES );
142
141
}
143
142
144
143
/**
@@ -392,28 +391,25 @@ private void processDocument() {
392
391
if (currentDocPropertyCount > 0 ) {
393
392
jsonGenerator .flush ();
394
393
395
- final IndexRequest indexRequest = new IndexRequest (formatIndexName ())
396
- .opType (OpType .CREATE )
397
- .source (currentDocument .toByteArray (), XContentType .JSON );
398
-
399
- // If an ingest pipeline name is specified, execute it when ingesting the document
400
- if (ingestPipelineName != null && !ingestPipelineName .isEmpty ()) {
401
- indexRequest .setPipeline (ingestPipelineName );
402
- }
403
-
404
- indexRequests .add (indexRequest );
394
+ final HashMap <String , Object > indexMap = new HashMap <>();
395
+ indexMap .put ("_index" , formatIndexName ());
396
+ final HashMap <String , Object > createMap = new HashMap <>();
397
+ createMap .put ("create" , indexMap );
398
+ indexRequests .add (new ObjectMapper ().writeValueAsString (createMap ));
399
+ indexRequests .add (currentDocument .toString ());
400
+ indexRequestsDocCount ++;
405
401
}
406
402
407
- if (indexRequests . size () >= batchSize ) {
403
+ if (indexRequestsDocCount >= batchSize ) {
408
404
indexDocuments ();
409
405
}
410
406
} catch (IOException e ) {
411
407
fatalError ("Failed to flush JSON to stream" , e );
412
408
} catch (Exception e ) {
413
409
fatalError (e .getMessage (), e );
410
+ } finally {
411
+ clearDocument ();
414
412
}
415
-
416
- clearDocument ();
417
413
}
418
414
419
415
private void clearDocument () {
@@ -444,7 +440,6 @@ private boolean purgeDocumentsForStream(final RestHighLevelClient elasticClient,
444
440
final BulkByScrollResponse deleteResponse = elasticClient .deleteByQuery (deleteRequest ,
445
441
RequestOptions .DEFAULT );
446
442
final long deletedCount = deleteResponse .getDeleted ();
447
-
448
443
LOGGER .info ("Deleted {} documents matching StreamId: {} from index: {}, took {} seconds" ,
449
444
deletedCount , streamId , indexName , deleteResponse .getTook ().getSecondsFrac ());
450
445
}
@@ -508,49 +503,43 @@ private List<String> getTargetIndexNames(final RestHighLevelClient elasticClient
508
503
* Index the current batch of documents
509
504
*/
510
505
private void indexDocuments () {
511
- if (indexRequests . size () > 0 ) {
506
+ if (indexRequestsDocCount > 0 ) {
512
507
final ElasticClusterDoc elasticCluster = elasticClusterStore .readDocument (clusterRef );
513
508
514
509
elasticClientCache .context (elasticCluster .getConnection (), elasticClient -> {
515
510
try {
516
511
// Create a new bulk indexing request, containing the current batch of documents
517
- final BulkRequest bulkRequest = new BulkRequest ( );
512
+ final Request request = new Request ( "POST" , "/_bulk" );
518
513
519
514
// For each document, create an indexing request and append to the bulk request
520
- for (IndexRequest indexRequest : indexRequests ) {
521
- bulkRequest .add (indexRequest );
515
+ final String requestBody = String .join (System .lineSeparator (), indexRequests ) +
516
+ System .lineSeparator ();
517
+ request .setJsonEntity (requestBody );
518
+
519
+ // If an ingest pipeline name is specified, execute it when ingesting the document
520
+ if (ingestPipelineName != null && !ingestPipelineName .isEmpty ()) {
521
+ request .addParameter ("pipeline" , ingestPipelineName );
522
522
}
523
523
524
524
if (refreshAfterEachBatch ) {
525
525
// Refresh upon completion of the batch index request
526
- bulkRequest . setRefreshPolicy ( RefreshPolicy .IMMEDIATE );
526
+ request . addParameter ( "refresh" , RefreshPolicy .IMMEDIATE . getValue () );
527
527
} else {
528
528
// Only refresh after all batches have been indexed
529
- bulkRequest . setRefreshPolicy ( RefreshPolicy .NONE );
529
+ request . addParameter ( "refresh" , RefreshPolicy .NONE . getValue () );
530
530
}
531
531
532
- final BulkResponse response = elasticClient .bulk (bulkRequest , RequestOptions .DEFAULT );
533
- if (response .hasFailures ()) {
534
- throw new IOException ("Bulk index request failed: " + response .buildFailureMessage ());
535
- } else {
536
- LOGGER .info ("Indexed {} documents to Elasticsearch cluster: {}, took {} seconds" ,
537
- indexRequests .size (), elasticCluster .getName (), response .getTook ().getSecondsFrac ());
538
- }
539
- } catch (final RuntimeException e ) {
532
+ request .setOptions (RequestOptions .DEFAULT );
533
+ final Instant start = Instant .now ();
534
+ elasticClient .getLowLevelClient ().performRequest (request );
535
+ final float tookSeconds = Duration .between (start , Instant .now ()).toMillis () / 1000.0f ;
536
+ LOGGER .info ("Indexed {} documents to Elasticsearch cluster: {}, took {} seconds" ,
537
+ indexRequestsDocCount , elasticCluster .getName (), tookSeconds );
538
+ } catch (final RuntimeException | IOException e ) {
540
539
fatalError (e .getMessage (), e );
541
- } catch (IOException e ) {
542
- final String message = e .getMessage ();
543
- // Elasticsearch v8.0.0 breaks the Java High Level REST Client `bulk` API, by sending back a
544
- // response that the API cannot handle, causing an exception.
545
- // This is a workaround, where we inspect the actual HTTP return code and if it's `200`, take
546
- // the request to have succeeded.
547
- // TODO: Review this once a compatible Elasticsearch Java client is released
548
- // @see https://github.com/elastic/elasticsearch/issues/84173
549
- if (message == null || !message .matches ("^.+ response=HTTP/1\\ .1 200 OK}$" )) {
550
- fatalError (message , e );
551
- }
552
540
} finally {
553
541
indexRequests .clear ();
542
+ indexRequestsDocCount = 0 ;
554
543
}
555
544
});
556
545
}
0 commit comments