NIFI-5290 - PutKudu should support unixtime_micros columns

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2781.
This commit is contained in:
Junegunn Choi 2018-06-11 12:19:13 +09:00 committed by Pierre Villard
parent 1803c15bcd
commit 18a4819d51
2 changed files with 4 additions and 0 deletions

View File

@ -125,6 +125,7 @@ public class PutKudu extends AbstractKudu {
row.addInt(colIdx, record.getAsInt(colName)); row.addInt(colIdx, record.getAsInt(colName));
break; break;
case INT64: case INT64:
case UNIXTIME_MICROS:
row.addLong(colIdx, record.getAsLong(colName)); row.addLong(colIdx, record.getAsLong(colName));
break; break;
case STRING: case STRING:

View File

@ -330,6 +330,7 @@ public class TestPutKudu {
new ColumnSchemaBuilder("id", Type.INT64).key(true).build(), new ColumnSchemaBuilder("id", Type.INT64).key(true).build(),
new ColumnSchemaBuilder("name", Type.STRING).nullable(true).build(), new ColumnSchemaBuilder("name", Type.STRING).nullable(true).build(),
new ColumnSchemaBuilder("age", Type.INT16).nullable(false).build(), new ColumnSchemaBuilder("age", Type.INT16).nullable(false).build(),
new ColumnSchemaBuilder("updated_at", Type.UNIXTIME_MICROS).nullable(false).build(),
new ColumnSchemaBuilder("score", Type.DECIMAL).nullable(true).typeAttributes( new ColumnSchemaBuilder("score", Type.DECIMAL).nullable(true).typeAttributes(
new ColumnTypeAttributes.ColumnTypeAttributesBuilder().precision(9).scale(0).build() new ColumnTypeAttributes.ColumnTypeAttributesBuilder().precision(9).scale(0).build()
).build())); ).build()));
@ -338,12 +339,14 @@ public class TestPutKudu {
new RecordField("id", RecordFieldType.BIGINT.getDataType()), new RecordField("id", RecordFieldType.BIGINT.getDataType()),
new RecordField("name", RecordFieldType.STRING.getDataType()), new RecordField("name", RecordFieldType.STRING.getDataType()),
new RecordField("age", RecordFieldType.SHORT.getDataType()), new RecordField("age", RecordFieldType.SHORT.getDataType()),
new RecordField("updated_at", RecordFieldType.BIGINT.getDataType()),
new RecordField("score", RecordFieldType.LONG.getDataType()))); new RecordField("score", RecordFieldType.LONG.getDataType())));
Map<String, Object> values = new HashMap<>(); Map<String, Object> values = new HashMap<>();
values.put("id", id); values.put("id", id);
values.put("name", name); values.put("name", name);
values.put("age", age); values.put("age", age);
values.put("updated_at", System.currentTimeMillis() * 1000);
values.put("score", 10000L); values.put("score", 10000L);
new PutKudu().buildPartialRow( new PutKudu().buildPartialRow(
kuduSchema, kuduSchema,