NIFI-4122 Added the ability to combine multiple Mongo result documents into a single output JSON array.

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #1948.
Authored by Mike Thomsen on 2017-06-26 11:06:35 -04:00; committed by Pierre Villard.
parent 82ef671953
commit 5172797448
2 changed files with 91 additions and 13 deletions

View File

@ -24,8 +24,12 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@ -36,6 +40,7 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator; import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
@ -44,10 +49,8 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.bson.Document; import org.bson.Document;
import org.codehaus.jackson.map.ObjectMapper;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
@Tags({ "mongodb", "read", "get" }) @Tags({ "mongodb", "read", "get" })
@InputRequirement(Requirement.INPUT_FORBIDDEN) @InputRequirement(Requirement.INPUT_FORBIDDEN)
@ -99,6 +102,13 @@ public class GetMongo extends AbstractMongoProcessor {
.required(false) .required(false)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build(); .build();
// Optional cap on how many result documents are combined into a single
// flowfile; when set, each flowfile's content is a JSON array of results.
// When unset, the processor keeps its original one-document-per-flowfile
// behavior (see onTrigger).
static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
.name("results-per-flowfile")
.displayName("Results Per FlowFile")
.description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.")
.required(false)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
private final static Set<Relationship> relationships; private final static Set<Relationship> relationships;
private final static List<PropertyDescriptor> propertyDescriptors; private final static List<PropertyDescriptor> propertyDescriptors;
@ -111,6 +121,7 @@ public class GetMongo extends AbstractMongoProcessor {
_propertyDescriptors.add(SORT); _propertyDescriptors.add(SORT);
_propertyDescriptors.add(LIMIT); _propertyDescriptors.add(LIMIT);
_propertyDescriptors.add(BATCH_SIZE); _propertyDescriptors.add(BATCH_SIZE);
_propertyDescriptors.add(RESULTS_PER_FLOWFILE);
_propertyDescriptors.add(SSL_CONTEXT_SERVICE); _propertyDescriptors.add(SSL_CONTEXT_SERVICE);
_propertyDescriptors.add(CLIENT_AUTH); _propertyDescriptors.add(CLIENT_AUTH);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
@ -130,6 +141,32 @@ public class GetMongo extends AbstractMongoProcessor {
return propertyDescriptors; return propertyDescriptors;
} }
// Reused for every batch; Jackson's ObjectMapper is safe to share once
// configured, and constructing it per call would be needlessly expensive.
private final ObjectMapper mapper = new ObjectMapper();

/**
 * Turns a list of Mongo result documents into the String representation of a
 * single JSON array containing one element per document.
 *
 * @param documents the documents to combine; an empty list yields "[]"
 * @return the serialized JSON array
 * @throws IOException if a document's JSON form cannot be parsed or the
 *         array cannot be serialized
 */
private String buildBatch(List<Document> documents) throws IOException {
    final List<Map<String, Object>> docs = new ArrayList<>(documents.size());
    for (Document document : documents) {
        // Round-trip each document through its JSON form so the final array
        // is assembled and re-serialized uniformly by Jackson.
        @SuppressWarnings("unchecked")
        final Map<String, Object> parsed = mapper.readValue(document.toJson(), Map.class);
        docs.add(parsed);
    }
    return mapper.writeValueAsString(docs);
}
/**
 * Emits one flowfile whose content is the given JSON payload: creates the
 * flowfile, writes the payload as UTF-8, marks the MIME type as JSON,
 * reports a provenance receive against the configured URI and routes the
 * flowfile to the success relationship.
 */
private void writeBatch(String payload, ProcessContext context, ProcessSession session) {
    FlowFile output = session.create();
    // OutputStreamCallback is a single-method interface, so a lambda suffices.
    output = session.write(output, out -> out.write(payload.getBytes("UTF-8")));
    output = session.putAttribute(output, CoreAttributes.MIME_TYPE.key(), "application/json");
    session.getProvenanceReporter().receive(output, context.getProperty(URI).getValue());
    session.transfer(output, REL_SUCCESS);
}
@Override @Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final ComponentLog logger = getLogger(); final ComponentLog logger = getLogger();
@ -156,8 +193,36 @@ public class GetMongo extends AbstractMongoProcessor {
} }
final MongoCursor<Document> cursor = it.iterator(); final MongoCursor<Document> cursor = it.iterator();
ComponentLog log = getLogger();
try { try {
FlowFile flowFile = null; FlowFile flowFile = null;
if (context.getProperty(RESULTS_PER_FLOWFILE).isSet()) {
int ceiling = context.getProperty(RESULTS_PER_FLOWFILE).asInteger();
List<Document> batch = new ArrayList<>();
while (cursor.hasNext()) {
batch.add(cursor.next());
if (batch.size() == ceiling) {
try {
if (log.isDebugEnabled()) {
log.debug("Writing batch...");
}
String payload = buildBatch(batch);
writeBatch(payload, context, session);
batch = new ArrayList<>();
} catch (IOException ex) {
getLogger().error("Error building batch", ex);
}
}
}
if (batch.size() > 0) {
try {
writeBatch(buildBatch(batch), context, session);
} catch (IOException ex) {
getLogger().error("Error sending remainder of batch", ex);
}
}
} else {
while (cursor.hasNext()) { while (cursor.hasNext()) {
flowFile = session.create(); flowFile = session.create();
flowFile = session.write(flowFile, new OutputStreamCallback() { flowFile = session.write(flowFile, new OutputStreamCallback() {
@ -166,10 +231,12 @@ public class GetMongo extends AbstractMongoProcessor {
IOUtils.write(cursor.next().toJson(), out); IOUtils.write(cursor.next().toJson(), out);
} }
}); });
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
session.getProvenanceReporter().receive(flowFile, context.getProperty(URI).getValue()); session.getProvenanceReporter().receive(flowFile, context.getProperty(URI).getValue());
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
} }
}
session.commit(); session.commit();

View File

@ -24,6 +24,7 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext; import org.apache.nifi.util.MockProcessContext;
@ -200,4 +201,14 @@ public class GetMongoTest {
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
flowFiles.get(0).assertContentEquals(DOCUMENTS.get(0).toJson()); flowFiles.get(0).assertContentEquals(DOCUMENTS.get(0).toJson());
} }
@Test
public void testResultsPerFlowfile() throws Exception {
    runner.setProperty(GetMongo.RESULTS_PER_FLOWFILE, "2");
    runner.run();
    // Expects the fixture collection to batch into exactly two flowfiles at
    // two results each — confirm against the DOCUMENTS fixture if it changes.
    runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 2);
    List<MockFlowFile> results = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
    Assert.assertTrue("Flowfile was empty", results.get(0).getSize() > 0);
    // JUnit's assertEquals signature is (message, expected, actual): the
    // expected constant must come before the observed attribute, otherwise a
    // failure message reports the values backwards.
    Assert.assertEquals("Wrong mime type", "application/json",
            results.get(0).getAttribute(CoreAttributes.MIME_TYPE.key()));
}
} }