diff --git a/manage-data/ingest/transform-enrich/common-mistakes.md b/manage-data/ingest/transform-enrich/common-mistakes.md
new file mode 100644
index 000000000..75b8a2bce
--- /dev/null
+++ b/manage-data/ingest/transform-enrich/common-mistakes.md
@@ -0,0 +1,510 @@
+---
+mapped_pages:
+  - https://www.elastic.co/docs/manage-data/ingest/transform-enrich/common-mistakes.html
+applies_to:
+  stack: ga
+  serverless: ga
+---
+
+# Common mistakes
+
+This section is not about performance metrics, such as whether one variant is faster or uses less heap. What we are looking for is ease of maintenance and readability: anybody who knows a bit about ingest pipelines should be able to fix most of these issues. Think of it as a guide for the moment of “oh, I have written this myself, ah, that is the easier way to write it”.
+
+## if statements
+
+### Contains and lots of ORs
+
+This chain of `==` comparisons can be rewritten
+
+```painless
+"if": "ctx?.kubernetes?.container?.name == 'admin' || ctx?.kubernetes?.container?.name == 'def'
+|| ctx?.kubernetes?.container?.name == 'demo' || ctx?.kubernetes?.container?.name == 'acme'
+|| ctx?.kubernetes?.container?.name == 'wonderful'"
+```
+
+into the following form, which is easier to maintain and read:
+
+```painless
+["admin","def", ...].contains(ctx.kubernetes?.container?.name)
+```
+
+The important implication is that `contains` on a list checks the whole value for an exact match. If you only want a partial match, because your data is for example `demo-admin-demo`, you still need to write `ctx.kubernetes.container.name.contains('admin') || ...`.
+
+The `?` operators keep working without any issue: if the field is missing, the expression simply becomes `[...].contains(null)`, which returns `false`.
+
+### Missing ? and contains operation
+
+Here is another example. It fails whenever `openshift` is not set, because it does not use `?`. The parentheses `()` add nothing, and the separate null checks of `openshift.origin` and `openshift.origin.threadId` are unnecessary:
+
+```painless
+"if": "ctx.openshift.eventPayload != null
+&& (ctx.openshift.eventPayload.contains('Start expire sessions'))
+&& ctx.openshift.origin != null
+&& ctx.openshift.origin.threadId != null
+&& (ctx.openshift.origin.threadId.contains('Catalina-utility'))",
+```
+
+This can become this:
+
+```painless
+"if": "ctx.openshift?.eventPayload instanceof String
+&& ctx.openshift.eventPayload.contains('Start expire sessions')
+&& ctx.openshift?.origin?.threadId instanceof String
+&& ctx.openshift.origin.threadId.contains('Catalina-utility')",
+```
+
+### Contains operation and null check
+
+This includes an initial null check, which is not necessary.
+ +```painless +"if": "ctx.event?.action !=null +&& ['bandwidth','spoofed syn flood prevention','dns authentication','tls attack prevention', + 'tcp syn flood detection','tcp connection limiting','http rate limiting', + 'block malformed dns traffic','tcp connection reset','udp flood detection', + 'dns rate limiting','malformed http filtering','icmp flood detection', + 'dns nxdomain rate limiting','invalid packets'].contains(ctx.event.action)" +``` + +This behaves nearly the same: + +```painless +"if": "['bandwidth','spoofed syn flood prevention','dns authentication','tls attack prevention', + 'tcp syn flood detection','tcp connection limiting','http rate limiting', + 'block malformed dns traffic','tcp connection reset','udp flood detection', + 'dns rate limiting','malformed http filtering','icmp flood detection', + 'dns nxdomain rate limiting','invalid packets'].contains(ctx.event?.action)" +``` + +The difference is in the execution itself which should not matter since it is Java under the hood and pretty fast as this. In reality what happens is the following when doing the first one with the initial: `ctx.event?.action != null` If action is null, then it will exit here and not even perform the contains operation. In our second example we basically run the contains operation x times, for every item in the array and have `valueOfarray.contains('null')` then. + +### Checking null and type unnecessarily + +This is just unnecessary + +```painless +"if": "ctx?.openshift?.eventPayload != null && ctx.openshift.eventPayload instanceof String" +``` + +Because this is the same. + +```painless +"if": "ctx.openshift?.eventPayload instanceof String" +``` + +Similar to the one above, in addition to that, we do not have `?` for dot-walking. + +```json +{ + "fail": { + "message": "This cannot be parsed as it a list and not a single message", + "if": "ctx._tmp.leef_kv.labelAbc != null && ctx._tmp.leef_kv.labelAbc instanceof List" + } +}, +``` + +This version is easier to read and maintain since we remove the unnecessary null check and add dot walking. + +```json +{ + "fail": { + "message": "This cannot be parsed as it a list and not a single message", + "if": "ctx._tmp?.leef_kv?.labelAbc instanceof List" + } +}, +``` + +### Checking null and for a value + +This is interesting as it misses the `?` and therefore will have a null pointer exception if `event.type` is ever null. + +```painless +"if": "ctx.event.type == null || ctx.event.type == '0'" +``` + +This needs to become this: + +```painless +"if": "ctx.event?.type == null || ctx.event?.type == '0'" +``` + +The reason why we need twice the `?` is because we are using an OR operator `||` therefore both parts of the if statement are executed. + +### Checking null + +It is not necessary to write a `?` after the ctx itself. For first level objects such as `ctx.message`, `ctx.demo` it is enough to write it like this. If ctx is ever null you face other problems (basically the entire context, so the entire `_source` is empty and there is not even a _source... it's basically all null) + +```painless +"if": "ctx?.message == null" +``` + +Is the same as: + +```painless +"if": "ctx.message == null" +``` + +### Checking null multiple times + +This is similar to other topics discussed above already. It is often not needed to check using the `?` a 2nd time when you already walked the object / path. 
+ +```painless +"if": "ctx.arbor?.ddos?.subsystem == 'CLI' && ctx.arbor?.ddos?.command_line !=null" +``` + +Same as: + +```painless +"if": "ctx.arbor?.ddos?.subsystem == 'CLI' && ctx.arbor.ddos.command_line !=null" +``` + +Because the if condition is always executed left to right and therefore when `CLI` fails, the 2nd part of the if condition is not triggered. This means that we already walked the `ctx.arbor.ddos` path and therefore know that this object exists. + +### Checking null way to often + +This: + +```painless +"if": "ctx.process != null && ctx.process.thread != null + && ctx.process.thread.id != null && (ctx.process.thread.id instanceof String)" +``` + +Can become just this: + +```painless +"if": "ctx.process?.thread?.id instanceof String" +``` + +That is what the `?` is for, instead of listing every step individually and removing the unnecessary `()` as well. + +### Checking emptiness + +This: + +```painless +"if": "ctx?.user?.geo?.region != null && ctx?.user?.geo?.region != ''" +``` + +Is the same as this. You do not need to write in the second && clause the ? anymore. Since you already proven that this is not null. + +```painless +"if": "ctx.user?.geo?.region != null && ctx.user.geo.region != ''" +``` + +Alternatively you can use (elvis, but if geo.region is a number/object, anything else than a String, you have a problem) + +```painless +"if": "ctx.user?.geo?.region?.isEmpty() ?: false" +``` + +Alternatively you can use: + +```painless +"if": "ctx.user?.geo?.region instanceof String && ctx.user.geo.region.isEmpty() == false" +``` + +Here is a full reproducible example: + +```json +POST _ingest/pipeline/_simulate +{ + "docs": [ + { + "_source": { + "user": { + "geo": { + "region": "123" + } + } + } + }, + { + "_source": { + "user": { + "geo": { + "region": "" + } + } + } + }, + { + "_source": { + "user": { + "geo": { + "region": null + } + } + } + }, + { + "_source": { + "user": { + "geo": null + } + } + } + ], + "pipeline": { + "processors": [ + { + "set": { + "field": "demo", + "value": true, + "if": "if": "ctx.user?.geo?.region != null && ctx.user.geo.region != ''" + } + } + ] + } +} +``` + +## Going from mb,gb values to bytes + +We recommend to store everything as bytes in Elastic in `long` and then use the advanced formatting in Kibana Data View to render the bytes to human readable. + +Things like this: + +```json +{ + "gsub": { + "field": "document.size", + "pattern": "M", + "replacement": "", + "ignore_missing": true, + "if": "ctx?.document?.size != null && ctx.document.size.endsWith(\"M\")" + } +}, +{ + "gsub": { + "field": "document.size", + "pattern": "(\\d+)\\.(\\d+)G", + "replacement": "$1$200", + "ignore_missing": true, + "if": "ctx?.uws?.size != null && ctx.document.size.endsWith(\"G\")" + } +}, +{ + "gsub": { + "field": "document.size", + "pattern": "G", + "replacement": "000", + "ignore_missing": true, + "if": "ctx?.uws?.size != null && ctx.document.size.endsWith(\"G\")" + } +} +``` + +Can become this: + +```json +POST _ingest/pipeline/_simulate +{ + "docs": [ + { + "_source": { + "document": { + "size": "100M" + } + } + } + ], + "pipeline": { + "processors": [ + { + "bytes": { + "field": "document.size" + } + } + ] + } +} +``` + +## Rename processor + +The rename processor renames a field. There are two flags: + +- ignore_missing +- ignore_failure + +Ignore missing is useful when you are not sure that the field you want to rename from exist. Ignore_failure will help you with any failure encountered. 
The rename processor can only rename to non-existing fields. If you already have the field `abc` and you want to rename `def` to `abc` then the operation fails. The `ignore_failure` helps you in this case. + +## Script processor + +Sometimes we have to fallback to script processors. + +### Calculating event.duration in a complex manner + +There are many things wrong: + +- Square bracket access for fields. +- Unnecessary if statement with multiple `!=` null statements instead of `?` usage. +- SubString parsing instead of using DateTime features. +- Event.duration is directly accessed without checking if event is even available. +- Event.duration is wrong, this gives milliseconds. `Event.duration` should be in Nanoseconds. + +```json +{ + "script": { + "source": """ + String timeString = ctx['temp']['duration']; + ctx['event']['duration'] = Integer.parseInt(timeString.substring(0,2))*360000 + Integer.parseInt(timeString.substring(3,5))*60000 + Integer.parseInt(timeString.substring(6,8))*1000 + Integer.parseInt(timeString.substring(9,12)); + """, + "if": "ctx.temp != null && ctx.temp.duration != null" + } +}, +``` + +This becomes this. + +We use the if condition to ensure that `ctx.event` is available. The `[:]` is a shorthand to writing `ctx.event = New HashMap();`. We leverage the `DateTimeFormatter` and the `LocalTime` and use a builtin function to calculate the `NanoOfDay`. + +```json +POST _ingest/pipeline/_simulate +{ + "docs": [ + { + "_source": { + "temp": { + "duration": "00:00:06.448" + } + } + } + ], + "pipeline": { + "processors": [ + { + "script": { + "source": """ + if(ctx.event == null){ + ctx.event = [:]; + } + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS"); + LocalTime time = LocalTime.parse(ctx.temp.duration, formatter); + ctx.event.duration = time.toNanoOfDay(); + """, + "if": "ctx.temp != null && ctx.temp.duration != null" + } + } + ] + } +} +``` + +## Unnecessary complex script to stitch together IP + +- No check if destination is available as object +- Using square brackets for accessing +- Unnecessary casting to Integer, we parse it as a String later anyway, so doesn't really matter. +- Unnecessary allocation of additional field String ip , we can set the ctx. directly. + +```json +{ + "script": { + "source": """ + String[] ipSplit = ctx['destination']['ip'].splitOnToken('.'); + String ip = Integer.parseInt(ipSplit[0]) + '.' + Integer.parseInt(ipSplit[1]) + '.' + Integer.parseInt(ipSplit[2]) + '.' +Integer.parseInt(ipSplit[3]); + ctx['destination']['ip'] = ip; + """, + "if": "(ctx['destination'] != null) && (ctx['destination']['ip'] != null)" + } +} +``` + +Can become this. Instead of specifying `String[]` you can also just write `def temp`. 
+ +```json +POST _ingest/pipeline/_simulate +{ + "docs": [ + { + "_source": { + "destination": { + "ip": "192.168.0.1.3.4.5.6.4" + } + } + } + ], + "pipeline": { + "processors": [ + { + "script": { + "source": """ + String[] temp = ctx.destination.ip.splitOnToken('.'); + ctx.destination.ip = temp[0]+"."+temp[1]+"."+temp[2]+"."+temp[3]; + """, + "if": "ctx.destination?.ip != null" + } + } + ] + } +} +``` + +## Removing of @timestamp + +I encountered this bit: + +```json +{ + "set": { + "field": "openshift.timestamp", + "value": "{{openshift.date}} {{openshift.time}}", + "if": "ctx?.openshift?.date != null && ctx?.openshift?.time != null && ctx?.openshift?.timestamp == null" + } +}, +{ + "remove": { + "field": "@timestamp", + "ignore_missing": true, + "if": "ctx?.openshift?.timestamp != null || ctx?.openshift?.timestamp1 != null" + } +}, +{ + "date": { + "field": "openshift.timestamp", + "formats": [ + "yyyy-MM-dd HH:mm:ss", + "ISO8601" + ], + "timezone": "Europe/Vienna", + "if": "ctx?.openshift?.timestamp != null" + } +} +``` + +The removal is completely unnecessary. The date processor always overwrites the value in `@timestamp` unless you specify the target field. + +## Mustache tips and tricks + +### Accessing values in an array + +Using the `.index` in this case accessing the first value in the array can be done with `.0` + +```json +POST _ingest/pipeline/_simulate +{ + "docs": [ + { + "_source": { + "host": { + "hostname": "abc" + }, + "tags": [ + "cool-host" + ] + } + } + ], + "pipeline": { + "processors": [ + { + "set": { + "field": "host.alias", + "value": "{{tags.0}}" + } + } + ] + } +} +``` diff --git a/manage-data/ingest/transform-enrich/error-handling.md b/manage-data/ingest/transform-enrich/error-handling.md new file mode 100644 index 000000000..fb6029721 --- /dev/null +++ b/manage-data/ingest/transform-enrich/error-handling.md @@ -0,0 +1,151 @@ +--- +mapped_pages: + - https://www.elastic.co/docs/manage-data/ingest/transform-enrich/error-handling.html +applies_to: + stack: ga + serverless: ga +--- + +# Error handling + +Ingest pipelines in Elasticsearch are powerful tools for transforming and enriching data before indexing. However, errors can occur during processing. This guide outlines strategies for handling such errors effectively. + +**Important**: Ingest pipelines are executed before the document is indexed by Elasticsearch. You can handle the errors occurring while processing the document (i.e. transforming the json object) but not the errors triggered while indexing like mapping conflict. For this is the Elasticsearch Failure Store. + +Errors in ingest pipelines typically fall into the following categories: + +* Parsing Errors: Occur when a processor fails to parse a field, such as a date or number. +* Missing Fields: Happen when a required field is absent in the document. + +**Recommendation**: Create an `error-handling-pipeline` that sets `event.kind` to `pipeline_error` and stores the error message, along with the tag from the failed processor, in the `error.message` field. Including a tag is especially helpful when using multiple `grok`, `dissect`, or `script` processors, as it helps identify which one caused the failure. + +The `on_failure` parameter can be defined either for individual processors or at the pipeline level to catch exceptions that may occur during document processing. The `ignore_failure` option allows a specific processor to silently skip errors without affecting the rest of the pipeline. + +## Global vs. 
Processor-Specific + +The following example demonstrates how to use the `on_failure` handler at the pipeline level rather than within individual processors. While this approach ensures the pipeline exits gracefully on failure, it also means that processing stops at the point of error. + +In this example, a typo was made in the configuration of the `dissect` processor intended to extract `user.name` from the message. A comma (`,`) was used instead of the correct colon (`:`). + +```json +POST _ingest/pipeline/_simulate +{ + "docs": [ + { + "_source": { + "@timestamp": "2025-04-03T10:00:00.000Z", + "message": "user: philipp has logged in" + } + } + ], + "pipeline": { + "processors": [ + { + "dissect": { + "field": "message", + "pattern": "%{}, %{user.name} %{}", + "tag": "dissect for user.name" + } + }, + { + "append": { + "field": "event.category", + "value": "authentication" + } + } + ], + "on_failure": [ + { + "set": { + "field": "event.kind", + "value": "pipeline_error" + } + }, + { + "append": { + "field": "error.message", + "value": "Processor {{ _ingest.on_failure_processor_type }} with tag {{ _ingest.on_failure_processor_tag }} in pipeline {{ _ingest.on_failure_pipeline }} failed with message: {{ _ingest.on_failure_message }}" + } + } + ] + } +} +``` + +The second processor, which sets `event.category` to `authentication`, is no longer executed because the first `dissect` processor fails and triggers the global `on_failure` handler. The resulting document shows which processor caused the error, the pattern it attempted to apply, and the input it received. + +```json +"@timestamp": "2025-04-03T10:00:00.000Z", +"message": "user: philipp has logged in", +"event": { + "kind": "pipeline_error" +}, +"error": { + "message": "Processor dissect with tag dissect for user.name in pipeline _simulate_pipeline failed with message: Unable to find match for dissect pattern: %{}, %{user.name} %{} against source: user: philipp has logged in" +} +``` + +We can restructure the pipeline by moving the `on_failure` handling directly into the processor itself. This allows the pipeline to continue execution. In this case, the `event.category` processor still runs. You can also retain the global `on_failure` to handle errors from other processors, while adding processor-specific error handling where needed. + +(While executing two `set` processors within the `dissect` error handler may not always be ideal, it serves as a demonstration.) + +For the `dissect` processor, consider setting a temporary field like `_tmp.error: dissect_failure`. You can then use `if` conditions in later processors to execute them only if parsing failed, allowing for more controlled and flexible error handling. 
+ +```json +POST _ingest/pipeline/_simulate +{ + "docs": [ + { + "_source": { + "@timestamp": "2025-04-03T10:00:00.000Z", + "message": "user: philipp has logged in" + } + } + ], + "pipeline": { + "processors": [ + { + "dissect": { + "field": "message", + "pattern": "%{}, %{user.name} %{}", + "on_failure": [ + { + "set": { + "field": "event.kind", + "value": "pipeline_error" + } + }, + { + "append": { + "field": "error.message", + "value": "Processor {{ _ingest.on_failure_processor_type }} with tag {{ _ingest.on_failure_processor_tag }} in pipeline {{ _ingest.on_failure_pipeline }} failed with message: {{ _ingest.on_failure_message }}" + } + } + ], + "tag": "dissect for user.name" + } + }, + { + "append": { + "field": "event.category", + "value": "authentication" + } + } + ], + "on_failure": [ + { + "set": { + "field": "event.kind", + "value": "pipeline_error" + } + }, + { + "set": { + "field": "error.message", + "value": "Processor {{ _ingest.on_failure_processor_type }} with tag {{ _ingest.on_failure_processor_tag }} in pipeline {{ _ingest.on_failure_pipeline }} failed with message: {{ _ingest.on_failure_message }}" + } + } + ] + } +} +``` diff --git a/manage-data/ingest/transform-enrich/general-tips.md b/manage-data/ingest/transform-enrich/general-tips.md new file mode 100644 index 000000000..6217b78b8 --- /dev/null +++ b/manage-data/ingest/transform-enrich/general-tips.md @@ -0,0 +1,461 @@ +--- +mapped_pages: + - https://www.elastic.co/docs/manage-data/ingest/transform-enrich/general-tips-and-tricks.html +applies_to: + stack: ga + serverless: ga +--- + +# Tips and Tricks + +There are various ways to handle data in ingest pipelines, and while they all produce similar results, some methods might be more suitable depending on the specific case. This section provides guidance to ensure that your ingest pipelines are consistent, readable, and maintainable. While we won't focus heavily on performance optimizations, the goal is to create pipelines that are easy to understand and manage. + +## Accessing Fields in `if` Statements + +In an ingest pipeline, when working with `if` statements inside processors, you can access fields in two ways: + +- Dot notation +- Square bracket notation + +For example: + +- `ctx.event.action` + +is equivalent to: + +- `ctx['event']['action']` + +Both notations can be used to reference fields, so choose the one that makes your pipeline easier to read and maintain. + +### Downsides of brackets + +- No support for null safety operations `?` + +### When to use brackets + +When you have special characters such as `@` in the field name, or a `.` in the field name. As an example: + +- field_name: `demo.cool.stuff` + +using: + +`ctx.demo.cool.stuff` it would try to access the field `stuff` in the object `cool` in the object `demo`. + +using: + +`ctx['demo.cool.stuff']` it can access the field directly. + +You can also mix and match both worlds when needed: + +- field_name: `my.nested.object.has@!%&chars` + +Proper way: `ctx.my.nested.object['has@!%&chars']` + +You can even, partially use the `?` operator: + +- `ctx.my?.nested?.object['has@!%&chars']` + +But it will error if object is `null`. To be a 100% on the safe side you need to write the following statement: + +- `ctx.my?.nested?.object != null && ctx.my.nested.object['has@!%&chars'] == ...` + +## Accessing fields in a script + +Within a script there are the same two possibilities to access fields as above. As well as the new `getter`. This only works in the painless scripts in an ingest pipeline\! 
Take the following input: + +```json +{ + "_source": { + "user_name": "philipp" + } +} +``` + +When you want to set the `user.name` field with a script: + +- `ctx.user.name = ctx.user_name` + +This works as long as `user_name` is populated. If it is null, you get null as value for user.name. Additionally, when the `user` object does not exist, it will error because Java needs you to define the `user` object first before adding a key `name` into it. + +This is one of the alternatives to get it working when you only want to set it, if it is not null + +``` +if (ctx.user_name != null) { + ctx.user.name = ctx.user_name +} +``` + +This works fine, as you now check for null. + +However there is also an easier to write and maintain alternative available: + +- `ctx.user.name = $('user_name', null);` + +This $('field', 'fallback') allows you to specify the field without the `CTX` for walking. You can even supply `$('this.very.nested.field.is.super.far.away', null)` when you need to. The fallback is in case the field is null. This comes in very handy when needing to do certain manipulation of data. Let's say you want to lowercase all the field names, you can simply write this now: + +- `ctx.user.name = $('user_name','').toLowerCase();` + +You see that I switched up the null value to an empty String. Since the String has the `toLowerCase()` function. This of course works with all types. Bit of a silly thing, since you could simply write `object.abc` as the field value. As an example you can see that we can even create a map, list, array, whatever you want. + +- `if ($('object', {}).containsKey('abc')){}` + +One common thing I use it for is when dealing with numbers and casting. The field specifies the usage in `%`, however Elasticsearch doesn't like this, or better to say Kibana renders % as `0-1` for `0%-100%` and not `0-100`. `100` is equal to `10.000%` + +- field: `cpu_usage = 100.00` +- `ctx.cpu.usage = $('cpu_usage',0.0)/100` + +This allows me to always set the `cpu.usage` field and not to worry about it, have an always working division. One other way to leverage this, in a simpler script is like this, but most scripts are rather complex so this is not that often applicable. + +``` +script: { + source: "ctx.abc = ctx.def" + if: "ctx.def != null" +} +``` + +## Check if a value exists and is not null + +In simplest case the `ignore_empty_value` parameter is available in most processors to handle fields without values. Or the `ignore_failure` parameter to let the processor fail without impacting the pipeline you but sometime you will need to use the [null safe operator `?.`](https://www.elastic.co/guide/en/elasticsearch/painless/current/painless-operators-reference.html#null-safe-operator) to check if a field exists and is not `null`. + +```json +POST _ingest/pipeline/_simulate +{ + "docs": [ + { + "_source": { + "host": { + "hostname": "test" + }, + "ip": "127.0.0.1" + } + }, + { + "_source": { + "ip": "127.0.0.1" + } + } + ], + "pipeline": { + "processors": [ + { + "set": { + "field": "a", + "value": "b", + "if": "ctx.host?.hostname == 'test'" + } + } + ] + } +} +``` + +This pipeline will work in both cases because `host?` checks if `host` exists and if not returns `null`. Removing `?` from the `if` condition will fail the second document with an error message: `cannot access method/field [hostname] from a null def reference` + +The null operator `?` is actually doing this behind the scene: + +Imagine you write this: + +- `ctx.windows?.event?.data?.user?.name == "philipp"` + +Then the ? 
will transform this simple if statement to this: + +``` +ctx.windows != null && +ctx.windows.event != null && +ctx.windows.event.data != null && +ctx.windows.event.data.user != null && +ctx.windows.event.data.user.name == "philipp" +``` + +You can use the null safe operator with function too: + +- `ctx.message?.startsWith('shoe')` + +An [elvis](https://www.elastic.co/guide/en/elasticsearch/painless/current/painless-operators-reference.html#elvis-operator) might be useful in your script to handle these maybe null value: + +- `ctx.message?.startsWith('shoe') ?: false` + +Most safest and secure option is to write: + +- `ctx.message instanceof String && ctx.message.startsWith('shoe')` +- `ctx.event?.category instanceof String && ctx.event.category.startsWith('shoe')` + +The reason for that is, if `event.category` is a number, object or anything other than a `String` then it does not have the `startsWith` function and therefore will error with function `startsWith` not available on type object. + +## Check if a key is in a document + +The `containsKey` can be used to check if a map contains a specific key. + +```json +POST _ingest/pipeline/_simulate +{ + "docs": [ + { + "_source": { + "ip": "127.0.0.1" + } + }, + { + "_source": { + "test": "asd" + } + } + ], + "pipeline": { + "processors": [ + { + "set": { + "field": "a", + "value": "b", + "if": "ctx.containsKey('test')" + } + } + ] + } +} +``` + +## Remove empty fields or remove empty fields that match a regular expression + +Alex and Honza created a [blog post](https://alexmarquardt.com/2020/11/06/using-elasticsearch-painless-scripting-to-iterate-through-fields/) presenting painless scripts that remove empty fields or fields that match a regular expression. We are already using this in a lot of places. Most of the time in the custom pipeline and in the final pipeline as well. + +```json +POST _ingest/pipeline/remove_empty_fields/_simulate +{ + "docs": [ + { + "_source": { + "key1": "first value", + "key2": "some other value", + "key3": "", + "sudoc": { + "a": "abc", + "b": "" + } + } + }, + { + "_source": { + "key1": "", + "key2": "some other value", + "list_of_docs": [ + { + "foo": "abc", + "bar": "" + }, + { + "baz": "", + "subdoc_in_list": {"child1": "xxx", "child2": ""} + } + ] + } + } + ] +} +``` + +```json +PUT _ingest/pipeline/remove_unwanted_keys +{ + "processors": [ + { + "script": { + "lang": "painless", + "source": """ + void iterateAllFields(def x) { + if (x instanceof List) { + for (def v: x) { + iterateAllFields(v); + } + } + if (!(x instanceof Map)) { + return; + } + x.entrySet().removeIf(e -> e.getKey() =~ /unwanted_key_.*/); +// You can also add more lines here. Like checking for emptiness, or null directly. + x.entrySet().removeIf(e -> e.getKey() == ""); + x.entrySet().removeIf(e -> e.getKey() == null); + for (def v: x.values()) { + iterateAllFields(v); + } + } + iterateAllFields(ctx); + """ + } + } + ] +} +``` + +## Type check of fields in ingest pipelines + +If it is required to check the type of a field, this can be done via the Painless method instanceof + +```json +POST _ingest/pipeline/_simulate +{ + "docs": [ + { + "_source": { + "winlog": { + "event_data": { + "param1": "hello" + } + } + } + } + ], + "pipeline": { + "processors": [ + { + "rename": { + "field": "winlog.event_data.param1", + "target_field": "SysEvent1", + "if": "ctx.winlog?.event_data?.param1 instanceof String" + } + } + ] + } +} +``` + +Yes the `instanceof` also works with the `?` operator. 
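+
+Because the whole condition simply evaluates to `false` when part of the path is missing, the processor is skipped instead of failing. Here is a minimal sketch (with a made-up document that has no `winlog` object at all) that you can run in the simulate API to see this behaviour:
+
+```json
+POST _ingest/pipeline/_simulate
+{
+  "docs": [
+    {
+      "_source": {
+        "message": "a document without any winlog object"
+      }
+    }
+  ],
+  "pipeline": {
+    "processors": [
+      {
+        "rename": {
+          "field": "winlog.event_data.param1",
+          "target_field": "SysEvent1",
+          "if": "ctx.winlog?.event_data?.param1 instanceof String"
+        }
+      }
+    ]
+  }
+}
+```
+
+The `rename` processor is simply not executed and the document passes through unchanged, instead of the pipeline failing with a null pointer error.
+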
+ +## Calculate time in other timezone + +When you cannot use the date and its timezone parameter, you can use `datetime` in Painless + +```json +POST _ingest/pipeline/_simulate +{ + "docs": [ + { + "_source": { + "@timestamp": "2021-08-13T09:06:00.000Z" + } + } + ], + "pipeline": { + "processors": [ + { + "script": { + "source": """ + ZonedDateTime zdt = ZonedDateTime.parse(ctx['@timestamp']); + ZonedDateTime zdt_local = zdt.withZoneSameInstant(ZoneId.of('Europe/Berlin')); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("dd.MM.yyyy - HH:mm:ss Z"); + ctx.localtime = zdt_local.format(formatter); + """ + } + } + ] + } +} +``` + +## Work with JSON as value of fields + +It is possible to work with json string as value of a field for example to set the `original` field value with the json of `_source`: We are leveraging a `mustache` function here. + +```json +POST _ingest/pipeline/_simulate +{ + "docs": [ + { + "_source": { + "foo": "bar", + "key": 123 + } + } + ], + "pipeline": { + "processors": [ + { + "set": { + "field": "original", + "value": "{{#toJson}}_source{{/toJson}}" + } + } + ] + } +} +``` + +## Script Processor + +### Setting the value of a field + +Sometimes it is needed to write to a field and this field does not exist yet. Whenever the object above it exists, this can be done immediately. + +`ctx.abc = “cool”` works without any issue as we are adding a root field called `abc`. + +Creating something like `ctx.abc.def = “cool”` does not work unless you create the `abc` object beforehand or it already exists. There are multiple ways to do it. What we always or usually want to create is a Map. We can do it in a couple of ways: + +``` +ctx.abc = new HashMap(); +ctx.abc = [:]; +``` + +Both options are valid and do the same thing. However there is a big caveat and that is, that if `abc` already exists, it will be overwritten and empty. Validating if `abc` already exists can be done by: + +``` +if(ctx.abc == null) { + ctx.abc = [:]; +} +``` + +With a simple `if ctx.abc == null` we know that `abc` does not exist and we can create it. Alternatively you can use the shorthand which is super helpful when you need to go 2,3,4 levels deep. You can use either version with the `HashMap()` or with the `[:]`. + +``` +ctx.putIfAbsent("abc", new HashMap()); +ctx.putIfAbsent("abc", [:]); +``` + +Now assuming you want to create this structure: + +```json +{ + "user": { + "geo": { + "city": "Amsterdam" +  } + } +} +``` + +The `putIfAbsent` will help a ton here: + +``` +ctx.putIfAbsent("user", [:]); +ctx.user.putIfAbsent("geo", [:]); +ctx.user.geo = "Amsterdam" +``` + +## Remove fields based on their values + +When `ignore_malformed`, null or other mapping parameters are not sufficient, you can use a script like this: + +```yaml +- script: + lang: painless + params: + values: + - "" + - "-" + - "N/A" + source: >- + ctx?.sophos?.xg.entrySet().removeIf(entry -> params.values.contains(entry.getValue())); +``` + +## GROK vs Dissects + +There can be a very long discussion on whether to choose GROK or dissects. When to choose what, depends on a lot of factors and existing knowledge. Dissects are easier to understand and follow, but are limited in their use. The log should look fairly the same all the times, as opposed to grok which can deal with a lot of different tasks, like optional fields in various positions. + +I can only go as far as telling what I like to do, which might not be the best on performance, but definitely the easiest to read and maintain. 
A log source often has many diverse messages and you might only need to extract certain information that is always on the same position, for example this message + +```text +2018-08-14T14:30:02.203151+02:00 linux-sqrz systemd[4179]: Stopped target Basic System. +``` + +With a dissect we can simply do `%{_tmp.date} %{host.hostname} %{process.name}[%{process.pid}]: %{message}` and call it a day. Now we have extracted the most important information, like the timestamp. If you extract first to `_tmp.date` or directly over `@timestamp` is a discussion for another chapter. + +With that extracted we are left with the `message` field to gather information, like user logins etc. diff --git a/manage-data/ingest/transform-enrich/ingest-lag.md b/manage-data/ingest/transform-enrich/ingest-lag.md new file mode 100644 index 000000000..4e0180ba9 --- /dev/null +++ b/manage-data/ingest/transform-enrich/ingest-lag.md @@ -0,0 +1,302 @@ +--- +mapped_pages: + - https://www.elastic.co/docs/manage-data/ingest/transform-enrich/calculate-ingest-lag.html +applies_to: + stack: ga + serverless: ga +--- + +# Ingest Lag + +Ingest lag is a recurring topic that deserves its own section. The goal is simple: calculate the time it takes from when a document is read to when it is received by Elasticsearch. Store this value in minutes, seconds, or milliseconds, and use it to create visualizations and alerts. + +The basic calculation is: + +`event.ingested - @timestamp` + +## Understanding `event.ingested` + +The `event.ingested` timestamp can be obtained in two ways: + +- `_ingest.timestamp` + Available via mustache notation `{{_ingest.timestamp}}` in all processors except `script`. + +- `metadata().now` + Available only in the `script` processor. Use this instead of `_ingest.timestamp` when working with scripts. + +Note that `event.ingested` is typically set in the **Fleet final pipeline**, which runs as the last step in the ingest process. Calculating the latency in **seconds** is usually sufficient for most use cases. + +## Calculating Ingestion Latency + +The following script is the core of the solution. It creates a new field, `event.ingestion.latency`, which you can use to monitor ingestion performance across your pipelines. + +```json +{ + "script": { + "description": "Calculates entire ingestion flow latency", + "if": "ctx['@timestamp'] != null", + "source": """ + ZonedDateTime start = ZonedDateTime.parse(ctx['@timestamp']); + ctx.putIfAbsent("event", [:]); + ctx.event.putIfAbsent("ingestion", [:]); + ctx.event.ingestion.latency= ChronoUnit.SECONDS.between(start, metadata().now); + """ + } +} +``` + +## @timestamp + +One important detail to keep in mind: the value of `@timestamp` can vary depending on the data source. It might represent the time the Elastic Agent read the document, or it might be the actual timestamp extracted from the document itself after parsing. + +This distinction is crucial because it affects how ingest lag is calculated. For example, when Elastic Agent reads Windows Event Logs, it sets `@timestamp` based on the log's original timestamp. However, this behavior does not apply to all sources—such as syslog messages or Linux log files—where `@timestamp` is often set later in the pipeline, after parsing. + +This inconsistency can lead to inaccurate latency measurements if not accounted for. 
+ +```json +POST _ingest/pipeline/_simulate +{ + "docs": [{ + "_source": { + "@timestamp": "2025-04-03T10:00:00.000Z", + "message": "2025-03-01T09:00:00.000Z user: philipp has logged in" + } + }], + "pipeline": { + "processors": [ + {"script": { + "if": "ctx['@timestamp'] != null", + "source": """ + ZonedDateTime start = ZonedDateTime.parse(ctx['@timestamp']); + ctx.latency= ChronoUnit.SECONDS.between(start, metadata().now); + """ + }} + ] + } +} +``` + +In the example above, we can see that the timestamp, when the Elastic Agent read the document was `3rd April at 10:00`, while the actual log message on the disk is from `3rd March`. If we calculate the difference at the first step, before any parsing, we can be confident that the result will be accurate. However, if we perform the calculation as the final step in the pipeline (which is typically the case with Elastic Integrations that use `@custom` pipelines), the timestamp of `2025-03-01` will be used as `@timestamp`, leading to an erroneous latency calculation. + +While we can't always resolve every situation, the approach described above usually results in a "good enough" solution. For many use cases, simply using `@timestamp` is sufficient, as we expect the Elastic Agent to pick up logs as quickly as possible. During the initial onboarding of new data sources, there might be higher latency due to the ingestion of historical or older data. + +## Architectures + +Regardless of the chosen architecture, it's a good practice to add a `remove` processor at the end of the pipeline to drop the `_tmp` field. The raw timestamps from the various processing steps are not needed, as the latency in seconds should be sufficient. For additional pipeline architectures, refer to the [Ingest Architectures](../ingest-reference-architectures.md) documentation. + +### Elastic Agent => Elasticsearch + +We can use `@timestamp` and `event.ingested` and calculate the difference. This will give you the following document. The `event.ingestion.latency` is in seconds. + +```json +{ + "event": { + "ingestion": { + "latency": 443394 + } + } +} +``` + +#### Script + +```json +POST _ingest/pipeline/_simulate +{ + "docs": [{ + "_source": { + "@timestamp": "2025-04-03T10:00:00.000Z", + "message": "user: philipp has logged in", + "_tmp": { + "logstash": "2025-04-03T10:00:02.456Z" + } + + } + }], + "pipeline": { + "processors": [ + { + "script": { + "description": "Calculates entire ingestion flow latency", + "if": "ctx['@timestamp'] != null", + "source": """ + ZonedDateTime start = ZonedDateTime.parse(ctx['@timestamp']); + ctx.putIfAbsent("event", [:]); + ctx.event.putIfAbsent("ingestion", [:]); + ctx.event.ingestion.latency= ChronoUnit.SECONDS.between(start, metadata().now); + """ + } + } + ] + } +} +``` + +### Elastic Agent => Logstash => Elasticsearch + +In this scenario, we have an additional hop to manage. Elastic Agent populates the `@timestamp`, but Logstash does not add any timestamp by default. We recommend adding a temporary timestamp, for example by setting `_tmp.logstash_seen`. With this, you can calculate the following latency values: + +- Total latency: (`@timestamp - event.ingested`) +- Elastic Agent => Logstash: (`@timestamp - _tmp.logstash_seen`) +- Logstash => Elasticsearch: (`_tmp.logstash_seen - event.ingested`) + +These values can be especially helpful for debugging, as they allow you to quickly determine where the lag is introduced. Is the delay caused by the transfer from Elastic Agent to Logstash, or from Logstash to Elasticsearch? 
+Below is a pipeline that calculates these differences, providing latency values for each of the stages mentioned above:
+
+```json
+{
+  "event": {
+    "ingestion": {
+      "latency_logstash_to_elasticsearch": 443091,
+      "latency": 443093,
+      "latency_elastic_agent_to_logstash": 1
+    }
+  }
+}
+```
+
+#### Script
+
+If you remove the first script, you need to make sure that the `event.ingestion` object exists before the later scripts write to it.
+
+```json
+POST _ingest/pipeline/_simulate
+{
+  "docs": [{
+    "_source": {
+      "@timestamp": "2025-04-03T10:00:00.000Z",
+      "message": "user: philipp has logged in",
+      "_tmp": {
+        "logstash_seen": "2025-04-03T10:00:02.456Z"
+      }
+    }
+  }],
+  "pipeline": {
+    "processors": [
+      {
+        "script": {
+          "description": "Calculates entire ingestion flow latency",
+          "if": "ctx['@timestamp'] != null",
+          "source": """
+            ZonedDateTime start = ZonedDateTime.parse(ctx['@timestamp']);
+            ctx.putIfAbsent("event", [:]);
+            ctx.event.putIfAbsent("ingestion", [:]);
+            ctx.event.ingestion.latency = ChronoUnit.SECONDS.between(start, metadata().now);
+          """
+        }
+      },
+      {
+        "script": {
+          "description": "Calculates Logstash to Elasticsearch latency",
+          "if": "ctx._tmp?.logstash_seen != null",
+          "source": """
+            ZonedDateTime start = ZonedDateTime.parse(ctx._tmp.logstash_seen);
+            ctx.event.ingestion.latency_logstash_to_elasticsearch = ChronoUnit.SECONDS.between(start, metadata().now);
+          """
+        }
+      },
+      {
+        "script": {
+          "description": "Calculates Elastic Agent to Logstash latency",
+          "if": "ctx._tmp?.logstash_seen != null",
+          "source": """
+            ZonedDateTime start = ZonedDateTime.parse(ctx['@timestamp']);
+            ZonedDateTime end = ZonedDateTime.parse(ctx._tmp.logstash_seen);
+            ctx.event.ingestion.latency_elastic_agent_to_logstash = ChronoUnit.SECONDS.between(start, end);
+          """
+        }
+      }
+    ]
+  }
+}
+```
+
+### Elastic Agent => Logstash => Kafka => Logstash => Elasticsearch
+
+As with the previous scenario, adding an additional hop introduces another point where latency can occur. The recommendation here is to add another temporary timestamp field. For more details, refer to the [Elastic Agent => Logstash => Elasticsearch](#elastic-agent--logstash--elasticsearch) section above.
+
+Below is a script that calculates the latency for each step in the pipeline. The following values will be generated:
+
+```json
+{
+  "event": {
+    "ingestion": {
+      "latency_logstash_to_elasticsearch": 443091,
+      "latency_logstash_to_logstash": 1,
+      "latency": 443093,
+      "latency_elastic_agent_to_logstash": 1
+    }
+  }
+}
+```
+
+#### Script
+
+If you remove the first script, you again need to make sure that the `event.ingestion` object exists before the later scripts write to it. You could of course merge all of the steps into one larger script; I personally like to keep them separate, so you can edit, modify and enhance exactly the step you need.
+ +```json +POST _ingest/pipeline/_simulate +{ + "docs": [{ + "_source": { + "@timestamp": "2025-04-03T10:00:00.000Z", + "message": "user: philipp has logged in", + "_tmp": { + "logstash_pre_kafka": "2025-04-03T10:00:01.233Z", + "logstash_post_kafka": "2025-04-03T10:00:02.456Z" + } + + } + }], + "pipeline": { + "processors": [ + { + "script": { + "description": "Calculates entire ingestion flow latency", + "if": "ctx['@timestamp'] != null", + "source": """ + ZonedDateTime start = ZonedDateTime.parse(ctx['@timestamp']); + ctx.putIfAbsent("event", [:]); + ctx.event.putIfAbsent("ingestion", [:]); + ctx.event.ingestion.latency= ChronoUnit.SECONDS.between(start, metadata().now); + """ + } + }, + { + "script": { + "description": "Calculates logstash to logstash latency", + "if": "ctx._tmp?.logstash_pre_kafka != null && ctx._tmp?.logstash_post_kafka != null", + "source": """ + ZonedDateTime start = ZonedDateTime.parse(ctx._tmp.logstash_pre_kafka); + ZonedDateTime end = ZonedDateTime.parse(ctx._tmp.logstash_post_kafka); + ctx.event.ingestion.latency_logstash_to_logstash=ChronoUnit.SECONDS.between(start, end); + """ + } + }, + { + "script": { + "description": "Calculates logstash post Kafka to Elasticsearch latency", + "if": "ctx._tmp?.logstash_post_kafka != null", + "source": """ + ZonedDateTime start = ZonedDateTime.parse(ctx._tmp.logstash_post_kafka); + ctx.event.ingestion.latency_logstash_to_elasticsearch=ChronoUnit.SECONDS.between(start, metadata().now); + """ + } + }, + { + "script": { + "description": "Calculates Elastic Agent to pre kafka Logstash latency", + "if": "ctx._tmp?.logstash_pre_kafka != null", + "source": """ + ZonedDateTime start = ZonedDateTime.parse(ctx['@timestamp']); + ZonedDateTime end = ZonedDateTime.parse(ctx._tmp.logstash_pre_kafka); + ctx.event.ingestion.latency_elastic_agent_to_logstash=ChronoUnit.SECONDS.between(start, end); + """ + } + } + ] + } +} +``` diff --git a/manage-data/toc.yml b/manage-data/toc.yml index 9275294ab..6aa4cecc4 100644 --- a/manage-data/toc.yml +++ b/manage-data/toc.yml @@ -98,6 +98,10 @@ toc: - file: ingest/transform-enrich/ingest-pipelines.md children: - file: ingest/transform-enrich/example-parse-logs.md + - file: ingest/transform-enrich/common-mistakes.md + - file: ingest/transform-enrich/error-handling.md + - file: ingest/transform-enrich/general-tips.md + - file: ingest/transform-enrich/ingest-lag.md - file: ingest/transform-enrich/logstash-pipelines.md - file: ingest/transform-enrich/data-enrichment.md children: