diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java index 79f50b239c..185d0125b1 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java @@ -18,15 +18,6 @@ */ package org.apache.nifi.processors.mongodb; -import java.io.IOException; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - import com.mongodb.client.FindIterable; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; @@ -35,6 +26,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; @@ -49,8 +41,19 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.bson.Document; +import org.bson.json.JsonWriterSettings; import org.codehaus.jackson.map.ObjectMapper; +import java.io.IOException; +import java.io.OutputStream; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + @Tags({ "mongodb", "read", "get" }) @InputRequirement(Requirement.INPUT_FORBIDDEN) @@ -120,12 +123,31 @@ public class GetMongo extends AbstractMongoProcessor { .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .build(); + static final String JSON_TYPE_EXTENDED = "Extended"; + static final String JSON_TYPE_STANDARD = "Standard"; + static final AllowableValue JSON_EXTENDED = new AllowableValue(JSON_TYPE_EXTENDED, "Extended JSON", + "Use MongoDB's \"extended JSON\". This is the JSON generated with toJson() on a MongoDB Document from the Java driver"); + static final AllowableValue JSON_STANDARD = new AllowableValue(JSON_TYPE_STANDARD, "Standard JSON", + "Generate a JSON document that conforms to typical JSON conventions instead of Mongo-specific conventions."); + static final PropertyDescriptor JSON_TYPE = new PropertyDescriptor.Builder() + .allowableValues(JSON_EXTENDED, JSON_STANDARD) + .defaultValue(JSON_TYPE_EXTENDED) + .displayName("JSON Type") + .name("json-type") + .description("By default, MongoDB's Java driver returns \"extended JSON\". Some of the features of this variant of JSON" + + " may cause problems for other JSON parsers that expect only standard JSON types and conventions. This configuration setting " + + " controls whether to use extended JSON or provide a clean view that conforms to standard JSON.") + .expressionLanguageSupported(false) + .required(true) + .build(); + private final static Set relationships; private final static List propertyDescriptors; static { List _propertyDescriptors = new ArrayList<>(); _propertyDescriptors.addAll(descriptors); + _propertyDescriptors.add(JSON_TYPE); _propertyDescriptors.add(QUERY); _propertyDescriptors.add(PROJECTION); _propertyDescriptors.add(SORT); @@ -151,17 +173,34 @@ public class GetMongo extends AbstractMongoProcessor { return propertyDescriptors; } - private ObjectMapper mapper = new ObjectMapper(); + private ObjectMapper mapper; //Turn a list of Mongo result documents into a String representation of a JSON array - private String buildBatch(List documents) throws IOException { - List docs = new ArrayList<>(); - for (Document document : documents) { - String asJson = document.toJson(); - docs.add(mapper.readValue(asJson, Map.class)); + private String buildBatch(List documents, String jsonTypeSetting) throws IOException { + StringBuilder builder = new StringBuilder(); + for (int index = 0; index < documents.size(); index++) { + Document document = documents.get(index); + String asJson; + if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) { + asJson = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(document); + } else { + asJson = document.toJson(new JsonWriterSettings(true)); + } + builder + .append(asJson) + .append( (documents.size() > 1 && index + 1 < documents.size()) ? ", " : "" ); } - return mapper.writeValueAsString(docs); + return "[" + builder.toString() + "]"; + } + + private void configureMapper(String setting) { + mapper = new ObjectMapper(); + if (setting.equals(JSON_TYPE_STANDARD)) { + mapper.registerModule(ObjectIdSerializer.getModule()); + DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); + mapper.setDateFormat(df); + } } private void writeBatch(String payload, ProcessContext context, ProcessSession session) { @@ -187,6 +226,9 @@ public class GetMongo extends AbstractMongoProcessor { ? Document.parse(context.getProperty(PROJECTION).evaluateAttributeExpressions().getValue()) : null; final Document sort = context.getProperty(SORT).isSet() ? Document.parse(context.getProperty(SORT).evaluateAttributeExpressions().getValue()) : null; + final String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue(); + configureMapper(jsonTypeSetting); + final MongoCollection collection = getCollection(context); @@ -220,7 +262,7 @@ public class GetMongo extends AbstractMongoProcessor { if (log.isDebugEnabled()) { log.debug("Writing batch..."); } - String payload = buildBatch(batch); + String payload = buildBatch(batch, jsonTypeSetting); writeBatch(payload, context, session); batch = new ArrayList<>(); } catch (IOException ex) { @@ -230,7 +272,7 @@ public class GetMongo extends AbstractMongoProcessor { } if (batch.size() > 0) { try { - writeBatch(buildBatch(batch), context, session); + writeBatch(buildBatch(batch, jsonTypeSetting), context, session); } catch (IOException ex) { getLogger().error("Error sending remainder of batch", ex); } @@ -241,7 +283,13 @@ public class GetMongo extends AbstractMongoProcessor { flowFile = session.write(flowFile, new OutputStreamCallback() { @Override public void process(OutputStream out) throws IOException { - IOUtils.write(cursor.next().toJson(), out); + String json; + if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) { + json = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(cursor.next()); + } else { + json = cursor.next().toJson(); + } + IOUtils.write(json, out); } }); flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json"); diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/ObjectIdSerializer.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/ObjectIdSerializer.java new file mode 100644 index 0000000000..3f207e31ca --- /dev/null +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/ObjectIdSerializer.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.nifi.processors.mongodb; + +import org.bson.types.ObjectId; +import org.codehaus.jackson.JsonGenerator; +import org.codehaus.jackson.JsonProcessingException; +import org.codehaus.jackson.Version; +import org.codehaus.jackson.map.JsonSerializer; +import org.codehaus.jackson.map.SerializerProvider; +import org.codehaus.jackson.map.module.SimpleModule; + +import java.io.IOException; + +public class ObjectIdSerializer extends JsonSerializer { + @Override + public void serialize(ObjectId objectId, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException, JsonProcessingException { + jsonGenerator.writeString(objectId.toString()); + } + + public static SimpleModule getModule() { + SimpleModule module = new SimpleModule("ObjectID Serializer", Version.unknownVersion()); + ObjectIdSerializer serializer = new ObjectIdSerializer(); + module.addSerializer(ObjectId.class, serializer); + + return module; + } +} diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java index 455d705bcb..3a045a8024 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java @@ -18,10 +18,13 @@ */ package org.apache.nifi.processors.mongodb; +import java.text.SimpleDateFormat; +import java.util.Calendar; import java.util.Collection; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -31,6 +34,7 @@ import org.apache.nifi.util.MockProcessContext; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.bson.Document; +import org.codehaus.jackson.map.ObjectMapper; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -48,11 +52,17 @@ public class GetMongoTest { private static final String DB_NAME = GetMongoTest.class.getSimpleName().toLowerCase(); private static final String COLLECTION_NAME = "test"; - private static final List DOCUMENTS = Lists.newArrayList( - new Document("_id", "doc_1").append("a", 1).append("b", 2).append("c", 3), - new Document("_id", "doc_2").append("a", 1).append("b", 2).append("c", 4), - new Document("_id", "doc_3").append("a", 1).append("b", 3) + private static final List DOCUMENTS; + private static final Calendar CAL; + + static { + CAL = Calendar.getInstance(); + DOCUMENTS = Lists.newArrayList( + new Document("_id", "doc_1").append("a", 1).append("b", 2).append("c", 3), + new Document("_id", "doc_2").append("a", 1).append("b", 2).append("c", 4).append("date_field", CAL.getTime()), + new Document("_id", "doc_3").append("a", 1).append("b", 3) ); + } private TestRunner runner; private MongoClient mongoClient; @@ -82,6 +92,7 @@ public class GetMongoTest { @Test public void testValidators() { + TestRunner runner = TestRunners.newTestRunner(GetMongo.class); Collection results; ProcessContext pc; @@ -120,7 +131,7 @@ public class GetMongoTest { results = ((MockProcessContext) pc).validate(); } Assert.assertEquals(1, results.size()); - Assert.assertTrue(results.iterator().next().toString().matches("'Query' .* is invalid because org.bson.json.JsonParseException")); + Assert.assertTrue(results.iterator().next().toString().contains("is invalid because")); // invalid projection runner.setVariable("projection", "{a: x,y,z}"); @@ -148,6 +159,24 @@ public class GetMongoTest { Assert.assertTrue(results.iterator().next().toString().matches("'Sort' .* is invalid because org.bson.json.JsonParseException")); } + @Test + public void testCleanJson() throws Exception { + runner.setVariable("query", "{\"_id\": \"doc_2\"}"); + runner.setProperty(GetMongo.QUERY, "${query}"); + runner.setProperty(GetMongo.JSON_TYPE, GetMongo.JSON_STANDARD); + runner.run(); + + runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); + byte[] raw = runner.getContentAsByteArray(flowFiles.get(0)); + ObjectMapper mapper = new ObjectMapper(); + Map parsed = mapper.readValue(raw, Map.class); + SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd"); + + Assert.assertTrue(parsed.get("date_field").getClass() == String.class); + Assert.assertTrue(((String)parsed.get("date_field")).startsWith(format.format(CAL.getTime()))); + } + @Test public void testReadOneDocument() throws Exception { runner.setVariable("query", "{a: 1, b: 3}");