From 407add7847f9d79ea9c7b604fd57c3160ff30c06 Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Sat, 16 Feb 2019 07:27:25 -0500 Subject: [PATCH] NIFI-5916 Added an option to enable empty flowfiles to be sent if there are no results from a query. NIFI-5916 Fixed potential NPE. Signed-off-by: Matthew Burgess This closes #3315 --- .../nifi/processors/mongodb/GetMongo.java | 30 ++++++++++++++++++- .../nifi/processors/mongodb/GetMongoIT.java | 21 +++++++++++-- 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java index 286c47edde..b3b82b0e81 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java @@ -29,15 +29,18 @@ import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.Validator; +import org.apache.nifi.context.PropertyContext; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; import org.bson.Document; import org.bson.json.JsonWriterSettings; @@ -58,7 +61,16 @@ import java.util.Set; @WritesAttribute(attribute = GetMongo.COL_NAME, description = "The collection where the results came from.") }) public class GetMongo extends AbstractMongoQueryProcessor { - + public static final PropertyDescriptor SEND_EMPTY_RESULTS = new PropertyDescriptor.Builder() + .name("get-mongo-send-empty") + .displayName("Send Empty Result") + .description("If a query executes successfully, but returns no results, send an empty JSON document " + + "signifying no result.") + .allowableValues("true", "false") + .defaultValue("false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .required(false) + .build(); static final AllowableValue YES_PP = new AllowableValue("true", "True"); static final AllowableValue NO_PP = new AllowableValue("false", "False"); @@ -94,6 +106,7 @@ public class GetMongo extends AbstractMongoQueryProcessor { _propertyDescriptors.add(DATE_FORMAT); _propertyDescriptors.add(SSL_CONTEXT_SERVICE); _propertyDescriptors.add(CLIENT_AUTH); + _propertyDescriptors.add(SEND_EMPTY_RESULTS); propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); final Set _relationships = new HashSet<>(); @@ -103,6 +116,12 @@ public class GetMongo extends AbstractMongoQueryProcessor { relationships = Collections.unmodifiableSet(_relationships); } + private boolean sendEmpty; + @OnScheduled + public void onScheduled(PropertyContext context) { + sendEmpty = context.getProperty(SEND_EMPTY_RESULTS).asBoolean(); + } + @Override public Set getRelationships() { return relationships; @@ -191,6 +210,7 @@ public class GetMongo extends AbstractMongoQueryProcessor { it.batchSize(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger()); } + long sent = 0; try (MongoCursor cursor = it.iterator()) { configureMapper(jsonTypeSetting, dateFormat); @@ -209,6 +229,7 @@ public class GetMongo extends AbstractMongoQueryProcessor { logger.error("Error building batch due to {}", new Object[] {e}); } } + sent++; } if (batch.size() > 0) { @@ -234,12 +255,19 @@ public class GetMongo extends AbstractMongoQueryProcessor { outgoingFlowFile = session.putAllAttributes(outgoingFlowFile, attributes); session.getProvenanceReporter().receive(outgoingFlowFile, getURI(context)); session.transfer(outgoingFlowFile, REL_SUCCESS); + sent++; } } if (input != null) { session.transfer(input, REL_ORIGINAL); } + + if (sent == 0 && sendEmpty) { + FlowFile empty = input != null ? session.create(input) : session.create(); + empty = session.putAllAttributes(empty, attributes); + session.transfer(empty, REL_SUCCESS); + } } } diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java index 286a70db7b..ddea9a8596 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java @@ -620,7 +620,7 @@ public class GetMongoIT { //Test a bad flowfile attribute runner.setIncomingConnection(true); runner.setProperty(GetMongo.QUERY, "${badfromff}"); - runner.enqueue("<>", new HashMap(){{ + runner.enqueue("<>", new HashMap() {{ put("badfromff", "{\"prop\":}"); }}); runner.run(); @@ -633,7 +633,7 @@ public class GetMongoIT { //Test for regression on a good query from a flowfile attribute runner.setIncomingConnection(true); runner.setProperty(GetMongo.QUERY, "${badfromff}"); - runner.enqueue("<>", new HashMap(){{ + runner.enqueue("<>", new HashMap() {{ put("badfromff", "{}"); }}); runner.run(); @@ -651,4 +651,21 @@ public class GetMongoIT { runner.assertTransferCount(GetMongo.REL_SUCCESS, 0); runner.assertTransferCount(GetMongo.REL_ORIGINAL, 0); } + + public void testSendEmpty() throws Exception { + runner.setIncomingConnection(true); + runner.setProperty(GetMongo.SEND_EMPTY_RESULTS, "true"); + runner.setProperty(GetMongo.QUERY, "{ \"nothing\": true }"); + runner.assertValid(); + runner.enqueue(""); + runner.run(); + + runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1); + runner.assertTransferCount(GetMongo.REL_SUCCESS, 1); + runner.assertTransferCount(GetMongo.REL_FAILURE, 0); + + List flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); + MockFlowFile flowFile = flowFiles.get(0); + Assert.assertEquals(0, flowFile.getSize()); + } }