Skip to content

Commit 0477e8d

Browse files
[8.x] [Streams 🌊] Enrichment - Fix broken results due to condition and add skipped metric (#212757) (#213037)
# Backport This will backport the following commits from `main` to `8.x`: - [[Streams 🌊] Enrichment - Fix broken results due to condition and add skipped metric (#212757)](#212757) <!--- Backport version: 9.6.6 --> ### Questions ? Please refer to the [Backport tool documentation](https://github.com/sorenlouv/backport) <!--BACKPORT [{"author":{"name":"Marco Antonio Ghiani","email":"[email protected]"},"sourceCommit":{"committedDate":"2025-03-04T06:43:30Z","message":"[Streams 🌊] Enrichment - Fix broken results due to condition and add skipped metric (#212757)\n\n## 📓 Summary\r\n\r\nWhen the condition is not met, the processing simulation reports wrong\r\nmetrics and fails on a unhandler error.\r\n\r\nThis work fix the issue and also update the document simulation metrics,\r\nreporting how many documents are skipped by a processor during the\r\nsimulation.\r\n\r\nA follow-up work will update the filters on the date to better reflect\r\nthe available states of the documents (parsed, partially parsed,\r\nskipped, failed).\r\n\r\n<img width=\"701\" alt=\"Screenshot 2025-02-28 at 12 47 10\"\r\nsrc=\"https://github.com/user-attachments/assets/1b6979e4-78a1-4db3-af72-faaf06c0e249\"\r\n/>","sha":"6e2a1033b8900529a2276f90a78b36a7ea145cb8","branchLabelMapping":{"^v9.1.0$":"main","^v8.19.0$":"8.x","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["release_note:skip","Team:obs-ux-logs","backport:version","Feature:Streams","v9.1.0","v8.19.0"],"title":"[Streams 🌊] Enrichment - Fix broken results due to condition and add skipped metric","number":212757,"url":"https://github.com/elastic/kibana/pull/212757","mergeCommit":{"message":"[Streams 🌊] Enrichment - Fix broken results due to condition and add skipped metric (#212757)\n\n## 📓 Summary\r\n\r\nWhen the condition is not met, the processing simulation reports wrong\r\nmetrics and fails on a unhandler error.\r\n\r\nThis work fix the issue and also update the document simulation metrics,\r\nreporting how many documents 
are skipped by a processor during the\r\nsimulation.\r\n\r\nA follow-up work will update the filters on the date to better reflect\r\nthe available states of the documents (parsed, partially parsed,\r\nskipped, failed).\r\n\r\n<img width=\"701\" alt=\"Screenshot 2025-02-28 at 12 47 10\"\r\nsrc=\"https://github.com/user-attachments/assets/1b6979e4-78a1-4db3-af72-faaf06c0e249\"\r\n/>","sha":"6e2a1033b8900529a2276f90a78b36a7ea145cb8"}},"sourceBranch":"main","suggestedTargetBranches":["8.x"],"targetPullRequestStates":[{"branch":"main","label":"v9.1.0","branchLabelMappingKey":"^v9.1.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/212757","number":212757,"mergeCommit":{"message":"[Streams 🌊] Enrichment - Fix broken results due to condition and add skipped metric (#212757)\n\n## 📓 Summary\r\n\r\nWhen the condition is not met, the processing simulation reports wrong\r\nmetrics and fails on a unhandler error.\r\n\r\nThis work fix the issue and also update the document simulation metrics,\r\nreporting how many documents are skipped by a processor during the\r\nsimulation.\r\n\r\nA follow-up work will update the filters on the date to better reflect\r\nthe available states of the documents (parsed, partially parsed,\r\nskipped, failed).\r\n\r\n<img width=\"701\" alt=\"Screenshot 2025-02-28 at 12 47 10\"\r\nsrc=\"https://github.com/user-attachments/assets/1b6979e4-78a1-4db3-af72-faaf06c0e249\"\r\n/>","sha":"6e2a1033b8900529a2276f90a78b36a7ea145cb8"}},{"branch":"8.x","label":"v8.19.0","branchLabelMappingKey":"^v8.19.0$","isSourceBranch":false,"state":"NOT_CREATED"}]}] BACKPORT--> --------- Co-authored-by: Marco Antonio Ghiani <[email protected]>
1 parent af59c21 commit 0477e8d

