diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java index 70cc4ca811..7f79e9d507 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java @@ -42,6 +42,7 @@ import org.apache.nifi.processor.util.StandardValidators; import org.bson.Document; import org.bson.json.JsonWriterSettings; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.text.DateFormat; import java.text.SimpleDateFormat; @@ -55,7 +56,7 @@ import java.util.Set; @Tags({ "mongodb", "read", "get" }) -@InputRequirement(Requirement.INPUT_FORBIDDEN) +@InputRequirement(Requirement.INPUT_ALLOWED) @CapabilityDescription("Creates FlowFiles from documents in MongoDB") public class GetMongo extends AbstractMongoProcessor { public static final Validator DOCUMENT_VALIDATOR = (subject, value, context) -> { @@ -76,13 +77,28 @@ public class GetMongo extends AbstractMongoProcessor { return builder.explanation(reason).valid(reason == null).build(); }; - static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() - .name("Query") - .description("The selection criteria; must be a valid MongoDB Extended JSON format; if omitted the entire collection will be queried") - .required(false) - .expressionLanguageSupported(true) - .addValidator(DOCUMENT_VALIDATOR) + static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All files are routed to success").build(); + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("All input flowfiles that are part of a failed query execution go here.") .build(); + + static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("All input flowfiles that are part of a successful query execution go here.") + .build(); + + static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() + .name("Query") + .description("The selection criteria to do the lookup. If the field is left blank, it will look for input from" + + " an incoming connection from another processor to provide the query as a valid JSON document inside of " + + "the flowfile's body. If this field is left blank and a timer is enabled instead of an incoming connection, " + + "that will result in a full collection fetch using a \"{}\" query.") + .required(false) + .expressionLanguageSupported(true) + .addValidator(DOCUMENT_VALIDATOR) + .build(); + static final PropertyDescriptor PROJECTION = new PropertyDescriptor.Builder() .name("Projection") .description("The fields to be returned from the documents in the result set; must be a valid BSON document") @@ -155,8 +171,6 @@ public class GetMongo extends AbstractMongoProcessor { private final static Set relationships; private final static List propertyDescriptors; - static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All files are routed to success").build(); - static { List _propertyDescriptors = new ArrayList<>(); _propertyDescriptors.addAll(descriptors); @@ -176,6 +190,8 @@ public class GetMongo extends AbstractMongoProcessor { final Set _relationships = new HashSet<>(); _relationships.add(REL_SUCCESS); + _relationships.add(REL_FAILURE); + _relationships.add(REL_ORIGINAL); relationships = Collections.unmodifiableSet(_relationships); } @@ -226,22 +242,55 @@ public class GetMongo extends AbstractMongoProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile input = null; + if (context.hasIncomingConnection()) { + input = session.get(); + + if (input == null && context.hasNonLoopConnection()) { + return; + } + } + final ComponentLog logger = getLogger(); Map attributes = new HashMap(); attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json"); - if (context.getProperty(QUERY).isSet() && context.getProperty(QUERY_ATTRIBUTE).isSet()) { - attributes.put(context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions().getValue(), - context.getProperty(QUERY).evaluateAttributeExpressions().getValue()); + final Document query; + String queryStr; + if (context.getProperty(QUERY).isSet()) { + queryStr = context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue(); + query = Document.parse(queryStr); + } else if (!context.getProperty(QUERY).isSet() && input == null) { + queryStr = "{}"; + query = Document.parse("{}"); + } else { + try { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + session.exportTo(input, out); + out.close(); + queryStr = new String(out.toByteArray()); + query = Document.parse(queryStr); + } catch (Exception ex) { + getLogger().error("Error reading flowfile", ex); + if (input != null) { //Likely culprit is a bad query + session.transfer(input, REL_FAILURE); + return; + } else { + throw new ProcessException(ex); + } + } + } + + if (context.getProperty(QUERY_ATTRIBUTE).isSet()) { + final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue(); + attributes.put(queryAttr, queryStr); } - final Document query = context.getProperty(QUERY).isSet() - ? Document.parse(context.getProperty(QUERY).evaluateAttributeExpressions().getValue()) : null; final Document projection = context.getProperty(PROJECTION).isSet() - ? Document.parse(context.getProperty(PROJECTION).evaluateAttributeExpressions().getValue()) : null; + ? Document.parse(context.getProperty(PROJECTION).evaluateAttributeExpressions(input).getValue()) : null; final Document sort = context.getProperty(SORT).isSet() - ? Document.parse(context.getProperty(SORT).evaluateAttributeExpressions().getValue()) : null; + ? Document.parse(context.getProperty(SORT).evaluateAttributeExpressions(input).getValue()) : null; final String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue(); final String usePrettyPrint = context.getProperty(USE_PRETTY_PRINTING).getValue(); configureMapper(jsonTypeSetting); @@ -258,10 +307,10 @@ public class GetMongo extends AbstractMongoProcessor { it.sort(sort); } if (context.getProperty(LIMIT).isSet()) { - it.limit(context.getProperty(LIMIT).evaluateAttributeExpressions().asInteger()); + it.limit(context.getProperty(LIMIT).evaluateAttributeExpressions(input).asInteger()); } if (context.getProperty(BATCH_SIZE).isSet()) { - it.batchSize(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger()); + it.batchSize(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger()); } final MongoCursor cursor = it.iterator(); @@ -269,7 +318,7 @@ public class GetMongo extends AbstractMongoProcessor { try { FlowFile flowFile = null; if (context.getProperty(RESULTS_PER_FLOWFILE).isSet()) { - int ceiling = context.getProperty(RESULTS_PER_FLOWFILE).evaluateAttributeExpressions().asInteger(); + int ceiling = context.getProperty(RESULTS_PER_FLOWFILE).evaluateAttributeExpressions(input).asInteger(); List batch = new ArrayList<>(); while (cursor.hasNext()) { @@ -313,15 +362,19 @@ public class GetMongo extends AbstractMongoProcessor { } } - session.commit(); + if (input != null) { + session.transfer(input, REL_ORIGINAL); + } } finally { cursor.close(); } } catch (final RuntimeException e) { + if (input != null) { + session.transfer(input, REL_FAILURE); + } context.yield(); - session.rollback(); logger.error("Failed to execute query {} due to {}", new Object[] { query, e }, e); } } diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java index 8703990bc0..6cf8d62a80 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java @@ -40,6 +40,7 @@ import org.junit.Test; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -76,7 +77,7 @@ public class GetMongoTest { runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}"); runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "${collection}"); runner.setProperty(GetMongo.USE_PRETTY_PRINTING, GetMongo.YES_PP); - runner.setIncomingConnection(true); + runner.setIncomingConnection(false); mongoClient = new MongoClient(new MongoClientURI(MONGO_URI)); @@ -243,8 +244,11 @@ public class GetMongoTest { public void testResultsPerFlowfile() throws Exception { runner.setProperty(GetMongo.RESULTS_PER_FLOWFILE, "${results.per.flowfile}"); runner.setVariable("results.per.flowfile", "2"); + runner.enqueue("{}"); + runner.setIncomingConnection(true); runner.run(); - runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 2); + runner.assertTransferCount(GetMongo.REL_SUCCESS, 2); + runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1); List results = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); Assert.assertTrue("Flowfile was empty", results.get(0).getSize() > 0); Assert.assertEquals("Wrong mime type", results.get(0).getAttribute(CoreAttributes.MIME_TYPE.key()), "application/json"); @@ -255,8 +259,11 @@ public class GetMongoTest { runner.setProperty(GetMongo.RESULTS_PER_FLOWFILE, "2"); runner.setProperty(GetMongo.BATCH_SIZE, "${batch.size}"); runner.setVariable("batch.size", "1"); + runner.enqueue("{}"); + runner.setIncomingConnection(true); runner.run(); - runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 2); + runner.assertTransferCount(GetMongo.REL_SUCCESS, 2); + runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1); List results = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); Assert.assertTrue("Flowfile was empty", results.get(0).getSize() > 0); Assert.assertEquals("Wrong mime type", results.get(0).getAttribute(CoreAttributes.MIME_TYPE.key()), "application/json"); @@ -266,34 +273,164 @@ public class GetMongoTest { public void testConfigurablePrettyPrint() { runner.setProperty(GetMongo.JSON_TYPE, GetMongo.JSON_STANDARD); runner.setProperty(GetMongo.LIMIT, "1"); + runner.enqueue("{}"); + runner.setIncomingConnection(true); runner.run(); runner.assertTransferCount(GetMongo.REL_SUCCESS, 1); + runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1); List flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); byte[] raw = runner.getContentAsByteArray(flowFiles.get(0)); String json = new String(raw); Assert.assertTrue("JSON did not have new lines.", json.contains("\n")); runner.clearTransferState(); runner.setProperty(GetMongo.USE_PRETTY_PRINTING, GetMongo.NO_PP); + runner.enqueue("{}"); runner.run(); runner.assertTransferCount(GetMongo.REL_SUCCESS, 1); + runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1); flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); raw = runner.getContentAsByteArray(flowFiles.get(0)); json = new String(raw); Assert.assertFalse("New lines detected", json.contains("\n")); } + private void testQueryAttribute(String attr, String expected) { + List flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); + for (MockFlowFile mff : flowFiles) { + String val = mff.getAttribute(attr); + Assert.assertNotNull("Missing query attribute", val); + Assert.assertEquals("Value was wrong", expected, val); + } + } + @Test public void testQueryAttribute() { + /* + * Test original behavior; Manually set query of {}, no input + */ final String attr = "query.attr"; runner.setProperty(GetMongo.QUERY, "{}"); runner.setProperty(GetMongo.QUERY_ATTRIBUTE, attr); runner.run(); runner.assertTransferCount(GetMongo.REL_SUCCESS, 3); - List flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); - for (MockFlowFile mff : flowFiles) { - String val = mff.getAttribute(attr); - Assert.assertNotNull("Missing query attribute", val); - Assert.assertEquals("Value was wrong", val, "{}"); - } + testQueryAttribute(attr, "{}"); + + runner.clearTransferState(); + + /* + * Test original behavior; No Input/Empty val = {} + */ + runner.removeProperty(GetMongo.QUERY); + runner.setIncomingConnection(false); + runner.run(); + testQueryAttribute(attr, "{}"); + + runner.clearTransferState(); + + /* + * Input flowfile with {} as the query + */ + + runner.setIncomingConnection(true); + runner.enqueue("{}"); + runner.run(); + testQueryAttribute(attr, "{}"); + + /* + * Input flowfile with invalid query + */ + + runner.clearTransferState(); + runner.enqueue("invalid query"); + runner.run(); + + runner.assertTransferCount(GetMongo.REL_ORIGINAL, 0); + runner.assertTransferCount(GetMongo.REL_FAILURE, 1); + runner.assertTransferCount(GetMongo.REL_SUCCESS, 0); } + + /* + * Query read behavior tests + */ + @Test + public void testReadQueryFromBodyWithEL() { + Map attributes = new HashMap(); + attributes.put("field", "c"); + attributes.put("value", "4"); + String query = "{ \"${field}\": { \"$gte\": ${value}}}"; + runner.setIncomingConnection(true); + runner.setProperty(GetMongo.QUERY, query); + runner.setProperty(GetMongo.RESULTS_PER_FLOWFILE, "10"); + runner.setValidateExpressionUsage(true); + runner.enqueue("test", attributes); + runner.run(1, true, true); + + runner.assertTransferCount(GetMongo.REL_FAILURE, 0); + runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1); + runner.assertTransferCount(GetMongo.REL_SUCCESS, 1); + } + + @Test + public void testReadQueryFromBodyNoEL() { + String query = "{ \"c\": { \"$gte\": 4 }}"; + runner.setIncomingConnection(true); + runner.removeProperty(GetMongo.QUERY); + runner.enqueue(query); + runner.run(1, true, true); + + runner.assertTransferCount(GetMongo.REL_FAILURE, 0); + runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1); + runner.assertTransferCount(GetMongo.REL_SUCCESS, 1); + + } + + @Test + public void testReadQueryFromQueryParamNoConnection() { + String query = "{ \"c\": { \"$gte\": 4 }}"; + runner.setProperty(GetMongo.QUERY, query); + runner.setIncomingConnection(false); + runner.run(1, true, true); + runner.assertTransferCount(GetMongo.REL_FAILURE, 0); + runner.assertTransferCount(GetMongo.REL_ORIGINAL, 0); + runner.assertTransferCount(GetMongo.REL_SUCCESS, 1); + + } + + @Test + public void testReadQueryFromQueryParamWithConnection() { + String query = "{ \"c\": { \"$gte\": ${value} }}"; + Map attrs = new HashMap<>(); + attrs.put("value", "4"); + + runner.setProperty(GetMongo.QUERY, query); + runner.setIncomingConnection(true); + runner.enqueue("test", attrs); + runner.run(1, true, true); + runner.assertTransferCount(GetMongo.REL_FAILURE, 0); + runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1); + runner.assertTransferCount(GetMongo.REL_SUCCESS, 1); + + } + + @Test + public void testQueryParamMissingWithNoFlowfile() { + Exception ex = null; + + try { + runner.assertValid(); + runner.setIncomingConnection(false); + runner.setProperty(GetMongo.RESULTS_PER_FLOWFILE, "1"); + runner.run(1, true, true); + } catch (Exception pe) { + ex = pe; + } + + Assert.assertNull("An exception was thrown!", ex); + runner.assertTransferCount(GetMongo.REL_FAILURE, 0); + runner.assertTransferCount(GetMongo.REL_ORIGINAL, 0); + runner.assertTransferCount(GetMongo.REL_SUCCESS, 3); + } + /* + * End query read behavior tests + */ }