Skip to content

Commit 0ba2de5

Browse files
committed
Add disable_root_keys and evaluate_when_on_element configs to add_entries processor
Signed-off-by: Manisha Yadav <yavmanis@amazon.com>
1 parent 5aee2c5 commit 0ba2de5

File tree

9 files changed

+539
-14
lines changed

9 files changed

+539
-14
lines changed

data-prepper-plugins/mutate-event-processors/README.md

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,55 @@ then when we run with the same input, the processor will parse the message into
5454
{"message": "value", "newMessage": "new value"}
5555
```
5656

57+
### Iterating over arrays with `iterate_on`
58+
59+
The `iterate_on` option allows adding entries to each element of an array. Combined with `disable_root_keys` and `evaluate_when_on_element`, you can reference root-level fields and apply per-element conditions.
60+
61+
```yaml
62+
pipeline:
63+
source:
64+
http:
65+
processor:
66+
- add_entries:
67+
entries:
68+
- key: "title"
69+
value_expression: "/alert_title"
70+
iterate_on: "vulns"
71+
disable_root_keys: false
72+
add_to_element_when: '/severity == "critical"'
73+
evaluate_when_on_element: true
74+
sink:
75+
- stdout:
76+
```
77+
78+
Given the following input:
79+
```json
80+
{
81+
"alert_title": "SQL Injection Detected",
82+
"vulns": [
83+
{"cve": "CVE-2024-001", "severity": "critical"},
84+
{"cve": "CVE-2024-002", "severity": "low"},
85+
{"cve": "CVE-2024-003", "severity": "critical"}
86+
]
87+
}
88+
```
89+
90+
The processor will produce:
91+
```json
92+
{
93+
"alert_title": "SQL Injection Detected",
94+
"vulns": [
95+
{"cve": "CVE-2024-001", "severity": "critical", "title": "SQL Injection Detected"},
96+
{"cve": "CVE-2024-002", "severity": "low"},
97+
{"cve": "CVE-2024-003", "severity": "critical", "title": "SQL Injection Detected"}
98+
]
99+
}
100+
```
101+
102+
In this example:
103+
- `disable_root_keys: false` allows `value_expression: "/alert_title"` to resolve from the root event
104+
- `evaluate_when_on_element: true` evaluates `add_to_element_when` against each element, so only elements with `severity == "critical"` receive the new key
105+
57106
### Configuration
58107
* `entries` - (required) - A list of entries to add to an event
59108
* `key` - (required) - The key of the new entry to be added. One of `key` or `metadata_key` is required.
@@ -62,6 +111,8 @@ then when we run with the same input, the processor will parse the message into
62111
* `format` - (optional) - A format string to use as value of the new entry to be added. For example, `${key1}-${ke2}` where `key1` and `key2` are existing keys in the event. Required if `value` is not specified.
63112
* `value_expression` - (optional) - An expression string to use as value of the new entry to be added. For example, `/key` where `key` is an existing key in the event of type Number/String/Boolean. Expressions can also contain functions returning Number/String/Integer. For example `length(/key)` would return the length of the `key` in the event and key must of String type. Please see [expressions syntax document](https://github.com/opensearch-project/data-prepper/blob/2.3/docs/expression_syntax.md) for complete list of supported functions. Required if `value` and `format are not specified.
64113
* `overwrite_if_key_exists` - (optional) - When set to `true`, if `key` already exists in the event, then the existing value will be overwritten. The default is `false`.
114+
* `disable_root_keys` - (optional) - When set to `false` and used with `iterate_on`, resolves `value_expression` and `format` against the root event instead of the individual array element. This allows referencing root-level fields during array iteration. Has no effect without `iterate_on`. Default is `true`.
115+
* `evaluate_when_on_element` - (optional) - When set to `true` and used with `iterate_on` and `add_to_element_when`, evaluates the `add_to_element_when` condition against each individual array element instead of the root event. This enables per-element conditional logic during array iteration. Default is `false`.
65116

