NIFI-12255 Refactored PutElasticsearchRecord and PutElasticsearchJson Relationships

- Migrated previous relationship names to be more consistent with other Processors
- Added nifi-mock objects for Relationship Migration

This closes #7940

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Chris Sampson 2023-10-25 21:08:16 +01:00 committed by exceptionfactory
parent e7567b04e2
commit 9edea70713
No known key found for this signature in database
15 changed files with 794 additions and 530 deletions

View File

@ -20,10 +20,6 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-elasticsearch-restapi-nar</artifactId>
<packaging>nar</packaging>
<properties>
<lucene.version>6.2.1</lucene.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>

View File

@ -55,10 +55,41 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
public abstract class AbstractPutElasticsearch extends AbstractProcessor implements ElasticsearchRestProcessor {
static final Relationship REL_ORIGINAL = new Relationship.Builder()
.name("original")
.description("All flowfiles that are sent to Elasticsearch without request failures go to this relationship.")
.build();
static final Relationship REL_SUCCESSFUL = new Relationship.Builder()
.name("successful")
.description("Record(s)/Flowfile(s) corresponding to Elasticsearch document(s) that did not result in an \"error\" (within Elasticsearch) will be routed here.")
.build();
static final Relationship REL_ERRORS = new Relationship.Builder()
.name("errors")
.description("Record(s)/Flowfile(s) corresponding to Elasticsearch document(s) that resulted in an \"error\" (within Elasticsearch) will be routed here.")
.build();
static final Relationship REL_ERROR_RESPONSES = new Relationship.Builder()
.name("error_responses")
.description("Elasticsearch _bulk API responses marked as \"error\" go here " +
"(and optionally \"not_found\" when \"Treat \"Not Found\" as Success\" is \"true\").")
.build();
static final PropertyDescriptor OUTPUT_ERROR_RESPONSES = new PropertyDescriptor.Builder()
.name("put-es-output-error-responses")
.displayName("Output Error Responses")
.description("If this is enabled, response messages from Elasticsearch marked as \"error\" will be output to the \"" + REL_ERROR_RESPONSES.getName() + "\" relationship." +
"This does not impact the output of flowfiles to the \"" + REL_SUCCESSFUL.getName() + "\" or \"" + REL_ERRORS.getName() + "\" relationships")
.allowableValues("true", "false")
.defaultValue("false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("put-es-record-batch-size")
.displayName("Batch Size")
.description("The preferred number of FlowFiles to send over in a single batch.")
.description("The preferred number of FlowFiles to send over in a single batch")
.defaultValue("100")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
@ -75,27 +106,17 @@ public abstract class AbstractPutElasticsearch extends AbstractProcessor impleme
.required(true)
.build();
static final PropertyDescriptor OUTPUT_ERROR_RESPONSES = new PropertyDescriptor.Builder()
.name("put-es-output-error-responses")
.displayName("Output Error Responses")
.description("If this is enabled, response messages from Elasticsearch marked as \"error\" will be output to the \"error_responses\" relationship." +
"This does not impact the output of flowfiles to the \"success\" or \"errors\" relationships")
.allowableValues("true", "false")
.defaultValue("false")
static final PropertyDescriptor NOT_FOUND_IS_SUCCESSFUL = new PropertyDescriptor.Builder()
.name("put-es-not_found-is-error")
.displayName("Treat \"Not Found\" as Success")
.description("If true, \"not_found\" Elasticsearch Document associated Records will be routed to the \"" +
REL_SUCCESSFUL.getName() + "\" relationship, otherwise to the \"" + REL_ERRORS.getName() + "\" relationship. " +
"If " + OUTPUT_ERROR_RESPONSES.getDisplayName() + " is \"true\" then \"not_found\" responses from Elasticsearch " +
"will be sent to the " + REL_ERROR_RESPONSES.getName() + " relationship.")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All flowfiles that succeed in being transferred into Elasticsearch go here. " +
"Documents received by the Elasticsearch _bulk API may still result in errors on the Elasticsearch side. " +
"The Elasticsearch response will need to be examined to determine whether any Document(s)/Record(s) resulted in errors.")
.build();
static final Relationship REL_ERROR_RESPONSES = new Relationship.Builder()
.name("error_responses")
.description("Elasticsearch _bulk API responses marked as \"error\" go here " +
"(and optionally \"not_found\" when \"Treat \"Not Found\" as Error\" is \"true\").")
.allowableValues("true", "false")
.defaultValue("true")
.required(false)
.build();
static final List<String> ALLOWED_INDEX_OPERATIONS = Stream.of(IndexOperationRequest.Operation.values())

View File

@ -37,6 +37,8 @@ import org.apache.nifi.elasticsearch.IndexOperationRequest;
import org.apache.nifi.elasticsearch.IndexOperationResponse;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.migration.RelationshipConfiguration;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
@ -85,11 +87,6 @@ import java.util.stream.Collectors;
resource = SystemResource.MEMORY,
description = "The Batch of FlowFiles will be stored in memory until the bulk operation is performed.")
public class PutElasticsearchJson extends AbstractPutElasticsearch {
static final Relationship REL_FAILED_DOCUMENTS = new Relationship.Builder()
.name("errors").description("If a \"Output Error Documents\" is set, any FlowFile(s) corresponding to Elasticsearch document(s) " +
"that resulted in an \"error\" (within Elasticsearch) will be routed here.")
.autoTerminateDefault(true).build();
static final PropertyDescriptor ID_ATTRIBUTE = new PropertyDescriptor.Builder()
.name("put-es-json-id-attr")
.displayName("Identifier Attribute")
@ -105,8 +102,7 @@ public class PutElasticsearchJson extends AbstractPutElasticsearch {
.name("put-es-json-script")
.displayName("Script")
.description("The script for the document update/upsert. Only applies to Update/Upsert operations. " +
"Must be parsable as JSON Object. " +
"If left blank, the FlowFile content will be used for document update/upsert")
"Must be parsable as JSON Object. If left blank, the FlowFile content will be used for document update/upsert")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
@ -129,8 +125,7 @@ public class PutElasticsearchJson extends AbstractPutElasticsearch {
static final PropertyDescriptor DYNAMIC_TEMPLATES = new PropertyDescriptor.Builder()
.name("put-es-json-dynamic_templates")
.displayName("Dynamic Templates")
.description("The dynamic_templates for the document. Must be parsable as a JSON Object. " +
"Requires Elasticsearch 7+")
.description("The dynamic_templates for the document. Must be parsable as a JSON Object. Requires Elasticsearch 7+")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
@ -145,40 +140,12 @@ public class PutElasticsearchJson extends AbstractPutElasticsearch {
.required(true)
.build();
static final PropertyDescriptor OUTPUT_ERROR_DOCUMENTS = new PropertyDescriptor.Builder()
.name("put-es-json-error-documents")
.displayName("Output Error Documents")
.description("If this configuration property is true, the response from Elasticsearch will be examined for failed documents " +
"and the FlowFile(s) associated with the failed document(s) will be sent to the \"" + REL_FAILED_DOCUMENTS.getName() + "\" relationship " +
"with \"elasticsearch.bulk.error\" attributes.")
.allowableValues("true", "false")
.defaultValue("false")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.build();
static final PropertyDescriptor NOT_FOUND_IS_SUCCESSFUL = new PropertyDescriptor.Builder()
.name("put-es-json-not_found-is-error")
.displayName("Treat \"Not Found\" as Success")
.description("If true, \"not_found\" Elasticsearch Document associated FlowFiles will be routed to the \"" + REL_SUCCESS.getName() +
"\" relationship, otherwise to the \"" + REL_FAILED_DOCUMENTS.getName() + "\" relationship. " +
"If " + OUTPUT_ERROR_RESPONSES.getDisplayName() + " is \"true\" then \"not_found\" responses from Elasticsearch " +
"will be sent to the " + REL_ERROR_RESPONSES.getName() + " relationship")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.allowableValues("true", "false")
.defaultValue("true")
.required(false)
.dependsOn(OUTPUT_ERROR_DOCUMENTS, "true")
.build();
static final List<PropertyDescriptor> DESCRIPTORS = List.of(
ID_ATTRIBUTE, INDEX_OP, INDEX, TYPE, SCRIPT, SCRIPTED_UPSERT, DYNAMIC_TEMPLATES, BATCH_SIZE, CHARSET, CLIENT_SERVICE,
LOG_ERROR_RESPONSES, OUTPUT_ERROR_RESPONSES, OUTPUT_ERROR_DOCUMENTS, NOT_FOUND_IS_SUCCESSFUL
ID_ATTRIBUTE, INDEX_OP, INDEX, TYPE, SCRIPT, SCRIPTED_UPSERT, DYNAMIC_TEMPLATES, BATCH_SIZE, CHARSET, CLIENT_SERVICE,
LOG_ERROR_RESPONSES, OUTPUT_ERROR_RESPONSES, NOT_FOUND_IS_SUCCESSFUL
);
static final Set<Relationship> BASE_RELATIONSHIPS =
Set.of(REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_DOCUMENTS);
private boolean outputErrors;
Set.of(REL_ORIGINAL, REL_FAILURE, REL_RETRY, REL_SUCCESSFUL, REL_ERRORS);
@Override
Set<Relationship> getBaseRelationships() {
@ -190,12 +157,28 @@ public class PutElasticsearchJson extends AbstractPutElasticsearch {
return DESCRIPTORS;
}
@Override
public void migrateProperties(final PropertyConfiguration config) {
super.migrateProperties(config);
if (config.hasProperty("put-es-json-error-documents")) {
config.removeProperty("put-es-json-error-documents");
}
config.renameProperty("put-es-json-not_found-is-error", AbstractPutElasticsearch.NOT_FOUND_IS_SUCCESSFUL.getName());
}
@Override
public void migrateRelationships(final RelationshipConfiguration config) {
super.migrateRelationships(config);
config.renameRelationship("success", AbstractPutElasticsearch.REL_ORIGINAL.getName());
}
@Override
@OnScheduled
public void onScheduled(final ProcessContext context) {
super.onScheduled(context);
this.outputErrors = context.getProperty(OUTPUT_ERROR_DOCUMENTS).asBoolean();
this.notFoundIsSuccessful = context.getProperty(NOT_FOUND_IS_SUCCESSFUL).asBoolean();
}
@ -221,6 +204,7 @@ public class PutElasticsearchJson extends AbstractPutElasticsearch {
try {
final List<FlowFile> errorDocuments = indexDocuments(operations, originals, context, session);
handleResponse(context, session, errorDocuments, originals);
session.transfer(originals, REL_ORIGINAL);
} catch (final ElasticsearchException ese) {
final String msg = String.format("Encountered a server-side problem with Elasticsearch. %s",
ese.isElastic() ? "Routing to retry." : "Routing to failure");
@ -229,8 +213,7 @@ public class PutElasticsearchJson extends AbstractPutElasticsearch {
transferFlowFilesOnException(ese, rel, session, true, originals.toArray(new FlowFile[0]));
} catch (final JsonProcessingException jpe) {
getLogger().warn("Could not log Elasticsearch operation errors nor determine which documents errored.", jpe);
final Relationship rel = outputErrors ? REL_FAILED_DOCUMENTS : REL_FAILURE;
transferFlowFilesOnException(jpe, rel, session, true, originals.toArray(new FlowFile[0]));
transferFlowFilesOnException(jpe, REL_ERRORS, session, true, originals.toArray(new FlowFile[0]));
} catch (final Exception ex) {
getLogger().error("Could not index documents.", ex);
transferFlowFilesOnException(ex, REL_FAILURE, session, false, originals.toArray(new FlowFile[0]));
@ -294,21 +277,19 @@ public class PutElasticsearchJson extends AbstractPutElasticsearch {
final IndexOperationResponse response = clientService.get().bulk(operations, getRequestURLParameters(dynamicProperties));
final Map<Integer, Map<String, Object>> errors = findElasticsearchResponseErrors(response);
final List<FlowFile> errorDocuments = outputErrors ? new ArrayList<>(errors.size()) : Collections.emptyList();
if (outputErrors) {
errors.forEach((index, error) -> {
String errorMessage;
try {
errorMessage = mapper.writeValueAsString(error);
} catch (JsonProcessingException e) {
errorMessage = String.format(
"{\"error\": {\"type\": \"elasticsearch_response_parse_error\", \"reason\": \"%s\"}}",
e.getMessage().replace("\"", "\\\"")
);
}
errorDocuments.add(session.putAttribute(originals.get(index), "elasticsearch.bulk.error", errorMessage));
});
}
final List<FlowFile> errorDocuments = new ArrayList<>(errors.size());
errors.forEach((index, error) -> {
String errorMessage;
try {
errorMessage = mapper.writeValueAsString(error);
} catch (JsonProcessingException e) {
errorMessage = String.format(
"{\"error\": {\"type\": \"elasticsearch_response_parse_error\", \"reason\": \"%s\"}}",
e.getMessage().replace("\"", "\\\"")
);
}
errorDocuments.add(session.putAttribute(originals.get(index), "elasticsearch.bulk.error", errorMessage));
});
if (!errors.isEmpty()) {
handleElasticsearchDocumentErrors(errors, session, null);
@ -318,8 +299,10 @@ public class PutElasticsearchJson extends AbstractPutElasticsearch {
}
private void handleResponse(final ProcessContext context, final ProcessSession session, final List<FlowFile> errorDocuments, final List<FlowFile> originals) {
session.transfer(errorDocuments, REL_FAILED_DOCUMENTS);
errorDocuments.forEach(e ->
// clone FlowFiles to be transferred to errors/successful as the originals are pass through to REL_ORIGINAL
final List<FlowFile> copiedErrors = errorDocuments.stream().map(session::clone).collect(Collectors.toList());
session.transfer(copiedErrors, REL_ERRORS);
copiedErrors.forEach(e ->
session.getProvenanceReporter().send(
e,
clientService.get().getTransitUrl(
@ -330,8 +313,8 @@ public class PutElasticsearchJson extends AbstractPutElasticsearch {
)
);
final List<FlowFile> successfulDocuments = originals.stream().filter(f -> !errorDocuments.contains(f)).collect(Collectors.toList());
session.transfer(successfulDocuments, REL_SUCCESS);
final List<FlowFile> successfulDocuments = originals.stream().filter(f -> !errorDocuments.contains(f)).map(session::clone).collect(Collectors.toList());
session.transfer(successfulDocuments, REL_SUCCESSFUL);
successfulDocuments.forEach(s ->
session.getProvenanceReporter().send(
s,

View File

@ -38,6 +38,8 @@ import org.apache.nifi.elasticsearch.IndexOperationResponse;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.migration.RelationshipConfiguration;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
@ -116,16 +118,6 @@ import java.util.concurrent.atomic.AtomicLong;
resource = SystemResource.MEMORY,
description = "The Batch of Records will be stored in memory until the bulk operation is performed.")
public class PutElasticsearchRecord extends AbstractPutElasticsearch {
static final Relationship REL_FAILED_RECORDS = new Relationship.Builder()
.name("errors").description("If a \"Result Record Writer\" is set, any Record(s) corresponding to Elasticsearch document(s) " +
"that resulted in an \"error\" (within Elasticsearch) will be routed here.")
.autoTerminateDefault(true).build();
static final Relationship REL_SUCCESSFUL_RECORDS = new Relationship.Builder()
.name("successful_records").description("If a \"Result Record Writer\" is set, any Record(s) corresponding to Elasticsearch document(s) " +
"that did not result in an \"error\" (within Elasticsearch) will be routed here.")
.autoTerminateDefault(true).build();
static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name("put-es-record-reader")
.displayName("Record Reader")
@ -260,37 +252,22 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
static final PropertyDescriptor RESULT_RECORD_WRITER = new PropertyDescriptor.Builder()
.name("put-es-record-error-writer")
.displayName("Result Record Writer")
.description("If this configuration property is set, the response from Elasticsearch will be examined for failed records " +
.description("The response from Elasticsearch will be examined for failed records " +
"and the failed records will be written to a record set with this record writer service and sent to the \"" +
REL_FAILED_RECORDS.getName() + "\" relationship. Successful records will be written to a record set " +
"with this record writer service and sent to the \"" + REL_SUCCESSFUL_RECORDS.getName() + "\" relationship.")
REL_ERRORS.getName() + "\" relationship. Successful records will be written to a record set " +
"with this record writer service and sent to the \"" + REL_SUCCESSFUL.getName() + "\" relationship.")
.identifiesControllerService(RecordSetWriterFactory.class)
.addValidator(Validator.VALID)
.required(false)
.build();
static final PropertyDescriptor NOT_FOUND_IS_SUCCESSFUL = new PropertyDescriptor.Builder()
.name("put-es-record-not_found-is-error")
.displayName("Treat \"Not Found\" as Success")
.description("If true, \"not_found\" Elasticsearch Document associated Records will be routed to the \"" +
REL_SUCCESSFUL_RECORDS.getName() + "\" relationship, otherwise to the \"" + REL_FAILED_RECORDS.getName() + "\" relationship. " +
"If " + OUTPUT_ERROR_RESPONSES.getDisplayName() + " is \"true\" then \"not_found\" responses from Elasticsearch " +
"will be sent to the " + REL_ERROR_RESPONSES.getName() + " relationship.")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.allowableValues("true", "false")
.defaultValue("true")
.required(false)
.dependsOn(RESULT_RECORD_WRITER)
.required(true)
.build();
static final PropertyDescriptor GROUP_BULK_ERRORS_BY_TYPE = new PropertyDescriptor.Builder()
.name("put-es-record-bulk-error-groups")
.displayName("Group Results by Bulk Error Type")
.description("If this configuration property is set, the response from Elasticsearch will be examined for _bulk errors. " +
"The failed records written to the \"" + REL_FAILED_RECORDS.getName() + "\" relationship will be grouped by error type " +
.description("The errored records written to the \"" + REL_ERRORS.getName() + "\" relationship will be grouped by error type " +
"and the error related to the first record within the FlowFile added to the FlowFile as \"elasticsearch.bulk.error\". " +
"If \"" + NOT_FOUND_IS_SUCCESSFUL.getDisplayName() +"\" is \"false\" then records associated with \"not_found\" " +
"Elasticsearch document responses will also be send to the \"" + REL_FAILED_RECORDS.getName() + "\" relationship.")
"Elasticsearch document responses will also be send to the \"" + REL_ERRORS.getName() + "\" relationship.")
.allowableValues("true", "false")
.defaultValue("false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
@ -343,7 +320,7 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
GROUP_BULK_ERRORS_BY_TYPE
);
static final Set<Relationship> BASE_RELATIONSHIPS =
Set.of(REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_RECORDS, REL_SUCCESSFUL_RECORDS);
Set.of(REL_ORIGINAL, REL_FAILURE, REL_RETRY, REL_ERRORS, REL_SUCCESSFUL);
private static final String OUTPUT_TYPE_SUCCESS = "success";
private static final String OUTPUT_TYPE_ERROR = "error";
@ -369,6 +346,25 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
return DESCRIPTORS;
}
@Override
public void migrateProperties(final PropertyConfiguration config) {
super.migrateProperties(config);
config.renameProperty("put-es-record-not_found-is-error", AbstractPutElasticsearch.NOT_FOUND_IS_SUCCESSFUL.getName());
if (config.getPropertyValue(RESULT_RECORD_WRITER).isEmpty()) {
final String resultRecordWriterId = config.createControllerService("org.apache.nifi.json.JsonRecordSetWriter", Collections.emptyMap());
config.setProperty(RESULT_RECORD_WRITER, resultRecordWriterId);
}
}
@Override
public void migrateRelationships(final RelationshipConfiguration config) {
super.migrateRelationships(config);
config.renameRelationship("success", AbstractPutElasticsearch.REL_ORIGINAL.getName());
config.renameRelationship("successful_records", AbstractPutElasticsearch.REL_SUCCESSFUL.getName());
}
@Override
@OnScheduled
public void onScheduled(final ProcessContext context) {
@ -463,12 +459,12 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
stopWatch.getDuration(TimeUnit.MILLISECONDS)
);
input = session.putAllAttributes(input, new HashMap<String, String>() {{
input = session.putAllAttributes(input, new HashMap<>() {{
put("elasticsearch.put.error.count", String.valueOf(erroredRecords.get()));
put("elasticsearch.put.success.count", String.valueOf(successfulRecords.get()));
}});
session.transfer(input, REL_SUCCESS);
session.transfer(input, REL_ORIGINAL);
}
private void addOperation(final List<IndexOperationRequest> operationList, final Record record, final IndexOperationParameters indexOperationParameters,
@ -512,9 +508,9 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
final BulkOperation bundle = new BulkOperation(operationList, originals, reader.getSchema());
final ResponseDetails responseDetails = indexDocuments(bundle, session, input, requestParameters);
successfulRecords.getAndAdd(responseDetails.getSuccessCount());
erroredRecords.getAndAdd(responseDetails.getErrorCount());
resultRecords.addAll(responseDetails.getOutputs().values().stream().map(Output::getFlowFile).toList());
successfulRecords.getAndAdd(responseDetails.successCount());
erroredRecords.getAndAdd(responseDetails.errorCount());
resultRecords.addAll(responseDetails.outputs().values().stream().map(Output::getFlowFile).toList());
operationList.clear();
originals.clear();
@ -541,47 +537,45 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
final int numSuccessful = response.getItems() == null ? 0 : response.getItems().size() - numErrors;
final Map<String, Output> outputs = new HashMap<>();
if (writerFactory != null) {
try {
for (int o = 0; o < bundle.getOriginalRecords().size(); o++) {
final String type;
final Relationship relationship;
final Map<String, Object> error;
if (errors.containsKey(o)) {
relationship = REL_FAILED_RECORDS;
error = errors.get(o);
if (groupBulkErrors) {
if (isElasticsearchNotFound().test(error)) {
type = OUTPUT_TYPE_NOT_FOUND;
} else {
type = getErrorType(error);
}
try {
for (int o = 0; o < bundle.getOriginalRecords().size(); o++) {
final String type;
final Relationship relationship;
final Map<String, Object> error;
if (errors.containsKey(o)) {
relationship = REL_ERRORS;
error = errors.get(o);
if (groupBulkErrors) {
if (isElasticsearchNotFound().test(error)) {
type = OUTPUT_TYPE_NOT_FOUND;
} else {
type = OUTPUT_TYPE_ERROR;
type = getErrorType(error);
}
} else {
relationship = REL_SUCCESSFUL_RECORDS;
error = null;
type = OUTPUT_TYPE_SUCCESS;
type = OUTPUT_TYPE_ERROR;
}
final Output output = getOutputByType(outputs, type, session, relationship, input, bundle.getSchema());
output.write(bundle.getOriginalRecords().get(o), error);
} else {
relationship = REL_SUCCESSFUL;
error = null;
type = OUTPUT_TYPE_SUCCESS;
}
for (final Output output : outputs.values()) {
output.transfer(session);
}
} catch (final IOException | SchemaNotFoundException ex) {
getLogger().error("Unable to write error/successful records", ex);
outputs.values().forEach(o -> {
try {
o.remove(session);
} catch (IOException ioe) {
getLogger().warn("Error closing RecordSetWriter for FlowFile", ioe);
}
});
throw ex;
final Output output = getOutputByType(outputs, type, session, relationship, input, bundle.getSchema());
output.write(bundle.getOriginalRecords().get(o), error);
}
for (final Output output : outputs.values()) {
output.transfer(session);
}
} catch (final IOException | SchemaNotFoundException ex) {
getLogger().error("Unable to write error/successful records", ex);
outputs.values().forEach(o -> {
try {
o.remove(session);
} catch (IOException ioe) {
getLogger().warn("Error closing RecordSetWriter for FlowFile", ioe);
}
});
throw ex;
}
return new ResponseDetails(outputs, numSuccessful, numErrors);
@ -734,29 +728,7 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
}
}
private static class ResponseDetails {
final Map<String, Output> outputs;
final int errorCount;
final int successCount;
ResponseDetails(final Map<String, Output> outputs, final int successCount, final int errorCount) {
this.outputs = outputs;
this.successCount = successCount;
this.errorCount = errorCount;
}
public Map<String, Output> getOutputs() {
return outputs;
}
public int getErrorCount() {
return errorCount;
}
public int getSuccessCount() {
return successCount;
}
}
private record ResponseDetails(Map<String, Output> outputs, int successCount, int errorCount) {}
private String determineDateFormat(final RecordFieldType recordFieldType) {
return switch (recordFieldType) {

View File

@ -16,16 +16,44 @@
*/
package org.apache.nifi.processors.elasticsearch;
import org.apache.nifi.elasticsearch.IndexOperationRequest;
import org.apache.nifi.elasticsearch.IndexOperationResponse;
import org.apache.nifi.processors.elasticsearch.mock.MockBulkLoadClientService;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public abstract class AbstractPutElasticsearchTest<P extends AbstractPutElasticsearch> {
static final String TEST_COMMON_DIR = "src/test/resources/common";
public abstract Class<? extends AbstractPutElasticsearch> getTestProcessor();
MockBulkLoadClientService clientService;
TestRunner runner;
@BeforeEach
public void setup() throws Exception {
runner = TestRunners.newTestRunner(getTestProcessor());
clientService = new MockBulkLoadClientService();
clientService.setResponse(new IndexOperationResponse(1500));
runner.addControllerService("clientService", clientService);
runner.setProperty(AbstractPutElasticsearch.CLIENT_SERVICE, "clientService");
runner.enableControllerService(clientService);
runner.setProperty(AbstractPutElasticsearch.INDEX_OP, IndexOperationRequest.Operation.Index.getValue());
runner.setProperty(AbstractPutElasticsearch.INDEX, "test_index");
runner.setProperty(AbstractPutElasticsearch.TYPE, "test_type");
runner.setProperty(AbstractPutElasticsearch.LOG_ERROR_RESPONSES, "false");
runner.setProperty(AbstractPutElasticsearch.NOT_FOUND_IS_SUCCESSFUL, "true");
runner.setProperty(AbstractPutElasticsearch.OUTPUT_ERROR_RESPONSES, "false");
}
@Test
public void testOutputErrorResponsesRelationship() {
final TestRunner runner = TestRunners.newTestRunner(getTestProcessor());

View File

@ -19,11 +19,11 @@ package org.apache.nifi.processors.elasticsearch;
import org.apache.nifi.elasticsearch.IndexOperationRequest;
import org.apache.nifi.elasticsearch.IndexOperationResponse;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.elasticsearch.mock.MockBulkLoadClientService;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.PropertyMigrationResult;
import org.apache.nifi.util.RelationshipMigrationResult;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -38,18 +38,16 @@ import java.util.Map;
import java.util.function.Consumer;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class PutElasticsearchJsonTest extends AbstractPutElasticsearchTest<PutElasticsearchJson> {
private static final String TEST_DIR = "src/test/resources/PutElasticsearchJsonTest";
private static final String TEST_COMMON_DIR = "src/test/resources/common";
private static final Path BATCH_WITH_ERROR = Paths.get(TEST_DIR,"batchWithError.json");
private static String script;
private static String dynamicTemplates;
private MockBulkLoadClientService clientService;
private TestRunner runner;
private static String flowFileContents;
private static String sampleErrorResponse;
private static Map<String, Object> expectedScript;
@ -66,6 +64,7 @@ public class PutElasticsearchJsonTest extends AbstractPutElasticsearchTest<PutEl
flowFileContents = JsonUtils.readString(Paths.get(TEST_DIR, "flowFileContents.json"));
script = JsonUtils.readString(Paths.get(TEST_DIR,"script.json"));
dynamicTemplates = JsonUtils.readString(Paths.get(TEST_COMMON_DIR,"dynamicTemplates.json"));
expectedScript = new LinkedHashMap<>();
expectedScript.put("_source", "some script");
expectedScript.put("language", "painless");
@ -75,31 +74,19 @@ public class PutElasticsearchJsonTest extends AbstractPutElasticsearchTest<PutEl
yourField.put("type", "text");
yourField.put("keyword", Collections.singletonMap("type", "text"));
expectedDynamicTemplate.put("your_field", yourField);
}
@BeforeEach
public void setup() throws Exception {
runner = TestRunners.newTestRunner(getTestProcessor());
clientService = new MockBulkLoadClientService();
clientService.setResponse(new IndexOperationResponse(1500));
runner.addControllerService("clientService", clientService);
super.setup();
runner.setProperty(PutElasticsearchJson.ID_ATTRIBUTE, "doc_id");
runner.setProperty(PutElasticsearchJson.INDEX_OP, IndexOperationRequest.Operation.Index.getValue());
runner.setProperty(PutElasticsearchJson.INDEX, "test_index");
runner.setProperty(PutElasticsearchJson.TYPE, "test_type");
runner.setProperty(PutElasticsearchJson.BATCH_SIZE, "1");
runner.setProperty(PutElasticsearchJson.OUTPUT_ERROR_DOCUMENTS, "false");
runner.setProperty(PutElasticsearchJson.LOG_ERROR_RESPONSES, "false");
runner.setProperty(PutElasticsearchJson.CLIENT_SERVICE, "clientService");
runner.setProperty(PutElasticsearchJson.NOT_FOUND_IS_SUCCESSFUL, "true");
runner.setProperty(PutElasticsearchJson.OUTPUT_ERROR_RESPONSES, "false");
runner.enableControllerService(clientService);
runner.setProperty(AbstractPutElasticsearch.BATCH_SIZE, "1");
runner.assertValid();
}
public void basicTest(final int failure, final int retry, final int success) {
void basicTest(final int failure, final int retry, final int successful) {
final Consumer<List<IndexOperationRequest>> consumer = (final List<IndexOperationRequest> items) -> {
final long nullIdCount = items.stream().filter(item -> item.getId() == null).count();
final long indexCount = items.stream().filter(item -> "test_index".equals(item.getIndex())).count();
@ -120,16 +107,16 @@ public class PutElasticsearchJsonTest extends AbstractPutElasticsearchTest<PutEl
assertEquals(1L, emptyHeaderFields);
};
basicTest(failure, retry, success, consumer, null);
basicTest(failure, retry, successful, consumer);
}
public void basicTest(final int failure, final int retry, final int success, final Consumer<List<IndexOperationRequest>> consumer, final Map<String, String> attr) {
void basicTest(final int failure, final int retry, final int successful, final Consumer<List<IndexOperationRequest>> consumer) {
clientService.setEvalConsumer(consumer);
basicTest(failure, retry, success, attr);
basicTest(failure, retry, successful, Collections.emptyMap());
}
public void basicTest(final int failure, final int retry, final int success, final Map<String, String> attr) {
if (attr != null) {
void basicTest(final int failure, final int retry, final int successful, final Map<String, String> attr) {
if (attr != null && !attr.isEmpty()) {
runner.enqueue(flowFileContents, attr);
} else {
runner.enqueue(flowFileContents);
@ -137,26 +124,64 @@ public class PutElasticsearchJsonTest extends AbstractPutElasticsearchTest<PutEl
runner.run();
runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, failure);
runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, retry);
runner.assertTransferCount(PutElasticsearchJson.REL_SUCCESS, success);
runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, failure);
runner.assertTransferCount(AbstractPutElasticsearch.REL_RETRY, retry);
// for the "basic test"s, all original FlowFiles should be successful
runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, successful);
runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, successful);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERROR_RESPONSES, 0);
assertEquals(success, runner.getProvenanceEvents().stream()
assertEquals(successful, runner.getProvenanceEvents().stream()
.filter(e -> ProvenanceEventType.SEND == e.getEventType() && e.getDetails() == null)
.count());
}
@Test
public void simpleTest() {
void testMigrateProperties() {
runner.removeProperty(AbstractPutElasticsearch.NOT_FOUND_IS_SUCCESSFUL);
runner.setProperty("put-es-json-not_found-is-error", "true");
runner.setProperty("put-es-json-error-documents", "true");
runner.assertValid();
final PropertyMigrationResult result = runner.migrateProperties();
runner.assertValid();
assertEquals("true", runner.getProcessContext().getProperty(AbstractPutElasticsearch.NOT_FOUND_IS_SUCCESSFUL).getValue());
assertTrue(runner.getProcessContext().getProperties().keySet().stream().noneMatch(pd -> "put-es-json-not_found-is-error".equals(pd.getName())));
assertTrue(runner.getProcessContext().getProperties().keySet().stream().noneMatch(pd -> "put-es-json-error-documents".equals(pd.getName())));
assertEquals(1, result.getPropertiesRenamed().size());
assertEquals(AbstractPutElasticsearch.NOT_FOUND_IS_SUCCESSFUL.getName(), result.getPropertiesRenamed().get("put-es-json-not_found-is-error"));
assertEquals(1, result.getPropertiesRemoved().size());
assertTrue(result.getPropertiesRemoved().contains("put-es-json-error-documents"));
assertEquals(0, result.getPropertiesUpdated().size());
}
@Test
void testMigrateRelationships() {
runner.addConnection("success");
assertFalse(runner.getProcessContext().hasConnection(AbstractPutElasticsearch.REL_ORIGINAL));
final RelationshipMigrationResult result = runner.migrateRelationships();
assertTrue(runner.getProcessContext().hasConnection(AbstractPutElasticsearch.REL_ORIGINAL));
assertTrue(((MockProcessContext) runner.getProcessContext()).getAllRelationships().stream().noneMatch(r -> "success".equals(r.getName())));
assertEquals(1, result.getRenamedRelationships().size());
assertEquals(AbstractPutElasticsearch.REL_ORIGINAL.getName(), result.getRenamedRelationships().get("success"));
assertEquals(0, result.getPreviousRelationships().size());
}
@Test
void simpleTest() {
clientService.setEvalParametersConsumer((Map<String, String> params) -> assertTrue(params.isEmpty()));
basicTest(0, 0, 1);
}
@Test
public void simpleTestWithDocIdAndRequestParametersAndBulkHeaders() {
void simpleTestWithDocIdAndRequestParametersAndBulkHeaders() {
runner.setProperty("refresh", "true");
runner.setProperty(AbstractPutElasticsearch.BULK_HEADER_PREFIX + "routing", "1");
runner.setProperty(AbstractPutElasticsearch.BULK_HEADER_PREFIX + "version", "${version}");
@ -196,7 +221,7 @@ public class PutElasticsearchJsonTest extends AbstractPutElasticsearchTest<PutEl
}
@Test
public void simpleTestWithRequestParametersAndBulkHeadersFlowFileEL() {
void simpleTestWithRequestParametersAndBulkHeadersFlowFileEL() {
runner.setProperty("refresh", "true");
runner.setProperty("slices", "${slices}");
runner.setEnvironmentVariableValue("blank", " ");
@ -232,7 +257,7 @@ public class PutElasticsearchJsonTest extends AbstractPutElasticsearchTest<PutEl
}
@Test
public void simpleTestWithScriptAndDynamicTemplates() {
void simpleTestWithScriptAndDynamicTemplates() {
runner.setProperty(PutElasticsearchJson.SCRIPT, script);
runner.setProperty(PutElasticsearchJson.DYNAMIC_TEMPLATES, dynamicTemplates);
final Consumer<List<IndexOperationRequest>> consumer = (final List<IndexOperationRequest> items) -> {
@ -243,11 +268,11 @@ public class PutElasticsearchJsonTest extends AbstractPutElasticsearchTest<PutEl
assertEquals(1L, falseScriptedUpsertCount);
assertEquals(1L, dynamicTemplatesCount);
};
basicTest(0, 0, 1, consumer, null);
basicTest(0, 0, 1, consumer);
}
@Test
public void simpleTestWithScriptedUpsert() {
void simpleTestWithScriptedUpsert() {
runner.setProperty(PutElasticsearchJson.SCRIPT, script);
runner.setProperty(PutElasticsearchJson.DYNAMIC_TEMPLATES, dynamicTemplates);
runner.setProperty(PutElasticsearchJson.INDEX_OP, IndexOperationRequest.Operation.Upsert.getValue().toLowerCase());
@ -261,11 +286,11 @@ public class PutElasticsearchJsonTest extends AbstractPutElasticsearchTest<PutEl
assertEquals(1L, trueScriptedUpsertCount);
assertEquals(1L, dynamicTemplatesCount);
};
basicTest(0, 0, 1, consumer, null);
basicTest(0, 0, 1, consumer);
}
@Test
public void testNonJsonScript() {
void testNonJsonScript() {
runner.setProperty(PutElasticsearchJson.SCRIPT, "not-json");
runner.setProperty(PutElasticsearchJson.INDEX_OP, IndexOperationRequest.Operation.Upsert.getValue().toLowerCase());
runner.setProperty(PutElasticsearchJson.SCRIPTED_UPSERT, "true");
@ -277,19 +302,19 @@ public class PutElasticsearchJsonTest extends AbstractPutElasticsearchTest<PutEl
}
@Test
public void testFatalError() {
void testFatalError() {
clientService.setThrowFatalError(true);
basicTest(1, 0, 0);
}
@Test
public void testRetriable() {
void testRetriable() {
clientService.setThrowRetriableError(true);
basicTest(0, 1, 0);
}
@Test
public void testInvalidIndexOperation() {
void testInvalidIndexOperation() {
runner.setProperty(PutElasticsearchJson.INDEX_OP, "not-valid");
runner.assertNotValid();
final AssertionError ae = assertThrows(AssertionError.class, () -> runner.run());
@ -300,26 +325,27 @@ public class PutElasticsearchJsonTest extends AbstractPutElasticsearchTest<PutEl
runner.assertValid();
runner.enqueue(flowFileContents, Collections.singletonMap("operation", "not-valid2"));
runner.run();
runner.assertTransferCount(PutElasticsearchJson.REL_SUCCESS, 0);
runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 1);
runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0);
runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_RETRY, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERROR_RESPONSES, 0);
}
@Test
public void testInputRequired() {
void testInputRequired() {
runner.run();
runner.assertTransferCount(PutElasticsearchJson.REL_SUCCESS, 0);
runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 0);
runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0);
runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_RETRY, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERROR_RESPONSES, 0);
}
@Test
public void testBatchingAndErrorRelationshipNotFoundSuccessful() throws Exception {
runner.setProperty(PutElasticsearchJson.OUTPUT_ERROR_DOCUMENTS, "true");
void testBatchingAndErrorRelationshipNotFoundSuccessful() throws Exception {
runner.setProperty(PutElasticsearchJson.LOG_ERROR_RESPONSES, "true");
runner.setProperty(PutElasticsearchJson.BATCH_SIZE, "100");
runner.setProperty(PutElasticsearchJson.NOT_FOUND_IS_SUCCESSFUL, "true");
@ -329,25 +355,26 @@ public class PutElasticsearchJsonTest extends AbstractPutElasticsearchTest<PutEl
runner.assertValid();
runner.run();
runner.assertTransferCount(PutElasticsearchJson.REL_SUCCESS, 4);
runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0);
runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 0);
runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 3);
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 7);
runner.assertTransferCount(AbstractPutElasticsearch.REL_RETRY, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 4);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 3);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERROR_RESPONSES, 0);
MockFlowFile failedDoc = runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS).getFirst();
MockFlowFile failedDoc = runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).getFirst();
assertTrue(failedDoc.getContent().contains("20abcd"));
failedDoc.assertAttributeExists("elasticsearch.bulk.error");
failedDoc.assertAttributeNotExists("elasticsearch.put.error");
assertTrue(failedDoc.getAttribute("elasticsearch.bulk.error").contains("mapper_parsing_exception"));
failedDoc = runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS).get(1);
failedDoc = runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).get(1);
assertTrue(failedDoc.getContent().contains("213,456.9"));
failedDoc.assertAttributeExists("elasticsearch.bulk.error");
failedDoc.assertAttributeNotExists("elasticsearch.put.error");
assertTrue(failedDoc.getAttribute("elasticsearch.bulk.error").contains("mapper_parsing_exception"));
failedDoc = runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS).get(2);
failedDoc = runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).get(2);
assertTrue(failedDoc.getContent().contains("unit test"));
failedDoc.assertAttributeExists("elasticsearch.bulk.error");
failedDoc.assertAttributeNotExists("elasticsearch.put.error");
@ -355,11 +382,12 @@ public class PutElasticsearchJsonTest extends AbstractPutElasticsearchTest<PutEl
assertEquals(3, runner.getProvenanceEvents().stream().filter(
e -> ProvenanceEventType.SEND == e.getEventType() && "Elasticsearch _bulk operation error".equals(e.getDetails())).count());
assertEquals(4, runner.getProvenanceEvents().stream().filter(
e -> ProvenanceEventType.SEND == e.getEventType() && null == e.getDetails()).count());
}
@Test
public void testBatchingAndErrorRelationshipNotFoundNotSuccessful() throws Exception {
runner.setProperty(PutElasticsearchJson.OUTPUT_ERROR_DOCUMENTS, "true");
void testBatchingAndErrorRelationshipNotFoundNotSuccessful() throws Exception {
runner.setProperty(PutElasticsearchJson.LOG_ERROR_RESPONSES, "true");
runner.setProperty(PutElasticsearchJson.BATCH_SIZE, "100");
runner.setProperty(PutElasticsearchJson.NOT_FOUND_IS_SUCCESSFUL, "false");
@ -370,37 +398,38 @@ public class PutElasticsearchJsonTest extends AbstractPutElasticsearchTest<PutEl
runner.assertValid();
runner.run();
runner.assertTransferCount(PutElasticsearchJson.REL_SUCCESS, 3);
runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0);
runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 0);
runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 4);
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 7);
runner.assertTransferCount(AbstractPutElasticsearch.REL_RETRY, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 3);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 4);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERROR_RESPONSES, 1);
MockFlowFile failedDoc = runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS).getFirst();
MockFlowFile failedDoc = runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).getFirst();
assertTrue(failedDoc.getContent().contains("not_found"));
failedDoc.assertAttributeExists("elasticsearch.bulk.error");
failedDoc.assertAttributeNotExists("elasticsearch.put.error");
assertTrue(failedDoc.getAttribute("elasticsearch.bulk.error").contains("not_found"));
failedDoc = runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS).get(1);
failedDoc = runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).get(1);
assertTrue(failedDoc.getContent().contains("20abcd"));
failedDoc.assertAttributeExists("elasticsearch.bulk.error");
failedDoc.assertAttributeNotExists("elasticsearch.put.error");
assertTrue(failedDoc.getAttribute("elasticsearch.bulk.error").contains("number_format_exception"));
failedDoc = runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS).get(2);
failedDoc = runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).get(2);
assertTrue(failedDoc.getContent().contains("213,456.9"));
failedDoc.assertAttributeExists("elasticsearch.bulk.error");
failedDoc.assertAttributeNotExists("elasticsearch.put.error");
assertTrue(failedDoc.getAttribute("elasticsearch.bulk.error").contains("mapper_parsing_exception"));
failedDoc = runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS).get(3);
failedDoc = runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).get(3);
assertTrue(failedDoc.getContent().contains("unit test"));
failedDoc.assertAttributeExists("elasticsearch.bulk.error");
failedDoc.assertAttributeNotExists("elasticsearch.put.error");
assertTrue(failedDoc.getAttribute("elasticsearch.bulk.error").contains("some_other_exception"));
final String errorResponses = runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_ERROR_RESPONSES).getFirst().getContent();
final String errorResponses = runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERROR_RESPONSES).getFirst().getContent();
assertTrue(errorResponses.contains("not_found"));
assertTrue(errorResponses.contains("For input string: 20abc"));
assertTrue(errorResponses.contains("For input string: 213,456.9"));
@ -408,13 +437,15 @@ public class PutElasticsearchJsonTest extends AbstractPutElasticsearchTest<PutEl
assertEquals(4, runner.getProvenanceEvents().stream().filter( e ->
ProvenanceEventType.SEND == e.getEventType() && "Elasticsearch _bulk operation error".equals(e.getDetails())).count());
assertEquals(3, runner.getProvenanceEvents().stream().filter(
e -> ProvenanceEventType.SEND == e.getEventType() && null == e.getDetails()).count());
}
@Test
public void testBatchingAndNoErrorOutput() throws Exception {
runner.setProperty(PutElasticsearchJson.OUTPUT_ERROR_DOCUMENTS, "false");
void testBatchingAndNoErrorOutput() throws Exception {
runner.setProperty(PutElasticsearchJson.LOG_ERROR_RESPONSES, "false");
runner.setProperty(PutElasticsearchJson.BATCH_SIZE, "100");
runner.setProperty(PutElasticsearchJson.OUTPUT_ERROR_RESPONSES, "false");
clientService.setResponse(IndexOperationResponse.fromJsonResponse(sampleErrorResponse));
for (final String val : JsonUtils.readListOfMapsAsIndividualJson(JsonUtils.readString(Paths.get(TEST_DIR, "batchWithoutError.json")))) {
runner.enqueue(val);
@ -423,23 +454,25 @@ public class PutElasticsearchJsonTest extends AbstractPutElasticsearchTest<PutEl
runner.assertValid();
runner.run();
runner.assertTransferCount(PutElasticsearchJson.REL_SUCCESS, 5);
runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0);
runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 0);
runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 7);
runner.assertTransferCount(AbstractPutElasticsearch.REL_RETRY, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 4);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 3);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERROR_RESPONSES, 0);
}
@Test
public void testInvalidInput() {
void testInvalidInput() {
runner.enqueue("not-json");
runner.run();
runner.assertTransferCount(PutElasticsearchJson.REL_SUCCESS, 0);
runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 1);
runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0);
runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_RETRY, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERROR_RESPONSES, 0);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILURE).getFirst();
assertTrue(flowFile.getAttribute("elasticsearch.put.error").contains("not"));

