NIFI-4429 Added GetMongoAggregation to support running Mongo aggregations.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2180
Mike Thomsen 2018-02-20 12:46:00 -04:00 committed by Matthew Burgess
parent 160bea8bf3
commit 54b1659704
7 changed files with 610 additions and 107 deletions

AbstractMongoProcessor.java

@@ -18,13 +18,13 @@
  */
 package org.apache.nifi.processors.mongodb;

-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.net.ssl.SSLContext;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientOptions;
+import com.mongodb.MongoClientOptions.Builder;
+import com.mongodb.MongoClientURI;
 import com.mongodb.WriteConcern;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
@@ -33,17 +33,20 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.security.util.SslContextFactory;
 import org.apache.nifi.ssl.SSLContextService;
 import org.bson.Document;
-import com.mongodb.MongoClient;
-import com.mongodb.MongoClientOptions;
-import com.mongodb.MongoClientOptions.Builder;
-import com.mongodb.MongoClientURI;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoDatabase;
+import javax.net.ssl.SSLContext;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;

 public abstract class AbstractMongoProcessor extends AbstractProcessor {
     static final String WRITE_CONCERN_ACKNOWLEDGED = "ACKNOWLEDGED";
@@ -55,6 +58,7 @@
     protected static final PropertyDescriptor URI = new PropertyDescriptor.Builder()
         .name("Mongo URI")
+        .displayName("Mongo URI")
         .description("MongoURI, typically of the form: mongodb://host1[:port1][,host2[:port2],...]")
         .required(true)
         .expressionLanguageSupported(true)
@@ -62,6 +66,7 @@
         .build();
     protected static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder()
         .name("Mongo Database Name")
+        .displayName("Mongo Database Name")
         .description("The name of the database to use")
         .required(true)
         .expressionLanguageSupported(true)
@@ -95,6 +100,7 @@
     public static final PropertyDescriptor WRITE_CONCERN = new PropertyDescriptor.Builder()
         .name("Write Concern")
+        .displayName("Write Concern")
         .description("The write concern to use")
         .required(true)
         .allowableValues(WRITE_CONCERN_ACKNOWLEDGED, WRITE_CONCERN_UNACKNOWLEDGED, WRITE_CONCERN_FSYNCED, WRITE_CONCERN_JOURNALED,
@@ -102,6 +108,42 @@
         .defaultValue(WRITE_CONCERN_ACKNOWLEDGED)
         .build();
+
+    static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
+        .name("results-per-flowfile")
+        .displayName("Results Per FlowFile")
+        .description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.")
+        .required(false)
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .defaultValue("1")
+        .build();
+
+    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+        .name("Batch Size")
+        .displayName("Batch Size")
+        .description("The number of elements returned from the server in one batch.")
+        .required(false)
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .defaultValue("100")
+        .build();
+
+    static final PropertyDescriptor QUERY_ATTRIBUTE = new PropertyDescriptor.Builder()
+        .name("mongo-query-attribute")
+        .displayName("Query Output Attribute")
+        .description("If set, the query will be written to a specified attribute on the output flowfiles.")
+        .expressionLanguageSupported(true)
+        .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+        .required(false)
+        .build();
+
+    static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+        .name("mongo-charset")
+        .displayName("Character Set")
+        .description("Specifies the character set of the document data.")
+        .required(true)
+        .defaultValue("UTF-8")
+        .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .build();

     static List<PropertyDescriptor> descriptors = new ArrayList<>();

     static {
@@ -221,4 +263,15 @@
         }
         return writeConcern;
     }
+
+    protected void writeBatch(String payload, FlowFile parent, ProcessContext context, ProcessSession session, Map extraAttributes, Relationship rel) throws UnsupportedEncodingException {
+        String charset = parent != null ? context.getProperty(CHARSET).evaluateAttributeExpressions(parent).getValue()
+                : context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
+
+        FlowFile flowFile = parent != null ? session.create(parent) : session.create();
+        flowFile = session.importFrom(new ByteArrayInputStream(payload.getBytes(charset)), flowFile);
+        flowFile = session.putAllAttributes(flowFile, extraAttributes);
+
+        session.getProvenanceReporter().receive(flowFile, getURI(context));
+        session.transfer(flowFile, rel);
+    }
 }

GetMongo.java

@@ -23,14 +23,12 @@ import com.fasterxml.jackson.databind.ObjectWriter;
 import com.mongodb.client.FindIterable;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoCursor;
-import org.apache.commons.io.IOUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
 import org.apache.nifi.flowfile.FlowFile;
@@ -40,19 +38,19 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.bson.Document;
 import org.bson.json.JsonWriterSettings;

 import java.io.IOException;
-import java.io.OutputStream;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
@@ -60,9 +58,7 @@ import java.util.Set;
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @CapabilityDescription("Creates FlowFiles from documents in MongoDB")
 public class GetMongo extends AbstractMongoProcessor {
-    public static final Validator DOCUMENT_VALIDATOR = new Validator() {
-        @Override
-        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+    public static final Validator DOCUMENT_VALIDATOR = (subject, value, context) -> {
         final ValidationResult.Builder builder = new ValidationResult.Builder();
         builder.subject(subject).input(value);
@@ -78,11 +74,8 @@ public class GetMongo extends AbstractMongoProcessor {
         }

         return builder.explanation(reason).valid(reason == null).build();
-        }
     };

-    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All files are routed to success").build();
-
     static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
         .name("Query")
         .description("The selection criteria; must be a valid MongoDB Extended JSON format; if omitted the entire collection will be queried")
@@ -111,6 +104,7 @@
         .expressionLanguageSupported(true)
         .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
         .build();
+
     static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
         .name("Batch Size")
         .description("The number of elements returned from the server in one batch")
@@ -161,12 +155,16 @@
     private final static Set<Relationship> relationships;
     private final static List<PropertyDescriptor> propertyDescriptors;

+    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All files are routed to success").build();
+
     static {
         List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
         _propertyDescriptors.addAll(descriptors);
         _propertyDescriptors.add(JSON_TYPE);
         _propertyDescriptors.add(USE_PRETTY_PRINTING);
+        _propertyDescriptors.add(CHARSET);
         _propertyDescriptors.add(QUERY);
+        _propertyDescriptors.add(QUERY_ATTRIBUTE);
         _propertyDescriptors.add(PROJECTION);
         _propertyDescriptors.add(SORT);
         _propertyDescriptors.add(LIMIT);
@@ -226,23 +224,18 @@
             : mapper.writer();
     }

-    private void writeBatch(String payload, ProcessContext context, ProcessSession session) {
-        FlowFile flowFile = session.create();
-        flowFile = session.write(flowFile, new OutputStreamCallback() {
-            @Override
-            public void process(OutputStream out) throws IOException {
-                out.write(payload.getBytes("UTF-8"));
-            }
-        });
-
-        flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
-        session.getProvenanceReporter().receive(flowFile, getURI(context));
-        session.transfer(flowFile, REL_SUCCESS);
-    }
-
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
         final ComponentLog logger = getLogger();
+
+        Map attributes = new HashMap();
+        attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+
+        if (context.getProperty(QUERY).isSet() && context.getProperty(QUERY_ATTRIBUTE).isSet()) {
+            attributes.put(context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions().getValue(),
+                    context.getProperty(QUERY).evaluateAttributeExpressions().getValue());
+        }
+
         final Document query = context.getProperty(QUERY).isSet()
                 ? Document.parse(context.getProperty(QUERY).evaluateAttributeExpressions().getValue()) : null;
         final Document projection = context.getProperty(PROJECTION).isSet()
@@ -287,17 +280,17 @@
                             log.debug("Writing batch...");
                         }
                         String payload = buildBatch(batch, jsonTypeSetting, usePrettyPrint);
-                        writeBatch(payload, context, session);
+                        writeBatch(payload, null, context, session, attributes, REL_SUCCESS);
                         batch = new ArrayList<>();
-                    } catch (IOException ex) {
+                    } catch (Exception ex) {
                         getLogger().error("Error building batch", ex);
                     }
                 }
             }
             if (batch.size() > 0) {
                 try {
-                    writeBatch(buildBatch(batch, jsonTypeSetting, usePrettyPrint), context, session);
-                } catch (IOException ex) {
+                    writeBatch(buildBatch(batch, jsonTypeSetting, usePrettyPrint), null, context, session, attributes, REL_SUCCESS);
+                } catch (Exception ex) {
                     getLogger().error("Error sending remainder of batch", ex);
                 }
             }
@@ -311,9 +304,9 @@
                     } else {
                         json = cursor.next().toJson();
                     }
-                    IOUtils.write(json, out);
+                    out.write(json.getBytes(context.getProperty(CHARSET).evaluateAttributeExpressions().getValue()));
                 });
-                flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
+                flowFile = session.putAllAttributes(flowFile, attributes);
                 session.getProvenanceReporter().receive(flowFile, getURI(context));
                 session.transfer(flowFile, REL_SUCCESS);

RunMongoAggregation.java

@@ -0,0 +1,210 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.mongodb;

import com.mongodb.BasicDBObject;
import com.mongodb.client.AggregateIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.bson.conversions.Bson;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

@Tags({"mongo", "aggregation", "aggregate"})
@CapabilityDescription("A processor that runs an aggregation query whenever a flowfile is received.")
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@EventDriven
public class RunMongoAggregation extends AbstractMongoProcessor {

    private final static Set<Relationship> relationships;
    private final static List<PropertyDescriptor> propertyDescriptors;

    static final Relationship REL_ORIGINAL = new Relationship.Builder()
            .description("The input flowfile gets sent to this relationship when the query succeeds.")
            .name("original")
            .build();
    static final Relationship REL_FAILURE = new Relationship.Builder()
            .description("The input flowfile gets sent to this relationship when the query fails.")
            .name("failure")
            .build();
    static final Relationship REL_RESULTS = new Relationship.Builder()
            .description("The result set of the aggregation will be sent to this relationship.")
            .name("results")
            .build();

    static final List<Bson> buildAggregationQuery(String query) throws IOException {
        List<Bson> result = new ArrayList<>();
        ObjectMapper mapper = new ObjectMapper();
        List<Map> values = mapper.readValue(query, List.class);
        for (Map val : values) {
            result.add(new BasicDBObject(val));
        }
        return result;
    }

    public static final Validator AGG_VALIDATOR = (subject, value, context) -> {
        final ValidationResult.Builder builder = new ValidationResult.Builder();
        builder.subject(subject).input(value);

        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
            return builder.valid(true).explanation("Contains Expression Language").build();
        }

        String reason = null;
        try {
            buildAggregationQuery(value);
        } catch (final RuntimeException | IOException e) {
            reason = e.getLocalizedMessage();
        }

        return builder.explanation(reason).valid(reason == null).build();
    };

    static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
            .name("mongo-agg-query")
            .displayName("Query")
            .expressionLanguageSupported(true)
            .description("The aggregation query to be executed.")
            .required(true)
            .addValidator(AGG_VALIDATOR)
            .build();
    static {
        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
        _propertyDescriptors.addAll(descriptors);
        _propertyDescriptors.add(CHARSET);
        _propertyDescriptors.add(QUERY);
        _propertyDescriptors.add(QUERY_ATTRIBUTE);
        _propertyDescriptors.add(BATCH_SIZE);
        _propertyDescriptors.add(RESULTS_PER_FLOWFILE);
        _propertyDescriptors.add(SSL_CONTEXT_SERVICE);
        _propertyDescriptors.add(CLIENT_AUTH);
        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);

        final Set<Relationship> _relationships = new HashSet<>();
        _relationships.add(REL_RESULTS);
        _relationships.add(REL_ORIGINAL);
        _relationships.add(REL_FAILURE);
        relationships = Collections.unmodifiableSet(_relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    static String buildBatch(List batch) {
        ObjectMapper mapper = new ObjectMapper();
        String retVal;
        try {
            retVal = mapper.writeValueAsString(batch.size() > 1 ? batch : batch.get(0));
        } catch (Exception e) {
            retVal = null;
        }
        return retVal;
    }

    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = null;
        if (context.hasIncomingConnection()) {
            flowFile = session.get();

            if (flowFile == null && context.hasNonLoopConnection()) {
                return;
            }
        }

        String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue();
        String queryAttr = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue();
        Integer batchSize = context.getProperty(BATCH_SIZE).asInteger();
        Integer resultsPerFlowfile = context.getProperty(RESULTS_PER_FLOWFILE).asInteger();

        Map attrs = new HashMap();
        if (queryAttr != null && queryAttr.trim().length() > 0) {
            attrs.put(queryAttr, query);
        }

        MongoCollection collection = getCollection(context);
        MongoCursor iter = null;

        try {
            List<Bson> aggQuery = buildAggregationQuery(query);
            AggregateIterable it = collection.aggregate(aggQuery);
            it.batchSize(batchSize != null ? batchSize : 1);

            iter = it.iterator();
            List batch = new ArrayList();

            while (iter.hasNext()) {
                batch.add(iter.next());
                if (batch.size() == resultsPerFlowfile) {
                    writeBatch(buildBatch(batch), flowFile, context, session, attrs, REL_RESULTS);
                    batch = new ArrayList();
                }
            }

            if (batch.size() > 0) {
                writeBatch(buildBatch(batch), flowFile, context, session, attrs, REL_RESULTS);
            }

            if (flowFile != null) {
                session.transfer(flowFile, REL_ORIGINAL);
            }
        } catch (Exception e) {
            getLogger().error("Error running MongoDB aggregation query.", e);
            if (flowFile != null) {
                session.transfer(flowFile, REL_FAILURE);
            }
        } finally {
            if (iter != null) {
                iter.close();
            }
        }
    }
}

META-INF/services/org.apache.nifi.processor.Processor

@@ -15,5 +15,6 @@
 org.apache.nifi.processors.mongodb.DeleteMongo
 org.apache.nifi.processors.mongodb.GetMongo
+org.apache.nifi.processors.mongodb.RunMongoAggregation
 org.apache.nifi.processors.mongodb.PutMongo
 org.apache.nifi.processors.mongodb.PutMongoRecord

docs/org.apache.nifi.processors.mongodb.RunMongoAggregation/additionalDetails.html

@@ -0,0 +1,44 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>RunMongoAggregation</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head>
<body>
<!-- Processor Documentation ================================================== -->
<h2>Description:</h2>
    <p>
        This Processor runs a MongoDB aggregation query based on user-defined settings. The
        following is an example of such a query (and what the expected input looks like):
    </p>
    <pre>
[{
    "$project": {
        "domain": 1
    }
}, {
    "$group": {
        "_id": { "domain": "$domain" },
        "total": {
            "$sum": 1
        }
    }
}]
    </pre>
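    <p>
        Each aggregation result is written as a single JSON document; when the Results Per
        FlowFile property is set above one, the FlowFile content is a JSON array of result
        documents instead. As a purely illustrative sketch (the domain value and count below
        are made up, not output from a real collection), one result of the query above might
        look like:
    </p>
    <pre>
{ "_id": { "domain": "example.com" }, "total": 42 }
    </pre>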
</body>
</html>

GetMongoTest.java

@@ -76,6 +76,7 @@ public class GetMongoTest {
         runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}");
         runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "${collection}");
         runner.setProperty(GetMongo.USE_PRETTY_PRINTING, GetMongo.YES_PP);
+        runner.setIncomingConnection(true);

         mongoClient = new MongoClient(new MongoClientURI(MONGO_URI));
@@ -136,7 +137,7 @@
         // invalid projection
         runner.setVariable("projection", "{a: x,y,z}");
         runner.setProperty(GetMongo.QUERY, "{a: 1}");
-        runner.setProperty(GetMongo.PROJECTION, "${projection}");
+        runner.setProperty(GetMongo.PROJECTION, "{a: z}");
         runner.enqueue(new byte[0]);
         pc = runner.getProcessContext();
         results = new HashSet<>();
@@ -144,7 +145,7 @@
             results = ((MockProcessContext) pc).validate();
         }
         Assert.assertEquals(1, results.size());
-        Assert.assertTrue(results.iterator().next().toString().matches("'Projection' .* is invalid because org.bson.json.JsonParseException"));
+        Assert.assertTrue(results.iterator().next().toString().contains("is invalid"));

         // invalid sort
         runner.removeProperty(GetMongo.PROJECTION);
@@ -156,7 +157,7 @@
             results = ((MockProcessContext) pc).validate();
         }
         Assert.assertEquals(1, results.size());
-        Assert.assertTrue(results.iterator().next().toString().matches("'Sort' .* is invalid because org.bson.json.JsonParseException"));
+        Assert.assertTrue(results.iterator().next().toString().contains("is invalid"));
     }

     @Test
@@ -280,4 +281,19 @@
         json = new String(raw);
         Assert.assertFalse("New lines detected", json.contains("\n"));
     }
+
+    @Test
+    public void testQueryAttribute() {
+        final String attr = "query.attr";
+        runner.setProperty(GetMongo.QUERY, "{}");
+        runner.setProperty(GetMongo.QUERY_ATTRIBUTE, attr);
+        runner.run();
+        runner.assertTransferCount(GetMongo.REL_SUCCESS, 3);
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
+        for (MockFlowFile mff : flowFiles) {
+            String val = mff.getAttribute(attr);
+            Assert.assertNotNull("Missing query attribute", val);
+            Assert.assertEquals("Value was wrong", val, "{}");
+        }
+    }
 }

RunMongoAggregationTest.java

@@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.mongodb;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.bson.Document;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

import java.io.IOException;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Ignore("This is an integration test that requires Mongo to be running.")
public class RunMongoAggregationTest {

    private static final String MONGO_URI = "mongodb://localhost";
    private static final String DB_NAME = String.format("agg_test-%s", Calendar.getInstance().getTimeInMillis());
    private static final String COLLECTION_NAME = "agg_test_data";
    private static final String AGG_ATTR = "mongo.aggregation.query";

    private TestRunner runner;
    private MongoClient mongoClient;
    private Map<String, Integer> mappings;

    @Before
    public void setup() {
        runner = TestRunners.newTestRunner(RunMongoAggregation.class);
        runner.setVariable("uri", MONGO_URI);
        runner.setVariable("db", DB_NAME);
        runner.setVariable("collection", COLLECTION_NAME);
        runner.setProperty(AbstractMongoProcessor.URI, "${uri}");
        runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}");
        runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "${collection}");
        runner.setProperty(RunMongoAggregation.QUERY_ATTRIBUTE, AGG_ATTR);
        runner.setValidateExpressionUsage(true);

        mongoClient = new MongoClient(new MongoClientURI(MONGO_URI));
        MongoCollection<Document> collection = mongoClient.getDatabase(DB_NAME).getCollection(COLLECTION_NAME);
        String[] values = new String[] { "a", "b", "c" };
        mappings = new HashMap<>();

        for (int x = 0; x < values.length; x++) {
            for (int y = 0; y < x + 2; y++) {
                Document doc = new Document().append("val", values[x]);
                collection.insertOne(doc);
            }
            mappings.put(values[x], x + 2);
        }
    }
    @After
    public void teardown() {
        runner = null;
        mongoClient.getDatabase(DB_NAME).drop();
    }

    @Test
    public void testAggregation() throws Exception {
        final String queryInput = "[\n" +
                "  {\n" +
                "    \"$project\": {\n" +
                "      \"_id\": 0,\n" +
                "      \"val\": 1\n" +
                "    }\n" +
                "  },\n" +
                "  {\n" +
                "    \"$group\": {\n" +
                "      \"_id\": \"$val\",\n" +
                "      \"doc_count\": {\n" +
                "        \"$sum\": 1\n" +
                "      }\n" +
                "    }\n" +
                "  }\n" +
                "]";
        runner.setProperty(RunMongoAggregation.QUERY, queryInput);
        runner.enqueue("test");
        runner.run(1, true, true);
        evaluateRunner(1);

        runner.clearTransferState();
        runner.setIncomingConnection(false);
        runner.run(); // Null parent flowfile
        evaluateRunner(0);

        runner.run();
        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(RunMongoAggregation.REL_RESULTS);
        for (MockFlowFile mff : flowFiles) {
            String val = mff.getAttribute(AGG_ATTR);
            Assert.assertNotNull("Missing query attribute", val);
            Assert.assertEquals("Value was wrong", val, queryInput);
        }
    }
    @Test
    public void testExpressionLanguageSupport() throws Exception {
        runner.setVariable("fieldName", "$val");
        runner.setProperty(RunMongoAggregation.QUERY, "[\n" +
                "  {\n" +
                "    \"$project\": {\n" +
                "      \"_id\": 0,\n" +
                "      \"val\": 1\n" +
                "    }\n" +
                "  },\n" +
                "  {\n" +
                "    \"$group\": {\n" +
                "      \"_id\": \"${fieldName}\",\n" +
                "      \"doc_count\": {\n" +
                "        \"$sum\": 1\n" +
                "      }\n" +
                "    }\n" +
                "  }\n" +
                "]");

        runner.enqueue("test");
        runner.run(1, true, true);
        evaluateRunner(1);
    }

    @Test
    public void testInvalidQuery() {
        runner.setProperty(RunMongoAggregation.QUERY, "[\n" +
                "  {\n" +
                "    \"$invalid_stage\": {\n" +
                "      \"_id\": 0,\n" +
                "      \"val\": 1\n" +
                "    }\n" +
                "  }\n" +
                "]"
        );
        runner.enqueue("test");
        runner.run(1, true, true);
        runner.assertTransferCount(RunMongoAggregation.REL_RESULTS, 0);
        runner.assertTransferCount(RunMongoAggregation.REL_ORIGINAL, 0);
        runner.assertTransferCount(RunMongoAggregation.REL_FAILURE, 1);
    }
    private void evaluateRunner(int original) throws IOException {
        runner.assertTransferCount(RunMongoAggregation.REL_RESULTS, mappings.size());
        runner.assertTransferCount(RunMongoAggregation.REL_ORIGINAL, original);
        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(RunMongoAggregation.REL_RESULTS);
        ObjectMapper mapper = new ObjectMapper();
        for (MockFlowFile mockFlowFile : flowFiles) {
            byte[] raw = runner.getContentAsByteArray(mockFlowFile);
            Map read = mapper.readValue(raw, Map.class);
            Assert.assertTrue("Value was not found", mappings.containsKey(read.get("_id")));

            String queryAttr = mockFlowFile.getAttribute(AGG_ATTR);
            Assert.assertNotNull("Query attribute was null.", queryAttr);
            Assert.assertTrue("Missing $project", queryAttr.contains("$project"));
            Assert.assertTrue("Missing $group", queryAttr.contains("$group"));
        }
    }
}