diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java index 339bee70e5..fa3bc10811 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java @@ -18,6 +18,7 @@ */ package org.apache.nifi.processors.mongodb; +import com.fasterxml.jackson.databind.ObjectMapper; import com.mongodb.MongoClient; import com.mongodb.MongoClientOptions; import com.mongodb.MongoClientOptions.Builder; @@ -29,6 +30,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.authentication.exception.ProviderCreationException; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; @@ -45,6 +47,8 @@ import javax.net.ssl.SSLContext; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -57,6 +61,13 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { static final String WRITE_CONCERN_REPLICA_ACKNOWLEDGED = "REPLICA_ACKNOWLEDGED"; static final String WRITE_CONCERN_MAJORITY = "MAJORITY"; + protected static final String JSON_TYPE_EXTENDED = "Extended"; + protected static final String JSON_TYPE_STANDARD = "Standard"; + protected static final AllowableValue JSON_EXTENDED = new AllowableValue(JSON_TYPE_EXTENDED, "Extended JSON", + "Use MongoDB's \"extended JSON\". This is the JSON generated with toJson() on a MongoDB Document from the Java driver"); + protected static final AllowableValue JSON_STANDARD = new AllowableValue(JSON_TYPE_STANDARD, "Standard JSON", + "Generate a JSON document that conforms to typical JSON conventions instead of Mongo-specific conventions."); + protected static final PropertyDescriptor URI = new PropertyDescriptor.Builder() .name("Mongo URI") .displayName("Mongo URI") @@ -65,6 +76,7 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + protected static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder() .name("Mongo Database Name") .displayName("Mongo Database Name") @@ -73,6 +85,7 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + protected static final PropertyDescriptor COLLECTION_NAME = new PropertyDescriptor.Builder() .name("Mongo Collection Name") .description("The name of the collection to use") @@ -80,6 +93,19 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + + protected static final PropertyDescriptor JSON_TYPE = new PropertyDescriptor.Builder() + .allowableValues(JSON_EXTENDED, JSON_STANDARD) + .defaultValue(JSON_TYPE_EXTENDED) + .displayName("JSON Type") + .name("json-type") + .description("By default, MongoDB's Java driver returns \"extended JSON\". Some of the features of this variant of JSON" + + " may cause problems for other JSON parsers that expect only standard JSON types and conventions. This configuration setting " + + " controls whether to use extended JSON or provide a clean view that conforms to standard JSON.") + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .required(true) + .build(); + public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() .name("ssl-context-service") .displayName("SSL Context Service") @@ -88,6 +114,7 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { .required(false) .identifiesControllerService(SSLContextService.class) .build(); + public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder() .name("ssl-client-auth") .displayName("Client Auth") @@ -155,6 +182,7 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { descriptors.add(CLIENT_AUTH); } + protected ObjectMapper objectMapper; protected MongoClient mongoClient; @OnScheduled @@ -275,4 +303,14 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { session.getProvenanceReporter().receive(flowFile, getURI(context)); session.transfer(flowFile, rel); } + + protected synchronized void configureMapper(String setting) { + objectMapper = new ObjectMapper(); + + if (setting.equals(JSON_TYPE_STANDARD)) { + objectMapper.registerModule(ObjectIdSerializer.getModule()); + DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); + objectMapper.setDateFormat(df); + } + } } diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java index 10eb0c723a..356f3f4712 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java @@ -45,8 +45,6 @@ import org.bson.json.JsonWriterSettings; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.text.DateFormat; -import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -55,7 +53,6 @@ import java.util.List; import java.util.Map; import java.util.Set; - @Tags({ "mongodb", "read", "get" }) @InputRequirement(Requirement.INPUT_ALLOWED) @CapabilityDescription("Creates FlowFiles from documents in MongoDB") @@ -134,24 +131,6 @@ public class GetMongo extends AbstractMongoProcessor { .addValidator(Validator.VALID) .build(); - static final String JSON_TYPE_EXTENDED = "Extended"; - static final String JSON_TYPE_STANDARD = "Standard"; - static final AllowableValue JSON_EXTENDED = new AllowableValue(JSON_TYPE_EXTENDED, "Extended JSON", - "Use MongoDB's \"extended JSON\". This is the JSON generated with toJson() on a MongoDB Document from the Java driver"); - static final AllowableValue JSON_STANDARD = new AllowableValue(JSON_TYPE_STANDARD, "Standard JSON", - "Generate a JSON document that conforms to typical JSON conventions instead of Mongo-specific conventions."); - static final PropertyDescriptor JSON_TYPE = new PropertyDescriptor.Builder() - .allowableValues(JSON_EXTENDED, JSON_STANDARD) - .defaultValue(JSON_TYPE_EXTENDED) - .displayName("JSON Type") - .name("json-type") - .description("By default, MongoDB's Java driver returns \"extended JSON\". Some of the features of this variant of JSON" + - " may cause problems for other JSON parsers that expect only standard JSON types and conventions. This configuration setting " + - " controls whether to use extended JSON or provide a clean view that conforms to standard JSON.") - .expressionLanguageSupported(ExpressionLanguageScope.NONE) - .required(true) - .build(); - private final static Set relationships; private final static List propertyDescriptors; @@ -189,8 +168,6 @@ public class GetMongo extends AbstractMongoProcessor { return propertyDescriptors; } - private ObjectMapper mapper; - //Turn a list of Mongo result documents into a String representation of a JSON array private String buildBatch(List documents, String jsonTypeSetting, String prettyPrintSetting) throws IOException { StringBuilder builder = new StringBuilder(); @@ -198,7 +175,7 @@ public class GetMongo extends AbstractMongoProcessor { Document document = documents.get(index); String asJson; if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) { - asJson = getObjectWriter(mapper, prettyPrintSetting).writeValueAsString(document); + asJson = getObjectWriter(objectMapper, prettyPrintSetting).writeValueAsString(document); } else { asJson = document.toJson(new JsonWriterSettings(true)); } @@ -210,15 +187,6 @@ public class GetMongo extends AbstractMongoProcessor { return "[" + builder.toString() + "]"; } - private void configureMapper(String setting) { - mapper = new ObjectMapper(); - if (setting.equals(JSON_TYPE_STANDARD)) { - mapper.registerModule(ObjectIdSerializer.getModule()); - DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); - mapper.setDateFormat(df); - } - } - private ObjectWriter getObjectWriter(ObjectMapper mapper, String ppSetting) { return ppSetting.equals(YES_PP.getValue()) ? mapper.writerWithDefaultPrettyPrinter() : mapper.writer(); @@ -237,7 +205,7 @@ public class GetMongo extends AbstractMongoProcessor { final ComponentLog logger = getLogger(); - Map attributes = new HashMap(); + Map attributes = new HashMap<>(); attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json"); final Document query; @@ -334,7 +302,7 @@ public class GetMongo extends AbstractMongoProcessor { flowFile = session.write(flowFile, out -> { String json; if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) { - json = getObjectWriter(mapper, usePrettyPrint).writeValueAsString(cursor.next()); + json = getObjectWriter(objectMapper, usePrettyPrint).writeValueAsString(cursor.next()); } else { json = cursor.next().toJson(); } diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java index 19969c420a..684c5f500c 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java @@ -96,6 +96,7 @@ public class RunMongoAggregation extends AbstractMongoProcessor { _propertyDescriptors.addAll(descriptors); _propertyDescriptors.add(CHARSET); _propertyDescriptors.add(QUERY); + _propertyDescriptors.add(JSON_TYPE); _propertyDescriptors.add(QUERY_ATTRIBUTE); _propertyDescriptors.add(BATCH_SIZE); _propertyDescriptors.add(RESULTS_PER_FLOWFILE); @@ -120,11 +121,10 @@ public class RunMongoAggregation extends AbstractMongoProcessor { return propertyDescriptors; } - static String buildBatch(List batch) { - ObjectMapper mapper = new ObjectMapper(); + private String buildBatch(List batch) { String retVal; try { - retVal = mapper.writeValueAsString(batch.size() > 1 ? batch : batch.get(0)); + retVal = objectMapper.writeValueAsString(batch.size() > 1 ? batch : batch.get(0)); } catch (Exception e) { retVal = null; } @@ -143,12 +143,15 @@ public class RunMongoAggregation extends AbstractMongoProcessor { } } - String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue(); - String queryAttr = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue(); - Integer batchSize = context.getProperty(BATCH_SIZE).asInteger(); - Integer resultsPerFlowfile = context.getProperty(RESULTS_PER_FLOWFILE).asInteger(); + final String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue(); + final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue(); + final Integer batchSize = context.getProperty(BATCH_SIZE).asInteger(); + final Integer resultsPerFlowfile = context.getProperty(RESULTS_PER_FLOWFILE).asInteger(); + final String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue(); - Map attrs = new HashMap(); + configureMapper(jsonTypeSetting); + + Map attrs = new HashMap<>(); if (queryAttr != null && queryAttr.trim().length() > 0) { attrs.put(queryAttr, query); } @@ -162,13 +165,13 @@ public class RunMongoAggregation extends AbstractMongoProcessor { it.batchSize(batchSize != null ? batchSize : 1); iter = it.iterator(); - List batch = new ArrayList(); + List batch = new ArrayList<>(); while (iter.hasNext()) { batch.add(iter.next()); if (batch.size() == resultsPerFlowfile) { writeBatch(buildBatch(batch), flowFile, context, session, attrs, REL_RESULTS); - batch = new ArrayList(); + batch = new ArrayList<>(); } } diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java index f2ddbca13c..02d9ad4619 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java @@ -33,6 +33,7 @@ import org.junit.Before; import org.junit.Test; import java.io.IOException; +import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.HashMap; import java.util.List; @@ -48,6 +49,7 @@ public class RunMongoAggregationIT { private TestRunner runner; private MongoClient mongoClient; private Map mappings; + private Calendar now = Calendar.getInstance(); @Before public void setup() { @@ -68,7 +70,7 @@ public class RunMongoAggregationIT { for (int x = 0; x < values.length; x++) { for (int y = 0; y < x + 2; y++) { - Document doc = new Document().append("val", values[x]); + Document doc = new Document().append("val", values[x]).append("date", now.getTime()); collection.insertOne(doc); } mappings.put(values[x], x + 2); @@ -78,7 +80,6 @@ public class RunMongoAggregationIT { @After public void teardown() { runner = null; - mongoClient.getDatabase(DB_NAME).drop(); } @@ -164,6 +165,37 @@ public class RunMongoAggregationIT { runner.assertTransferCount(RunMongoAggregation.REL_FAILURE, 1); } + @Test + public void testJsonTypes() throws IOException { + + runner.setProperty(RunMongoAggregation.JSON_TYPE, RunMongoAggregation.JSON_STANDARD); + runner.setProperty(RunMongoAggregation.QUERY, "[ { \"$project\": { \"myArray\": [ \"$val\", \"$date\" ] } } ]"); + runner.enqueue("test"); + runner.run(1, true, true); + + List flowFiles = runner.getFlowFilesForRelationship(RunMongoAggregation.REL_RESULTS); + ObjectMapper mapper = new ObjectMapper(); + SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); + for (MockFlowFile mockFlowFile : flowFiles) { + byte[] raw = runner.getContentAsByteArray(mockFlowFile); + Map> read = mapper.readValue(raw, Map.class); + Assert.assertTrue(read.get("myArray").get(1).equalsIgnoreCase( format.format(now.getTime()))); + } + + runner.clearTransferState(); + + runner.setProperty(RunMongoAggregation.JSON_TYPE, RunMongoAggregation.JSON_EXTENDED); + runner.enqueue("test"); + runner.run(1, true, true); + + flowFiles = runner.getFlowFilesForRelationship(RunMongoAggregation.REL_RESULTS); + for (MockFlowFile mockFlowFile : flowFiles) { + byte[] raw = runner.getContentAsByteArray(mockFlowFile); + Map> read = mapper.readValue(raw, Map.class); + Assert.assertTrue(read.get("myArray").get(1) == now.getTimeInMillis()); + } + } + private void evaluateRunner(int original) throws IOException { runner.assertTransferCount(RunMongoAggregation.REL_RESULTS, mappings.size()); runner.assertTransferCount(RunMongoAggregation.REL_ORIGINAL, original);