Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions data-prepper-plugins/mutate-event-processors/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,55 @@ then when we run with the same input, the processor will parse the message into
{"message": "value", "newMessage": "new value"}
```

### Iterating over arrays with `iterate_on`

The `iterate_on` option adds entries to each element of an array. When it is combined with `disable_root_keys: false` and `evaluate_when_on_element: true`, the added value can reference root-level fields and the `add_to_element_when` condition is applied per element.

```yaml
pipeline:
  source:
    http:
  processor:
    - add_entries:
        entries:
          - key: "title"
            value_expression: "/alert_title"
            iterate_on: "vulns"
            disable_root_keys: false
            add_to_element_when: '/severity == "critical"'
            evaluate_when_on_element: true
  sink:
    - stdout:
```

Given the following input:
```json
{
"alert_title": "SQL Injection Detected",
"vulns": [
{"cve": "CVE-2024-001", "severity": "critical"},
{"cve": "CVE-2024-002", "severity": "low"},
{"cve": "CVE-2024-003", "severity": "critical"}
]
}
```

The processor will produce:
```json
{
"alert_title": "SQL Injection Detected",
"vulns": [
{"cve": "CVE-2024-001", "severity": "critical", "title": "SQL Injection Detected"},
{"cve": "CVE-2024-002", "severity": "low"},
{"cve": "CVE-2024-003", "severity": "critical", "title": "SQL Injection Detected"}
]
}
```

In this example:
- `disable_root_keys: false` allows `value_expression: "/alert_title"` to resolve from the root event
- `evaluate_when_on_element: true` evaluates `add_to_element_when` against each element, so only elements with `severity == "critical"` receive the new key

### Configuration
* `entries` - (required) - A list of entries to add to an event
* `key` - (required) - The key of the new entry to be added. One of `key` or `metadata_key` is required.
Expand All @@ -62,6 +111,8 @@ then when we run with the same input, the processor will parse the message into
* `format` - (optional) - A format string to use as value of the new entry to be added. For example, `${key1}-${key2}` where `key1` and `key2` are existing keys in the event. Required if `value` is not specified.
* `value_expression` - (optional) - An expression string to use as value of the new entry to be added. For example, `/key` where `key` is an existing key in the event of type Number/String/Boolean. Expressions can also contain functions returning Number/String/Integer. For example, `length(/key)` returns the length of `key` in the event, and `key` must be of String type. Please see the [expressions syntax document](https://github.com/opensearch-project/data-prepper/blob/2.3/docs/expression_syntax.md) for the complete list of supported functions. Required if `value` and `format` are not specified.
* `overwrite_if_key_exists` - (optional) - When set to `true`, if `key` already exists in the event, then the existing value will be overwritten. The default is `false`.
* `disable_root_keys` - (optional) - When set to `false` and used with `iterate_on`, resolves `value_expression` and `format` against the root event instead of the individual array element. This allows referencing root-level fields during array iteration. Has no effect without `iterate_on`. Default is `true`.
* `evaluate_when_on_element` - (optional) - When set to `true` and used with `iterate_on` and `add_to_element_when`, evaluates the `add_to_element_when` condition against each individual array element instead of the root event. This enables per-element conditional logic during array iteration. Default is `false`.

___

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ dependencies {
implementation project(':data-prepper-plugins:common')
implementation 'com.fasterxml.jackson.core:jackson-databind'
testImplementation project(':data-prepper-test:test-event')
testImplementation project(':data-prepper-test:plugin-test-framework')
testImplementation testLibs.slf4j.simple
testImplementation testLibs.spring.test
testImplementation project(':data-prepper-test:test-common')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,15 @@ private static class EntryProperties {
final boolean appendIfExists;
final String addWhen;
final String addToElementWhen;
final boolean disableRootKeys;
final boolean evaluateWhenOnElement;
EntryProperties(AddEntryProcessorConfig.Entry entry, ExpressionEvaluator evaluator) {
this.overwriteIfExists = entry.getOverwriteIfKeyExists();
this.appendIfExists = entry.getAppendIfKeyExists();
this.addWhen = entry.getAddWhen();
this.addToElementWhen = entry.getAddToElementWhen();
this.disableRootKeys = entry.getDisableRootKeys();
this.evaluateWhenOnElement = entry.getEvaluateWhenOnElement();
String valueExpr = entry.getValueExpression();
if (valueExpr != null && !evaluator.isValidExpressionStatement(valueExpr)) {
throw new InvalidPluginConfigurationException(
Expand Down Expand Up @@ -261,33 +265,47 @@ private void handleWithIterateOn(final AddEntryProcessorConfig.Entry entry,
// Create builder once
final JacksonEvent.Builder contextBuilder = JacksonEvent.builder()
.withEventMetadata(recordEvent.getMetadata());

// Pre-check addToElementWhen condition if static
final boolean shouldProcessAll = props.addToElementWhen == null ||
expressionEvaluator.evaluateConditional(props.addToElementWhen, recordEvent);
if (shouldProcessAll) {
// Bulk process all items

if (props.addToElementWhen == null) {
// No condition — process all items
for (int i = 0; i < iterateOnList.size(); i++) {
final Map<String, Object> item = iterateOnList.get(i);
iterateOnList.set(i, processIterateOnItem(entry, contextBuilder, item, flattenKey, key, props));
iterateOnList.set(i, processIterateOnItem(entry, contextBuilder, item, flattenKey, key, props, recordEvent));
}
} else {
// Process items individually with condition check
} else if (props.evaluateWhenOnElement) {
// Evaluate condition against each element for selective addition
for (int i = 0; i < iterateOnList.size(); i++) {
final Map<String, Object> item = iterateOnList.get(i);
if (expressionEvaluator.evaluateConditional(props.addToElementWhen, recordEvent)) {
iterateOnList.set(i, processIterateOnItem(entry, contextBuilder, item, flattenKey, key, props));
final Event elementContext = contextBuilder.withData(item).build();
if (expressionEvaluator.evaluateConditional(props.addToElementWhen, elementContext)) {
iterateOnList.set(i, processIterateOnItem(entry, elementContext, item, flattenKey, key, props, recordEvent));
}
}
} else {
// Evaluate condition once against root event
if (expressionEvaluator.evaluateConditional(props.addToElementWhen, recordEvent)) {
for (int i = 0; i < iterateOnList.size(); i++) {
final Map<String, Object> item = iterateOnList.get(i);
iterateOnList.set(i, processIterateOnItem(entry, contextBuilder, item, flattenKey, key, props, recordEvent));
}
}
}
recordEvent.put(iterateOn, iterateOnList);
}
}

private Map<String, Object> processIterateOnItem(AddEntryProcessorConfig.Entry entry, JacksonEvent.Builder contextBuilder,
Map<String, Object> item, final boolean flattenKey, EventKey key, EntryProperties props) {
private Map<String, Object> processIterateOnItem(AddEntryProcessorConfig.Entry entry, JacksonEvent.Builder contextBuilder,
Map<String, Object> item, final boolean flattenKey, EventKey key, EntryProperties props,
final Event recordEvent) {
final Event context = contextBuilder.withData(item).build();
final Object value = retrieveValue(entry, context);
return processIterateOnItem(entry, context, item, flattenKey, key, props, recordEvent);
}

private Map<String, Object> processIterateOnItem(AddEntryProcessorConfig.Entry entry, Event context,
Map<String, Object> item, final boolean flattenKey, EventKey key, EntryProperties props,
final Event recordEvent) {
final Event valueContext = !props.disableRootKeys ? recordEvent : context;
final Object value = retrieveValue(entry, valueContext);
final String keyStr = key.getKey(); // Key and keyStr are guaranteed non-null by caller
if (!item.containsKey(keyStr) || props.overwriteIfExists) {
if (flattenKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,28 @@ public static class Entry {
})
private boolean flattenKey = true;

// Backing field for the `disable_root_keys` option. It defaults to true; the
// value false is only accepted together with iterate_on (see the
// disableRootKeysFalseIsUsedWithIterateOn check in this class).
@JsonProperty("disable_root_keys")
@JsonPropertyDescription(
"When set to <code>false</code> and used with <code>iterate_on</code>, resolves <code>value_expression</code> and <code>format</code> " +
"against the root event instead of the individual array element. This allows referencing root-level fields during array iteration. " +
"Has no effect when not used with <code>iterate_on</code>. The default value is <code>true</code>.")
@AlsoRequired(values = {
@AlsoRequired.Required(name="iterate_on")
})
private boolean disableRootKeys = true;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Confusing double-negative naming for disable_root_keys

  • Problem: To enable root-level field access, users must set disable_root_keys: false. The double-negative (!disableRootKeys in the processor code) is hard to reason about. The issue discussion originally proposed use_root_context: true which reads more
    naturally.
  • Impact: User confusion in configuration. However, @dlvenable explicitly requested this name in this comment ([FEATURE] Support parent event field access during iterate_on processing #6609 (comment)), so this is intentional.
  • Fix: No change needed — this follows the maintainer's direction. Noting for awareness only.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noted!!

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest using a positive field name as well, like resolve_from_root, use_root_context, etc. so that: 1) the default value would be false; 2) it's easier for users to comprehend and use


// Backing field for the `evaluate_when_on_element` option. It defaults to false;
// the value true is only accepted when both iterate_on and add_to_element_when
// are set (see evaluateWhenOnElementIsUsedWithIterateOnAndAddToElementWhen).
@JsonProperty("evaluate_when_on_element")
@JsonPropertyDescription(
"When set to <code>true</code> and used with <code>iterate_on</code> and <code>add_to_element_when</code>, " +
"evaluates the <code>add_to_element_when</code> condition against each individual array element instead of the root event. " +
"This enables per-element conditional logic during array iteration. " +
"The default value is <code>false</code>.")
@AlsoRequired(values = {
@AlsoRequired.Required(name="iterate_on"),
@AlsoRequired.Required(name="add_to_element_when")
})
private boolean evaluateWhenOnElement = false;

@JsonProperty("add_when")
@JsonPropertyDescription("A <a href=\"https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/\">conditional expression</a>, " +
"such as <code>/some-key == \"test\"</code>, that will be evaluated to determine whether the processor will be run on the event.")
Expand Down Expand Up @@ -212,6 +234,14 @@ public boolean getFlattenKey(){
return flattenKey;
}

/**
 * Returns the value configured for the {@code disable_root_keys} option.
 *
 * @return the current {@code disableRootKeys} flag of this entry
 */
public boolean getDisableRootKeys() {
    return this.disableRootKeys;
}

/**
 * Returns the value configured for the {@code evaluate_when_on_element} option.
 *
 * @return the current {@code evaluateWhenOnElement} flag of this entry
 */
public boolean getEvaluateWhenOnElement() {
    return this.evaluateWhenOnElement;
}

/**
 * Returns the {@code add_when} conditional expression configured for this entry.
 *
 * @return the value of the {@code addWhen} field
 */
public String getAddWhen() {
    return this.addWhen;
}

@AssertTrue(message = "Exactly one of value, format, or value_expression must be specified")
Expand All @@ -229,6 +259,64 @@ boolean flattenKeyFalseIsUsedWithIterateOn() {
return (!flattenKey && iterateOn!=null) || flattenKey;
}

/**
 * Validation check: {@code disable_root_keys=false} is only acceptable when
 * {@code iterate_on} is configured.
 *
 * @return {@code true} when {@code disableRootKeys} is {@code true}, or when
 *         {@code iterateOn} is non-null; {@code false} otherwise
 */
@AssertTrue(message = "disable_root_keys=false only applies when iterate_on is configured.")
boolean disableRootKeysFalseIsUsedWithIterateOn() {
    // The default (true) never needs iterate_on.
    if (disableRootKeys) {
        return true;
    }
    return iterateOn != null;
}

/**
 * Validation check: {@code evaluate_when_on_element} may only be enabled when
 * both {@code iterate_on} and {@code add_to_element_when} are configured.
 *
 * @return {@code true} when {@code evaluateWhenOnElement} is {@code false}, or when
 *         both {@code iterateOn} and {@code addToElementWhen} are non-null;
 *         {@code false} otherwise
 */
@AssertTrue(message = "evaluate_when_on_element only applies when both iterate_on and add_to_element_when are configured.")
boolean evaluateWhenOnElementIsUsedWithIterateOnAndAddToElementWhen() {
    // The default (false) has no prerequisites.
    if (!evaluateWhenOnElement) {
        return true;
    }
    final boolean hasIterateOn = iterateOn != null;
    final boolean hasElementCondition = addToElementWhen != null;
    return hasIterateOn && hasElementCondition;
}

public Entry(final String key,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (non-blocking): Massive constructor duplication

  • Problem: The new 13-parameter constructor duplicates all the validation logic from the existing 11-parameter constructor. There are now four constructors in Entry, three of which contain nearly identical validation blocks (null checks, mutual-exclusion checks, etc.).
  • Impact: Any future validation change must be replicated across all constructors. This is a maintenance burden.
  • Fix: Have the shorter constructors delegate to the longest one with default values for the new parameters:
public Entry(String key, String metadataKey, Object value, String format,
                 String valueExpression, boolean overwriteIfKeyExists, boolean appendIfKeyExists,
                 String addWhen, String iterateOn, boolean flattenKey, String addToElementWhen) {
        this(key, metadataKey, value, format, valueExpression, overwriteIfKeyExists,
             appendIfKeyExists, addWhen, iterateOn, flattenKey, true, false, addToElementWhen);
    }

This eliminates all duplicated validation.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the duplication in the constructor as suggested.

final String metadataKey,
final Object value,
final String format,
final String valueExpression,
final boolean overwriteIfKeyExists,
final boolean appendIfKeyExists,
final String addWhen,
final String iterateOn,
final boolean flattenKey,
final boolean disableRootKeys,
final boolean evaluateWhenOnElement,
final String addToElementWhen)
{
if (key != null && metadataKey != null) {
throw new IllegalArgumentException("Only one of the two - key and metadatakey - should be specified");
}
if (key == null && metadataKey == null) {
throw new IllegalArgumentException("At least one of the two - key and metadatakey - must be specified");
}
if (metadataKey != null && iterateOn != null) {
throw new IllegalArgumentException("iterate_on cannot be applied to metadata");
}
if (iterateOn == null && addToElementWhen != null) {
throw new InvalidPluginConfigurationException("add_to_element_when only applies when iterate_on is configured.");
}
if (!disableRootKeys && iterateOn == null) {
throw new InvalidPluginConfigurationException("disable_root_keys=false only applies when iterate_on is configured.");
}
if (evaluateWhenOnElement && (iterateOn == null || addToElementWhen == null)) {
throw new InvalidPluginConfigurationException("evaluate_when_on_element only applies when both iterate_on and add_to_element_when are configured.");
}

this.key = key;
this.metadataKey = metadataKey;
this.value = value;
this.format = format;
this.valueExpression = valueExpression;
this.overwriteIfKeyExists = overwriteIfKeyExists;
this.appendIfKeyExists = appendIfKeyExists;
this.addWhen = addWhen;
this.iterateOn = iterateOn;
this.flattenKey = flattenKey;
this.disableRootKeys = disableRootKeys;
this.evaluateWhenOnElement = evaluateWhenOnElement;
this.addToElementWhen = addToElementWhen;
}

public Entry(final String key,
final String metadataKey,
final Object value,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.plugins.processor.mutateevent;

import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.test.plugins.DataPrepperPluginTest;
import org.opensearch.dataprepper.test.plugins.PluginConfigurationFile;
import org.opensearch.dataprepper.test.plugins.junit.BaseDataPrepperPluginStandardTestSuite;

import java.util.*;
Copy link
Copy Markdown
Collaborator

@oeyh oeyh Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Star import may result in checkstyle failures. I triggered the checks, please check that gradle builds are OK.


import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

/**
 * Integration tests for the {@code add_entries} processor covering the
 * {@code disable_root_keys} and {@code evaluate_when_on_element} options used
 * together with {@code iterate_on}. Each test receives a processor built from
 * the pipeline configuration file named in its {@code @PluginConfigurationFile}.
 */
@DataPrepperPluginTest(pluginName = "add_entries", pluginType = Processor.class)
class AddEntryProcessorIT extends BaseDataPrepperPluginStandardTestSuite {

    private static final String ALERT_TITLE = "SQL Injection Detected";

    @Test
    void disable_root_keys_resolves_value_expression_from_root(
            @PluginConfigurationFile("iterate_on_with_disable_root_keys.yaml") final Processor<Record<Event>, Record<Event>> objectUnderTest) {
        final Map<String, Object> input = new HashMap<>();
        input.put("alert_title", ALERT_TITLE);
        input.put("vulns", Arrays.asList(vuln("CVE-2024-001"), vuln("CVE-2024-002")));

        final List<Map<String, Object>> processed = runAndReadVulns(objectUnderTest, input);

        // Every element receives the root-level title.
        assertThat(processed.size(), equalTo(2));
        assertThat(processed.get(0).get("title"), equalTo(ALERT_TITLE));
        assertThat(processed.get(1).get("title"), equalTo(ALERT_TITLE));
    }

    @Test
    void evaluate_when_on_element_filters_per_element(
            @PluginConfigurationFile("iterate_on_with_evaluate_when_on_element.yaml") final Processor<Record<Event>, Record<Event>> objectUnderTest) {
        final Map<String, Object> input = new HashMap<>();
        input.put("vulns", vulnsWithMixedSeverity());

        final List<Map<String, Object>> processed = runAndReadVulns(objectUnderTest, input);

        // Only the first and third elements are expected to be flagged.
        assertThat(processed.size(), equalTo(3));
        assertThat(processed.get(0).get("flagged"), equalTo(true));
        assertThat(processed.get(1).containsKey("flagged"), is(false));
        assertThat(processed.get(2).get("flagged"), equalTo(true));
    }

    @Test
    void both_flags_combined_resolves_root_value_with_per_element_condition(
            @PluginConfigurationFile("iterate_on_with_both_flags.yaml") final Processor<Record<Event>, Record<Event>> objectUnderTest) {
        final Map<String, Object> input = new HashMap<>();
        input.put("alert_title", ALERT_TITLE);
        input.put("vulns", vulnsWithMixedSeverity());

        final List<Map<String, Object>> processed = runAndReadVulns(objectUnderTest, input);

        // Only the first and third elements are expected to get the root-level title.
        assertThat(processed.size(), equalTo(3));
        assertThat(processed.get(0).get("title"), equalTo(ALERT_TITLE));
        assertThat(processed.get(1).containsKey("title"), is(false));
        assertThat(processed.get(2).get("title"), equalTo(ALERT_TITLE));
    }

    /** Builds a mutable array element that carries only a {@code cve} key. */
    private static Map<String, Object> vuln(final String cve) {
        return new HashMap<>(Map.of("cve", cve));
    }

    /** Builds a mutable array element with {@code cve} and {@code severity} keys. */
    private static Map<String, Object> vuln(final String cve, final String severity) {
        return new HashMap<>(Map.of("cve", cve, "severity", severity));
    }

    /** Three elements: critical, low, critical - shared by the conditional tests. */
    private static List<Map<String, Object>> vulnsWithMixedSeverity() {
        return Arrays.asList(
                vuln("CVE-2024-001", "critical"),
                vuln("CVE-2024-002", "low"),
                vuln("CVE-2024-003", "critical"));
    }

    /**
     * Wraps {@code input} in a single record, runs it through {@code processor},
     * and returns the {@code vulns} list read from the first output event.
     */
    private static List<Map<String, Object>> runAndReadVulns(
            final Processor<Record<Event>, Record<Event>> processor,
            final Map<String, Object> input) {
        final Record<Event> record = buildRecordWithEvent(input);
        final List<Record<Event>> output =
                (List<Record<Event>>) processor.execute(Collections.singletonList(record));
        final List<Map<String, Object>> vulns = output.get(0).getData().get("vulns", List.class);
        return vulns;
    }

    /** Creates a record holding a {@code JacksonEvent} of type "event" with the given data. */
    private static Record<Event> buildRecordWithEvent(final Map<String, Object> data) {
        final Event event = JacksonEvent.builder()
                .withData(data)
                .withEventType("event")
                .build();
        return new Record<>(event);
    }
}
Loading
Loading