NIFI-11111 add option to output Elasticsearch error responses as FlowFile to PutElasticsearchJson and PutElasticsearchRecord

NIFI-11111 clarify error_responses relationships in PutElasticsearchJson/Record processors
NIFI-11111 Refactor exception handling for error response flowfile transfer
NIFI-11111 Add elasticsearch.bulk.error attributes containing the Elasticsearch _bulk response for error documents in PutElasticsearchJson

This closes #6903

Signed-off-by: Mike Thomsen <mthomsen@apache.org>
This commit is contained in:
Chris Sampson 2023-01-29 19:32:27 +00:00 committed by Mike Thomsen
parent 38ece1d7d9
commit 05418d94f0
8 changed files with 272 additions and 85 deletions

View File

@ -17,7 +17,6 @@
package org.apache.nifi.processors.elasticsearch;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
@ -37,12 +36,17 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
@ -67,6 +71,16 @@ public abstract class AbstractPutElasticsearch extends AbstractProcessor impleme
.required(true)
.build();
static final PropertyDescriptor OUTPUT_ERROR_RESPONSES = new PropertyDescriptor.Builder()
.name("put-es-output-error-responses")
.displayName("Output Error Responses")
.description("If this is enabled, response messages from Elasticsearch marked as \"error\" will be output to the \"error_responses\" relationship." +
"This does not impact the output of flowfiles to the \"success\" or \"errors\" relationships")
.allowableValues("true", "false")
.defaultValue("false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All flowfiles that succeed in being transferred into Elasticsearch go here. " +
@ -74,6 +88,12 @@ public abstract class AbstractPutElasticsearch extends AbstractProcessor impleme
"The Elasticsearch response will need to be examined to determine whether any Document(s)/Record(s) resulted in errors.")
.build();
static final Relationship REL_ERROR_RESPONSES = new Relationship.Builder()
.name("error_responses")
.description("Elasticsearch _bulk API responses marked as \"error\" go here " +
"(and optionally \"not_found\" when \"Treat \"Not Found\" as Error\" is \"true\").")
.build();
static final List<String> ALLOWED_INDEX_OPERATIONS = Collections.unmodifiableList(Arrays.asList(
IndexOperationRequest.Operation.Create.getValue().toLowerCase(),
IndexOperationRequest.Operation.Delete.getValue().toLowerCase(),
@ -82,12 +102,22 @@ public abstract class AbstractPutElasticsearch extends AbstractProcessor impleme
IndexOperationRequest.Operation.Upsert.getValue().toLowerCase()
));
private final AtomicReference<Set<Relationship>> relationships = new AtomicReference<>(getBaseRelationships());
boolean logErrors;
boolean outputErrorResponses;
boolean notFoundIsSuccessful;
ObjectMapper errorMapper;
final AtomicReference<ElasticSearchClientService> clientService = new AtomicReference<>(null);
abstract Set<Relationship> getBaseRelationships();
@Override
public Set<Relationship> getRelationships() {
return relationships.get();
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
@ -99,6 +129,17 @@ public abstract class AbstractPutElasticsearch extends AbstractProcessor impleme
.build();
}
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
if (OUTPUT_ERROR_RESPONSES.equals(descriptor)) {
final Set<Relationship> newRelationships = new HashSet<>(getBaseRelationships());
if (Boolean.parseBoolean(newValue)) {
newRelationships.add(REL_ERROR_RESPONSES);
}
relationships.set(newRelationships);
}
}
@Override
public boolean isIndexNotExistSuccessful() {
// index can be created during _bulk index/create operation
@ -110,8 +151,9 @@ public abstract class AbstractPutElasticsearch extends AbstractProcessor impleme
clientService.set(context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class));
this.logErrors = context.getProperty(LOG_ERROR_RESPONSES).asBoolean();
this.outputErrorResponses = context.getProperty(OUTPUT_ERROR_RESPONSES).asBoolean();
if (errorMapper == null && (logErrors || getLogger().isDebugEnabled())) {
if (errorMapper == null && (outputErrorResponses || logErrors || getLogger().isDebugEnabled())) {
errorMapper = new ObjectMapper();
errorMapper.enable(SerializationFeature.INDENT_OUTPUT);
}
@ -158,15 +200,35 @@ public abstract class AbstractPutElasticsearch extends AbstractProcessor impleme
}
}
void logElasticsearchDocumentErrors(final IndexOperationResponse response) throws JsonProcessingException {
if (logErrors || getLogger().isDebugEnabled()) {
final List<Map<String, Object>> errors = response.getItems();
final String output = String.format("An error was encountered while processing bulk operations. Server response below:%n%n%s", errorMapper.writeValueAsString(errors));
void handleElasticsearchDocumentErrors(final Map<Integer, Map<String, Object>> errors, final ProcessSession session, final FlowFile parent) throws IOException {
if (!errors.isEmpty() && (outputErrorResponses || logErrors || getLogger().isDebugEnabled())) {
if (logErrors || getLogger().isDebugEnabled()) {
final String output = String.format(
"An error was encountered while processing bulk operations. Server response below:%n%n%s",
errorMapper.writeValueAsString(errors.values())
);
if (logErrors) {
getLogger().error(output);
} else {
getLogger().debug(output);
if (logErrors) {
getLogger().error(output);
} else {
getLogger().debug(output);
}
}
if (outputErrorResponses) {
FlowFile errorResponsesFF = null;
try {
errorResponsesFF = session.create(parent);
try (final OutputStream errorsOutputStream = session.write(errorResponsesFF)) {
errorMapper.writeValue(errorsOutputStream, errors.values());
}
errorResponsesFF = session.putAttribute(errorResponsesFF, "elasticsearch.put.error.count", String.valueOf(errors.size()));
session.transfer(errorResponsesFF, REL_ERROR_RESPONSES);
} catch (final IOException ex) {
getLogger().error("Unable to write error responses", ex);
session.remove(errorResponsesFF);
throw ex;
}
}
}
}
@ -179,21 +241,29 @@ public abstract class AbstractPutElasticsearch extends AbstractProcessor impleme
return inner -> inner.containsKey("result") && "not_found".equals(inner.get("result"));
}
@SafeVarargs
final List<Integer> findElasticsearchResponseIndices(final IndexOperationResponse response, final Predicate<Map<String, Object>>... responseItemFilter) {
final List<Integer> indices = new ArrayList<>(response.getItems() == null ? 0 : response.getItems().size());
if (response.getItems() != null) {
final Map<Integer, Map<String, Object>> findElasticsearchResponseErrors(final IndexOperationResponse response) {
final Map<Integer, Map<String, Object>> errors = new LinkedHashMap<>(response.getItems() == null ? 0 : response.getItems().size(), 1);
final List<Predicate<Map<String, Object>>> errorItemFilters = new ArrayList<>(2);
if (response.hasErrors()) {
errorItemFilters.add(isElasticsearchError());
}
if (!notFoundIsSuccessful) {
errorItemFilters.add(isElasticsearchNotFound());
}
if (response.getItems() != null && !errorItemFilters.isEmpty()) {
for (int index = 0; index < response.getItems().size(); index++) {
final Map<String, Object> current = response.getItems().get(index);
if (!current.isEmpty()) {
final String key = current.keySet().stream().findFirst().orElse(null);
@SuppressWarnings("unchecked") final Map<String, Object> inner = (Map<String, Object>) current.get(key);
if (inner != null && Arrays.stream(responseItemFilter).anyMatch(p -> p.test(inner))) {
indices.add(index);
if (inner != null && errorItemFilters.stream().anyMatch(p -> p.test(inner))) {
errors.put(index, inner);
}
}
}
}
return indices;
return errors;
}
}

View File

@ -51,14 +51,15 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "elasticsearch8", "put", "index"})
@CapabilityDescription("An Elasticsearch put processor that uses the official Elastic REST client libraries.")
@WritesAttributes({
@WritesAttribute(attribute = "elasticsearch.put.error", description = "The error message provided by Elasticsearch if there is an error indexing the document.")
@WritesAttribute(attribute = "elasticsearch.put.error",
description = "The error message if there is an issue parsing the FlowFile, sending the parsed document to Elasticsearch or parsing the Elasticsearch response"),
@WritesAttribute(attribute = "elasticsearch.bulk.error", description = "The _bulk response if there was an error during processing the document within Elasticsearch.")
})
@DynamicProperty(
name = "The name of a URL query parameter to add",
@ -101,7 +102,8 @@ public class PutElasticsearchJson extends AbstractPutElasticsearch {
.name("put-es-json-error-documents")
.displayName("Output Error Documents")
.description("If this configuration property is true, the response from Elasticsearch will be examined for failed documents " +
"and the FlowFile(s) associated with the failed document(s) will be sent to the \"" + REL_FAILED_DOCUMENTS.getName() + "\" relationship.")
"and the FlowFile(s) associated with the failed document(s) will be sent to the \"" + REL_FAILED_DOCUMENTS.getName() + "\" relationship " +
"with \"elasticsearch.bulk.error\" attributes.")
.allowableValues("true", "false")
.defaultValue("false")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
@ -110,9 +112,11 @@ public class PutElasticsearchJson extends AbstractPutElasticsearch {
static final PropertyDescriptor NOT_FOUND_IS_SUCCESSFUL = new PropertyDescriptor.Builder()
.name("put-es-json-not_found-is-error")
.displayName("Treat \"Not Found\" as Error")
.displayName("Treat \"Not Found\" as Success")
.description("If true, \"not_found\" Elasticsearch Document associated FlowFiles will be routed to the \"" + REL_SUCCESS.getName() +
"\" relationship, otherwise to the \"" + REL_FAILED_DOCUMENTS.getName() + "\" relationship.")
"\" relationship, otherwise to the \"" + REL_FAILED_DOCUMENTS.getName() + "\" relationship. " +
"If " + OUTPUT_ERROR_RESPONSES.getDisplayName() + " is \"true\" then \"not_found\" responses from Elasticsearch " +
"will be sent to the " + REL_ERROR_RESPONSES.getName() + " relationship")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.allowableValues("true", "false")
.defaultValue("true")
@ -121,19 +125,19 @@ public class PutElasticsearchJson extends AbstractPutElasticsearch {
.build();
static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
ID_ATTRIBUTE, INDEX_OP, INDEX, TYPE, BATCH_SIZE, CHARSET, CLIENT_SERVICE, LOG_ERROR_RESPONSES,
OUTPUT_ERROR_DOCUMENTS, NOT_FOUND_IS_SUCCESSFUL
ID_ATTRIBUTE, INDEX_OP, INDEX, TYPE, BATCH_SIZE, CHARSET, CLIENT_SERVICE,
LOG_ERROR_RESPONSES, OUTPUT_ERROR_RESPONSES, OUTPUT_ERROR_DOCUMENTS, NOT_FOUND_IS_SUCCESSFUL
));
static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_DOCUMENTS
static final Set<Relationship> BASE_RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_DOCUMENTS
)));
private boolean outputErrors;
private final ObjectMapper inputMapper = new ObjectMapper();
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
Set<Relationship> getBaseRelationships() {
return BASE_RELATIONSHIPS;
}
@Override
@ -141,6 +145,7 @@ public class PutElasticsearchJson extends AbstractPutElasticsearch {
return DESCRIPTORS;
}
@Override
@OnScheduled
public void onScheduled(final ProcessContext context) {
super.onScheduled(context);
@ -174,7 +179,7 @@ public class PutElasticsearchJson extends AbstractPutElasticsearch {
try (final InputStream inStream = session.read(input)) {
final byte[] result = IOUtils.toByteArray(inStream);
@SuppressWarnings("unchecked")
final Map<String, Object> contentMap = inputMapper.readValue(new String(result, charset), Map.class);
final Map<String, Object> contentMap = objectMapper.readValue(new String(result, charset), Map.class);
final IndexOperationRequest.Operation o = IndexOperationRequest.Operation.forValue(indexOp);
operations.add(new IndexOperationRequest(index, type, id, contentMap, o));
@ -195,7 +200,7 @@ public class PutElasticsearchJson extends AbstractPutElasticsearch {
if (!originals.isEmpty()) {
try {
final List<FlowFile> errorDocuments = indexDocuments(operations, originals, context);
final List<FlowFile> errorDocuments = indexDocuments(operations, originals, context, session);
session.transfer(errorDocuments, REL_FAILED_DOCUMENTS);
errorDocuments.forEach(e ->
session.getProvenanceReporter().send(
@ -239,26 +244,28 @@ public class PutElasticsearchJson extends AbstractPutElasticsearch {
}
}
@SuppressWarnings("unchecked")
private List<FlowFile> indexDocuments(final List<IndexOperationRequest> operations, final List<FlowFile> originals, final ProcessContext context) throws JsonProcessingException {
private List<FlowFile> indexDocuments(final List<IndexOperationRequest> operations, final List<FlowFile> originals, final ProcessContext context, final ProcessSession session) throws IOException {
final IndexOperationResponse response = clientService.get().bulk(operations, getUrlQueryParameters(context, originals.get(0)));
final List<FlowFile> errorDocuments = new ArrayList<>(response.getItems() == null ? 0 : response.getItems().size());
List<Predicate<Map<String, Object>>> errorItemFilters = new ArrayList<>(2);
if (response.hasErrors()) {
logElasticsearchDocumentErrors(response);
if (outputErrors) {
errorItemFilters.add(isElasticsearchError());
}
final Map<Integer, Map<String, Object>> errors = findElasticsearchResponseErrors(response);
final List<FlowFile> errorDocuments = outputErrors ? new ArrayList<>(errors.size()) : Collections.emptyList();
if (outputErrors) {
errors.forEach((index, error) -> {
String errorMessage;
try {
errorMessage = objectMapper.writeValueAsString(error);
} catch (JsonProcessingException e) {
errorMessage = String.format(
"{\"error\": {\"type\": \"elasticsearch_response_parse_error\", \"reason\": \"%s\"}}",
e.getMessage().replace("\"", "\\\"")
);
}
errorDocuments.add(session.putAttribute(originals.get(index), "elasticsearch.bulk.error", errorMessage));
});
}
if (!notFoundIsSuccessful) {
errorItemFilters.add(isElasticsearchNotFound());
}
if (!errorItemFilters.isEmpty()) {
findElasticsearchResponseIndices(response, errorItemFilters.toArray(new Predicate[0]))
.forEach(index -> errorDocuments.add(originals.get((Integer) index)));
if (!errors.isEmpty()) {
handleElasticsearchDocumentErrors(errors, session, null);
}
return errorDocuments;

View File

@ -76,13 +76,13 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "elasticsearch8", "put", "index", "record"})
@CapabilityDescription("A record-aware Elasticsearch put processor that uses the official Elastic REST client libraries.")
@WritesAttributes({
@WritesAttribute(attribute = "elasticsearch.put.error", description = "The error message provided by Elasticsearch if there is an error indexing the documents."),
@WritesAttribute(attribute = "elasticsearch.put.error",
description = "The error message if there is an issue parsing the FlowFile records, sending the parsed documents to Elasticsearch or parsing the Elasticsearch response."),
@WritesAttribute(attribute = "elasticsearch.put.error.count", description = "The number of records that generated errors in the Elasticsearch _bulk API."),
@WritesAttribute(attribute = "elasticsearch.put.success.count", description = "The number of records that were successfully processed by the Elasticsearch _bulk API.")
})
@ -207,7 +207,7 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
.displayName("Result Record Writer")
.description("If this configuration property is set, the response from Elasticsearch will be examined for failed records " +
"and the failed records will be written to a record set with this record writer service and sent to the \"" +
REL_FAILED_RECORDS.getName() + "\" relationship. Successful records will be written to a record set" +
REL_FAILED_RECORDS.getName() + "\" relationship. Successful records will be written to a record set " +
"with this record writer service and sent to the \"" + REL_SUCCESSFUL_RECORDS.getName() + "\" relationship.")
.identifiesControllerService(RecordSetWriterFactory.class)
.addValidator(Validator.VALID)
@ -216,9 +216,11 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
static final PropertyDescriptor NOT_FOUND_IS_SUCCESSFUL = new PropertyDescriptor.Builder()
.name("put-es-record-not_found-is-error")
.displayName("Treat \"Not Found\" as Error")
.displayName("Treat \"Not Found\" as Success")
.description("If true, \"not_found\" Elasticsearch Document associated Records will be routed to the \"" +
REL_SUCCESSFUL_RECORDS.getName() + "\" relationship, otherwise to the \"" + REL_FAILED_RECORDS.getName() + "\" relationship.")
REL_SUCCESSFUL_RECORDS.getName() + "\" relationship, otherwise to the \"" + REL_FAILED_RECORDS.getName() + "\" relationship. " +
"If " + OUTPUT_ERROR_RESPONSES.getDisplayName() + " is \"true\" then \"not_found\" responses from Elasticsearch " +
"will be sent to the " + REL_ERROR_RESPONSES.getName() + " relationship")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.allowableValues("true", "false")
.defaultValue("true")
@ -266,10 +268,11 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
INDEX_OP, INDEX, TYPE, AT_TIMESTAMP, CLIENT_SERVICE, RECORD_READER, BATCH_SIZE, ID_RECORD_PATH, RETAIN_ID_FIELD,
INDEX_OP_RECORD_PATH, INDEX_RECORD_PATH, TYPE_RECORD_PATH, AT_TIMESTAMP_RECORD_PATH, RETAIN_AT_TIMESTAMP_FIELD,
DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT, LOG_ERROR_RESPONSES, RESULT_RECORD_WRITER, NOT_FOUND_IS_SUCCESSFUL
DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT, LOG_ERROR_RESPONSES, OUTPUT_ERROR_RESPONSES, RESULT_RECORD_WRITER,
NOT_FOUND_IS_SUCCESSFUL
));
static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_RECORDS, REL_SUCCESSFUL_RECORDS
static final Set<Relationship> BASE_RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_RECORDS, REL_SUCCESSFUL_RECORDS
)));
private RecordPathCache recordPathCache;
@ -281,8 +284,8 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
private volatile String timestampFormat;
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
Set<Relationship> getBaseRelationships() {
return BASE_RELATIONSHIPS;
}
@Override
@ -290,6 +293,7 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
return DESCRIPTORS;
}
@Override
@OnScheduled
public void onScheduled(final ProcessContext context) {
super.onScheduled(context);
@ -460,19 +464,12 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
private ResponseDetails indexDocuments(final BulkOperation bundle, final ProcessContext context, final ProcessSession session, final FlowFile input) throws IOException, SchemaNotFoundException {
final IndexOperationResponse response = clientService.get().bulk(bundle.getOperationList(), getUrlQueryParameters(context, input));
List<Predicate<Map<String, Object>>> errorItemFilters = new ArrayList<>(2);
if (response.hasErrors()) {
logElasticsearchDocumentErrors(response);
errorItemFilters.add(isElasticsearchError());
final Map<Integer, Map<String, Object>> errors = findElasticsearchResponseErrors(response);
if (!errors.isEmpty()) {
handleElasticsearchDocumentErrors(errors, session, input);
}
if (writerFactory != null && !notFoundIsSuccessful) {
errorItemFilters.add(isElasticsearchNotFound());
}
@SuppressWarnings("unchecked")
final List<Integer> errorIndices = findElasticsearchResponseIndices(response, errorItemFilters.toArray(new Predicate[0]));
final int numErrors = errorIndices.size();
final int numErrors = errors.size();
final int numSuccessful = response.getItems() == null ? 0 : response.getItems().size() - numErrors;
FlowFile errorFF = null;
FlowFile successFF = null;
@ -490,7 +487,7 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
errorWriter.beginRecordSet();
successWriter.beginRecordSet();
for (int o = 0; o < bundle.getOriginalRecords().size(); o++) {
if (errorIndices.contains(o)) {
if (errors.containsKey(o)) {
errorWriter.write(bundle.getOriginalRecords().get(o));
} else {
successWriter.write(bundle.getOriginalRecords().get(o));

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License") you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
import org.junit.jupiter.api.Test
import static org.hamcrest.CoreMatchers.hasItem
import static org.hamcrest.CoreMatchers.not
import static org.hamcrest.MatcherAssert.assertThat
abstract class AbstractPutElasticsearchTest<P extends AbstractPutElasticsearch> {
abstract P getProcessor()
@Test
void testOutputErrorResponsesRelationship() {
final TestRunner runner = createRunner()
assertThat(runner.getProcessor().getRelationships(), not(hasItem(AbstractPutElasticsearch.REL_ERROR_RESPONSES)))
runner.setProperty(AbstractPutElasticsearch.OUTPUT_ERROR_RESPONSES, "true")
assertThat(runner.getProcessor().getRelationships(), hasItem(AbstractPutElasticsearch.REL_ERROR_RESPONSES))
runner.setProperty(AbstractPutElasticsearch.OUTPUT_ERROR_RESPONSES, "false")
assertThat(runner.getProcessor().getRelationships(), not(hasItem(AbstractPutElasticsearch.REL_ERROR_RESPONSES)))
}
TestRunner createRunner() {
final P processor = getProcessor()
final TestRunner runner = TestRunners.newTestRunner(processor)
return runner
}
}

View File

@ -22,20 +22,18 @@ import org.apache.nifi.elasticsearch.IndexOperationResponse
import org.apache.nifi.processors.elasticsearch.mock.MockBulkLoadClientService
import org.apache.nifi.provenance.ProvenanceEventType
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import static groovy.json.JsonOutput.prettyPrint
import static groovy.json.JsonOutput.toJson
import static org.hamcrest.CoreMatchers.containsString
import static org.hamcrest.MatcherAssert.assertThat
import static org.junit.jupiter.api.Assertions.assertEquals
import static org.junit.jupiter.api.Assertions.assertThrows
import static org.junit.jupiter.api.Assertions.assertTrue
class PutElasticsearchJsonTest {
class PutElasticsearchJsonTest extends AbstractPutElasticsearchTest<PutElasticsearchJson> {
MockBulkLoadClientService clientService
TestRunner runner
@ -43,10 +41,15 @@ class PutElasticsearchJsonTest {
[ msg: "Hello, world", from: "john.smith" ]
))
@Override
PutElasticsearchJson getProcessor() {
return new PutElasticsearchJson()
}
@BeforeEach
void setup() {
clientService = new MockBulkLoadClientService()
runner = TestRunners.newTestRunner(PutElasticsearchJson.class)
runner = createRunner()
clientService.response = new IndexOperationResponse(1500)
@ -60,6 +63,7 @@ class PutElasticsearchJsonTest {
runner.setProperty(PutElasticsearchJson.LOG_ERROR_RESPONSES, "false")
runner.setProperty(PutElasticsearchJson.CLIENT_SERVICE, "clientService")
runner.setProperty(PutElasticsearchJson.NOT_FOUND_IS_SUCCESSFUL, "true")
runner.setProperty(PutElasticsearchJson.OUTPUT_ERROR_RESPONSES, "false")
runner.enableControllerService(clientService)
runner.assertValid()
@ -98,6 +102,7 @@ class PutElasticsearchJsonTest {
runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, retry)
runner.assertTransferCount(PutElasticsearchJson.REL_SUCCESS, success)
runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0)
assertEquals(success,
runner.getProvenanceEvents().stream().filter({
@ -203,6 +208,7 @@ class PutElasticsearchJsonTest {
runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 1)
runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0)
}
@Test
@ -212,6 +218,7 @@ class PutElasticsearchJsonTest {
runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 0)
runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0)
}
@Test
@ -241,7 +248,13 @@ class PutElasticsearchJsonTest {
runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 0)
runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 1)
assertThat(runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[0].getContent(), containsString("20abcd"))
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0)
def failedDoc = runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[0];
assertThat(failedDoc.getContent(), containsString("20abcd"))
failedDoc.assertAttributeExists("elasticsearch.bulk.error")
failedDoc.assertAttributeNotExists("elasticsearch.put.error")
assertThat(failedDoc.getAttribute("elasticsearch.bulk.error"), containsString("mapper_parsing_exception"))
assertEquals(1,
runner.getProvenanceEvents().stream().filter({
e -> ProvenanceEventType.SEND == e.getEventType() && "Elasticsearch _bulk operation error" == e.getDetails()
@ -253,6 +266,7 @@ class PutElasticsearchJsonTest {
runner.clearProvenanceEvents()
runner.setProperty(PutElasticsearchJson.NOT_FOUND_IS_SUCCESSFUL, "false")
runner.setProperty(PutElasticsearchJson.OUTPUT_ERROR_RESPONSES, "true")
for (final def val : values) {
runner.enqueue(prettyPrint(toJson(val)))
@ -264,8 +278,24 @@ class PutElasticsearchJsonTest {
runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 0)
runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 2)
assertThat(runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[0].getContent(), containsString("not_found"))
assertThat(runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[1].getContent(), containsString("20abcd"))
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 1)
failedDoc = runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[0];
assertThat(failedDoc.getContent(), containsString("not_found"))
failedDoc.assertAttributeExists("elasticsearch.bulk.error")
failedDoc.assertAttributeNotExists("elasticsearch.put.error")
assertThat(failedDoc.getAttribute("elasticsearch.bulk.error"), containsString("not_found"))
failedDoc = runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[1];
assertThat(failedDoc.getContent(), containsString("20abcd"))
failedDoc.assertAttributeExists("elasticsearch.bulk.error")
failedDoc.assertAttributeNotExists("elasticsearch.put.error")
assertThat(failedDoc.getAttribute("elasticsearch.bulk.error"), containsString("number_format_exception"))
final String errorResponses = runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_ERROR_RESPONSES)[0].getContent()
assertThat(errorResponses, containsString("not_found"))
assertThat(errorResponses, containsString("For input string: 20abc"))
assertEquals(2,
runner.getProvenanceEvents().stream().filter({
e -> ProvenanceEventType.SEND == e.getEventType() && "Elasticsearch _bulk operation error" == e.getDetails()
@ -285,7 +315,8 @@ class PutElasticsearchJsonTest {
[ id: "1", field1: 'value1', field2: '20' ],
[ id: "2", field1: 'value1', field2: '20' ],
[ id: "2", field1: 'value1', field2: '20' ],
[ id: "3", field1: 'value1', field2: '20abcd' ]
[ id: "3", field1: 'value1', field2: '20abcd' ],
[ id: "4", field1: 'value2', field2: '30' ]
]
for (final def val : values) {
@ -294,10 +325,11 @@ class PutElasticsearchJsonTest {
runner.assertValid()
runner.run()
runner.assertTransferCount(PutElasticsearchJson.REL_SUCCESS, 4)
runner.assertTransferCount(PutElasticsearchJson.REL_SUCCESS, 5)
runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 0)
runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0)
}
@Test
@ -309,6 +341,7 @@ class PutElasticsearchJsonTest {
runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 1)
runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0)
runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILURE)[0].assertAttributeEquals(
"elasticsearch.put.error",

View File

@ -32,7 +32,6 @@ import org.apache.nifi.serialization.record.MockSchemaRegistry
import org.apache.nifi.serialization.record.RecordFieldType
import org.apache.nifi.util.StringUtils
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
@ -46,9 +45,14 @@ import java.time.format.DateTimeFormatter
import static groovy.json.JsonOutput.prettyPrint
import static groovy.json.JsonOutput.toJson
import static org.junit.jupiter.api.Assertions.*
import static org.hamcrest.CoreMatchers.containsString
import static org.hamcrest.MatcherAssert.assertThat
import static org.junit.jupiter.api.Assertions.assertEquals
import static org.junit.jupiter.api.Assertions.assertNotNull
import static org.junit.jupiter.api.Assertions.assertThrows
import static org.junit.jupiter.api.Assertions.assertTrue
class PutElasticsearchRecordTest {
class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest<PutElasticsearchRecord> {
private static final int DATE_YEAR = 2020
private static final int DATE_MONTH = 11
private static final int DATE_DAY = 27
@ -81,12 +85,17 @@ class PutElasticsearchRecordTest {
static final String flowFileContents = prettyPrint(toJson(flowFileContentMaps))
@Override
PutElasticsearchRecord getProcessor() {
return new PutElasticsearchRecord()
}
@BeforeEach
void setup() {
clientService = new MockBulkLoadClientService()
registry = new MockSchemaRegistry()
reader = new JsonTreeReader()
runner = TestRunners.newTestRunner(PutElasticsearchRecord.class)
runner = createRunner()
registry.addSchema("simple", AvroTypeUtil.createSchema(new Schema.Parser().parse(SCHEMA)))
@ -141,6 +150,7 @@ class PutElasticsearchRecordTest {
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, success)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0)
if (success > 0) {
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_SUCCESS).forEach({ ff ->
@ -319,6 +329,7 @@ class PutElasticsearchRecordTest {
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0)
runner.clearTransferState()
@ -374,6 +385,7 @@ class PutElasticsearchRecordTest {
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0)
runner.clearTransferState()
@ -416,6 +428,7 @@ class PutElasticsearchRecordTest {
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0)
runner.clearTransferState()
@ -455,6 +468,7 @@ class PutElasticsearchRecordTest {
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0)
runner.clearTransferState()
@ -477,6 +491,7 @@ class PutElasticsearchRecordTest {
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0)
runner.clearTransferState()
@ -502,6 +517,7 @@ class PutElasticsearchRecordTest {
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0)
runner.clearTransferState()
@ -528,6 +544,7 @@ class PutElasticsearchRecordTest {
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0)
}
@Test
@ -604,6 +621,7 @@ class PutElasticsearchRecordTest {
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0)
runner.clearTransferState()
@ -658,6 +676,7 @@ class PutElasticsearchRecordTest {
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0)
}
@Test
@ -681,6 +700,7 @@ class PutElasticsearchRecordTest {
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0)
}
@Test
@ -691,6 +711,7 @@ class PutElasticsearchRecordTest {
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0)
}
@Test
@ -734,6 +755,7 @@ class PutElasticsearchRecordTest {
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 1)
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 1)
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0)
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[0]
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "1")
@ -760,6 +782,7 @@ class PutElasticsearchRecordTest {
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 1)
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 1)
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0)
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[0]
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "2")
@ -776,7 +799,9 @@ class PutElasticsearchRecordTest {
runner.clearTransferState()
runner.clearProvenanceEvents()
// errors still counted/logged even if not outputting to the error relationship
runner.removeProperty(PutElasticsearchRecord.RESULT_RECORD_WRITER)
runner.setProperty(PutElasticsearchRecord.OUTPUT_ERROR_RESPONSES, "true")
runner.enqueue(prettyPrint(toJson(values)), [ 'schema.name': 'errorTest' ])
runner.assertValid()
runner.run()
@ -786,10 +811,15 @@ class PutElasticsearchRecordTest {
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 1)
final String errorResponses = runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_ERROR_RESPONSES)[0].getContent()
assertThat(errorResponses, containsString("not_found"))
assertThat(errorResponses, containsString("For input string: 20abc"))
assertEquals(1,
runner.getProvenanceEvents().stream().filter({
e -> ProvenanceEventType.SEND == e.getEventType() && e.getDetails() == "1 Elasticsearch _bulk operation batch(es) [1 error(s), 4 success(es)]"
e -> ProvenanceEventType.SEND == e.getEventType() && e.getDetails() == "1 Elasticsearch _bulk operation batch(es) [2 error(s), 3 success(es)]"
}).count()
)
}

View File

@ -51,7 +51,7 @@ import static org.apache.http.auth.AuthScope.ANY;
public abstract class AbstractElasticsearchITBase {
// default Elasticsearch version should (ideally) match that in the nifi-elasticsearch-bundle#pom.xml for the integration-tests profile
protected static final DockerImageName IMAGE = DockerImageName
.parse(System.getProperty("elasticsearch.docker.image", "docker.elastic.co/elasticsearch/elasticsearch:8.6.1"));
.parse(System.getProperty("elasticsearch.docker.image", "docker.elastic.co/elasticsearch/elasticsearch:8.7.0"));
protected static final String ELASTIC_USER_PASSWORD = System.getProperty("elasticsearch.elastic_user.password", RandomStringUtils.randomAlphanumeric(10, 20));
private static final int PORT = 9200;
protected static final ElasticsearchContainer ELASTICSEARCH_CONTAINER = new ElasticsearchContainer(IMAGE)

View File

@ -101,7 +101,7 @@ language governing permissions and limitations under the License. -->
</activation>
<properties>
<!-- also update the default Elasticsearch version in nifi-elasticsearch-test-utils#src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java-->
<elasticsearch_docker_image>8.6.1</elasticsearch_docker_image>
<elasticsearch_docker_image>8.7.0</elasticsearch_docker_image>
<elasticsearch.elastic.password>s3cret</elasticsearch.elastic.password>
</properties>
<build>