NIFI-1224: made TestExecuteSQL about 25 seconds faster. Same logic just less to process

Reviewed and Amended (removed trailing whitespace) by Tony Kurc (tkurc@apache.org)
This commit is contained in:
joewitt 2015-11-26 00:23:12 -05:00 committed by Tony Kurc
parent a3cb803147
commit 67aed5eb92
1 changed files with 19 additions and 21 deletions

View File

@ -49,7 +49,7 @@ import org.slf4j.LoggerFactory;
public class TestExecuteSQL { public class TestExecuteSQL {
private static Logger LOGGER; private static final Logger LOGGER;
static { static {
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
@ -136,17 +136,17 @@ public class TestExecuteSQL {
// load test data to database // load test data to database
final Connection con = ((DBCPService)runner.getControllerService("dbcp")).getConnection(); final Connection con = ((DBCPService)runner.getControllerService("dbcp")).getConnection();
TestJdbcHugeStream.loadTestData2Database(con, 100, 2000, 1000); TestJdbcHugeStream.loadTestData2Database(con, 100, 200, 100);
LOGGER.info("test data loaded"); LOGGER.info("test data loaded");
// ResultSet size will be 1x2000x1000 = 2 000 000 rows // ResultSet size will be 1x200x100 = 20 000 rows
// because of where PER.ID = ${person.id} // because of where PER.ID = ${person.id}
final int nrOfRows = 2000000; final int nrOfRows = 20000;
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, query); runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, query);
if (incomingFlowFile) { if (incomingFlowFile) {
// incoming FlowFile content is not used, but attributes are used // incoming FlowFile content is not used, but attributes are used
final Map<String, String> attributes = new HashMap<String, String>(); final Map<String, String> attributes = new HashMap<>();
attributes.put("person.id", "10"); attributes.put("person.id", "10");
runner.enqueue("Hello".getBytes(), attributes); runner.enqueue("Hello".getBytes(), attributes);
} }
@ -154,25 +154,23 @@ public class TestExecuteSQL {
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1); runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
// read all Avro records and verify created FlowFile contains 1000000
// records
final List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS); final List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS);
final InputStream in = new ByteArrayInputStream(flowfiles.get(0).toByteArray()); final InputStream in = new ByteArrayInputStream(flowfiles.get(0).toByteArray());
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(); final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(in, datumReader); try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) {
GenericRecord record = null; GenericRecord record = null;
long recordsFromStream = 0; long recordsFromStream = 0;
while (dataFileReader.hasNext()) { while (dataFileReader.hasNext()) {
// Reuse record object by passing it to next(). This saves us from // Reuse record object by passing it to next(). This saves us from
// allocating and garbage collecting many objects for files with // allocating and garbage collecting many objects for files with
// many items. // many items.
record = dataFileReader.next(record); record = dataFileReader.next(record);
recordsFromStream += 1; recordsFromStream += 1;
} }
LOGGER.info("total nr of records from stream: " + recordsFromStream); LOGGER.info("total nr of records from stream: " + recordsFromStream);
assertEquals(nrOfRows, recordsFromStream); assertEquals(nrOfRows, recordsFromStream);
dataFileReader.close(); }
} }
/** /**