NIFI-7565 Add support for DATE to Kudu NAR bundle

- update Kudu dependencies to Kudu 1.13.0
- add support for passing java.sql.Date for Kudu DATE columns
- add tests for passing java.sql.Date to DATE columns

more about DATE type support in Kudu:
https://issues.apache.org/jira/browse/KUDU-2632

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4549.
This commit is contained in:
Greg Solovyev 2020-06-22 20:32:23 -07:00 committed by Pierre Villard
parent 7e0bcb98e1
commit e09317223e
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
7 changed files with 110 additions and 69 deletions

View File

@ -27,7 +27,7 @@
<properties> <properties>
<exclude.tests>None</exclude.tests> <exclude.tests>None</exclude.tests>
<kudu.version>1.12.0</kudu.version> <kudu.version>1.13.0</kudu.version>
</properties> </properties>
<build> <build>
<extensions> <extensions>

View File

@ -333,6 +333,9 @@ public class KuduLookupService extends AbstractControllerService implements Reco
case FLOAT: case FLOAT:
fields.add(new RecordField(cs.getName(), RecordFieldType.FLOAT.getDataType())); fields.add(new RecordField(cs.getName(), RecordFieldType.FLOAT.getDataType()));
break; break;
case DATE:
fields.add(new RecordField(cs.getName(), RecordFieldType.DATE.getDataType()));
break;
} }
} }
return new SimpleRecordSchema(fields); return new SimpleRecordSchema(fields);

View File

