From 08189596d27af7bb4518646245549b480d4bb05a Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 9 Nov 2018 11:40:59 -0500 Subject: [PATCH] NIFI-5809: If QueryRecord has a single-column projection and that results in a null value, do not confuse that with a null value being returned from the Record Reader This closes #3163. Signed-off-by: Koji Kawamura --- .../nifi/queryrecord/FlowFileEnumerator.java | 36 ++++++------ .../processors/standard/TestQueryRecord.java | 57 +++++++++++++++---- 2 files changed, 63 insertions(+), 30 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileEnumerator.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileEnumerator.java index 963b85e98c..5f92311a57 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileEnumerator.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileEnumerator.java @@ -17,8 +17,6 @@ package org.apache.nifi.queryrecord; -import java.io.InputStream; - import org.apache.calcite.linq4j.Enumerator; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; @@ -28,6 +26,8 @@ import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.serialization.record.Record; +import java.io.InputStream; + public class FlowFileEnumerator implements Enumerator { private final ProcessSession session; private final FlowFile flowFile; @@ -57,26 +57,24 @@ public class FlowFileEnumerator implements Enumerator { @Override public boolean moveNext() { currentRow = null; - while (currentRow == null) { - try { - currentRow = filterColumns(recordParser.nextRecord()); - break; - } catch (final Exception e) { - throw new ProcessException("Failed to read next record in stream for " + flowFile + " due to " + e.getMessage(), e); - } - } + try { + final Record record = recordParser.nextRecord(); + if (record == null) { + // If we are out of data, close the InputStream. We do this because + // Calcite does not necessarily call our close() method. + close(); + try { + onFinish(); + } catch (final Exception e) { + logger.error("Failed to perform tasks when enumerator was finished", e); + } - if (currentRow == null) { - // If we are out of data, close the InputStream. We do this because - // Calcite does not necessarily call our close() method. - close(); - try { - onFinish(); - } catch (final Exception e) { - logger.error("Failed to perform tasks when enumerator was finished", e); + return false; } - return false; + currentRow = filterColumns(record); + } catch (final Exception e) { + throw new ProcessException("Failed to read next record in stream for " + flowFile + " due to " + e.getMessage(), e); } recordsRead++; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java index 60fefef5a7..ce710f5f53 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java @@ -16,15 +16,6 @@ */ package org.apache.nifi.processors.standard; -import java.io.IOException; -import java.io.OutputStream; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.reporting.InitializationException; @@ -46,6 +37,15 @@ import org.apache.nifi.util.TestRunners; import org.junit.Assert; import org.junit.Test; +import java.io.IOException; +import java.io.OutputStream; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + public class TestQueryRecord { static { @@ -289,12 +289,47 @@ public class TestQueryRecord { runner.run(); runner.assertTransferCount(REL_NAME, 1); - final MockFlowFile flowFileOut = runner.getFlowFilesForRelationship(ExecuteProcess.REL_SUCCESS).get(0); + final MockFlowFile flowFileOut = runner.getFlowFilesForRelationship(REL_NAME).get(0); flowFileOut.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"100\"\n\"Jerry\",\"2\"\n"); } @Test - public void testColumnNames() throws InitializationException, IOException { + public void testNullValueInSingleField() throws InitializationException, IOException { + final MockRecordParser parser = new MockRecordParser(); + parser.addSchemaField("name", RecordFieldType.STRING); + parser.addSchemaField("points", RecordFieldType.INT); + parser.addRecord("Tom", 1); + parser.addRecord("Jerry", null); + parser.addRecord("Tom", null); + parser.addRecord("Jerry", 3); + + final MockRecordWriter writer = new MockRecordWriter(null, false); + + TestRunner runner = getRunner(); + runner.addControllerService("parser", parser); + runner.enableControllerService(parser); + runner.addControllerService("writer", writer); + runner.enableControllerService(writer); + + runner.setProperty(REL_NAME, "select points from FLOWFILE"); + runner.setProperty("count", "select count(*) as c from flowfile where points is null"); + runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "parser"); + runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer"); + + runner.enqueue(""); + runner.run(); + + runner.assertTransferCount(REL_NAME, 1); + runner.assertTransferCount("count", 1); + final MockFlowFile flowFileOut = runner.getFlowFilesForRelationship(REL_NAME).get(0); + flowFileOut.assertContentEquals("1\n\n\n3\n"); + + final MockFlowFile countFlowFile = runner.getFlowFilesForRelationship("count").get(0); + countFlowFile.assertContentEquals("2\n"); + } + + @Test + public void testColumnNames() throws InitializationException { final MockRecordParser parser = new MockRecordParser(); parser.addSchemaField("name", RecordFieldType.STRING); parser.addSchemaField("points", RecordFieldType.INT);