NIFI-9457 Support microseconds for String Timestamps in PutKudu

- Implemented override for Timestamp Record Field Type format handling to add support for optional microseconds
- Added FieldConverter and ObjectTimestampFieldConverter implementation for generalized Timestamp parsing using DateTimeFormatter
- Updated PutKudu unit tests for standard Timestamp and Timestamp with microseconds

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #5589.
This commit is contained in:
exceptionfactory 2021-12-09 12:22:04 -06:00 committed by Pierre Villard
parent 3d3f6ac070
commit b7ad1f924d
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
5 changed files with 304 additions and 3 deletions

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.serialization.record.field;
import java.util.Optional;
/**
* Generalized Field Converter interface for handling type conversion with optional format parsing
*
* @param <I> Input Field Type
* @param <O> Output Field Type
*/
public interface FieldConverter<I, O> {
/**
* Convert Field using Output Field Type with optional format parsing
*
* @param field Input field to be converted
* @param pattern Format pattern optional for parsing
* @param name Input field name for tracking
* @return Converted Field can be null when input field is null or empty
*/
O convertField(I field, Optional<String> pattern, String name);
}

View File

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.serialization.record.field;
import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Date;
import java.util.Optional;
/**
* Convert Object to java.sql.Timestamp using instanceof evaluation and optional format pattern for DateTimeFormatter
*/
public class ObjectTimestampFieldConverter implements FieldConverter<Object, Timestamp> {
/**
* Convert Object field to java.sql.Timestamp using optional format supported in DateTimeFormatter
*
* @param field Field can be null or a supported input type
* @param pattern Format pattern optional for parsing
* @param name Field name for tracking
* @return Timestamp or null when input field is null or empty string
* @throws IllegalTypeConversionException Thrown on parsing failures or unsupported types of input fields
*/
@Override
public Timestamp convertField(final Object field, final Optional<String> pattern, final String name) {
if (field == null) {
return null;
}
if (field instanceof Timestamp) {
return (Timestamp) field;
}
if (field instanceof Date) {
final Date date = (Date) field;
return new Timestamp(date.getTime());
}
if (field instanceof Number) {
final Number number = (Number) field;
return new Timestamp(number.longValue());
}
if (field instanceof String) {
final String string = field.toString().trim();
if (string.isEmpty()) {
return null;
}
if (pattern.isPresent()) {
final DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern.get());
try {
final LocalDateTime localDateTime = LocalDateTime.parse(string, formatter);
return Timestamp.valueOf(localDateTime);
} catch (final DateTimeParseException e) {
final String message = String.format("Convert Field Name [%s] Value [%s] to Timestamp LocalDateTime parsing failed: %s", name, field, e.getMessage());
throw new IllegalTypeConversionException(message);
}
} else {
try {
final long number = Long.parseLong(string);
return new Timestamp(number);
} catch (final NumberFormatException e) {
final String message = String.format("Convert Field Name [%s] Value [%s] to Timestamp Long parsing failed: %s", name, field, e.getMessage());
throw new IllegalTypeConversionException(message);
}
}
}
final String message = String.format("Convert Field Name [%s] Value [%s] Class [%s] to Timestamp not supported", name, field, field.getClass());
throw new IllegalTypeConversionException(message);
}
}

View File

