From 57820d0d8894aa76b9540e66d485e09bd33e711e Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Mon, 27 Aug 2018 08:22:26 -0400 Subject: [PATCH] NIFI-5495 Made date format configurable. This closes #2969 Signed-off-by: zenfenan --- .../mongodb/AbstractMongoProcessor.java | 30 +++++++++++++++++-- .../nifi/processors/mongodb/GetMongo.java | 6 +++- .../mongodb/RunMongoAggregation.java | 4 ++- .../nifi/processors/mongodb/GetMongoIT.java | 23 ++++++++++++++ .../nifi-mongodb-services/pom.xml | 6 ++++ .../nifi/mongodb/MongoDBLookupService.java | 2 ++ 6 files changed, 67 insertions(+), 4 deletions(-) diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java index cdaaab0105..507aa42f69 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java @@ -32,6 +32,7 @@ import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.authentication.exception.ProviderCreationException; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; @@ -51,6 +52,7 @@ import java.io.UnsupportedEncodingException; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Date; import java.util.List; import java.util.Map; @@ -173,6 +175,30 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); + static final PropertyDescriptor DATE_FORMAT = new PropertyDescriptor.Builder() + .name("mongo-date-format") + .displayName("Date Format") + .description("The date format string to use for formatting Date fields that are returned from Mongo. It is only " + + "applied when the JSON output format is set to Standard JSON. Full documentation for format characters can be " + + "found here: https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html") + .defaultValue("yyyy-MM-dd'T'HH:mm:ss'Z'") + .addValidator((subject, input, context) -> { + ValidationResult.Builder result = new ValidationResult.Builder() + .subject(subject) + .input(input); + try { + new SimpleDateFormat(input).format(new Date()); + result.valid(true); + } catch (Exception ex) { + result.valid(false) + .explanation(ex.getMessage()); + } + + return result.build(); + }) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + static List descriptors = new ArrayList<>(); static { @@ -311,12 +337,12 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { session.transfer(flowFile, rel); } - protected synchronized void configureMapper(String setting) { + protected synchronized void configureMapper(String setting, String dateFormat) { objectMapper = new ObjectMapper(); if (setting.equals(JSON_TYPE_STANDARD)) { objectMapper.registerModule(ObjectIdSerializer.getModule()); - DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); + DateFormat df = new SimpleDateFormat(dateFormat); objectMapper.setDateFormat(df); } } diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java index 507c9ce3b3..b8d2f8d67f 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java @@ -164,6 +164,7 @@ public class GetMongo extends AbstractMongoProcessor { _propertyDescriptors.add(LIMIT); _propertyDescriptors.add(BATCH_SIZE); _propertyDescriptors.add(RESULTS_PER_FLOWFILE); + _propertyDescriptors.add(DATE_FORMAT); _propertyDescriptors.add(SSL_CONTEXT_SERVICE); _propertyDescriptors.add(CLIENT_AUTH); propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); @@ -244,6 +245,9 @@ public class GetMongo extends AbstractMongoProcessor { final Document sort = context.getProperty(SORT).isSet() ? Document.parse(context.getProperty(SORT).evaluateAttributeExpressions(input).getValue()) : null; + final String dateFormat = context.getProperty(DATE_FORMAT).evaluateAttributeExpressions(input).getValue(); + configureMapper(jsonTypeSetting, dateFormat); + final MongoCollection collection = getCollection(context, input); final FindIterable it = collection.find(query); @@ -264,7 +268,7 @@ public class GetMongo extends AbstractMongoProcessor { } try (MongoCursor cursor = it.iterator()) { - configureMapper(jsonTypeSetting); + configureMapper(jsonTypeSetting, dateFormat); if (context.getProperty(RESULTS_PER_FLOWFILE).isSet()) { int sizePerBatch = context.getProperty(RESULTS_PER_FLOWFILE).evaluateAttributeExpressions(input).asInteger(); diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java index c39b8ee664..c9e6d56b33 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java @@ -100,6 +100,7 @@ public class RunMongoAggregation extends AbstractMongoProcessor { _propertyDescriptors.add(QUERY_ATTRIBUTE); _propertyDescriptors.add(BATCH_SIZE); _propertyDescriptors.add(RESULTS_PER_FLOWFILE); + _propertyDescriptors.add(DATE_FORMAT); _propertyDescriptors.add(SSL_CONTEXT_SERVICE); _propertyDescriptors.add(CLIENT_AUTH); propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); @@ -148,8 +149,9 @@ public class RunMongoAggregation extends AbstractMongoProcessor { final Integer batchSize = context.getProperty(BATCH_SIZE).asInteger(); final Integer resultsPerFlowfile = context.getProperty(RESULTS_PER_FLOWFILE).asInteger(); final String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue(); + final String dateFormat = context.getProperty(DATE_FORMAT).evaluateAttributeExpressions(flowFile).getValue(); - configureMapper(jsonTypeSetting); + configureMapper(jsonTypeSetting, dateFormat); Map attrs = new HashMap<>(); if (queryAttr != null && queryAttr.trim().length() > 0) { diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java index 08bc8cfaf5..42045c57f7 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java @@ -46,6 +46,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.regex.Pattern; public class GetMongoIT { private static final String MONGO_URI = "mongodb://localhost"; @@ -556,4 +557,26 @@ public class GetMongoIT { Assert.assertEquals(COLLECTION_NAME, col); } } + + @Test + public void testDateFormat() throws Exception { + runner.setIncomingConnection(true); + runner.setProperty(GetMongo.JSON_TYPE, GetMongo.JSON_STANDARD); + runner.setProperty(GetMongo.DATE_FORMAT, "yyyy-MM-dd"); + runner.enqueue("{ \"_id\": \"doc_2\" }"); + runner.run(); + + runner.assertTransferCount(GetMongo.REL_FAILURE, 0); + runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1); + runner.assertTransferCount(GetMongo.REL_SUCCESS, 1); + MockFlowFile ff = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS).get(0); + byte[] content = runner.getContentAsByteArray(ff); + String json = new String(content); + Map result = new ObjectMapper().readValue(json, Map.class); + + Pattern format = Pattern.compile("([\\d]{4})-([\\d]{2})-([\\d]{2})"); + + Assert.assertTrue(result.containsKey("date_field")); + Assert.assertTrue(format.matcher((String)result.get("date_field")).matches()); + } } diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/pom.xml b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/pom.xml index 6db52bb1de..9bfe27f40a 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/pom.xml +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/pom.xml @@ -79,6 +79,12 @@ nifi-schema-registry-service-api compile + + org.apache.nifi + nifi-json-utils + 1.8.0-SNAPSHOT + compile + diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java index 6c4905a141..cd1829adf7 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java @@ -26,6 +26,7 @@ import org.apache.nifi.components.Validator; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.lookup.LookupFailureException; import org.apache.nifi.lookup.LookupService; +import org.apache.nifi.processor.util.JsonValidator; import org.apache.nifi.schema.access.SchemaAccessUtils; import org.apache.nifi.serialization.JsonInferenceSchemaRegistryService; import org.apache.nifi.serialization.record.MapRecord; @@ -83,6 +84,7 @@ public class MongoDBLookupService extends JsonInferenceSchemaRegistryService imp .displayName("Projection") .description("Specifies a projection for limiting which fields will be returned.") .required(false) + .addValidator(JsonValidator.INSTANCE) .build(); private String lookupValueField;