diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java index fc00a080db..8b73b02f03 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java @@ -24,8 +24,12 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; +import com.mongodb.client.FindIterable; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -36,6 +40,7 @@ import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -44,10 +49,8 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.bson.Document; +import org.codehaus.jackson.map.ObjectMapper; -import com.mongodb.client.FindIterable; -import com.mongodb.client.MongoCollection; -import com.mongodb.client.MongoCursor; @Tags({ "mongodb", "read", "get" }) @InputRequirement(Requirement.INPUT_FORBIDDEN) @@ -99,6 +102,13 @@ public class GetMongo extends AbstractMongoProcessor { .required(false) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .build(); + static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder() + .name("results-per-flowfile") + .displayName("Results Per FlowFile") + .description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.") + .required(false) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); private final static Set relationships; private final static List propertyDescriptors; @@ -111,6 +121,7 @@ public class GetMongo extends AbstractMongoProcessor { _propertyDescriptors.add(SORT); _propertyDescriptors.add(LIMIT); _propertyDescriptors.add(BATCH_SIZE); + _propertyDescriptors.add(RESULTS_PER_FLOWFILE); _propertyDescriptors.add(SSL_CONTEXT_SERVICE); _propertyDescriptors.add(CLIENT_AUTH); propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); @@ -130,6 +141,32 @@ public class GetMongo extends AbstractMongoProcessor { return propertyDescriptors; } + private ObjectMapper mapper = new ObjectMapper(); + + //Turn a list of Mongo result documents into a String representation of a JSON array + private String buildBatch(List documents) throws IOException { + List docs = new ArrayList<>(); + for (Document document : documents) { + String asJson = document.toJson(); + docs.add(mapper.readValue(asJson, Map.class)); + } + + return mapper.writeValueAsString(docs); + } + + private void writeBatch(String payload, ProcessContext context, ProcessSession session) { + FlowFile flowFile = session.create(); + flowFile = session.write(flowFile, new OutputStreamCallback() { + @Override + public void process(OutputStream out) throws IOException { + out.write(payload.getBytes("UTF-8")); + } + }); + flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json"); + session.getProvenanceReporter().receive(flowFile, context.getProperty(URI).getValue()); + session.transfer(flowFile, REL_SUCCESS); + } + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { final ComponentLog logger = getLogger(); @@ -156,19 +193,49 @@ public class GetMongo extends AbstractMongoProcessor { } final MongoCursor cursor = it.iterator(); + ComponentLog log = getLogger(); try { FlowFile flowFile = null; - while (cursor.hasNext()) { - flowFile = session.create(); - flowFile = session.write(flowFile, new OutputStreamCallback() { - @Override - public void process(OutputStream out) throws IOException { - IOUtils.write(cursor.next().toJson(), out); - } - }); + if (context.getProperty(RESULTS_PER_FLOWFILE).isSet()) { + int ceiling = context.getProperty(RESULTS_PER_FLOWFILE).asInteger(); + List batch = new ArrayList<>(); - session.getProvenanceReporter().receive(flowFile, context.getProperty(URI).getValue()); - session.transfer(flowFile, REL_SUCCESS); + while (cursor.hasNext()) { + batch.add(cursor.next()); + if (batch.size() == ceiling) { + try { + if (log.isDebugEnabled()) { + log.debug("Writing batch..."); + } + String payload = buildBatch(batch); + writeBatch(payload, context, session); + batch = new ArrayList<>(); + } catch (IOException ex) { + getLogger().error("Error building batch", ex); + } + } + } + if (batch.size() > 0) { + try { + writeBatch(buildBatch(batch), context, session); + } catch (IOException ex) { + getLogger().error("Error sending remainder of batch", ex); + } + } + } else { + while (cursor.hasNext()) { + flowFile = session.create(); + flowFile = session.write(flowFile, new OutputStreamCallback() { + @Override + public void process(OutputStream out) throws IOException { + IOUtils.write(cursor.next().toJson(), out); + } + }); + flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json"); + + session.getProvenanceReporter().receive(flowFile, context.getProperty(URI).getValue()); + session.transfer(flowFile, REL_SUCCESS); + } } session.commit(); diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java index 810fc4d2b9..e39148d17e 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.List; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockProcessContext; @@ -200,4 +201,14 @@ public class GetMongoTest { List flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); flowFiles.get(0).assertContentEquals(DOCUMENTS.get(0).toJson()); } + + @Test + public void testResultsPerFlowfile() throws Exception { + runner.setProperty(GetMongo.RESULTS_PER_FLOWFILE, "2"); + runner.run(); + runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 2); + List results = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); + Assert.assertTrue("Flowfile was empty", results.get(0).getSize() > 0); + Assert.assertEquals("Wrong mime type", results.get(0).getAttribute(CoreAttributes.MIME_TYPE.key()), "application/json"); + } }