From c7edcd68e1d6d98472953897ec8c9a18d2f7d290 Mon Sep 17 00:00:00 2001 From: eduardofontes Date: Sun, 17 May 2020 02:27:15 -0300 Subject: [PATCH] NIFI-7463 Create empty relationship for RunMongoAggregation Fix default autoterminate and condition to redirect to REL_EMPTY Change from new relationship to write an empty FlowFile to RESULT Fix MONGO_URI This closes #4281 Signed-off-by: Mike Thomsen --- .../mongodb/RunMongoAggregation.java | 8 +++++- .../mongodb/RunMongoAggregationIT.java | 27 +++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java index 0c3bb76203..3c4ddef098 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java @@ -183,17 +183,23 @@ public class RunMongoAggregation extends AbstractMongoProcessor { iter = it.iterator(); List batch = new ArrayList<>(); + Boolean doneSomething = false; while (iter.hasNext()) { batch.add(iter.next()); if (batch.size() == resultsPerFlowfile) { writeBatch(buildBatch(batch), flowFile, context, session, attrs, REL_RESULTS); batch = new ArrayList<>(); + doneSomething |= true; } } - if (batch.size() > 0) { + if (! batch.isEmpty()) { + // Something remains in batch list, write it to RESULT writeBatch(buildBatch(batch), flowFile, context, session, attrs, REL_RESULTS); + } else if (! doneSomething) { + // The batch list is empty and no batch was written (empty result!), so write empty string to RESULT + writeBatch("", flowFile, context, session, attrs, REL_RESULTS); } if (flowFile != null) { diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java index 23ac4f0c51..2aadc5ff72 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java @@ -263,4 +263,31 @@ public class RunMongoAggregationIT { runner.assertTransferCount(RunMongoAggregation.REL_RESULTS, mappings.size()); } + + @Test + public void testEmptyResponse() throws Exception { + final String queryInput = "[\n" + + " {\n" + + " \"$match\": {\n" + + " \"val\": \"no_exists\"\n" + + " }\n" + + " },\n" + + " {\n" + + " \"$group\": {\n" + + " \"_id\": \"null\",\n" + + " \"doc_count\": {\n" + + " \"$sum\": 1\n" + + " }\n" + + " }\n" + + " }\n" + + "]"; + + runner.setProperty(RunMongoAggregation.QUERY, queryInput); + runner.enqueue("test"); + runner.run(1, true, true); + + runner.assertTransferCount(RunMongoAggregation.REL_ORIGINAL, 1); + runner.assertTransferCount(RunMongoAggregation.REL_FAILURE, 0); + runner.assertTransferCount(RunMongoAggregation.REL_RESULTS, 1); + } }