diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java index af12d18102..0ab0a504f7 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java @@ -17,7 +17,6 @@ package org.apache.nifi.processors.elasticsearch; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import org.apache.nifi.annotation.lifecycle.OnScheduled; @@ -37,12 +36,17 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.util.StandardValidators; +import java.io.IOException; +import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; @@ -67,6 +71,16 @@ public abstract class AbstractPutElasticsearch extends AbstractProcessor impleme .required(true) .build(); + static final PropertyDescriptor OUTPUT_ERROR_RESPONSES = new PropertyDescriptor.Builder() + .name("put-es-output-error-responses") + .displayName("Output Error Responses") + .description("If this is enabled, response messages from Elasticsearch marked as \"error\" will be output to the \"error_responses\" relationship." + + "This does not impact the output of flowfiles to the \"success\" or \"errors\" relationships") + .allowableValues("true", "false") + .defaultValue("false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("All flowfiles that succeed in being transferred into Elasticsearch go here. 
" + @@ -74,6 +88,12 @@ public abstract class AbstractPutElasticsearch extends AbstractProcessor impleme "The Elasticsearch response will need to be examined to determine whether any Document(s)/Record(s) resulted in errors.") .build(); + static final Relationship REL_ERROR_RESPONSES = new Relationship.Builder() + .name("error_responses") + .description("Elasticsearch _bulk API responses marked as \"error\" go here " + + "(and optionally \"not_found\" when \"Treat \"Not Found\" as Error\" is \"true\").") + .build(); + static final List ALLOWED_INDEX_OPERATIONS = Collections.unmodifiableList(Arrays.asList( IndexOperationRequest.Operation.Create.getValue().toLowerCase(), IndexOperationRequest.Operation.Delete.getValue().toLowerCase(), @@ -82,12 +102,22 @@ public abstract class AbstractPutElasticsearch extends AbstractProcessor impleme IndexOperationRequest.Operation.Upsert.getValue().toLowerCase() )); + private final AtomicReference> relationships = new AtomicReference<>(getBaseRelationships()); + boolean logErrors; + boolean outputErrorResponses; boolean notFoundIsSuccessful; ObjectMapper errorMapper; final AtomicReference clientService = new AtomicReference<>(null); + abstract Set getBaseRelationships(); + + @Override + public Set getRelationships() { + return relationships.get(); + } + @Override protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { return new PropertyDescriptor.Builder() @@ -99,6 +129,17 @@ public abstract class AbstractPutElasticsearch extends AbstractProcessor impleme .build(); } + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + if (OUTPUT_ERROR_RESPONSES.equals(descriptor)) { + final Set newRelationships = new HashSet<>(getBaseRelationships()); + if (Boolean.parseBoolean(newValue)) { + newRelationships.add(REL_ERROR_RESPONSES); + } + relationships.set(newRelationships); + } + } + @Override public boolean isIndexNotExistSuccessful() { // index can be created during _bulk index/create operation @@ -110,8 +151,9 @@ public abstract class AbstractPutElasticsearch extends AbstractProcessor impleme clientService.set(context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class)); this.logErrors = context.getProperty(LOG_ERROR_RESPONSES).asBoolean(); + this.outputErrorResponses = context.getProperty(OUTPUT_ERROR_RESPONSES).asBoolean(); - if (errorMapper == null && (logErrors || getLogger().isDebugEnabled())) { + if (errorMapper == null && (outputErrorResponses || logErrors || getLogger().isDebugEnabled())) { errorMapper = new ObjectMapper(); errorMapper.enable(SerializationFeature.INDENT_OUTPUT); } @@ -158,15 +200,35 @@ public abstract class AbstractPutElasticsearch extends AbstractProcessor impleme } } - void logElasticsearchDocumentErrors(final IndexOperationResponse response) throws JsonProcessingException { - if (logErrors || getLogger().isDebugEnabled()) { - final List> errors = response.getItems(); - final String output = String.format("An error was encountered while processing bulk operations. 
Server response below:%n%n%s", errorMapper.writeValueAsString(errors)); + void handleElasticsearchDocumentErrors(final Map> errors, final ProcessSession session, final FlowFile parent) throws IOException { + if (!errors.isEmpty() && (outputErrorResponses || logErrors || getLogger().isDebugEnabled())) { + if (logErrors || getLogger().isDebugEnabled()) { + final String output = String.format( + "An error was encountered while processing bulk operations. Server response below:%n%n%s", + errorMapper.writeValueAsString(errors.values()) + ); - if (logErrors) { - getLogger().error(output); - } else { - getLogger().debug(output); + if (logErrors) { + getLogger().error(output); + } else { + getLogger().debug(output); + } + } + + if (outputErrorResponses) { + FlowFile errorResponsesFF = null; + try { + errorResponsesFF = session.create(parent); + try (final OutputStream errorsOutputStream = session.write(errorResponsesFF)) { + errorMapper.writeValue(errorsOutputStream, errors.values()); + } + errorResponsesFF = session.putAttribute(errorResponsesFF, "elasticsearch.put.error.count", String.valueOf(errors.size())); + session.transfer(errorResponsesFF, REL_ERROR_RESPONSES); + } catch (final IOException ex) { + getLogger().error("Unable to write error responses", ex); + session.remove(errorResponsesFF); + throw ex; + } } } } @@ -179,21 +241,29 @@ public abstract class AbstractPutElasticsearch extends AbstractProcessor impleme return inner -> inner.containsKey("result") && "not_found".equals(inner.get("result")); } - @SafeVarargs - final List findElasticsearchResponseIndices(final IndexOperationResponse response, final Predicate>... responseItemFilter) { - final List indices = new ArrayList<>(response.getItems() == null ? 0 : response.getItems().size()); - if (response.getItems() != null) { + final Map> findElasticsearchResponseErrors(final IndexOperationResponse response) { + final Map> errors = new LinkedHashMap<>(response.getItems() == null ? 
0 : response.getItems().size(), 1); + + final List>> errorItemFilters = new ArrayList<>(2); + if (response.hasErrors()) { + errorItemFilters.add(isElasticsearchError()); + } + if (!notFoundIsSuccessful) { + errorItemFilters.add(isElasticsearchNotFound()); + } + + if (response.getItems() != null && !errorItemFilters.isEmpty()) { for (int index = 0; index < response.getItems().size(); index++) { final Map current = response.getItems().get(index); if (!current.isEmpty()) { final String key = current.keySet().stream().findFirst().orElse(null); @SuppressWarnings("unchecked") final Map inner = (Map) current.get(key); - if (inner != null && Arrays.stream(responseItemFilter).anyMatch(p -> p.test(inner))) { - indices.add(index); + if (inner != null && errorItemFilters.stream().anyMatch(p -> p.test(inner))) { + errors.put(index, inner); } } } } - return indices; + return errors; } } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java index aac7e00d1f..f17f015813 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java @@ -51,14 +51,15 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.function.Predicate; import java.util.stream.Collectors; @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "elasticsearch8", "put", "index"}) @CapabilityDescription("An Elasticsearch put processor that uses the official Elastic REST client libraries.") @WritesAttributes({ - @WritesAttribute(attribute = "elasticsearch.put.error", description = "The error message provided by Elasticsearch if there is an error indexing the document.") + @WritesAttribute(attribute = "elasticsearch.put.error", + description = "The error message if there is an issue parsing the FlowFile, sending the parsed document to Elasticsearch or parsing the Elasticsearch response"), + @WritesAttribute(attribute = "elasticsearch.bulk.error", description = "The _bulk response if there was an error during processing the document within Elasticsearch.") }) @DynamicProperty( name = "The name of a URL query parameter to add", @@ -101,7 +102,8 @@ public class PutElasticsearchJson extends AbstractPutElasticsearch { .name("put-es-json-error-documents") .displayName("Output Error Documents") .description("If this configuration property is true, the response from Elasticsearch will be examined for failed documents " + - "and the FlowFile(s) associated with the failed document(s) will be sent to the \"" + REL_FAILED_DOCUMENTS.getName() + "\" relationship.") + "and the FlowFile(s) associated with the failed document(s) will be sent to the \"" + REL_FAILED_DOCUMENTS.getName() + "\" relationship " + + "with \"elasticsearch.bulk.error\" attributes.") .allowableValues("true", "false") .defaultValue("false") .expressionLanguageSupported(ExpressionLanguageScope.NONE) @@ -110,9 +112,11 @@ public class PutElasticsearchJson extends 
AbstractPutElasticsearch { static final PropertyDescriptor NOT_FOUND_IS_SUCCESSFUL = new PropertyDescriptor.Builder() .name("put-es-json-not_found-is-error") - .displayName("Treat \"Not Found\" as Error") + .displayName("Treat \"Not Found\" as Success") .description("If true, \"not_found\" Elasticsearch Document associated FlowFiles will be routed to the \"" + REL_SUCCESS.getName() + - "\" relationship, otherwise to the \"" + REL_FAILED_DOCUMENTS.getName() + "\" relationship.") + "\" relationship, otherwise to the \"" + REL_FAILED_DOCUMENTS.getName() + "\" relationship. " + + "If " + OUTPUT_ERROR_RESPONSES.getDisplayName() + " is \"true\" then \"not_found\" responses from Elasticsearch " + + "will be sent to the " + REL_ERROR_RESPONSES.getName() + " relationship") .addValidator(StandardValidators.BOOLEAN_VALIDATOR) .allowableValues("true", "false") .defaultValue("true") @@ -121,19 +125,19 @@ public class PutElasticsearchJson extends AbstractPutElasticsearch { .build(); static final List DESCRIPTORS = Collections.unmodifiableList(Arrays.asList( - ID_ATTRIBUTE, INDEX_OP, INDEX, TYPE, BATCH_SIZE, CHARSET, CLIENT_SERVICE, LOG_ERROR_RESPONSES, - OUTPUT_ERROR_DOCUMENTS, NOT_FOUND_IS_SUCCESSFUL + ID_ATTRIBUTE, INDEX_OP, INDEX, TYPE, BATCH_SIZE, CHARSET, CLIENT_SERVICE, + LOG_ERROR_RESPONSES, OUTPUT_ERROR_RESPONSES, OUTPUT_ERROR_DOCUMENTS, NOT_FOUND_IS_SUCCESSFUL )); - static final Set RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( - REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_DOCUMENTS + static final Set BASE_RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_DOCUMENTS ))); private boolean outputErrors; - private final ObjectMapper inputMapper = new ObjectMapper(); + private final ObjectMapper objectMapper = new ObjectMapper(); @Override - public Set getRelationships() { - return RELATIONSHIPS; + Set getBaseRelationships() { + return BASE_RELATIONSHIPS; } @Override @@ -141,6 +145,7 @@ public class PutElasticsearchJson extends AbstractPutElasticsearch { return DESCRIPTORS; } + @Override @OnScheduled public void onScheduled(final ProcessContext context) { super.onScheduled(context); @@ -174,7 +179,7 @@ public class PutElasticsearchJson extends AbstractPutElasticsearch { try (final InputStream inStream = session.read(input)) { final byte[] result = IOUtils.toByteArray(inStream); @SuppressWarnings("unchecked") - final Map contentMap = inputMapper.readValue(new String(result, charset), Map.class); + final Map contentMap = objectMapper.readValue(new String(result, charset), Map.class); final IndexOperationRequest.Operation o = IndexOperationRequest.Operation.forValue(indexOp); operations.add(new IndexOperationRequest(index, type, id, contentMap, o)); @@ -195,7 +200,7 @@ public class PutElasticsearchJson extends AbstractPutElasticsearch { if (!originals.isEmpty()) { try { - final List errorDocuments = indexDocuments(operations, originals, context); + final List errorDocuments = indexDocuments(operations, originals, context, session); session.transfer(errorDocuments, REL_FAILED_DOCUMENTS); errorDocuments.forEach(e -> session.getProvenanceReporter().send( @@ -239,26 +244,28 @@ public class PutElasticsearchJson extends AbstractPutElasticsearch { } } - @SuppressWarnings("unchecked") - private List indexDocuments(final List operations, final List originals, final ProcessContext context) throws JsonProcessingException { + private List indexDocuments(final List operations, final List originals, final 
ProcessContext context, final ProcessSession session) throws IOException { final IndexOperationResponse response = clientService.get().bulk(operations, getUrlQueryParameters(context, originals.get(0))); - final List errorDocuments = new ArrayList<>(response.getItems() == null ? 0 : response.getItems().size()); - List>> errorItemFilters = new ArrayList<>(2); - if (response.hasErrors()) { - logElasticsearchDocumentErrors(response); - - if (outputErrors) { - errorItemFilters.add(isElasticsearchError()); - } + final Map> errors = findElasticsearchResponseErrors(response); + final List errorDocuments = outputErrors ? new ArrayList<>(errors.size()) : Collections.emptyList(); + if (outputErrors) { + errors.forEach((index, error) -> { + String errorMessage; + try { + errorMessage = objectMapper.writeValueAsString(error); + } catch (JsonProcessingException e) { + errorMessage = String.format( + "{\"error\": {\"type\": \"elasticsearch_response_parse_error\", \"reason\": \"%s\"}}", + e.getMessage().replace("\"", "\\\"") + ); + } + errorDocuments.add(session.putAttribute(originals.get(index), "elasticsearch.bulk.error", errorMessage)); + }); } - if (!notFoundIsSuccessful) { - errorItemFilters.add(isElasticsearchNotFound()); - } - if (!errorItemFilters.isEmpty()) { - findElasticsearchResponseIndices(response, errorItemFilters.toArray(new Predicate[0])) - .forEach(index -> errorDocuments.add(originals.get((Integer) index))); + if (!errors.isEmpty()) { + handleElasticsearchDocumentErrors(errors, session, null); } return errorDocuments; diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java index d70a615915..13fb557756 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java @@ -76,13 +76,13 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Predicate; @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "elasticsearch8", "put", "index", "record"}) @CapabilityDescription("A record-aware Elasticsearch put processor that uses the official Elastic REST client libraries.") @WritesAttributes({ - @WritesAttribute(attribute = "elasticsearch.put.error", description = "The error message provided by Elasticsearch if there is an error indexing the documents."), + @WritesAttribute(attribute = "elasticsearch.put.error", + description = "The error message if there is an issue parsing the FlowFile records, sending the parsed documents to Elasticsearch or parsing the Elasticsearch response."), @WritesAttribute(attribute = "elasticsearch.put.error.count", description = "The number of records that generated errors in the Elasticsearch _bulk API."), @WritesAttribute(attribute = "elasticsearch.put.success.count", description = "The number of records that were successfully processed by the Elasticsearch _bulk API.") }) @@ -207,7 +207,7 @@ public class 
PutElasticsearchRecord extends AbstractPutElasticsearch { .displayName("Result Record Writer") .description("If this configuration property is set, the response from Elasticsearch will be examined for failed records " + "and the failed records will be written to a record set with this record writer service and sent to the \"" + - REL_FAILED_RECORDS.getName() + "\" relationship. Successful records will be written to a record set" + + REL_FAILED_RECORDS.getName() + "\" relationship. Successful records will be written to a record set " + "with this record writer service and sent to the \"" + REL_SUCCESSFUL_RECORDS.getName() + "\" relationship.") .identifiesControllerService(RecordSetWriterFactory.class) .addValidator(Validator.VALID) @@ -216,9 +216,11 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch { static final PropertyDescriptor NOT_FOUND_IS_SUCCESSFUL = new PropertyDescriptor.Builder() .name("put-es-record-not_found-is-error") - .displayName("Treat \"Not Found\" as Error") + .displayName("Treat \"Not Found\" as Success") .description("If true, \"not_found\" Elasticsearch Document associated Records will be routed to the \"" + - REL_SUCCESSFUL_RECORDS.getName() + "\" relationship, otherwise to the \"" + REL_FAILED_RECORDS.getName() + "\" relationship.") + REL_SUCCESSFUL_RECORDS.getName() + "\" relationship, otherwise to the \"" + REL_FAILED_RECORDS.getName() + "\" relationship. " + + "If " + OUTPUT_ERROR_RESPONSES.getDisplayName() + " is \"true\" then \"not_found\" responses from Elasticsearch " + + "will be sent to the " + REL_ERROR_RESPONSES.getName() + " relationship") .addValidator(StandardValidators.BOOLEAN_VALIDATOR) .allowableValues("true", "false") .defaultValue("true") @@ -266,10 +268,11 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch { static final List DESCRIPTORS = Collections.unmodifiableList(Arrays.asList( INDEX_OP, INDEX, TYPE, AT_TIMESTAMP, CLIENT_SERVICE, RECORD_READER, BATCH_SIZE, ID_RECORD_PATH, RETAIN_ID_FIELD, INDEX_OP_RECORD_PATH, INDEX_RECORD_PATH, TYPE_RECORD_PATH, AT_TIMESTAMP_RECORD_PATH, RETAIN_AT_TIMESTAMP_FIELD, - DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT, LOG_ERROR_RESPONSES, RESULT_RECORD_WRITER, NOT_FOUND_IS_SUCCESSFUL + DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT, LOG_ERROR_RESPONSES, OUTPUT_ERROR_RESPONSES, RESULT_RECORD_WRITER, + NOT_FOUND_IS_SUCCESSFUL )); - static final Set RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( - REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_RECORDS, REL_SUCCESSFUL_RECORDS + static final Set BASE_RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_RECORDS, REL_SUCCESSFUL_RECORDS ))); private RecordPathCache recordPathCache; @@ -281,8 +284,8 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch { private volatile String timestampFormat; @Override - public Set getRelationships() { - return RELATIONSHIPS; + Set getBaseRelationships() { + return BASE_RELATIONSHIPS; } @Override @@ -290,6 +293,7 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch { return DESCRIPTORS; } + @Override @OnScheduled public void onScheduled(final ProcessContext context) { super.onScheduled(context); @@ -460,19 +464,12 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch { private ResponseDetails indexDocuments(final BulkOperation bundle, final ProcessContext context, final ProcessSession session, final FlowFile input) throws IOException, 
SchemaNotFoundException { final IndexOperationResponse response = clientService.get().bulk(bundle.getOperationList(), getUrlQueryParameters(context, input)); - List>> errorItemFilters = new ArrayList<>(2); - if (response.hasErrors()) { - logElasticsearchDocumentErrors(response); - errorItemFilters.add(isElasticsearchError()); + final Map> errors = findElasticsearchResponseErrors(response); + if (!errors.isEmpty()) { + handleElasticsearchDocumentErrors(errors, session, input); } - if (writerFactory != null && !notFoundIsSuccessful) { - errorItemFilters.add(isElasticsearchNotFound()); - } - - @SuppressWarnings("unchecked") - final List errorIndices = findElasticsearchResponseIndices(response, errorItemFilters.toArray(new Predicate[0])); - final int numErrors = errorIndices.size(); + final int numErrors = errors.size(); final int numSuccessful = response.getItems() == null ? 0 : response.getItems().size() - numErrors; FlowFile errorFF = null; FlowFile successFF = null; @@ -490,7 +487,7 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch { errorWriter.beginRecordSet(); successWriter.beginRecordSet(); for (int o = 0; o < bundle.getOriginalRecords().size(); o++) { - if (errorIndices.contains(o)) { + if (errors.containsKey(o)) { errorWriter.write(bundle.getOriginalRecords().get(o)); } else { successWriter.write(bundle.getOriginalRecords().get(o)); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearchTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearchTest.groovy new file mode 100644 index 0000000000..b287d42298 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearchTest.groovy @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License") you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.elasticsearch + +import org.apache.nifi.util.TestRunner +import org.apache.nifi.util.TestRunners +import org.junit.jupiter.api.Test + +import static org.hamcrest.CoreMatchers.hasItem +import static org.hamcrest.CoreMatchers.not +import static org.hamcrest.MatcherAssert.assertThat + +abstract class AbstractPutElasticsearchTest
<P extends AbstractPutElasticsearch>
{ + abstract P getProcessor() + + @Test + void testOutputErrorResponsesRelationship() { + final TestRunner runner = createRunner() + + assertThat(runner.getProcessor().getRelationships(), not(hasItem(AbstractPutElasticsearch.REL_ERROR_RESPONSES))) + + runner.setProperty(AbstractPutElasticsearch.OUTPUT_ERROR_RESPONSES, "true") + assertThat(runner.getProcessor().getRelationships(), hasItem(AbstractPutElasticsearch.REL_ERROR_RESPONSES)) + + runner.setProperty(AbstractPutElasticsearch.OUTPUT_ERROR_RESPONSES, "false") + assertThat(runner.getProcessor().getRelationships(), not(hasItem(AbstractPutElasticsearch.REL_ERROR_RESPONSES))) + } + + TestRunner createRunner() { + final P processor = getProcessor() + final TestRunner runner = TestRunners.newTestRunner(processor) + + return runner + } +} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.groovy index 21068df443..e3c371eea8 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.groovy +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.groovy @@ -22,20 +22,18 @@ import org.apache.nifi.elasticsearch.IndexOperationResponse import org.apache.nifi.processors.elasticsearch.mock.MockBulkLoadClientService import org.apache.nifi.provenance.ProvenanceEventType import org.apache.nifi.util.TestRunner -import org.apache.nifi.util.TestRunners import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import static groovy.json.JsonOutput.prettyPrint import static groovy.json.JsonOutput.toJson - import static org.hamcrest.CoreMatchers.containsString import static org.hamcrest.MatcherAssert.assertThat import static org.junit.jupiter.api.Assertions.assertEquals import static org.junit.jupiter.api.Assertions.assertThrows import static org.junit.jupiter.api.Assertions.assertTrue -class PutElasticsearchJsonTest { +class PutElasticsearchJsonTest extends AbstractPutElasticsearchTest { MockBulkLoadClientService clientService TestRunner runner @@ -43,10 +41,15 @@ class PutElasticsearchJsonTest { [ msg: "Hello, world", from: "john.smith" ] )) + @Override + PutElasticsearchJson getProcessor() { + return new PutElasticsearchJson() + } + @BeforeEach void setup() { clientService = new MockBulkLoadClientService() - runner = TestRunners.newTestRunner(PutElasticsearchJson.class) + runner = createRunner() clientService.response = new IndexOperationResponse(1500) @@ -60,6 +63,7 @@ class PutElasticsearchJsonTest { runner.setProperty(PutElasticsearchJson.LOG_ERROR_RESPONSES, "false") runner.setProperty(PutElasticsearchJson.CLIENT_SERVICE, "clientService") runner.setProperty(PutElasticsearchJson.NOT_FOUND_IS_SUCCESSFUL, "true") + runner.setProperty(PutElasticsearchJson.OUTPUT_ERROR_RESPONSES, "false") runner.enableControllerService(clientService) runner.assertValid() @@ -98,6 +102,7 @@ class PutElasticsearchJsonTest { runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, retry) runner.assertTransferCount(PutElasticsearchJson.REL_SUCCESS, success) runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 0) + 
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0) assertEquals(success, runner.getProvenanceEvents().stream().filter({ @@ -203,6 +208,7 @@ class PutElasticsearchJsonTest { runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 1) runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0) runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 0) + runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0) } @Test @@ -212,6 +218,7 @@ class PutElasticsearchJsonTest { runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 0) runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0) runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 0) + runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0) } @Test @@ -241,7 +248,13 @@ class PutElasticsearchJsonTest { runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0) runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 0) runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 1) - assertThat(runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[0].getContent(), containsString("20abcd")) + runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0) + + def failedDoc = runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[0]; + assertThat(failedDoc.getContent(), containsString("20abcd")) + failedDoc.assertAttributeExists("elasticsearch.bulk.error") + failedDoc.assertAttributeNotExists("elasticsearch.put.error") + assertThat(failedDoc.getAttribute("elasticsearch.bulk.error"), containsString("mapper_parsing_exception")) assertEquals(1, runner.getProvenanceEvents().stream().filter({ e -> ProvenanceEventType.SEND == e.getEventType() && "Elasticsearch _bulk operation error" == e.getDetails() @@ -253,6 +266,7 @@ class PutElasticsearchJsonTest { runner.clearProvenanceEvents() runner.setProperty(PutElasticsearchJson.NOT_FOUND_IS_SUCCESSFUL, "false") + runner.setProperty(PutElasticsearchJson.OUTPUT_ERROR_RESPONSES, "true") for (final def val : values) { runner.enqueue(prettyPrint(toJson(val))) @@ -264,8 +278,24 @@ class PutElasticsearchJsonTest { runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0) runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 0) runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 2) - assertThat(runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[0].getContent(), containsString("not_found")) - assertThat(runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[1].getContent(), containsString("20abcd")) + runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 1) + + failedDoc = runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[0]; + assertThat(failedDoc.getContent(), containsString("not_found")) + failedDoc.assertAttributeExists("elasticsearch.bulk.error") + failedDoc.assertAttributeNotExists("elasticsearch.put.error") + assertThat(failedDoc.getAttribute("elasticsearch.bulk.error"), containsString("not_found")) + + failedDoc = runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[1]; + assertThat(failedDoc.getContent(), containsString("20abcd")) + failedDoc.assertAttributeExists("elasticsearch.bulk.error") + failedDoc.assertAttributeNotExists("elasticsearch.put.error") + assertThat(failedDoc.getAttribute("elasticsearch.bulk.error"), containsString("number_format_exception")) + + final String 
errorResponses = runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_ERROR_RESPONSES)[0].getContent() + assertThat(errorResponses, containsString("not_found")) + assertThat(errorResponses, containsString("For input string: 20abc")) + assertEquals(2, runner.getProvenanceEvents().stream().filter({ e -> ProvenanceEventType.SEND == e.getEventType() && "Elasticsearch _bulk operation error" == e.getDetails() @@ -285,7 +315,8 @@ class PutElasticsearchJsonTest { [ id: "1", field1: 'value1', field2: '20' ], [ id: "2", field1: 'value1', field2: '20' ], [ id: "2", field1: 'value1', field2: '20' ], - [ id: "3", field1: 'value1', field2: '20abcd' ] + [ id: "3", field1: 'value1', field2: '20abcd' ], + [ id: "4", field1: 'value2', field2: '30' ] ] for (final def val : values) { @@ -294,10 +325,11 @@ class PutElasticsearchJsonTest { runner.assertValid() runner.run() - runner.assertTransferCount(PutElasticsearchJson.REL_SUCCESS, 4) + runner.assertTransferCount(PutElasticsearchJson.REL_SUCCESS, 5) runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0) runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 0) runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 0) + runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0) } @Test @@ -309,6 +341,7 @@ class PutElasticsearchJsonTest { runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 1) runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0) runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 0) + runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0) runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILURE)[0].assertAttributeEquals( "elasticsearch.put.error", diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy index 5e976a31d3..085844be3a 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy @@ -32,7 +32,6 @@ import org.apache.nifi.serialization.record.MockSchemaRegistry import org.apache.nifi.serialization.record.RecordFieldType import org.apache.nifi.util.StringUtils import org.apache.nifi.util.TestRunner -import org.apache.nifi.util.TestRunners import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test @@ -46,9 +45,14 @@ import java.time.format.DateTimeFormatter import static groovy.json.JsonOutput.prettyPrint import static groovy.json.JsonOutput.toJson -import static org.junit.jupiter.api.Assertions.* +import static org.hamcrest.CoreMatchers.containsString +import static org.hamcrest.MatcherAssert.assertThat +import static org.junit.jupiter.api.Assertions.assertEquals +import static org.junit.jupiter.api.Assertions.assertNotNull +import static org.junit.jupiter.api.Assertions.assertThrows +import static org.junit.jupiter.api.Assertions.assertTrue -class PutElasticsearchRecordTest { +class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest { private static final int DATE_YEAR = 2020 private static final int 
DATE_MONTH = 11 private static final int DATE_DAY = 27 @@ -81,12 +85,17 @@ class PutElasticsearchRecordTest { static final String flowFileContents = prettyPrint(toJson(flowFileContentMaps)) + @Override + PutElasticsearchRecord getProcessor() { + return new PutElasticsearchRecord() + } + @BeforeEach void setup() { clientService = new MockBulkLoadClientService() registry = new MockSchemaRegistry() reader = new JsonTreeReader() - runner = TestRunners.newTestRunner(PutElasticsearchRecord.class) + runner = createRunner() registry.addSchema("simple", AvroTypeUtil.createSchema(new Schema.Parser().parse(SCHEMA))) @@ -141,6 +150,7 @@ class PutElasticsearchRecordTest { runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, success) runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0) runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0) + runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0) if (success > 0) { runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_SUCCESS).forEach({ ff -> @@ -319,6 +329,7 @@ class PutElasticsearchRecordTest { runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0) runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0) runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0) + runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0) runner.clearTransferState() @@ -374,6 +385,7 @@ class PutElasticsearchRecordTest { runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0) runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0) runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0) + runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0) runner.clearTransferState() @@ -416,6 +428,7 @@ class PutElasticsearchRecordTest { runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0) runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0) runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0) + runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0) runner.clearTransferState() @@ -455,6 +468,7 @@ class PutElasticsearchRecordTest { runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0) runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0) runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0) + runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0) runner.clearTransferState() @@ -477,6 +491,7 @@ class PutElasticsearchRecordTest { runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0) runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0) runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0) + runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0) runner.clearTransferState() @@ -502,6 +517,7 @@ class PutElasticsearchRecordTest { runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0) runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0) runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0) + runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0) runner.clearTransferState() @@ -528,6 +544,7 @@ class PutElasticsearchRecordTest { runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0) runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0) 
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0) + runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0) } @Test @@ -604,6 +621,7 @@ class PutElasticsearchRecordTest { runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0) runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0) runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0) + runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0) runner.clearTransferState() @@ -658,6 +676,7 @@ class PutElasticsearchRecordTest { runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0) runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0) runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0) + runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0) } @Test @@ -681,6 +700,7 @@ class PutElasticsearchRecordTest { runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0) runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0) runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0) + runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0) } @Test @@ -691,6 +711,7 @@ class PutElasticsearchRecordTest { runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0) runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0) runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0) + runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0) } @Test @@ -734,6 +755,7 @@ class PutElasticsearchRecordTest { runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0) runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 1) runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 1) + runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0) runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[0] .assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "1") @@ -760,6 +782,7 @@ class PutElasticsearchRecordTest { runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0) runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 1) runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 1) + runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0) runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[0] .assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "2") @@ -776,7 +799,9 @@ class PutElasticsearchRecordTest { runner.clearTransferState() runner.clearProvenanceEvents() + // errors still counted/logged even if not outputting to the error relationship runner.removeProperty(PutElasticsearchRecord.RESULT_RECORD_WRITER) + runner.setProperty(PutElasticsearchRecord.OUTPUT_ERROR_RESPONSES, "true") runner.enqueue(prettyPrint(toJson(values)), [ 'schema.name': 'errorTest' ]) runner.assertValid() runner.run() @@ -786,10 +811,15 @@ class PutElasticsearchRecordTest { runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0) runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0) runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0) + runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 1) + + final String errorResponses = runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_ERROR_RESPONSES)[0].getContent() + assertThat(errorResponses, 
containsString("not_found")) + assertThat(errorResponses, containsString("For input string: 20abc")) assertEquals(1, runner.getProvenanceEvents().stream().filter({ - e -> ProvenanceEventType.SEND == e.getEventType() && e.getDetails() == "1 Elasticsearch _bulk operation batch(es) [1 error(s), 4 success(es)]" + e -> ProvenanceEventType.SEND == e.getEventType() && e.getDetails() == "1 Elasticsearch _bulk operation batch(es) [2 error(s), 3 success(es)]" }).count() ) } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java index c839a3f090..5c2b56ba94 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java @@ -51,7 +51,7 @@ import static org.apache.http.auth.AuthScope.ANY; public abstract class AbstractElasticsearchITBase { // default Elasticsearch version should (ideally) match that in the nifi-elasticsearch-bundle#pom.xml for the integration-tests profile protected static final DockerImageName IMAGE = DockerImageName - .parse(System.getProperty("elasticsearch.docker.image", "docker.elastic.co/elasticsearch/elasticsearch:8.6.1")); + .parse(System.getProperty("elasticsearch.docker.image", "docker.elastic.co/elasticsearch/elasticsearch:8.7.0")); protected static final String ELASTIC_USER_PASSWORD = System.getProperty("elasticsearch.elastic_user.password", RandomStringUtils.randomAlphanumeric(10, 20)); private static final int PORT = 9200; protected static final ElasticsearchContainer ELASTICSEARCH_CONTAINER = new ElasticsearchContainer(IMAGE) diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml index dc9946dddc..5364ec52a1 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml @@ -101,7 +101,7 @@ language governing permissions and limitations under the License. --> - 8.6.1 + 8.7.0 s3cret
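A minimal standalone sketch of the error-mapping behavior this patch introduces, not part of the patch itself: it shows how _bulk response items are collected into an index-keyed map and how the error bodies are serialized the way the new "error_responses" FlowFile content is written. The sample response items, the inline error/not_found checks, and the class name BulkErrorMappingSketch are assumptions made for the illustration; the processors themselves use the base-class predicates (isElasticsearchError(), isElasticsearchNotFound()) and NiFi's ProcessSession rather than System.out.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BulkErrorMappingSketch {
    public static void main(final String[] args) throws Exception {
        // Hand-built _bulk response items: one success, one mapping error, one "not_found" delete.
        final List<Map<String, Object>> items = List.of(
                Map.of("index", Map.of("_id", "1", "result", "created")),
                Map.of("index", Map.of("_id", "2", "error",
                        Map.of("type", "mapper_parsing_exception", "reason", "failed to parse field"))),
                Map.of("delete", Map.of("_id", "3", "result", "not_found"))
        );

        final boolean notFoundIsSuccessful = false; // i.e. "Treat \"Not Found\" as Success" set to false

        // Keyed by item position so failed documents/records can be matched back to their originals,
        // mirroring findElasticsearchResponseErrors(...) in AbstractPutElasticsearch.
        final Map<Integer, Map<String, Object>> errors = new LinkedHashMap<>();
        for (int index = 0; index < items.size(); index++) {
            final Map<String, Object> current = items.get(index);
            final String key = current.keySet().stream().findFirst().orElse(null);
            @SuppressWarnings("unchecked")
            final Map<String, Object> inner = (Map<String, Object>) current.get(key);
            final boolean isError = inner != null && inner.containsKey("error");
            final boolean isNotFound = inner != null && "not_found".equals(inner.get("result"));
            if (isError || (!notFoundIsSuccessful && isNotFound)) {
                errors.put(index, inner);
            }
        }

        // Roughly what the "error_responses" FlowFile carries: a pretty-printed JSON array of the
        // error bodies, with the elasticsearch.put.error.count attribute set to errors.size().
        final ObjectMapper errorMapper = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT);
        System.out.println("elasticsearch.put.error.count = " + errors.size());
        System.out.println(errorMapper.writeValueAsString(errors.values()));
    }
}

Keying the LinkedHashMap by the _bulk item index (in response order) is what lets PutElasticsearchJson attach the elasticsearch.bulk.error attribute to the correct original FlowFile and lets PutElasticsearchRecord split records between the successful and failed record sets via errors.containsKey(o).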