NIFI-4823 Made pretty printing configurable in GetMongo.

This closes #2441

Signed-off-by: Jeremy Dyer <jeremydyer@apache.org>
This commit is contained in:
Mike Thomsen 2018-01-29 06:44:14 -05:00 committed by Jeremy Dyer
parent d8cfb8e6c5
commit 6e7dfb9935
2 changed files with 53 additions and 14 deletions

View File

@ -19,6 +19,7 @@
package org.apache.nifi.processors.mongodb; package org.apache.nifi.processors.mongodb;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.mongodb.client.FindIterable; import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor; import com.mongodb.client.MongoCursor;
@ -126,6 +127,19 @@ public class GetMongo extends AbstractMongoProcessor {
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build(); .build();
static final AllowableValue YES_PP = new AllowableValue("true", "True");
static final AllowableValue NO_PP = new AllowableValue("false", "False");
static final PropertyDescriptor USE_PRETTY_PRINTING = new PropertyDescriptor.Builder()
.name("use-pretty-printing")
.displayName("Pretty Print Results JSON")
.description("Choose whether or not to pretty print the JSON from the results of the query. " +
"Choosing yes can greatly increase the space requirements on disk depending on the complexity of the JSON document")
.required(true)
.defaultValue(YES_PP.getValue())
.allowableValues(YES_PP, NO_PP)
.addValidator(Validator.VALID)
.build();
static final String JSON_TYPE_EXTENDED = "Extended"; static final String JSON_TYPE_EXTENDED = "Extended";
static final String JSON_TYPE_STANDARD = "Standard"; static final String JSON_TYPE_STANDARD = "Standard";
static final AllowableValue JSON_EXTENDED = new AllowableValue(JSON_TYPE_EXTENDED, "Extended JSON", static final AllowableValue JSON_EXTENDED = new AllowableValue(JSON_TYPE_EXTENDED, "Extended JSON",
@ -151,6 +165,7 @@ public class GetMongo extends AbstractMongoProcessor {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>(); List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.addAll(descriptors); _propertyDescriptors.addAll(descriptors);
_propertyDescriptors.add(JSON_TYPE); _propertyDescriptors.add(JSON_TYPE);
_propertyDescriptors.add(USE_PRETTY_PRINTING);
_propertyDescriptors.add(QUERY); _propertyDescriptors.add(QUERY);
_propertyDescriptors.add(PROJECTION); _propertyDescriptors.add(PROJECTION);
_propertyDescriptors.add(SORT); _propertyDescriptors.add(SORT);
@ -179,13 +194,13 @@ public class GetMongo extends AbstractMongoProcessor {
private ObjectMapper mapper; private ObjectMapper mapper;
//Turn a list of Mongo result documents into a String representation of a JSON array //Turn a list of Mongo result documents into a String representation of a JSON array
private String buildBatch(List<Document> documents, String jsonTypeSetting) throws IOException { private String buildBatch(List<Document> documents, String jsonTypeSetting, String prettyPrintSetting) throws IOException {
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
for (int index = 0; index < documents.size(); index++) { for (int index = 0; index < documents.size(); index++) {
Document document = documents.get(index); Document document = documents.get(index);
String asJson; String asJson;
if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) { if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) {
asJson = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(document); asJson = getObjectWriter(mapper, prettyPrintSetting).writeValueAsString(document);
} else { } else {
asJson = document.toJson(new JsonWriterSettings(true)); asJson = document.toJson(new JsonWriterSettings(true));
} }
@ -206,6 +221,11 @@ public class GetMongo extends AbstractMongoProcessor {
} }
} }
private ObjectWriter getObjectWriter(ObjectMapper mapper, String ppSetting) {
return ppSetting.equals(YES_PP.getValue()) ? mapper.writerWithDefaultPrettyPrinter()
: mapper.writer();
}
private void writeBatch(String payload, ProcessContext context, ProcessSession session) { private void writeBatch(String payload, ProcessContext context, ProcessSession session) {
FlowFile flowFile = session.create(); FlowFile flowFile = session.create();
flowFile = session.write(flowFile, new OutputStreamCallback() { flowFile = session.write(flowFile, new OutputStreamCallback() {
@ -230,6 +250,7 @@ public class GetMongo extends AbstractMongoProcessor {
final Document sort = context.getProperty(SORT).isSet() final Document sort = context.getProperty(SORT).isSet()
? Document.parse(context.getProperty(SORT).evaluateAttributeExpressions().getValue()) : null; ? Document.parse(context.getProperty(SORT).evaluateAttributeExpressions().getValue()) : null;
final String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue(); final String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue();
final String usePrettyPrint = context.getProperty(USE_PRETTY_PRINTING).getValue();
configureMapper(jsonTypeSetting); configureMapper(jsonTypeSetting);
@ -265,7 +286,7 @@ public class GetMongo extends AbstractMongoProcessor {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Writing batch..."); log.debug("Writing batch...");
} }
String payload = buildBatch(batch, jsonTypeSetting); String payload = buildBatch(batch, jsonTypeSetting, usePrettyPrint);
writeBatch(payload, context, session); writeBatch(payload, context, session);
batch = new ArrayList<>(); batch = new ArrayList<>();
} catch (IOException ex) { } catch (IOException ex) {
@ -275,7 +296,7 @@ public class GetMongo extends AbstractMongoProcessor {
} }
if (batch.size() > 0) { if (batch.size() > 0) {
try { try {
writeBatch(buildBatch(batch, jsonTypeSetting), context, session); writeBatch(buildBatch(batch, jsonTypeSetting, usePrettyPrint), context, session);
} catch (IOException ex) { } catch (IOException ex) {
getLogger().error("Error sending remainder of batch", ex); getLogger().error("Error sending remainder of batch", ex);
} }
@ -283,17 +304,14 @@ public class GetMongo extends AbstractMongoProcessor {
} else { } else {
while (cursor.hasNext()) { while (cursor.hasNext()) {
flowFile = session.create(); flowFile = session.create();
flowFile = session.write(flowFile, new OutputStreamCallback() { flowFile = session.write(flowFile, out -> {
@Override
public void process(OutputStream out) throws IOException {
String json; String json;
if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) { if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) {
json = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(cursor.next()); json = getObjectWriter(mapper, usePrettyPrint).writeValueAsString(cursor.next());
} else { } else {
json = cursor.next().toJson(); json = cursor.next().toJson();
} }
IOUtils.write(json, out); IOUtils.write(json, out);
}
}); });
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json"); flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");

View File

@ -75,6 +75,7 @@ public class GetMongoTest {
runner.setProperty(AbstractMongoProcessor.URI, "${uri}"); runner.setProperty(AbstractMongoProcessor.URI, "${uri}");
runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}"); runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}");
runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "${collection}"); runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "${collection}");
runner.setProperty(GetMongo.USE_PRETTY_PRINTING, GetMongo.YES_PP);
mongoClient = new MongoClient(new MongoClientURI(MONGO_URI)); mongoClient = new MongoClient(new MongoClientURI(MONGO_URI));
@ -259,4 +260,24 @@ public class GetMongoTest {
Assert.assertTrue("Flowfile was empty", results.get(0).getSize() > 0); Assert.assertTrue("Flowfile was empty", results.get(0).getSize() > 0);
Assert.assertEquals("Wrong mime type", results.get(0).getAttribute(CoreAttributes.MIME_TYPE.key()), "application/json"); Assert.assertEquals("Wrong mime type", results.get(0).getAttribute(CoreAttributes.MIME_TYPE.key()), "application/json");
} }
@Test
public void testConfigurablePrettyPrint() {
runner.setProperty(GetMongo.JSON_TYPE, GetMongo.JSON_STANDARD);
runner.setProperty(GetMongo.LIMIT, "1");
runner.run();
runner.assertTransferCount(GetMongo.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
byte[] raw = runner.getContentAsByteArray(flowFiles.get(0));
String json = new String(raw);
Assert.assertTrue("JSON did not have new lines.", json.contains("\n"));
runner.clearTransferState();
runner.setProperty(GetMongo.USE_PRETTY_PRINTING, GetMongo.NO_PP);
runner.run();
runner.assertTransferCount(GetMongo.REL_SUCCESS, 1);
flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
raw = runner.getContentAsByteArray(flowFiles.get(0));
json = new String(raw);
Assert.assertFalse("New lines detected", json.contains("\n"));
}
} }