@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.serialization.record.field;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
import org.junit.jupiter.api.Test;
import java.sql.Timestamp;
import java.util.Date;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ObjectTimestampFieldConverterTest {
private static final ObjectTimestampFieldConverter CONVERTER = new ObjectTimestampFieldConverter();
private static final Optional<String> DEFAULT_PATTERN = Optional.of(RecordFieldType.TIMESTAMP.getDefaultFormat());
private static final String FIELD_NAME = Timestamp.class.getSimpleName();
private static final String EMPTY = "";
private static final String DATE_TIME_DEFAULT = "2000-01-01 12:00:00";
private static final Optional<String> DATE_TIME_NANOSECONDS_PATTERN = Optional.of("yyyy-MM-dd HH:mm:ss.SSSSSSSSS");
private static final String DATE_TIME_NANOSECONDS = "2000-01-01 12:00:00.123456789";
@Test
public void testConvertFieldNull() {
final Timestamp timestamp = CONVERTER.convertField(null, DEFAULT_PATTERN, FIELD_NAME);
assertNull(timestamp);
}
@Test
public void testConvertFieldTimestamp() {
final Timestamp field = new Timestamp(System.currentTimeMillis());
final Timestamp timestamp = CONVERTER.convertField(field, DEFAULT_PATTERN, FIELD_NAME);
assertEquals(field, timestamp);
}
@Test
public void testConvertFieldDate() {
final Date field = new Date();
final Timestamp timestamp = CONVERTER.convertField(field, DEFAULT_PATTERN, FIELD_NAME);
assertEquals(field.getTime(), timestamp.getTime());
}
@Test
public void testConvertFieldLong() {
final long field = System.currentTimeMillis();
final Timestamp timestamp = CONVERTER.convertField(field, DEFAULT_PATTERN, FIELD_NAME);
assertEquals(field, timestamp.getTime());
}
@Test
public void testConvertFieldStringEmpty() {
final Timestamp timestamp = CONVERTER.convertField(EMPTY, DEFAULT_PATTERN, FIELD_NAME);
assertNull(timestamp);
}
@Test
public void testConvertFieldStringFormatNull() {
final long currentTime = System.currentTimeMillis();
final String field = Long.toString(currentTime);
final Timestamp timestamp = CONVERTER.convertField(field, Optional.empty(), FIELD_NAME);
assertEquals(currentTime, timestamp.getTime());
}
@Test
public void testConvertFieldStringFormatNullNumberFormatException() {
final String field = String.class.getSimpleName();
final IllegalTypeConversionException exception = assertThrows(IllegalTypeConversionException.class, () -> CONVERTER.convertField(field, Optional.empty(), FIELD_NAME));
assertTrue(exception.getMessage().contains(field));
}
@Test
public void testConvertFieldStringFormatDefault() {
final Timestamp timestamp = CONVERTER.convertField(DATE_TIME_DEFAULT, DEFAULT_PATTERN, FIELD_NAME);
final Timestamp expected = Timestamp.valueOf(DATE_TIME_DEFAULT);
assertEquals(expected, timestamp);
}
@Test
public void testConvertFieldStringFormatCustomNanoseconds() {
final Timestamp timestamp = CONVERTER.convertField(DATE_TIME_NANOSECONDS, DATE_TIME_NANOSECONDS_PATTERN, FIELD_NAME);
final Timestamp expected = Timestamp.valueOf(DATE_TIME_NANOSECONDS);
assertEquals(expected, timestamp);
}
@Test
public void testConvertFieldStringFormatCustomFormatterException() {
final IllegalTypeConversionException exception = assertThrows(IllegalTypeConversionException.class, () -> CONVERTER.convertField(DATE_TIME_DEFAULT, DATE_TIME_NANOSECONDS_PATTERN, FIELD_NAME));
assertTrue(exception.getMessage().contains(DATE_TIME_DEFAULT));
}
}

View File

