NIFI-5284: Added JSON_TYPE support to RunMongoAggregation

This closes #2776

Signed-off-by: Mike Thomsen <mikerthomsen@gmail.com>
This commit is contained in:
zenfenan 2018-06-09 17:35:59 +05:30 committed by Mike Thomsen
parent cf3c666683
commit 794a642310
4 changed files with 88 additions and 47 deletions

View File

@ -18,6 +18,7 @@
*/
package org.apache.nifi.processors.mongodb;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientOptions;
import com.mongodb.MongoClientOptions.Builder;
@ -29,6 +30,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.authentication.exception.ProviderCreationException;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
@ -45,6 +47,8 @@ import javax.net.ssl.SSLContext;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -57,6 +61,13 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
static final String WRITE_CONCERN_REPLICA_ACKNOWLEDGED = "REPLICA_ACKNOWLEDGED";
static final String WRITE_CONCERN_MAJORITY = "MAJORITY";
protected static final String JSON_TYPE_EXTENDED = "Extended";
protected static final String JSON_TYPE_STANDARD = "Standard";
protected static final AllowableValue JSON_EXTENDED = new AllowableValue(JSON_TYPE_EXTENDED, "Extended JSON",
"Use MongoDB's \"extended JSON\". This is the JSON generated with toJson() on a MongoDB Document from the Java driver");
protected static final AllowableValue JSON_STANDARD = new AllowableValue(JSON_TYPE_STANDARD, "Standard JSON",
"Generate a JSON document that conforms to typical JSON conventions instead of Mongo-specific conventions.");
protected static final PropertyDescriptor URI = new PropertyDescriptor.Builder()
.name("Mongo URI")
.displayName("Mongo URI")
@ -65,6 +76,7 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder()
.name("Mongo Database Name")
.displayName("Mongo Database Name")
@ -73,6 +85,7 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final PropertyDescriptor COLLECTION_NAME = new PropertyDescriptor.Builder()
.name("Mongo Collection Name")
.description("The name of the collection to use")
@ -80,6 +93,19 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final PropertyDescriptor JSON_TYPE = new PropertyDescriptor.Builder()
.allowableValues(JSON_EXTENDED, JSON_STANDARD)
.defaultValue(JSON_TYPE_EXTENDED)
.displayName("JSON Type")
.name("json-type")
.description("By default, MongoDB's Java driver returns \"extended JSON\". Some of the features of this variant of JSON" +
" may cause problems for other JSON parsers that expect only standard JSON types and conventions. This configuration setting " +
" controls whether to use extended JSON or provide a clean view that conforms to standard JSON.")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("ssl-context-service")
.displayName("SSL Context Service")
@ -88,6 +114,7 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
.name("ssl-client-auth")
.displayName("Client Auth")
@ -155,6 +182,7 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
descriptors.add(CLIENT_AUTH);
}
protected ObjectMapper objectMapper;
protected MongoClient mongoClient;
@OnScheduled
@ -275,4 +303,14 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
session.getProvenanceReporter().receive(flowFile, getURI(context));
session.transfer(flowFile, rel);
}
// Initializes the shared Jackson ObjectMapper for the requested JSON output type
// (JSON_TYPE_EXTENDED or JSON_TYPE_STANDARD). Synchronized because it mutates the
// shared objectMapper field; presumably onTrigger may run concurrently — confirm
// against the processor's scheduling semantics.
protected synchronized void configureMapper(String setting) {
objectMapper = new ObjectMapper();
if (setting.equals(JSON_TYPE_STANDARD)) {
// NOTE(review): presumably this module serializes Mongo ObjectId values as
// plain strings rather than the extended {"$oid": ...} form — confirm in
// ObjectIdSerializer.
objectMapper.registerModule(ObjectIdSerializer.getModule());
// SimpleDateFormat is not thread-safe, but a fresh instance is created per
// call and handed to the mapper, so there is no sharing here. The literal
// 'Z' suffix is emitted verbatim; the format does not convert to UTC.
DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
objectMapper.setDateFormat(df);
}
}
}

View File

