diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java index 7c78358b80..898824583a 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java @@ -107,12 +107,14 @@ public class GetMongo extends AbstractMongoProcessor { .name("Limit") .description("The maximum number of elements to return") .required(false) + .expressionLanguageSupported(true) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .build(); static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() .name("Batch Size") .description("The number of elements returned from the server in one batch") .required(false) + .expressionLanguageSupported(true) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .build(); static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder() @@ -120,6 +122,7 @@ public class GetMongo extends AbstractMongoProcessor { .displayName("Results Per FlowFile") .description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.") .required(false) + .expressionLanguageSupported(true) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .build(); @@ -241,10 +244,10 @@ public class GetMongo extends AbstractMongoProcessor { it.sort(sort); } if (context.getProperty(LIMIT).isSet()) { - it.limit(context.getProperty(LIMIT).asInteger()); + it.limit(context.getProperty(LIMIT).evaluateAttributeExpressions().asInteger()); } if (context.getProperty(BATCH_SIZE).isSet()) { - it.batchSize(context.getProperty(BATCH_SIZE).asInteger()); + it.batchSize(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger()); } final MongoCursor cursor = it.iterator(); @@ -252,7 +255,7 @@ public class GetMongo extends AbstractMongoProcessor { try { FlowFile flowFile = null; if (context.getProperty(RESULTS_PER_FLOWFILE).isSet()) { - int ceiling = context.getProperty(RESULTS_PER_FLOWFILE).asInteger(); + int ceiling = context.getProperty(RESULTS_PER_FLOWFILE).evaluateAttributeExpressions().asInteger(); List batch = new ArrayList<>(); while (cursor.hasNext()) { diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java index feef93f13b..2de0c97729 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java @@ -228,7 +228,8 @@ public class GetMongoTest { @Test public void testLimit() throws Exception { runner.setProperty(GetMongo.QUERY, "{a: {$exists: true}}"); - runner.setProperty(GetMongo.LIMIT, "1"); + runner.setProperty(GetMongo.LIMIT, "${limit}"); + runner.setVariable("limit", "1"); runner.run(); runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 1); @@ -238,7 +239,20 @@ public class GetMongoTest { @Test public void testResultsPerFlowfile() throws Exception { + runner.setProperty(GetMongo.RESULTS_PER_FLOWFILE, "${results.per.flowfile}"); + runner.setVariable("results.per.flowfile", "2"); + runner.run(); + runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 2); + List results = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); + Assert.assertTrue("Flowfile was empty", results.get(0).getSize() > 0); + Assert.assertEquals("Wrong mime type", results.get(0).getAttribute(CoreAttributes.MIME_TYPE.key()), "application/json"); + } + + @Test + public void testBatchSize() throws Exception { runner.setProperty(GetMongo.RESULTS_PER_FLOWFILE, "2"); + runner.setProperty(GetMongo.BATCH_SIZE, "${batch.size}"); + runner.setVariable("batch.size", "1"); runner.run(); runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 2); List results = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);