NIFI-8790 allow Expression Language for Index Operation in PutElasticsearchRecord

Improved validation for PutElasticsearchRecord Index Operation property

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #5220
This commit is contained in:
Chris Sampson 2021-07-16 10:55:17 +01:00 committed by Matthew Burgess
parent 8d2ced429d
commit 2e1f276f06
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
2 changed files with 80 additions and 27 deletions

View File

@ -24,6 +24,9 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
import org.apache.nifi.elasticsearch.ElasticsearchError;
@ -55,6 +58,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@ -88,17 +92,10 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
.name("put-es-record-index-op")
.displayName("Index Operation")
.description("The type of the operation used to index (create, delete, index, update, upsert)")
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.allowableValues(
IndexOperationRequest.Operation.Create.getValue(),
IndexOperationRequest.Operation.Delete.getValue(),
IndexOperationRequest.Operation.Index.getValue(),
IndexOperationRequest.Operation.Update.getValue(),
IndexOperationRequest.Operation.Upsert.getValue()
)
.defaultValue(IndexOperationRequest.Operation.Index.getValue())
.required(true)
.build();
static final PropertyDescriptor INDEX_OP_RECORD_PATH = new PropertyDescriptor.Builder()
@ -183,6 +180,38 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
this.logErrors = context.getProperty(LOG_ERROR_RESPONSES).asBoolean();
}
static final List<String> ALLOWED_INDEX_OPERATIONS = Collections.unmodifiableList(Arrays.asList(
IndexOperationRequest.Operation.Create.getValue().toLowerCase(),
IndexOperationRequest.Operation.Delete.getValue().toLowerCase(),
IndexOperationRequest.Operation.Index.getValue().toLowerCase(),
IndexOperationRequest.Operation.Update.getValue().toLowerCase(),
IndexOperationRequest.Operation.Upsert.getValue().toLowerCase()
));
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> validationResults = new ArrayList<>();
final PropertyValue indexOp = validationContext.getProperty(INDEX_OP);
final ValidationResult.Builder indexOpValidationResult = new ValidationResult.Builder().subject(INDEX_OP.getName());
if (!indexOp.isExpressionLanguagePresent()) {
final String indexOpValue = indexOp.evaluateAttributeExpressions().getValue();
indexOpValidationResult.input(indexOpValue);
if (!ALLOWED_INDEX_OPERATIONS.contains(indexOpValue.toLowerCase())) {
indexOpValidationResult.valid(false)
.explanation(String.format("%s must be Expression Language or one of %s",
INDEX_OP.getDisplayName(), ALLOWED_INDEX_OPERATIONS)
);
} else {
indexOpValidationResult.valid(true);
}
} else {
indexOpValidationResult.valid(true).input(indexOp.getValue()).explanation("Expression Language present");
}
validationResults.add(indexOpValidationResult.build());
return validationResults;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
@ -228,6 +257,7 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
final IndexOperationRequest.Operation o = IndexOperationRequest.Operation.forValue(getFromRecordPath(record, ioPath, indexOp));
final String id = path != null ? getFromRecordPath(record, path, null) : null;
@SuppressWarnings("unchecked")
Map<String, Object> contentMap = (Map<String, Object>) DataTypeUtils.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
operationList.add(new IndexOperationRequest(idx, t, id, contentMap, o));
@ -297,17 +327,20 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
if (writerFactory != null) {
FlowFile errorFF = session.create(input);
try (OutputStream os = session.write(errorFF);
RecordSetWriter writer = writerFactory.createWriter(getLogger(), bundle.getSchema(), os )) {
RecordSetWriter writer = writerFactory.createWriter(getLogger(), bundle.getSchema(), os, errorFF )) {
int added = 0;
writer.beginRecordSet();
for (int index = 0; index < response.getItems().size(); index++) {
Map<String, Object> current = response.getItems().get(index);
String key = current.keySet().stream().findFirst().get();
Map<String, Object> inner = (Map<String, Object>) current.get(key);
if (inner.containsKey("error")) {
writer.write(bundle.getOriginalRecords().get(index));
added++;
if (!current.isEmpty()) {
String key = current.keySet().iterator().next();
@SuppressWarnings("unchecked")
Map<String, Object> inner = (Map<String, Object>) current.get(key);
if (inner.containsKey("error")) {
writer.write(bundle.getOriginalRecords().get(index));
added++;
}
}
}
writer.finishRecordSet();
@ -325,14 +358,11 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
throw ex;
}
}
return null;
} else {
return null;
}
return null;
}
private String getFromRecordPath(Record record, RecordPath path, final String fallback) {
private String getFromRecordPath(final Record record, final RecordPath path, final String fallback) {
if (path == null) {
return fallback;
}
@ -349,9 +379,7 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
fieldValue.updateValue(null);
String retVal = fieldValue.getValue().toString();
return retVal;
return fieldValue.getValue().toString();
} else {
return fallback;
}

View File

@ -189,7 +189,7 @@ class PutElasticsearchRecordTest {
[ id: "rec-2", op: null, index: null, type: null, msg: "Hello" ],
[ id: "rec-3", op: null, index: null, type: null, msg: "Hello" ],
[ id: "rec-4", op: null, index: null, type: null, msg: "Hello" ],
[ id: "rec-5", op: null, index: null, type: null, msg: "Hello" ],
[ id: "rec-5", op: "update", index: null, type: null, msg: "Hello" ],
[ id: "rec-6", op: null, index: "bulk_b", type: "message", msg: "Hello" ]
]))
@ -199,18 +199,21 @@ class PutElasticsearchRecordTest {
def testIndexCount = items.findAll { it.index == "test_index" }.size()
def bulkIndexCount = items.findAll { it.index.startsWith("bulk_") }.size()
def indexOperationCount = items.findAll { it.operation == IndexOperationRequest.Operation.Index }.size()
def updateOperationCount = items.findAll { it.operation == IndexOperationRequest.Operation.Update }.size()
Assert.assertEquals(5, testTypeCount)
Assert.assertEquals(1, messageTypeCount)
Assert.assertEquals(5, testIndexCount)
Assert.assertEquals(1, bulkIndexCount)
Assert.assertEquals(6, indexOperationCount)
Assert.assertEquals(5, indexOperationCount)
Assert.assertEquals(1, updateOperationCount)
}
clientService.evalClosure = evalClosure
runner.setProperty(PutElasticsearchRecord.INDEX_OP, "index")
runner.setProperty(PutElasticsearchRecord.INDEX_OP, "\${operation}")
runner.enqueue(flowFileContents, [
"schema.name": "recordPathTest"
"schema.name": "recordPathTest",
"operation": "index"
])
runner.run()
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
@ -241,6 +244,7 @@ class PutElasticsearchRecordTest {
clientService.evalClosure = evalClosure
runner.setProperty(PutElasticsearchRecord.INDEX_OP, "index")
runner.removeProperty(PutElasticsearchRecord.TYPE)
runner.enqueue(flowFileContents, [
"schema.name": "recordPathTest"
@ -283,6 +287,27 @@ class PutElasticsearchRecordTest {
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
}
@Test
void testInvalidIndexOperation() {
runner.setProperty(PutElasticsearchRecord.INDEX_OP, "not-valid")
runner.assertNotValid()
final AssertionError ae = Assert.assertThrows(AssertionError.class, runner.&run)
Assert.assertEquals(String.format("Processor has 1 validation failures:\n'%s' validated against 'not-valid' is invalid because %s must be Expression Language or one of %s\n",
PutElasticsearchRecord.INDEX_OP.getName(), PutElasticsearchRecord.INDEX_OP.getDisplayName(), PutElasticsearchRecord.ALLOWED_INDEX_OPERATIONS),
ae.getMessage()
)
runner.setProperty(PutElasticsearchRecord.INDEX_OP, "\${operation}")
runner.assertValid()
runner.enqueue(flowFileContents, [
"operation": "not-valid2"
])
runner.run()
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 1)
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
}
@Test
void testInputRequired() {
runner.run()