View File

@ -22,18 +22,19 @@ import org.apache.nifi.elasticsearch.IndexOperationRequest;
import org.apache.nifi.elasticsearch.IndexOperationResponse;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processors.elasticsearch.mock.MockBulkLoadClientService;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.MockSchemaRegistry;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.PropertyMigrationResult;
import org.apache.nifi.util.RelationshipMigrationResult;
import org.apache.nifi.util.StringUtils;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -56,6 +57,7 @@ import java.util.Map;
import java.util.function.Consumer;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -71,7 +73,6 @@ public class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest<Put
private static final LocalDate LOCAL_DATE = LocalDate.of(DATE_YEAR, DATE_MONTH, DATE_DAY);
private static final LocalTime LOCAL_TIME = LocalTime.of(TIME_HOUR, TIME_MINUTE, TIME_SECOND);
private static final String TEST_DIR = "src/test/resources/PutElasticsearchRecordTest";
private static final String TEST_COMMON_DIR = "src/test/resources/common";
private static final String RECORD_PATH_TEST_SCHEMA = "recordPathTest";
private static final String DATE_TIME_FORMATTING_TEST_SCHEMA = "dateTimeFormattingTest";
private static final String SCHEMA_NAME_ATTRIBUTE = "schema.name";
@ -81,9 +82,9 @@ public class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest<Put
private static RecordSchema recordPathTestSchema;
private static RecordSchema dateTimeFormattingTestSchema;
private static RecordSchema errorTestSchema;
private MockBulkLoadClientService clientService;
private MockSchemaRegistry registry;
private TestRunner runner;
private JsonRecordSetWriter writer;
@Override
public Class<? extends AbstractPutElasticsearch> getTestProcessor() {
@ -101,32 +102,36 @@ public class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest<Put
@BeforeEach
public void setup() throws Exception {
clientService = new MockBulkLoadClientService();
clientService.setResponse(new IndexOperationResponse(1500));
super.setup();
registry = new MockSchemaRegistry();
registry.addSchema("simple", simpleSchema);
final RecordReaderFactory reader = new JsonTreeReader();
runner = TestRunners.newTestRunner(getTestProcessor());
runner.addControllerService("registry", registry);
runner.assertValid(registry);
runner.enableControllerService(registry);
final RecordReaderFactory reader = new JsonTreeReader();
runner.addControllerService("reader", reader);
runner.addControllerService("clientService", clientService);
runner.setProperty(reader, SchemaAccessUtils.SCHEMA_REGISTRY, "registry");
runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_NAME_PROPERTY);
runner.setProperty(PutElasticsearchRecord.RECORD_READER, "reader");
runner.setProperty(PutElasticsearchRecord.INDEX_OP, IndexOperationRequest.Operation.Index.getValue());
runner.setProperty(PutElasticsearchRecord.INDEX, "test_index");
runner.setProperty(PutElasticsearchRecord.TYPE, "test_type");
runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP, "test_timestamp");
runner.setProperty(PutElasticsearchRecord.CLIENT_SERVICE, "clientService");
runner.setProperty(PutElasticsearchRecord.NOT_FOUND_IS_SUCCESSFUL, "true");
runner.enableControllerService(registry);
runner.assertValid(reader);
runner.enableControllerService(reader);
runner.enableControllerService(clientService);
runner.setProperty(PutElasticsearchRecord.RECORD_READER, "reader");
writer = new JsonRecordSetWriter();
runner.addControllerService("writer", writer);
runner.setProperty(writer, SchemaAccessUtils.SCHEMA_REGISTRY, "registry");
runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_NAME_PROPERTY);
runner.assertValid(writer);
runner.enableControllerService(writer);
runner.setProperty(PutElasticsearchRecord.RESULT_RECORD_WRITER, "writer");
runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP, "test_timestamp");
runner.assertValid();
}
public void basicTest(final int failure, final int retry, final int success) {
void basicTest(final int failure, final int retry, final int successful) {
final Consumer<List<IndexOperationRequest>> consumer = (final List<IndexOperationRequest> items) -> {
final long timestampDefaultCount = items.stream().filter(item -> "test_timestamp".equals(item.getFields().get("@timestamp"))).count();
final long indexCount = items.stream().filter(item -> "test_index".equals(item.getIndex())).count();
@ -146,45 +151,106 @@ public class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest<Put
assertEquals(2, emptyHeaderFields);
};
basicTest(failure, retry, success, consumer);
basicTest(failure, retry, successful, consumer);
}
public void basicTest(final int failure, final int retry, final int success, final Consumer<List<IndexOperationRequest>> consumer) {
basicTest(failure, retry, success, consumer, null);
void basicTest(final int failure, final int retry, final int successful, final Consumer<List<IndexOperationRequest>> consumer) {
basicTest(failure, retry, successful, consumer, null);
}
public void basicTest(final int failure, final int retry, final int success, final Consumer<List<IndexOperationRequest>> consumer, final Map<String, String> attributes) {
void basicTest(final int failure, final int retry, final int successful, final Consumer<List<IndexOperationRequest>> consumer, final Map<String, String> attributes) {
clientService.setEvalConsumer(consumer);
runner.enqueue(flowFileContentMaps, attributes != null && !attributes.isEmpty() ? attributes : Collections.singletonMap(SCHEMA_NAME_ATTRIBUTE, "simple"));
runner.run();
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, failure);
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, retry);
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, success);
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, failure);
runner.assertTransferCount(AbstractPutElasticsearch.REL_RETRY, retry);
// for the "basic test"s, all original FlowFiles should be successful
runner.assertTransferCount(PutElasticsearchJson.REL_ORIGINAL, successful);
runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, successful);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERROR_RESPONSES, 0);
if (success > 0) {
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_SUCCESS).forEach(ff -> {
if (successful > 0) {
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ORIGINAL).forEach(ff -> {
ff.assertAttributeEquals("elasticsearch.put.success.count", "0");
ff.assertAttributeEquals("elasticsearch.put.error.count", "0");});
ff.assertAttributeEquals("elasticsearch.put.error.count", "0");
});
assertEquals(success,
assertEquals(successful,
runner.getProvenanceEvents().stream().filter(
e -> ProvenanceEventType.SEND.equals(e.getEventType()) && "1 Elasticsearch _bulk operation batch(es) [0 error(s), 0 success(es)]".equals(e.getDetails()))
.count());
assertEquals(successful,
runner.getProvenanceEvents().stream().filter(
e -> ProvenanceEventType.FORK.equals(e.getEventType()) && e.getParentUuids().contains(
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ORIGINAL).getFirst().getAttribute("uuid")
)
).count());
}
}
@Test
public void simpleTest() {
void testMigrateProperties() {
runner.removeProperty(AbstractPutElasticsearch.NOT_FOUND_IS_SUCCESSFUL);
runner.setProperty("put-es-record-not_found-is-error", "true");
runner.disableControllerService(writer);
runner.removeProperty(PutElasticsearchRecord.RESULT_RECORD_WRITER);
runner.removeControllerService(writer);
runner.assertNotValid();
final PropertyMigrationResult result = runner.migrateProperties();
runner.assertValid();
assertEquals("true", runner.getProcessContext().getProperty(AbstractPutElasticsearch.NOT_FOUND_IS_SUCCESSFUL).getValue());
assertTrue(runner.getProcessContext().getProperties().keySet().stream().noneMatch(pd -> "put-es-record-not_found-is-error".equals(pd.getName())));
assertTrue(runner.getProcessContext().getProperties().containsKey(PutElasticsearchRecord.RESULT_RECORD_WRITER));
final RecordSetWriterFactory writer = runner.getControllerService(
runner.getProcessContext().getProperty(PutElasticsearchRecord.RESULT_RECORD_WRITER.getName()).getValue(),
RecordSetWriterFactory.class
);
assertNotNull(writer);
assertTrue(runner.isControllerServiceEnabled(writer));
assertEquals(1, result.getPropertiesRenamed().size());
assertEquals(AbstractPutElasticsearch.NOT_FOUND_IS_SUCCESSFUL.getName(), result.getPropertiesRenamed().get("put-es-record-not_found-is-error"));
assertEquals(0, result.getPropertiesRemoved().size());
assertEquals(1, result.getPropertiesUpdated().size());
assertTrue(result.getPropertiesUpdated().contains(PutElasticsearchRecord.RESULT_RECORD_WRITER.getName()));
}
@Test
void testMigrateRelationships() {
runner.addConnection("success");
assertFalse(runner.getProcessContext().hasConnection(AbstractPutElasticsearch.REL_ORIGINAL));
runner.addConnection("successful_records");
assertFalse(runner.getProcessContext().hasConnection(AbstractPutElasticsearch.REL_SUCCESSFUL));
final RelationshipMigrationResult result = runner.migrateRelationships();
assertTrue(runner.getProcessContext().hasConnection(AbstractPutElasticsearch.REL_ORIGINAL));
assertTrue(((MockProcessContext) runner.getProcessContext()).getAllRelationships().stream().noneMatch(r -> "success".equals(r.getName())));
assertTrue(runner.getProcessContext().hasConnection(AbstractPutElasticsearch.REL_SUCCESSFUL));
assertTrue(((MockProcessContext) runner.getProcessContext()).getAllRelationships().stream().noneMatch(r -> "successful_records".equals(r.getName())));
assertEquals(2, result.getRenamedRelationships().size());
assertEquals(AbstractPutElasticsearch.REL_ORIGINAL.getName(), result.getRenamedRelationships().get("success"));
assertEquals(AbstractPutElasticsearch.REL_SUCCESSFUL.getName(), result.getRenamedRelationships().get("successful_records"));
assertEquals(0, result.getPreviousRelationships().size());
}
@Test
void simpleTest() {
clientService.setEvalParametersConsumer((Map<String, String> params) -> assertTrue(params.isEmpty()));
basicTest(0, 0, 1);
}
@Test
public void simpleTestCoercedDefaultTimestamp() {
void simpleTestCoercedDefaultTimestamp() {
final Consumer<List<IndexOperationRequest>> consumer = (List<IndexOperationRequest> items) ->
assertEquals(2L, items.stream().filter(item -> Long.valueOf(100).equals(item.getFields().get("@timestamp"))).count());
@ -193,7 +259,7 @@ public class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest<Put
}
@Test
public void simpleTestWithRequestParametersAndBulkHeaders() {
void simpleTestWithRequestParametersAndBulkHeaders() {
runner.setProperty("another", "${blank}");
runner.setEnvironmentVariableValue("slices", "auto");
runner.setEnvironmentVariableValue("version", "/version");
@ -201,7 +267,7 @@ public class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest<Put
}
@Test
public void simpleTestWithRequestParametersAndBulkHeadersFlowFileEL() {
void simpleTestWithRequestParametersAndBulkHeadersFlowFileEL() {
final Map<String, String> attributes = new LinkedHashMap<>();
attributes.put(SCHEMA_NAME_ATTRIBUTE, "simple");
attributes.put("version", "/version");
@ -211,7 +277,7 @@ public class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest<Put
}
@Test
public void simpleTestWithMockReader() throws Exception{
void simpleTestWithMockReader() throws Exception{
final MockRecordParser mockReader = new MockRecordParser();
mockReader.addSchemaField("msg", RecordFieldType.STRING);
mockReader.addSchemaField("from", RecordFieldType.STRING);
@ -226,19 +292,19 @@ public class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest<Put
}
@Test
public void testFatalError() {
void testFatalError() {
clientService.setThrowFatalError(true);
basicTest(1, 0, 0);
}
@Test
public void testRetriable() {
void testRetriable() {
clientService.setThrowRetriableError(true);
basicTest(0, 1, 0);
}
@Test
public void testRecordPathFeatures() throws Exception {
void testRecordPathFeatures() throws Exception {
final Map<String, Object> script =
JsonUtils.readMap(JsonUtils.readString(Paths.get(TEST_DIR, "script.json")));
final Map<String, Object> dynamicTemplates =
@ -302,16 +368,16 @@ public class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest<Put
runner.enqueue(flowFileContents, Collections.singletonMap(SCHEMA_NAME_ATTRIBUTE, RECORD_PATH_TEST_SCHEMA));
runner.run();
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1);
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_RETRY, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERROR_RESPONSES, 0);
}
@Test
public void testTimestampDateFormatAndScriptRecordPath() throws Exception {
void testTimestampDateFormatAndScriptRecordPath() throws Exception {
final Map<String, Object> script =
JsonUtils.readMap(JsonUtils.readString(Paths.get(TEST_DIR, "script.json")));
clientService.setEvalConsumer((final List<IndexOperationRequest> items) -> {
@ -360,16 +426,16 @@ public class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest<Put
flowFileContents = flowFileContents.replaceFirst("\\d{13}", String.valueOf(Date.valueOf(LOCAL_DATE).getTime()));
runner.enqueue(flowFileContents, attributes);
runner.run();
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1);
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_RETRY, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERROR_RESPONSES, 0);
}
@Test
public void testNullRecordPaths() throws Exception {
void testNullRecordPaths() throws Exception {
clientService.setEvalConsumer((final List<IndexOperationRequest> items) -> {
final long nullTypeCount = items.stream().filter(item -> item.getType() == null).count();
final long messageTypeCount = items.stream().filter(item -> "message".equals(item.getType())).count();
@ -395,16 +461,16 @@ public class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest<Put
flowFileContents = flowFileContents.replaceFirst("\\d{8}", String.valueOf(Time.valueOf(LOCAL_TIME).getTime()));
runner.enqueue(flowFileContents, Collections.singletonMap(SCHEMA_NAME_ATTRIBUTE, RECORD_PATH_TEST_SCHEMA));
runner.run();
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1);
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_RETRY, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERROR_RESPONSES, 0);
}
@Test
public void testIndexOperationRecordPath() throws Exception {
void testIndexOperationRecordPath() throws Exception {
clientService.setEvalConsumer((final List<IndexOperationRequest> items) -> {
final long index = items.stream().filter(item -> IndexOperationRequest.Operation.Index.equals(item.getOperation())).count();
final long create = items.stream().filter(item -> IndexOperationRequest.Operation.Create.equals(item.getOperation())).count();
@ -428,16 +494,16 @@ public class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest<Put
runner.removeProperty(PutElasticsearchRecord.AT_TIMESTAMP);
runner.enqueue(Paths.get(TEST_DIR, "4_flowFileContents.json"), Collections.singletonMap(SCHEMA_NAME_ATTRIBUTE, RECORD_PATH_TEST_SCHEMA));
runner.run();
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1);
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_RETRY, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERROR_RESPONSES, 0);
}
@Test
public void testIncompatibleTimestampRecordPath() throws Exception {
void testIncompatibleTimestampRecordPath() throws Exception {
clientService.setEvalConsumer((final List<IndexOperationRequest> items) -> {
final long timestampCount = items.stream().filter(item -> "Hello".equals(item.getFields().get("@timestamp"))).count();
assertEquals(1, timestampCount);
@ -447,22 +513,22 @@ public class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest<Put
runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_RECORD_PATH, "/msg");
runner.enqueue(Paths.get(TEST_DIR, "5_flowFileContents.json"), Collections.singletonMap(SCHEMA_NAME_ATTRIBUTE, RECORD_PATH_TEST_SCHEMA));
runner.run();
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1);
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_RETRY, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERROR_RESPONSES, 0);
}
@Test
public void testNotFoundELRecordPaths() throws Exception {
void testNotFoundELRecordPaths() throws Exception {
testInvalidELRecordPaths("${id_not_exist}", "${not_exist}",
Paths.get(TEST_DIR, "6_flowFileContents.json"), Collections.singletonMap(SCHEMA_NAME_ATTRIBUTE, RECORD_PATH_TEST_SCHEMA));
}
@Test
public void testEmptyELRecordPaths() throws Exception {
void testEmptyELRecordPaths() throws Exception {
final Map<String, String> attributes = new LinkedHashMap<>();
attributes.put(SCHEMA_NAME_ATTRIBUTE, RECORD_PATH_TEST_SCHEMA);
attributes.put("will_be_empty", "/empty");
@ -471,24 +537,24 @@ public class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest<Put
}
@Test
public void testInvalidDynamicTemplatesRecordPath() throws Exception {
void testInvalidDynamicTemplatesRecordPath() throws Exception {
registry.addSchema(RECORD_PATH_TEST_SCHEMA, recordPathTestSchema);
runner.setProperty(PutElasticsearchRecord.DYNAMIC_TEMPLATES_RECORD_PATH, "/dynamic_templates");
runner.enqueue(Paths.get(TEST_DIR, "8_flowFileContents.json"), Collections.singletonMap(SCHEMA_NAME_ATTRIBUTE, RECORD_PATH_TEST_SCHEMA));
runner.run();
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 1);
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0);
final MockFlowFile failure = runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILURE).getFirst();
runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_RETRY, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERROR_RESPONSES, 0);
final MockFlowFile failure = runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_FAILURE).getFirst();
failure.assertAttributeEquals("elasticsearch.put.error", String.format("Field referenced by %s must be Map-type compatible or a String parsable into a JSON Object", "/dynamic_templates"));
}
@Test
public void testRecordPathFieldDefaults() throws Exception {
void testRecordPathFieldDefaults() throws Exception {
clientService.setEvalConsumer((final List<IndexOperationRequest> items) -> {
final long idNotNull = items.stream().filter(item -> item.getId() != null).count();
final long opIndex = items.stream().filter(item -> IndexOperationRequest.Operation.Index.equals(item.getOperation())).count();
@ -519,16 +585,16 @@ public class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest<Put
runner.enqueue(Paths.get(TEST_DIR, "9_flowFileContents.json"), Collections.singletonMap(SCHEMA_NAME_ATTRIBUTE, RECORD_PATH_TEST_SCHEMA));
runner.run();
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1);
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_RETRY, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERROR_RESPONSES, 0);
}
@Test
public void testDefaultDateTimeFormatting() throws Exception{
void testDefaultDateTimeFormatting() throws Exception{
clientService.setEvalConsumer((final List<IndexOperationRequest> items) -> {
final long msg = items.stream().filter(item -> (item.getFields().get("msg") != null)).count();
final long timestamp = items.stream().filter(item ->
@ -562,16 +628,16 @@ public class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest<Put
runner.enqueue(getDateTimeFormattingJson(), Collections.singletonMap(SCHEMA_NAME_ATTRIBUTE, DATE_TIME_FORMATTING_TEST_SCHEMA));
runner.run();
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1);
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_RETRY, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERROR_RESPONSES, 0);
}
@Test
public void testCustomDateTimeFormatting() throws Exception {
void testCustomDateTimeFormatting() throws Exception {
final String timestampFormat = "yy MMM d H";
final String dateFormat = "dd/MM/yyyy";
final String timeFormat = "HHmmss";
@ -615,16 +681,16 @@ public class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest<Put
runner.enqueue(getDateTimeFormattingJson(), Collections.singletonMap(SCHEMA_NAME_ATTRIBUTE, DATE_TIME_FORMATTING_TEST_SCHEMA));
runner.run();
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1);
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_RETRY, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERROR_RESPONSES, 0);
}
@Test
public void testInvalidIndexOperation() {
void testInvalidIndexOperation() {
runner.setProperty(PutElasticsearchRecord.INDEX_OP, "not-valid");
runner.assertNotValid();
final AssertionError ae = assertThrows(AssertionError.class, runner::run);
@ -636,122 +702,127 @@ public class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest<Put
runner.enqueue(flowFileContentMaps, Collections.singletonMap("operation", "not-valid2"));
runner.run();
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 1);
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_RETRY, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERROR_RESPONSES, 0);
}
@Test
public void testInputRequired() {
void testInputRequired() {
runner.run();
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_RETRY, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERROR_RESPONSES, 0);
}
@Test
public void testFailedRecordsOutput() throws Exception {
void testFailedRecordsOutput() throws Exception {
runner.setProperty(PutElasticsearchRecord.NOT_FOUND_IS_SUCCESSFUL, "true");
runner.setProperty(PutElasticsearchRecord.LOG_ERROR_RESPONSES, "true");
final int errorCount = 3;
final int successCount = 4;
testErrorRelationship(errorCount, successCount, true);
testErrorRelationship(errorCount, 1, successCount);
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 1);
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 1);
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0);
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS).getFirst().assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, String.valueOf(errorCount));
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS).getFirst().assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT,
runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERROR_RESPONSES, 0);
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).getFirst().assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, String.valueOf(errorCount));
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_SUCCESSFUL).getFirst().assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT,
String.valueOf(successCount));
}
@Test
public void testFailedRecordsOutputGroupedByErrorType() throws Exception {
void testFailedRecordsOutputGroupedByErrorType() throws Exception {
runner.setProperty(PutElasticsearchRecord.NOT_FOUND_IS_SUCCESSFUL, "true");
runner.setProperty(PutElasticsearchRecord.LOG_ERROR_RESPONSES, "true");
runner.setProperty(PutElasticsearchRecord.GROUP_BULK_ERRORS_BY_TYPE, "true");
final int successCount = 4;
testErrorRelationship(3, successCount, true);
testErrorRelationship(3, 2, successCount);
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 2);
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 1);
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 2);
runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERROR_RESPONSES, 0);
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS).get(0)
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).get(0)
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "2");
assertTrue(runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS).get(0)
assertTrue(runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).get(0)
.getAttribute("elasticsearch.bulk.error").contains("mapper_parsing_exception"));
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS).get(1)
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).get(1)
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "1");
assertTrue(runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS).get(1)
assertTrue(runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).get(1)
.getAttribute("elasticsearch.bulk.error").contains("some_other_exception"));
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS).getFirst()
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_SUCCESSFUL).getFirst()
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, String.valueOf(successCount));
}
@Test
public void testNotFoundResponsesTreatedAsFailedRecords() throws Exception {
void testNotFoundResponsesTreatedAsFailedRecords() throws Exception {
runner.setProperty(PutElasticsearchRecord.NOT_FOUND_IS_SUCCESSFUL, "false");
runner.setProperty(PutElasticsearchRecord.GROUP_BULK_ERRORS_BY_TYPE, "false");
final int errorCount = 4;
final int successCount = 3;
testErrorRelationship(errorCount, successCount, true);
testErrorRelationship(errorCount, 1, successCount);
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 1);
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 1);
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERROR_RESPONSES, 0);
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS).getFirst()
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).getFirst()
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, String.valueOf(errorCount));
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS).getFirst()
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_SUCCESSFUL).getFirst()
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, String.valueOf(successCount));
}
@Test
public void testNotFoundFailedRecordsGroupedAsErrorType() throws Exception {
void testNotFoundFailedRecordsGroupedAsErrorType() throws Exception {
runner.setProperty(PutElasticsearchRecord.NOT_FOUND_IS_SUCCESSFUL, "false");
runner.setProperty(PutElasticsearchRecord.GROUP_BULK_ERRORS_BY_TYPE, "true");
final int errorCount = 4;
final int successCount = 3;
testErrorRelationship(errorCount, successCount, true);
testErrorRelationship(errorCount, 3, successCount);
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 3);
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 1);
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 3);
runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERROR_RESPONSES, 0);
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS).get(0)
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).get(0)
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "2");
assertTrue(runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS).get(0)
assertTrue(runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).get(0)
.getAttribute("elasticsearch.bulk.error").contains("mapper_parsing_exception"));
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS).get(1)
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).get(1)
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "1");
assertTrue(runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS).get(1)
assertTrue(runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).get(1)
.getAttribute("elasticsearch.bulk.error").contains("some_other_exception"));
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS).get(2)
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).get(2)
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "1");
assertTrue(runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS).get(2)
assertTrue(runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).get(2)
.getAttribute("elasticsearch.bulk.error").contains("not_found"));
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS).getFirst()
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_SUCCESSFUL).getFirst()
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, String.valueOf(successCount));
}
@Test
public void testErrorsLoggedWithoutErrorRelationship() throws Exception {
void testErrorsLoggedWithoutErrorRelationship() throws Exception {
runner.setProperty(PutElasticsearchRecord.NOT_FOUND_IS_SUCCESSFUL, "false");
runner.setProperty(PutElasticsearchRecord.OUTPUT_ERROR_RESPONSES, "true");
runner.setProperty(PutElasticsearchRecord.GROUP_BULK_ERRORS_BY_TYPE, "false");
final int errorCount = 4;
final int successCount = 3;
testErrorRelationship(errorCount, successCount, false);
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 1);
testErrorRelationship(errorCount, 1, successCount);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERROR_RESPONSES, 1);
final String errorResponses = runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_ERROR_RESPONSES).getFirst().getContent();
assertTrue(errorResponses.contains("not_found"));
assertTrue(errorResponses.contains("For input string: 20abc"));
@ -760,7 +831,7 @@ public class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest<Put
}
@Test
public void testInvalidBulkHeaderProperty() {
void testInvalidBulkHeaderProperty() {
runner.assertValid();
runner.setProperty(AbstractPutElasticsearch.BULK_HEADER_PREFIX + "routing", "not-record-path");
runner.assertNotValid();
@ -770,32 +841,29 @@ public class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest<Put
return AvroTypeUtil.createSchema(new Schema.Parser().parse(Files.readString(schema)));
}
private void testErrorRelationship(final int errorCount, final int successCount, final boolean recordWriter) throws Exception {
private void testErrorRelationship(final int errorCount, final int errorGroupsCount, final int successfulCount) throws Exception {
final String schemaName = "errorTest";
final JsonRecordSetWriter writer = new JsonRecordSetWriter();
runner.addControllerService("writer", writer);
runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_NAME_PROPERTY);
runner.setProperty(writer, SchemaAccessUtils.SCHEMA_REGISTRY, "registry");
runner.enableControllerService(writer);
clientService.setResponse(IndexOperationResponse.fromJsonResponse(JsonUtils.readString(Paths.get(TEST_COMMON_DIR, "sampleErrorResponse.json"))));
registry.addSchema(schemaName, errorTestSchema);
if (recordWriter) {
runner.setProperty(PutElasticsearchRecord.RESULT_RECORD_WRITER, "writer");
}
runner.assertValid();
runner.enqueue(VALUES, Collections.singletonMap(SCHEMA_NAME_ATTRIBUTE, schemaName));
runner.run();
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1);
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, successfulCount > 0 ? 1 : 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, errorGroupsCount);
runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_RETRY, 0);
assertEquals(1,
runner.getProvenanceEvents().stream().filter(e -> ProvenanceEventType.SEND.equals(e.getEventType())
&& String.format("1 Elasticsearch _bulk operation batch(es) [%d error(s), %d success(es)]", errorCount, successCount).equals(e.getDetails())).count());
&& String.format("1 Elasticsearch _bulk operation batch(es) [%d error(s), %d success(es)]", errorCount, successfulCount).equals(e.getDetails())).count());
assertTrue(runner.getProvenanceEvents().stream().anyMatch(e -> ProvenanceEventType.FORK.equals(e.getEventType()) && e.getParentUuids().equals(
Collections.singletonList(runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ORIGINAL).getFirst().getAttribute("uuid"))
)));
}
private void testInvalidELRecordPaths(final String idRecordPath, final String atTimestampRecordPath, final Path path, final Map<String, String> attributes) throws IOException {
@ -812,12 +880,12 @@ public class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest<Put
runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_RECORD_PATH, atTimestampRecordPath);
runner.enqueue(path, attributes);
runner.run();
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1);
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0);
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_RETRY, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERROR_RESPONSES, 0);
}
private void testWithRequestParametersAndBulkHeaders(final Map<String, String> attributes) {

View File

@ -18,4 +18,12 @@
"id" : "4",
"field1" : "value2",
"field2" : "30"
}, {
"id" : "6",
"field1" : "value1",
"field2" : "213,456.9"
}, {
"id" : "7",
"field1" : "value1",
"field2" : "unit test"
} ]

