NIFI-4561 ExecuteSQL returns no FlowFile for some queries

This closes #2243

Signed-off-by: Mike Thomsen <mikerthomsen@gmail.com>
This commit is contained in:
patricker 2017-11-01 10:25:26 +08:00 committed by Mike Thomsen
parent 5ca6261de0
commit 0390c0f196
2 changed files with 90 additions and 41 deletions

View File

@ -21,6 +21,7 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@ -91,12 +92,15 @@ import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGIC
})
@WritesAttributes({
@WritesAttribute(attribute="executesql.row.count", description = "Contains the number of rows returned in the select query"),
@WritesAttribute(attribute="executesql.query.duration", description = "Duration of the query in milliseconds")
@WritesAttribute(attribute="executesql.query.duration", description = "Duration of the query in milliseconds"),
@WritesAttribute(attribute="executesql.resultset.index", description = "Assuming multiple result sets are returned, "
+ "the zero based index of this result set.")
})
public class ExecuteSQL extends AbstractProcessor {
public static final String RESULT_ROW_COUNT = "executesql.row.count";
public static final String RESULT_QUERY_DURATION = "executesql.query.duration";
public static final String RESULTSET_INDEX = "executesql.resultset.index";
// Relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder()
@ -220,53 +224,60 @@ public class ExecuteSQL extends AbstractProcessor {
JdbcCommon.setParameters(st, fileToProcess.getAttributes());
}
logger.debug("Executing query {}", new Object[]{selectQuery});
boolean results = st.execute();
boolean hasResults = st.execute();
boolean hasUpdateCount = st.getUpdateCount() != -1;
while(hasResults || hasUpdateCount) {
//getMoreResults() and execute() return false to indicate that the result of the statement is just a number and not a ResultSet
if (hasResults) {
FlowFile resultSetFF;
if (fileToProcess == null) {
resultSetFF = session.create();
} else {
resultSetFF = session.create(fileToProcess);
resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes());
}
while(results){
FlowFile resultSetFF;
if(fileToProcess == null){
resultSetFF = session.create();
} else {
resultSetFF = session.create(fileToProcess);
resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes());
final AtomicLong nrOfRows = new AtomicLong(0L);
resultSetFF = session.write(resultSetFF, out -> {
try {
final ResultSet resultSet = st.getResultSet();
final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
.convertNames(convertNamesForAvro)
.useLogicalTypes(useAvroLogicalTypes)
.defaultPrecision(defaultPrecision)
.defaultScale(defaultScale)
.build();
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
} catch (final SQLException e) {
throw new ProcessException(e);
}
});
long duration = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
// set attribute how many rows were selected
resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_DURATION, String.valueOf(duration));
resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
resultSetFF = session.putAttribute(resultSetFF, RESULTSET_INDEX, String.valueOf(resultCount));
logger.info("{} contains {} Avro records; transferring to 'success'",
new Object[]{resultSetFF, nrOfRows.get()});
session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", duration);
session.transfer(resultSetFF, REL_SUCCESS);
resultCount++;
}
final AtomicLong nrOfRows = new AtomicLong(0L);
resultSetFF = session.write(resultSetFF, out -> {
try {
final ResultSet resultSet = st.getResultSet();
final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
.convertNames(convertNamesForAvro)
.useLogicalTypes(useAvroLogicalTypes)
.defaultPrecision(defaultPrecision)
.defaultScale(defaultScale)
.build();
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
} catch (final SQLException e) {
throw new ProcessException(e);
}
});
long duration = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
// set attribute how many rows were selected
resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_DURATION, String.valueOf(duration));
resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
logger.info("{} contains {} Avro records; transferring to 'success'",
new Object[]{resultSetFF, nrOfRows.get()});
session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", duration);
session.transfer(resultSetFF, REL_SUCCESS);
resultCount++;
// are there anymore result sets?
try{
results = st.getMoreResults();
hasResults = st.getMoreResults(Statement.CLOSE_CURRENT_RESULT);
hasUpdateCount = st.getUpdateCount() != -1;
} catch(SQLException ex){
results = false;
hasResults = false;
hasUpdateCount = false;
}
}
@ -278,8 +289,20 @@ public class ExecuteSQL extends AbstractProcessor {
} else {
fileToProcess = session.write(fileToProcess, JdbcCommon::createEmptyAvroStream);
fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, "0");
fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
session.transfer(fileToProcess, REL_SUCCESS);
}
} else if(resultCount == 0){
//If we had no inbound FlowFile, no exceptions, and the SQL generated no result sets (Insert/Update/Delete statements only)
// Then generate an empty Output FlowFile
FlowFile resultSetFF = session.create();
resultSetFF = session.write(resultSetFF, out -> JdbcCommon.createEmptyAvroStream(out));
resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, "0");
resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
session.transfer(resultSetFF, REL_SUCCESS);
}
} catch (final ProcessException | SQLException e) {
//If we had at least one result then it's OK to drop the original file, but if we had no results then

View File

@ -189,6 +189,32 @@ public class TestExecuteSQL {
runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0).assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "2");
runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0).assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX, "0");
}
@Test
public void testInsertStatementCreatesFlowFile() throws SQLException {
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
dbLocation.delete();
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
try {
stmt.execute("drop table TEST_NULL_INT");
} catch (final SQLException sqle) {
}
stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
runner.setIncomingConnection(false);
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "insert into TEST_NULL_INT (id, val1, val2) VALUES (0, NULL, 1)");
runner.run();
runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0).assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "0");
}
@Test