NIFI-9471 Corrected PutKudu usage of DataTypeUtils.toString()

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #5590.
This commit is contained in:
exceptionfactory 2021-12-09 15:00:24 -06:00 committed by Pierre Villard
parent f5dccb5522
commit 3d3f6ac070
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
2 changed files with 21 additions and 10 deletions

View File

@ -400,6 +400,8 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
}
} else {
Object value = record.getValue(recordFieldName);
final Optional<DataType> fieldDataType = record.getSchema().getDataType(recordFieldName);
final String dataTypeFormat = fieldDataType.map(DataType::getFormat).orElse(null);
switch (colType) {
case BOOL:
row.addBoolean(columnIndex, DataTypeUtils.toBoolean(value, recordFieldName));
@ -423,10 +425,10 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
row.addTimestamp(columnIndex, timestamp);
break;
case STRING:
row.addString(columnIndex, DataTypeUtils.toString(value, recordFieldName));
row.addString(columnIndex, DataTypeUtils.toString(value, dataTypeFormat));
break;
case BINARY:
row.addBinary(columnIndex, DataTypeUtils.toString(value, recordFieldName).getBytes());
row.addBinary(columnIndex, DataTypeUtils.toString(value, dataTypeFormat).getBytes());
break;
case FLOAT:
row.addFloat(columnIndex, DataTypeUtils.toFloat(value, recordFieldName));
@ -435,15 +437,14 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
row.addDouble(columnIndex, DataTypeUtils.toDouble(value, recordFieldName));
break;
case DECIMAL:
row.addDecimal(columnIndex, new BigDecimal(DataTypeUtils.toString(value, recordFieldName)));
row.addDecimal(columnIndex, new BigDecimal(DataTypeUtils.toString(value, dataTypeFormat)));
break;
case VARCHAR:
row.addVarchar(columnIndex, DataTypeUtils.toString(value, recordFieldName));
row.addVarchar(columnIndex, DataTypeUtils.toString(value, dataTypeFormat));
break;
case DATE:
final Optional<DataType> fieldDataType = record.getSchema().getDataType(recordFieldName);
final String format = fieldDataType.isPresent() ? fieldDataType.get().getFormat() : RecordFieldType.DATE.getDefaultFormat();
row.addDate(columnIndex, getDate(value, recordFieldName, format));
final String dateFormat = dataTypeFormat == null ? RecordFieldType.DATE.getDefaultFormat() : dataTypeFormat;
row.addDate(columnIndex, getDate(value, recordFieldName, dateFormat));
break;
default:
throw new IllegalStateException(String.format("unknown column type %s", colType));

View File

@ -510,20 +510,30 @@ public class TestPutKudu {
assertPartialRowDateFieldEquals(dateFieldValue);
}
@Test
public void testBuildPartialRowWithDateToString() throws ParseException {
final SimpleDateFormat dateFormat = new SimpleDateFormat(ISO_8601_YEAR_MONTH_DAY_PATTERN);
final java.util.Date dateFieldValue = dateFormat.parse(ISO_8601_YEAR_MONTH_DAY);
final PartialRow row = buildPartialRowDateField(dateFieldValue, Type.STRING);
final String column = row.getString(DATE_FIELD);
assertEquals("Partial Row Field not matched", ISO_8601_YEAR_MONTH_DAY, column);
}
@Test
public void testBuildPartialRowWithDateString() {
assertPartialRowDateFieldEquals(ISO_8601_YEAR_MONTH_DAY);
}
private void assertPartialRowDateFieldEquals(final Object dateFieldValue) {
final PartialRow row = buildPartialRowDateField(dateFieldValue);
final PartialRow row = buildPartialRowDateField(dateFieldValue, Type.DATE);
final java.sql.Date rowDate = row.getDate(DATE_FIELD);
assertEquals("Partial Row Date Field not matched", ISO_8601_YEAR_MONTH_DAY, rowDate.toString());
}
private PartialRow buildPartialRowDateField(final Object dateFieldValue) {
private PartialRow buildPartialRowDateField(final Object dateFieldValue, final Type columnType) {
final Schema kuduSchema = new Schema(Collections.singletonList(
new ColumnSchema.ColumnSchemaBuilder(DATE_FIELD, Type.DATE).nullable(true).build()
new ColumnSchema.ColumnSchemaBuilder(DATE_FIELD, columnType).nullable(true).build()
));
final RecordSchema schema = new SimpleRecordSchema(Collections.singletonList(