From b27c2b500e1c2d889078d0a63fcb10556cfbe621 Mon Sep 17 00:00:00 2001 From: exceptionfactory Date: Tue, 29 Jun 2021 09:54:05 -0500 Subject: [PATCH] NIFI-8748 Corrected PutKudu String to java.sql.Date parsing - Added getDateFormat() using default time zone instead of GMT time zone from DataTypeUtils.getDateFormat() NIFI-8748 Adjusted Date Format to use DataType.getFormat() Signed-off-by: Pierre Villard This closes #5194. --- .../kudu/AbstractKuduProcessor.java | 34 +++++++++++++++++-- .../nifi/processors/kudu/TestPutKudu.java | 26 ++++++++++++++ 2 files changed, 57 insertions(+), 3 deletions(-) diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java index b0d4566790..774430ee56 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java @@ -55,11 +55,15 @@ import org.apache.nifi.util.StringUtils; import javax.security.auth.login.LoginException; import java.math.BigDecimal; +import java.sql.Date; import java.sql.Timestamp; +import java.text.DateFormat; +import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Optional; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.SynchronousQueue; @@ -114,7 +118,7 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { .displayName("Kudu Operation Timeout") .description("Default timeout used for user operations (using sessions and scanners)") .required(false) - .defaultValue(String.valueOf(AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS) 
+ "ms") + .defaultValue(AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS + "ms") .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); @@ -124,7 +128,7 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { .displayName("Kudu Keep Alive Period Timeout") .description("Default timeout used for user operations") .required(false) - .defaultValue(String.valueOf(AsyncKuduClient.DEFAULT_KEEP_ALIVE_PERIOD_MS) + "ms") + .defaultValue(AsyncKuduClient.DEFAULT_KEEP_ALIVE_PERIOD_MS + "ms") .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); @@ -403,7 +407,9 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { row.addVarchar(columnIndex, DataTypeUtils.toString(value, recordFieldName)); break; case DATE: - row.addDate(columnIndex, DataTypeUtils.toDate(value, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()), recordFieldName)); + final Optional fieldDataType = record.getSchema().getDataType(recordFieldName); + final String format = fieldDataType.isPresent() ? 
fieldDataType.get().getFormat() : RecordFieldType.DATE.getDefaultFormat(); + row.addDate(columnIndex, getDate(value, recordFieldName, format)); break; default: throw new IllegalStateException(String.format("unknown column type %s", colType)); @@ -412,6 +418,28 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { } } + /** + * Get java.sql.Date from Record Field Value with optional parsing when input value is a String + * + * @param value Record Field Value + * @param recordFieldName Record Field Name + * @param format Date Format Pattern + * @return Date object or null when value is null + */ + private Date getDate(final Object value, final String recordFieldName, final String format) { + return DataTypeUtils.toDate(value, () -> getDateFormat(format), recordFieldName); + } + + /** + * Get Date Format using the provided pattern and system default time zone to avoid unnecessary conversion + * + * @param format Date Format Pattern + * @return Date Format used for parsing date fields + */ + private DateFormat getDateFormat(final String format) { + return new SimpleDateFormat(format); + } + /** * Converts a NiFi DataType to it's equivalent Kudu Type. 
*/ diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java index 0c739ea096..1d397d9d69 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java @@ -75,6 +75,7 @@ import java.util.stream.IntStream; import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.EXCEPTION; import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.FAIL; import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.OK; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; @@ -478,6 +479,31 @@ public class TestPutKudu { row.getDate("sql_date").toString(), today.toString()); } + @Test + public void testBuildPartialRowWithDateString() { + final String dateFieldName = "created"; + final String dateFieldValue = "2000-01-01"; + + final Schema kuduSchema = new Schema(Collections.singletonList( + new ColumnSchema.ColumnSchemaBuilder(dateFieldName, Type.DATE).nullable(true).build() + )); + + final RecordSchema schema = new SimpleRecordSchema(Collections.singletonList( + new RecordField(dateFieldName, RecordFieldType.DATE.getDataType()) + )); + + final Map values = new HashMap<>(); + values.put(dateFieldName, dateFieldValue); + final MapRecord record = new MapRecord(schema, values); + + final PartialRow row = kuduSchema.newPartialRow(); + + processor.buildPartialRow(kuduSchema, row, record, schema.getFieldNames(), true, true); + + final java.sql.Date rowDate = row.getDate(dateFieldName); + assertEquals("Partial Row Date Field not matched", dateFieldValue, 
rowDate.toString()); + } + private PartialRow buildPartialRow(Long id, String name, Short age, String kuduIdName, String recordIdName, String airport_code, java.sql.Date sql_date, Boolean lowercaseFields) { final Schema kuduSchema = new Schema(Arrays.asList( new ColumnSchema.ColumnSchemaBuilder(kuduIdName, Type.INT64).key(true).build(),