NIFI-3029: QueryDatabaseTable supports max fragments property

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #1213
This commit is contained in:
Byunghwa Yun 2016-11-13 13:45:43 +09:00 committed by Matt Burgess
parent e59cf86656
commit 0f462a7c49
2 changed files with 63 additions and 0 deletions

View File

@ -120,6 +120,16 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build();
public static final PropertyDescriptor MAX_FRAGMENTS = new PropertyDescriptor.Builder()
.name("qdbt-max-frags")
.displayName("Maximum Number of Fragments")
.description("The maximum number of fragments. If the value specified is zero, then all fragments are returned. " +
"This prevents OutOfMemoryError when this processor ingests huge table.")
.defaultValue("0")
.required(true)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build();
public QueryDatabaseTable() {
final Set<Relationship> r = new HashSet<>();
r.add(REL_SUCCESS);
@ -134,6 +144,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
pds.add(QUERY_TIMEOUT);
pds.add(FETCH_SIZE);
pds.add(MAX_ROWS_PER_FLOW_FILE);
pds.add(MAX_FRAGMENTS);
pds.add(NORMALIZE_NAMES_FOR_AVRO);
propDescriptors = Collections.unmodifiableList(pds);
}
@ -179,6 +190,9 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue();
final Integer fetchSize = context.getProperty(FETCH_SIZE).asInteger();
final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).asInteger();
final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet()
? context.getProperty(MAX_FRAGMENTS).asInteger()
: 0;
final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean();
final Map<String,String> maxValueProperties = getDefaultMaxValueProperties(context.getProperties());
@ -283,6 +297,9 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
}
fragmentIndex++;
if (maxFragments > 0 && fragmentIndex >= maxFragments) {
break;
}
}
for (int i = 0; i < resultSetFlowFiles.size(); i++) {

View File

@ -598,6 +598,52 @@ public class QueryDatabaseTableTest {
runner.clearTransferState();
}
@Test
public void testMaxRowsPerFlowFileWithMaxFragments() throws ClassNotFoundException, SQLException, InitializationException, IOException {
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
InputStream in;
MockFlowFile mff;
try {
stmt.execute("drop table TEST_QUERY_DB_TABLE");
} catch (final SQLException sqle) {
// Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
}
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
int rowCount=0;
//create larger row set
for (int batch = 0; batch < 100; batch++) {
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
rowCount++;
}
runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE");
runner.setIncomingConnection(false);
runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID");
runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE, "9");
Integer maxFragments = 3;
runner.setProperty(QueryDatabaseTable.MAX_FRAGMENTS, maxFragments.toString());
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, maxFragments);
for (int i = 0; i < maxFragments; i++) {
mff = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(i);
in = new ByteArrayInputStream(mff.toByteArray());
assertEquals(9, getNumberOfRecordsFromStream(in));
mff.assertAttributeExists("fragment.identifier");
assertEquals(Integer.toString(i), mff.getAttribute("fragment.index"));
assertEquals(maxFragments.toString(), mff.getAttribute("fragment.count"));
}
runner.clearTransferState();
}
@Test
public void testInitialMaxValue() throws ClassNotFoundException, SQLException, InitializationException, IOException {