diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java index 10f1b61103..bd2c0e331a 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java @@ -18,13 +18,13 @@ */ package org.apache.nifi.processors.mongodb; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import javax.net.ssl.SSLContext; - +import com.mongodb.MongoClient; +import com.mongodb.MongoClientOptions; +import com.mongodb.MongoClientOptions.Builder; +import com.mongodb.MongoClientURI; import com.mongodb.WriteConcern; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; @@ -33,17 +33,20 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.security.util.SslContextFactory; import org.apache.nifi.ssl.SSLContextService; import org.bson.Document; -import com.mongodb.MongoClient; -import com.mongodb.MongoClientOptions; -import com.mongodb.MongoClientOptions.Builder; -import com.mongodb.MongoClientURI; -import com.mongodb.client.MongoCollection; -import com.mongodb.client.MongoDatabase; +import javax.net.ssl.SSLContext; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; public abstract class AbstractMongoProcessor extends AbstractProcessor { static final String WRITE_CONCERN_ACKNOWLEDGED = "ACKNOWLEDGED"; @@ -55,6 +58,7 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { protected static final PropertyDescriptor URI = new PropertyDescriptor.Builder() .name("Mongo URI") + .displayName("Mongo URI") .description("MongoURI, typically of the form: mongodb://host1[:port1][,host2[:port2],...]") .required(true) .expressionLanguageSupported(true) @@ -62,6 +66,7 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { .build(); protected static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder() .name("Mongo Database Name") + .displayName("Mongo Database Name") .description("The name of the database to use") .required(true) .expressionLanguageSupported(true) @@ -95,6 +100,7 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { public static final PropertyDescriptor WRITE_CONCERN = new PropertyDescriptor.Builder() .name("Write Concern") + .displayName("Write Concern") .description("The write concern to use") .required(true) .allowableValues(WRITE_CONCERN_ACKNOWLEDGED, WRITE_CONCERN_UNACKNOWLEDGED, WRITE_CONCERN_FSYNCED, WRITE_CONCERN_JOURNALED, @@ -102,6 +108,42 @@ public abstract class AbstractMongoProcessor extends 
AbstractProcessor { .defaultValue(WRITE_CONCERN_ACKNOWLEDGED) .build(); + static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder() + .name("results-per-flowfile") + .displayName("Results Per FlowFile") + .description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.") + .required(false) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("1") + .build(); + + static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Batch Size") + .displayName("Batch Size") + .description("The number of elements returned from the server in one batch.") + .required(false) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("100") + .build(); + + static final PropertyDescriptor QUERY_ATTRIBUTE = new PropertyDescriptor.Builder() + .name("mongo-query-attribute") + .displayName("Query Output Attribute") + .description("If set, the query will be written to a specified attribute on the output flowfiles.") + .expressionLanguageSupported(true) + .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) + .required(false) + .build(); + static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() + .name("mongo-charset") + .displayName("Character Set") + .description("Specifies the character set of the document data.") + .required(true) + .defaultValue("UTF-8") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + static List<PropertyDescriptor> descriptors = new ArrayList<>(); static { @@ -221,4 +263,15 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { } return writeConcern; } + + protected void writeBatch(String payload, FlowFile parent, ProcessContext context, ProcessSession session, Map<String, String> extraAttributes, Relationship rel) throws UnsupportedEncodingException { + String charset = parent != null ? context.getProperty(CHARSET).evaluateAttributeExpressions(parent).getValue() + : context.getProperty(CHARSET).evaluateAttributeExpressions().getValue(); + + FlowFile flowFile = parent != null ? 
session.create(parent) : session.create(); + flowFile = session.importFrom(new ByteArrayInputStream(payload.getBytes(charset)), flowFile); + flowFile = session.putAllAttributes(flowFile, extraAttributes); + session.getProvenanceReporter().receive(flowFile, getURI(context)); + session.transfer(flowFile, rel); + } } diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java index 5fa4550275..70cc4ca811 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java @@ -23,14 +23,12 @@ import com.fasterxml.jackson.databind.ObjectWriter; import com.mongodb.client.FindIterable; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; -import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; import org.apache.nifi.flowfile.FlowFile; @@ -40,19 +38,19 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.bson.Document; import org.bson.json.JsonWriterSettings; import java.io.IOException; -import java.io.OutputStream; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; @@ -60,85 +58,81 @@ import java.util.Set; @InputRequirement(Requirement.INPUT_FORBIDDEN) @CapabilityDescription("Creates FlowFiles from documents in MongoDB") public class GetMongo extends AbstractMongoProcessor { - public static final Validator DOCUMENT_VALIDATOR = new Validator() { - @Override - public ValidationResult validate(final String subject, final String value, final ValidationContext context) { - final ValidationResult.Builder builder = new ValidationResult.Builder(); - builder.subject(subject).input(value); + public static final Validator DOCUMENT_VALIDATOR = (subject, value, context) -> { + final ValidationResult.Builder builder = new ValidationResult.Builder(); + builder.subject(subject).input(value); - if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) { - return builder.valid(true).explanation("Contains Expression Language").build(); - } - - String reason = null; - try { - Document.parse(value); - } catch (final RuntimeException e) { - reason = e.getLocalizedMessage(); - } - - return builder.explanation(reason).valid(reason == null).build(); + if (context.isExpressionLanguageSupported(subject) && 
context.isExpressionLanguagePresent(value)) { + return builder.valid(true).explanation("Contains Expression Language").build(); } + + String reason = null; + try { + Document.parse(value); + } catch (final RuntimeException e) { + reason = e.getLocalizedMessage(); + } + + return builder.explanation(reason).valid(reason == null).build(); }; - static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All files are routed to success").build(); - static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() - .name("Query") - .description("The selection criteria; must be a valid MongoDB Extended JSON format; if omitted the entire collection will be queried") - .required(false) - .expressionLanguageSupported(true) - .addValidator(DOCUMENT_VALIDATOR) - .build(); + .name("Query") + .description("The selection criteria; must be a valid MongoDB Extended JSON format; if omitted the entire collection will be queried") + .required(false) + .expressionLanguageSupported(true) + .addValidator(DOCUMENT_VALIDATOR) + .build(); static final PropertyDescriptor PROJECTION = new PropertyDescriptor.Builder() - .name("Projection") - .description("The fields to be returned from the documents in the result set; must be a valid BSON document") - .required(false) - .expressionLanguageSupported(true) - .addValidator(DOCUMENT_VALIDATOR) - .build(); + .name("Projection") + .description("The fields to be returned from the documents in the result set; must be a valid BSON document") + .required(false) + .expressionLanguageSupported(true) + .addValidator(DOCUMENT_VALIDATOR) + .build(); static final PropertyDescriptor SORT = new PropertyDescriptor.Builder() - .name("Sort") - .description("The fields by which to sort; must be a valid BSON document") - .required(false) - .expressionLanguageSupported(true) - .addValidator(DOCUMENT_VALIDATOR) - .build(); + .name("Sort") + .description("The fields by which to sort; must be a valid BSON document") + .required(false) + .expressionLanguageSupported(true) + .addValidator(DOCUMENT_VALIDATOR) + .build(); static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder() - .name("Limit") - .description("The maximum number of elements to return") - .required(false) - .expressionLanguageSupported(true) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .build(); + .name("Limit") + .description("The maximum number of elements to return") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() - .name("Batch Size") - .description("The number of elements returned from the server in one batch") - .required(false) - .expressionLanguageSupported(true) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .build(); + .name("Batch Size") + .description("The number of elements returned from the server in one batch") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder() - .name("results-per-flowfile") - .displayName("Results Per FlowFile") - .description("How many results to put into a flowfile at once. 
The whole body will be treated as a JSON array of results.") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); static final AllowableValue YES_PP = new AllowableValue("true", "True"); static final AllowableValue NO_PP = new AllowableValue("false", "False"); static final PropertyDescriptor USE_PRETTY_PRINTING = new PropertyDescriptor.Builder() - .name("use-pretty-printing") - .displayName("Pretty Print Results JSON") - .description("Choose whether or not to pretty print the JSON from the results of the query. " + - "Choosing yes can greatly increase the space requirements on disk depending on the complexity of the JSON document") - .required(true) - .defaultValue(YES_PP.getValue()) - .allowableValues(YES_PP, NO_PP) - .addValidator(Validator.VALID) - .build(); + .name("use-pretty-printing") + .displayName("Pretty Print Results JSON") + .description("Choose whether or not to pretty print the JSON from the results of the query. " + + "Choosing yes can greatly increase the space requirements on disk depending on the complexity of the JSON document") + .required(true) + .defaultValue(YES_PP.getValue()) + .allowableValues(YES_PP, NO_PP) + .addValidator(Validator.VALID) + .build(); static final String JSON_TYPE_EXTENDED = "Extended"; static final String JSON_TYPE_STANDARD = "Standard"; @@ -152,8 +146,8 @@ public class GetMongo extends AbstractMongoProcessor { .displayName("JSON Type") .name("json-type") .description("By default, MongoDB's Java driver returns \"extended JSON\". Some of the features of this variant of JSON" + - " may cause problems for other JSON parsers that expect only standard JSON types and conventions. This configuration setting " + - " controls whether to use extended JSON or provide a clean view that conforms to standard JSON.") + " may cause problems for other JSON parsers that expect only standard JSON types and conventions. This configuration setting " + + " controls whether to use extended JSON or provide a clean view that conforms to standard JSON.") .expressionLanguageSupported(false) .required(true) .build(); @@ -161,12 +155,16 @@ public class GetMongo extends AbstractMongoProcessor { private final static Set<Relationship> relationships; private final static List<PropertyDescriptor> propertyDescriptors; + static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All files are routed to success").build(); + static { List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>(); _propertyDescriptors.addAll(descriptors); _propertyDescriptors.add(JSON_TYPE); _propertyDescriptors.add(USE_PRETTY_PRINTING); + _propertyDescriptors.add(CHARSET); _propertyDescriptors.add(QUERY); + _propertyDescriptors.add(QUERY_ATTRIBUTE); _propertyDescriptors.add(PROJECTION); _propertyDescriptors.add(SORT); _propertyDescriptors.add(LIMIT); @@ -205,8 +203,8 @@ public class GetMongo extends AbstractMongoProcessor { asJson = document.toJson(new JsonWriterSettings(true)); } builder - .append(asJson) - .append( (documents.size() > 1 && index + 1 < documents.size()) ? ", " : "" ); + .append(asJson) + .append( (documents.size() > 1 && index + 1 < documents.size()) ? 
", " : "" ); } return "[" + builder.toString() + "]"; @@ -226,23 +224,18 @@ public class GetMongo extends AbstractMongoProcessor { : mapper.writer(); } - private void writeBatch(String payload, ProcessContext context, ProcessSession session) { - FlowFile flowFile = session.create(); - flowFile = session.write(flowFile, new OutputStreamCallback() { - @Override - public void process(OutputStream out) throws IOException { - out.write(payload.getBytes("UTF-8")); - } - }); - flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json"); - session.getProvenanceReporter().receive(flowFile, getURI(context)); - session.transfer(flowFile, REL_SUCCESS); - } - @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { final ComponentLog logger = getLogger(); + Map attributes = new HashMap(); + attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json"); + + if (context.getProperty(QUERY).isSet() && context.getProperty(QUERY_ATTRIBUTE).isSet()) { + attributes.put(context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions().getValue(), + context.getProperty(QUERY).evaluateAttributeExpressions().getValue()); + } + final Document query = context.getProperty(QUERY).isSet() ? Document.parse(context.getProperty(QUERY).evaluateAttributeExpressions().getValue()) : null; final Document projection = context.getProperty(PROJECTION).isSet() @@ -287,17 +280,17 @@ public class GetMongo extends AbstractMongoProcessor { log.debug("Writing batch..."); } String payload = buildBatch(batch, jsonTypeSetting, usePrettyPrint); - writeBatch(payload, context, session); + writeBatch(payload, null, context, session, attributes, REL_SUCCESS); batch = new ArrayList<>(); - } catch (IOException ex) { + } catch (Exception ex) { getLogger().error("Error building batch", ex); } } } if (batch.size() > 0) { try { - writeBatch(buildBatch(batch, jsonTypeSetting, usePrettyPrint), context, session); - } catch (IOException ex) { + writeBatch(buildBatch(batch, jsonTypeSetting, usePrettyPrint), null, context, session, attributes, REL_SUCCESS); + } catch (Exception ex) { getLogger().error("Error sending remainder of batch", ex); } } @@ -311,9 +304,9 @@ public class GetMongo extends AbstractMongoProcessor { } else { json = cursor.next().toJson(); } - IOUtils.write(json, out); + out.write(json.getBytes(context.getProperty(CHARSET).evaluateAttributeExpressions().getValue())); }); - flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json"); + flowFile = session.putAllAttributes(flowFile, attributes); session.getProvenanceReporter().receive(flowFile, getURI(context)); session.transfer(flowFile, REL_SUCCESS); diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java new file mode 100644 index 0000000000..d5ec667cfd --- /dev/null +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.nifi.processors.mongodb; + +import com.mongodb.BasicDBObject; +import com.mongodb.client.AggregateIterable; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.bson.Document; +import org.bson.conversions.Bson; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +@Tags({"mongo", "aggregation", "aggregate"}) +@CapabilityDescription("A processor that runs an aggregation query whenever a flowfile is received.") +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@EventDriven +public class RunMongoAggregation extends AbstractMongoProcessor { + + private final static Set<Relationship> relationships; + private final static List<PropertyDescriptor> propertyDescriptors; + + static final Relationship REL_ORIGINAL = new Relationship.Builder() + .description("The input flowfile gets sent to this relationship when the query succeeds.") + .name("original") + .build(); + static final Relationship REL_FAILURE = new Relationship.Builder() + .description("The input flowfile gets sent to this relationship when the query fails.") + .name("failure") + .build(); + static final Relationship REL_RESULTS = new Relationship.Builder() + .description("The result set of the aggregation will be sent to this relationship.") + .name("results") + .build(); + + static final List<Bson> buildAggregationQuery(String query) throws IOException { + List<Bson> result = new ArrayList<>(); + + ObjectMapper mapper = new ObjectMapper(); + List<Map> values = mapper.readValue(query, List.class); + for (Map val : values) { + result.add(new BasicDBObject(val)); + } + + return result; + } + + public static final Validator AGG_VALIDATOR = (subject, value, context) -> { + final ValidationResult.Builder builder = new ValidationResult.Builder(); + builder.subject(subject).input(value); + + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) { + return builder.valid(true).explanation("Contains Expression Language").build(); + } + + String reason = null; + try { + buildAggregationQuery(value); + } catch (final RuntimeException | IOException e) { + reason = 
e.getLocalizedMessage(); + } + + return builder.explanation(reason).valid(reason == null).build(); + }; + + static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() + .name("mongo-agg-query") + .displayName("Query") + .expressionLanguageSupported(true) + .description("The aggregation query to be executed.") + .required(true) + .addValidator(AGG_VALIDATOR) + .build(); + + static { + List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>(); + _propertyDescriptors.addAll(descriptors); + _propertyDescriptors.add(CHARSET); + _propertyDescriptors.add(QUERY); + _propertyDescriptors.add(QUERY_ATTRIBUTE); + _propertyDescriptors.add(BATCH_SIZE); + _propertyDescriptors.add(RESULTS_PER_FLOWFILE); + _propertyDescriptors.add(SSL_CONTEXT_SERVICE); + _propertyDescriptors.add(CLIENT_AUTH); + propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); + + final Set<Relationship> _relationships = new HashSet<>(); + _relationships.add(REL_RESULTS); + _relationships.add(REL_ORIGINAL); + _relationships.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(_relationships); + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return propertyDescriptors; + } + + static String buildBatch(List<Document> batch) { + ObjectMapper mapper = new ObjectMapper(); + String retVal; + try { + retVal = mapper.writeValueAsString(batch.size() > 1 ? batch : batch.get(0)); + } catch (Exception e) { + retVal = null; + } + + return retVal; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = null; + if (context.hasIncomingConnection()) { + flowFile = session.get(); + + if (flowFile == null && context.hasNonLoopConnection()) { + return; + } + } + + String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue(); + String queryAttr = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue(); + Integer batchSize = context.getProperty(BATCH_SIZE).asInteger(); + Integer resultsPerFlowfile = context.getProperty(RESULTS_PER_FLOWFILE).asInteger(); + + Map<String, String> attrs = new HashMap<>(); + if (queryAttr != null && queryAttr.trim().length() > 0) { + attrs.put(queryAttr, query); + } + + MongoCollection<Document> collection = getCollection(context); + MongoCursor<Document> iter = null; + + try { + List<Bson> aggQuery = buildAggregationQuery(query); + AggregateIterable<Document> it = collection.aggregate(aggQuery); + it.batchSize(batchSize != null ? 
batchSize : 1); + + iter = it.iterator(); + List<Document> batch = new ArrayList<>(); + + while (iter.hasNext()) { + batch.add(iter.next()); + if (batch.size() == resultsPerFlowfile) { + writeBatch(buildBatch(batch), flowFile, context, session, attrs, REL_RESULTS); + batch = new ArrayList<>(); + } + } + + if (batch.size() > 0) { + writeBatch(buildBatch(batch), flowFile, context, session, attrs, REL_RESULTS); + } + + if (flowFile != null) { + session.transfer(flowFile, REL_ORIGINAL); + } + } catch (Exception e) { + getLogger().error("Error running MongoDB aggregation query.", e); + if (flowFile != null) { + session.transfer(flowFile, REL_FAILURE); + } + } finally { + if (iter != null) { + iter.close(); + } + } + } +} diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 4129b05b1e..9e7bc08cfe 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -15,5 +15,6 @@ org.apache.nifi.processors.mongodb.DeleteMongo org.apache.nifi.processors.mongodb.GetMongo +org.apache.nifi.processors.mongodb.RunMongoAggregation org.apache.nifi.processors.mongodb.PutMongo org.apache.nifi.processors.mongodb.PutMongoRecord \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.RunMongoAggregation/additionalDetails.html b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.RunMongoAggregation/additionalDetails.html new file mode 100644 index 0000000000..451de6fcd7 --- /dev/null +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.RunMongoAggregation/additionalDetails.html @@ -0,0 +1,44 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<html lang="en">
+<head>
+    <meta charset="utf-8"/>
+    <title>RunMongoAggregation</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+</head>
+<body>

+    <h2>Description:</h2>
+    <p>
+        This processor runs a MongoDB aggregation query based on user-defined settings. The
+        following is an example of such a query (and what the expected input looks like):
+    </p>
+    <pre>
+[{
+    "$project": {
+        "domain": 1
+    }
+}, {
+    "$group": {
+        "_id": { "domain": "$domain" },
+        "total": {
+            "$sum": 1
+        }
+    }
+}]
+    </pre>
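+    <p>
+        For illustration only (the values below are hypothetical, not produced verbatim by the
+        processor): given input documents such as <code>{ "domain": "example.com" }</code>, each
+        result document routed to the <code>results</code> relationship by the pipeline above
+        would look similar to:
+    </p>
+    <pre>
+{ "_id": { "domain": "example.com" }, "total": 42 }
+    </pre>
+    <p>
+        When "Results Per FlowFile" is greater than one, several result documents are serialized
+        into a single FlowFile as a JSON array.
+    </p>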
+</body>
+</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java index 0e168132e9..8703990bc0 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java @@ -76,6 +76,7 @@ public class GetMongoTest { runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}"); runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "${collection}"); runner.setProperty(GetMongo.USE_PRETTY_PRINTING, GetMongo.YES_PP); + runner.setIncomingConnection(true); mongoClient = new MongoClient(new MongoClientURI(MONGO_URI)); @@ -136,7 +137,7 @@ public class GetMongoTest { // invalid projection runner.setVariable("projection", "{a: x,y,z}"); runner.setProperty(GetMongo.QUERY, "{a: 1}"); - runner.setProperty(GetMongo.PROJECTION, "${projection}"); + runner.setProperty(GetMongo.PROJECTION, "{a: z}"); runner.enqueue(new byte[0]); pc = runner.getProcessContext(); results = new HashSet<>(); @@ -144,7 +145,7 @@ results = ((MockProcessContext) pc).validate(); } Assert.assertEquals(1, results.size()); - Assert.assertTrue(results.iterator().next().toString().matches("'Projection' .* is invalid because org.bson.json.JsonParseException")); + Assert.assertTrue(results.iterator().next().toString().contains("is invalid")); // invalid sort runner.removeProperty(GetMongo.PROJECTION); @@ -156,7 +157,7 @@ results = ((MockProcessContext) pc).validate(); } Assert.assertEquals(1, results.size()); - Assert.assertTrue(results.iterator().next().toString().matches("'Sort' .* is invalid because org.bson.json.JsonParseException")); + Assert.assertTrue(results.iterator().next().toString().contains("is invalid")); } @Test @@ -280,4 +281,19 @@ public class GetMongoTest { json = new String(raw); Assert.assertFalse("New lines detected", json.contains("\n")); } + + @Test + public void testQueryAttribute() { + final String attr = "query.attr"; + runner.setProperty(GetMongo.QUERY, "{}"); + runner.setProperty(GetMongo.QUERY_ATTRIBUTE, attr); + runner.run(); + runner.assertTransferCount(GetMongo.REL_SUCCESS, 3); + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); + for (MockFlowFile mff : flowFiles) { + String val = mff.getAttribute(attr); + Assert.assertNotNull("Missing query attribute", val); + Assert.assertEquals("Value was wrong", "{}", val); + } + } } diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationTest.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationTest.java new file mode 100644 index 0000000000..95b22003aa --- /dev/null +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationTest.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.nifi.processors.mongodb; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.mongodb.MongoClient; +import com.mongodb.MongoClientURI; +import com.mongodb.client.MongoCollection; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.bson.Document; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.IOException; +import java.util.Calendar; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Ignore("This is an integration test that requires Mongo to be running.") +public class RunMongoAggregationTest { + + private static final String MONGO_URI = "mongodb://localhost"; + private static final String DB_NAME = String.format("agg_test-%s", Calendar.getInstance().getTimeInMillis()); + private static final String COLLECTION_NAME = "agg_test_data"; + private static final String AGG_ATTR = "mongo.aggregation.query"; + + private TestRunner runner; + private MongoClient mongoClient; + private Map<String, Integer> mappings; + + @Before + public void setup() { + runner = TestRunners.newTestRunner(RunMongoAggregation.class); + runner.setVariable("uri", MONGO_URI); + runner.setVariable("db", DB_NAME); + runner.setVariable("collection", COLLECTION_NAME); + runner.setProperty(AbstractMongoProcessor.URI, "${uri}"); + runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}"); + runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "${collection}"); + runner.setProperty(RunMongoAggregation.QUERY_ATTRIBUTE, AGG_ATTR); + runner.setValidateExpressionUsage(true); + + mongoClient = new MongoClient(new MongoClientURI(MONGO_URI)); + + MongoCollection<Document> collection = mongoClient.getDatabase(DB_NAME).getCollection(COLLECTION_NAME); + String[] values = new String[] { "a", "b", "c" }; + mappings = new HashMap<>(); + + for (int x = 0; x < values.length; x++) { + for (int y = 0; y < x + 2; y++) { + Document doc = new Document().append("val", values[x]); + collection.insertOne(doc); + } + mappings.put(values[x], x + 2); + } + } + + @After + public void teardown() { + runner = null; + + mongoClient.getDatabase(DB_NAME).drop(); + } + + @Test + public void testAggregation() throws Exception { + final String queryInput = "[\n" + + " {\n" + + " \"$project\": {\n" + + " \"_id\": 0,\n" + + " \"val\": 1\n" + + " }\n" + + " },\n" + + " {\n" + + " \"$group\": {\n" + + " \"_id\": \"$val\",\n" + + " \"doc_count\": {\n" + + " \"$sum\": 1\n" + + " }\n" + + " }\n" + + " }\n" + + "]"; + runner.setProperty(RunMongoAggregation.QUERY, queryInput); + runner.enqueue("test"); + runner.run(1, true, true); + + evaluateRunner(1); + + runner.clearTransferState(); + + runner.setIncomingConnection(false); + runner.run(); //Null parent flowfile + evaluateRunner(0); + + runner.run(); + List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(RunMongoAggregation.REL_RESULTS); + for (MockFlowFile mff : flowFiles) { + String val = mff.getAttribute(AGG_ATTR); + Assert.assertNotNull("Missing query attribute", val); + Assert.assertEquals("Value was wrong", queryInput, val); + } + } + + @Test + public void testExpressionLanguageSupport() throws Exception { + runner.setVariable("fieldName", "$val"); + runner.setProperty(RunMongoAggregation.QUERY, "[\n" + + " {\n" + + " \"$project\": {\n" + + " \"_id\": 0,\n" + + " \"val\": 1\n" + + " }\n" + + " },\n" + + " {\n" + + " \"$group\": {\n" + + " \"_id\": \"${fieldName}\",\n" + + " \"doc_count\": {\n" + + " \"$sum\": 1\n" + + " }\n" + + " }\n" + + " }\n" + + "]"); + + runner.enqueue("test"); + runner.run(1, true, true); + evaluateRunner(1); + } + + @Test + public void testInvalidQuery() { + runner.setProperty(RunMongoAggregation.QUERY, "[\n" + + " {\n" + + " \"$invalid_stage\": {\n" + + " \"_id\": 0,\n" + + " \"val\": 1\n" + + " }\n" + + " }\n" + + "]" + ); + runner.enqueue("test"); + runner.run(1, true, true); + runner.assertTransferCount(RunMongoAggregation.REL_RESULTS, 0); + runner.assertTransferCount(RunMongoAggregation.REL_ORIGINAL, 0); + runner.assertTransferCount(RunMongoAggregation.REL_FAILURE, 1); + } + + private void evaluateRunner(int original) throws IOException { + runner.assertTransferCount(RunMongoAggregation.REL_RESULTS, mappings.size()); + runner.assertTransferCount(RunMongoAggregation.REL_ORIGINAL, original); + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(RunMongoAggregation.REL_RESULTS); + ObjectMapper mapper = new ObjectMapper(); + for (MockFlowFile mockFlowFile : flowFiles) { + byte[] raw = runner.getContentAsByteArray(mockFlowFile); + Map read = mapper.readValue(raw, Map.class); + Assert.assertTrue("Value was not found", mappings.containsKey(read.get("_id"))); + + String queryAttr = mockFlowFile.getAttribute(AGG_ATTR); + Assert.assertNotNull("Query attribute was null.", queryAttr); + Assert.assertTrue("Missing $project", queryAttr.contains("$project")); + Assert.assertTrue("Missing $group", queryAttr.contains("$group")); + } + } +}