NIFI-4269 Added the ability to serialize Mongo documents to a clean JSON view instead of just extended JSON.

NIFI-4269 incorporated changes from the code review.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2063
This commit is contained in:
Mike Thomsen 2017-08-07 10:59:45 -04:00 committed by Matthew Burgess
parent 9ac88d210a
commit 527ce0b4ef
3 changed files with 146 additions and 24 deletions

View File

@ -18,15 +18,6 @@
*/ */
package org.apache.nifi.processors.mongodb; package org.apache.nifi.processors.mongodb;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.mongodb.client.FindIterable; import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor; import com.mongodb.client.MongoCursor;
@ -35,6 +26,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
@ -49,8 +41,19 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.bson.Document; import org.bson.Document;
import org.bson.json.JsonWriterSettings;
import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.ObjectMapper;
import java.io.IOException;
import java.io.OutputStream;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@Tags({ "mongodb", "read", "get" }) @Tags({ "mongodb", "read", "get" })
@InputRequirement(Requirement.INPUT_FORBIDDEN) @InputRequirement(Requirement.INPUT_FORBIDDEN)
@ -120,12 +123,31 @@ public class GetMongo extends AbstractMongoProcessor {
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build(); .build();
// Raw values stored for the JSON output mode property.
static final String JSON_TYPE_EXTENDED = "Extended";
static final String JSON_TYPE_STANDARD = "Standard";
// "Extended" = MongoDB driver-native JSON, i.e. whatever Document.toJson() produces.
static final AllowableValue JSON_EXTENDED = new AllowableValue(JSON_TYPE_EXTENDED, "Extended JSON",
"Use MongoDB's \"extended JSON\". This is the JSON generated with toJson() on a MongoDB Document from the Java driver");
// "Standard" = documents re-serialized with Jackson so the output uses only plain JSON conventions.
static final AllowableValue JSON_STANDARD = new AllowableValue(JSON_TYPE_STANDARD, "Standard JSON",
"Generate a JSON document that conforms to typical JSON conventions instead of Mongo-specific conventions.");
// User-facing property selecting between the two modes above. Defaults to
// extended JSON, which preserves the processor's pre-existing output format.
static final PropertyDescriptor JSON_TYPE = new PropertyDescriptor.Builder()
.allowableValues(JSON_EXTENDED, JSON_STANDARD)
.defaultValue(JSON_TYPE_EXTENDED)
.displayName("JSON Type")
.name("json-type")
.description("By default, MongoDB's Java driver returns \"extended JSON\". Some of the features of this variant of JSON" +
" may cause problems for other JSON parsers that expect only standard JSON types and conventions. This configuration setting " +
" controls whether to use extended JSON or provide a clean view that conforms to standard JSON.")
.expressionLanguageSupported(false)
.required(true)
.build();
private final static Set<Relationship> relationships; private final static Set<Relationship> relationships;
private final static List<PropertyDescriptor> propertyDescriptors; private final static List<PropertyDescriptor> propertyDescriptors;
static { static {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>(); List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.addAll(descriptors); _propertyDescriptors.addAll(descriptors);
_propertyDescriptors.add(JSON_TYPE);
_propertyDescriptors.add(QUERY); _propertyDescriptors.add(QUERY);
_propertyDescriptors.add(PROJECTION); _propertyDescriptors.add(PROJECTION);
_propertyDescriptors.add(SORT); _propertyDescriptors.add(SORT);
@ -151,17 +173,34 @@ public class GetMongo extends AbstractMongoProcessor {
return propertyDescriptors; return propertyDescriptors;
} }
private ObjectMapper mapper = new ObjectMapper(); private ObjectMapper mapper;
//Turn a list of Mongo result documents into a String representation of a JSON array //Turn a list of Mongo result documents into a String representation of a JSON array
private String buildBatch(List<Document> documents) throws IOException { private String buildBatch(List<Document> documents, String jsonTypeSetting) throws IOException {
List<Map> docs = new ArrayList<>(); StringBuilder builder = new StringBuilder();
for (Document document : documents) { for (int index = 0; index < documents.size(); index++) {
String asJson = document.toJson(); Document document = documents.get(index);
docs.add(mapper.readValue(asJson, Map.class)); String asJson;
if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) {
asJson = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(document);
} else {
asJson = document.toJson(new JsonWriterSettings(true));
}
builder
.append(asJson)
.append( (documents.size() > 1 && index + 1 < documents.size()) ? ", " : "" );
} }
return mapper.writeValueAsString(docs); return "[" + builder.toString() + "]";
}
private void configureMapper(String setting) {
mapper = new ObjectMapper();
if (setting.equals(JSON_TYPE_STANDARD)) {
mapper.registerModule(ObjectIdSerializer.getModule());
DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
mapper.setDateFormat(df);
}
} }
private void writeBatch(String payload, ProcessContext context, ProcessSession session) { private void writeBatch(String payload, ProcessContext context, ProcessSession session) {
@ -187,6 +226,9 @@ public class GetMongo extends AbstractMongoProcessor {
? Document.parse(context.getProperty(PROJECTION).evaluateAttributeExpressions().getValue()) : null; ? Document.parse(context.getProperty(PROJECTION).evaluateAttributeExpressions().getValue()) : null;
final Document sort = context.getProperty(SORT).isSet() final Document sort = context.getProperty(SORT).isSet()
? Document.parse(context.getProperty(SORT).evaluateAttributeExpressions().getValue()) : null; ? Document.parse(context.getProperty(SORT).evaluateAttributeExpressions().getValue()) : null;
final String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue();
configureMapper(jsonTypeSetting);
final MongoCollection<Document> collection = getCollection(context); final MongoCollection<Document> collection = getCollection(context);
@ -220,7 +262,7 @@ public class GetMongo extends AbstractMongoProcessor {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Writing batch..."); log.debug("Writing batch...");
} }
String payload = buildBatch(batch); String payload = buildBatch(batch, jsonTypeSetting);
writeBatch(payload, context, session); writeBatch(payload, context, session);
batch = new ArrayList<>(); batch = new ArrayList<>();
} catch (IOException ex) { } catch (IOException ex) {
@ -230,7 +272,7 @@ public class GetMongo extends AbstractMongoProcessor {
} }
if (batch.size() > 0) { if (batch.size() > 0) {
try { try {
writeBatch(buildBatch(batch), context, session); writeBatch(buildBatch(batch, jsonTypeSetting), context, session);
} catch (IOException ex) { } catch (IOException ex) {
getLogger().error("Error sending remainder of batch", ex); getLogger().error("Error sending remainder of batch", ex);
} }
@ -241,7 +283,13 @@ public class GetMongo extends AbstractMongoProcessor {
flowFile = session.write(flowFile, new OutputStreamCallback() { flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override @Override
public void process(OutputStream out) throws IOException { public void process(OutputStream out) throws IOException {
IOUtils.write(cursor.next().toJson(), out); String json;
if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) {
json = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(cursor.next());
} else {
json = cursor.next().toJson();
}
IOUtils.write(json, out);
} }
}); });
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json"); flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.mongodb;
import org.bson.types.ObjectId;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonProcessingException;
import org.codehaus.jackson.Version;
import org.codehaus.jackson.map.JsonSerializer;
import org.codehaus.jackson.map.SerializerProvider;
import org.codehaus.jackson.map.module.SimpleModule;
import java.io.IOException;
/**
 * Jackson serializer that renders a MongoDB {@code ObjectId} as its plain
 * hexadecimal string form, instead of the extended-JSON structure the Mongo
 * driver would otherwise emit. Used by the "Standard JSON" output mode.
 */
public class ObjectIdSerializer extends JsonSerializer<ObjectId> {
    @Override
    public void serialize(ObjectId objectId, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
        // Write the id's hex representation as an ordinary JSON string value.
        jsonGenerator.writeString(objectId.toString());
    }

    /**
     * Builds a Jackson module with this serializer registered, suitable for
     * passing to {@code ObjectMapper.registerModule()}.
     */
    public static SimpleModule getModule() {
        final SimpleModule objectIdModule = new SimpleModule("ObjectID Serializer", Version.unknownVersion());
        objectIdModule.addSerializer(ObjectId.class, new ObjectIdSerializer());
        return objectIdModule;
    }
}

View File

@ -18,10 +18,13 @@
*/ */
package org.apache.nifi.processors.mongodb; package org.apache.nifi.processors.mongodb;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
@ -31,6 +34,7 @@ import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.bson.Document; import org.bson.Document;
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -48,11 +52,17 @@ public class GetMongoTest {
private static final String DB_NAME = GetMongoTest.class.getSimpleName().toLowerCase(); private static final String DB_NAME = GetMongoTest.class.getSimpleName().toLowerCase();
private static final String COLLECTION_NAME = "test"; private static final String COLLECTION_NAME = "test";
private static final List<Document> DOCUMENTS = Lists.newArrayList( private static final List<Document> DOCUMENTS;
new Document("_id", "doc_1").append("a", 1).append("b", 2).append("c", 3), private static final Calendar CAL;
new Document("_id", "doc_2").append("a", 1).append("b", 2).append("c", 4),
new Document("_id", "doc_3").append("a", 1).append("b", 3) static {
CAL = Calendar.getInstance();
DOCUMENTS = Lists.newArrayList(
new Document("_id", "doc_1").append("a", 1).append("b", 2).append("c", 3),
new Document("_id", "doc_2").append("a", 1).append("b", 2).append("c", 4).append("date_field", CAL.getTime()),
new Document("_id", "doc_3").append("a", 1).append("b", 3)
); );
}
private TestRunner runner; private TestRunner runner;
private MongoClient mongoClient; private MongoClient mongoClient;
@ -82,6 +92,7 @@ public class GetMongoTest {
@Test @Test
public void testValidators() { public void testValidators() {
TestRunner runner = TestRunners.newTestRunner(GetMongo.class); TestRunner runner = TestRunners.newTestRunner(GetMongo.class);
Collection<ValidationResult> results; Collection<ValidationResult> results;
ProcessContext pc; ProcessContext pc;
@ -120,7 +131,7 @@ public class GetMongoTest {
results = ((MockProcessContext) pc).validate(); results = ((MockProcessContext) pc).validate();
} }
Assert.assertEquals(1, results.size()); Assert.assertEquals(1, results.size());
Assert.assertTrue(results.iterator().next().toString().matches("'Query' .* is invalid because org.bson.json.JsonParseException")); Assert.assertTrue(results.iterator().next().toString().contains("is invalid because"));
// invalid projection // invalid projection
runner.setVariable("projection", "{a: x,y,z}"); runner.setVariable("projection", "{a: x,y,z}");
@ -148,6 +159,24 @@ public class GetMongoTest {
Assert.assertTrue(results.iterator().next().toString().matches("'Sort' .* is invalid because org.bson.json.JsonParseException")); Assert.assertTrue(results.iterator().next().toString().matches("'Sort' .* is invalid because org.bson.json.JsonParseException"));
} }
/**
 * Verifies the "Standard JSON" output mode: a Mongo Date field must come
 * back as an ISO-style string rather than an extended-JSON date structure.
 */
@Test
public void testCleanJson() throws Exception {
    runner.setVariable("query", "{\"_id\": \"doc_2\"}");
    runner.setProperty(GetMongo.QUERY, "${query}");
    runner.setProperty(GetMongo.JSON_TYPE, GetMongo.JSON_STANDARD);
    runner.run();

    runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 1);

    List<MockFlowFile> results = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
    byte[] content = runner.getContentAsByteArray(results.get(0));

    // Parse the flow file back into a map and inspect the serialized date.
    Map<String, Object> parsed = new ObjectMapper().readValue(content, Map.class);
    Object dateField = parsed.get("date_field");

    Assert.assertTrue(dateField.getClass() == String.class);
    SimpleDateFormat dayFormat = new SimpleDateFormat("yyyy-MM-dd");
    Assert.assertTrue(((String) dateField).startsWith(dayFormat.format(CAL.getTime())));
}
@Test @Test
public void testReadOneDocument() throws Exception { public void testReadOneDocument() throws Exception {
runner.setVariable("query", "{a: 1, b: 3}"); runner.setVariable("query", "{a: 1, b: 3}");