66117
___
67118

data-prepper-plugins/mutate-event-processors/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ dependencies {
2323
implementation project(':data-prepper-plugins:common')
2424
implementation 'com.fasterxml.jackson.core:jackson-databind'
2525
testImplementation project(':data-prepper-test:test-event')
26+
testImplementation project(':data-prepper-test:plugin-test-framework')
2627
testImplementation testLibs.slf4j.simple
2728
testImplementation testLibs.spring.test
2829
testImplementation project(':data-prepper-test:test-common')

data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessor.java

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,15 @@ private static class EntryProperties {
5252
final boolean appendIfExists;
5353
final String addWhen;
5454
final String addToElementWhen;
55+
final boolean disableRootKeys;
56+
final boolean evaluateWhenOnElement;
5557
EntryProperties(AddEntryProcessorConfig.Entry entry, ExpressionEvaluator evaluator) {
5658
this.overwriteIfExists = entry.getOverwriteIfKeyExists();
5759
this.appendIfExists = entry.getAppendIfKeyExists();
5860
this.addWhen = entry.getAddWhen();
5961
this.addToElementWhen = entry.getAddToElementWhen();
62+
this.disableRootKeys = entry.getDisableRootKeys();
63+
this.evaluateWhenOnElement = entry.getEvaluateWhenOnElement();
6064
String valueExpr = entry.getValueExpression();
6165
if (valueExpr != null && !evaluator.isValidExpressionStatement(valueExpr)) {
6266
throw new InvalidPluginConfigurationException(
@@ -261,33 +265,47 @@ private void handleWithIterateOn(final AddEntryProcessorConfig.Entry entry,
261265
// Create builder once
262266
final JacksonEvent.Builder contextBuilder = JacksonEvent.builder()
263267
.withEventMetadata(recordEvent.getMetadata());
264-
265-
// Pre-check addToElementWhen condition if static
266-
final boolean shouldProcessAll = props.addToElementWhen == null ||
267-
expressionEvaluator.evaluateConditional(props.addToElementWhen, recordEvent);
268-
if (shouldProcessAll) {
269-
// Bulk process all items
268+
269+
if (props.addToElementWhen == null) {
270+
// No condition — process all items
270271
for (int i = 0; i < iterateOnList.size(); i++) {
271272
final Map<String, Object> item = iterateOnList.get(i);
272-
iterateOnList.set(i, processIterateOnItem(entry, contextBuilder, item, flattenKey, key, props));
273+
iterateOnList.set(i, processIterateOnItem(entry, contextBuilder, item, flattenKey, key, props, recordEvent));
273274
}
274-
} else {
275-
// Process items individually with condition check
275+
} else if (props.evaluateWhenOnElement) {
276+
// Evaluate condition against each element for selective addition
276277
for (int i = 0; i < iterateOnList.size(); i++) {
277278
final Map<String, Object> item = iterateOnList.get(i);
278-
if (expressionEvaluator.evaluateConditional(props.addToElementWhen, recordEvent)) {
279-
iterateOnList.set(i, processIterateOnItem(entry, contextBuilder, item, flattenKey, key, props));
279+
final Event elementContext = contextBuilder.withData(item).build();
280+
if (expressionEvaluator.evaluateConditional(props.addToElementWhen, elementContext)) {
281+
iterateOnList.set(i, processIterateOnItem(entry, elementContext, item, flattenKey, key, props, recordEvent));
282+
}
283+
}
284+
} else {
285+
// Evaluate condition once against root event
286+
if (expressionEvaluator.evaluateConditional(props.addToElementWhen, recordEvent)) {
287+
for (int i = 0; i < iterateOnList.size(); i++) {
288+
final Map<String, Object> item = iterateOnList.get(i);
289+
iterateOnList.set(i, processIterateOnItem(entry, contextBuilder, item, flattenKey, key, props, recordEvent));
280290
}
281291
}
282292
}
283293
recordEvent.put(iterateOn, iterateOnList);
284294
}
285295
}
286296

287-
private Map<String, Object> processIterateOnItem(AddEntryProcessorConfig.Entry entry, JacksonEvent.Builder contextBuilder,
288-
Map<String, Object> item, final boolean flattenKey, EventKey key, EntryProperties props) {
297+
private Map<String, Object> processIterateOnItem(AddEntryProcessorConfig.Entry entry, JacksonEvent.Builder contextBuilder,
298+
Map<String, Object> item, final boolean flattenKey, EventKey key, EntryProperties props,
299+
final Event recordEvent) {
289300
final Event context = contextBuilder.withData(item).build();
290-
final Object value = retrieveValue(entry, context);
301+
return processIterateOnItem(entry, context, item, flattenKey, key, props, recordEvent);
302+
}
303+
304+
private Map<String, Object> processIterateOnItem(AddEntryProcessorConfig.Entry entry, Event context,
305+
Map<String, Object> item, final boolean flattenKey, EventKey key, EntryProperties props,
306+
final Event recordEvent) {
307+
final Event valueContext = !props.disableRootKeys ? recordEvent : context;
308+
final Object value = retrieveValue(entry, valueContext);
291309
final String keyStr = key.getKey(); // Key and keyStr are guaranteed non-null by caller
292310
if (!item.containsKey(keyStr) || props.overwriteIfExists) {
293311
if (flattenKey) {

data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessorConfig.java

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,28 @@ public static class Entry {
166166
})
167167
private boolean flattenKey = true;
168168

169+
@JsonProperty("disable_root_keys")
170+
@JsonPropertyDescription(
171+
"When set to <code>false</code> and used with <code>iterate_on</code>, resolves <code>value_expression</code> and <code>format</code> " +
172+
"against the root event instead of the individual array element. This allows referencing root-level fields during array iteration. " +
173+
"Has no effect when not used with <code>iterate_on</code>. The default value is <code>true</code>.")
174+
@AlsoRequired(values = {
175+
@AlsoRequired.Required(name="iterate_on")
176+
})
177+
private boolean disableRootKeys = true;
178+
179+
@JsonProperty("evaluate_when_on_element")
180+
@JsonPropertyDescription(
181+
"When set to <code>true</code> and used with <code>iterate_on</code> and <code>add_to_element_when</code>, " +
182+
"evaluates the <code>add_to_element_when</code> condition against each individual array element instead of the root event. " +
183+
"This enables per-element conditional logic during array iteration. " +
184+
"The default value is <code>false</code>.")
185+
@AlsoRequired(values = {
186+
@AlsoRequired.Required(name="iterate_on"),
187+
@AlsoRequired.Required(name="add_to_element_when")
188+
})
189+
private boolean evaluateWhenOnElement = false;
190+
169191
@JsonProperty("add_when")
170192
@JsonPropertyDescription("A <a href=\"https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/\">conditional expression</a>, " +
171193
"such as <code>/some-key == \"test\"</code>, that will be evaluated to determine whether the processor will be run on the event.")
@@ -212,6 +234,14 @@ public boolean getFlattenKey(){
212234
return flattenKey;
213235
}
214236

237+
public boolean getDisableRootKeys() {
238+
return disableRootKeys;
239+
}
240+
241+
public boolean getEvaluateWhenOnElement() {
242+
return evaluateWhenOnElement;
243+
}
244+
215245
public String getAddWhen() { return addWhen; }
216246

217247
@AssertTrue(message = "Exactly one of value, format, or value_expression must be specified")
@@ -229,6 +259,64 @@ boolean flattenKeyFalseIsUsedWithIterateOn() {
229259
return (!flattenKey && iterateOn!=null) || flattenKey;
230260
}
231261

262+
@AssertTrue(message = "disable_root_keys=false only applies when iterate_on is configured.")
263+
boolean disableRootKeysFalseIsUsedWithIterateOn() {
264+
return disableRootKeys || iterateOn != null;
265+
}
266+
267+
@AssertTrue(message = "evaluate_when_on_element only applies when both iterate_on and add_to_element_when are configured.")
268+
boolean evaluateWhenOnElementIsUsedWithIterateOnAndAddToElementWhen() {
269+
return !evaluateWhenOnElement || (iterateOn != null && addToElementWhen != null);
270+
}
271+
272+
public Entry(final String key,
273+
final String metadataKey,
274+
final Object value,
275+
final String format,
276+
final String valueExpression,
277+
final boolean overwriteIfKeyExists,
278+
final boolean appendIfKeyExists,
279+
final String addWhen,
280+
final String iterateOn,
281+
final boolean flattenKey,
282+
final boolean disableRootKeys,
283+
final boolean evaluateWhenOnElement,
284+
final String addToElementWhen)
285+
{
286+
if (key != null && metadataKey != null) {
287+
throw new IllegalArgumentException("Only one of the two - key and metadatakey - should be specified");
288+
}
289+
if (key == null && metadataKey == null) {
290+
throw new IllegalArgumentException("At least one of the two - key and metadatakey - must be specified");
291+
}
292+
if (metadataKey != null && iterateOn != null) {
293+
throw new IllegalArgumentException("iterate_on cannot be applied to metadata");
294+
}
295+
if (iterateOn == null && addToElementWhen != null) {
296+
throw new InvalidPluginConfigurationException("add_to_element_when only applies when iterate_on is configured.");
297+
}
298+
if (!disableRootKeys && iterateOn == null) {
299+
throw new InvalidPluginConfigurationException("disable_root_keys=false only applies when iterate_on is configured.");
300+
}
301+
if (evaluateWhenOnElement && (iterateOn == null || addToElementWhen == null)) {
302+
throw new InvalidPluginConfigurationException("evaluate_when_on_element only applies when both iterate_on and add_to_element_when are configured.");
303+
}
304+
305+
this.key = key;
306+
this.metadataKey = metadataKey;
307+
this.value = value;
308+
this.format = format;
309+
this.valueExpression = valueExpression;
310+
this.overwriteIfKeyExists = overwriteIfKeyExists;
311+
this.appendIfKeyExists = appendIfKeyExists;
312+
this.addWhen = addWhen;
313+
this.iterateOn = iterateOn;
314+
this.flattenKey = flattenKey;
315+
this.disableRootKeys = disableRootKeys;
316+
this.evaluateWhenOnElement = evaluateWhenOnElement;
317+
this.addToElementWhen = addToElementWhen;
318+
}
319+
232320
public Entry(final String key,
233321
final String metadataKey,
234322
final Object value,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.plugins.processor.mutateevent;
11+
12+
import org.junit.jupiter.api.Test;
13+
import org.opensearch.dataprepper.model.event.Event;
14+
import org.opensearch.dataprepper.model.event.JacksonEvent;
15+
import org.opensearch.dataprepper.model.processor.Processor;
16+
import org.opensearch.dataprepper.model.record.Record;
17+
import org.opensearch.dataprepper.test.plugins.DataPrepperPluginTest;
18+
import org.opensearch.dataprepper.test.plugins.PluginConfigurationFile;
19+
import org.opensearch.dataprepper.test.plugins.junit.BaseDataPrepperPluginStandardTestSuite;
20+
21+
import java.util.*;
22+
23+
import static org.hamcrest.CoreMatchers.equalTo;
24+
import static org.hamcrest.CoreMatchers.is;
25+
import static org.hamcrest.MatcherAssert.assertThat;
26+
27+
@DataPrepperPluginTest(pluginName = "add_entries", pluginType = Processor.class)
28+
class AddEntryProcessorIT extends BaseDataPrepperPluginStandardTestSuite {
29+
30+
@Test
31+
void disable_root_keys_resolves_value_expression_from_root(
32+
@PluginConfigurationFile("iterate_on_with_disable_root_keys.yaml") final Processor<Record<Event>, Record<Event>> objectUnderTest) {
33+
final Map<String, Object> data = new HashMap<>();
34+
data.put("alert_title", "SQL Injection Detected");
35+
data.put("vulns", Arrays.asList(
36+
new HashMap<>(Map.of("cve", "CVE-2024-001")),
37+
new HashMap<>(Map.of("cve", "CVE-2024-002"))));
38+
39+
final Record<Event> record = buildRecordWithEvent(data);
40+
final List<Record<Event>> result = (List<Record<Event>>) objectUnderTest.execute(Collections.singletonList(record));
41+
42+
List<Map<String, Object>> vulns = result.get(0).getData().get("vulns", List.class);
43+
assertThat(vulns.size(), equalTo(2));
44+
assertThat(vulns.get(0).get("title"), equalTo("SQL Injection Detected"));
45+
assertThat(vulns.get(1).get("title"), equalTo("SQL Injection Detected"));
46+
}
47+
48+
@Test
49+
void evaluate_when_on_element_filters_per_element(
50+
@PluginConfigurationFile("iterate_on_with_evaluate_when_on_element.yaml") final Processor<Record<Event>, Record<Event>> objectUnderTest) {
51+
final Map<String, Object> data = new HashMap<>();
52+
data.put("vulns", Arrays.asList(
53+
new HashMap<>(Map.of("cve", "CVE-2024-001", "severity", "critical")),
54+
new HashMap<>(Map.of("cve", "CVE-2024-002", "severity", "low")),
55+
new HashMap<>(Map.of("cve", "CVE-2024-003", "severity", "critical"))));
56+
57+
final Record<Event> record = buildRecordWithEvent(data);
58+
final List<Record<Event>> result = (List<Record<Event>>) objectUnderTest.execute(Collections.singletonList(record));
59+
60+
List<Map<String, Object>> vulns = result.get(0).getData().get("vulns", List.class);
61+
assertThat(vulns.size(), equalTo(3));
62+
assertThat(vulns.get(0).get("flagged"), equalTo(true));
63+
assertThat(vulns.get(1).containsKey("flagged"), is(false));
64+
assertThat(vulns.get(2).get("flagged"), equalTo(true));
65+
}
66+
67+
@Test
68+
void both_flags_combined_resolves_root_value_with_per_element_condition(
69+
@PluginConfigurationFile("iterate_on_with_both_flags.yaml") final Processor<Record<Event>, Record<Event>> objectUnderTest) {
70+
final Map<String, Object> data = new HashMap<>();
71+
data.put("alert_title", "SQL Injection Detected");
72+
data.put("vulns", Arrays.asList(
73+
new HashMap<>(Map.of("cve", "CVE-2024-001", "severity", "critical")),
74+
new HashMap<>(Map.of("cve", "CVE-2024-002", "severity", "low")),
75+
new HashMap<>(Map.of("cve", "CVE-2024-003", "severity", "critical"))));
76+
77+
final Record<Event> record = buildRecordWithEvent(data);
78+
final List<Record<Event>> result = (List<Record<Event>>) objectUnderTest.execute(Collections.singletonList(record));
79+
80+
List<Map<String, Object>> vulns = result.get(0).getData().get("vulns", List.class);
81+
assertThat(vulns.size(), equalTo(3));
82+
assertThat(vulns.get(0).get("title"), equalTo("SQL Injection Detected"));
83+
assertThat(vulns.get(1).containsKey("title"), is(false));
84+
assertThat(vulns.get(2).get("title"), equalTo("SQL Injection Detected"));
85+
}
86+
87+
private static Record<Event> buildRecordWithEvent(final Map<String, Object> data) {
88+
return new Record<>(JacksonEvent.builder()
89+
.withData(data)
90+
.withEventType("event")
91+
.build());
92+
}
93+
}

0 commit comments

Comments
 (0)