diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml index d40fdb741f..15f0c50b2a 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml @@ -27,7 +27,7 @@ None - 1.12.0 + 1.13.0 diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java index 406c6e9843..93b0ce61aa 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java @@ -333,6 +333,9 @@ public class KuduLookupService extends AbstractControllerService implements Reco case FLOAT: fields.add(new RecordField(cs.getName(), RecordFieldType.FLOAT.getDataType())); break; + case DATE: + fields.add(new RecordField(cs.getName(), RecordFieldType.DATE.getDataType())); + break; } } return new SimpleRecordSchema(fields); diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/test/java/org/apache/nifi/controller/kudu/ITestKuduLookupService.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/test/java/org/apache/nifi/controller/kudu/ITestKuduLookupService.java index 93bf7d7a5f..4ca3546d3e 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/test/java/org/apache/nifi/controller/kudu/ITestKuduLookupService.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/test/java/org/apache/nifi/controller/kudu/ITestKuduLookupService.java @@ -67,9 +67,12 @@ public class ITestKuduLookupService { .addTabletServerFlag("--use_hybrid_clock=false") ); private TestRunner testRunner; - private long nowMillis = System.currentTimeMillis(); private KuduLookupService kuduLookupService; + private final java.sql.Date today = new java.sql.Date(System.currentTimeMillis()); + private final java.sql.Date pastDate = java.sql.Date.valueOf("2019-01-01"); + private long nowMillis = System.currentTimeMillis(); + public static class SampleProcessor extends AbstractProcessor { @Override public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { @@ -100,6 +103,7 @@ public class ITestKuduLookupService { columns.add(new ColumnSchema.ColumnSchemaBuilder("int32", Type.INT32).build()); columns.add(new ColumnSchema.ColumnSchemaBuilder("int64", Type.INT64).build()); columns.add(new ColumnSchema.ColumnSchemaBuilder("unixtime_micros", Type.UNIXTIME_MICROS).build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("sql_date", Type.DATE).build()); columns.add(new ColumnSchema.ColumnSchemaBuilder("varchar_3", Type.VARCHAR).typeAttributes( new ColumnTypeAttributes.ColumnTypeAttributesBuilder().length(3).build() ).build()); @@ -124,6 +128,7 @@ public class ITestKuduLookupService { row.addInt("int32",3); row.addLong("int64",4L); row.addTimestamp("unixtime_micros", new Timestamp(nowMillis)); + row.addDate("sql_date", today); row.addVarchar("varchar_3", "SFO"); session.apply(insert); @@ -140,6 +145,7 @@ public class ITestKuduLookupService { row.addInt("int32",13); row.addLong("int64",14L); row.addTimestamp("unixtime_micros", new Timestamp(nowMillis+(1000L * 60 * 60 * 24 * 365))); //+ 1 year + row.addDate("sql_date", pastDate); row.addVarchar("varchar_3", "SJC"); session.apply(insert); @@ -204,6 +210,7 @@ public class ITestKuduLookupService { map.put("int32",3); map.put("int64",4L); map.put("unixtime_micros", new Timestamp(nowMillis)); + map.put("sql_date", today); map.put("varchar_3", "SFO"); Record result = kuduLookupService.lookup(map).get(); validateRow1(result); @@ -224,7 +231,6 @@ public class ITestKuduLookupService { assertEquals(true, result.getAsBoolean("bool")); } private void validateRow1(Record result){ - assertEquals("string1", result.getAsString("string")); assertEquals(Base64.getEncoder().encodeToString("binary1".getBytes()), result.getValue("binary")); assertEquals(true, result.getAsBoolean("bool")); @@ -237,6 +243,7 @@ public class ITestKuduLookupService { assertEquals(4L, (long)result.getAsLong("int64")); assertEquals(new Timestamp(nowMillis), result.getValue("unixtime_micros")); assertEquals("SFO", result.getValue("varchar_3")); + assertEquals(today.toString(), result.getValue("sql_date").toString()); } } diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml index cba2878076..34dea1e051 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml @@ -26,7 +26,7 @@ None - 1.12.0 + 1.13.0 diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java index ea82d85893..520fca394d 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java @@ -299,7 +299,6 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { if (colIdx != -1) { ColumnSchema colSchema = schema.getColumnByIndex(colIdx); Type colType = colSchema.getType(); - if (record.getValue(recordFieldName) == null) { if (schema.getColumnByIndex(colIdx).isKey()) { throw new IllegalArgumentException(String.format("Can't set primary key column %s to null ", colName)); @@ -352,6 +351,9 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { case VARCHAR: row.addVarchar(colIdx, DataTypeUtils.toString(value, recordFieldName)); break; + case DATE: + row.addDate(colIdx, DataTypeUtils.toDate(value, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()), recordFieldName)); + break; default: throw new IllegalStateException(String.format("unknown column type %s", colType)); } @@ -386,6 +388,8 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { case CHAR: case STRING: return Type.STRING; + case DATE: + return Type.DATE; default: throw new IllegalArgumentException(String.format("unsupported type %s", nifiType)); } diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java index 7d6aa0ae79..b6fa409467 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java @@ -76,6 +76,8 @@ public class ITPutKudu { private MockRecordParser readerFactory; + private final java.sql.Date today = new java.sql.Date(System.currentTimeMillis()); + @Before public void setUp() throws Exception { processor = new PutKudu(); @@ -109,6 +111,7 @@ public class ITPutKudu { ).build()); columns.add(new ColumnSchema.ColumnSchemaBuilder("num32val", Type.INT32).build()); columns.add(new ColumnSchema.ColumnSchemaBuilder("timestampval", Type.UNIXTIME_MICROS).build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("dateval", Type.DATE).build()); Schema schema = new Schema(columns); CreateTableOptions opts = new CreateTableOptions() .addHashPartitions(Collections.singletonList("id"), 4); @@ -122,12 +125,13 @@ public class ITPutKudu { readerFactory.addSchemaField("varcharval", RecordFieldType.STRING); readerFactory.addSchemaField("num32Val", RecordFieldType.INT); readerFactory.addSchemaField("timestampVal", RecordFieldType.TIMESTAMP); + readerFactory.addSchemaField("dateval", RecordFieldType.DATE); // Add two extra columns to test handleSchemaDrift = true. readerFactory.addSchemaField("doubleVal", RecordFieldType.DOUBLE); readerFactory.addSchemaField("floatVal", RecordFieldType.FLOAT); for (int i = 0; i < numOfRecord; i++) { - readerFactory.addRecord(i, "val_" + i, "varchar_val_" + i, 1000 + i, NOW, 100.88 + i, 100.88 + i); + readerFactory.addRecord(i, "val_" + i, "varchar_val_" + i, 1000 + i, NOW, today, 100.88 + i, 100.88 + i); } testRunner.addControllerService("mock-reader-factory", readerFactory); @@ -193,7 +197,7 @@ public class ITPutKudu { KuduTable kuduTable = client.openTable(DEFAULT_TABLE_NAME); // Verify the extra field was added. - Assert.assertEquals(7, kuduTable.getSchema().getColumnCount()); + Assert.assertEquals(8, kuduTable.getSchema().getColumnCount()); Assert.assertTrue(kuduTable.getSchema().hasColumn("doubleval")); Assert.assertTrue(kuduTable.getSchema().hasColumn("floatval")); @@ -202,6 +206,10 @@ public class ITPutKudu { int count = 0; for (RowResult row : scanner) { Assert.assertEquals(NOW, row.getTimestamp("timestampval")); + // Comparing string representations, because java.sql.Date does not override + // java.util.Date.equals method and therefore compares milliseconds instead of + // comparing dates, even though java.sql.Date is supposed to ignore time + Assert.assertEquals(today.toString(), row.getDate("dateval").toString()); count++; } Assert.assertEquals(recordCount, count); diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java index da42ec8d6e..0c739ea096 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java @@ -86,7 +86,7 @@ public class TestPutKudu { public static final String DEFAULT_TABLE_NAME = "Nifi-Kudu-Table"; public static final String DEFAULT_MASTERS = "testLocalHost:7051"; public static final String SKIP_HEAD_LINE = "false"; - public static final String TABLE_SCHEMA = "id,stringVal,num32Val,doubleVal,decimalVal"; + public static final String TABLE_SCHEMA = "id,stringVal,num32Val,doubleVal,decimalVal,dateVal"; private TestRunner testRunner; @@ -94,6 +94,8 @@ public class TestPutKudu { private MockRecordParser readerFactory; + private final java.sql.Date today = new java.sql.Date(System.currentTimeMillis()); + @Before public void setUp() throws InitializationException { processor = new MockPutKudu(); @@ -124,9 +126,10 @@ public class TestPutKudu { readerFactory.addSchemaField("num32Val", RecordFieldType.INT); readerFactory.addSchemaField("doubleVal", RecordFieldType.DOUBLE); readerFactory.addSchemaField(new RecordField("decimalVal", RecordFieldType.DECIMAL.getDecimalDataType(6, 3))); - - for (int i=0; i < numOfRecord; i++) { - readerFactory.addRecord(i, "val_" + i, 1000 + i, 100.88 + i, new BigDecimal(111.111D).add(BigDecimal.valueOf(i))); + readerFactory.addSchemaField("dateVal", RecordFieldType.DATE); + for (int i = 0; i < numOfRecord; i++) { + readerFactory.addRecord(i, "val_" + i, 1000 + i, 100.88 + i, + new BigDecimal(111.111D).add(BigDecimal.valueOf(i)), today); } testRunner.addControllerService("mock-reader-factory", readerFactory); @@ -165,7 +168,7 @@ public class TestPutKudu { final String filename = "testWriteKudu-" + System.currentTimeMillis(); - final Map flowFileAttributes = new HashMap<>(); + final Map flowFileAttributes = new HashMap<>(); flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); testRunner.enqueue("trigger", flowFileAttributes); @@ -238,7 +241,7 @@ public class TestPutKudu { final String filename = "testInvalidAvroShouldRouteToFailure-" + System.currentTimeMillis(); - final Map flowFileAttributes = new HashMap<>(); + final Map flowFileAttributes = new HashMap<>(); flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); testRunner.enqueue("trigger", flowFileAttributes); @@ -252,7 +255,7 @@ public class TestPutKudu { final String filename = "testValidSchemaShouldBeSuccessful-" + System.currentTimeMillis(); // don't provide my.schema as an attribute - final Map flowFileAttributes = new HashMap<>(); + final Map flowFileAttributes = new HashMap<>(); flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); flowFileAttributes.put("my.schema", TABLE_SCHEMA); @@ -268,7 +271,7 @@ public class TestPutKudu { createRecordReader(5); final String filename = "testAddingMissingFieldsWhenHandleSchemaDriftIsAllowed-" + System.currentTimeMillis(); - final Map flowFileAttributes = new HashMap<>(); + final Map flowFileAttributes = new HashMap<>(); flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); testRunner.setProperty(PutKudu.HANDLE_SCHEMA_DRIFT, "true"); @@ -298,7 +301,7 @@ public class TestPutKudu { final String filename = "testMalformedRecordExceptionShouldRouteToFailure-" + System.currentTimeMillis(); - final Map flowFileAttributes = new HashMap<>(); + final Map flowFileAttributes = new HashMap<>(); flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); testRunner.enqueue("trigger", flowFileAttributes); @@ -310,11 +313,11 @@ public class TestPutKudu { public void testReadAsStringAndWriteAsInt() throws InitializationException, IOException { createRecordReader(0); // add the favorite color as a string - readerFactory.addRecord(1, "name0", "0", "89.89", "111.111"); + readerFactory.addRecord(1, "name0", "0", "89.89", "111.111", today); final String filename = "testReadAsStringAndWriteAsInt-" + System.currentTimeMillis(); - final Map flowFileAttributes = new HashMap<>(); + final Map flowFileAttributes = new HashMap<>(); flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); testRunner.enqueue("trigger", flowFileAttributes); @@ -325,11 +328,11 @@ public class TestPutKudu { @Test public void testMissingColumInReader() throws InitializationException, IOException { createRecordReader(0); - readerFactory.addRecord( "name0", "0", "89.89"); //missing id + readerFactory.addRecord("name0", "0", "89.89"); //missing id final String filename = "testMissingColumInReader-" + System.currentTimeMillis(); - final Map flowFileAttributes = new HashMap<>(); + final Map flowFileAttributes = new HashMap<>(); flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); testRunner.enqueue("trigger", flowFileAttributes); @@ -384,7 +387,7 @@ public class TestPutKudu { createRecordReader(50); testRunner.setProperty(PutKudu.INSERT_OPERATION, "${kudu.record.delete}"); - final Map attributes = new HashMap<>(); + final Map attributes = new HashMap<>(); attributes.put("kudu.record.delete", "DELETE"); testRunner.enqueue("string".getBytes(), attributes); @@ -402,7 +405,7 @@ public class TestPutKudu { createRecordReader(50); testRunner.setProperty(PutKudu.INSERT_OPERATION, "${kudu.record.update}"); - final Map attributes = new HashMap<>(); + final Map attributes = new HashMap<>(); attributes.put("kudu.record.update", "UPDATE"); testRunner.enqueue("string".getBytes(), attributes); @@ -417,74 +420,89 @@ public class TestPutKudu { @Test public void testBuildRow() { - buildPartialRow((long) 1, "foo", (short) 10, "id", "id", "SFO",false); + buildPartialRow((long) 1, "foo", (short) 10, "id", "id", "SFO", null, false); } @Test public void testBuildPartialRowNullable() { - buildPartialRow((long) 1, null, (short) 10, "id", "id", null, false); + buildPartialRow((long) 1, null, (short) 10, "id", "id", null, null, false); } @Test(expected = IllegalArgumentException.class) public void testBuildPartialRowNullPrimaryKey() { - buildPartialRow(null, "foo", (short) 10, "id", "id", "SFO", false); + buildPartialRow(null, "foo", (short) 10, "id", "id", "SFO", null, false); } @Test(expected = IllegalArgumentException.class) public void testBuildPartialRowNotNullable() { - buildPartialRow((long) 1, "foo", null, "id", "id", "SFO",false); + buildPartialRow((long) 1, "foo", null, "id", "id", "SFO", null, false); } @Test public void testBuildPartialRowLowercaseFields() { - PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", "SFO",true); + PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", "SFO", null, true); row.getLong("id"); } @Test(expected = IllegalArgumentException.class) public void testBuildPartialRowLowercaseFieldsFalse() { - PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", "SFO",false); + PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", "SFO", null, false); row.getLong("id"); } @Test public void testBuildPartialRowLowercaseFieldsKuduUpper() { - PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "ID", "ID", "SFO", false); + PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "ID", "ID", "SFO", null, false); row.getLong("ID"); } @Test(expected = IllegalArgumentException.class) public void testBuildPartialRowLowercaseFieldsKuduUpperFail() { - PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "ID", "ID", "SFO", true); + PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "ID", "ID", "SFO", null, true); row.getLong("ID"); } @Test public void testBuildPartialRowVarCharTooLong() { - PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", "San Francisco", true); + PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", "San Francisco", null, true); Assert.assertEquals("Kudu client should truncate VARCHAR value to expected length", "San", row.getVarchar("airport_code")); } - private PartialRow buildPartialRow(Long id, String name, Short age, String kuduIdName, String recordIdName, String airport_code, Boolean lowercaseFields) { + @Test + public void testBuildPartialRowWithDate() { + PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", "San Francisco", today, true); + // Comparing string representations of dates, because java.sql.Date does not override + // java.util.Date.equals method and therefore compares milliseconds instead of + // comparing dates, even though java.sql.Date is supposed to ignore time + Assert.assertEquals(String.format("Expecting the date to be %s, but got %s", today.toString(), row.getDate("sql_date").toString()), + row.getDate("sql_date").toString(), today.toString()); + } + + private PartialRow buildPartialRow(Long id, String name, Short age, String kuduIdName, String recordIdName, String airport_code, java.sql.Date sql_date, Boolean lowercaseFields) { final Schema kuduSchema = new Schema(Arrays.asList( - new ColumnSchema.ColumnSchemaBuilder(kuduIdName, Type.INT64).key(true).build(), - new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).nullable(true).build(), - new ColumnSchema.ColumnSchemaBuilder("age", Type.INT16).nullable(false).build(), - new ColumnSchema.ColumnSchemaBuilder("updated_at", Type.UNIXTIME_MICROS).nullable(false).build(), - new ColumnSchema.ColumnSchemaBuilder("score", Type.DECIMAL).nullable(true).typeAttributes( - new ColumnTypeAttributes.ColumnTypeAttributesBuilder().precision(9).scale(0).build() - ).build(), - new ColumnSchema.ColumnSchemaBuilder("airport_code", Type.VARCHAR).nullable(true).typeAttributes( - new ColumnTypeAttributes.ColumnTypeAttributesBuilder().length(3).build() - ).build())); + new ColumnSchema.ColumnSchemaBuilder(kuduIdName, Type.INT64).key(true).build(), + new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).nullable(true).build(), + new ColumnSchema.ColumnSchemaBuilder("age", Type.INT16).nullable(false).build(), + new ColumnSchema.ColumnSchemaBuilder("updated_at", Type.UNIXTIME_MICROS).nullable(false).build(), + new ColumnSchema.ColumnSchemaBuilder("score", Type.DECIMAL).nullable(true).typeAttributes( + new ColumnTypeAttributes.ColumnTypeAttributesBuilder().precision(9).scale(0).build() + ).build(), + new ColumnSchema.ColumnSchemaBuilder("airport_code", Type.VARCHAR).nullable(true).typeAttributes( + new ColumnTypeAttributes.ColumnTypeAttributesBuilder().length(3).build() + ).build(), + new ColumnSchema.ColumnSchemaBuilder("sql_date", Type.DATE).nullable(true).build() + )); + final RecordSchema schema = new SimpleRecordSchema(Arrays.asList( - new RecordField(recordIdName, RecordFieldType.BIGINT.getDataType()), - new RecordField("name", RecordFieldType.STRING.getDataType()), - new RecordField("age", RecordFieldType.SHORT.getDataType()), - new RecordField("updated_at", RecordFieldType.TIMESTAMP.getDataType()), - new RecordField("score", RecordFieldType.LONG.getDataType()), - new RecordField("airport_code", RecordFieldType.STRING.getDataType()))); + new RecordField(recordIdName, RecordFieldType.BIGINT.getDataType()), + new RecordField("name", RecordFieldType.STRING.getDataType()), + new RecordField("age", RecordFieldType.SHORT.getDataType()), + new RecordField("updated_at", RecordFieldType.TIMESTAMP.getDataType()), + new RecordField("score", RecordFieldType.LONG.getDataType()), + new RecordField("airport_code", RecordFieldType.STRING.getDataType()), + new RecordField("sql_date", RecordFieldType.DATE.getDataType()) + )); Map values = new HashMap<>(); PartialRow row = kuduSchema.newPartialRow(); @@ -494,13 +512,14 @@ public class TestPutKudu { values.put("updated_at", new Timestamp(System.currentTimeMillis())); values.put("score", 10000L); values.put("airport_code", airport_code); + values.put("sql_date", sql_date); processor.buildPartialRow( - kuduSchema, - row, - new MapRecord(schema, values), - schema.getFieldNames(), + kuduSchema, + row, + new MapRecord(schema, values), + schema.getFieldNames(), true, - lowercaseFields + lowercaseFields ); return row; } @@ -555,17 +574,17 @@ public class TestPutKudu { private void testKuduPartialFailure(FlushMode flushMode, int batchSize) throws Exception { final int numFlowFiles = 4; final int numRecordsPerFlowFile = 3; - final ResultCode[][] flowFileResults = new ResultCode[][] { - new ResultCode[]{OK, OK, FAIL}, + final ResultCode[][] flowFileResults = new ResultCode[][]{ + new ResultCode[]{OK, OK, FAIL}, - // The last operation will not be submitted to Kudu if flush mode is AUTO_FLUSH_SYNC - new ResultCode[]{OK, FAIL, OK}, + // The last operation will not be submitted to Kudu if flush mode is AUTO_FLUSH_SYNC + new ResultCode[]{OK, FAIL, OK}, - // Everything's okay - new ResultCode[]{OK, OK, OK}, + // Everything's okay + new ResultCode[]{OK, OK, OK}, - // The last operation will not be submitted due to an exception from apply() call - new ResultCode[]{OK, EXCEPTION, OK}, + // The last operation will not be submitted due to an exception from apply() call + new ResultCode[]{OK, EXCEPTION, OK}, }; KuduSession session = mock(KuduSession.class); @@ -596,10 +615,10 @@ public class TestPutKudu { RowErrorsAndOverflowStatus pendingErrorResponse = mock(RowErrorsAndOverflowStatus.class); RowError[] rowErrors = slice.stream() - .flatMap(List::stream) - .filter(OperationResponse::hasRowError) - .map(OperationResponse::getRowError) - .toArray(RowError[]::new); + .flatMap(List::stream) + .filter(OperationResponse::hasRowError) + .map(OperationResponse::getRowError) + .toArray(RowError[]::new); when(pendingErrorResponse.getRowErrors()).thenReturn(rowErrors); pendingErrorResponses.add(pendingErrorResponse); @@ -665,10 +684,10 @@ public class TestPutKudu { } private void testKuduPartialFailure(FlushMode flushMode) throws Exception { - // Test against different batch sizes (up until the point where every record can be buffered at once) - for (int i = 1; i <= 11; i++) { - testKuduPartialFailure(flushMode, i); - } + // Test against different batch sizes (up until the point where every record can be buffered at once) + for (int i = 1; i <= 11; i++) { + testKuduPartialFailure(flushMode, i); + } } @Test