NIFI-6148 - Added support for aggregation option 'allowDiskUse'

This closes #3389

Signed-off-by: Mike Thomsen <mikerthomsen@gmail.com>
Craig Davidson 2019-03-25 20:40:33 +00:00 committed by Mike Thomsen
parent d3d43262db
commit 4de51fd3d5
1 changed file with 15 additions and 1 deletion

@@ -36,6 +36,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.JsonValidator;
import org.apache.nifi.processor.util.StandardValidators;
import org.bson.Document;
import org.bson.conversions.Bson;
@@ -91,11 +92,23 @@ public class RunMongoAggregation extends AbstractMongoProcessor {
.addValidator(JsonValidator.INSTANCE)
.build();
static final PropertyDescriptor ALLOW_DISK_USE = new PropertyDescriptor.Builder()
.name("allow-disk-use")
.displayName("Allow Disk Use")
.description("Set this to true to enable writing data to temporary files to prevent exceeding the " +
"maximum memory use limit during aggregation pipeline staged when handling large datasets.")
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
static {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.addAll(descriptors);
_propertyDescriptors.add(CHARSET);
_propertyDescriptors.add(QUERY);
_propertyDescriptors.add(ALLOW_DISK_USE);
_propertyDescriptors.add(JSON_TYPE);
_propertyDescriptors.add(QUERY_ATTRIBUTE);
_propertyDescriptors.add(BATCH_SIZE);
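
For context on how a flow developer would turn the option on, here is a minimal sketch using NiFi's mock test framework. Only the "allow-disk-use" name and its "false" default are confirmed by this diff; the other property names are assumptions inherited from AbstractMongoProcessor and RunMongoAggregation and may differ by NiFi version.

import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;

public class RunMongoAggregationAllowDiskUseSketch {
    @Test
    public void testAggregationCanSpillToDisk() {
        TestRunner runner = TestRunners.newTestRunner(RunMongoAggregation.class);
        runner.setProperty("Mongo URI", "mongodb://localhost:27017");  // assumed name
        runner.setProperty("Mongo Database Name", "test");             // assumed name
        runner.setProperty("Mongo Collection Name", "example");        // assumed name
        runner.setProperty("mongo-agg-query",                          // assumed name
                "[{ \"$group\": { \"_id\": \"$state\", \"total\": { \"$sum\": 1 } } }]");
        runner.setProperty("allow-disk-use", "true"); // the new property; defaults to "false"
        runner.enqueue(new byte[0]);
        runner.run();
    }
}
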
@@ -145,6 +158,7 @@ public class RunMongoAggregation extends AbstractMongoProcessor {
}
final String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue();
final Boolean allowDiskUse = context.getProperty(ALLOW_DISK_USE).asBoolean();
final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue();
final Integer batchSize = context.getProperty(BATCH_SIZE).asInteger();
final Integer resultsPerFlowfile = context.getProperty(RESULTS_PER_FLOWFILE).asInteger();
@@ -163,7 +177,7 @@
try {
MongoCollection<Document> collection = getCollection(context, flowFile);
List<Bson> aggQuery = buildAggregationQuery(query);
AggregateIterable<Document> it = collection.aggregate(aggQuery);
AggregateIterable<Document> it = collection.aggregate(aggQuery).allowDiskUse(allowDiskUse);
it.batchSize(batchSize != null ? batchSize : 1);
iter = it.iterator();
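
The change above maps directly onto the MongoDB Java driver: AggregateIterable.allowDiskUse(Boolean) sets the same flag as the shell's { allowDiskUse: true } aggregation option, letting the server spill a memory-hungry pipeline stage to temporary files instead of aborting. A minimal standalone sketch of that driver call, assuming the synchronous driver's MongoClients entry point and placeholder connection details:

import com.mongodb.client.AggregateIterable;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import org.bson.Document;

import java.util.Collections;
import java.util.List;

public class AllowDiskUseSketch {
    public static void main(String[] args) {
        MongoCollection<Document> collection = MongoClients.create("mongodb://localhost:27017")
                .getDatabase("test")
                .getCollection("example");

        // A single $group stage; large accumulator state is what can push a
        // stage past the server's memory limit and require disk use.
        List<Document> pipeline = Collections.singletonList(
                new Document("$group", new Document("_id", "$state")
                        .append("total", new Document("$sum", 1))));

        // Same call the processor now makes when Allow Disk Use is "true".
        AggregateIterable<Document> it = collection.aggregate(pipeline).allowDiskUse(true);
        try (MongoCursor<Document> cursor = it.iterator()) {
            while (cursor.hasNext()) {
                System.out.println(cursor.next().toJson());
            }
        }
    }
}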