Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions data-prepper-plugins/mutate-event-processors/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,55 @@ then when we run with the same input, the processor will parse the message into
{"message": "value", "newMessage": "new value"}
```

### Iterating over arrays with `iterate_on`

The `iterate_on` option allows adding entries to each element of an array. Combined with `disable_root_keys` and `evaluate_when_on_element`, you can reference root-level fields and apply per-element conditions.

```yaml
pipeline:
source:
http:
processor:
- add_entries:
entries:
- key: "title"
value_expression: "/alert_title"
iterate_on: "vulns"
disable_root_keys: false
add_to_element_when: '/severity == "critical"'
evaluate_when_on_element: true
sink:
- stdout:
```

Given the following input:
```json
{
"alert_title": "SQL Injection Detected",
"vulns": [
{"cve": "CVE-2024-001", "severity": "critical"},
{"cve": "CVE-2024-002", "severity": "low"},
{"cve": "CVE-2024-003", "severity": "critical"}
]
}
```

The processor will produce:
```json
{
"alert_title": "SQL Injection Detected",
"vulns": [
{"cve": "CVE-2024-001", "severity": "critical", "title": "SQL Injection Detected"},
{"cve": "CVE-2024-002", "severity": "low"},
{"cve": "CVE-2024-003", "severity": "critical", "title": "SQL Injection Detected"}
]
}
```

In this example:
- `disable_root_keys: false` allows `value_expression: "/alert_title"` to resolve from the root event
- `evaluate_when_on_element: true` evaluates `add_to_element_when` against each element, so only elements with `severity == "critical"` receive the new key

### Configuration
* `entries` - (required) - A list of entries to add to an event
* `key` - (required) - The key of the new entry to be added. One of `key` or `metadata_key` is required.
Expand All @@ -62,6 +111,8 @@ then when we run with the same input, the processor will parse the message into
* `format` - (optional) - A format string to use as value of the new entry to be added. For example, `${key1}-${ke2}` where `key1` and `key2` are existing keys in the event. Required if `value` is not specified.
* `value_expression` - (optional) - An expression string to use as value of the new entry to be added. For example, `/key` where `key` is an existing key in the event of type Number/String/Boolean. Expressions can also contain functions returning Number/String/Integer. For example `length(/key)` would return the length of the `key` in the event and key must of String type. Please see [expressions syntax document](https://github.com/opensearch-project/data-prepper/blob/2.3/docs/expression_syntax.md) for complete list of supported functions. Required if `value` and `format are not specified.
* `overwrite_if_key_exists` - (optional) - When set to `true`, if `key` already exists in the event, then the existing value will be overwritten. The default is `false`.
* `disable_root_keys` - (optional) - When set to `false` and used with `iterate_on`, resolves `value_expression` and `format` against the root event instead of the individual array element. This allows referencing root-level fields during array iteration. Has no effect without `iterate_on`. Default is `true`.
* `evaluate_when_on_element` - (optional) - When set to `true` and used with `iterate_on` and `add_to_element_when`, evaluates the `add_to_element_when` condition against each individual array element instead of the root event. This enables per-element conditional logic during array iteration. Default is `false`.

