mirror of https://github.com/apache/nifi.git
NIFI-7551 Add support for VARCHAR to Kudu NAR bundle
- update Kudu dependencies to Kudu 1.12.0
- add VARCHAR to Kudu Lookup Service and Processor
- add tests for VARCHAR columns

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4347.
This commit is contained in:
parent
27b5bb7a20
commit
a1b245e051
|
@ -27,7 +27,7 @@
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
<exclude.tests>None</exclude.tests>
|
<exclude.tests>None</exclude.tests>
|
||||||
<kudu.version>1.10.0</kudu.version>
|
<kudu.version>1.12.0</kudu.version>
|
||||||
</properties>
|
</properties>
|
||||||
<build>
|
<build>
|
||||||
<extensions>
|
<extensions>
|
||||||
|
|
|
@ -321,6 +321,7 @@ public class KuduLookupService extends AbstractControllerService implements Reco
|
||||||
break;
|
break;
|
||||||
case BINARY:
|
case BINARY:
|
||||||
case STRING:
|
case STRING:
|
||||||
|
case VARCHAR:
|
||||||
fields.add(new RecordField(cs.getName(), RecordFieldType.STRING.getDataType()));
|
fields.add(new RecordField(cs.getName(), RecordFieldType.STRING.getDataType()));
|
||||||
break;
|
break;
|
||||||
case DOUBLE:
|
case DOUBLE:
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
package org.apache.nifi.controller.kudu;
|
package org.apache.nifi.controller.kudu;
|
||||||
|
|
||||||
import org.apache.kudu.ColumnSchema;
|
import org.apache.kudu.ColumnSchema;
|
||||||
|
import org.apache.kudu.ColumnTypeAttributes;
|
||||||
import org.apache.kudu.Schema;
|
import org.apache.kudu.Schema;
|
||||||
import org.apache.kudu.Type;
|
import org.apache.kudu.Type;
|
||||||
import org.apache.kudu.client.CreateTableOptions;
|
import org.apache.kudu.client.CreateTableOptions;
|
||||||
|
@ -99,6 +100,9 @@ public class ITestKuduLookupService {
|
||||||
columns.add(new ColumnSchema.ColumnSchemaBuilder("int32", Type.INT32).build());
|
columns.add(new ColumnSchema.ColumnSchemaBuilder("int32", Type.INT32).build());
|
||||||
columns.add(new ColumnSchema.ColumnSchemaBuilder("int64", Type.INT64).build());
|
columns.add(new ColumnSchema.ColumnSchemaBuilder("int64", Type.INT64).build());
|
||||||
columns.add(new ColumnSchema.ColumnSchemaBuilder("unixtime_micros", Type.UNIXTIME_MICROS).build());
|
columns.add(new ColumnSchema.ColumnSchemaBuilder("unixtime_micros", Type.UNIXTIME_MICROS).build());
|
||||||
|
columns.add(new ColumnSchema.ColumnSchemaBuilder("varchar_3", Type.VARCHAR).typeAttributes(
|
||||||
|
new ColumnTypeAttributes.ColumnTypeAttributesBuilder().length(3).build()
|
||||||
|
).build());
|
||||||
Schema schema = new Schema(columns);
|
Schema schema = new Schema(columns);
|
||||||
|
|
||||||
CreateTableOptions opts = new CreateTableOptions().setRangePartitionColumns(Collections.singletonList("string"));
|
CreateTableOptions opts = new CreateTableOptions().setRangePartitionColumns(Collections.singletonList("string"));
|
||||||
|
@ -120,6 +124,7 @@ public class ITestKuduLookupService {
|
||||||
row.addInt("int32",3);
|
row.addInt("int32",3);
|
||||||
row.addLong("int64",4L);
|
row.addLong("int64",4L);
|
||||||
row.addTimestamp("unixtime_micros", new Timestamp(nowMillis));
|
row.addTimestamp("unixtime_micros", new Timestamp(nowMillis));
|
||||||
|
row.addVarchar("varchar_3", "SFO");
|
||||||
session.apply(insert);
|
session.apply(insert);
|
||||||
|
|
||||||
insert = table.newInsert();
|
insert = table.newInsert();
|
||||||
|
@ -135,6 +140,7 @@ public class ITestKuduLookupService {
|
||||||
row.addInt("int32",13);
|
row.addInt("int32",13);
|
||||||
row.addLong("int64",14L);
|
row.addLong("int64",14L);
|
||||||
row.addTimestamp("unixtime_micros", new Timestamp(nowMillis+(1000L * 60 * 60 * 24 * 365))); //+ 1 year
|
row.addTimestamp("unixtime_micros", new Timestamp(nowMillis+(1000L * 60 * 60 * 24 * 365))); //+ 1 year
|
||||||
|
row.addVarchar("varchar_3", "SJC");
|
||||||
session.apply(insert);
|
session.apply(insert);
|
||||||
|
|
||||||
session.close();
|
session.close();
|
||||||
|
@ -198,6 +204,7 @@ public class ITestKuduLookupService {
|
||||||
map.put("int32",3);
|
map.put("int32",3);
|
||||||
map.put("int64",4L);
|
map.put("int64",4L);
|
||||||
map.put("unixtime_micros", new Timestamp(nowMillis));
|
map.put("unixtime_micros", new Timestamp(nowMillis));
|
||||||
|
map.put("varchar_3", "SFO");
|
||||||
Record result = kuduLookupService.lookup(map).get();
|
Record result = kuduLookupService.lookup(map).get();
|
||||||
validateRow1(result);
|
validateRow1(result);
|
||||||
}
|
}
|
||||||
|
@ -229,6 +236,7 @@ public class ITestKuduLookupService {
|
||||||
assertEquals(3, (int)result.getAsInt("int32"));
|
assertEquals(3, (int)result.getAsInt("int32"));
|
||||||
assertEquals(4L, (long)result.getAsLong("int64"));
|
assertEquals(4L, (long)result.getAsLong("int64"));
|
||||||
assertEquals(new Timestamp(nowMillis), result.getValue("unixtime_micros"));
|
assertEquals(new Timestamp(nowMillis), result.getValue("unixtime_micros"));
|
||||||
|
assertEquals("SFO", result.getValue("varchar_3"));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,7 +26,7 @@
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
<exclude.tests>None</exclude.tests>
|
<exclude.tests>None</exclude.tests>
|
||||||
<kudu.version>1.10.0</kudu.version>
|
<kudu.version>1.12.0</kudu.version>
|
||||||
</properties>
|
</properties>
|
||||||
<build>
|
<build>
|
||||||
<extensions>
|
<extensions>
|
||||||
|
|
|
@ -349,6 +349,9 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
|
||||||
case DECIMAL:
|
case DECIMAL:
|
||||||
row.addDecimal(colIdx, new BigDecimal(DataTypeUtils.toString(value, recordFieldName)));
|
row.addDecimal(colIdx, new BigDecimal(DataTypeUtils.toString(value, recordFieldName)));
|
||||||
break;
|
break;
|
||||||
|
case VARCHAR:
|
||||||
|
row.addVarchar(colIdx, DataTypeUtils.toString(value, recordFieldName));
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
throw new IllegalStateException(String.format("unknown column type %s", colType));
|
throw new IllegalStateException(String.format("unknown column type %s", colType));
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
package org.apache.nifi.processors.kudu;
|
package org.apache.nifi.processors.kudu;
|
||||||
|
|
||||||
import org.apache.kudu.ColumnSchema;
|
import org.apache.kudu.ColumnSchema;
|
||||||
|
import org.apache.kudu.ColumnTypeAttributes;
|
||||||
import org.apache.kudu.Schema;
|
import org.apache.kudu.Schema;
|
||||||
import org.apache.kudu.Type;
|
import org.apache.kudu.Type;
|
||||||
import org.apache.kudu.client.CreateTableOptions;
|
import org.apache.kudu.client.CreateTableOptions;
|
||||||
|
@ -103,6 +104,9 @@ public class ITPutKudu {
|
||||||
List<ColumnSchema> columns = new ArrayList<>();
|
List<ColumnSchema> columns = new ArrayList<>();
|
||||||
columns.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());
|
columns.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());
|
||||||
columns.add(new ColumnSchema.ColumnSchemaBuilder("stringval", Type.STRING).build());
|
columns.add(new ColumnSchema.ColumnSchemaBuilder("stringval", Type.STRING).build());
|
||||||
|
columns.add(new ColumnSchema.ColumnSchemaBuilder("varcharval", Type.VARCHAR).typeAttributes(
|
||||||
|
new ColumnTypeAttributes.ColumnTypeAttributesBuilder().length(256).build()
|
||||||
|
).build());
|
||||||
columns.add(new ColumnSchema.ColumnSchemaBuilder("num32val", Type.INT32).build());
|
columns.add(new ColumnSchema.ColumnSchemaBuilder("num32val", Type.INT32).build());
|
||||||
columns.add(new ColumnSchema.ColumnSchemaBuilder("timestampval", Type.UNIXTIME_MICROS).build());
|
columns.add(new ColumnSchema.ColumnSchemaBuilder("timestampval", Type.UNIXTIME_MICROS).build());
|
||||||
Schema schema = new Schema(columns);
|
Schema schema = new Schema(columns);
|
||||||
|
@ -115,6 +119,7 @@ public class ITPutKudu {
|
||||||
readerFactory = new MockRecordParser();
|
readerFactory = new MockRecordParser();
|
||||||
readerFactory.addSchemaField("id", RecordFieldType.INT);
|
readerFactory.addSchemaField("id", RecordFieldType.INT);
|
||||||
readerFactory.addSchemaField("stringVal", RecordFieldType.STRING);
|
readerFactory.addSchemaField("stringVal", RecordFieldType.STRING);
|
||||||
|
readerFactory.addSchemaField("varcharval", RecordFieldType.STRING);
|
||||||
readerFactory.addSchemaField("num32Val", RecordFieldType.INT);
|
readerFactory.addSchemaField("num32Val", RecordFieldType.INT);
|
||||||
readerFactory.addSchemaField("timestampVal", RecordFieldType.TIMESTAMP);
|
readerFactory.addSchemaField("timestampVal", RecordFieldType.TIMESTAMP);
|
||||||
// Add two extra columns to test handleSchemaDrift = true.
|
// Add two extra columns to test handleSchemaDrift = true.
|
||||||
|
@ -122,7 +127,7 @@ public class ITPutKudu {
|
||||||
readerFactory.addSchemaField("floatVal", RecordFieldType.FLOAT);
|
readerFactory.addSchemaField("floatVal", RecordFieldType.FLOAT);
|
||||||
|
|
||||||
for (int i = 0; i < numOfRecord; i++) {
|
for (int i = 0; i < numOfRecord; i++) {
|
||||||
readerFactory.addRecord(i, "val_" + i, 1000 + i, NOW, 100.88 + i, 100.88 + i);
|
readerFactory.addRecord(i, "val_" + i, "varchar_val_" + i, 1000 + i, NOW, 100.88 + i, 100.88 + i);
|
||||||
}
|
}
|
||||||
|
|
||||||
testRunner.addControllerService("mock-reader-factory", readerFactory);
|
testRunner.addControllerService("mock-reader-factory", readerFactory);
|
||||||
|
@ -188,7 +193,7 @@ public class ITPutKudu {
|
||||||
KuduTable kuduTable = client.openTable(DEFAULT_TABLE_NAME);
|
KuduTable kuduTable = client.openTable(DEFAULT_TABLE_NAME);
|
||||||
|
|
||||||
// Verify the extra field was added.
|
// Verify the extra field was added.
|
||||||
Assert.assertEquals(6, kuduTable.getSchema().getColumnCount());
|
Assert.assertEquals(7, kuduTable.getSchema().getColumnCount());
|
||||||
Assert.assertTrue(kuduTable.getSchema().hasColumn("doubleval"));
|
Assert.assertTrue(kuduTable.getSchema().hasColumn("doubleval"));
|
||||||
Assert.assertTrue(kuduTable.getSchema().hasColumn("floatval"));
|
Assert.assertTrue(kuduTable.getSchema().hasColumn("floatval"));
|
||||||
|
|
||||||
|
|
|
@ -417,49 +417,55 @@ public class TestPutKudu {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBuildRow() {
|
public void testBuildRow() {
|
||||||
buildPartialRow((long) 1, "foo", (short) 10, "id", "id", false);
|
buildPartialRow((long) 1, "foo", (short) 10, "id", "id", "SFO",false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBuildPartialRowNullable() {
|
public void testBuildPartialRowNullable() {
|
||||||
buildPartialRow((long) 1, null, (short) 10, "id", "id", false);
|
buildPartialRow((long) 1, null, (short) 10, "id", "id", null, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(expected = IllegalArgumentException.class)
|
@Test(expected = IllegalArgumentException.class)
|
||||||
public void testBuildPartialRowNullPrimaryKey() {
|
public void testBuildPartialRowNullPrimaryKey() {
|
||||||
buildPartialRow(null, "foo", (short) 10, "id", "id", false);
|
buildPartialRow(null, "foo", (short) 10, "id", "id", "SFO", false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(expected = IllegalArgumentException.class)
|
@Test(expected = IllegalArgumentException.class)
|
||||||
public void testBuildPartialRowNotNullable() {
|
public void testBuildPartialRowNotNullable() {
|
||||||
buildPartialRow((long) 1, "foo", null, "id", "id", false);
|
buildPartialRow((long) 1, "foo", null, "id", "id", "SFO",false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBuildPartialRowLowercaseFields() {
|
public void testBuildPartialRowLowercaseFields() {
|
||||||
PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", true);
|
PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", "SFO",true);
|
||||||
row.getLong("id");
|
row.getLong("id");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(expected = IllegalArgumentException.class)
|
@Test(expected = IllegalArgumentException.class)
|
||||||
public void testBuildPartialRowLowercaseFieldsFalse() {
|
public void testBuildPartialRowLowercaseFieldsFalse() {
|
||||||
PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", false);
|
PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", "SFO",false);
|
||||||
row.getLong("id");
|
row.getLong("id");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBuildPartialRowLowercaseFieldsKuduUpper() {
|
public void testBuildPartialRowLowercaseFieldsKuduUpper() {
|
||||||
PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "ID", "ID", false);
|
PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "ID", "ID", "SFO", false);
|
||||||
row.getLong("ID");
|
row.getLong("ID");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(expected = IllegalArgumentException.class)
|
@Test(expected = IllegalArgumentException.class)
|
||||||
public void testBuildPartialRowLowercaseFieldsKuduUpperFail() {
|
public void testBuildPartialRowLowercaseFieldsKuduUpperFail() {
|
||||||
PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "ID", "ID", true);
|
PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "ID", "ID", "SFO", true);
|
||||||
row.getLong("ID");
|
row.getLong("ID");
|
||||||
}
|
}
|
||||||
|
|
||||||
private PartialRow buildPartialRow(Long id, String name, Short age, String kuduIdName, String recordIdName, Boolean lowercaseFields) {
|
@Test
|
||||||
|
public void testBuildPartialRowVarCharTooLong() {
|
||||||
|
PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", "San Francisco", true);
|
||||||
|
Assert.assertEquals("Kudu client should truncate VARCHAR value to expected length", "San", row.getVarchar("airport_code"));
|
||||||
|
}
|
||||||
|
|
||||||
|
private PartialRow buildPartialRow(Long id, String name, Short age, String kuduIdName, String recordIdName, String airport_code, Boolean lowercaseFields) {
|
||||||
final Schema kuduSchema = new Schema(Arrays.asList(
|
final Schema kuduSchema = new Schema(Arrays.asList(
|
||||||
new ColumnSchema.ColumnSchemaBuilder(kuduIdName, Type.INT64).key(true).build(),
|
new ColumnSchema.ColumnSchemaBuilder(kuduIdName, Type.INT64).key(true).build(),
|
||||||
new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).nullable(true).build(),
|
new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).nullable(true).build(),
|
||||||
|
@ -467,6 +473,9 @@ public class TestPutKudu {
|
||||||
new ColumnSchema.ColumnSchemaBuilder("updated_at", Type.UNIXTIME_MICROS).nullable(false).build(),
|
new ColumnSchema.ColumnSchemaBuilder("updated_at", Type.UNIXTIME_MICROS).nullable(false).build(),
|
||||||
new ColumnSchema.ColumnSchemaBuilder("score", Type.DECIMAL).nullable(true).typeAttributes(
|
new ColumnSchema.ColumnSchemaBuilder("score", Type.DECIMAL).nullable(true).typeAttributes(
|
||||||
new ColumnTypeAttributes.ColumnTypeAttributesBuilder().precision(9).scale(0).build()
|
new ColumnTypeAttributes.ColumnTypeAttributesBuilder().precision(9).scale(0).build()
|
||||||
|
).build(),
|
||||||
|
new ColumnSchema.ColumnSchemaBuilder("airport_code", Type.VARCHAR).nullable(true).typeAttributes(
|
||||||
|
new ColumnTypeAttributes.ColumnTypeAttributesBuilder().length(3).build()
|
||||||
).build()));
|
).build()));
|
||||||
|
|
||||||
final RecordSchema schema = new SimpleRecordSchema(Arrays.asList(
|
final RecordSchema schema = new SimpleRecordSchema(Arrays.asList(
|
||||||
|
@ -474,7 +483,8 @@ public class TestPutKudu {
|
||||||
new RecordField("name", RecordFieldType.STRING.getDataType()),
|
new RecordField("name", RecordFieldType.STRING.getDataType()),
|
||||||
new RecordField("age", RecordFieldType.SHORT.getDataType()),
|
new RecordField("age", RecordFieldType.SHORT.getDataType()),
|
||||||
new RecordField("updated_at", RecordFieldType.TIMESTAMP.getDataType()),
|
new RecordField("updated_at", RecordFieldType.TIMESTAMP.getDataType()),
|
||||||
new RecordField("score", RecordFieldType.LONG.getDataType())));
|
new RecordField("score", RecordFieldType.LONG.getDataType()),
|
||||||
|
new RecordField("airport_code", RecordFieldType.STRING.getDataType())));
|
||||||
|
|
||||||
Map<String, Object> values = new HashMap<>();
|
Map<String, Object> values = new HashMap<>();
|
||||||
PartialRow row = kuduSchema.newPartialRow();
|
PartialRow row = kuduSchema.newPartialRow();
|
||||||
|
@ -483,6 +493,7 @@ public class TestPutKudu {
|
||||||
values.put("age", age);
|
values.put("age", age);
|
||||||
values.put("updated_at", new Timestamp(System.currentTimeMillis()));
|
values.put("updated_at", new Timestamp(System.currentTimeMillis()));
|
||||||
values.put("score", 10000L);
|
values.put("score", 10000L);
|
||||||
|
values.put("airport_code", airport_code);
|
||||||
processor.buildPartialRow(
|
processor.buildPartialRow(
|
||||||
kuduSchema,
|
kuduSchema,
|
||||||
row,
|
row,
|
||||||
|
|
Loading…
Reference in New Issue