diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java index 570b86d211..9636cf6d06 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java @@ -47,9 +47,11 @@ import org.apache.nifi.security.krb.KerberosKeytabUser; import org.apache.nifi.security.krb.KerberosUser; import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.util.DataTypeUtils; import javax.security.auth.login.LoginException; import java.math.BigDecimal; +import java.sql.Timestamp; import java.util.Arrays; import java.util.concurrent.TimeUnit; import java.util.List; @@ -182,47 +184,50 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { if (schema.getColumnByIndex(colIdx).isKey()) { throw new IllegalArgumentException(String.format("Can't set primary key column %s to null ", colName)); } else if(!schema.getColumnByIndex(colIdx).isNullable()) { - throw new IllegalArgumentException(String.format("Can't set primary key column %s to null ", colName)); + throw new IllegalArgumentException(String.format("Can't set column %s to null ", colName)); } if (!ignoreNull) { row.setNull(colName); - continue; } } else { - switch (colType.getDataType(colSchema.getTypeAttributes())) { + Object value = record.getValue(recordFieldName); + switch (colType) { case BOOL: - row.addBoolean(colIdx, record.getAsBoolean(recordFieldName)); - break; - case FLOAT: - row.addFloat(colIdx, record.getAsFloat(recordFieldName)); - break; - case DOUBLE: - row.addDouble(colIdx, record.getAsDouble(recordFieldName)); - break; - case BINARY: - row.addBinary(colIdx, record.getAsString(recordFieldName).getBytes()); + row.addBoolean(colIdx, DataTypeUtils.toBoolean(value, recordFieldName)); break; case INT8: - row.addByte(colIdx, record.getAsInt(recordFieldName).byteValue()); + row.addByte(colIdx, DataTypeUtils.toByte(value, recordFieldName)); break; case INT16: - row.addShort(colIdx, record.getAsInt(recordFieldName).shortValue()); + row.addShort(colIdx, DataTypeUtils.toShort(value, recordFieldName)); break; case INT32: - row.addInt(colIdx, record.getAsInt(recordFieldName)); + row.addInt(colIdx, DataTypeUtils.toInteger(value, recordFieldName)); break; case INT64: + row.addLong(colIdx, DataTypeUtils.toLong(value, recordFieldName)); + break; case UNIXTIME_MICROS: - row.addLong(colIdx, record.getAsLong(recordFieldName)); + DataType fieldType = record.getSchema().getDataType(recordFieldName).get(); + Timestamp timestamp = DataTypeUtils.toTimestamp(record.getValue(recordFieldName), + () -> DataTypeUtils.getDateFormat(fieldType.getFormat()), recordFieldName); + row.addTimestamp(colIdx, timestamp); break; case STRING: - row.addString(colIdx, record.getAsString(recordFieldName)); + row.addString(colIdx, DataTypeUtils.toString(value, recordFieldName)); break; - case DECIMAL32: - case DECIMAL64: - case DECIMAL128: - row.addDecimal(colIdx, new BigDecimal(record.getAsString(recordFieldName))); + case BINARY: + row.addBinary(colIdx, DataTypeUtils.toString(value, recordFieldName).getBytes()); + break; + case FLOAT: + row.addFloat(colIdx, DataTypeUtils.toFloat(value, recordFieldName)); + break; + case DOUBLE: + row.addDouble(colIdx, DataTypeUtils.toDouble(value, recordFieldName)); + break; + case DECIMAL: + row.addDecimal(colIdx, new BigDecimal(DataTypeUtils.toString(value, recordFieldName))); break; default: throw new IllegalStateException(String.format("unknown column type %s", colType)); @@ -293,4 +298,4 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { return update; } -} \ No newline at end of file +} diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java index b3ad1700e2..1be21cbee4 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java @@ -43,6 +43,7 @@ import org.junit.Rule; import org.junit.Test; import java.io.IOException; +import java.sql.Timestamp; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -54,6 +55,8 @@ public class ITPutKudu { public static final String DEFAULT_TABLE_NAME = "Nifi-Kudu-Table"; + public static final Timestamp NOW = new Timestamp(System.currentTimeMillis()); + // The KuduTestHarness automatically starts and stops a real Kudu cluster // when each test is run. Kudu persists its on-disk state in a temporary // directory under a location defined by the environment variable TEST_TMPDIR @@ -101,6 +104,7 @@ public class ITPutKudu { columns.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build()); columns.add(new ColumnSchema.ColumnSchemaBuilder("stringval", Type.STRING).build()); columns.add(new ColumnSchema.ColumnSchemaBuilder("num32val", Type.INT32).build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("timestampval", Type.UNIXTIME_MICROS).build()); Schema schema = new Schema(columns); CreateTableOptions opts = new CreateTableOptions() .addHashPartitions(Collections.singletonList("id"), 4); @@ -112,12 +116,13 @@ public class ITPutKudu { readerFactory.addSchemaField("id", RecordFieldType.INT); readerFactory.addSchemaField("stringVal", RecordFieldType.STRING); readerFactory.addSchemaField("num32Val", RecordFieldType.INT); + readerFactory.addSchemaField("timestampVal", RecordFieldType.TIMESTAMP); // Add two extra columns to test handleSchemaDrift = true. readerFactory.addSchemaField("doubleVal", RecordFieldType.DOUBLE); readerFactory.addSchemaField("floatVal", RecordFieldType.FLOAT); for (int i = 0; i < numOfRecord; i++) { - readerFactory.addRecord(i, "val_" + i, 1000 + i, 100.88 + i, 100.88 + i); + readerFactory.addRecord(i, "val_" + i, 1000 + i, NOW, 100.88 + i, 100.88 + i); } testRunner.addControllerService("mock-reader-factory", readerFactory); @@ -183,14 +188,15 @@ public class ITPutKudu { KuduTable kuduTable = client.openTable(DEFAULT_TABLE_NAME); // Verify the extra field was added. - Assert.assertEquals(5, kuduTable.getSchema().getColumnCount()); + Assert.assertEquals(6, kuduTable.getSchema().getColumnCount()); Assert.assertTrue(kuduTable.getSchema().hasColumn("doubleval")); Assert.assertTrue(kuduTable.getSchema().hasColumn("floatval")); // Verify Kudu record count. KuduScanner scanner = client.newScannerBuilder(kuduTable).build(); int count = 0; - for (RowResult unused : scanner) { + for (RowResult row : scanner) { + Assert.assertEquals(NOW, row.getTimestamp("timestampval")); count++; } Assert.assertEquals(recordCount, count); diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java index 9be6952bb2..b55695f4cc 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java @@ -60,6 +60,7 @@ import org.mockito.stubbing.OngoingStubbing; import java.io.IOException; import java.io.InputStream; +import java.sql.Timestamp; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -424,7 +425,7 @@ public class TestPutKudu { new RecordField(recordIdName, RecordFieldType.BIGINT.getDataType()), new RecordField("name", RecordFieldType.STRING.getDataType()), new RecordField("age", RecordFieldType.SHORT.getDataType()), - new RecordField("updated_at", RecordFieldType.BIGINT.getDataType()), + new RecordField("updated_at", RecordFieldType.TIMESTAMP.getDataType()), new RecordField("score", RecordFieldType.LONG.getDataType()))); Map values = new HashMap<>(); @@ -432,7 +433,7 @@ public class TestPutKudu { values.put(recordIdName, id); values.put("name", name); values.put("age", age); - values.put("updated_at", System.currentTimeMillis() * 1000); + values.put("updated_at", new Timestamp(System.currentTimeMillis())); values.put("score", 10000L); processor.buildPartialRow( kuduSchema,