From 78a1cb7c5eb84da5d25d50d0df69970b6d732f0a Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Thu, 8 Nov 2018 12:10:36 +0900 Subject: [PATCH] NIFI-5802: Add QueryRecord nullable field support Signed-off-by: Pierre Villard This closes #3158. --- .../nifi/queryrecord/FlowFileTable.java | 3 +- .../processors/standard/TestQueryRecord.java | 43 +++++++++++++++++-- 2 files changed, 41 insertions(+), 5 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java index c40e36482b..9e10377037 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java @@ -189,7 +189,8 @@ public class FlowFileTable extends AbstractTable implements QueryableTable final JavaTypeFactory javaTypeFactory = (JavaTypeFactory) typeFactory; for (final RecordField field : schema.getFields()) { names.add(field.getFieldName()); - types.add(getRelDataType(field.getDataType(), javaTypeFactory)); + final RelDataType relDataType = getRelDataType(field.getDataType(), javaTypeFactory); + types.add(javaTypeFactory.createTypeWithNullability(relDataType, field.isNullable())); } logger.debug("Found Schema: {}", new Object[] {schema}); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java index b266b4780e..60fefef5a7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java @@ -130,6 +130,41 @@ public class TestQueryRecord { out.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"49\"\n"); } + @Test + public void testNullable() throws InitializationException, IOException, SQLException { + final MockRecordParser parser = new MockRecordParser(); + parser.addSchemaField("name", RecordFieldType.STRING, true); + parser.addSchemaField("age", RecordFieldType.INT, true); + parser.addRecord("Tom", 49); + parser.addRecord("Alice", null); + parser.addRecord(null, 36); + + final MockRecordWriter writer = new MockRecordWriter("\"name\",\"points\""); + + TestRunner runner = getRunner(); + runner.addControllerService("parser", parser); + runner.enableControllerService(parser); + runner.addControllerService("writer", writer); + runner.enableControllerService(writer); + + runner.setProperty(REL_NAME, "select name, age from FLOWFILE"); + runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "parser"); + runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer"); + + final int numIterations = 1; + for (int i = 0; i < numIterations; i++) { + runner.enqueue(new byte[0]); + } + + runner.setThreadCount(4); + runner.run(2 * numIterations); + + runner.assertTransferCount(REL_NAME, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0); + System.out.println(new String(out.toByteArray())); + out.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"49\"\n\"Alice\",\n,\"36\"\n"); + } + @Test public void testParseFailure() throws InitializationException, IOException, SQLException { final MockRecordParser parser = new MockRecordParser(); @@ -172,10 +207,10 @@ public class TestQueryRecord { parser.addSchemaField("AMOUNT2", RecordFieldType.FLOAT); parser.addSchemaField("AMOUNT3", RecordFieldType.FLOAT); - parser.addRecord("008", 10.05F, 15.45F, 89.99F); - parser.addRecord("100", 20.25F, 25.25F, 45.25F); - parser.addRecord("105", 20.05F, 25.05F, 45.05F); - parser.addRecord("200", 34.05F, 25.05F, 75.05F); + parser.addRecord(8, 10.05F, 15.45F, 89.99F); + parser.addRecord(100, 20.25F, 25.25F, 45.25F); + parser.addRecord(105, 20.05F, 25.05F, 45.05F); + parser.addRecord(200, 34.05F, 25.05F, 75.05F); final MockRecordWriter writer = new MockRecordWriter("\"NAME\",\"POINTS\"");