NIFI-8153 custom date/time format properties for PutElasticsearchRecord

Signed-off-by: Joe Gresock <jgresock@gmail.com>

This closes #5552.
This commit is contained in:
Chris Sampson 2021-11-26 12:31:49 +00:00 committed by Joe Gresock
parent 2844652f72
commit d45b23d89f
No known key found for this signature in database
GPG Key ID: 37F5B9B6E258C8B7
2 changed files with 168 additions and 21 deletions

View File

@ -57,6 +57,7 @@ import org.apache.nifi.serialization.SimpleDateFormatValidator;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.PushBackRecordSet;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
@ -207,36 +208,34 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
.required(false)
.build();
static final PropertyDescriptor AT_TIMESTAMP_DATE_FORMAT = new PropertyDescriptor.Builder()
static final PropertyDescriptor DATE_FORMAT = new PropertyDescriptor.Builder()
.name("put-es-record-at-timestamp-date-format")
.displayName("@Timestamp Record Path Date Format")
.description("Specifies the format to use when writing Date field for @timestamp. "
.displayName("Date Format")
.description("Specifies the format to use when writing Date fields. "
+ "If not specified, the default format '" + RecordFieldType.DATE.getDefaultFormat() + "' is used. "
+ "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy for a two-digit month, followed by "
+ "a two-digit day, followed by a four-digit year, all separated by '/' characters, as in 01/25/2017).")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(new SimpleDateFormatValidator())
.required(false)
.dependsOn(AT_TIMESTAMP_RECORD_PATH)
.build();
static final PropertyDescriptor AT_TIMESTAMP_TIME_FORMAT = new PropertyDescriptor.Builder()
static final PropertyDescriptor TIME_FORMAT = new PropertyDescriptor.Builder()
.name("put-es-record-at-timestamp-time-format")
.displayName("@Timestamp Record Path Time Format")
.description("Specifies the format to use when writing Time field for @timestamp. "
.displayName("Time Format")
.description("Specifies the format to use when writing Time fields. "
+ "If not specified, the default format '" + RecordFieldType.TIME.getDefaultFormat() + "' is used. "
+ "If specified, the value must match the Java Simple Date Format (for example, HH:mm:ss for a two-digit hour in 24-hour format, followed by "
+ "a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 18:04:15).")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(new SimpleDateFormatValidator())
.required(false)
.dependsOn(AT_TIMESTAMP_RECORD_PATH)
.build();
static final PropertyDescriptor AT_TIMESTAMP_TIMESTAMP_FORMAT = new PropertyDescriptor.Builder()
static final PropertyDescriptor TIMESTAMP_FORMAT = new PropertyDescriptor.Builder()
.name("put-es-record-at-timestamp-timestamp-format")
.displayName("@Timestamp Record Path Timestamp Format")
.description("Specifies the format to use when writing Timestamp field for @timestamp. "
.displayName("Timestamp Format")
.description("Specifies the format to use when writing Timestamp fields. "
+ "If not specified, the default format '" + RecordFieldType.TIMESTAMP.getDefaultFormat() + "' is used. "
+ "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy HH:mm:ss for a two-digit month, followed by "
+ "a two-digit day, followed by a four-digit year, all separated by '/' characters; and then followed by a two-digit hour in 24-hour format, followed by "
@ -244,14 +243,12 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(new SimpleDateFormatValidator())
.required(false)
.dependsOn(AT_TIMESTAMP_RECORD_PATH)
.build();
static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
INDEX_OP, INDEX, TYPE, AT_TIMESTAMP, CLIENT_SERVICE, RECORD_READER, BATCH_SIZE, ID_RECORD_PATH, RETAIN_ID_FIELD,
INDEX_OP_RECORD_PATH, INDEX_RECORD_PATH, TYPE_RECORD_PATH, AT_TIMESTAMP_RECORD_PATH, RETAIN_AT_TIMESTAMP_FIELD,
AT_TIMESTAMP_DATE_FORMAT, AT_TIMESTAMP_TIME_FORMAT, AT_TIMESTAMP_TIMESTAMP_FORMAT, LOG_ERROR_RESPONSES,
ERROR_RECORD_WRITER
DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT, LOG_ERROR_RESPONSES, ERROR_RECORD_WRITER
));
static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_RECORDS
@ -305,15 +302,15 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
this.writerFactory = context.getProperty(ERROR_RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
this.logErrors = context.getProperty(LOG_ERROR_RESPONSES).asBoolean();
this.dateFormat = context.getProperty(AT_TIMESTAMP_DATE_FORMAT).evaluateAttributeExpressions().getValue();
this.dateFormat = context.getProperty(DATE_FORMAT).evaluateAttributeExpressions().getValue();
if (this.dateFormat == null) {
this.dateFormat = RecordFieldType.DATE.getDefaultFormat();
}
this.timeFormat = context.getProperty(AT_TIMESTAMP_TIME_FORMAT).evaluateAttributeExpressions().getValue();
this.timeFormat = context.getProperty(TIME_FORMAT).evaluateAttributeExpressions().getValue();
if (this.timeFormat == null) {
this.timeFormat = RecordFieldType.TIME.getDefaultFormat();
}
this.timestampFormat = context.getProperty(AT_TIMESTAMP_TIMESTAMP_FORMAT).evaluateAttributeExpressions().getValue();
this.timestampFormat = context.getProperty(TIMESTAMP_FORMAT).evaluateAttributeExpressions().getValue();
if (this.timestampFormat == null) {
this.timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat();
}
@ -388,14 +385,15 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
Record record;
while ((record = recordSet.next()) != null) {
final String idx = getFromRecordPath(record, iPath, index, false);
final String t = getFromRecordPath(record, tPath, type, false);
final String t = getFromRecordPath(record, tPath, type, false);
final IndexOperationRequest.Operation o = IndexOperationRequest.Operation.forValue(getFromRecordPath(record, ioPath, indexOp, false));
final String id = getFromRecordPath(record, path, null, retainId);
final String id = getFromRecordPath(record, path, null, retainId);
final Object timestamp = getTimestampFromRecordPath(record, atPath, atTimestamp, retainTimestamp);
@SuppressWarnings("unchecked")
final Map<String, Object> contentMap = (Map<String, Object>) DataTypeUtils
.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
formatDateTimeFields(contentMap, record);
contentMap.putIfAbsent("@timestamp", timestamp);
operationList.add(new IndexOperationRequest(idx, t, id, contentMap, o));
@ -501,6 +499,26 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
return null;
}
private void formatDateTimeFields(final Map<String, Object> contentMap, final Record record) {
for (final RecordField recordField : record.getSchema().getFields()) {
final Object value = contentMap.get(recordField.getFieldName());
if (value != null) {
final DataType chosenDataType = recordField.getDataType().getFieldType() == RecordFieldType.CHOICE
? DataTypeUtils.chooseDataType(record.getValue(recordField), (ChoiceDataType) recordField.getDataType())
: recordField.getDataType();
final String format = determineDateFormat(chosenDataType.getFieldType());
if (format != null) {
final Object formattedValue = coerceStringToLong(
recordField.getFieldName(),
DataTypeUtils.toString(value, () -> DataTypeUtils.getDateFormat(format))
);
contentMap.put(recordField.getFieldName(), formattedValue);
}
}
}
}
private String getFromRecordPath(final Record record, final RecordPath path, final String fallback,
final boolean retain) {
if (path == null) {
@ -599,8 +617,11 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
case TIME:
format = this.timeFormat;
break;
default:
case TIMESTAMP:
format = this.timestampFormat;
break;
default:
format = null;
}
return format;
}

View File

@ -334,7 +334,7 @@ class PutElasticsearchRecordTest {
runner.setProperty(PutElasticsearchRecord.RETAIN_ID_FIELD, "true")
runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP, "100")
runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_RECORD_PATH, "/date")
runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_DATE_FORMAT, "dd/MM/yyyy")
runner.setProperty(PutElasticsearchRecord.DATE_FORMAT, "dd/MM/yyyy")
runner.setProperty(PutElasticsearchRecord.RETAIN_AT_TIMESTAMP_FIELD, "true")
runner.enqueue(flowFileContents, [
"schema.name": "recordPathTest",
@ -443,6 +443,132 @@ class PutElasticsearchRecordTest {
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
}
@Test
void testDateTimeFormatting() {
def newSchema = prettyPrint(toJson([
type: "record",
name: "DateTimeFormattingTestType",
fields: [
[ name: "msg", type: ["null", "string"] ],
[ name: "ts", type: ["null", [ type: "long", logicalType: "timestamp-millis" ]] ],
[ name: "date", type: ["null", [ type: "int", logicalType: "date" ]] ],
[ name: "time", type: ["null", [ type: "int", logicalType: "time-millis" ]] ],
[ name: "choice_ts", type: ["null", [ type: "long", logicalType: "timestamp-millis" ], "string"] ]
]
]))
def flowFileContents = prettyPrint(toJson([
[ msg: "1", ts: Timestamp.valueOf(LOCAL_DATE_TIME).toInstant().toEpochMilli() ],
[ msg: "2", date: Date.valueOf(LOCAL_DATE).getTime() ],
[ msg: "3", time: Time.valueOf(LOCAL_TIME).getTime() ],
[ msg: "4", choice_ts: Timestamp.valueOf(LOCAL_DATE_TIME).toInstant().toEpochMilli() ],
[ msg: "5",
ts: Timestamp.valueOf(LOCAL_DATE_TIME).toInstant().toEpochMilli(),
time: Time.valueOf(LOCAL_TIME).getTime(),
date: Date.valueOf(LOCAL_DATE).getTime(),
choice_ts: "not-timestamp"
]
]))
def evalClosure = { List<IndexOperationRequest> items ->
int msg = items.findAll { (it.fields.get("msg") != null) }.size()
int timestamp = items.findAll { it.fields.get("ts") ==
LOCAL_DATE_TIME.format(DateTimeFormatter.ofPattern(RecordFieldType.TIMESTAMP.getDefaultFormat())) // "yyyy-MM-dd HH:mm:ss"
}.size()
int date = items.findAll { it.fields.get("date") ==
LOCAL_DATE.format(DateTimeFormatter.ofPattern(RecordFieldType.DATE.getDefaultFormat())) // "yyyy-MM-dd"
}.size()
int time = items.findAll { it.fields.get("time") ==
LOCAL_TIME.format(DateTimeFormatter.ofPattern(RecordFieldType.TIME.getDefaultFormat())) // "HH:mm:ss"
}.size()
int choiceTs = items.findAll { it.fields.get("choice_ts") ==
LOCAL_DATE_TIME.format(DateTimeFormatter.ofPattern(RecordFieldType.TIMESTAMP.getDefaultFormat()))
}.size()
int choiceNotTs = items.findAll { it.fields.get("choice_ts") == "not-timestamp" }.size()
int atTimestampDefault = items.findAll { it.fields.get("@timestamp") == "test_timestamp" }.size()
int tsNull = items.findAll { it.fields.get("ts") == null }.size()
int dateNull = items.findAll { it.fields.get("date") == null }.size()
int timeNull = items.findAll { it.fields.get("time") == null }.size()
int choiceTsNull = items.findAll { it.fields.get("choice_ts") == null }.size()
Assert.assertEquals(5, msg)
Assert.assertEquals(2, timestamp)
Assert.assertEquals(2, date)
Assert.assertEquals(2, time)
Assert.assertEquals(1, choiceTs)
Assert.assertEquals(1, choiceNotTs)
Assert.assertEquals(3, tsNull)
Assert.assertEquals(3, dateNull)
Assert.assertEquals(3, timeNull)
Assert.assertEquals(3, choiceTsNull)
Assert.assertEquals(5, atTimestampDefault)
}
clientService.evalClosure = evalClosure
registry.addSchema("dateTimeFormattingTest", AvroTypeUtil.createSchema(new Schema.Parser().parse(newSchema)))
runner.enqueue(flowFileContents, [
"schema.name": "dateTimeFormattingTest"
])
runner.run()
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
runner.clearTransferState()
evalClosure = { List<IndexOperationRequest> items ->
String timestampOutput = LOCAL_DATE_TIME.format(DateTimeFormatter.ofPattern("yy MMM D H"))
int msg = items.findAll { (it.fields.get("msg") != null) }.size()
int timestamp = items.findAll { it.fields.get("ts") == timestampOutput }.size()
int date = items.findAll { it.fields.get("date") ==
LOCAL_DATE.format(DateTimeFormatter.ofPattern("dd/MM/yyyy"))
}.size()
int time = items.findAll { it.fields.get("time") ==
// converted to a Long because the output is completely numerical
Long.parseLong(LOCAL_TIME.format(DateTimeFormatter.ofPattern("HHmmss")))
}.size()
int choiceTs = items.findAll { it.fields.get("choice_ts") == timestampOutput }.size()
int choiceNotTs = items.findAll { it.fields.get("choice_ts") == "not-timestamp" }.size()
int atTimestampDefault = items.findAll { it.fields.get("@timestamp") == "test_timestamp" }.size()
int atTimestamp = items.findAll { it.fields.get("@timestamp") == timestampOutput }.size()
int tsNull = items.findAll { it.fields.get("ts") == null }.size()
int dateNull = items.findAll { it.fields.get("date") == null }.size()
int timeNull = items.findAll { it.fields.get("time") == null }.size()
int choiceTsNull = items.findAll { it.fields.get("choice_ts") == null }.size()
Assert.assertEquals(5, msg)
Assert.assertEquals(2, timestamp)
Assert.assertEquals(2, date)
Assert.assertEquals(2, time)
Assert.assertEquals(1, choiceTs)
Assert.assertEquals(1, choiceNotTs)
Assert.assertEquals(3, tsNull)
Assert.assertEquals(3, dateNull)
Assert.assertEquals(3, timeNull)
Assert.assertEquals(3, choiceTsNull)
Assert.assertEquals(2, atTimestamp)
Assert.assertEquals(3, atTimestampDefault)
}
clientService.evalClosure = evalClosure
runner.setProperty(PutElasticsearchRecord.TIMESTAMP_FORMAT, "yy MMM D H")
runner.setProperty(PutElasticsearchRecord.DATE_FORMAT, "dd/MM/yyyy")
runner.setProperty(PutElasticsearchRecord.TIME_FORMAT, "HHmmss")
runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_RECORD_PATH, "/ts")
runner.setProperty(PutElasticsearchRecord.RETAIN_AT_TIMESTAMP_FIELD, "true")
runner.enqueue(flowFileContents, [
"schema.name": "dateTimeFormattingTest"
])
runner.run()
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
}
@Test
void testInvalidIndexOperation() {
runner.setProperty(PutElasticsearchRecord.INDEX_OP, "not-valid")