mirror of https://github.com/apache/nifi.git
NIFI-7463
Create empty relationship for RunMongoAggregation Fix default autoterminate and condition to redirect to REL_EMPTY Change from new relationship to write an empty FlowFile to RESULT Fix MONGO_URI This closes #4281 Signed-off-by: Mike Thomsen <mthomsen@apache.org>
This commit is contained in:
parent
d195702ee2
commit
c7edcd68e1
|
@ -183,17 +183,23 @@ public class RunMongoAggregation extends AbstractMongoProcessor {
|
||||||
|
|
||||||
iter = it.iterator();
|
iter = it.iterator();
|
||||||
List<Document> batch = new ArrayList<>();
|
List<Document> batch = new ArrayList<>();
|
||||||
|
Boolean doneSomething = false;
|
||||||
|
|
||||||
while (iter.hasNext()) {
|
while (iter.hasNext()) {
|
||||||
batch.add(iter.next());
|
batch.add(iter.next());
|
||||||
if (batch.size() == resultsPerFlowfile) {
|
if (batch.size() == resultsPerFlowfile) {
|
||||||
writeBatch(buildBatch(batch), flowFile, context, session, attrs, REL_RESULTS);
|
writeBatch(buildBatch(batch), flowFile, context, session, attrs, REL_RESULTS);
|
||||||
batch = new ArrayList<>();
|
batch = new ArrayList<>();
|
||||||
|
doneSomething |= true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (batch.size() > 0) {
|
if (! batch.isEmpty()) {
|
||||||
|
// Something remains in batch list, write it to RESULT
|
||||||
writeBatch(buildBatch(batch), flowFile, context, session, attrs, REL_RESULTS);
|
writeBatch(buildBatch(batch), flowFile, context, session, attrs, REL_RESULTS);
|
||||||
|
} else if (! doneSomething) {
|
||||||
|
// The batch list is empty and no batch was written (empty result!), so write empty string to RESULT
|
||||||
|
writeBatch("", flowFile, context, session, attrs, REL_RESULTS);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (flowFile != null) {
|
if (flowFile != null) {
|
||||||
|
|
|
@ -263,4 +263,31 @@ public class RunMongoAggregationIT {
|
||||||
|
|
||||||
runner.assertTransferCount(RunMongoAggregation.REL_RESULTS, mappings.size());
|
runner.assertTransferCount(RunMongoAggregation.REL_RESULTS, mappings.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEmptyResponse() throws Exception {
|
||||||
|
final String queryInput = "[\n" +
|
||||||
|
" {\n" +
|
||||||
|
" \"$match\": {\n" +
|
||||||
|
" \"val\": \"no_exists\"\n" +
|
||||||
|
" }\n" +
|
||||||
|
" },\n" +
|
||||||
|
" {\n" +
|
||||||
|
" \"$group\": {\n" +
|
||||||
|
" \"_id\": \"null\",\n" +
|
||||||
|
" \"doc_count\": {\n" +
|
||||||
|
" \"$sum\": 1\n" +
|
||||||
|
" }\n" +
|
||||||
|
" }\n" +
|
||||||
|
" }\n" +
|
||||||
|
"]";
|
||||||
|
|
||||||
|
runner.setProperty(RunMongoAggregation.QUERY, queryInput);
|
||||||
|
runner.enqueue("test");
|
||||||
|
runner.run(1, true, true);
|
||||||
|
|
||||||
|
runner.assertTransferCount(RunMongoAggregation.REL_ORIGINAL, 1);
|
||||||
|
runner.assertTransferCount(RunMongoAggregation.REL_FAILURE, 0);
|
||||||
|
runner.assertTransferCount(RunMongoAggregation.REL_RESULTS, 1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue