NIFI-5495 Made date format configurable.

This closes #2969

Signed-off-by: zenfenan <zenfenan@apache.org>
This commit is contained in:
Mike Thomsen 2018-08-27 08:22:26 -04:00 committed by zenfenan
parent fc1461298a
commit 57820d0d88
6 changed files with 67 additions and 4 deletions

View File

@ -32,6 +32,7 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.authentication.exception.ProviderCreationException;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
@ -51,6 +52,7 @@ import java.io.UnsupportedEncodingException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
@ -173,6 +175,30 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor DATE_FORMAT = new PropertyDescriptor.Builder()
.name("mongo-date-format")
.displayName("Date Format")
.description("The date format string to use for formatting Date fields that are returned from Mongo. It is only " +
"applied when the JSON output format is set to Standard JSON. Full documentation for format characters can be " +
"found here: https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html")
.defaultValue("yyyy-MM-dd'T'HH:mm:ss'Z'")
.addValidator((subject, input, context) -> {
ValidationResult.Builder result = new ValidationResult.Builder()
.subject(subject)
.input(input);
try {
new SimpleDateFormat(input).format(new Date());
result.valid(true);
} catch (Exception ex) {
result.valid(false)
.explanation(ex.getMessage());
}
return result.build();
})
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static List<PropertyDescriptor> descriptors = new ArrayList<>();
static {
@ -311,12 +337,12 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
session.transfer(flowFile, rel);
}
protected synchronized void configureMapper(String setting) {
protected synchronized void configureMapper(String setting, String dateFormat) {
objectMapper = new ObjectMapper();
if (setting.equals(JSON_TYPE_STANDARD)) {
objectMapper.registerModule(ObjectIdSerializer.getModule());
DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
DateFormat df = new SimpleDateFormat(dateFormat);
objectMapper.setDateFormat(df);
}
}

View File

@ -164,6 +164,7 @@ public class GetMongo extends AbstractMongoProcessor {
_propertyDescriptors.add(LIMIT);
_propertyDescriptors.add(BATCH_SIZE);
_propertyDescriptors.add(RESULTS_PER_FLOWFILE);
_propertyDescriptors.add(DATE_FORMAT);
_propertyDescriptors.add(SSL_CONTEXT_SERVICE);
_propertyDescriptors.add(CLIENT_AUTH);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
@ -244,6 +245,9 @@ public class GetMongo extends AbstractMongoProcessor {
final Document sort = context.getProperty(SORT).isSet()
? Document.parse(context.getProperty(SORT).evaluateAttributeExpressions(input).getValue()) : null;
final String dateFormat = context.getProperty(DATE_FORMAT).evaluateAttributeExpressions(input).getValue();
configureMapper(jsonTypeSetting, dateFormat);
final MongoCollection<Document> collection = getCollection(context, input);
final FindIterable<Document> it = collection.find(query);
@ -264,7 +268,7 @@ public class GetMongo extends AbstractMongoProcessor {
}
try (MongoCursor<Document> cursor = it.iterator()) {
configureMapper(jsonTypeSetting);
configureMapper(jsonTypeSetting, dateFormat);
if (context.getProperty(RESULTS_PER_FLOWFILE).isSet()) {
int sizePerBatch = context.getProperty(RESULTS_PER_FLOWFILE).evaluateAttributeExpressions(input).asInteger();

View File

@ -100,6 +100,7 @@ public class RunMongoAggregation extends AbstractMongoProcessor {
_propertyDescriptors.add(QUERY_ATTRIBUTE);
_propertyDescriptors.add(BATCH_SIZE);
_propertyDescriptors.add(RESULTS_PER_FLOWFILE);
_propertyDescriptors.add(DATE_FORMAT);
_propertyDescriptors.add(SSL_CONTEXT_SERVICE);
_propertyDescriptors.add(CLIENT_AUTH);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
@ -148,8 +149,9 @@ public class RunMongoAggregation extends AbstractMongoProcessor {
final Integer batchSize = context.getProperty(BATCH_SIZE).asInteger();
final Integer resultsPerFlowfile = context.getProperty(RESULTS_PER_FLOWFILE).asInteger();
final String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue();
final String dateFormat = context.getProperty(DATE_FORMAT).evaluateAttributeExpressions(flowFile).getValue();
configureMapper(jsonTypeSetting);
configureMapper(jsonTypeSetting, dateFormat);
Map<String, String> attrs = new HashMap<>();
if (queryAttr != null && queryAttr.trim().length() > 0) {

View File

@ -46,6 +46,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
public class GetMongoIT {
private static final String MONGO_URI = "mongodb://localhost";
@ -556,4 +557,26 @@ public class GetMongoIT {
Assert.assertEquals(COLLECTION_NAME, col);
}
}
@Test
public void testDateFormat() throws Exception {
runner.setIncomingConnection(true);
runner.setProperty(GetMongo.JSON_TYPE, GetMongo.JSON_STANDARD);
runner.setProperty(GetMongo.DATE_FORMAT, "yyyy-MM-dd");
runner.enqueue("{ \"_id\": \"doc_2\" }");
runner.run();
runner.assertTransferCount(GetMongo.REL_FAILURE, 0);
runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1);
runner.assertTransferCount(GetMongo.REL_SUCCESS, 1);
MockFlowFile ff = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS).get(0);
byte[] content = runner.getContentAsByteArray(ff);
String json = new String(content);
Map<String, Object> result = new ObjectMapper().readValue(json, Map.class);
Pattern format = Pattern.compile("([\\d]{4})-([\\d]{2})-([\\d]{2})");
Assert.assertTrue(result.containsKey("date_field"));
Assert.assertTrue(format.matcher((String)result.get("date_field")).matches());
}
}

View File

@ -79,6 +79,12 @@
<artifactId>nifi-schema-registry-service-api</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-json-utils</artifactId>
<version>1.8.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>

View File

@ -26,6 +26,7 @@ import org.apache.nifi.components.Validator;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.lookup.LookupFailureException;
import org.apache.nifi.lookup.LookupService;
import org.apache.nifi.processor.util.JsonValidator;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.serialization.JsonInferenceSchemaRegistryService;
import org.apache.nifi.serialization.record.MapRecord;
@ -83,6 +84,7 @@ public class MongoDBLookupService extends JsonInferenceSchemaRegistryService imp
.displayName("Projection")
.description("Specifies a projection for limiting which fields will be returned.")
.required(false)
.addValidator(JsonValidator.INSTANCE)
.build();
private String lookupValueField;