NIFI-6551: Improve PutKudu timestamp handling

Uses `DataTypeUtils.toTimestamp` when writing to Kudu
timestamp (`UNIXTIME_MICROS`) columns. This allows
us to use the `row.addTimestamp` API and get much more
intuitive and predictable timestamp ingest behavior.

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4054.
This commit is contained in:
Grant Henke 2020-02-13 17:03:20 -06:00 committed by Pierre Villard
parent 09c7406d18
commit 4098404596
No known key found for this signature in database
GPG Key ID: BEE1599F0726E9CD
3 changed files with 40 additions and 28 deletions

View File

@ -47,9 +47,11 @@ import org.apache.nifi.security.krb.KerberosKeytabUser;
import org.apache.nifi.security.krb.KerberosUser; import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import javax.security.auth.login.LoginException; import javax.security.auth.login.LoginException;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.List; import java.util.List;
@ -182,47 +184,50 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
if (schema.getColumnByIndex(colIdx).isKey()) { if (schema.getColumnByIndex(colIdx).isKey()) {
throw new IllegalArgumentException(String.format("Can't set primary key column %s to null ", colName)); throw new IllegalArgumentException(String.format("Can't set primary key column %s to null ", colName));
} else if(!schema.getColumnByIndex(colIdx).isNullable()) { } else if(!schema.getColumnByIndex(colIdx).isNullable()) {
throw new IllegalArgumentException(String.format("Can't set primary key column %s to null ", colName)); throw new IllegalArgumentException(String.format("Can't set column %s to null ", colName));
} }
if (!ignoreNull) { if (!ignoreNull) {
row.setNull(colName); row.setNull(colName);
continue;
} }
} else { } else {
switch (colType.getDataType(colSchema.getTypeAttributes())) { Object value = record.getValue(recordFieldName);
switch (colType) {
case BOOL: case BOOL:
row.addBoolean(colIdx, record.getAsBoolean(recordFieldName)); row.addBoolean(colIdx, DataTypeUtils.toBoolean(value, recordFieldName));
break;
case FLOAT:
row.addFloat(colIdx, record.getAsFloat(recordFieldName));
break;
case DOUBLE:
row.addDouble(colIdx, record.getAsDouble(recordFieldName));
break;
case BINARY:
row.addBinary(colIdx, record.getAsString(recordFieldName).getBytes());
break; break;
case INT8: case INT8:
row.addByte(colIdx, record.getAsInt(recordFieldName).byteValue()); row.addByte(colIdx, DataTypeUtils.toByte(value, recordFieldName));
break; break;
case INT16: case INT16:
row.addShort(colIdx, record.getAsInt(recordFieldName).shortValue()); row.addShort(colIdx, DataTypeUtils.toShort(value, recordFieldName));
break; break;
case INT32: case INT32:
row.addInt(colIdx, record.getAsInt(recordFieldName)); row.addInt(colIdx, DataTypeUtils.toInteger(value, recordFieldName));
break; break;
case INT64: case INT64:
row.addLong(colIdx, DataTypeUtils.toLong(value, recordFieldName));
break;
case UNIXTIME_MICROS: case UNIXTIME_MICROS:
row.addLong(colIdx, record.getAsLong(recordFieldName)); DataType fieldType = record.getSchema().getDataType(recordFieldName).get();
Timestamp timestamp = DataTypeUtils.toTimestamp(record.getValue(recordFieldName),
() -> DataTypeUtils.getDateFormat(fieldType.getFormat()), recordFieldName);
row.addTimestamp(colIdx, timestamp);
break; break;
case STRING: case STRING:
row.addString(colIdx, record.getAsString(recordFieldName)); row.addString(colIdx, DataTypeUtils.toString(value, recordFieldName));
break; break;
case DECIMAL32: case BINARY:
case DECIMAL64: row.addBinary(colIdx, DataTypeUtils.toString(value, recordFieldName).getBytes());
case DECIMAL128: break;
row.addDecimal(colIdx, new BigDecimal(record.getAsString(recordFieldName))); case FLOAT:
row.addFloat(colIdx, DataTypeUtils.toFloat(value, recordFieldName));
break;
case DOUBLE:
row.addDouble(colIdx, DataTypeUtils.toDouble(value, recordFieldName));
break;
case DECIMAL:
row.addDecimal(colIdx, new BigDecimal(DataTypeUtils.toString(value, recordFieldName)));
break; break;
default: default:
throw new IllegalStateException(String.format("unknown column type %s", colType)); throw new IllegalStateException(String.format("unknown column type %s", colType));

View File

@ -43,6 +43,7 @@ import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.sql.Timestamp;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -54,6 +55,8 @@ public class ITPutKudu {
public static final String DEFAULT_TABLE_NAME = "Nifi-Kudu-Table"; public static final String DEFAULT_TABLE_NAME = "Nifi-Kudu-Table";
public static final Timestamp NOW = new Timestamp(System.currentTimeMillis());
// The KuduTestHarness automatically starts and stops a real Kudu cluster // The KuduTestHarness automatically starts and stops a real Kudu cluster
// when each test is run. Kudu persists its on-disk state in a temporary // when each test is run. Kudu persists its on-disk state in a temporary
// directory under a location defined by the environment variable TEST_TMPDIR // directory under a location defined by the environment variable TEST_TMPDIR
@ -101,6 +104,7 @@ public class ITPutKudu {
columns.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build()); columns.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("stringval", Type.STRING).build()); columns.add(new ColumnSchema.ColumnSchemaBuilder("stringval", Type.STRING).build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("num32val", Type.INT32).build()); columns.add(new ColumnSchema.ColumnSchemaBuilder("num32val", Type.INT32).build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("timestampval", Type.UNIXTIME_MICROS).build());
Schema schema = new Schema(columns); Schema schema = new Schema(columns);
CreateTableOptions opts = new CreateTableOptions() CreateTableOptions opts = new CreateTableOptions()
.addHashPartitions(Collections.singletonList("id"), 4); .addHashPartitions(Collections.singletonList("id"), 4);
@ -112,12 +116,13 @@ public class ITPutKudu {
readerFactory.addSchemaField("id", RecordFieldType.INT); readerFactory.addSchemaField("id", RecordFieldType.INT);
readerFactory.addSchemaField("stringVal", RecordFieldType.STRING); readerFactory.addSchemaField("stringVal", RecordFieldType.STRING);
readerFactory.addSchemaField("num32Val", RecordFieldType.INT); readerFactory.addSchemaField("num32Val", RecordFieldType.INT);
readerFactory.addSchemaField("timestampVal", RecordFieldType.TIMESTAMP);
// Add two extra columns to test handleSchemaDrift = true. // Add two extra columns to test handleSchemaDrift = true.
readerFactory.addSchemaField("doubleVal", RecordFieldType.DOUBLE); readerFactory.addSchemaField("doubleVal", RecordFieldType.DOUBLE);
readerFactory.addSchemaField("floatVal", RecordFieldType.FLOAT); readerFactory.addSchemaField("floatVal", RecordFieldType.FLOAT);
for (int i = 0; i < numOfRecord; i++) { for (int i = 0; i < numOfRecord; i++) {
readerFactory.addRecord(i, "val_" + i, 1000 + i, 100.88 + i, 100.88 + i); readerFactory.addRecord(i, "val_" + i, 1000 + i, NOW, 100.88 + i, 100.88 + i);
} }
testRunner.addControllerService("mock-reader-factory", readerFactory); testRunner.addControllerService("mock-reader-factory", readerFactory);
@ -183,14 +188,15 @@ public class ITPutKudu {
KuduTable kuduTable = client.openTable(DEFAULT_TABLE_NAME); KuduTable kuduTable = client.openTable(DEFAULT_TABLE_NAME);
// Verify the extra field was added. // Verify the extra field was added.
Assert.assertEquals(5, kuduTable.getSchema().getColumnCount()); Assert.assertEquals(6, kuduTable.getSchema().getColumnCount());
Assert.assertTrue(kuduTable.getSchema().hasColumn("doubleval")); Assert.assertTrue(kuduTable.getSchema().hasColumn("doubleval"));
Assert.assertTrue(kuduTable.getSchema().hasColumn("floatval")); Assert.assertTrue(kuduTable.getSchema().hasColumn("floatval"));
// Verify Kudu record count. // Verify Kudu record count.
KuduScanner scanner = client.newScannerBuilder(kuduTable).build(); KuduScanner scanner = client.newScannerBuilder(kuduTable).build();
int count = 0; int count = 0;
for (RowResult unused : scanner) { for (RowResult row : scanner) {
Assert.assertEquals(NOW, row.getTimestamp("timestampval"));
count++; count++;
} }
Assert.assertEquals(recordCount, count); Assert.assertEquals(recordCount, count);

View File

@ -60,6 +60,7 @@ import org.mockito.stubbing.OngoingStubbing;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.sql.Timestamp;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
@ -424,7 +425,7 @@ public class TestPutKudu {
new RecordField(recordIdName, RecordFieldType.BIGINT.getDataType()), new RecordField(recordIdName, RecordFieldType.BIGINT.getDataType()),
new RecordField("name", RecordFieldType.STRING.getDataType()), new RecordField("name", RecordFieldType.STRING.getDataType()),
new RecordField("age", RecordFieldType.SHORT.getDataType()), new RecordField("age", RecordFieldType.SHORT.getDataType()),
new RecordField("updated_at", RecordFieldType.BIGINT.getDataType()), new RecordField("updated_at", RecordFieldType.TIMESTAMP.getDataType()),
new RecordField("score", RecordFieldType.LONG.getDataType()))); new RecordField("score", RecordFieldType.LONG.getDataType())));
Map<String, Object> values = new HashMap<>(); Map<String, Object> values = new HashMap<>();
@ -432,7 +433,7 @@ public class TestPutKudu {
values.put(recordIdName, id); values.put(recordIdName, id);
values.put("name", name); values.put("name", name);
values.put("age", age); values.put("age", age);
values.put("updated_at", System.currentTimeMillis() * 1000); values.put("updated_at", new Timestamp(System.currentTimeMillis()));
values.put("score", 10000L); values.put("score", 10000L);
processor.buildPartialRow( processor.buildPartialRow(
kuduSchema, kuduSchema,