NIFI-4409: Ensure that when record schema is inherited, the schema from teh ResultSet is used instead of the schema from the RecordReader because the schema from the RecordReader mmay not match the actual data

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2169
This commit is contained in:
Mark Payne 2017-09-22 11:08:22 -04:00 committed by Matthew Burgess
parent 50d018566d
commit 05b5dd1488
1 changed files with 19 additions and 20 deletions

View File

@ -18,7 +18,6 @@ package org.apache.nifi.processors.standard;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.sql.Connection;
import java.sql.DriverManager;
@ -72,7 +71,7 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.queryrecord.FlowFileTable;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
@ -95,7 +94,10 @@ import org.apache.nifi.util.StopWatch;
+ "is the Relationship to route data to, and the value of the Property is a SQL SELECT statement that is used to specify how input data should be transformed/filtered. "
+ "The SQL statement must be valid ANSI SQL and is powered by Apache Calcite. "
+ "If the transformation fails, the original FlowFile is routed to the 'failure' relationship. Otherwise, the data selected will be routed to the associated "
+ "relationship. See the Processor Usage documentation for more information.")
+ "relationship. If the Record Writer chooses to inherit the schema from the Record, it is important to note that the schema that is inherited will be from the "
+ "ResultSet, rather than the input Record. This allows a single instance of the QueryRecord processor to have multiple queries, each of which returns a different "
+ "set of columns and aggregations. As a result, though, the schema that is derived will have no schema name, so it is important that the configured Record Writer not attempt "
+ "to write the Schema Name as an attribute if inheriting the Schema from the Record. See the Processor Usage documentation for more information.")
@DynamicRelationship(name="<Property Name>", description="Each user-defined property defines a new Relationship for this Processor.")
@DynamicProperty(name = "The name of the relationship to route data to", value="A SQL SELECT statement that is used to determine what data should be routed to this "
+ "relationship.", supportsExpressionLanguage=true, description="Each user-defined property specifies a SQL SELECT statement to run over the data, with the data "
@ -249,19 +251,7 @@ public class QueryRecord extends AbstractProcessor {
final Set<FlowFile> createdFlowFiles = new HashSet<>();
// Determine the schema for writing the data
final RecordSchema recordSchema;
try (final InputStream rawIn = session.read(original)) {
final Map<String, String> originalAttributes = original.getAttributes();
final RecordReader reader = recordReaderFactory.createRecordReader(originalAttributes, rawIn, getLogger());
final RecordSchema inputSchema = reader.getSchema();
recordSchema = recordSetWriterFactory.getSchema(originalAttributes, inputSchema);
} catch (final Exception e) {
getLogger().error("Failed to determine Record Schema from {}; routing to failure", new Object[] {original, e});
session.transfer(original, REL_FAILURE);
return;
}
final Map<String, String> originalAttributes = original.getAttributes();
int recordsRead = 0;
try {
@ -289,15 +279,24 @@ public class QueryRecord extends AbstractProcessor {
}
final AtomicReference<String> mimeTypeRef = new AtomicReference<>();
final FlowFile outFlowFile = transformed;
try {
final ResultSet rs = queryResult.getResultSet();
transformed = session.write(transformed, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(getLogger(), recordSchema, out)) {
final ResultSetRecordSet resultSet = new ResultSetRecordSet(rs);
writeResultRef.set(resultSetWriter.write(resultSet));
final ResultSetRecordSet recordSet;
final RecordSchema writeSchema;
try {
recordSet = new ResultSetRecordSet(rs);
final RecordSchema resultSetSchema = recordSet.getSchema();
writeSchema = recordSetWriterFactory.getSchema(originalAttributes, resultSetSchema);
} catch (final SQLException | SchemaNotFoundException e) {
throw new ProcessException(e);
}
try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(getLogger(), writeSchema, out)) {
writeResultRef.set(resultSetWriter.write(recordSet));
mimeTypeRef.set(resultSetWriter.getMimeType());
} catch (final Exception e) {
throw new IOException(e);