NIFI-5802: Add QueryRecord nullable field support

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #3158.
This commit is contained in:
Koji Kawamura 2018-11-08 12:10:36 +09:00 committed by Pierre Villard
parent a9045d54a1
commit 78a1cb7c5e
2 changed files with 41 additions and 5 deletions

View File

@ -189,7 +189,8 @@ public class FlowFileTable<S, E> extends AbstractTable implements QueryableTable
final JavaTypeFactory javaTypeFactory = (JavaTypeFactory) typeFactory;
for (final RecordField field : schema.getFields()) {
names.add(field.getFieldName());
types.add(getRelDataType(field.getDataType(), javaTypeFactory));
final RelDataType relDataType = getRelDataType(field.getDataType(), javaTypeFactory);
types.add(javaTypeFactory.createTypeWithNullability(relDataType, field.isNullable()));
}
logger.debug("Found Schema: {}", new Object[] {schema});

View File

@ -130,6 +130,41 @@ public class TestQueryRecord {
out.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"49\"\n");
}
@Test
public void testNullable() throws InitializationException, IOException, SQLException {
final MockRecordParser parser = new MockRecordParser();
parser.addSchemaField("name", RecordFieldType.STRING, true);
parser.addSchemaField("age", RecordFieldType.INT, true);
parser.addRecord("Tom", 49);
parser.addRecord("Alice", null);
parser.addRecord(null, 36);
final MockRecordWriter writer = new MockRecordWriter("\"name\",\"points\"");
TestRunner runner = getRunner();
runner.addControllerService("parser", parser);
runner.enableControllerService(parser);
runner.addControllerService("writer", writer);
runner.enableControllerService(writer);
runner.setProperty(REL_NAME, "select name, age from FLOWFILE");
runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "parser");
runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
final int numIterations = 1;
for (int i = 0; i < numIterations; i++) {
runner.enqueue(new byte[0]);
}
runner.setThreadCount(4);
runner.run(2 * numIterations);
runner.assertTransferCount(REL_NAME, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0);
System.out.println(new String(out.toByteArray()));
out.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"49\"\n\"Alice\",\n,\"36\"\n");
}
@Test
public void testParseFailure() throws InitializationException, IOException, SQLException {
final MockRecordParser parser = new MockRecordParser();
@ -172,10 +207,10 @@ public class TestQueryRecord {
parser.addSchemaField("AMOUNT2", RecordFieldType.FLOAT);
parser.addSchemaField("AMOUNT3", RecordFieldType.FLOAT);
parser.addRecord("008", 10.05F, 15.45F, 89.99F);
parser.addRecord("100", 20.25F, 25.25F, 45.25F);
parser.addRecord("105", 20.05F, 25.05F, 45.05F);
parser.addRecord("200", 34.05F, 25.05F, 75.05F);
parser.addRecord(8, 10.05F, 15.45F, 89.99F);
parser.addRecord(100, 20.25F, 25.25F, 45.25F);
parser.addRecord(105, 20.05F, 25.05F, 45.05F);
parser.addRecord(200, 34.05F, 25.05F, 75.05F);
final MockRecordWriter writer = new MockRecordWriter("\"NAME\",\"POINTS\"");