@ -45,8 +45,6 @@ import org.bson.json.JsonWriterSettings;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -55,7 +53,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
@Tags({ "mongodb", "read", "get" })
@InputRequirement(Requirement.INPUT_ALLOWED)
@CapabilityDescription("Creates FlowFiles from documents in MongoDB")
@ -134,24 +131,6 @@ public class GetMongo extends AbstractMongoProcessor {
.addValidator(Validator.VALID)
.build();
static final String JSON_TYPE_EXTENDED = "Extended";
static final String JSON_TYPE_STANDARD = "Standard";
static final AllowableValue JSON_EXTENDED = new AllowableValue(JSON_TYPE_EXTENDED, "Extended JSON",
"Use MongoDB's \"extended JSON\". This is the JSON generated with toJson() on a MongoDB Document from the Java driver");
static final AllowableValue JSON_STANDARD = new AllowableValue(JSON_TYPE_STANDARD, "Standard JSON",
"Generate a JSON document that conforms to typical JSON conventions instead of Mongo-specific conventions.");
static final PropertyDescriptor JSON_TYPE = new PropertyDescriptor.Builder()
.allowableValues(JSON_EXTENDED, JSON_STANDARD)
.defaultValue(JSON_TYPE_EXTENDED)
.displayName("JSON Type")
.name("json-type")
.description("By default, MongoDB's Java driver returns \"extended JSON\". Some of the features of this variant of JSON" +
" may cause problems for other JSON parsers that expect only standard JSON types and conventions. This configuration setting " +
" controls whether to use extended JSON or provide a clean view that conforms to standard JSON.")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.build();
private final static Set<Relationship> relationships;
private final static List<PropertyDescriptor> propertyDescriptors;
@ -189,8 +168,6 @@ public class GetMongo extends AbstractMongoProcessor {
return propertyDescriptors;
}
private ObjectMapper mapper;
//Turn a list of Mongo result documents into a String representation of a JSON array
private String buildBatch(List<Document> documents, String jsonTypeSetting, String prettyPrintSetting) throws IOException {
StringBuilder builder = new StringBuilder();
@ -198,7 +175,7 @@ public class GetMongo extends AbstractMongoProcessor {
Document document = documents.get(index);
String asJson;
if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) {
asJson = getObjectWriter(mapper, prettyPrintSetting).writeValueAsString(document);
asJson = getObjectWriter(objectMapper, prettyPrintSetting).writeValueAsString(document);
} else {
asJson = document.toJson(new JsonWriterSettings(true));
}
@ -210,15 +187,6 @@ public class GetMongo extends AbstractMongoProcessor {
return "[" + builder.toString() + "]";
}
// Configures this processor's private Jackson mapper for the requested JSON type.
// (This is the pre-refactor GetMongo copy; the commit removes it in favor of the
// shared implementation on AbstractMongoProcessor.)
private void configureMapper(String setting) {
mapper = new ObjectMapper();
if (setting.equals(JSON_TYPE_STANDARD)) {
// NOTE(review): presumably serializes ObjectId as a plain string — confirm
// in ObjectIdSerializer.
mapper.registerModule(ObjectIdSerializer.getModule());
// Fresh SimpleDateFormat per call, so its lack of thread-safety is not an
// issue here; the 'Z' is a literal, not a zone conversion.
DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
mapper.setDateFormat(df);
}
}
private ObjectWriter getObjectWriter(ObjectMapper mapper, String ppSetting) {
return ppSetting.equals(YES_PP.getValue()) ? mapper.writerWithDefaultPrettyPrinter()
: mapper.writer();
@ -237,7 +205,7 @@ public class GetMongo extends AbstractMongoProcessor {
final ComponentLog logger = getLogger();
Map<String, String> attributes = new HashMap<String, String>();
Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
final Document query;
@ -334,7 +302,7 @@ public class GetMongo extends AbstractMongoProcessor {
flowFile = session.write(flowFile, out -> {
String json;
if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) {
json = getObjectWriter(mapper, usePrettyPrint).writeValueAsString(cursor.next());
json = getObjectWriter(objectMapper, usePrettyPrint).writeValueAsString(cursor.next());
} else {
json = cursor.next().toJson();
}

View File

@ -96,6 +96,7 @@ public class RunMongoAggregation extends AbstractMongoProcessor {
_propertyDescriptors.addAll(descriptors);
_propertyDescriptors.add(CHARSET);
_propertyDescriptors.add(QUERY);
_propertyDescriptors.add(JSON_TYPE);
_propertyDescriptors.add(QUERY_ATTRIBUTE);
_propertyDescriptors.add(BATCH_SIZE);
_propertyDescriptors.add(RESULTS_PER_FLOWFILE);
@ -120,11 +121,10 @@ public class RunMongoAggregation extends AbstractMongoProcessor {
return propertyDescriptors;
}
static String buildBatch(List<Document> batch) {
ObjectMapper mapper = new ObjectMapper();
private String buildBatch(List<Document> batch) {
String retVal;
try {
retVal = mapper.writeValueAsString(batch.size() > 1 ? batch : batch.get(0));
retVal = objectMapper.writeValueAsString(batch.size() > 1 ? batch : batch.get(0));
} catch (Exception e) {
retVal = null;
}
@ -143,12 +143,15 @@ public class RunMongoAggregation extends AbstractMongoProcessor {
}
}
String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue();
String queryAttr = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue();
Integer batchSize = context.getProperty(BATCH_SIZE).asInteger();
Integer resultsPerFlowfile = context.getProperty(RESULTS_PER_FLOWFILE).asInteger();
final String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue();
final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue();
final Integer batchSize = context.getProperty(BATCH_SIZE).asInteger();
final Integer resultsPerFlowfile = context.getProperty(RESULTS_PER_FLOWFILE).asInteger();
final String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue();
Map<String, String> attrs = new HashMap<String, String>();
configureMapper(jsonTypeSetting);
Map<String, String> attrs = new HashMap<>();
if (queryAttr != null && queryAttr.trim().length() > 0) {
attrs.put(queryAttr, query);
}
@ -162,13 +165,13 @@ public class RunMongoAggregation extends AbstractMongoProcessor {
it.batchSize(batchSize != null ? batchSize : 1);
iter = it.iterator();
List<Document> batch = new ArrayList<Document>();
List<Document> batch = new ArrayList<>();
while (iter.hasNext()) {
batch.add(iter.next());
if (batch.size() == resultsPerFlowfile) {
writeBatch(buildBatch(batch), flowFile, context, session, attrs, REL_RESULTS);
batch = new ArrayList<Document>();
batch = new ArrayList<>();
}
}

View File

@ -33,6 +33,7 @@ import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
@ -48,6 +49,7 @@ public class RunMongoAggregationIT {
private TestRunner runner;
private MongoClient mongoClient;
private Map<String, Integer> mappings;
private Calendar now = Calendar.getInstance();
@Before
public void setup() {
@ -68,7 +70,7 @@ public class RunMongoAggregationIT {
for (int x = 0; x < values.length; x++) {
for (int y = 0; y < x + 2; y++) {
Document doc = new Document().append("val", values[x]);
Document doc = new Document().append("val", values[x]).append("date", now.getTime());
collection.insertOne(doc);
}
mappings.put(values[x], x + 2);
@ -78,7 +80,6 @@ public class RunMongoAggregationIT {
// Drops the integration-test database after each test so runs stay isolated.
@After
public void teardown() {
runner = null;
mongoClient.getDatabase(DB_NAME).drop();
}
@ -164,6 +165,37 @@ public class RunMongoAggregationIT {
runner.assertTransferCount(RunMongoAggregation.REL_FAILURE, 1);
}
// Verifies both JSON output modes of RunMongoAggregation: "Standard" must render
// dates as ISO-8601-style strings, "Extended" must render them as epoch millis.
@Test
public void testJsonTypes() throws IOException {
// First pass: Standard JSON — project val and the stored date into an array.
runner.setProperty(RunMongoAggregation.JSON_TYPE, RunMongoAggregation.JSON_STANDARD);
runner.setProperty(RunMongoAggregation.QUERY, "[ { \"$project\": { \"myArray\": [ \"$val\", \"$date\" ] } } ]");
runner.enqueue("test");
runner.run(1, true, true);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(RunMongoAggregation.REL_RESULTS);
ObjectMapper mapper = new ObjectMapper();
// Must match the pattern configureMapper installs on the processor's mapper.
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
for (MockFlowFile mockFlowFile : flowFiles) {
byte[] raw = runner.getContentAsByteArray(mockFlowFile);
// NOTE(review): unchecked cast — Jackson's untyped read actually yields
// Map<String, Object>; the declared List<String> is only an erased view.
Map<String, List<String>> read = mapper.readValue(raw, Map.class);
// NOTE(review): equalsIgnoreCase on a digits-only timestamp is effectively
// equals; second-level truncation means this could flake if the insert and
// the format straddle a second boundary — 'now' is captured once at class
// init, so in practice both sides format the same instant.
Assert.assertTrue(read.get("myArray").get(1).equalsIgnoreCase( format.format(now.getTime())));
}
// Second pass: Extended JSON — same query, dates come back as epoch millis.
runner.clearTransferState();
runner.setProperty(RunMongoAggregation.JSON_TYPE, RunMongoAggregation.JSON_EXTENDED);
runner.enqueue("test");
runner.run(1, true, true);
flowFiles = runner.getFlowFilesForRelationship(RunMongoAggregation.REL_RESULTS);
for (MockFlowFile mockFlowFile : flowFiles) {
byte[] raw = runner.getContentAsByteArray(mockFlowFile);
Map<String, List<Long>> read = mapper.readValue(raw, Map.class);
// NOTE(review): relies on Jackson deserializing the millis value as Long
// (true for values beyond Integer range) and on unboxing for ==; an
// assertEquals on longs would be clearer.
Assert.assertTrue(read.get("myArray").get(1) == now.getTimeInMillis());
}
}
private void evaluateRunner(int original) throws IOException {
runner.assertTransferCount(RunMongoAggregation.REL_RESULTS, mappings.size());
runner.assertTransferCount(RunMongoAggregation.REL_ORIGINAL, original);