@ -50,6 +50,8 @@ import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.field.FieldConverter;
import org.apache.nifi.serialization.record.field.ObjectTimestampFieldConverter;
import org.apache.nifi.serialization.record.type.DecimalDataType; import org.apache.nifi.serialization.record.type.DecimalDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils; import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.util.StringUtils; import org.apache.nifi.util.StringUtils;
@ -161,6 +163,10 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build(); .build();
private static final FieldConverter<Object, Timestamp> TIMESTAMP_FIELD_CONVERTER = new ObjectTimestampFieldConverter();
/** Timestamp Pattern overrides default RecordFieldType.TIMESTAMP pattern of yyyy-MM-dd HH:mm:ss with optional microseconds */
private static final String MICROSECOND_TIMESTAMP_PATTERN = "yyyy-MM-dd HH:mm:ss[.SSSSSS]";
private volatile KuduClient kuduClient; private volatile KuduClient kuduClient;
private final ReadWriteLock kuduClientReadWriteLock = new ReentrantReadWriteLock(); private final ReadWriteLock kuduClientReadWriteLock = new ReentrantReadWriteLock();
private final Lock kuduClientReadLock = kuduClientReadWriteLock.readLock(); private final Lock kuduClientReadLock = kuduClientReadWriteLock.readLock();
@ -419,9 +425,9 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
row.addLong(columnIndex, DataTypeUtils.toLong(value, recordFieldName)); row.addLong(columnIndex, DataTypeUtils.toLong(value, recordFieldName));
break; break;
case UNIXTIME_MICROS: case UNIXTIME_MICROS:
DataType fieldType = record.getSchema().getDataType(recordFieldName).get(); final Optional<DataType> optionalDataType = record.getSchema().getDataType(recordFieldName);
Timestamp timestamp = DataTypeUtils.toTimestamp(record.getValue(recordFieldName), final Optional<String> optionalPattern = getTimestampPattern(optionalDataType);
() -> DataTypeUtils.getDateFormat(fieldType.getFormat()), recordFieldName); final Timestamp timestamp = TIMESTAMP_FIELD_CONVERTER.convertField(value, optionalPattern, recordFieldName);
row.addTimestamp(columnIndex, timestamp); row.addTimestamp(columnIndex, timestamp);
break; break;
case STRING: case STRING:
@ -453,6 +459,25 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
} }
} }
/**
* Get Timestamp Pattern and override Timestamp Record Field pattern with optional microsecond pattern
*
* @param optionalDataType Optional Data Type
* @return Optional Timestamp Pattern
*/
private Optional<String> getTimestampPattern(final Optional<DataType> optionalDataType) {
String pattern = null;
if (optionalDataType.isPresent()) {
final DataType dataType = optionalDataType.get();
if (RecordFieldType.TIMESTAMP == dataType.getFieldType()) {
pattern = MICROSECOND_TIMESTAMP_PATTERN;
} else {
pattern = dataType.getFormat();
}
}
return Optional.ofNullable(pattern);
}
/** /**
* Get java.sql.Date from Record Field Value with optional parsing when input value is a String * Get java.sql.Date from Record Field Value with optional parsing when input value is a String
* *

View File

@ -96,6 +96,10 @@ public class TestPutKudu {
private static final String ISO_8601_YEAR_MONTH_DAY = "2000-01-01"; private static final String ISO_8601_YEAR_MONTH_DAY = "2000-01-01";
private static final String ISO_8601_YEAR_MONTH_DAY_PATTERN = "yyyy-MM-dd"; private static final String ISO_8601_YEAR_MONTH_DAY_PATTERN = "yyyy-MM-dd";
private static final String TIMESTAMP_FIELD = "updated";
private static final String TIMESTAMP_STANDARD = "2000-01-01 12:00:00";
private static final String TIMESTAMP_MICROSECONDS = "2000-01-01 12:00:00.123456";
private TestRunner testRunner; private TestRunner testRunner;
private MockPutKudu processor; private MockPutKudu processor;
@ -525,6 +529,41 @@ public class TestPutKudu {
assertPartialRowDateFieldEquals(ISO_8601_YEAR_MONTH_DAY); assertPartialRowDateFieldEquals(ISO_8601_YEAR_MONTH_DAY);
} }
@Test
public void testBuildPartialRowWithTimestampStandardString() {
assertPartialRowTimestampFieldEquals(TIMESTAMP_STANDARD);
}
@Test
public void testBuildPartialRowWithTimestampMicrosecondsString() {
assertPartialRowTimestampFieldEquals(TIMESTAMP_MICROSECONDS);
}
private void assertPartialRowTimestampFieldEquals(final Object timestampFieldValue) {
final PartialRow row = buildPartialRowTimestampField(timestampFieldValue);
final Timestamp timestamp = row.getTimestamp(TIMESTAMP_FIELD);
final Timestamp expected = Timestamp.valueOf(timestampFieldValue.toString());
assertEquals("Partial Row Timestamp Field not matched", expected, timestamp);
}
private PartialRow buildPartialRowTimestampField(final Object timestampFieldValue) {
final Schema kuduSchema = new Schema(Collections.singletonList(
new ColumnSchema.ColumnSchemaBuilder(TIMESTAMP_FIELD, Type.UNIXTIME_MICROS).nullable(true).build()
));
final RecordSchema schema = new SimpleRecordSchema(Collections.singletonList(
new RecordField(TIMESTAMP_FIELD, RecordFieldType.TIMESTAMP.getDataType())
));
final Map<String, Object> values = new HashMap<>();
values.put(TIMESTAMP_FIELD, timestampFieldValue);
final MapRecord record = new MapRecord(schema, values);
final PartialRow row = kuduSchema.newPartialRow();
processor.buildPartialRow(kuduSchema, row, record, schema.getFieldNames(), true, true);
return row;
}
private void assertPartialRowDateFieldEquals(final Object dateFieldValue) { private void assertPartialRowDateFieldEquals(final Object dateFieldValue) {
final PartialRow row = buildPartialRowDateField(dateFieldValue, Type.DATE); final PartialRow row = buildPartialRowDateField(dateFieldValue, Type.DATE);
final java.sql.Date rowDate = row.getDate(DATE_FIELD); final java.sql.Date rowDate = row.getDate(DATE_FIELD);