NIFI-4749: This closes #2386. Pass the record reader's schema along to the ResultSetRecordSet so that it is able to resolve the schema for Record fields

Signed-off-by: joewitt <joewitt@apache.org>
Mark Payne 2018-01-08 16:18:34 -05:00 committed by joewitt
parent a7f1eb89c2
commit 953e922d32
2 changed files with 41 additions and 9 deletions
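In short: ResultSetRecordSet used to derive its schema solely from the JDBC ResultSetMetaData, which reports Types.OTHER for columns that hold nested Records, so such fields collapsed to a plain RECORD type with no child fields. The constructor now also receives the record reader's schema and consults it by column name before falling back to inspecting the first row. Below is a minimal sketch of the kind of reader schema that benefits; it is not part of the commit, the class and field names are made up, and only the NiFi record API calls (SimpleRecordSchema, RecordField, RecordFieldType) are the ones the diff itself uses, with package locations assumed.

import java.util.Arrays;
import java.util.List;

import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;

public class ReaderSchemaSketch {

    // Builds a reader schema whose "address" field is a nested record. With this commit,
    // a result-set column named "address" that the JDBC driver reports as Types.OTHER
    // resolves to this nested RECORD type instead of a schemaless RECORD placeholder.
    public static RecordSchema buildReaderSchema() {
        final List<RecordField> addressFields = Arrays.asList(
            new RecordField("street", RecordFieldType.STRING.getDataType()),
            new RecordField("city", RecordFieldType.STRING.getDataType()));
        final RecordSchema addressSchema = new SimpleRecordSchema(addressFields);

        final List<RecordField> fields = Arrays.asList(
            new RecordField("name", RecordFieldType.STRING.getDataType()),
            new RecordField("address", RecordFieldType.RECORD.getRecordDataType(addressSchema)));
        return new SimpleRecordSchema(fields);
    }
}

QueryRecord (the second file in this diff) now derives such a schema from the configured reader and writer factories and hands it to ResultSetRecordSet, so Record-typed result columns keep their nested structure.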


@@ -30,6 +30,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -45,10 +46,10 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
     private final Set<String> rsColumnNames;
     private boolean moreRows;
 
-    public ResultSetRecordSet(final ResultSet rs) throws SQLException {
+    public ResultSetRecordSet(final ResultSet rs, final RecordSchema readerSchema) throws SQLException {
         this.rs = rs;
         moreRows = rs.next();
-        this.schema = createSchema(rs);
+        this.schema = createSchema(rs, readerSchema);
         rsColumnNames = new HashSet<>();
 
         final ResultSetMetaData metadata = rs.getMetaData();
@@ -118,7 +119,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
         return value;
     }
 
-    private static RecordSchema createSchema(final ResultSet rs) throws SQLException {
+    private static RecordSchema createSchema(final ResultSet rs, final RecordSchema readerSchema) throws SQLException {
         final ResultSetMetaData metadata = rs.getMetaData();
         final int numCols = metadata.getColumnCount();
         final List<RecordField> fields = new ArrayList<>(numCols);
@@ -127,7 +128,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
             final int column = i + 1;
             final int sqlType = metadata.getColumnType(column);
-            final DataType dataType = getDataType(sqlType, rs, column);
+            final DataType dataType = getDataType(sqlType, rs, column, readerSchema);
             final String fieldName = metadata.getColumnLabel(column);
             final int nullableFlag = metadata.isNullable(column);
@@ -145,7 +146,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
         return new SimpleRecordSchema(fields);
     }
 
-    private static DataType getDataType(final int sqlType, final ResultSet rs, final int columnIndex) throws SQLException {
+    private static DataType getDataType(final int sqlType, final ResultSet rs, final int columnIndex, final RecordSchema readerSchema) throws SQLException {
         switch (sqlType) {
             case Types.ARRAY:
                 // The JDBC API does not allow us to know what the base type of an array is through the metadata.
@@ -168,12 +169,18 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
             case Types.LONGVARBINARY:
             case Types.VARBINARY:
                 return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
-            case Types.OTHER:
+            case Types.OTHER: {
                 // If we have no records to inspect, we can't really know its schema so we simply use the default data type.
                 if (rs.isAfterLast()) {
                     return RecordFieldType.RECORD.getDataType();
                 }
+
+                final String columnName = rs.getMetaData().getColumnName(columnIndex);
+                Optional<DataType> dataType = readerSchema.getDataType(columnName);
+                if (dataType.isPresent()) {
+                    return dataType.get();
+                }
+
                 final Object obj = rs.getObject(columnIndex);
                 if (obj == null || !(obj instanceof Record)) {
                     final List<DataType> dataTypes = Stream.of(RecordFieldType.BIGINT, RecordFieldType.BOOLEAN, RecordFieldType.BYTE, RecordFieldType.CHAR, RecordFieldType.DATE,
@@ -188,8 +195,16 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
                 final Record record = (Record) obj;
                 final RecordSchema recordSchema = record.getSchema();
                 return RecordFieldType.RECORD.getRecordDataType(recordSchema);
 
-            default:
+            }
+            default: {
+                final String columnName = rs.getMetaData().getColumnName(columnIndex);
+                Optional<DataType> dataType = readerSchema.getDataType(columnName);
+                if (dataType.isPresent()) {
+                    return dataType.get();
+                }
+
                 return getFieldType(sqlType).getDataType();
+            }
         }
     }
@@ -286,7 +301,8 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
                 return RecordFieldType.TIMESTAMP.getDataType();
             }
 
             if (valueToLookAt instanceof Record) {
-                return RecordFieldType.RECORD.getDataType();
+                final Record record = (Record) valueToLookAt;
+                return RecordFieldType.RECORD.getRecordDataType(record.getSchema());
             }
         }
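For other callers of ResultSetRecordSet, the constructor change above is the visible API difference: a ResultSet is now paired with the reader's schema. Here is a hedged usage sketch, not taken from the commit; the wrapper class, method, and variable names are illustrative, and the package locations are assumed from the NiFi record API.

import java.sql.ResultSet;

import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.ResultSetRecordSet;

public class ResultSetRecordSetUsageSketch {

    // Iterates a JDBC ResultSet as NiFi Records; readerSchema supplies the types that
    // the driver's metadata cannot express (e.g. nested records behind Types.OTHER columns).
    public static void forEachRecord(final ResultSet rs, final RecordSchema readerSchema) throws Exception {
        try (final ResultSetRecordSet recordSet = new ResultSetRecordSet(rs, readerSchema)) {
            Record record;
            while ((record = recordSet.next()) != null) {
                // process the record, e.g. hand it to a RecordSetWriter
            }
        }
    }
}

The try-with-resources block works because ResultSetRecordSet implements Closeable, as shown in the class declaration in the hunks above.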


@@ -18,6 +18,7 @@ package org.apache.nifi.processors.standard;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -74,6 +75,7 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.queryrecord.FlowFileTable;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
@@ -256,6 +258,20 @@ public class QueryRecord extends AbstractProcessor {
         final Map<FlowFile, Relationship> transformedFlowFiles = new HashMap<>();
         final Set<FlowFile> createdFlowFiles = new HashSet<>();
 
+        // Determine the Record Reader's schema
+        final RecordSchema readerSchema;
+        try (final InputStream rawIn = session.read(original)) {
+            final Map<String, String> originalAttributes = original.getAttributes();
+            final RecordReader reader = recordReaderFactory.createRecordReader(originalAttributes, rawIn, getLogger());
+            final RecordSchema inputSchema = reader.getSchema();
+            readerSchema = recordSetWriterFactory.getSchema(originalAttributes, inputSchema);
+        } catch (final Exception e) {
+            getLogger().error("Failed to determine Record Schema from {}; routing to failure", new Object[] {original, e});
+            session.transfer(original, REL_FAILURE);
+            return;
+        }
+
         // Determine the schema for writing the data
         final Map<String, String> originalAttributes = original.getAttributes();
         int recordsRead = 0;
@@ -294,7 +310,7 @@
 
                 final RecordSchema writeSchema;
                 try {
-                    recordSet = new ResultSetRecordSet(rs);
+                    recordSet = new ResultSetRecordSet(rs, readerSchema);
                     final RecordSchema resultSetSchema = recordSet.getSchema();
                     writeSchema = recordSetWriterFactory.getSchema(originalAttributes, resultSetSchema);
                 } catch (final SQLException | SchemaNotFoundException e) {