diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java index a6382d2312..667058dad5 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java @@ -400,6 +400,8 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { } } else { Object value = record.getValue(recordFieldName); + final Optional fieldDataType = record.getSchema().getDataType(recordFieldName); + final String dataTypeFormat = fieldDataType.map(DataType::getFormat).orElse(null); switch (colType) { case BOOL: row.addBoolean(columnIndex, DataTypeUtils.toBoolean(value, recordFieldName)); @@ -423,10 +425,10 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { row.addTimestamp(columnIndex, timestamp); break; case STRING: - row.addString(columnIndex, DataTypeUtils.toString(value, recordFieldName)); + row.addString(columnIndex, DataTypeUtils.toString(value, dataTypeFormat)); break; case BINARY: - row.addBinary(columnIndex, DataTypeUtils.toString(value, recordFieldName).getBytes()); + row.addBinary(columnIndex, DataTypeUtils.toString(value, dataTypeFormat).getBytes()); break; case FLOAT: row.addFloat(columnIndex, DataTypeUtils.toFloat(value, recordFieldName)); @@ -435,15 +437,14 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { row.addDouble(columnIndex, DataTypeUtils.toDouble(value, recordFieldName)); break; case DECIMAL: - row.addDecimal(columnIndex, new BigDecimal(DataTypeUtils.toString(value, recordFieldName))); + row.addDecimal(columnIndex, new BigDecimal(DataTypeUtils.toString(value, dataTypeFormat))); break; case VARCHAR: - row.addVarchar(columnIndex, DataTypeUtils.toString(value, recordFieldName)); + row.addVarchar(columnIndex, DataTypeUtils.toString(value, dataTypeFormat)); break; case DATE: - final Optional fieldDataType = record.getSchema().getDataType(recordFieldName); - final String format = fieldDataType.isPresent() ? fieldDataType.get().getFormat() : RecordFieldType.DATE.getDefaultFormat(); - row.addDate(columnIndex, getDate(value, recordFieldName, format)); + final String dateFormat = dataTypeFormat == null ? RecordFieldType.DATE.getDefaultFormat() : dataTypeFormat; + row.addDate(columnIndex, getDate(value, recordFieldName, dateFormat)); break; default: throw new IllegalStateException(String.format("unknown column type %s", colType)); diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java index 73913cf473..126ca6ea7b 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java @@ -510,20 +510,30 @@ public class TestPutKudu { assertPartialRowDateFieldEquals(dateFieldValue); } + @Test + public void testBuildPartialRowWithDateToString() throws ParseException { + final SimpleDateFormat dateFormat = new SimpleDateFormat(ISO_8601_YEAR_MONTH_DAY_PATTERN); + final java.util.Date dateFieldValue = dateFormat.parse(ISO_8601_YEAR_MONTH_DAY); + + final PartialRow row = buildPartialRowDateField(dateFieldValue, Type.STRING); + final String column = row.getString(DATE_FIELD); + assertEquals("Partial Row Field not matched", ISO_8601_YEAR_MONTH_DAY, column); + } + @Test public void testBuildPartialRowWithDateString() { assertPartialRowDateFieldEquals(ISO_8601_YEAR_MONTH_DAY); } private void assertPartialRowDateFieldEquals(final Object dateFieldValue) { - final PartialRow row = buildPartialRowDateField(dateFieldValue); + final PartialRow row = buildPartialRowDateField(dateFieldValue, Type.DATE); final java.sql.Date rowDate = row.getDate(DATE_FIELD); assertEquals("Partial Row Date Field not matched", ISO_8601_YEAR_MONTH_DAY, rowDate.toString()); } - private PartialRow buildPartialRowDateField(final Object dateFieldValue) { + private PartialRow buildPartialRowDateField(final Object dateFieldValue, final Type columnType) { final Schema kuduSchema = new Schema(Collections.singletonList( - new ColumnSchema.ColumnSchemaBuilder(DATE_FIELD, Type.DATE).nullable(true).build() + new ColumnSchema.ColumnSchemaBuilder(DATE_FIELD, columnType).nullable(true).build() )); final RecordSchema schema = new SimpleRecordSchema(Collections.singletonList(