NIFI-9476 - Fix QueryRecord when no result and with array type column

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #5594
This commit is contained in:
Pierre Villard 2021-12-10 20:53:44 +01:00 committed by Matthew Burgess
parent e7449bf0d3
commit 90930ca197
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
3 changed files with 35 additions and 1 deletions

View File

@ -38,6 +38,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@ -301,6 +302,12 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
return RecordFieldType.ARRAY.getArrayDataType(baseType);
} catch (SQLFeatureNotSupportedException sfnse) {
return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType());
} catch (SQLException sqle) {
if (sqle.getCause() instanceof NoSuchElementException) {
return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType());
} else {
throw sqle;
}
}
}

View File

@ -18,6 +18,8 @@ package org.apache.nifi.processors.standard;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.csv.CSVReader;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
@ -1132,6 +1134,31 @@ public class TestQueryRecord {
runner.assertTransferCount(REL_NAME, 1);
}
@Test
public void testReturnsNoResultWithArrayColumn() throws InitializationException {
TestRunner runner = getRunner();
final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader);
runner.enableControllerService(jsonReader);
final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
runner.addControllerService("writer", jsonWriter);
runner.enableControllerService(jsonWriter);
runner.setProperty(REL_NAME, "SELECT * from FLOWFILE WHERE status = 'failure'");
runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
runner.setProperty(QueryRecord.INCLUDE_ZERO_RECORD_FLOWFILES, "true");
runner.enqueue("{\"status\": \"starting\",\"myArray\": [{\"foo\": \"foo\"}]}");
runner.run();
runner.assertTransferCount(REL_NAME, 1);
final MockFlowFile flowFileOut = runner.getFlowFilesForRelationship(REL_NAME).get(0);
flowFileOut.assertContentEquals("[]");
}
private static class ResultSetValidatingRecordWriter extends AbstractControllerService implements RecordSetWriterFactory {
private final List<String> columnNames;

View File

@ -364,7 +364,7 @@
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<version>1.27.0</version>
<version>1.28.0</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>