NIFI-5809: If QueryRecord has a single-column projection and that results in a null value, do not confuse that with a null value being returned from the Record Reader

This closes #3163.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Mark Payne 2018-11-09 11:40:59 -05:00 committed by Koji Kawamura
parent 878a0b8b74
commit 08189596d2
2 changed files with 63 additions and 30 deletions

View File

@@ -17,8 +17,6 @@
package org.apache.nifi.queryrecord;
import java.io.InputStream;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
@@ -28,6 +26,8 @@ import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import java.io.InputStream;
public class FlowFileEnumerator<InternalType> implements Enumerator<Object> {
private final ProcessSession session;
private final FlowFile flowFile;
@@ -57,26 +57,24 @@ public class FlowFileEnumerator<InternalType> implements Enumerator<Object> {
@Override
public boolean moveNext() {
currentRow = null;
while (currentRow == null) {
try {
currentRow = filterColumns(recordParser.nextRecord());
break;
} catch (final Exception e) {
throw new ProcessException("Failed to read next record in stream for " + flowFile + " due to " + e.getMessage(), e);
}
}
try {
final Record record = recordParser.nextRecord();
if (record == null) {
// If we are out of data, close the InputStream. We do this because
// Calcite does not necessarily call our close() method.
close();
try {
onFinish();
} catch (final Exception e) {
logger.error("Failed to perform tasks when enumerator was finished", e);
}
if (currentRow == null) {
// If we are out of data, close the InputStream. We do this because
// Calcite does not necessarily call our close() method.
close();
try {
onFinish();
} catch (final Exception e) {
logger.error("Failed to perform tasks when enumerator was finished", e);
return false;
}
return false;
currentRow = filterColumns(record);
} catch (final Exception e) {
throw new ProcessException("Failed to read next record in stream for " + flowFile + " due to " + e.getMessage(), e);
}
recordsRead++;

View File

@@ -16,15 +16,6 @@
*/
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.OutputStream;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.reporting.InitializationException;
@@ -46,6 +37,15 @@ import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.io.OutputStream;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class TestQueryRecord {
static {
@@ -289,12 +289,47 @@ public class TestQueryRecord {
runner.run();
runner.assertTransferCount(REL_NAME, 1);
final MockFlowFile flowFileOut = runner.getFlowFilesForRelationship(ExecuteProcess.REL_SUCCESS).get(0);
final MockFlowFile flowFileOut = runner.getFlowFilesForRelationship(REL_NAME).get(0);
flowFileOut.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"100\"\n\"Jerry\",\"2\"\n");
}
@Test
public void testColumnNames() throws InitializationException, IOException {
public void testNullValueInSingleField() throws InitializationException, IOException {
final MockRecordParser parser = new MockRecordParser();
parser.addSchemaField("name", RecordFieldType.STRING);
parser.addSchemaField("points", RecordFieldType.INT);
parser.addRecord("Tom", 1);
parser.addRecord("Jerry", null);
parser.addRecord("Tom", null);
parser.addRecord("Jerry", 3);
final MockRecordWriter writer = new MockRecordWriter(null, false);
TestRunner runner = getRunner();
runner.addControllerService("parser", parser);
runner.enableControllerService(parser);
runner.addControllerService("writer", writer);
runner.enableControllerService(writer);
runner.setProperty(REL_NAME, "select points from FLOWFILE");
runner.setProperty("count", "select count(*) as c from flowfile where points is null");
runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "parser");
runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
runner.enqueue("");
runner.run();
runner.assertTransferCount(REL_NAME, 1);
runner.assertTransferCount("count", 1);
final MockFlowFile flowFileOut = runner.getFlowFilesForRelationship(REL_NAME).get(0);
flowFileOut.assertContentEquals("1\n\n\n3\n");
final MockFlowFile countFlowFile = runner.getFlowFilesForRelationship("count").get(0);
countFlowFile.assertContentEquals("2\n");
}
@Test
public void testColumnNames() throws InitializationException {
final MockRecordParser parser = new MockRecordParser();
parser.addSchemaField("name", RecordFieldType.STRING);
parser.addSchemaField("points", RecordFieldType.INT);