___

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ dependencies {
implementation project(':data-prepper-plugins:common')
implementation 'com.fasterxml.jackson.core:jackson-databind'
testImplementation project(':data-prepper-test:test-event')
testImplementation project(':data-prepper-test:plugin-test-framework')
testImplementation testLibs.slf4j.simple
testImplementation testLibs.spring.test
testImplementation project(':data-prepper-test:test-common')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,15 @@ private static class EntryProperties {
final boolean appendIfExists;
final String addWhen;
final String addToElementWhen;
final boolean disableRootKeys;
final boolean evaluateWhenOnElement;
EntryProperties(AddEntryProcessorConfig.Entry entry, ExpressionEvaluator evaluator) {
this.overwriteIfExists = entry.getOverwriteIfKeyExists();
this.appendIfExists = entry.getAppendIfKeyExists();
this.addWhen = entry.getAddWhen();
this.addToElementWhen = entry.getAddToElementWhen();
this.disableRootKeys = entry.getDisableRootKeys();
this.evaluateWhenOnElement = entry.getEvaluateWhenOnElement();
String valueExpr = entry.getValueExpression();
if (valueExpr != null && !evaluator.isValidExpressionStatement(valueExpr)) {
throw new InvalidPluginConfigurationException(
Expand Down Expand Up @@ -261,33 +265,47 @@ private void handleWithIterateOn(final AddEntryProcessorConfig.Entry entry,
// Create builder once
final JacksonEvent.Builder contextBuilder = JacksonEvent.builder()
.withEventMetadata(recordEvent.getMetadata());

// Pre-check addToElementWhen condition if static
final boolean shouldProcessAll = props.addToElementWhen == null ||
expressionEvaluator.evaluateConditional(props.addToElementWhen, recordEvent);
if (shouldProcessAll) {
// Bulk process all items

if (props.addToElementWhen == null) {
// No condition — process all items
for (int i = 0; i < iterateOnList.size(); i++) {
final Map<String, Object> item = iterateOnList.get(i);
iterateOnList.set(i, processIterateOnItem(entry, contextBuilder, item, flattenKey, key, props));
iterateOnList.set(i, processIterateOnItem(entry, contextBuilder, item, flattenKey, key, props, recordEvent));
}
} else {
// Process items individually with condition check
} else if (props.evaluateWhenOnElement) {
// Evaluate condition against each element for selective addition
for (int i = 0; i < iterateOnList.size(); i++) {
final Map<String, Object> item = iterateOnList.get(i);
if (expressionEvaluator.evaluateConditional(props.addToElementWhen, recordEvent)) {
iterateOnList.set(i, processIterateOnItem(entry, contextBuilder, item, flattenKey, key, props));
final Event elementContext = contextBuilder.withData(item).build();
if (expressionEvaluator.evaluateConditional(props.addToElementWhen, elementContext)) {
iterateOnList.set(i, processIterateOnItem(entry, elementContext, item, flattenKey, key, props, recordEvent));
}
}
} else {
// Evaluate condition once against root event
if (expressionEvaluator.evaluateConditional(props.addToElementWhen, recordEvent)) {
for (int i = 0; i < iterateOnList.size(); i++) {
final Map<String, Object> item = iterateOnList.get(i);
iterateOnList.set(i, processIterateOnItem(entry, contextBuilder, item, flattenKey, key, props, recordEvent));
}
}
}
recordEvent.put(iterateOn, iterateOnList);
}
}

private Map<String, Object> processIterateOnItem(AddEntryProcessorConfig.Entry entry, JacksonEvent.Builder contextBuilder,
Map<String, Object> item, final boolean flattenKey, EventKey key, EntryProperties props) {
private Map<String, Object> processIterateOnItem(AddEntryProcessorConfig.Entry entry, JacksonEvent.Builder contextBuilder,
Map<String, Object> item, final boolean flattenKey, EventKey key, EntryProperties props,
final Event recordEvent) {
final Event context = contextBuilder.withData(item).build();
final Object value = retrieveValue(entry, context);
return processIterateOnItem(entry, context, item, flattenKey, key, props, recordEvent);
}

private Map<String, Object> processIterateOnItem(AddEntryProcessorConfig.Entry entry, Event context,
Map<String, Object> item, final boolean flattenKey, EventKey key, EntryProperties props,
final Event recordEvent) {
final Event valueContext = !props.disableRootKeys ? recordEvent : context;
final Object value = retrieveValue(entry, valueContext);
final String keyStr = key.getKey(); // Key and keyStr are guaranteed non-null by caller
if (!item.containsKey(keyStr) || props.overwriteIfExists) {
if (flattenKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,35 @@ public static class Entry {

@JsonProperty("flatten_key")
@JsonPropertyDescription(
"When true and used with iterate_on, treats the key as a plain string. When false and used with iterate_on, treats the key as a json pointer. " +
"Has no effect when not used with iterate_on.")
"When true and used with iterate_on, treats the key as a plain string. When false and used with iterate_on, treats the key as a json pointer. " +
"Has no effect when not used with iterate_on.")
@AlsoRequired(values = {
@AlsoRequired.Required(name="iterate_on")
})
private boolean flattenKey = true;

@JsonProperty("disable_root_keys")
@JsonPropertyDescription(
"When set to <code>false</code> and used with <code>iterate_on</code>, resolves <code>value_expression</code> and <code>format</code> " +
"against the root event instead of the individual array element. This allows referencing root-level fields during array iteration. " +
"Has no effect when not used with <code>iterate_on</code>. The default value is <code>true</code>.")
@AlsoRequired(values = {
@AlsoRequired.Required(name="iterate_on")
})
private boolean disableRootKeys = true;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Confusing double-negative naming for disable_root_keys

  • Problem: To enable root-level field access, users must set disable_root_keys: false. The double-negative (!disableRootKeys in the processor code) is hard to reason about. The issue discussion originally proposed use_root_context: true which reads more
    naturally.
  • Impact: User confusion in configuration. However, @dlvenable explicitly requested this name in this comment ([FEATURE] Support parent event field access during iterate_on processing #6609 (comment)), so this is intentional.
  • Fix: No change needed — this follows the maintainer's direction. Noting for awareness only.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noted!!

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest using a positive field name as well, like resolve_from_root, use_root_context, etc. so that: 1) the default value would be false; 2) it's easier for users to comprehend and use


@JsonProperty("evaluate_when_on_element")
@JsonPropertyDescription(
"When set to <code>true</code> and used with <code>iterate_on</code> and <code>add_to_element_when</code>, " +
"evaluates the <code>add_to_element_when</code> condition against each individual array element instead of the root event. " +
"This enables per-element conditional logic during array iteration. " +
"The default value is <code>false</code>.")
@AlsoRequired(values = {
@AlsoRequired.Required(name="iterate_on"),
@AlsoRequired.Required(name="add_to_element_when")
})
private boolean evaluateWhenOnElement = false;

@JsonProperty("add_when")
@JsonPropertyDescription("A <a href=\"https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/\">conditional expression</a>, " +
"such as <code>/some-key == \"test\"</code>, that will be evaluated to determine whether the processor will be run on the event.")
Expand Down Expand Up @@ -209,7 +231,15 @@ public boolean getAppendIfKeyExists() {
}

public boolean getFlattenKey(){
return flattenKey;
return flattenKey;
}

public boolean getDisableRootKeys() {
return disableRootKeys;
}

public boolean getEvaluateWhenOnElement() {
return evaluateWhenOnElement;
}

public String getAddWhen() { return addWhen; }
Expand All @@ -229,6 +259,16 @@ boolean flattenKeyFalseIsUsedWithIterateOn() {
return (!flattenKey && iterateOn!=null) || flattenKey;
}

@AssertTrue(message = "disable_root_keys=false only applies when iterate_on is configured.")
boolean disableRootKeysFalseIsUsedWithIterateOn() {
return disableRootKeys || iterateOn != null;
}

@AssertTrue(message = "evaluate_when_on_element only applies when both iterate_on and add_to_element_when are configured.")
boolean evaluateWhenOnElementIsUsedWithIterateOnAndAddToElementWhen() {
return !evaluateWhenOnElement || (iterateOn != null && addToElementWhen != null);
}

public Entry(final String key,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (non-blocking): Massive constructor duplication

  • Problem: The new 13-parameter constructor duplicates all the validation logic from the existing 11-parameter constructor. There are now four constructors in Entry, three of which contain nearly identical validation blocks (null checks, mutualexclusion checks, etc.).
  • Impact: Any future validation change must be replicated across all constructors. This is a maintenance burden.
  • Fix: Have the shorter constructors delegate to the longest one with default values for the new parameters:
public Entry(String key, String metadataKey, Object value, String format,
                 String valueExpression, boolean overwriteIfKeyExists, boolean appendIfKeyExists,
                 String addWhen, String iterateOn, boolean flattenKey, String addToElementWhen) {
        this(key, metadataKey, value, format, valueExpression, overwriteIfKeyExists,
             appendIfKeyExists, addWhen, iterateOn, flattenKey, true, false, addToElementWhen);
    }

This eliminates all duplicated validation.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the duplication in the constructor as suggested.

final String metadataKey,
final Object value,
Expand All @@ -239,6 +279,8 @@ public Entry(final String key,
final String addWhen,
final String iterateOn,
final boolean flattenKey,
final boolean disableRootKeys,
final boolean evaluateWhenOnElement,
final String addToElementWhen)
{
if (key != null && metadataKey != null) {
Expand All @@ -253,6 +295,12 @@ public Entry(final String key,
if (iterateOn == null && addToElementWhen != null) {
throw new InvalidPluginConfigurationException("add_to_element_when only applies when iterate_on is configured.");
}
if (!disableRootKeys && iterateOn == null) {
throw new InvalidPluginConfigurationException("disable_root_keys=false only applies when iterate_on is configured.");
}
if (evaluateWhenOnElement && (iterateOn == null || addToElementWhen == null)) {
throw new InvalidPluginConfigurationException("evaluate_when_on_element only applies when both iterate_on and add_to_element_when are configured.");
}

this.key = key;
this.metadataKey = metadataKey;
Expand All @@ -264,6 +312,8 @@ public Entry(final String key,
this.addWhen = addWhen;
this.iterateOn = iterateOn;
this.flattenKey = flattenKey;
this.disableRootKeys = disableRootKeys;
this.evaluateWhenOnElement = evaluateWhenOnElement;
this.addToElementWhen = addToElementWhen;
}

Expand All @@ -276,33 +326,26 @@ public Entry(final String key,
final boolean appendIfKeyExists,
final String addWhen,
final String iterateOn,
final boolean flattenKey,
final String addToElementWhen)
{
if (key != null && metadataKey != null) {
throw new IllegalArgumentException("Only one of the two - key and metadatakey - should be specified");
}
if (key == null && metadataKey == null) {
throw new IllegalArgumentException("At least one of the two - key and metadatakey - must be specified");
}
if (metadataKey != null && iterateOn != null) {
throw new IllegalArgumentException("iterate_on cannot be applied to metadata");
}

if (iterateOn == null && addToElementWhen != null) {
throw new InvalidPluginConfigurationException("add_to_element_when only applies when iterate_on is configured.");
}
this(key, metadataKey, value, format, valueExpression, overwriteIfKeyExists, appendIfKeyExists, addWhen,
iterateOn, flattenKey, true, false, addToElementWhen);
}

this.key = key;
this.metadataKey = metadataKey;
this.value = value;
this.format = format;
this.valueExpression = valueExpression;
this.overwriteIfKeyExists = overwriteIfKeyExists;
this.appendIfKeyExists = appendIfKeyExists;
this.addWhen = addWhen;
this.iterateOn = iterateOn;
this.flattenKey = true;
this.addToElementWhen = addToElementWhen;
public Entry(final String key,
final String metadataKey,
final Object value,
final String format,
final String valueExpression,
final boolean overwriteIfKeyExists,
final boolean appendIfKeyExists,
final String addWhen,
final String iterateOn,
final String addToElementWhen)
{
this(key, metadataKey, value, format, valueExpression, overwriteIfKeyExists, appendIfKeyExists, addWhen,
iterateOn, true, true, false, addToElementWhen);
}

public Entry() {
Expand Down
Loading
Loading