NIFI-14220 allow use of Expression Language for PutElasticsearchJson and PutElasticsearchRecord boolean fields

This commit is contained in:
Chris Sampson 2025-02-03 21:35:55 +00:00
parent b91fe67009
commit b1695e4500
No known key found for this signature in database
GPG Key ID: 546AEB0826587237
3 changed files with 25 additions and 6 deletions

View File

@ -110,14 +110,14 @@ public class PutElasticsearchJson extends AbstractPutElasticsearch {
.name("put-es-json-scripted-upsert")
.displayName("Scripted Upsert")
.description("Whether to add the scripted_upsert flag to the Upsert Operation. " +
"Forces Elasticsearch to execute the Script whether or not the document exists, defaults to false. " +
"If true, forces Elasticsearch to execute the Script whether or not the document exists, defaults to false. " +
"If the Upsert Document provided (from FlowFile content) will be empty, but sure to set the " +
CLIENT_SERVICE.getDisplayName() + " controller service's " + ElasticSearchClientService.SUPPRESS_NULLS.getDisplayName() +
" to " + ElasticSearchClientService.NEVER_SUPPRESS.getDisplayName() + " or no \"upsert\" doc will be, " +
"included in the request to Elasticsearch and the operation will not create a new document for the script " +
"to execute against, resulting in a \"not_found\" error")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.allowableValues("true", "false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.defaultValue("false")
.build();

View File

@ -166,7 +166,6 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
.displayName("Retain ID (Record Path)")
.description("Whether to retain the existing field used as the ID Record Path.")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.allowableValues("true", "false")
.defaultValue("false")
.required(false)
.dependsOn(ID_RECORD_PATH)
@ -242,7 +241,6 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
.displayName("Retain @timestamp (Record Path)")
.description("Whether to retain the existing field used as the @timestamp Record Path.")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.allowableValues("true", "false")
.defaultValue("false")
.required(false)
.dependsOn(AT_TIMESTAMP_RECORD_PATH)
@ -268,7 +266,6 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
"and the error related to the first record within the FlowFile added to the FlowFile as \"elasticsearch.bulk.error\". " +
"If \"" + NOT_FOUND_IS_SUCCESSFUL.getDisplayName() + "\" is \"false\" then records associated with \"not_found\" " +
"Elasticsearch document responses will also be send to the \"" + REL_ERRORS.getName() + "\" relationship.")
.allowableValues("true", "false")
.defaultValue("false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.required(false)

View File

@ -111,8 +111,12 @@ public class PutElasticsearchJsonTest extends AbstractPutElasticsearchTest {
}
void basicTest(final int failure, final int retry, final int successful, final Consumer<List<IndexOperationRequest>> consumer) {
basicTest(failure, retry, successful, consumer, Collections.emptyMap());
}
void basicTest(final int failure, final int retry, final int successful, final Consumer<List<IndexOperationRequest>> consumer, final Map<String, String> attr) {
clientService.setEvalConsumer(consumer);
basicTest(failure, retry, successful, Collections.emptyMap());
basicTest(failure, retry, successful, attr);
}
void basicTest(final int failure, final int retry, final int successful, final Map<String, String> attr) {
@ -289,6 +293,24 @@ public class PutElasticsearchJsonTest extends AbstractPutElasticsearchTest {
basicTest(0, 0, 1, consumer);
}
@Test
void simpleTestWithScriptedUpsertEL() {
runner.setProperty(PutElasticsearchJson.SCRIPT, script);
runner.setProperty(PutElasticsearchJson.DYNAMIC_TEMPLATES, dynamicTemplates);
runner.setProperty(PutElasticsearchJson.INDEX_OP, IndexOperationRequest.Operation.Upsert.getValue().toLowerCase());
runner.setProperty(PutElasticsearchJson.SCRIPTED_UPSERT, "${scripted}");
final Consumer<List<IndexOperationRequest>> consumer = (final List<IndexOperationRequest> items) -> {
final long scriptCount = items.stream().filter(item -> item.getScript().equals(expectedScript)).count();
final long trueScriptedUpsertCount = items.stream().filter(IndexOperationRequest::isScriptedUpsert).count();
final long dynamicTemplatesCount = items.stream().filter(item -> item.getDynamicTemplates().equals(expectedDynamicTemplate)).count();
assertEquals(1L, scriptCount);
assertEquals(1L, trueScriptedUpsertCount);
assertEquals(1L, dynamicTemplatesCount);
};
basicTest(0, 0, 1, consumer, Map.of("scripted", "true"));
}
@Test
void testNonJsonScript() {
runner.setProperty(PutElasticsearchJson.SCRIPT, "not-json");