From a1b245e051245bb6c65e7b5ffc6ee982669b7ab7 Mon Sep 17 00:00:00 2001 From: Greg Solovyev Date: Wed, 17 Jun 2020 18:08:32 -0700 Subject: [PATCH] NIFI-7551 Add support for VARCHAR to Kudu NAR bundle - update Kudu dependencies to Kudu 1.12.0 - add VARCHAR to Kudu Lookup Service and Processor - add tests for VARCHAR columns Signed-off-by: Pierre Villard This closes #4347. --- .../nifi-kudu-controller-service/pom.xml | 2 +- .../controller/kudu/KuduLookupService.java | 1 + .../kudu/ITestKuduLookupService.java | 8 +++++ .../nifi-kudu-processors/pom.xml | 2 +- .../kudu/AbstractKuduProcessor.java | 3 ++ .../nifi/processors/kudu/ITPutKudu.java | 9 ++++-- .../nifi/processors/kudu/TestPutKudu.java | 31 +++++++++++++------ 7 files changed, 42 insertions(+), 14 deletions(-) diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml index 2572bae4f2..776f8f7374 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml @@ -27,7 +27,7 @@ None - 1.10.0 + 1.12.0 diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java index bc59b12a68..406c6e9843 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java @@ -321,6 +321,7 @@ public class KuduLookupService extends AbstractControllerService implements Reco break; case BINARY: case STRING: + case VARCHAR: fields.add(new RecordField(cs.getName(), RecordFieldType.STRING.getDataType())); break; case DOUBLE: diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/test/java/org/apache/nifi/controller/kudu/ITestKuduLookupService.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/test/java/org/apache/nifi/controller/kudu/ITestKuduLookupService.java index ae995af78f..93bf7d7a5f 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/test/java/org/apache/nifi/controller/kudu/ITestKuduLookupService.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/test/java/org/apache/nifi/controller/kudu/ITestKuduLookupService.java @@ -17,6 +17,7 @@ package org.apache.nifi.controller.kudu; import org.apache.kudu.ColumnSchema; +import org.apache.kudu.ColumnTypeAttributes; import org.apache.kudu.Schema; import org.apache.kudu.Type; import org.apache.kudu.client.CreateTableOptions; @@ -99,6 +100,9 @@ public class ITestKuduLookupService { columns.add(new ColumnSchema.ColumnSchemaBuilder("int32", Type.INT32).build()); columns.add(new ColumnSchema.ColumnSchemaBuilder("int64", Type.INT64).build()); columns.add(new ColumnSchema.ColumnSchemaBuilder("unixtime_micros", Type.UNIXTIME_MICROS).build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("varchar_3", Type.VARCHAR).typeAttributes( + new ColumnTypeAttributes.ColumnTypeAttributesBuilder().length(3).build() + ).build()); Schema schema = new Schema(columns); CreateTableOptions opts = new CreateTableOptions().setRangePartitionColumns(Collections.singletonList("string")); @@ -120,6 +124,7 @@ public class ITestKuduLookupService { row.addInt("int32",3); row.addLong("int64",4L); row.addTimestamp("unixtime_micros", new Timestamp(nowMillis)); + row.addVarchar("varchar_3", "SFO"); session.apply(insert); insert = table.newInsert(); @@ -135,6 +140,7 @@ public class ITestKuduLookupService { row.addInt("int32",13); row.addLong("int64",14L); row.addTimestamp("unixtime_micros", new Timestamp(nowMillis+(1000L * 60 * 60 * 24 * 365))); //+ 1 year + row.addVarchar("varchar_3", "SJC"); session.apply(insert); session.close(); @@ -198,6 +204,7 @@ public class ITestKuduLookupService { map.put("int32",3); map.put("int64",4L); map.put("unixtime_micros", new Timestamp(nowMillis)); + map.put("varchar_3", "SFO"); Record result = kuduLookupService.lookup(map).get(); validateRow1(result); } @@ -229,6 +236,7 @@ public class ITestKuduLookupService { assertEquals(3, (int)result.getAsInt("int32")); assertEquals(4L, (long)result.getAsLong("int64")); assertEquals(new Timestamp(nowMillis), result.getValue("unixtime_micros")); + assertEquals("SFO", result.getValue("varchar_3")); } } diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml index 5c1e752ec8..1bddd07cb6 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml @@ -26,7 +26,7 @@ None - 1.10.0 + 1.12.0 diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java index 36268d5202..ea82d85893 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java @@ -349,6 +349,9 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { case DECIMAL: row.addDecimal(colIdx, new BigDecimal(DataTypeUtils.toString(value, recordFieldName))); break; + case VARCHAR: + row.addVarchar(colIdx, DataTypeUtils.toString(value, recordFieldName)); + break; default: throw new IllegalStateException(String.format("unknown column type %s", colType)); } diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java index 1be21cbee4..7d6aa0ae79 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java @@ -17,6 +17,7 @@ package org.apache.nifi.processors.kudu; import org.apache.kudu.ColumnSchema; +import org.apache.kudu.ColumnTypeAttributes; import org.apache.kudu.Schema; import org.apache.kudu.Type; import org.apache.kudu.client.CreateTableOptions; @@ -103,6 +104,9 @@ public class ITPutKudu { List columns = new ArrayList<>(); columns.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build()); columns.add(new ColumnSchema.ColumnSchemaBuilder("stringval", Type.STRING).build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("varcharval", Type.VARCHAR).typeAttributes( + new ColumnTypeAttributes.ColumnTypeAttributesBuilder().length(256).build() + ).build()); columns.add(new ColumnSchema.ColumnSchemaBuilder("num32val", Type.INT32).build()); columns.add(new ColumnSchema.ColumnSchemaBuilder("timestampval", Type.UNIXTIME_MICROS).build()); Schema schema = new Schema(columns); @@ -115,6 +119,7 @@ public class ITPutKudu { readerFactory = new MockRecordParser(); readerFactory.addSchemaField("id", RecordFieldType.INT); readerFactory.addSchemaField("stringVal", RecordFieldType.STRING); + readerFactory.addSchemaField("varcharval", RecordFieldType.STRING); readerFactory.addSchemaField("num32Val", RecordFieldType.INT); readerFactory.addSchemaField("timestampVal", RecordFieldType.TIMESTAMP); // Add two extra columns to test handleSchemaDrift = true. @@ -122,7 +127,7 @@ public class ITPutKudu { readerFactory.addSchemaField("floatVal", RecordFieldType.FLOAT); for (int i = 0; i < numOfRecord; i++) { - readerFactory.addRecord(i, "val_" + i, 1000 + i, NOW, 100.88 + i, 100.88 + i); + readerFactory.addRecord(i, "val_" + i, "varchar_val_" + i, 1000 + i, NOW, 100.88 + i, 100.88 + i); } testRunner.addControllerService("mock-reader-factory", readerFactory); @@ -188,7 +193,7 @@ public class ITPutKudu { KuduTable kuduTable = client.openTable(DEFAULT_TABLE_NAME); // Verify the extra field was added. - Assert.assertEquals(6, kuduTable.getSchema().getColumnCount()); + Assert.assertEquals(7, kuduTable.getSchema().getColumnCount()); Assert.assertTrue(kuduTable.getSchema().hasColumn("doubleval")); Assert.assertTrue(kuduTable.getSchema().hasColumn("floatval")); diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java index 97e3e3d5a8..da42ec8d6e 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java @@ -417,49 +417,55 @@ public class TestPutKudu { @Test public void testBuildRow() { - buildPartialRow((long) 1, "foo", (short) 10, "id", "id", false); + buildPartialRow((long) 1, "foo", (short) 10, "id", "id", "SFO",false); } @Test public void testBuildPartialRowNullable() { - buildPartialRow((long) 1, null, (short) 10, "id", "id", false); + buildPartialRow((long) 1, null, (short) 10, "id", "id", null, false); } @Test(expected = IllegalArgumentException.class) public void testBuildPartialRowNullPrimaryKey() { - buildPartialRow(null, "foo", (short) 10, "id", "id", false); + buildPartialRow(null, "foo", (short) 10, "id", "id", "SFO", false); } @Test(expected = IllegalArgumentException.class) public void testBuildPartialRowNotNullable() { - buildPartialRow((long) 1, "foo", null, "id", "id", false); + buildPartialRow((long) 1, "foo", null, "id", "id", "SFO",false); } @Test public void testBuildPartialRowLowercaseFields() { - PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", true); + PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", "SFO",true); row.getLong("id"); } @Test(expected = IllegalArgumentException.class) public void testBuildPartialRowLowercaseFieldsFalse() { - PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", false); + PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", "SFO",false); row.getLong("id"); } @Test public void testBuildPartialRowLowercaseFieldsKuduUpper() { - PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "ID", "ID", false); + PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "ID", "ID", "SFO", false); row.getLong("ID"); } @Test(expected = IllegalArgumentException.class) public void testBuildPartialRowLowercaseFieldsKuduUpperFail() { - PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "ID", "ID", true); + PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "ID", "ID", "SFO", true); row.getLong("ID"); } - private PartialRow buildPartialRow(Long id, String name, Short age, String kuduIdName, String recordIdName, Boolean lowercaseFields) { + @Test + public void testBuildPartialRowVarCharTooLong() { + PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", "San Francisco", true); + Assert.assertEquals("Kudu client should truncate VARCHAR value to expected length", "San", row.getVarchar("airport_code")); + } + + private PartialRow buildPartialRow(Long id, String name, Short age, String kuduIdName, String recordIdName, String airport_code, Boolean lowercaseFields) { final Schema kuduSchema = new Schema(Arrays.asList( new ColumnSchema.ColumnSchemaBuilder(kuduIdName, Type.INT64).key(true).build(), new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).nullable(true).build(), @@ -467,6 +473,9 @@ public class TestPutKudu { new ColumnSchema.ColumnSchemaBuilder("updated_at", Type.UNIXTIME_MICROS).nullable(false).build(), new ColumnSchema.ColumnSchemaBuilder("score", Type.DECIMAL).nullable(true).typeAttributes( new ColumnTypeAttributes.ColumnTypeAttributesBuilder().precision(9).scale(0).build() + ).build(), + new ColumnSchema.ColumnSchemaBuilder("airport_code", Type.VARCHAR).nullable(true).typeAttributes( + new ColumnTypeAttributes.ColumnTypeAttributesBuilder().length(3).build() ).build())); final RecordSchema schema = new SimpleRecordSchema(Arrays.asList( @@ -474,7 +483,8 @@ public class TestPutKudu { new RecordField("name", RecordFieldType.STRING.getDataType()), new RecordField("age", RecordFieldType.SHORT.getDataType()), new RecordField("updated_at", RecordFieldType.TIMESTAMP.getDataType()), - new RecordField("score", RecordFieldType.LONG.getDataType()))); + new RecordField("score", RecordFieldType.LONG.getDataType()), + new RecordField("airport_code", RecordFieldType.STRING.getDataType()))); Map values = new HashMap<>(); PartialRow row = kuduSchema.newPartialRow(); @@ -483,6 +493,7 @@ public class TestPutKudu { values.put("age", age); values.put("updated_at", new Timestamp(System.currentTimeMillis())); values.put("score", 10000L); + values.put("airport_code", airport_code); processor.buildPartialRow( kuduSchema, row,