File tree

3 files changed

+86
-15
lines changed

3 files changed

+86
-15
lines changed

‎x-pack/platform/plugins/shared/streams/server/routes/streams/processing/simulation_handler.ts

Lines changed: 64 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ export interface SimulationError {
6262
| 'non_additive_processor_failure';
6363
}
6464

65-
export type DocSimulationStatus = 'parsed' | 'partially_parsed' | 'failed';
65+
export type DocSimulationStatus = 'parsed' | 'partially_parsed' | 'skipped' | 'failed';
6666

6767
export interface SimulationDocReport {
6868
detected_fields: Array<{ processor_id: string; name: string }>;
@@ -75,6 +75,7 @@ export interface ProcessorMetrics {
7575
detected_fields: string[];
7676
errors: SimulationError[];
7777
failure_rate: number;
78+
skipped_rate: number;
7879
success_rate: number;
7980
}
8081

@@ -113,7 +114,6 @@ export const simulateProcessing = async ({
113114
/* 1. Prepare data for either simulation type (ingest, pipeline) and prepare the simulation body for the mandatory pipeline simulation */
114115
const simulationData = prepareSimulationData(params);
115116
const pipelineSimulationBody = preparePipelineSimulationBody(simulationData);
116-
117117
/**
118118
* 2. Run both pipeline and ingest simulations in parallel.
119119
* - The pipeline simulation is used to extract the documents reports and the processor metrics. This always runs.
@@ -188,7 +188,16 @@ const prepareSimulationProcessors = (
188188
} as ProcessorDefinition;
189189
});
190190

191-
return formatToIngestProcessors(processors);
191+
const dotExpanderProcessor: Pick<IngestProcessorContainer, 'dot_expander'> = {
192+
dot_expander: {
193+
field: '*',
194+
override: true,
195+
},
196+
};
197+
198+
const formattedProcessors = formatToIngestProcessors(processors);
199+
200+
return [dotExpanderProcessor, ...formattedProcessors];
192201
};
193202

194203
const prepareSimulationData = (params: ProcessingSimulationParams) => {
@@ -351,10 +360,18 @@ const computePipelineSimulationResult = (
351360
const processorsMap = initProcessorMetricsMap(processing);
352361

353362
const docReports = simulationResult.docs.map((docResult, id) => {
354-
const { errors, status, value } = getLastDoc(docResult);
363+
const { errors, status, value } = getLastDoc(docResult, sampleDocs[id]._source);
355364

356365
const diff = computeSimulationDocDiff(docResult, sampleDocs[id]._source);
357366

367+
docResult.processor_results.forEach((processor) => {
368+
const procId = processor.tag;
369+
370+
if (procId && isSkippedProcessor(processor)) {
371+
processorsMap[procId].skipped_rate++;
372+
}
373+
});
374+
358375
diff.detected_fields.forEach(({ processor_id, name }) => {
359376
processorsMap[processor_id].detected_fields.push(name);
360377
});
@@ -392,6 +409,7 @@ const initProcessorMetricsMap = (
392409
detected_fields: [],
393410
errors: [],
394411
failure_rate: 0,
412+
skipped_rate: 0,
395413
success_rate: 1,
396414
},
397415
]);
@@ -408,30 +426,47 @@ const extractProcessorMetrics = ({
408426
}) => {
409427
return mapValues(processorsMap, (metrics) => {
410428
const failureRate = metrics.failure_rate / sampleSize;
411-
const successRate = 1 - failureRate;
429+
const skippedRate = metrics.skipped_rate / sampleSize;
430+
const successRate = 1 - skippedRate - failureRate;
412431
const detected_fields = uniq(metrics.detected_fields);
413432
const errors = uniqBy(metrics.errors, (error) => error.message);
414433

415434
return {
416435
detected_fields,
417436
errors,
418437
failure_rate: parseFloat(failureRate.toFixed(2)),
438+
skipped_rate: parseFloat(skippedRate.toFixed(2)),
419439
success_rate: parseFloat(successRate.toFixed(2)),
420440
};
421441
});
422442
};
423443

424444
const getDocumentStatus = (doc: SuccessfulIngestSimulateDocumentResult): DocSimulationStatus => {
425-
if (doc.processor_results.every(isSuccessfulProcessor)) return 'parsed';
445+
// Skip the first result, which always belongs to the base dot_expander processor
446+
const processorResults = doc.processor_results.slice(1);
447+
448+
if (processorResults.every(isSkippedProcessor)) {
449+
return 'skipped';
450+
}
426451

427-
if (doc.processor_results.some(isSuccessfulProcessor)) return 'partially_parsed';
452+
if (processorResults.every((proc) => isSuccessfulProcessor(proc) || isSkippedProcessor(proc))) {
453+
return 'parsed';
454+
}
455+
456+
if (processorResults.some(isSuccessfulProcessor)) {
457+
return 'partially_parsed';
458+
}
428459

429460
return 'failed';
430461
};
431462

432-
const getLastDoc = (docResult: SuccessfulIngestSimulateDocumentResult) => {
463+
const getLastDoc = (docResult: SuccessfulIngestSimulateDocumentResult, sample: FlattenRecord) => {
433464
const status = getDocumentStatus(docResult);
434-
const lastDocSource = docResult.processor_results.at(-1)?.doc?._source ?? {};
465+
const lastDocSource =
466+
docResult.processor_results
467+
.slice(1) // Remove the always present base processor for dot expander
468+
.filter((proc) => !isSkippedProcessor(proc))
469+
.at(-1)?.doc?._source ?? sample;
435470

436471
if (status === 'parsed') {
437472
return {
@@ -440,7 +475,7 @@ const getLastDoc = (docResult: SuccessfulIngestSimulateDocumentResult) => {
440475
status,
441476
};
442477
} else {
443-
const { _errors, ...value } = lastDocSource;
478+
const { _errors = [], ...value } = lastDocSource;
444479
return { value: flattenObjectNestedLast(value), errors: _errors as SimulationError[], status };
445480
}
446481
};
@@ -459,7 +494,7 @@ const computeSimulationDocDiff = (
459494
const successfulProcessors = docResult.processor_results.filter(isSuccessfulProcessor);
460495

461496
const comparisonDocs = [
462-
{ processor_id: 'sample', value: sample },
497+
{ processor_id: 'base', value: docResult.processor_results[0]!.doc!._source },
463498
...successfulProcessors.map((proc) => ({
464499
processor_id: proc.tag,
465500
value: omit(proc.doc._source, ['_errors']),
@@ -495,7 +530,7 @@ const computeSimulationDocDiff = (
495530

496531
// We might have updated fields that are not present in the original document because they were generated by previous processors.
497532
// We exclude them from the list of fields that make the processor non-additive.
498-
const originalUpdatedFields = updatedFields.filter((field) => field in sample);
533+
const originalUpdatedFields = updatedFields.filter((field) => field in sample).sort();
499534
if (!isEmpty(originalUpdatedFields)) {
500535
diffResult.errors.push({
501536
processor_id: nextDoc.processor_id,
@@ -514,7 +549,8 @@ const prepareSimulationResponse = async (
514549
detectedFields: DetectedField[]
515550
) => {
516551
const successRate = computeSuccessRate(docReports);
517-
const failureRate = 1 - successRate;
552+
const skippedRate = computeSkippedRate(docReports);
553+
const failureRate = 1 - skippedRate - successRate;
518554
const isNotAdditiveSimulation = some(processorsMetrics, (metrics) =>
519555
metrics.errors.some(isNonAdditiveSimulationError)
520556
);
@@ -524,6 +560,7 @@ const prepareSimulationResponse = async (
524560
documents: docReports,
525561
processors_metrics: processorsMetrics,
526562
failure_rate: parseFloat(failureRate.toFixed(2)),
563+
skipped_rate: parseFloat(skippedRate.toFixed(2)),
527564
success_rate: parseFloat(successRate.toFixed(2)),
528565
is_non_additive_simulation: isNotAdditiveSimulation,
529566
};
@@ -538,10 +575,12 @@ const prepareSimulationFailureResponse = (error: SimulationError) => {
538575
detected_fields: [],
539576
errors: [error],
540577
failure_rate: 1,
578+
skipped_rate: 0,
541579
success_rate: 0,
542580
},
543581
},
544582
failure_rate: 1,
583+
skipped_rate: 0,
545584
success_rate: 0,
546585
is_non_additive_simulation: isNonAdditiveSimulationError(error),
547586
};
@@ -597,6 +636,12 @@ const computeSuccessRate = (docs: SimulationDocReport[]) => {
597636
return successfulCount / docs.length;
598637
};
599638

639+
const computeSkippedRate = (docs: SimulationDocReport[]) => {
640+
const skippedCount = docs.reduce((rate, doc) => (rate += doc.status === 'skipped' ? 1 : 0), 0);
641+
642+
return skippedCount / docs.length;
643+
};
644+
600645
const computeMappingProperties = (detectedFields: NamedFieldDefinitionConfig[]) => {
601646
return Object.fromEntries(detectedFields.map(({ name, type }) => [name, { type }]));
602647
};
@@ -609,6 +654,12 @@ const isSuccessfulProcessor = (
609654
): processor is WithRequired<IngestSimulatePipelineSimulation, 'doc' | 'tag'> =>
610655
processor.status === 'success' && !!processor.tag;
611656

657+
const isSkippedProcessor = (
658+
processor: IngestSimulatePipelineSimulation
659+
): processor is WithRequired<IngestSimulatePipelineSimulation, 'tag'> =>
660+
// @ts-expect-error Looks like the IngestSimulatePipelineSimulation.status is not typed correctly and misses the 'skipped' status
661+
processor.status === 'skipped';
662+
612663
// TODO: update type once Kibana updates to elasticsearch-js 8.17
613664
const isMappingFailure = (entry: any) => entry.doc?.error?.type === 'document_parsing_exception';
614665

‎x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processor_outcome_preview.tsx

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,11 @@ export const ProcessorOutcomePreview = ({
6767
}
6868
}, [columns, selectedDocsFilter]);
6969

70+
const simulationFailureRate = simulation
71+
? simulation?.failure_rate + simulation?.skipped_rate
72+
: undefined;
73+
const simulationSuccessRate = simulation?.success_rate;
74+
7075
return (
7176
<>
7277
<EuiFlexItem grow={false}>
@@ -76,8 +81,8 @@ export const ProcessorOutcomePreview = ({
7681
timeRange={timeRange}
7782
onTimeRangeChange={setTimeRange}
7883
onTimeRangeRefresh={onRefreshSamples}
79-
simulationFailureRate={simulation?.failure_rate}
80-
simulationSuccessRate={simulation?.success_rate}
84+
simulationFailureRate={simulationFailureRate}
85+
simulationSuccessRate={simulationSuccessRate}
8186
/>
8287
</EuiFlexItem>
8388
<EuiSpacer size="m" />

‎x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/processor_metrics.tsx

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,12 @@ const formatter = new Intl.NumberFormat('en-US', {
3232
export const ProcessorMetricBadges = ({
3333
detected_fields,
3434
failure_rate,
35+
skipped_rate,
3536
success_rate,
3637
}: ProcessorMetricBadgesProps) => {
3738
const detectedFieldsCount = detected_fields.length;
3839
const failureRate = failure_rate > 0 ? formatter.format(failure_rate) : null;
40+
const skippedRate = skipped_rate > 0 ? formatter.format(skipped_rate) : null;
3941
const successRate = success_rate > 0 ? formatter.format(success_rate) : null;
4042

4143
return (
@@ -53,6 +55,19 @@ export const ProcessorMetricBadges = ({
5355
{failureRate}
5456
</EuiBadge>
5557
)}
58+
{skippedRate && (
59+
<EuiBadge
60+
color="hollow"
61+
iconType="minus"
62+
title={i18n.translate('xpack.streams.processorMetricBadges.euiBadge.skippedRate', {
63+
defaultMessage:
64+
'{skippedRate} of the sampled documents were skipped due to the set condition',
65+
values: { skippedRate },
66+
})}
67+
>
68+
{skippedRate}
69+
</EuiBadge>
70+
)}
5671
{successRate && (
5772
<EuiBadge
5873
color="hollow"

0 commit comments

Comments
 (0)