diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java index 571bf77bf7..a0f44e5808 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java @@ -30,6 +30,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -45,10 +46,10 @@ public class ResultSetRecordSet implements RecordSet, Closeable { private final Set rsColumnNames; private boolean moreRows; - public ResultSetRecordSet(final ResultSet rs) throws SQLException { + public ResultSetRecordSet(final ResultSet rs, final RecordSchema readerSchema) throws SQLException { this.rs = rs; moreRows = rs.next(); - this.schema = createSchema(rs); + this.schema = createSchema(rs, readerSchema); rsColumnNames = new HashSet<>(); final ResultSetMetaData metadata = rs.getMetaData(); @@ -118,7 +119,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable { return value; } - private static RecordSchema createSchema(final ResultSet rs) throws SQLException { + private static RecordSchema createSchema(final ResultSet rs, final RecordSchema readerSchema) throws SQLException { final ResultSetMetaData metadata = rs.getMetaData(); final int numCols = metadata.getColumnCount(); final List fields = new ArrayList<>(numCols); @@ -127,7 +128,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable { final int column = i + 1; final int sqlType = metadata.getColumnType(column); - final DataType dataType = getDataType(sqlType, rs, column); + final DataType dataType = getDataType(sqlType, rs, column, readerSchema); final String fieldName = metadata.getColumnLabel(column); final int nullableFlag = metadata.isNullable(column); @@ -145,7 +146,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable { return new SimpleRecordSchema(fields); } - private static DataType getDataType(final int sqlType, final ResultSet rs, final int columnIndex) throws SQLException { + private static DataType getDataType(final int sqlType, final ResultSet rs, final int columnIndex, final RecordSchema readerSchema) throws SQLException { switch (sqlType) { case Types.ARRAY: // The JDBC API does not allow us to know what the base type of an array is through the metadata. @@ -168,12 +169,18 @@ public class ResultSetRecordSet implements RecordSet, Closeable { case Types.LONGVARBINARY: case Types.VARBINARY: return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()); - case Types.OTHER: + case Types.OTHER: { // If we have no records to inspect, we can't really know its schema so we simply use the default data type. if (rs.isAfterLast()) { return RecordFieldType.RECORD.getDataType(); } + final String columnName = rs.getMetaData().getColumnName(columnIndex); + Optional dataType = readerSchema.getDataType(columnName); + if (dataType.isPresent()) { + return dataType.get(); + } + final Object obj = rs.getObject(columnIndex); if (obj == null || !(obj instanceof Record)) { final List dataTypes = Stream.of(RecordFieldType.BIGINT, RecordFieldType.BOOLEAN, RecordFieldType.BYTE, RecordFieldType.CHAR, RecordFieldType.DATE, @@ -188,8 +195,16 @@ public class ResultSetRecordSet implements RecordSet, Closeable { final Record record = (Record) obj; final RecordSchema recordSchema = record.getSchema(); return RecordFieldType.RECORD.getRecordDataType(recordSchema); - default: + } + default: { + final String columnName = rs.getMetaData().getColumnName(columnIndex); + Optional dataType = readerSchema.getDataType(columnName); + if (dataType.isPresent()) { + return dataType.get(); + } + return getFieldType(sqlType).getDataType(); + } } } @@ -286,7 +301,8 @@ public class ResultSetRecordSet implements RecordSet, Closeable { return RecordFieldType.TIMESTAMP.getDataType(); } if (valueToLookAt instanceof Record) { - return RecordFieldType.RECORD.getDataType(); + final Record record = (Record) valueToLookAt; + return RecordFieldType.RECORD.getRecordDataType(record.getSchema()); } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java index 5798323bba..50e5dd7eab 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java @@ -18,6 +18,7 @@ package org.apache.nifi.processors.standard; import java.io.Closeable; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.sql.Connection; import java.sql.DriverManager; @@ -74,6 +75,7 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.queryrecord.FlowFileTable; import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.RecordSetWriterFactory; @@ -256,6 +258,20 @@ public class QueryRecord extends AbstractProcessor { final Map transformedFlowFiles = new HashMap<>(); final Set createdFlowFiles = new HashSet<>(); + // Determine the Record Reader's schema + final RecordSchema readerSchema; + try (final InputStream rawIn = session.read(original)) { + final Map originalAttributes = original.getAttributes(); + final RecordReader reader = recordReaderFactory.createRecordReader(originalAttributes, rawIn, getLogger()); + final RecordSchema inputSchema = reader.getSchema(); + + readerSchema = recordSetWriterFactory.getSchema(originalAttributes, inputSchema); + } catch (final Exception e) { + getLogger().error("Failed to determine Record Schema from {}; routing to failure", new Object[] {original, e}); + session.transfer(original, REL_FAILURE); + return; + } + // Determine the schema for writing the data final Map originalAttributes = original.getAttributes(); int recordsRead = 0; @@ -294,7 +310,7 @@ public class QueryRecord extends AbstractProcessor { final RecordSchema writeSchema; try { - recordSet = new ResultSetRecordSet(rs); + recordSet = new ResultSetRecordSet(rs, readerSchema); final RecordSchema resultSetSchema = recordSet.getSchema(); writeSchema = recordSetWriterFactory.getSchema(originalAttributes, resultSetSchema); } catch (final SQLException | SchemaNotFoundException e) {