View File

@ -52,7 +52,7 @@ import static org.apache.http.auth.AuthScope.ANY;
public abstract class AbstractElasticsearchITBase {
// default Elasticsearch version should (ideally) match that in the nifi-elasticsearch-bundle#pom.xml for the integration-tests profile
protected static final DockerImageName IMAGE = DockerImageName
.parse(System.getProperty("elasticsearch.docker.image", "docker.elastic.co/elasticsearch/elasticsearch:8.10.3"));
.parse(System.getProperty("elasticsearch.docker.image", "docker.elastic.co/elasticsearch/elasticsearch:8.13.3"));
protected static final String ELASTIC_USER_PASSWORD = System.getProperty("elasticsearch.elastic_user.password", RandomStringUtils.randomAlphanumeric(10, 20));
private static final int PORT = 9200;
protected static final ElasticsearchContainer ELASTICSEARCH_CONTAINER = new ElasticsearchContainer(IMAGE)

View File

@ -95,7 +95,7 @@ language governing permissions and limitations under the License. -->
</activation>
<properties>
<!-- also update the default Elasticsearch version in nifi-elasticsearch-test-utils#src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java-->
<elasticsearch_docker_image>8.10.3</elasticsearch_docker_image>
<elasticsearch_docker_image>8.13.3</elasticsearch_docker_image>
<elasticsearch.elastic.password>s3cret</elasticsearch.elastic.password>
</properties>
<build>
@ -126,7 +126,7 @@ language governing permissions and limitations under the License. -->
<profile>
<id>elasticsearch7</id>
<properties>
<elasticsearch_docker_image>7.17.14</elasticsearch_docker_image>
<elasticsearch_docker_image>7.17.21</elasticsearch_docker_image>
</properties>
</profile>
</profiles>

View File

@ -58,7 +58,6 @@ public class MockProcessContext extends MockControllerServiceLookup implements P
private String annotationData = null;
private boolean yieldCalled = false;
private boolean enableExpressionValidation = false;
private boolean allowExpressionValidation = true;
private volatile boolean incomingConnection = true;
private volatile boolean nonLoopConnection = true;
@ -154,8 +153,7 @@ public class MockProcessContext extends MockControllerServiceLookup implements P
final String setPropertyValue = properties.get(canonicalDescriptor);
final String propValue = (setPropertyValue == null) ? canonicalDescriptor.getDefaultValue() : setPropertyValue;
final MockPropertyValue propertyValue = new MockPropertyValue(propValue, this, canonicalDescriptor, true, environmentVariables);
return propertyValue;
return new MockPropertyValue(propValue, this, canonicalDescriptor, true, environmentVariables);
}
@Override
@ -169,8 +167,7 @@ public class MockProcessContext extends MockControllerServiceLookup implements P
final String propValue = (setPropertyValue == null) ? descriptor.getDefaultValue() : setPropertyValue;
final boolean alreadyEvaluated = !this.allowExpressionValidation;
final MockPropertyValue propertyValue = new MockPropertyValue(propValue, this, descriptor, alreadyEvaluated, environmentVariables);
return propertyValue;
return new MockPropertyValue(propValue, this, descriptor, alreadyEvaluated, environmentVariables);
}
@Override
@ -198,8 +195,8 @@ public class MockProcessContext extends MockControllerServiceLookup implements P
* @return result
*/
public ValidationResult setProperty(final PropertyDescriptor descriptor, final String value) {
requireNonNull(descriptor);
requireNonNull(value, "Cannot set property to null value; if the intent is to remove the property, call removeProperty instead");
requireNonNull(descriptor, "Cannot set property for null descriptor");
requireNonNull(value, "Cannot set property " + descriptor.getName() + " to null value; if the intent is to remove the property, call removeProperty instead");
final PropertyDescriptor fullyPopulatedDescriptor = component.getPropertyDescriptor(descriptor.getName());
final ValidationResult result = fullyPopulatedDescriptor.validate(value, new MockValidationContext(this, stateManager));
@ -236,8 +233,8 @@ public class MockProcessContext extends MockControllerServiceLookup implements P
}
public void clearProperties() {
Map<PropertyDescriptor, String> properties = getProperties();
for (Map.Entry<PropertyDescriptor, String> e : properties.entrySet()) {
final Map<PropertyDescriptor, String> props = getProperties();
for (final Map.Entry<PropertyDescriptor, String> e : props.entrySet()) {
removeProperty(e.getKey());
}
}
@ -312,10 +309,9 @@ public class MockProcessContext extends MockControllerServiceLookup implements P
* non-null
*/
public Collection<ValidationResult> validate() {
final List<ValidationResult> results = new ArrayList<>();
final ValidationContext validationContext = new MockValidationContext(this, stateManager);
final Collection<ValidationResult> componentResults = component.validate(validationContext);
results.addAll(componentResults);
final List<ValidationResult> results = new ArrayList<>(componentResults);
final Collection<ValidationResult> serviceResults = validateReferencedControllerServices(validationContext);
results.addAll(serviceResults);
@ -412,7 +408,7 @@ public class MockProcessContext extends MockControllerServiceLookup implements P
}
if (failureCount > 0) {
Assertions.fail("Processor has " + failureCount + " validation failures:\n" + sb.toString());
Assertions.fail("Processor has " + failureCount + " validation failures:\n" + sb);
}
}
@ -420,14 +416,6 @@ public class MockProcessContext extends MockControllerServiceLookup implements P
allowExpressionValidation = validate;
}
public void enableExpressionValidation() {
enableExpressionValidation = true;
}
public void disableExpressionValidation() {
enableExpressionValidation = false;
}
Map<PropertyDescriptor, String> getControllerServiceProperties(final ControllerService controllerService) {
return super.getConfiguration(controllerService.getIdentifier()).getProperties();
}
@ -441,13 +429,17 @@ public class MockProcessContext extends MockControllerServiceLookup implements P
return this;
}
@Override
public Set<Relationship> getAvailableRelationships() {
public Set<Relationship> getAllRelationships() {
if (!(component instanceof Processor)) {
return Collections.emptySet();
}
final Set<Relationship> relationships = new HashSet<>(((Processor) component).getRelationships());
return new HashSet<>(((Processor) component).getRelationships());
}
@Override
public Set<Relationship> getAvailableRelationships() {
final Set<Relationship> relationships = getAllRelationships();
relationships.removeAll(unavailableRelationships);
return relationships;
}
@ -491,6 +483,10 @@ public class MockProcessContext extends MockControllerServiceLookup implements P
this.connections.remove(relationship);
}
public void clearConnections() {
this.connections = new HashSet<>();
}
public void setConnections(final Set<Relationship> connections) {
if (connections == null) {
this.connections = Collections.emptySet();
@ -506,7 +502,7 @@ public class MockProcessContext extends MockControllerServiceLookup implements P
}
final List<Range> elRanges = Query.extractExpressionRanges(getProperty(property).getValue());
return (elRanges != null && !elRanges.isEmpty());
return !elRanges.isEmpty();
}
@Override

