From 6e7dfb9935622a5215115df93503ef4bfba75949 Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Mon, 29 Jan 2018 06:44:14 -0500 Subject: [PATCH] NIFI-4823 Made pretty printing configurable in GetMongo. This closes #2441 Signed-off-by: Jeremy Dyer --- .../nifi/processors/mongodb/GetMongo.java | 46 +++++++++++++------ .../nifi/processors/mongodb/GetMongoTest.java | 21 +++++++++ 2 files changed, 53 insertions(+), 14 deletions(-) diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java index 898824583a..5fa4550275 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java @@ -19,6 +19,7 @@ package org.apache.nifi.processors.mongodb; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; import com.mongodb.client.FindIterable; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; @@ -126,6 +127,19 @@ public class GetMongo extends AbstractMongoProcessor { .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .build(); + static final AllowableValue YES_PP = new AllowableValue("true", "True"); + static final AllowableValue NO_PP = new AllowableValue("false", "False"); + static final PropertyDescriptor USE_PRETTY_PRINTING = new PropertyDescriptor.Builder() + .name("use-pretty-printing") + .displayName("Pretty Print Results JSON") + .description("Choose whether or not to pretty print the JSON from the results of the query. " + + "Choosing yes can greatly increase the space requirements on disk depending on the complexity of the JSON document") + .required(true) + .defaultValue(YES_PP.getValue()) + .allowableValues(YES_PP, NO_PP) + .addValidator(Validator.VALID) + .build(); + static final String JSON_TYPE_EXTENDED = "Extended"; static final String JSON_TYPE_STANDARD = "Standard"; static final AllowableValue JSON_EXTENDED = new AllowableValue(JSON_TYPE_EXTENDED, "Extended JSON", @@ -151,6 +165,7 @@ public class GetMongo extends AbstractMongoProcessor { List _propertyDescriptors = new ArrayList<>(); _propertyDescriptors.addAll(descriptors); _propertyDescriptors.add(JSON_TYPE); + _propertyDescriptors.add(USE_PRETTY_PRINTING); _propertyDescriptors.add(QUERY); _propertyDescriptors.add(PROJECTION); _propertyDescriptors.add(SORT); @@ -179,13 +194,13 @@ public class GetMongo extends AbstractMongoProcessor { private ObjectMapper mapper; //Turn a list of Mongo result documents into a String representation of a JSON array - private String buildBatch(List documents, String jsonTypeSetting) throws IOException { + private String buildBatch(List documents, String jsonTypeSetting, String prettyPrintSetting) throws IOException { StringBuilder builder = new StringBuilder(); for (int index = 0; index < documents.size(); index++) { Document document = documents.get(index); String asJson; if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) { - asJson = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(document); + asJson = getObjectWriter(mapper, prettyPrintSetting).writeValueAsString(document); } else { asJson = document.toJson(new JsonWriterSettings(true)); } @@ -206,6 +221,11 @@ public class GetMongo extends AbstractMongoProcessor { } } + private ObjectWriter getObjectWriter(ObjectMapper mapper, String ppSetting) { + return ppSetting.equals(YES_PP.getValue()) ? mapper.writerWithDefaultPrettyPrinter() + : mapper.writer(); + } + private void writeBatch(String payload, ProcessContext context, ProcessSession session) { FlowFile flowFile = session.create(); flowFile = session.write(flowFile, new OutputStreamCallback() { @@ -230,6 +250,7 @@ public class GetMongo extends AbstractMongoProcessor { final Document sort = context.getProperty(SORT).isSet() ? Document.parse(context.getProperty(SORT).evaluateAttributeExpressions().getValue()) : null; final String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue(); + final String usePrettyPrint = context.getProperty(USE_PRETTY_PRINTING).getValue(); configureMapper(jsonTypeSetting); @@ -265,7 +286,7 @@ public class GetMongo extends AbstractMongoProcessor { if (log.isDebugEnabled()) { log.debug("Writing batch..."); } - String payload = buildBatch(batch, jsonTypeSetting); + String payload = buildBatch(batch, jsonTypeSetting, usePrettyPrint); writeBatch(payload, context, session); batch = new ArrayList<>(); } catch (IOException ex) { @@ -275,7 +296,7 @@ public class GetMongo extends AbstractMongoProcessor { } if (batch.size() > 0) { try { - writeBatch(buildBatch(batch, jsonTypeSetting), context, session); + writeBatch(buildBatch(batch, jsonTypeSetting, usePrettyPrint), context, session); } catch (IOException ex) { getLogger().error("Error sending remainder of batch", ex); } @@ -283,17 +304,14 @@ public class GetMongo extends AbstractMongoProcessor { } else { while (cursor.hasNext()) { flowFile = session.create(); - flowFile = session.write(flowFile, new OutputStreamCallback() { - @Override - public void process(OutputStream out) throws IOException { - String json; - if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) { - json = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(cursor.next()); - } else { - json = cursor.next().toJson(); - } - IOUtils.write(json, out); + flowFile = session.write(flowFile, out -> { + String json; + if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) { + json = getObjectWriter(mapper, usePrettyPrint).writeValueAsString(cursor.next()); + } else { + json = cursor.next().toJson(); } + IOUtils.write(json, out); }); flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json"); diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java index 2de0c97729..0e168132e9 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java @@ -75,6 +75,7 @@ public class GetMongoTest { runner.setProperty(AbstractMongoProcessor.URI, "${uri}"); runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}"); runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "${collection}"); + runner.setProperty(GetMongo.USE_PRETTY_PRINTING, GetMongo.YES_PP); mongoClient = new MongoClient(new MongoClientURI(MONGO_URI)); @@ -259,4 +260,24 @@ public class GetMongoTest { Assert.assertTrue("Flowfile was empty", results.get(0).getSize() > 0); Assert.assertEquals("Wrong mime type", results.get(0).getAttribute(CoreAttributes.MIME_TYPE.key()), "application/json"); } + + @Test + public void testConfigurablePrettyPrint() { + runner.setProperty(GetMongo.JSON_TYPE, GetMongo.JSON_STANDARD); + runner.setProperty(GetMongo.LIMIT, "1"); + runner.run(); + runner.assertTransferCount(GetMongo.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); + byte[] raw = runner.getContentAsByteArray(flowFiles.get(0)); + String json = new String(raw); + Assert.assertTrue("JSON did not have new lines.", json.contains("\n")); + runner.clearTransferState(); + runner.setProperty(GetMongo.USE_PRETTY_PRINTING, GetMongo.NO_PP); + runner.run(); + runner.assertTransferCount(GetMongo.REL_SUCCESS, 1); + flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); + raw = runner.getContentAsByteArray(flowFiles.get(0)); + json = new String(raw); + Assert.assertFalse("New lines detected", json.contains("\n")); + } }