diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java index 6fd1ddb624..fe058c09a9 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java @@ -57,6 +57,7 @@ import org.apache.nifi.serialization.SimpleDateFormatValidator; import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.PushBackRecordSet; import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.type.ChoiceDataType; import org.apache.nifi.serialization.record.util.DataTypeUtils; @@ -207,36 +208,34 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic .required(false) .build(); - static final PropertyDescriptor AT_TIMESTAMP_DATE_FORMAT = new PropertyDescriptor.Builder() + static final PropertyDescriptor DATE_FORMAT = new PropertyDescriptor.Builder() .name("put-es-record-at-timestamp-date-format") - .displayName("@Timestamp Record Path Date Format") - .description("Specifies the format to use when writing Date field for @timestamp. " + .displayName("Date Format") + .description("Specifies the format to use when writing Date fields. " + "If not specified, the default format '" + RecordFieldType.DATE.getDefaultFormat() + "' is used. " + "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy for a two-digit month, followed by " + "a two-digit day, followed by a four-digit year, all separated by '/' characters, as in 01/25/2017).") .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .addValidator(new SimpleDateFormatValidator()) .required(false) - .dependsOn(AT_TIMESTAMP_RECORD_PATH) .build(); - static final PropertyDescriptor AT_TIMESTAMP_TIME_FORMAT = new PropertyDescriptor.Builder() + static final PropertyDescriptor TIME_FORMAT = new PropertyDescriptor.Builder() .name("put-es-record-at-timestamp-time-format") - .displayName("@Timestamp Record Path Time Format") - .description("Specifies the format to use when writing Time field for @timestamp. " + .displayName("Time Format") + .description("Specifies the format to use when writing Time fields. " + "If not specified, the default format '" + RecordFieldType.TIME.getDefaultFormat() + "' is used. " + "If specified, the value must match the Java Simple Date Format (for example, HH:mm:ss for a two-digit hour in 24-hour format, followed by " + "a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 18:04:15).") .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .addValidator(new SimpleDateFormatValidator()) .required(false) - .dependsOn(AT_TIMESTAMP_RECORD_PATH) .build(); - static final PropertyDescriptor AT_TIMESTAMP_TIMESTAMP_FORMAT = new PropertyDescriptor.Builder() + static final PropertyDescriptor TIMESTAMP_FORMAT = new PropertyDescriptor.Builder() .name("put-es-record-at-timestamp-timestamp-format") - .displayName("@Timestamp Record Path Timestamp Format") - .description("Specifies the format to use when writing Timestamp field for @timestamp. " + .displayName("Timestamp Format") + .description("Specifies the format to use when writing Timestamp fields. " + "If not specified, the default format '" + RecordFieldType.TIMESTAMP.getDefaultFormat() + "' is used. " + "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy HH:mm:ss for a two-digit month, followed by " + "a two-digit day, followed by a four-digit year, all separated by '/' characters; and then followed by a two-digit hour in 24-hour format, followed by " @@ -244,14 +243,12 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .addValidator(new SimpleDateFormatValidator()) .required(false) - .dependsOn(AT_TIMESTAMP_RECORD_PATH) .build(); static final List DESCRIPTORS = Collections.unmodifiableList(Arrays.asList( INDEX_OP, INDEX, TYPE, AT_TIMESTAMP, CLIENT_SERVICE, RECORD_READER, BATCH_SIZE, ID_RECORD_PATH, RETAIN_ID_FIELD, INDEX_OP_RECORD_PATH, INDEX_RECORD_PATH, TYPE_RECORD_PATH, AT_TIMESTAMP_RECORD_PATH, RETAIN_AT_TIMESTAMP_FIELD, - AT_TIMESTAMP_DATE_FORMAT, AT_TIMESTAMP_TIME_FORMAT, AT_TIMESTAMP_TIMESTAMP_FORMAT, LOG_ERROR_RESPONSES, - ERROR_RECORD_WRITER + DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT, LOG_ERROR_RESPONSES, ERROR_RECORD_WRITER )); static final Set RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_RECORDS @@ -305,15 +302,15 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic this.writerFactory = context.getProperty(ERROR_RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); this.logErrors = context.getProperty(LOG_ERROR_RESPONSES).asBoolean(); - this.dateFormat = context.getProperty(AT_TIMESTAMP_DATE_FORMAT).evaluateAttributeExpressions().getValue(); + this.dateFormat = context.getProperty(DATE_FORMAT).evaluateAttributeExpressions().getValue(); if (this.dateFormat == null) { this.dateFormat = RecordFieldType.DATE.getDefaultFormat(); } - this.timeFormat = context.getProperty(AT_TIMESTAMP_TIME_FORMAT).evaluateAttributeExpressions().getValue(); + this.timeFormat = context.getProperty(TIME_FORMAT).evaluateAttributeExpressions().getValue(); if (this.timeFormat == null) { this.timeFormat = RecordFieldType.TIME.getDefaultFormat(); } - this.timestampFormat = context.getProperty(AT_TIMESTAMP_TIMESTAMP_FORMAT).evaluateAttributeExpressions().getValue(); + this.timestampFormat = context.getProperty(TIMESTAMP_FORMAT).evaluateAttributeExpressions().getValue(); if (this.timestampFormat == null) { this.timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat(); } @@ -388,14 +385,15 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic Record record; while ((record = recordSet.next()) != null) { final String idx = getFromRecordPath(record, iPath, index, false); - final String t = getFromRecordPath(record, tPath, type, false); + final String t = getFromRecordPath(record, tPath, type, false); final IndexOperationRequest.Operation o = IndexOperationRequest.Operation.forValue(getFromRecordPath(record, ioPath, indexOp, false)); - final String id = getFromRecordPath(record, path, null, retainId); + final String id = getFromRecordPath(record, path, null, retainId); final Object timestamp = getTimestampFromRecordPath(record, atPath, atTimestamp, retainTimestamp); @SuppressWarnings("unchecked") final Map contentMap = (Map) DataTypeUtils .convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema())); + formatDateTimeFields(contentMap, record); contentMap.putIfAbsent("@timestamp", timestamp); operationList.add(new IndexOperationRequest(idx, t, id, contentMap, o)); @@ -501,6 +499,26 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic return null; } + private void formatDateTimeFields(final Map contentMap, final Record record) { + for (final RecordField recordField : record.getSchema().getFields()) { + final Object value = contentMap.get(recordField.getFieldName()); + if (value != null) { + final DataType chosenDataType = recordField.getDataType().getFieldType() == RecordFieldType.CHOICE + ? DataTypeUtils.chooseDataType(record.getValue(recordField), (ChoiceDataType) recordField.getDataType()) + : recordField.getDataType(); + + final String format = determineDateFormat(chosenDataType.getFieldType()); + if (format != null) { + final Object formattedValue = coerceStringToLong( + recordField.getFieldName(), + DataTypeUtils.toString(value, () -> DataTypeUtils.getDateFormat(format)) + ); + contentMap.put(recordField.getFieldName(), formattedValue); + } + } + } + } + private String getFromRecordPath(final Record record, final RecordPath path, final String fallback, final boolean retain) { if (path == null) { @@ -599,8 +617,11 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic case TIME: format = this.timeFormat; break; - default: + case TIMESTAMP: format = this.timestampFormat; + break; + default: + format = null; } return format; } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy index 85a9f2ba60..2bbd51cdcd 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy @@ -334,7 +334,7 @@ class PutElasticsearchRecordTest { runner.setProperty(PutElasticsearchRecord.RETAIN_ID_FIELD, "true") runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP, "100") runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_RECORD_PATH, "/date") - runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_DATE_FORMAT, "dd/MM/yyyy") + runner.setProperty(PutElasticsearchRecord.DATE_FORMAT, "dd/MM/yyyy") runner.setProperty(PutElasticsearchRecord.RETAIN_AT_TIMESTAMP_FIELD, "true") runner.enqueue(flowFileContents, [ "schema.name": "recordPathTest", @@ -443,6 +443,132 @@ class PutElasticsearchRecordTest { runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0) } + @Test + void testDateTimeFormatting() { + def newSchema = prettyPrint(toJson([ + type: "record", + name: "DateTimeFormattingTestType", + fields: [ + [ name: "msg", type: ["null", "string"] ], + [ name: "ts", type: ["null", [ type: "long", logicalType: "timestamp-millis" ]] ], + [ name: "date", type: ["null", [ type: "int", logicalType: "date" ]] ], + [ name: "time", type: ["null", [ type: "int", logicalType: "time-millis" ]] ], + [ name: "choice_ts", type: ["null", [ type: "long", logicalType: "timestamp-millis" ], "string"] ] + ] + ])) + + def flowFileContents = prettyPrint(toJson([ + [ msg: "1", ts: Timestamp.valueOf(LOCAL_DATE_TIME).toInstant().toEpochMilli() ], + [ msg: "2", date: Date.valueOf(LOCAL_DATE).getTime() ], + [ msg: "3", time: Time.valueOf(LOCAL_TIME).getTime() ], + [ msg: "4", choice_ts: Timestamp.valueOf(LOCAL_DATE_TIME).toInstant().toEpochMilli() ], + [ msg: "5", + ts: Timestamp.valueOf(LOCAL_DATE_TIME).toInstant().toEpochMilli(), + time: Time.valueOf(LOCAL_TIME).getTime(), + date: Date.valueOf(LOCAL_DATE).getTime(), + choice_ts: "not-timestamp" + ] + ])) + + def evalClosure = { List items -> + int msg = items.findAll { (it.fields.get("msg") != null) }.size() + int timestamp = items.findAll { it.fields.get("ts") == + LOCAL_DATE_TIME.format(DateTimeFormatter.ofPattern(RecordFieldType.TIMESTAMP.getDefaultFormat())) // "yyyy-MM-dd HH:mm:ss" + }.size() + int date = items.findAll { it.fields.get("date") == + LOCAL_DATE.format(DateTimeFormatter.ofPattern(RecordFieldType.DATE.getDefaultFormat())) // "yyyy-MM-dd" + }.size() + int time = items.findAll { it.fields.get("time") == + LOCAL_TIME.format(DateTimeFormatter.ofPattern(RecordFieldType.TIME.getDefaultFormat())) // "HH:mm:ss" + }.size() + int choiceTs = items.findAll { it.fields.get("choice_ts") == + LOCAL_DATE_TIME.format(DateTimeFormatter.ofPattern(RecordFieldType.TIMESTAMP.getDefaultFormat())) + }.size() + int choiceNotTs = items.findAll { it.fields.get("choice_ts") == "not-timestamp" }.size() + int atTimestampDefault = items.findAll { it.fields.get("@timestamp") == "test_timestamp" }.size() + int tsNull = items.findAll { it.fields.get("ts") == null }.size() + int dateNull = items.findAll { it.fields.get("date") == null }.size() + int timeNull = items.findAll { it.fields.get("time") == null }.size() + int choiceTsNull = items.findAll { it.fields.get("choice_ts") == null }.size() + Assert.assertEquals(5, msg) + Assert.assertEquals(2, timestamp) + Assert.assertEquals(2, date) + Assert.assertEquals(2, time) + Assert.assertEquals(1, choiceTs) + Assert.assertEquals(1, choiceNotTs) + Assert.assertEquals(3, tsNull) + Assert.assertEquals(3, dateNull) + Assert.assertEquals(3, timeNull) + Assert.assertEquals(3, choiceTsNull) + Assert.assertEquals(5, atTimestampDefault) + } + + clientService.evalClosure = evalClosure + + registry.addSchema("dateTimeFormattingTest", AvroTypeUtil.createSchema(new Schema.Parser().parse(newSchema))) + + runner.enqueue(flowFileContents, [ + "schema.name": "dateTimeFormattingTest" + ]) + + runner.run() + runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1) + runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0) + runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0) + + runner.clearTransferState() + + evalClosure = { List items -> + String timestampOutput = LOCAL_DATE_TIME.format(DateTimeFormatter.ofPattern("yy MMM D H")) + int msg = items.findAll { (it.fields.get("msg") != null) }.size() + int timestamp = items.findAll { it.fields.get("ts") == timestampOutput }.size() + int date = items.findAll { it.fields.get("date") == + LOCAL_DATE.format(DateTimeFormatter.ofPattern("dd/MM/yyyy")) + }.size() + int time = items.findAll { it.fields.get("time") == + // converted to a Long because the output is completely numerical + Long.parseLong(LOCAL_TIME.format(DateTimeFormatter.ofPattern("HHmmss"))) + }.size() + int choiceTs = items.findAll { it.fields.get("choice_ts") == timestampOutput }.size() + int choiceNotTs = items.findAll { it.fields.get("choice_ts") == "not-timestamp" }.size() + int atTimestampDefault = items.findAll { it.fields.get("@timestamp") == "test_timestamp" }.size() + int atTimestamp = items.findAll { it.fields.get("@timestamp") == timestampOutput }.size() + int tsNull = items.findAll { it.fields.get("ts") == null }.size() + int dateNull = items.findAll { it.fields.get("date") == null }.size() + int timeNull = items.findAll { it.fields.get("time") == null }.size() + int choiceTsNull = items.findAll { it.fields.get("choice_ts") == null }.size() + Assert.assertEquals(5, msg) + Assert.assertEquals(2, timestamp) + Assert.assertEquals(2, date) + Assert.assertEquals(2, time) + Assert.assertEquals(1, choiceTs) + Assert.assertEquals(1, choiceNotTs) + Assert.assertEquals(3, tsNull) + Assert.assertEquals(3, dateNull) + Assert.assertEquals(3, timeNull) + Assert.assertEquals(3, choiceTsNull) + Assert.assertEquals(2, atTimestamp) + Assert.assertEquals(3, atTimestampDefault) + } + + clientService.evalClosure = evalClosure + + runner.setProperty(PutElasticsearchRecord.TIMESTAMP_FORMAT, "yy MMM D H") + runner.setProperty(PutElasticsearchRecord.DATE_FORMAT, "dd/MM/yyyy") + runner.setProperty(PutElasticsearchRecord.TIME_FORMAT, "HHmmss") + runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_RECORD_PATH, "/ts") + runner.setProperty(PutElasticsearchRecord.RETAIN_AT_TIMESTAMP_FIELD, "true") + + runner.enqueue(flowFileContents, [ + "schema.name": "dateTimeFormattingTest" + ]) + + runner.run() + runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1) + runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0) + runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0) + } + @Test void testInvalidIndexOperation() { runner.setProperty(PutElasticsearchRecord.INDEX_OP, "not-valid")