NIFI-5916 Added an option to enable empty flowfiles to be sent if there are no results from a query.

NIFI-5916 Fixed potential NPE.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #3315
This commit is contained in:
Mike Thomsen 2019-02-16 07:27:25 -05:00 committed by Matthew Burgess
parent ed19f61682
commit 407add7847
2 changed files with 48 additions and 3 deletions

View File

@ -29,15 +29,18 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.bson.Document;
import org.bson.json.JsonWriterSettings;
@ -58,7 +61,16 @@ import java.util.Set;
@WritesAttribute(attribute = GetMongo.COL_NAME, description = "The collection where the results came from.")
})
public class GetMongo extends AbstractMongoQueryProcessor {
public static final PropertyDescriptor SEND_EMPTY_RESULTS = new PropertyDescriptor.Builder()
.name("get-mongo-send-empty")
.displayName("Send Empty Result")
.description("If a query executes successfully, but returns no results, send an empty JSON document " +
"signifying no result.")
.allowableValues("true", "false")
.defaultValue("false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.required(false)
.build();
static final AllowableValue YES_PP = new AllowableValue("true", "True");
static final AllowableValue NO_PP = new AllowableValue("false", "False");
@ -94,6 +106,7 @@ public class GetMongo extends AbstractMongoQueryProcessor {
_propertyDescriptors.add(DATE_FORMAT);
_propertyDescriptors.add(SSL_CONTEXT_SERVICE);
_propertyDescriptors.add(CLIENT_AUTH);
_propertyDescriptors.add(SEND_EMPTY_RESULTS);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
final Set<Relationship> _relationships = new HashSet<>();
@ -103,6 +116,12 @@ public class GetMongo extends AbstractMongoQueryProcessor {
relationships = Collections.unmodifiableSet(_relationships);
}
private boolean sendEmpty;
@OnScheduled
public void onScheduled(PropertyContext context) {
sendEmpty = context.getProperty(SEND_EMPTY_RESULTS).asBoolean();
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
@ -191,6 +210,7 @@ public class GetMongo extends AbstractMongoQueryProcessor {
it.batchSize(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger());
}
long sent = 0;
try (MongoCursor<Document> cursor = it.iterator()) {
configureMapper(jsonTypeSetting, dateFormat);
@ -209,6 +229,7 @@ public class GetMongo extends AbstractMongoQueryProcessor {
logger.error("Error building batch due to {}", new Object[] {e});
}
}
sent++;
}
if (batch.size() > 0) {
@ -234,12 +255,19 @@ public class GetMongo extends AbstractMongoQueryProcessor {
outgoingFlowFile = session.putAllAttributes(outgoingFlowFile, attributes);
session.getProvenanceReporter().receive(outgoingFlowFile, getURI(context));
session.transfer(outgoingFlowFile, REL_SUCCESS);
sent++;
}
}
if (input != null) {
session.transfer(input, REL_ORIGINAL);
}
if (sent == 0 && sendEmpty) {
FlowFile empty = input != null ? session.create(input) : session.create();
empty = session.putAllAttributes(empty, attributes);
session.transfer(empty, REL_SUCCESS);
}
}
}

View File

@ -620,7 +620,7 @@ public class GetMongoIT {
//Test a bad flowfile attribute
runner.setIncomingConnection(true);
runner.setProperty(GetMongo.QUERY, "${badfromff}");
runner.enqueue("<<?>>", new HashMap<String, String>(){{
runner.enqueue("<<?>>", new HashMap<String, String>() {{
put("badfromff", "{\"prop\":}");
}});
runner.run();
@ -633,7 +633,7 @@ public class GetMongoIT {
//Test for regression on a good query from a flowfile attribute
runner.setIncomingConnection(true);
runner.setProperty(GetMongo.QUERY, "${badfromff}");
runner.enqueue("<<?>>", new HashMap<String, String>(){{
runner.enqueue("<<?>>", new HashMap<String, String>() {{
put("badfromff", "{}");
}});
runner.run();
@ -651,4 +651,21 @@ public class GetMongoIT {
runner.assertTransferCount(GetMongo.REL_SUCCESS, 0);
runner.assertTransferCount(GetMongo.REL_ORIGINAL, 0);
}
public void testSendEmpty() throws Exception {
runner.setIncomingConnection(true);
runner.setProperty(GetMongo.SEND_EMPTY_RESULTS, "true");
runner.setProperty(GetMongo.QUERY, "{ \"nothing\": true }");
runner.assertValid();
runner.enqueue("");
runner.run();
runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1);
runner.assertTransferCount(GetMongo.REL_SUCCESS, 1);
runner.assertTransferCount(GetMongo.REL_FAILURE, 0);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
MockFlowFile flowFile = flowFiles.get(0);
Assert.assertEquals(0, flowFile.getSize());
}
}