NIFI-6040: Fixed ExecuteSQL processors when Output Batch Size is set

This closes #3355.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Matthew Burgess 2019-03-06 10:48:25 -05:00 committed by Bryan Bende
parent 4f2b0156fb
commit c32ea618c5
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
3 changed files with 173 additions and 23 deletions

View File

@ -312,6 +312,11 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
// If we've reached the batch size, send out the flow files // If we've reached the batch size, send out the flow files
if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) { if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
session.transfer(resultSetFlowFiles, REL_SUCCESS); session.transfer(resultSetFlowFiles, REL_SUCCESS);
// Need to remove the original input file if it exists
if (fileToProcess != null) {
session.remove(fileToProcess);
fileToProcess = null;
}
session.commit(); session.commit();
resultSetFlowFiles.clear(); resultSetFlowFiles.clear();
} }

View File

@ -282,6 +282,51 @@ public class TestExecuteSQL {
lastFlowFile.assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX, "0"); lastFlowFile.assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX, "0");
} }
@Test
public void testWithOutputBatchingAndIncomingFlowFile() throws SQLException {
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
dbLocation.delete();
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
try {
stmt.execute("drop table TEST_NULL_INT");
} catch (final SQLException sqle) {
}
stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
for (int i = 0; i < 1000; i++) {
stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (" + i + ", 1, 1)");
}
runner.setIncomingConnection(true);
runner.setProperty(ExecuteSQL.MAX_ROWS_PER_FLOW_FILE, "5");
runner.setProperty(ExecuteSQL.OUTPUT_BATCH_SIZE, "1");
runner.enqueue("SELECT * FROM TEST_NULL_INT");
runner.run();
runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 200);
runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, FragmentAttributes.FRAGMENT_INDEX.key());
runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, FragmentAttributes.FRAGMENT_ID.key());
MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "5");
firstFlowFile.assertAttributeNotExists(FragmentAttributes.FRAGMENT_COUNT.key());
firstFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "0");
firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX, "0");
MockFlowFile lastFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(199);
lastFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "5");
lastFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "199");
lastFlowFile.assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX, "0");
}
@Test @Test
public void testMaxRowsPerFlowFile() throws SQLException { public void testMaxRowsPerFlowFile() throws SQLException {
// remove previous test database, if any // remove previous test database, if any

View File

@ -169,6 +169,106 @@ public class TestExecuteSQLRecord {
assertEquals(ProvenanceEventType.FETCH, runner.getProvenanceEvents().get(1).getEventType()); assertEquals(ProvenanceEventType.FETCH, runner.getProvenanceEvents().get(1).getEventType());
} }
@Test
public void testWithOutputBatching() throws InitializationException, SQLException {
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
dbLocation.delete();
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
try {
stmt.execute("drop table TEST_NULL_INT");
} catch (final SQLException sqle) {
}
stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
for (int i = 0; i < 1000; i++) {
stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (" + i + ", 1, 1)");
}
MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
runner.addControllerService("writer", recordWriter);
runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
runner.enableControllerService(recordWriter);
runner.setIncomingConnection(false);
runner.setProperty(ExecuteSQLRecord.MAX_ROWS_PER_FLOW_FILE, "5");
runner.setProperty(ExecuteSQLRecord.OUTPUT_BATCH_SIZE, "5");
runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "SELECT * FROM TEST_NULL_INT");
runner.run();
runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_SUCCESS, 200);
runner.assertAllFlowFilesContainAttribute(ExecuteSQLRecord.REL_SUCCESS, FragmentAttributes.FRAGMENT_INDEX.key());
runner.assertAllFlowFilesContainAttribute(ExecuteSQLRecord.REL_SUCCESS, FragmentAttributes.FRAGMENT_ID.key());
MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(0);
firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT, "5");
firstFlowFile.assertAttributeNotExists(FragmentAttributes.FRAGMENT_COUNT.key());
firstFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "0");
firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULTSET_INDEX, "0");
MockFlowFile lastFlowFile = runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(199);
lastFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT, "5");
lastFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "199");
lastFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULTSET_INDEX, "0");
}
@Test
public void testWithOutputBatchingAndIncomingFlowFile() throws InitializationException, SQLException {
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
dbLocation.delete();
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
try {
stmt.execute("drop table TEST_NULL_INT");
} catch (final SQLException sqle) {
}
stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
for (int i = 0; i < 1000; i++) {
stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (" + i + ", 1, 1)");
}
MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
runner.addControllerService("writer", recordWriter);
runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
runner.enableControllerService(recordWriter);
runner.setIncomingConnection(true);
runner.setProperty(ExecuteSQLRecord.MAX_ROWS_PER_FLOW_FILE, "5");
runner.setProperty(ExecuteSQLRecord.OUTPUT_BATCH_SIZE, "1");
runner.enqueue("SELECT * FROM TEST_NULL_INT");
runner.run();
runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_SUCCESS, 200);
runner.assertAllFlowFilesContainAttribute(ExecuteSQLRecord.REL_SUCCESS, FragmentAttributes.FRAGMENT_INDEX.key());
runner.assertAllFlowFilesContainAttribute(ExecuteSQLRecord.REL_SUCCESS, FragmentAttributes.FRAGMENT_ID.key());
MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(0);
firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT, "5");
firstFlowFile.assertAttributeNotExists(FragmentAttributes.FRAGMENT_COUNT.key());
firstFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "0");
firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULTSET_INDEX, "0");
MockFlowFile lastFlowFile = runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(199);
lastFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT, "5");
lastFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "199");
lastFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULTSET_INDEX, "0");
}
@Test @Test
public void testMaxRowsPerFlowFile() throws Exception { public void testMaxRowsPerFlowFile() throws Exception {
// remove previous test database, if any // remove previous test database, if any
@ -333,7 +433,7 @@ public class TestExecuteSQLRecord {
stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))"); stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
runner.setIncomingConnection(true); runner.setIncomingConnection(true);
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT"); runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1); MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
runner.addControllerService("writer", recordWriter); runner.addControllerService("writer", recordWriter);
runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer"); runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@ -341,9 +441,9 @@ public class TestExecuteSQLRecord {
runner.enqueue("Hello".getBytes()); runner.enqueue("Hello".getBytes());
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1); runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_SUCCESS, 1);
MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0); MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(0);
firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "0"); firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT, "0");
firstFlowFile.assertContentEquals(""); firstFlowFile.assertContentEquals("");
} }
@ -488,8 +588,8 @@ public class TestExecuteSQLRecord {
stmt.execute("insert into TEST_NULL_INT values(1,2,3)"); stmt.execute("insert into TEST_NULL_INT values(1,2,3)");
runner.setIncomingConnection(true); runner.setIncomingConnection(true);
runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)"); runner.setProperty(ExecuteSQLRecord.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT"); runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1); MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
runner.addControllerService("writer", recordWriter); runner.addControllerService("writer", recordWriter);
runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer"); runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@ -497,9 +597,9 @@ public class TestExecuteSQLRecord {
runner.enqueue("test".getBytes()); runner.enqueue("test".getBytes());
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1); runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_SUCCESS, 1);
MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0); MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(0);
firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "1"); firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT, "1");
} }
@Test @Test
@ -521,9 +621,9 @@ public class TestExecuteSQLRecord {
stmt.execute("insert into TEST_NULL_INT values(1,2,3)"); stmt.execute("insert into TEST_NULL_INT values(1,2,3)");
runner.setIncomingConnection(true); runner.setIncomingConnection(true);
runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)"); runner.setProperty(ExecuteSQLRecord.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT"); runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
runner.setProperty(ExecuteSQL.SQL_POST_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(0);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(0)"); runner.setProperty(ExecuteSQLRecord.SQL_POST_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(0);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(0)");
MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1); MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
runner.addControllerService("writer", recordWriter); runner.addControllerService("writer", recordWriter);
runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer"); runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@ -531,9 +631,9 @@ public class TestExecuteSQLRecord {
runner.enqueue("test".getBytes()); runner.enqueue("test".getBytes());
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1); runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_SUCCESS, 1);
MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0); MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(0);
firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "1"); firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT, "1");
} }
@Test @Test
@ -555,8 +655,8 @@ public class TestExecuteSQLRecord {
runner.setIncomingConnection(true); runner.setIncomingConnection(true);
// Simulate failure by not provide parameter // Simulate failure by not provide parameter
runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()"); runner.setProperty(ExecuteSQLRecord.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT"); runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1); MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
runner.addControllerService("writer", recordWriter); runner.addControllerService("writer", recordWriter);
runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer"); runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@ -564,7 +664,7 @@ public class TestExecuteSQLRecord {
runner.enqueue("test".getBytes()); runner.enqueue("test".getBytes());
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_FAILURE, 1); runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_FAILURE, 1);
} }
@Test @Test
@ -585,10 +685,10 @@ public class TestExecuteSQLRecord {
stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))"); stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
runner.setIncomingConnection(true); runner.setIncomingConnection(true);
runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)"); runner.setProperty(ExecuteSQLRecord.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT"); runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
// Simulate failure by not provide parameter // Simulate failure by not provide parameter
runner.setProperty(ExecuteSQL.SQL_POST_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()"); runner.setProperty(ExecuteSQLRecord.SQL_POST_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");
MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1); MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
runner.addControllerService("writer", recordWriter); runner.addControllerService("writer", recordWriter);
runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer"); runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@ -596,8 +696,8 @@ public class TestExecuteSQLRecord {
runner.enqueue("test".getBytes()); runner.enqueue("test".getBytes());
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_FAILURE, 1); runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_FAILURE, 1);
MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_FAILURE).get(0); MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_FAILURE).get(0);
firstFlowFile.assertContentEquals("test"); firstFlowFile.assertContentEquals("test");
} }