NIFI-5333 Added GetMongoRecord.

Signed-off-by: zenfenan <zenfenan@apache.org>

This closes #3011
Author: Mike Thomsen (2018-09-02 15:58:33 -04:00), committed by zenfenan
Parent: e34d653ba1
Commit: e603c486f4
8 changed files with 602 additions and 111 deletions

DataTypeUtils.java

@@ -602,6 +602,11 @@ public class DataTypeUtils {
return null;
}
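// A java.util.Date (or a subclass of it) is converted to java.sql.Date via its epoch milliseconds.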
if (value instanceof java.util.Date) {
java.util.Date _temp = (java.util.Date)value;
return new Date(_temp.getTime());
}
if (value instanceof Date) {
return (Date) value;
}

pom.xml

@@ -101,6 +101,7 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mongodb-client-service-api</artifactId>
<version>1.9.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>

AbstractMongoQueryProcessor.java (new file)

@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.mongodb;
import com.mongodb.client.MongoCollection;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.JsonValidator;
import org.apache.nifi.processor.util.StandardValidators;
import org.bson.Document;
import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.Map;
public abstract class AbstractMongoQueryProcessor extends AbstractMongoProcessor {
public static final String DB_NAME = "mongo.database.name";
public static final String COL_NAME = "mongo.collection.name";
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All FlowFiles that have the results of a successful query execution go here.")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("All input FlowFiles that are part of a failed query execution go here.")
.build();
public static final Relationship REL_ORIGINAL = new Relationship.Builder()
.name("original")
.description("All input FlowFiles that are part of a successful query execution go here.")
.build();
public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
.name("Query")
.description("The selection criteria to do the lookup. If the field is left blank, it will look for input from" +
" an incoming connection from another processor to provide the query as a valid JSON document inside of " +
"the FlowFile's body. If this field is left blank and a timer is enabled instead of an incoming connection, " +
"that will result in a full collection fetch using a \"{}\" query.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(JsonValidator.INSTANCE)
.build();
public static final PropertyDescriptor PROJECTION = new PropertyDescriptor.Builder()
.name("Projection")
.description("The fields to be returned from the documents in the result set; must be a valid BSON document")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(JsonValidator.INSTANCE)
.build();
public static final PropertyDescriptor SORT = new PropertyDescriptor.Builder()
.name("Sort")
.description("The fields by which to sort; must be a valid BSON document")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(JsonValidator.INSTANCE)
.build();
public static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
.name("Limit")
.description("The maximum number of elements to return")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size")
.description("The number of elements to be returned from the server in one batch")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
.name("results-per-flowfile")
.displayName("Results Per FlowFile")
.description("How many results to put into a FlowFile at once. The whole body will be treated as a JSON array of results.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
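// Resolves the query in priority order: the Query property if set, an empty ({}) query when
// neither the property nor an incoming FlowFile is available, and otherwise the FlowFile body.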
protected Document getQuery(ProcessContext context, ProcessSession session, FlowFile input) {
Document query = null;
if (context.getProperty(QUERY).isSet()) {
query = Document.parse(context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue());
} else if (!context.getProperty(QUERY).isSet() && input == null) {
query = Document.parse("{}");
} else {
try {
ByteArrayOutputStream out = new ByteArrayOutputStream();
session.exportTo(input, out);
out.close();
query = Document.parse(new String(out.toByteArray()));
} catch (Exception ex) {
getLogger().error("Error reading FlowFile : ", ex);
if (input != null) { //Likely culprit is a bad query
session.transfer(input, REL_FAILURE);
session.commit();
} else {
throw new ProcessException(ex);
}
}
}
return query;
}
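// Builds the common result attributes: the JSON MIME type, the optional user-named query
// attribute, and the database/collection names taken from the collection's namespace.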
protected Map<String, String> getAttributes(ProcessContext context, FlowFile input, Document query, MongoCollection collection) {
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
if (context.getProperty(QUERY_ATTRIBUTE).isSet()) {
final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue();
attributes.put(queryAttr, query.toJson());
}
attributes.put(DB_NAME, collection.getNamespace().getDatabaseName());
attributes.put(COL_NAME, collection.getNamespace().getCollectionName());
return attributes;
}
}

GetMongo.java

@@ -32,25 +32,19 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.JsonValidator;
import org.apache.nifi.processor.util.StandardValidators;
import org.bson.Document;
import org.bson.json.JsonWriterSettings;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -63,75 +57,8 @@ import java.util.Set;
@WritesAttribute(attribute = GetMongo.DB_NAME, description = "The database where the results came from."),
@WritesAttribute(attribute = GetMongo.COL_NAME, description = "The collection where the results came from.")
})
public class GetMongo extends AbstractMongoProcessor {
static final String DB_NAME = "mongo.database.name";
static final String COL_NAME = "mongo.collection.name";
public class GetMongo extends AbstractMongoQueryProcessor {
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All FlowFiles that have the results of a successful query execution go here.")
.build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("All input FlowFiles that are part of a failed query execution go here.")
.build();
static final Relationship REL_ORIGINAL = new Relationship.Builder()
.name("original")
.description("All input FlowFiles that are part of a successful query execution go here.")
.build();
static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
.name("Query")
.description("The selection criteria to do the lookup. If the field is left blank, it will look for input from" +
" an incoming connection from another processor to provide the query as a valid JSON document inside of " +
"the FlowFile's body. If this field is left blank and a timer is enabled instead of an incoming connection, " +
"that will result in a full collection fetch using a \"{}\" query.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(JsonValidator.INSTANCE)
.build();
static final PropertyDescriptor PROJECTION = new PropertyDescriptor.Builder()
.name("Projection")
.description("The fields to be returned from the documents in the result set; must be a valid BSON document")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(JsonValidator.INSTANCE)
.build();
static final PropertyDescriptor SORT = new PropertyDescriptor.Builder()
.name("Sort")
.description("The fields by which to sort; must be a valid BSON document")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(JsonValidator.INSTANCE)
.build();
static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
.name("Limit")
.description("The maximum number of elements to return")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size")
.description("The number of elements to be returned from the server in one batch")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
.name("results-per-flowfile")
.displayName("Results Per FlowFile")
.description("How many results to put into a FlowFile at once. The whole body will be treated as a JSON array of results.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
static final AllowableValue YES_PP = new AllowableValue("true", "True");
static final AllowableValue NO_PP = new AllowableValue("false", "False");
@@ -231,14 +158,7 @@ public class GetMongo extends AbstractMongoProcessor {
final String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue();
final String usePrettyPrint = context.getProperty(USE_PRETTY_PRINTING).getValue();
final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(input).getValue());
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
if (context.getProperty(QUERY_ATTRIBUTE).isSet()) {
final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue();
attributes.put(queryAttr, query.toJson());
}
final Document projection = context.getProperty(PROJECTION).isSet()
? Document.parse(context.getProperty(PROJECTION).evaluateAttributeExpressions(input).getValue()) : null;
@@ -250,9 +170,7 @@ public class GetMongo extends AbstractMongoProcessor {
final MongoCollection<Document> collection = getCollection(context, input);
final FindIterable<Document> it = collection.find(query);
attributes.put(DB_NAME, collection.getNamespace().getDatabaseName());
attributes.put(COL_NAME, collection.getNamespace().getCollectionName());
final Map<String, String> attributes = getAttributes(context, input, query, collection);
if (projection != null) {
it.projection(projection);
@@ -319,31 +237,4 @@
}
}
private Document getQuery(ProcessContext context, ProcessSession session, FlowFile input) {
Document query = null;
if (context.getProperty(QUERY).isSet()) {
query = Document.parse(context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue());
} else if (!context.getProperty(QUERY).isSet() && input == null) {
query = Document.parse("{}");
} else {
try {
ByteArrayOutputStream out = new ByteArrayOutputStream();
session.exportTo(input, out);
out.close();
query = Document.parse(new String(out.toByteArray()));
} catch (Exception ex) {
logger.error("Error reading FlowFile : ", ex);
if (input != null) { //Likely culprit is a bad query
session.transfer(input, REL_FAILURE);
session.commit();
} else {
throw new ProcessException(ex);
}
}
}
return query;
}
}

GetMongoRecord.java (new file)

@@ -0,0 +1,205 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.mongodb;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.mongodb.MongoDBClientService;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.bson.Document;
import org.bson.types.ObjectId;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@CapabilityDescription("A record-based version of GetMongo that uses the Record writers to write the MongoDB result set.")
@Tags({"mongo", "mongodb", "get", "fetch", "record", "json"})
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@WritesAttributes({
@WritesAttribute(attribute = GetMongo.DB_NAME, description = "The database where the results came from."),
@WritesAttribute(attribute = GetMongo.COL_NAME, description = "The collection where the results came from.")
})
public class GetMongoRecord extends AbstractMongoQueryProcessor {
public static final PropertyDescriptor WRITER_FACTORY = new PropertyDescriptor.Builder()
.name("get-mongo-record-writer-factory")
.displayName("Record Writer")
.description("The record writer to use to write the result sets.")
.identifiesControllerService(RecordSetWriterFactory.class)
.required(true)
.build();
public static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
.name("mongodb-schema-name")
.displayName("Schema Name")
.description("The name of the schema in the configured schema registry to use for the query results.")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.defaultValue("${schema.name}")
.required(true)
.build();
private static final List<PropertyDescriptor> DESCRIPTORS;
private static final Set<Relationship> RELATIONSHIPS;
static {
List<PropertyDescriptor> _temp = new ArrayList<>();
_temp.add(CLIENT_SERVICE);
_temp.add(WRITER_FACTORY);
_temp.add(DATABASE_NAME);
_temp.add(COLLECTION_NAME);
_temp.add(SCHEMA_NAME);
_temp.add(QUERY_ATTRIBUTE);
_temp.add(QUERY);
_temp.add(PROJECTION);
_temp.add(SORT);
_temp.add(LIMIT);
_temp.add(BATCH_SIZE);
DESCRIPTORS = Collections.unmodifiableList(_temp);
Set<Relationship> _rels = new HashSet<>();
_rels.add(REL_SUCCESS);
_rels.add(REL_FAILURE);
_rels.add(REL_ORIGINAL);
RELATIONSHIPS = Collections.unmodifiableSet(_rels);
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return DESCRIPTORS;
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
private volatile MongoDBClientService clientService;
private volatile RecordSetWriterFactory writerFactory;
@OnScheduled
public void onEnabled(ProcessContext context) {
clientService = context.getProperty(CLIENT_SERVICE).asControllerService(MongoDBClientService.class);
writerFactory = context.getProperty(WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile input = null;
if (context.hasIncomingConnection()) {
input = session.get();
if (input == null && context.hasNonLoopConnection()) {
return;
}
}
final String database = context.getProperty(DATABASE_NAME).evaluateAttributeExpressions(input).getValue();
final String collection = context.getProperty(COLLECTION_NAME).evaluateAttributeExpressions(input).getValue();
final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(input).getValue();
final Document query = getQuery(context, session, input);
MongoCollection mongoCollection = clientService.getDatabase(database).getCollection(collection);
FindIterable<Document> find = mongoCollection.find(query);
if (context.getProperty(SORT).isSet()) {
find = find.sort(Document.parse(context.getProperty(SORT).evaluateAttributeExpressions(input).getValue()));
}
if (context.getProperty(PROJECTION).isSet()) {
find = find.projection(Document.parse(context.getProperty(PROJECTION).evaluateAttributeExpressions(input).getValue()));
}
if (context.getProperty(LIMIT).isSet()) {
find = find.limit(context.getProperty(LIMIT).evaluateAttributeExpressions(input).asInteger());
}
MongoCursor<Document> cursor = find.iterator();
FlowFile output = input != null ? session.create(input) : session.create();
final FlowFile inputPtr = input;
try {
final Map<String, String> attributes = getAttributes(context, input, query, mongoCollection);
try (OutputStream out = session.write(output)) {
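// Use the input FlowFile's attributes for schema lookup, or fall back to the configured schema name.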
Map<String, String> attrs = inputPtr != null ? inputPtr.getAttributes() : new HashMap<String, String>(){{
put("schema.name", schemaName);
}};
RecordSchema schema = writerFactory.getSchema(attrs, null);
RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out);
long count = 0L;
writer.beginRecordSet();
while (cursor.hasNext()) {
Document next = cursor.next();
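// The record API has no ObjectId type, so _id is stored as its string form.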
if (next.get("_id") instanceof ObjectId) {
next.put("_id", next.get("_id").toString());
}
Record record = new MapRecord(schema, next);
writer.write(record);
count++;
}
writer.finishRecordSet();
writer.close();
out.close();
attributes.put("record.count", String.valueOf(count));
} catch (SchemaNotFoundException e) {
throw new RuntimeException(e);
}
output = session.putAllAttributes(output, attributes);
session.getProvenanceReporter().fetch(output, getURI(context));
session.transfer(output, REL_SUCCESS);
if (input != null) {
session.transfer(input, REL_ORIGINAL);
}
} catch (Exception ex) {
ex.printStackTrace();
getLogger().error("Error writing record set from Mongo query.", ex);
session.remove(output);
if (input != null) {
session.transfer(input, REL_FAILURE);
}
}
}
}

META-INF/services/org.apache.nifi.processor.Processor

@@ -15,6 +15,7 @@
org.apache.nifi.processors.mongodb.DeleteMongo
org.apache.nifi.processors.mongodb.GetMongo
org.apache.nifi.processors.mongodb.GetMongoRecord
org.apache.nifi.processors.mongodb.RunMongoAggregation
org.apache.nifi.processors.mongodb.PutMongo
org.apache.nifi.processors.mongodb.PutMongoRecord

additionalDetails.html (GetMongoRecord documentation, new file)

@@ -0,0 +1,59 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>GetMongoRecord</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head>
<body>
<!-- Processor Documentation ================================================== -->
<h2>Description:</h2>
<p>
This processor runs queries against a MongoDB instance or cluster and writes the results to a FlowFile. It accepts
incoming FlowFiles, but can also run standalone. It is a record-aware version of the <em>GetMongo</em> processor.
</p>
<h2>Specifying the Query</h2>
<p>
The query can be specified in one of three ways:
</p>
<ul>
<li>Query configuration property.</li>
<li>Query Attribute configuration property.</li>
<li>FlowFile content.</li>
</ul>
<p>
If a value is specified in either of the configuration properties, the processor will not look in the FlowFile
content for a query.
</p>
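<p>
For example, a query such as the following (the field name here is purely illustrative) could be set in the
Query property or sent in the FlowFile content:
</p>
<pre>
{ "failedLogins": { "$gte": 3 } }
</pre>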
<h2>Limiting/Shaping Results</h2>
<p>
The following options for limiting/shaping results are available:
</p>
<ul>
<li>Limit - limit the number of results. This should not be confused with the "batch size" option which is a
setting for the underlying MongoDB driver to tell it how many items to retrieve in each poll of the server.</li>
<li>Sort - sort the result set. Requires a JSON document like <em>{ "someDate": -1 }</em></li>
<li>Projection - control which fields to return. For example, <em>{ "_id": 0 }</em> would remove <em>_id</em>
from the results. A combined sketch of these options follows this list.</li>
</ul>
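<p>
As a sketch (the property values are shown as the JSON documents they would contain; the field names are
illustrative), the three options might be configured together like this:
</p>
<pre>
Query:      { "active": true }
Sort:       { "lastLogin": -1 }
Projection: { "_id": 0, "name": 1, "lastLogin": 1 }
Limit:      10
</pre>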
<h2>Misc Options</h2>
<p>
Results Per FlowFile, if set, creates a JSON array out of a batch of results and writes the result to the output.
Pretty Print, if enabled, formats the JSON data so that it is easy for a human to read (e.g. proper indentation of fields).
</p>
</body>
</html>

GetMongoRecordIT.groovy (new file)

@@ -0,0 +1,179 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.mongodb
import groovy.json.JsonSlurper
import org.apache.nifi.flowfile.attributes.CoreAttributes
import org.apache.nifi.json.JsonRecordSetWriter
import org.apache.nifi.mongodb.MongoDBClientService
import org.apache.nifi.mongodb.MongoDBControllerService
import org.apache.nifi.schema.access.SchemaAccessUtils
import org.apache.nifi.serialization.DateTimeUtils
import org.apache.nifi.serialization.SimpleRecordSchema
import org.apache.nifi.serialization.record.*
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
import org.bson.Document
import org.junit.After
import org.junit.Assert
import org.junit.Before
import org.junit.Test
import static groovy.json.JsonOutput.*
class GetMongoRecordIT {
TestRunner runner
MongoDBClientService service
static RecordSchema SCHEMA
static final String DB_NAME = GetMongoRecord.class.simpleName + Calendar.instance.timeInMillis
static final String COL_NAME = "test"
static final String URI = "mongodb://localhost:27017"
static {
def fields = [
new RecordField("name", RecordFieldType.STRING.dataType),
new RecordField("failedLogins", RecordFieldType.INT.dataType),
new RecordField("lastLogin", RecordFieldType.DATE.dataType)
]
SCHEMA = new SimpleRecordSchema(fields, new StandardSchemaIdentifier.Builder().name("sample").build())
}
static final List<Map> SAMPLES = [
[ name: "John Smith", failedLogins: 2, lastLogin: Calendar.instance.time ],
[ name: "Jane Doe", failedLogins: 1, lastLogin: Calendar.instance.time - 360000 ],
[ name: "John Brown", failedLogins: 4, lastLogin: Calendar.instance.time - 10000 ]
].collect { new Document(it) }
@Before
void setup() {
runner = TestRunners.newTestRunner(GetMongoRecord.class)
service = new MongoDBControllerService()
runner.addControllerService("client", service)
runner.setProperty(service, MongoDBControllerService.URI, URI)
runner.enableControllerService(service)
def writer = new JsonRecordSetWriter()
def registry = new MockSchemaRegistry()
registry.addSchema("sample", SCHEMA)
runner.addControllerService("writer", writer)
runner.addControllerService("registry", registry)
runner.setProperty(writer, SchemaAccessUtils.SCHEMA_REGISTRY, "registry")
runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_NAME_PROPERTY)
runner.setProperty(writer, DateTimeUtils.DATE_FORMAT, "yyyy")
runner.enableControllerService(registry)
runner.enableControllerService(writer)
runner.setProperty(GetMongoRecord.DATABASE_NAME, DB_NAME)
runner.setProperty(GetMongoRecord.COLLECTION_NAME, COL_NAME)
runner.setProperty(GetMongoRecord.CLIENT_SERVICE, "client")
runner.setProperty(GetMongoRecord.WRITER_FACTORY, "writer")
service.getDatabase(DB_NAME).getCollection(COL_NAME).insertMany(SAMPLES)
}
@After
void after() {
service.getDatabase(DB_NAME).drop()
}
@Test
void testLookup() {
def ffValidator = { TestRunner runner ->
def ffs = runner.getFlowFilesForRelationship(GetMongoRecord.REL_SUCCESS)
Assert.assertNotNull(ffs)
Assert.assertTrue(ffs.size() == 1)
Assert.assertEquals("3", ffs[0].getAttribute("record.count"))
Assert.assertEquals("application/json", ffs[0].getAttribute(CoreAttributes.MIME_TYPE.key()))
Assert.assertEquals(COL_NAME, ffs[0].getAttribute(GetMongoRecord.COL_NAME))
Assert.assertEquals(DB_NAME, ffs[0].getAttribute(GetMongoRecord.DB_NAME))
Assert.assertEquals(Document.parse("{}"), Document.parse(ffs[0].getAttribute("executed.query")))
}
runner.setProperty(GetMongoRecord.QUERY_ATTRIBUTE, "executed.query")
runner.setProperty(GetMongoRecord.QUERY, "{}")
runner.enqueue("", [ "schema.name": "sample"])
runner.run()
runner.assertTransferCount(GetMongoRecord.REL_FAILURE, 0)
runner.assertTransferCount(GetMongoRecord.REL_SUCCESS, 1)
runner.assertTransferCount(GetMongoRecord.REL_ORIGINAL, 1)
ffValidator(runner)
runner.clearTransferState()
runner.removeProperty(GetMongoRecord.QUERY)
runner.enqueue("{}", [ "schema.name": "sample"])
runner.run()
runner.assertTransferCount(GetMongoRecord.REL_FAILURE, 0)
runner.assertTransferCount(GetMongoRecord.REL_SUCCESS, 1)
runner.assertTransferCount(GetMongoRecord.REL_ORIGINAL, 1)
ffValidator(runner)
}
@Test
void testSortAndProjection() {
runner.setIncomingConnection(false)
runner.setVariable("schema.name", "sample")
runner.setProperty(GetMongoRecord.SORT, toJson([failedLogins: 1]))
runner.setProperty(GetMongoRecord.PROJECTION, toJson([failedLogins: 1]))
runner.setProperty(GetMongoRecord.QUERY, "{}")
runner.run()
def parsed = sharedTest()
Assert.assertEquals(3, parsed.size())
def values = [1, 2, 4]
int index = 0
parsed.each {
Assert.assertEquals(values[index++], it["failedLogins"])
Assert.assertNull(it["name"])
Assert.assertNull(it["lastLogin"])
}
}
List<Map<String, Object>> sharedTest() {
runner.assertTransferCount(GetMongoRecord.REL_FAILURE, 0)
runner.assertTransferCount(GetMongoRecord.REL_SUCCESS, 1)
def ff = runner.getFlowFilesForRelationship(GetMongoRecord.REL_SUCCESS)[0]
def raw = runner.getContentAsByteArray(ff)
String content = new String(raw)
def parsed = new JsonSlurper().parseText(content)
Assert.assertNotNull(parsed)
parsed
}
@Test
void testLimit() {
runner.setIncomingConnection(false)
runner.setProperty(GetMongoRecord.LIMIT, "1")
runner.setProperty(GetMongoRecord.QUERY, "{}")
runner.setVariable("schema.name", "sample")
runner.run()
def parsed = sharedTest()
Assert.assertEquals(1, parsed.size())
}
}