NIFI-4827 Added support for reading queries from the flowfile body to GetMongo.

NIFI-4827 Added changes from code review.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2443
Authored by Mike Thomsen on 2018-02-22 13:15:00 -04:00; committed by Matthew Burgess
parent f7fe2da106
commit 3d9c470be3
2 changed files with 220 additions and 30 deletions
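
Summary of the change: GetMongo now allows an optional incoming connection (INPUT_ALLOWED) and, when the Query property is not set, parses the body of the incoming flowfile as the MongoDB query; input flowfiles are routed to the new "original" or "failure" relationships. Below is a minimal usage sketch with NiFi's mock TestRunner, mirroring the test changes further down. It is illustrative rather than part of the commit: the class name, URI, database, and collection values are placeholders, and it assumes the AbstractMongoProcessor.URI property for the connection string.

package org.apache.nifi.processors.mongodb;

import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

// Hypothetical sketch, not part of this commit.
public class GetMongoBodyQuerySketch {
    public static void main(String[] args) {
        TestRunner runner = TestRunners.newTestRunner(GetMongo.class);
        runner.setProperty(AbstractMongoProcessor.URI, "mongodb://localhost:27017"); // placeholder URI
        runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "testdb");          // placeholder names
        runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "testcol");
        runner.setIncomingConnection(true);
        // Query property left unset, so the body of the incoming flowfile is parsed as the query.
        runner.enqueue("{ \"a\": { \"$gte\": 1 } }");
        runner.run();
        // Matching documents are routed to "success"; the triggering flowfile goes to "original"
        // (or to "failure" if the body cannot be parsed as a query).
        runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1);
    }
}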

GetMongo.java

@@ -42,6 +42,7 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.bson.Document;
 import org.bson.json.JsonWriterSettings;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
@@ -55,7 +56,7 @@ import java.util.Set;
 @Tags({ "mongodb", "read", "get" })
-@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@InputRequirement(Requirement.INPUT_ALLOWED)
 @CapabilityDescription("Creates FlowFiles from documents in MongoDB")
 public class GetMongo extends AbstractMongoProcessor {
     public static final Validator DOCUMENT_VALIDATOR = (subject, value, context) -> {
@@ -76,13 +77,28 @@ public class GetMongo extends AbstractMongoProcessor {
         return builder.explanation(reason).valid(reason == null).build();
     };
+    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All files are routed to success").build();
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("All input flowfiles that are part of a failed query execution go here.")
+            .build();
+    static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("All input flowfiles that are part of a successful query execution go here.")
+            .build();
     static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
             .name("Query")
-            .description("The selection criteria; must be a valid MongoDB Extended JSON format; if omitted the entire collection will be queried")
+            .description("The selection criteria to do the lookup. If the field is left blank, it will look for input from" +
+                    " an incoming connection from another processor to provide the query as a valid JSON document inside of " +
+                    "the flowfile's body. If this field is left blank and a timer is enabled instead of an incoming connection, " +
+                    "that will result in a full collection fetch using a \"{}\" query.")
             .required(false)
             .expressionLanguageSupported(true)
             .addValidator(DOCUMENT_VALIDATOR)
             .build();
     static final PropertyDescriptor PROJECTION = new PropertyDescriptor.Builder()
             .name("Projection")
             .description("The fields to be returned from the documents in the result set; must be a valid BSON document")
@@ -155,8 +171,6 @@ public class GetMongo extends AbstractMongoProcessor {
     private final static Set<Relationship> relationships;
     private final static List<PropertyDescriptor> propertyDescriptors;
-    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All files are routed to success").build();
     static {
         List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
         _propertyDescriptors.addAll(descriptors);
@@ -176,6 +190,8 @@ public class GetMongo extends AbstractMongoProcessor {
         final Set<Relationship> _relationships = new HashSet<>();
         _relationships.add(REL_SUCCESS);
+        _relationships.add(REL_FAILURE);
+        _relationships.add(REL_ORIGINAL);
         relationships = Collections.unmodifiableSet(_relationships);
     }
@@ -226,22 +242,55 @@ public class GetMongo extends AbstractMongoProcessor {
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile input = null;
+        if (context.hasIncomingConnection()) {
+            input = session.get();
+            if (input == null && context.hasNonLoopConnection()) {
+                return;
+            }
+        }
         final ComponentLog logger = getLogger();
         Map attributes = new HashMap();
         attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
-        if (context.getProperty(QUERY).isSet() && context.getProperty(QUERY_ATTRIBUTE).isSet()) {
-            attributes.put(context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions().getValue(),
-                    context.getProperty(QUERY).evaluateAttributeExpressions().getValue());
+        final Document query;
+        String queryStr;
+        if (context.getProperty(QUERY).isSet()) {
+            queryStr = context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue();
+            query = Document.parse(queryStr);
+        } else if (!context.getProperty(QUERY).isSet() && input == null) {
+            queryStr = "{}";
+            query = Document.parse("{}");
+        } else {
+            try {
+                ByteArrayOutputStream out = new ByteArrayOutputStream();
+                session.exportTo(input, out);
+                out.close();
+                queryStr = new String(out.toByteArray());
+                query = Document.parse(queryStr);
+            } catch (Exception ex) {
+                getLogger().error("Error reading flowfile", ex);
+                if (input != null) { //Likely culprit is a bad query
+                    session.transfer(input, REL_FAILURE);
+                    return;
+                } else {
+                    throw new ProcessException(ex);
+                }
+            }
+        }
+        if (context.getProperty(QUERY_ATTRIBUTE).isSet()) {
+            final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue();
+            attributes.put(queryAttr, queryStr);
         }
-        final Document query = context.getProperty(QUERY).isSet()
-                ? Document.parse(context.getProperty(QUERY).evaluateAttributeExpressions().getValue()) : null;
         final Document projection = context.getProperty(PROJECTION).isSet()
-                ? Document.parse(context.getProperty(PROJECTION).evaluateAttributeExpressions().getValue()) : null;
+                ? Document.parse(context.getProperty(PROJECTION).evaluateAttributeExpressions(input).getValue()) : null;
         final Document sort = context.getProperty(SORT).isSet()
-                ? Document.parse(context.getProperty(SORT).evaluateAttributeExpressions().getValue()) : null;
+                ? Document.parse(context.getProperty(SORT).evaluateAttributeExpressions(input).getValue()) : null;
         final String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue();
         final String usePrettyPrint = context.getProperty(USE_PRETTY_PRINTING).getValue();
         configureMapper(jsonTypeSetting);
@@ -258,10 +307,10 @@ public class GetMongo extends AbstractMongoProcessor {
                 it.sort(sort);
             }
             if (context.getProperty(LIMIT).isSet()) {
-                it.limit(context.getProperty(LIMIT).evaluateAttributeExpressions().asInteger());
+                it.limit(context.getProperty(LIMIT).evaluateAttributeExpressions(input).asInteger());
             }
             if (context.getProperty(BATCH_SIZE).isSet()) {
-                it.batchSize(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger());
+                it.batchSize(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger());
             }
             final MongoCursor<Document> cursor = it.iterator();
@@ -269,7 +318,7 @@ public class GetMongo extends AbstractMongoProcessor {
             try {
                 FlowFile flowFile = null;
                 if (context.getProperty(RESULTS_PER_FLOWFILE).isSet()) {
-                    int ceiling = context.getProperty(RESULTS_PER_FLOWFILE).evaluateAttributeExpressions().asInteger();
+                    int ceiling = context.getProperty(RESULTS_PER_FLOWFILE).evaluateAttributeExpressions(input).asInteger();
                     List<Document> batch = new ArrayList<>();
                     while (cursor.hasNext()) {
@@ -313,15 +362,19 @@ public class GetMongo extends AbstractMongoProcessor {
                     }
                 }
-                session.commit();
+                if (input != null) {
+                    session.transfer(input, REL_ORIGINAL);
+                }
             } finally {
                 cursor.close();
             }
         } catch (final RuntimeException e) {
+            if (input != null) {
+                session.transfer(input, REL_FAILURE);
+            }
             context.yield();
-            session.rollback();
             logger.error("Failed to execute query {} due to {}", new Object[] { query, e }, e);
         }
     }
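
The query-source precedence introduced in onTrigger above is: an explicitly set Query property (evaluated with expression language against the input flowfile, if any); then a full-collection "{}" query when neither a property value nor an input flowfile is present; and otherwise the body of the input flowfile. The following condensed restatement of that precedence is a hypothetical stand-alone helper for clarity, not code from this commit:

import org.bson.Document;

final class QueryResolutionSketch {
    // Mirrors the precedence in GetMongo#onTrigger above; either argument may be null.
    static Document resolve(String queryProperty, String flowFileBody) {
        if (queryProperty != null) {
            return Document.parse(queryProperty); // 1. explicit Query property wins
        }
        if (flowFileBody == null) {
            return Document.parse("{}");          // 2. no property, no input: full collection fetch
        }
        return Document.parse(flowFileBody);      // 3. otherwise the flowfile body is the query
    }
}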

GetMongoTest.java

@@ -40,6 +40,7 @@ import org.junit.Test;
 import java.text.SimpleDateFormat;
 import java.util.Calendar;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -76,7 +77,7 @@ public class GetMongoTest {
         runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}");
         runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "${collection}");
         runner.setProperty(GetMongo.USE_PRETTY_PRINTING, GetMongo.YES_PP);
-        runner.setIncomingConnection(true);
+        runner.setIncomingConnection(false);
         mongoClient = new MongoClient(new MongoClientURI(MONGO_URI));
@@ -243,8 +244,11 @@ public class GetMongoTest {
     public void testResultsPerFlowfile() throws Exception {
         runner.setProperty(GetMongo.RESULTS_PER_FLOWFILE, "${results.per.flowfile}");
         runner.setVariable("results.per.flowfile", "2");
+        runner.enqueue("{}");
+        runner.setIncomingConnection(true);
         runner.run();
-        runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 2);
+        runner.assertTransferCount(GetMongo.REL_SUCCESS, 2);
+        runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1);
         List<MockFlowFile> results = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
         Assert.assertTrue("Flowfile was empty", results.get(0).getSize() > 0);
         Assert.assertEquals("Wrong mime type", results.get(0).getAttribute(CoreAttributes.MIME_TYPE.key()), "application/json");
@@ -255,8 +259,11 @@ public class GetMongoTest {
         runner.setProperty(GetMongo.RESULTS_PER_FLOWFILE, "2");
         runner.setProperty(GetMongo.BATCH_SIZE, "${batch.size}");
         runner.setVariable("batch.size", "1");
+        runner.enqueue("{}");
+        runner.setIncomingConnection(true);
         runner.run();
-        runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 2);
+        runner.assertTransferCount(GetMongo.REL_SUCCESS, 2);
+        runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1);
         List<MockFlowFile> results = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
         Assert.assertTrue("Flowfile was empty", results.get(0).getSize() > 0);
         Assert.assertEquals("Wrong mime type", results.get(0).getAttribute(CoreAttributes.MIME_TYPE.key()), "application/json");
@@ -266,34 +273,164 @@ public class GetMongoTest {
     public void testConfigurablePrettyPrint() {
         runner.setProperty(GetMongo.JSON_TYPE, GetMongo.JSON_STANDARD);
         runner.setProperty(GetMongo.LIMIT, "1");
+        runner.enqueue("{}");
+        runner.setIncomingConnection(true);
         runner.run();
         runner.assertTransferCount(GetMongo.REL_SUCCESS, 1);
+        runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1);
         List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
         byte[] raw = runner.getContentAsByteArray(flowFiles.get(0));
         String json = new String(raw);
         Assert.assertTrue("JSON did not have new lines.", json.contains("\n"));
         runner.clearTransferState();
         runner.setProperty(GetMongo.USE_PRETTY_PRINTING, GetMongo.NO_PP);
+        runner.enqueue("{}");
         runner.run();
         runner.assertTransferCount(GetMongo.REL_SUCCESS, 1);
+        runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1);
         flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
         raw = runner.getContentAsByteArray(flowFiles.get(0));
         json = new String(raw);
         Assert.assertFalse("New lines detected", json.contains("\n"));
     }
+
+    private void testQueryAttribute(String attr, String expected) {
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
+        for (MockFlowFile mff : flowFiles) {
+            String val = mff.getAttribute(attr);
+            Assert.assertNotNull("Missing query attribute", val);
+            Assert.assertEquals("Value was wrong", expected, val);
+        }
+    }
+
     @Test
     public void testQueryAttribute() {
+        /*
+         * Test original behavior; Manually set query of {}, no input
+         */
         final String attr = "query.attr";
         runner.setProperty(GetMongo.QUERY, "{}");
         runner.setProperty(GetMongo.QUERY_ATTRIBUTE, attr);
         runner.run();
         runner.assertTransferCount(GetMongo.REL_SUCCESS, 3);
-        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
-        for (MockFlowFile mff : flowFiles) {
-            String val = mff.getAttribute(attr);
-            Assert.assertNotNull("Missing query attribute", val);
-            Assert.assertEquals("Value was wrong", val, "{}");
-        }
+        testQueryAttribute(attr, "{}");
+
+        runner.clearTransferState();
+
+        /*
+         * Test original behavior; No Input/Empty val = {}
+         */
+        runner.removeProperty(GetMongo.QUERY);
+        runner.setIncomingConnection(false);
+        runner.run();
+        testQueryAttribute(attr, "{}");
+
+        runner.clearTransferState();
+
+        /*
+         * Input flowfile with {} as the query
+         */
+        runner.setIncomingConnection(true);
+        runner.enqueue("{}");
+        runner.run();
+        testQueryAttribute(attr, "{}");
+
+        /*
+         * Input flowfile with invalid query
+         */
+        runner.clearTransferState();
+        runner.enqueue("invalid query");
+        runner.run();
+        runner.assertTransferCount(GetMongo.REL_ORIGINAL, 0);
+        runner.assertTransferCount(GetMongo.REL_FAILURE, 1);
+        runner.assertTransferCount(GetMongo.REL_SUCCESS, 0);
     }
+
+    /*
+     * Query read behavior tests
+     */
+    @Test
+    public void testReadQueryFromBodyWithEL() {
+        Map attributes = new HashMap();
+        attributes.put("field", "c");
+        attributes.put("value", "4");
+        String query = "{ \"${field}\": { \"$gte\": ${value}}}";
+        runner.setIncomingConnection(true);
+        runner.setProperty(GetMongo.QUERY, query);
+        runner.setProperty(GetMongo.RESULTS_PER_FLOWFILE, "10");
+        runner.setValidateExpressionUsage(true);
+        runner.enqueue("test", attributes);
+        runner.run(1, true, true);
+        runner.assertTransferCount(GetMongo.REL_FAILURE, 0);
+        runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1);
+        runner.assertTransferCount(GetMongo.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testReadQueryFromBodyNoEL() {
+        String query = "{ \"c\": { \"$gte\": 4 }}";
+        runner.setIncomingConnection(true);
+        runner.removeProperty(GetMongo.QUERY);
+        runner.enqueue(query);
+        runner.run(1, true, true);
+        runner.assertTransferCount(GetMongo.REL_FAILURE, 0);
+        runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1);
+        runner.assertTransferCount(GetMongo.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testReadQueryFromQueryParamNoConnection() {
+        String query = "{ \"c\": { \"$gte\": 4 }}";
+        runner.setProperty(GetMongo.QUERY, query);
+        runner.setIncomingConnection(false);
+        runner.run(1, true, true);
+        runner.assertTransferCount(GetMongo.REL_FAILURE, 0);
+        runner.assertTransferCount(GetMongo.REL_ORIGINAL, 0);
+        runner.assertTransferCount(GetMongo.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testReadQueryFromQueryParamWithConnection() {
+        String query = "{ \"c\": { \"$gte\": ${value} }}";
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put("value", "4");
+        runner.setProperty(GetMongo.QUERY, query);
+        runner.setIncomingConnection(true);
+        runner.enqueue("test", attrs);
+        runner.run(1, true, true);
+        runner.assertTransferCount(GetMongo.REL_FAILURE, 0);
+        runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1);
+        runner.assertTransferCount(GetMongo.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testQueryParamMissingWithNoFlowfile() {
+        Exception ex = null;
+        try {
+            runner.assertValid();
+            runner.setIncomingConnection(false);
+            runner.setProperty(GetMongo.RESULTS_PER_FLOWFILE, "1");
+            runner.run(1, true, true);
+        } catch (Exception pe) {
+            ex = pe;
+        }
+        Assert.assertNull("An exception was thrown!", ex);
+        runner.assertTransferCount(GetMongo.REL_FAILURE, 0);
+        runner.assertTransferCount(GetMongo.REL_ORIGINAL, 0);
+        runner.assertTransferCount(GetMongo.REL_SUCCESS, 3);
+    }
+    /*
+     * End query read behavior tests
+     */
 }