View File

@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util;
import org.apache.nifi.migration.RelationshipConfiguration;
import org.apache.nifi.processor.Relationship;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
public class MockRelationshipConfiguration implements RelationshipConfiguration {
private final Map<String, Set<String>> relationshipSplits = new HashMap<>();
private final Map<String, String> relationshipRenames = new HashMap<>();
private final Map<String, Relationship> rawRelationships;
private final Set<Relationship> originalRelationships;
public MockRelationshipConfiguration(final Set<Relationship> relationships) {
this.originalRelationships = Collections.unmodifiableSet(relationships);
this.rawRelationships = relationships.stream().collect(Collectors.toMap(Relationship::getName, Function.identity()));
}
public RelationshipMigrationResult toRelationshipMigrationResult() {
return new RelationshipMigrationResult() {
@Override
public Map<String, Set<String>> getPreviousRelationships() {
return Collections.unmodifiableMap(relationshipSplits);
}
@Override
public Map<String, String> getRenamedRelationships() {
return Collections.unmodifiableMap(relationshipRenames);
}
};
}
@Override
public boolean renameRelationship(final String relationshipName, final String newName) {
relationshipRenames.put(relationshipName, newName);
final boolean hasRelationship = hasRelationship(relationshipName);
if (!hasRelationship) {
return false;
}
rawRelationships.remove(relationshipName);
rawRelationships.put(relationshipName, findRelationshipByName(newName));
return true;
}
@Override
public boolean splitRelationship(final String relationshipName, final String newRelationshipName, final String... additionalRelationshipNames) {
final Set<String> newRelationships = new HashSet<>();
newRelationships.add(newRelationshipName);
if (additionalRelationshipNames != null) {
newRelationships.addAll(Arrays.stream(additionalRelationshipNames).toList());
}
relationshipSplits.put(relationshipName, newRelationships);
final boolean hasRelationship = hasRelationship(relationshipName);
if (!hasRelationship) {
return false;
}
rawRelationships.remove(relationshipName);
newRelationships.forEach(r -> rawRelationships.put(r, findRelationshipByName(r)));
return true;
}
@Override
public boolean hasRelationship(final String relationshipName) {
return rawRelationships.containsKey(relationshipName);
}
private Relationship findRelationshipByName(final String name) {
return originalRelationships.stream().filter(r -> name.equals(r.getName())).findFirst().orElseThrow();
}
public Set<Relationship> getRawRelationships() {
return Set.copyOf(rawRelationships.values());
}
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util;
import java.util.Map;
import java.util.Set;
public interface RelationshipMigrationResult {
/**
* @return a mapping of previous relationship names to the new names of those relationships
*/
Map<String, Set<String>> getPreviousRelationships();
/**
* @return a mapping of previous relationship names to the new names of those relationships
*/
Map<String, String> getRenamedRelationships();
}

View File

@ -63,6 +63,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
@ -202,23 +203,20 @@ public class StandardProcessorTestRunner implements TestRunner {
}
context.assertValid();
context.enableExpressionValidation();
// Call onConfigurationRestored here, right before the test run, as all properties should have been set byt this point.
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, processor, this.context);
try {
if (initialize) {
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, context);
} catch (final Exception e) {
Assertions.fail("Could not invoke methods annotated with @OnScheduled annotation due to: " + e, e);
}
if (initialize) {
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, context);
} catch (final Exception e) {
Assertions.fail("Could not invoke methods annotated with @OnScheduled annotation due to: " + e, e);
}
}
final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(numThreads);
@SuppressWarnings("unchecked")
final Future<Throwable>[] futures = new Future[iterations];
@SuppressWarnings("unchecked") final Future<Throwable>[] futures = new Future[iterations];
try (final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(numThreads)) {
for (int i = 0; i < iterations; i++) {
final Future<Throwable> future = executorService.schedule(new RunProcessor(), i * runSchedule, TimeUnit.MILLISECONDS);
futures[i] = future;
@ -229,33 +227,31 @@ public class StandardProcessorTestRunner implements TestRunner {
executorService.awaitTermination(runWait, TimeUnit.MILLISECONDS);
} catch (final InterruptedException e1) {
}
}
int finishedCount = 0;
boolean unscheduledRun = false;
for (final Future<Throwable> future : futures) {
try {
final Throwable thrown = future.get(); // wait for the result
if (thrown != null) {
throw new AssertionError(thrown);
}
if (++finishedCount == 1 && stopOnFinish) {
unscheduledRun = true;
unSchedule();
}
} catch (final Exception e) {
int finishedCount = 0;
boolean unscheduledRun = false;
for (final Future<Throwable> future : futures) {
try {
final Throwable thrown = future.get(); // wait for the result
if (thrown != null) {
throw new AssertionError(thrown);
}
}
if (!unscheduledRun && stopOnFinish) {
unSchedule();
if (++finishedCount == 1 && stopOnFinish) {
unscheduledRun = true;
unSchedule();
}
} catch (final InterruptedException | ExecutionException e) {
}
}
if (stopOnFinish) {
stop();
}
} finally {
context.disableExpressionValidation();
if (!unscheduledRun && stopOnFinish) {
unSchedule();
}
if (stopOnFinish) {
stop();
}
}
@ -331,7 +327,7 @@ public class StandardProcessorTestRunner implements TestRunner {
assertAllFlowFiles(new FlowFileValidator() {
@Override
public void assertFlowFile(FlowFile f) {
Assertions.assertTrue(f.getAttribute(attributeName) != null);
Assertions.assertNotNull(f.getAttribute(attributeName));
}
});
}
@ -341,7 +337,7 @@ public class StandardProcessorTestRunner implements TestRunner {
assertAllFlowFiles(relationship, new FlowFileValidator() {
@Override
public void assertFlowFile(FlowFile f) {
Assertions.assertTrue(f.getAttribute(attributeName) != null);
Assertions.assertNotNull(f.getAttribute(attributeName));
}
});
}
@ -387,7 +383,7 @@ public class StandardProcessorTestRunner implements TestRunner {
List<MockFlowFile> flowFiles = getFlowFilesForRelationship(relationship);
List<String> actualContent = flowFiles.stream()
.map(flowFile -> flowFile.getContent())
.map(MockFlowFile::getContent)
.collect(Collectors.toList());
assertEquals(expectedContent, actualContent);
@ -664,11 +660,11 @@ public class StandardProcessorTestRunner implements TestRunner {
@Override
public void addControllerService(final String identifier, final ControllerService service, final Map<String, String> properties) throws InitializationException {
final MockComponentLog logger = new MockComponentLog(identifier, service);
controllerServiceLoggers.put(identifier, logger);
final MockComponentLog mockComponentLog = new MockComponentLog(identifier, service);
controllerServiceLoggers.put(identifier, mockComponentLog);
final MockStateManager serviceStateManager = new MockStateManager(service);
final MockControllerServiceInitializationContext initContext = new MockControllerServiceInitializationContext(
requireNonNull(service), requireNonNull(identifier), logger, serviceStateManager, kerberosContext);
requireNonNull(service), requireNonNull(identifier), mockComponentLog, serviceStateManager, kerberosContext);
controllerServiceStateManagers.put(identifier, serviceStateManager);
initContext.addControllerServices(context);
service.initialize(initContext);
@ -719,7 +715,7 @@ public class StandardProcessorTestRunner implements TestRunner {
for (final ValidationResult result : results) {
if (!result.isValid()) {
Assertions.fail("Expected Controller Service to be valid but it is invalid due to: " + result.toString());
Assertions.fail("Expected Controller Service to be valid but it is invalid due to: " + result);
}
}
}
@ -768,7 +764,7 @@ public class StandardProcessorTestRunner implements TestRunner {
for (final ValidationResult result : results) {
if (!result.isValid()) {
throw new IllegalStateException("Cannot enable Controller Service " + service + " because it is in an invalid state: " + result.toString());
throw new IllegalStateException("Cannot enable Controller Service " + service + " because it is in an invalid state: " + result);
}
}
@ -1046,7 +1042,7 @@ public class StandardProcessorTestRunner implements TestRunner {
}
for (MockFlowFile flowFile : flowFiles) {
if (predicate.test(flowFile)==false) {
if (!predicate.test(flowFile)) {
Assertions.fail("FlowFile " + flowFile + " does not meet all condition");
}
}
@ -1075,7 +1071,7 @@ public class StandardProcessorTestRunner implements TestRunner {
@Override
public PropertyMigrationResult migrateProperties() {
final MockPropertyConfiguration mockPropertyConfiguration = new MockPropertyConfiguration(getProcessContext().getAllProperties());
final MockPropertyConfiguration mockPropertyConfiguration = new MockPropertyConfiguration(context.getAllProperties());
getProcessor().migrateProperties(mockPropertyConfiguration);
final PropertyMigrationResult migrationResult = mockPropertyConfiguration.toPropertyMigrationResult();
@ -1093,6 +1089,7 @@ public class StandardProcessorTestRunner implements TestRunner {
serviceImpl = (ControllerService) newInstance;
addControllerService(service.id(), serviceImpl, service.serviceProperties());
enableControllerService(serviceImpl);
} catch (final Exception e) {
if (serviceCreationException == null) {
if (e instanceof RuntimeException) {
@ -1111,10 +1108,27 @@ public class StandardProcessorTestRunner implements TestRunner {
}
final Map<String, String> updatedProperties = mockPropertyConfiguration.getRawProperties();
final MockProcessContext processContext = getProcessContext();
processContext.clearProperties();
updatedProperties.forEach(processContext::setProperty);
clearProperties();
updatedProperties.forEach((propertyName, propertyValue) -> {
if (propertyValue == null) {
removeProperty(propertyName);
} else {
setProperty(propertyName, propertyValue);
}
});
return migrationResult;
}
@Override
public RelationshipMigrationResult migrateRelationships() {
final MockRelationshipConfiguration mockRelationshipConfiguration = new MockRelationshipConfiguration(context.getAllRelationships());
getProcessor().migrateRelationships(mockRelationshipConfiguration);
final Set<Relationship> updatedRelationships = mockRelationshipConfiguration.getRawRelationships();
context.clearConnections();
updatedRelationships.forEach(context::addConnection);
return mockRelationshipConfiguration.toRelationshipMigrationResult();
}
}

View File

@ -24,6 +24,7 @@ import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.migration.RelationshipConfiguration;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
@ -345,7 +346,7 @@ public interface TestRunner {
* @param relationship The relationship on which to check the contents of flowfiles
* @param expectedContent The expected contents of all flowfiles
*/
public void assertContents(Relationship relationship, List<String> expectedContent);
void assertContents(Relationship relationship, List<String> expectedContent);
/**
* Assert that the number of FlowFiles transferred to the given relationship
@ -1077,4 +1078,13 @@ public interface TestRunner {
* @return the results of migrating properties
*/
PropertyMigrationResult migrateProperties();
/**
* Causes the TestRunner to call the Processor's {@link Processor#migrateRelationships(RelationshipConfiguration)} method. The effects that are
* caused by calling the method are applied, as they would be in a running NiFi instance. Unlike in a running NiFi instance, though, the
* operations that were performed are captured so that they can be examined and assertions made about the migration that occurred.
*
* @return the results of migrating relationships
*/
RelationshipMigrationResult migrateRelationships();
}