NIFI-4636 Updated GetMongo to support expression language on limit, results per flowfile and batch fields.

NIFI-4636: Added unit tests
Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2289
This commit is contained in:
Mike Thomsen 2017-11-23 16:52:41 -05:00 committed by Matthew Burgess
parent f772f2f093
commit 59d3c64195
2 changed files with 21 additions and 4 deletions

View File

@ -107,12 +107,14 @@ public class GetMongo extends AbstractMongoProcessor {
.name("Limit") .name("Limit")
.description("The maximum number of elements to return") .description("The maximum number of elements to return")
.required(false) .required(false)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build(); .build();
static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size") .name("Batch Size")
.description("The number of elements returned from the server in one batch") .description("The number of elements returned from the server in one batch")
.required(false) .required(false)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build(); .build();
static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder() static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
@ -120,6 +122,7 @@ public class GetMongo extends AbstractMongoProcessor {
.displayName("Results Per FlowFile") .displayName("Results Per FlowFile")
.description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.") .description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.")
.required(false) .required(false)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build(); .build();
@ -241,10 +244,10 @@ public class GetMongo extends AbstractMongoProcessor {
it.sort(sort); it.sort(sort);
} }
if (context.getProperty(LIMIT).isSet()) { if (context.getProperty(LIMIT).isSet()) {
it.limit(context.getProperty(LIMIT).asInteger()); it.limit(context.getProperty(LIMIT).evaluateAttributeExpressions().asInteger());
} }
if (context.getProperty(BATCH_SIZE).isSet()) { if (context.getProperty(BATCH_SIZE).isSet()) {
it.batchSize(context.getProperty(BATCH_SIZE).asInteger()); it.batchSize(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger());
} }
final MongoCursor<Document> cursor = it.iterator(); final MongoCursor<Document> cursor = it.iterator();
@ -252,7 +255,7 @@ public class GetMongo extends AbstractMongoProcessor {
try { try {
FlowFile flowFile = null; FlowFile flowFile = null;
if (context.getProperty(RESULTS_PER_FLOWFILE).isSet()) { if (context.getProperty(RESULTS_PER_FLOWFILE).isSet()) {
int ceiling = context.getProperty(RESULTS_PER_FLOWFILE).asInteger(); int ceiling = context.getProperty(RESULTS_PER_FLOWFILE).evaluateAttributeExpressions().asInteger();
List<Document> batch = new ArrayList<>(); List<Document> batch = new ArrayList<>();
while (cursor.hasNext()) { while (cursor.hasNext()) {

View File

@ -228,7 +228,8 @@ public class GetMongoTest {
@Test @Test
public void testLimit() throws Exception { public void testLimit() throws Exception {
runner.setProperty(GetMongo.QUERY, "{a: {$exists: true}}"); runner.setProperty(GetMongo.QUERY, "{a: {$exists: true}}");
runner.setProperty(GetMongo.LIMIT, "1"); runner.setProperty(GetMongo.LIMIT, "${limit}");
runner.setVariable("limit", "1");
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 1); runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 1);
@ -238,7 +239,20 @@ public class GetMongoTest {
@Test @Test
public void testResultsPerFlowfile() throws Exception { public void testResultsPerFlowfile() throws Exception {
runner.setProperty(GetMongo.RESULTS_PER_FLOWFILE, "${results.per.flowfile}");
runner.setVariable("results.per.flowfile", "2");
runner.run();
runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 2);
List<MockFlowFile> results = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
Assert.assertTrue("Flowfile was empty", results.get(0).getSize() > 0);
Assert.assertEquals("Wrong mime type", results.get(0).getAttribute(CoreAttributes.MIME_TYPE.key()), "application/json");
}
@Test
public void testBatchSize() throws Exception {
runner.setProperty(GetMongo.RESULTS_PER_FLOWFILE, "2"); runner.setProperty(GetMongo.RESULTS_PER_FLOWFILE, "2");
runner.setProperty(GetMongo.BATCH_SIZE, "${batch.size}");
runner.setVariable("batch.size", "1");
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 2); runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 2);
List<MockFlowFile> results = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); List<MockFlowFile> results = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);