mirror of https://github.com/apache/nifi.git
NIFI-8748 Corrected PutKudu String to java.sql.Date parsing
- Added getDateFormat() using default time zone instead of GMT time zone from DataTypeUtils.getDateFormat() NIFI-8748 Adjusted Date Format to use DataType.getFormat() Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #5194.
This commit is contained in:
parent
70d8fe8e00
commit
b27c2b500e
|
@ -55,11 +55,15 @@ import org.apache.nifi.util.StringUtils;
|
||||||
|
|
||||||
import javax.security.auth.login.LoginException;
|
import javax.security.auth.login.LoginException;
|
||||||
import java.math.BigDecimal;
|
import java.math.BigDecimal;
|
||||||
|
import java.sql.Date;
|
||||||
import java.sql.Timestamp;
|
import java.sql.Timestamp;
|
||||||
|
import java.text.DateFormat;
|
||||||
|
import java.text.SimpleDateFormat;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.SynchronousQueue;
|
import java.util.concurrent.SynchronousQueue;
|
||||||
|
@ -114,7 +118,7 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
|
||||||
.displayName("Kudu Operation Timeout")
|
.displayName("Kudu Operation Timeout")
|
||||||
.description("Default timeout used for user operations (using sessions and scanners)")
|
.description("Default timeout used for user operations (using sessions and scanners)")
|
||||||
.required(false)
|
.required(false)
|
||||||
.defaultValue(String.valueOf(AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS) + "ms")
|
.defaultValue(AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS + "ms")
|
||||||
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
||||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||||
.build();
|
.build();
|
||||||
|
@ -124,7 +128,7 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
|
||||||
.displayName("Kudu Keep Alive Period Timeout")
|
.displayName("Kudu Keep Alive Period Timeout")
|
||||||
.description("Default timeout used for user operations")
|
.description("Default timeout used for user operations")
|
||||||
.required(false)
|
.required(false)
|
||||||
.defaultValue(String.valueOf(AsyncKuduClient.DEFAULT_KEEP_ALIVE_PERIOD_MS) + "ms")
|
.defaultValue(AsyncKuduClient.DEFAULT_KEEP_ALIVE_PERIOD_MS + "ms")
|
||||||
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
||||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||||
.build();
|
.build();
|
||||||
|
@ -403,7 +407,9 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
|
||||||
row.addVarchar(columnIndex, DataTypeUtils.toString(value, recordFieldName));
|
row.addVarchar(columnIndex, DataTypeUtils.toString(value, recordFieldName));
|
||||||
break;
|
break;
|
||||||
case DATE:
|
case DATE:
|
||||||
row.addDate(columnIndex, DataTypeUtils.toDate(value, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()), recordFieldName));
|
final Optional<DataType> fieldDataType = record.getSchema().getDataType(recordFieldName);
|
||||||
|
final String format = fieldDataType.isPresent() ? fieldDataType.get().getFormat() : RecordFieldType.DATE.getDefaultFormat();
|
||||||
|
row.addDate(columnIndex, getDate(value, recordFieldName, format));
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
throw new IllegalStateException(String.format("unknown column type %s", colType));
|
throw new IllegalStateException(String.format("unknown column type %s", colType));
|
||||||
|
@ -412,6 +418,28 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get java.sql.Date from Record Field Value with optional parsing when input value is a String
|
||||||
|
*
|
||||||
|
* @param value Record Field Value
|
||||||
|
* @param recordFieldName Record Field Name
|
||||||
|
* @param format Date Format Pattern
|
||||||
|
* @return Date object or null when value is null
|
||||||
|
*/
|
||||||
|
private Date getDate(final Object value, final String recordFieldName, final String format) {
|
||||||
|
return DataTypeUtils.toDate(value, () -> getDateFormat(format), recordFieldName);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get Date Format using Date Record Field default pattern and system time zone to avoid unnecessary conversion
|
||||||
|
*
|
||||||
|
* @param format Date Format Pattern
|
||||||
|
* @return Date Format used to parsing date fields
|
||||||
|
*/
|
||||||
|
private DateFormat getDateFormat(final String format) {
|
||||||
|
return new SimpleDateFormat(format);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Converts a NiFi DataType to it's equivalent Kudu Type.
|
* Converts a NiFi DataType to it's equivalent Kudu Type.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -75,6 +75,7 @@ import java.util.stream.IntStream;
|
||||||
import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.EXCEPTION;
|
import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.EXCEPTION;
|
||||||
import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.FAIL;
|
import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.FAIL;
|
||||||
import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.OK;
|
import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.OK;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
|
@ -478,6 +479,31 @@ public class TestPutKudu {
|
||||||
row.getDate("sql_date").toString(), today.toString());
|
row.getDate("sql_date").toString(), today.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBuildPartialRowWithDateString() {
|
||||||
|
final String dateFieldName = "created";
|
||||||
|
final String dateFieldValue = "2000-01-01";
|
||||||
|
|
||||||
|
final Schema kuduSchema = new Schema(Collections.singletonList(
|
||||||
|
new ColumnSchema.ColumnSchemaBuilder(dateFieldName, Type.DATE).nullable(true).build()
|
||||||
|
));
|
||||||
|
|
||||||
|
final RecordSchema schema = new SimpleRecordSchema(Collections.singletonList(
|
||||||
|
new RecordField(dateFieldName, RecordFieldType.DATE.getDataType())
|
||||||
|
));
|
||||||
|
|
||||||
|
final Map<String, Object> values = new HashMap<>();
|
||||||
|
values.put(dateFieldName, dateFieldValue);
|
||||||
|
final MapRecord record = new MapRecord(schema, values);
|
||||||
|
|
||||||
|
final PartialRow row = kuduSchema.newPartialRow();
|
||||||
|
|
||||||
|
processor.buildPartialRow(kuduSchema, row, record, schema.getFieldNames(), true, true);
|
||||||
|
|
||||||
|
final java.sql.Date rowDate = row.getDate(dateFieldName);
|
||||||
|
assertEquals("Partial Row Date Field not matched", dateFieldValue, rowDate.toString());
|
||||||
|
}
|
||||||
|
|
||||||
private PartialRow buildPartialRow(Long id, String name, Short age, String kuduIdName, String recordIdName, String airport_code, java.sql.Date sql_date, Boolean lowercaseFields) {
|
private PartialRow buildPartialRow(Long id, String name, Short age, String kuduIdName, String recordIdName, String airport_code, java.sql.Date sql_date, Boolean lowercaseFields) {
|
||||||
final Schema kuduSchema = new Schema(Arrays.asList(
|
final Schema kuduSchema = new Schema(Arrays.asList(
|
||||||
new ColumnSchema.ColumnSchemaBuilder(kuduIdName, Type.INT64).key(true).build(),
|
new ColumnSchema.ColumnSchemaBuilder(kuduIdName, Type.INT64).key(true).build(),
|
||||||
|
|
Loading…
Reference in New Issue