@ -67,9 +67,12 @@ public class ITestKuduLookupService {
.addTabletServerFlag("--use_hybrid_clock=false") .addTabletServerFlag("--use_hybrid_clock=false")
); );
private TestRunner testRunner; private TestRunner testRunner;
private long nowMillis = System.currentTimeMillis();
private KuduLookupService kuduLookupService; private KuduLookupService kuduLookupService;
private final java.sql.Date today = new java.sql.Date(System.currentTimeMillis());
private final java.sql.Date pastDate = java.sql.Date.valueOf("2019-01-01");
private long nowMillis = System.currentTimeMillis();
public static class SampleProcessor extends AbstractProcessor { public static class SampleProcessor extends AbstractProcessor {
@Override @Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
@ -100,6 +103,7 @@ public class ITestKuduLookupService {
columns.add(new ColumnSchema.ColumnSchemaBuilder("int32", Type.INT32).build()); columns.add(new ColumnSchema.ColumnSchemaBuilder("int32", Type.INT32).build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("int64", Type.INT64).build()); columns.add(new ColumnSchema.ColumnSchemaBuilder("int64", Type.INT64).build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("unixtime_micros", Type.UNIXTIME_MICROS).build()); columns.add(new ColumnSchema.ColumnSchemaBuilder("unixtime_micros", Type.UNIXTIME_MICROS).build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("sql_date", Type.DATE).build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("varchar_3", Type.VARCHAR).typeAttributes( columns.add(new ColumnSchema.ColumnSchemaBuilder("varchar_3", Type.VARCHAR).typeAttributes(
new ColumnTypeAttributes.ColumnTypeAttributesBuilder().length(3).build() new ColumnTypeAttributes.ColumnTypeAttributesBuilder().length(3).build()
).build()); ).build());
@ -124,6 +128,7 @@ public class ITestKuduLookupService {
row.addInt("int32",3); row.addInt("int32",3);
row.addLong("int64",4L); row.addLong("int64",4L);
row.addTimestamp("unixtime_micros", new Timestamp(nowMillis)); row.addTimestamp("unixtime_micros", new Timestamp(nowMillis));
row.addDate("sql_date", today);
row.addVarchar("varchar_3", "SFO"); row.addVarchar("varchar_3", "SFO");
session.apply(insert); session.apply(insert);
@ -140,6 +145,7 @@ public class ITestKuduLookupService {
row.addInt("int32",13); row.addInt("int32",13);
row.addLong("int64",14L); row.addLong("int64",14L);
row.addTimestamp("unixtime_micros", new Timestamp(nowMillis+(1000L * 60 * 60 * 24 * 365))); //+ 1 year row.addTimestamp("unixtime_micros", new Timestamp(nowMillis+(1000L * 60 * 60 * 24 * 365))); //+ 1 year
row.addDate("sql_date", pastDate);
row.addVarchar("varchar_3", "SJC"); row.addVarchar("varchar_3", "SJC");
session.apply(insert); session.apply(insert);
@ -204,6 +210,7 @@ public class ITestKuduLookupService {
map.put("int32",3); map.put("int32",3);
map.put("int64",4L); map.put("int64",4L);
map.put("unixtime_micros", new Timestamp(nowMillis)); map.put("unixtime_micros", new Timestamp(nowMillis));
map.put("sql_date", today);
map.put("varchar_3", "SFO"); map.put("varchar_3", "SFO");
Record result = kuduLookupService.lookup(map).get(); Record result = kuduLookupService.lookup(map).get();
validateRow1(result); validateRow1(result);
@ -224,7 +231,6 @@ public class ITestKuduLookupService {
assertEquals(true, result.getAsBoolean("bool")); assertEquals(true, result.getAsBoolean("bool"));
} }
private void validateRow1(Record result){ private void validateRow1(Record result){
assertEquals("string1", result.getAsString("string")); assertEquals("string1", result.getAsString("string"));
assertEquals(Base64.getEncoder().encodeToString("binary1".getBytes()), result.getValue("binary")); assertEquals(Base64.getEncoder().encodeToString("binary1".getBytes()), result.getValue("binary"));
assertEquals(true, result.getAsBoolean("bool")); assertEquals(true, result.getAsBoolean("bool"));
@ -237,6 +243,7 @@ public class ITestKuduLookupService {
assertEquals(4L, (long)result.getAsLong("int64")); assertEquals(4L, (long)result.getAsLong("int64"));
assertEquals(new Timestamp(nowMillis), result.getValue("unixtime_micros")); assertEquals(new Timestamp(nowMillis), result.getValue("unixtime_micros"));
assertEquals("SFO", result.getValue("varchar_3")); assertEquals("SFO", result.getValue("varchar_3"));
assertEquals(today.toString(), result.getValue("sql_date").toString());
} }
} }

View File

@ -26,7 +26,7 @@
<properties> <properties>
<exclude.tests>None</exclude.tests> <exclude.tests>None</exclude.tests>
<kudu.version>1.12.0</kudu.version> <kudu.version>1.13.0</kudu.version>
</properties> </properties>
<build> <build>
<extensions> <extensions>

View File

@ -299,7 +299,6 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
if (colIdx != -1) { if (colIdx != -1) {
ColumnSchema colSchema = schema.getColumnByIndex(colIdx); ColumnSchema colSchema = schema.getColumnByIndex(colIdx);
Type colType = colSchema.getType(); Type colType = colSchema.getType();
if (record.getValue(recordFieldName) == null) { if (record.getValue(recordFieldName) == null) {
if (schema.getColumnByIndex(colIdx).isKey()) { if (schema.getColumnByIndex(colIdx).isKey()) {
throw new IllegalArgumentException(String.format("Can't set primary key column %s to null ", colName)); throw new IllegalArgumentException(String.format("Can't set primary key column %s to null ", colName));
@ -352,6 +351,9 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
case VARCHAR: case VARCHAR:
row.addVarchar(colIdx, DataTypeUtils.toString(value, recordFieldName)); row.addVarchar(colIdx, DataTypeUtils.toString(value, recordFieldName));
break; break;
case DATE:
row.addDate(colIdx, DataTypeUtils.toDate(value, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()), recordFieldName));
break;
default: default:
throw new IllegalStateException(String.format("unknown column type %s", colType)); throw new IllegalStateException(String.format("unknown column type %s", colType));
} }
@ -386,6 +388,8 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
case CHAR: case CHAR:
case STRING: case STRING:
return Type.STRING; return Type.STRING;
case DATE:
return Type.DATE;
default: default:
throw new IllegalArgumentException(String.format("unsupported type %s", nifiType)); throw new IllegalArgumentException(String.format("unsupported type %s", nifiType));
} }

View File

@ -76,6 +76,8 @@ public class ITPutKudu {
private MockRecordParser readerFactory; private MockRecordParser readerFactory;
private final java.sql.Date today = new java.sql.Date(System.currentTimeMillis());
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
processor = new PutKudu(); processor = new PutKudu();
@ -109,6 +111,7 @@ public class ITPutKudu {
).build()); ).build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("num32val", Type.INT32).build()); columns.add(new ColumnSchema.ColumnSchemaBuilder("num32val", Type.INT32).build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("timestampval", Type.UNIXTIME_MICROS).build()); columns.add(new ColumnSchema.ColumnSchemaBuilder("timestampval", Type.UNIXTIME_MICROS).build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("dateval", Type.DATE).build());
Schema schema = new Schema(columns); Schema schema = new Schema(columns);
CreateTableOptions opts = new CreateTableOptions() CreateTableOptions opts = new CreateTableOptions()
.addHashPartitions(Collections.singletonList("id"), 4); .addHashPartitions(Collections.singletonList("id"), 4);
@ -122,12 +125,13 @@ public class ITPutKudu {
readerFactory.addSchemaField("varcharval", RecordFieldType.STRING); readerFactory.addSchemaField("varcharval", RecordFieldType.STRING);
readerFactory.addSchemaField("num32Val", RecordFieldType.INT); readerFactory.addSchemaField("num32Val", RecordFieldType.INT);
readerFactory.addSchemaField("timestampVal", RecordFieldType.TIMESTAMP); readerFactory.addSchemaField("timestampVal", RecordFieldType.TIMESTAMP);
readerFactory.addSchemaField("dateval", RecordFieldType.DATE);
// Add two extra columns to test handleSchemaDrift = true. // Add two extra columns to test handleSchemaDrift = true.
readerFactory.addSchemaField("doubleVal", RecordFieldType.DOUBLE); readerFactory.addSchemaField("doubleVal", RecordFieldType.DOUBLE);
readerFactory.addSchemaField("floatVal", RecordFieldType.FLOAT); readerFactory.addSchemaField("floatVal", RecordFieldType.FLOAT);
for (int i = 0; i < numOfRecord; i++) { for (int i = 0; i < numOfRecord; i++) {
readerFactory.addRecord(i, "val_" + i, "varchar_val_" + i, 1000 + i, NOW, 100.88 + i, 100.88 + i); readerFactory.addRecord(i, "val_" + i, "varchar_val_" + i, 1000 + i, NOW, today, 100.88 + i, 100.88 + i);
} }
testRunner.addControllerService("mock-reader-factory", readerFactory); testRunner.addControllerService("mock-reader-factory", readerFactory);
@ -193,7 +197,7 @@ public class ITPutKudu {
KuduTable kuduTable = client.openTable(DEFAULT_TABLE_NAME); KuduTable kuduTable = client.openTable(DEFAULT_TABLE_NAME);
// Verify the extra field was added. // Verify the extra field was added.
Assert.assertEquals(7, kuduTable.getSchema().getColumnCount()); Assert.assertEquals(8, kuduTable.getSchema().getColumnCount());
Assert.assertTrue(kuduTable.getSchema().hasColumn("doubleval")); Assert.assertTrue(kuduTable.getSchema().hasColumn("doubleval"));
Assert.assertTrue(kuduTable.getSchema().hasColumn("floatval")); Assert.assertTrue(kuduTable.getSchema().hasColumn("floatval"));
@ -202,6 +206,10 @@ public class ITPutKudu {
int count = 0; int count = 0;
for (RowResult row : scanner) { for (RowResult row : scanner) {
Assert.assertEquals(NOW, row.getTimestamp("timestampval")); Assert.assertEquals(NOW, row.getTimestamp("timestampval"));
// Comparing string representations, because java.sql.Date does not override
// java.util.Date.equals method and therefore compares milliseconds instead of
// comparing dates, even though java.sql.Date is supposed to ignore time
Assert.assertEquals(today.toString(), row.getDate("dateval").toString());
count++; count++;
} }
Assert.assertEquals(recordCount, count); Assert.assertEquals(recordCount, count);

View File

@ -86,7 +86,7 @@ public class TestPutKudu {
public static final String DEFAULT_TABLE_NAME = "Nifi-Kudu-Table"; public static final String DEFAULT_TABLE_NAME = "Nifi-Kudu-Table";
public static final String DEFAULT_MASTERS = "testLocalHost:7051"; public static final String DEFAULT_MASTERS = "testLocalHost:7051";
public static final String SKIP_HEAD_LINE = "false"; public static final String SKIP_HEAD_LINE = "false";
public static final String TABLE_SCHEMA = "id,stringVal,num32Val,doubleVal,decimalVal"; public static final String TABLE_SCHEMA = "id,stringVal,num32Val,doubleVal,decimalVal,dateVal";
private TestRunner testRunner; private TestRunner testRunner;
@ -94,6 +94,8 @@ public class TestPutKudu {
private MockRecordParser readerFactory; private MockRecordParser readerFactory;
private final java.sql.Date today = new java.sql.Date(System.currentTimeMillis());
@Before @Before
public void setUp() throws InitializationException { public void setUp() throws InitializationException {
processor = new MockPutKudu(); processor = new MockPutKudu();
@ -124,9 +126,10 @@ public class TestPutKudu {
readerFactory.addSchemaField("num32Val", RecordFieldType.INT); readerFactory.addSchemaField("num32Val", RecordFieldType.INT);
readerFactory.addSchemaField("doubleVal", RecordFieldType.DOUBLE); readerFactory.addSchemaField("doubleVal", RecordFieldType.DOUBLE);
readerFactory.addSchemaField(new RecordField("decimalVal", RecordFieldType.DECIMAL.getDecimalDataType(6, 3))); readerFactory.addSchemaField(new RecordField("decimalVal", RecordFieldType.DECIMAL.getDecimalDataType(6, 3)));
readerFactory.addSchemaField("dateVal", RecordFieldType.DATE);
for (int i = 0; i < numOfRecord; i++) { for (int i = 0; i < numOfRecord; i++) {
readerFactory.addRecord(i, "val_" + i, 1000 + i, 100.88 + i, new BigDecimal(111.111D).add(BigDecimal.valueOf(i))); readerFactory.addRecord(i, "val_" + i, 1000 + i, 100.88 + i,
new BigDecimal(111.111D).add(BigDecimal.valueOf(i)), today);
} }
testRunner.addControllerService("mock-reader-factory", readerFactory); testRunner.addControllerService("mock-reader-factory", readerFactory);
@ -310,7 +313,7 @@ public class TestPutKudu {
public void testReadAsStringAndWriteAsInt() throws InitializationException, IOException { public void testReadAsStringAndWriteAsInt() throws InitializationException, IOException {
createRecordReader(0); createRecordReader(0);
// add the favorite color as a string // add the favorite color as a string
readerFactory.addRecord(1, "name0", "0", "89.89", "111.111"); readerFactory.addRecord(1, "name0", "0", "89.89", "111.111", today);
final String filename = "testReadAsStringAndWriteAsInt-" + System.currentTimeMillis(); final String filename = "testReadAsStringAndWriteAsInt-" + System.currentTimeMillis();
@ -417,55 +420,65 @@ public class TestPutKudu {
@Test @Test
public void testBuildRow() { public void testBuildRow() {
buildPartialRow((long) 1, "foo", (short) 10, "id", "id", "SFO",false); buildPartialRow((long) 1, "foo", (short) 10, "id", "id", "SFO", null, false);
} }
@Test @Test
public void testBuildPartialRowNullable() { public void testBuildPartialRowNullable() {
buildPartialRow((long) 1, null, (short) 10, "id", "id", null, false); buildPartialRow((long) 1, null, (short) 10, "id", "id", null, null, false);
} }
@Test(expected = IllegalArgumentException.class) @Test(expected = IllegalArgumentException.class)
public void testBuildPartialRowNullPrimaryKey() { public void testBuildPartialRowNullPrimaryKey() {
buildPartialRow(null, "foo", (short) 10, "id", "id", "SFO", false); buildPartialRow(null, "foo", (short) 10, "id", "id", "SFO", null, false);
} }
@Test(expected = IllegalArgumentException.class) @Test(expected = IllegalArgumentException.class)
public void testBuildPartialRowNotNullable() { public void testBuildPartialRowNotNullable() {
buildPartialRow((long) 1, "foo", null, "id", "id", "SFO",false); buildPartialRow((long) 1, "foo", null, "id", "id", "SFO", null, false);
} }
@Test @Test
public void testBuildPartialRowLowercaseFields() { public void testBuildPartialRowLowercaseFields() {
PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", "SFO",true); PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", "SFO", null, true);
row.getLong("id"); row.getLong("id");
} }
@Test(expected = IllegalArgumentException.class) @Test(expected = IllegalArgumentException.class)
public void testBuildPartialRowLowercaseFieldsFalse() { public void testBuildPartialRowLowercaseFieldsFalse() {
PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", "SFO",false); PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", "SFO", null, false);
row.getLong("id"); row.getLong("id");
} }
@Test @Test
public void testBuildPartialRowLowercaseFieldsKuduUpper() { public void testBuildPartialRowLowercaseFieldsKuduUpper() {
PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "ID", "ID", "SFO", false); PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "ID", "ID", "SFO", null, false);
row.getLong("ID"); row.getLong("ID");
} }
@Test(expected = IllegalArgumentException.class) @Test(expected = IllegalArgumentException.class)
public void testBuildPartialRowLowercaseFieldsKuduUpperFail() { public void testBuildPartialRowLowercaseFieldsKuduUpperFail() {
PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "ID", "ID", "SFO", true); PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "ID", "ID", "SFO", null, true);
row.getLong("ID"); row.getLong("ID");
} }
@Test @Test
public void testBuildPartialRowVarCharTooLong() { public void testBuildPartialRowVarCharTooLong() {
PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", "San Francisco", true); PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", "San Francisco", null, true);
Assert.assertEquals("Kudu client should truncate VARCHAR value to expected length", "San", row.getVarchar("airport_code")); Assert.assertEquals("Kudu client should truncate VARCHAR value to expected length", "San", row.getVarchar("airport_code"));
} }
private PartialRow buildPartialRow(Long id, String name, Short age, String kuduIdName, String recordIdName, String airport_code, Boolean lowercaseFields) { @Test
public void testBuildPartialRowWithDate() {
PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", "San Francisco", today, true);
// Comparing string representations of dates, because java.sql.Date does not override
// java.util.Date.equals method and therefore compares milliseconds instead of
// comparing dates, even though java.sql.Date is supposed to ignore time
Assert.assertEquals(String.format("Expecting the date to be %s, but got %s", today.toString(), row.getDate("sql_date").toString()),
row.getDate("sql_date").toString(), today.toString());
}
private PartialRow buildPartialRow(Long id, String name, Short age, String kuduIdName, String recordIdName, String airport_code, java.sql.Date sql_date, Boolean lowercaseFields) {
final Schema kuduSchema = new Schema(Arrays.asList( final Schema kuduSchema = new Schema(Arrays.asList(
new ColumnSchema.ColumnSchemaBuilder(kuduIdName, Type.INT64).key(true).build(), new ColumnSchema.ColumnSchemaBuilder(kuduIdName, Type.INT64).key(true).build(),
new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).nullable(true).build(), new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).nullable(true).build(),
@ -476,7 +489,10 @@ public class TestPutKudu {
).build(), ).build(),
new ColumnSchema.ColumnSchemaBuilder("airport_code", Type.VARCHAR).nullable(true).typeAttributes( new ColumnSchema.ColumnSchemaBuilder("airport_code", Type.VARCHAR).nullable(true).typeAttributes(
new ColumnTypeAttributes.ColumnTypeAttributesBuilder().length(3).build() new ColumnTypeAttributes.ColumnTypeAttributesBuilder().length(3).build()
).build())); ).build(),
new ColumnSchema.ColumnSchemaBuilder("sql_date", Type.DATE).nullable(true).build()
));
final RecordSchema schema = new SimpleRecordSchema(Arrays.asList( final RecordSchema schema = new SimpleRecordSchema(Arrays.asList(
new RecordField(recordIdName, RecordFieldType.BIGINT.getDataType()), new RecordField(recordIdName, RecordFieldType.BIGINT.getDataType()),
@ -484,7 +500,9 @@ public class TestPutKudu {
new RecordField("age", RecordFieldType.SHORT.getDataType()), new RecordField("age", RecordFieldType.SHORT.getDataType()),
new RecordField("updated_at", RecordFieldType.TIMESTAMP.getDataType()), new RecordField("updated_at", RecordFieldType.TIMESTAMP.getDataType()),
new RecordField("score", RecordFieldType.LONG.getDataType()), new RecordField("score", RecordFieldType.LONG.getDataType()),
new RecordField("airport_code", RecordFieldType.STRING.getDataType()))); new RecordField("airport_code", RecordFieldType.STRING.getDataType()),
new RecordField("sql_date", RecordFieldType.DATE.getDataType())
));
Map<String, Object> values = new HashMap<>(); Map<String, Object> values = new HashMap<>();
PartialRow row = kuduSchema.newPartialRow(); PartialRow row = kuduSchema.newPartialRow();
@ -494,6 +512,7 @@ public class TestPutKudu {
values.put("updated_at", new Timestamp(System.currentTimeMillis())); values.put("updated_at", new Timestamp(System.currentTimeMillis()));
values.put("score", 10000L); values.put("score", 10000L);
values.put("airport_code", airport_code); values.put("airport_code", airport_code);
values.put("sql_date", sql_date);
processor.buildPartialRow( processor.buildPartialRow(
kuduSchema, kuduSchema,
row, row,