diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java
index c9e6d56b33..3352ab0e4d 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java
@@ -36,6 +36,7 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.JsonValidator;
+import org.apache.nifi.processor.util.StandardValidators;
 import org.bson.Document;
 import org.bson.conversions.Bson;
 
@@ -91,11 +92,23 @@ public class RunMongoAggregation extends AbstractMongoProcessor {
             .addValidator(JsonValidator.INSTANCE)
             .build();
 
+    static final PropertyDescriptor ALLOW_DISK_USE = new PropertyDescriptor.Builder()
+            .name("allow-disk-use")
+            .displayName("Allow Disk Use")
+            .description("Set this to true to enable writing data to temporary files to prevent exceeding the " +
+                    "maximum memory use limit during aggregation pipeline stages when handling large datasets.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
     static {
         List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
         _propertyDescriptors.addAll(descriptors);
         _propertyDescriptors.add(CHARSET);
         _propertyDescriptors.add(QUERY);
+        _propertyDescriptors.add(ALLOW_DISK_USE);
         _propertyDescriptors.add(JSON_TYPE);
         _propertyDescriptors.add(QUERY_ATTRIBUTE);
         _propertyDescriptors.add(BATCH_SIZE);
@@ -145,6 +158,7 @@ public class RunMongoAggregation extends AbstractMongoProcessor {
         }
 
         final String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue();
+        final Boolean allowDiskUse = context.getProperty(ALLOW_DISK_USE).asBoolean();
         final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue();
         final Integer batchSize = context.getProperty(BATCH_SIZE).asInteger();
         final Integer resultsPerFlowfile = context.getProperty(RESULTS_PER_FLOWFILE).asInteger();
@@ -163,7 +177,7 @@ public class RunMongoAggregation extends AbstractMongoProcessor {
         try {
             MongoCollection<Document> collection = getCollection(context, flowFile);
             List<Bson> aggQuery = buildAggregationQuery(query);
-            AggregateIterable<Document> it = collection.aggregate(aggQuery);
+            AggregateIterable<Document> it = collection.aggregate(aggQuery).allowDiskUse(allowDiskUse);
             it.batchSize(batchSize != null ? batchSize : 1);
             iter = it.iterator();
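
For context, a minimal standalone sketch (not part of the patch) of what the new ALLOW_DISK_USE property feeds through to: the MongoDB Java driver's AggregateIterable.allowDiskUse(Boolean), which lets memory-heavy pipeline stages spill to temporary files on the server instead of failing at the per-stage memory limit. The connection string, database, collection, and pipeline below are illustrative placeholders only.

import com.mongodb.client.AggregateIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Accumulators;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Sorts;
import org.bson.Document;
import org.bson.conversions.Bson;

import java.util.Arrays;
import java.util.List;

public class AllowDiskUseSketch {
    public static void main(String[] args) {
        // Placeholder connection details for illustration only.
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<Document> collection =
                    client.getDatabase("testDb").getCollection("testCollection");

            // A pipeline whose $group/$sort stages may exceed the server-side
            // memory limit on a large collection.
            List<Bson> pipeline = Arrays.asList(
                    Aggregates.group("$status", Accumulators.sum("count", 1)),
                    Aggregates.sort(Sorts.descending("count")));

            // allowDiskUse(true) is the driver call the new property controls:
            // stages may write to temporary files on the server instead of aborting.
            AggregateIterable<Document> results = collection.aggregate(pipeline).allowDiskUse(true);
            results.forEach(doc -> System.out.println(doc.toJson()));
        }
    }
}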