NIFI-2583

Signed-off-by: Matt Burgess <mattyb149@apache.org>

NIFI-2582

Signed-off-by: Matt Burgess <mattyb149@apache.org>

NIFI-2582

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #877
This commit is contained in:
Peter Wicks 2016-08-17 06:59:23 -06:00 committed by Matt Burgess
parent 1470a52b1b
commit a0d1aae603
2 changed files with 44 additions and 7 deletions

View File

@ -53,7 +53,15 @@ import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.ParseException;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@ -75,7 +83,11 @@ import java.util.concurrent.atomic.AtomicLong;
@WritesAttribute(attribute = "querydbtable.row.count"),
@WritesAttribute(attribute="fragment.identifier", description="If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set "
+ "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
@WritesAttribute(attribute="fragment.index", description="If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of outgoing FlowFiles that were all derived from the same result set FlowFile. This can be "
@WritesAttribute(attribute="fragment.count", description="If 'Max Rows Per Flow File' is set then this is the total number of "
+ "FlowFiles produced by a single ResultSet. This can be used in conjunction with the "
+ "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet."),
@WritesAttribute(attribute="fragment.index", description="If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of "
+ "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be "
+ "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order "
+ "FlowFiles were produced")})
@DynamicProperty(name = "Initial Max Value", value = "Attribute Expression Language", supportsExpressionLanguage = false, description = "Specifies an initial "
@ -152,7 +164,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
ProcessSession session = sessionFactory.createSession();
final Set<FlowFile> resultSetFlowFiles = new HashSet<>();
final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
final ComponentLog logger = getLogger();
@ -261,6 +273,14 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
fragmentIndex++;
}
//set count on all FlowFiles
if(maxRowsPerFlowFile > 0) {
for (int i = 0; i < resultSetFlowFiles.size(); i++) {
resultSetFlowFiles.set(i,
session.putAttribute(resultSetFlowFiles.get(i), "fragment.count", Integer.toString(fragmentIndex)));
}
}
} catch (final SQLException e) {
throw e;
}
@ -322,7 +342,9 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
final String key = entry.getKey().getName();
if(!key.startsWith(INTIIAL_MAX_VALUE_PROP_START)) { continue; }
if(!key.startsWith(INTIIAL_MAX_VALUE_PROP_START)) {
continue;
}
defaultMaxValues.put(key.substring(INTIIAL_MAX_VALUE_PROP_START.length()), entry.getValue());
}

View File

@ -30,6 +30,7 @@ import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.apache.nifi.processors.standard.db.impl.GenericDatabaseAdapter;
import org.apache.nifi.processors.standard.db.impl.OracleDatabaseAdapter;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.util.file.FileUtils;
@ -387,6 +388,7 @@ public class QueryDatabaseTableTest {
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
InputStream in;
MockFlowFile mff;
try {
stmt.execute("drop table TEST_QUERY_DB_TABLE");
@ -412,13 +414,22 @@ public class QueryDatabaseTableTest {
//ensure all but the last file have 9 records each
for(int ff=0;ff<11;ff++) {
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(ff).toByteArray());
mff = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(ff);
in = new ByteArrayInputStream(mff.toByteArray());
assertEquals(9, getNumberOfRecordsFromStream(in));
mff.assertAttributeExists("fragment.identifier");
assertEquals(Integer.toString(ff), mff.getAttribute("fragment.index"));
assertEquals("12", mff.getAttribute("fragment.count"));
}
//last file should have 1 record
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(11).toByteArray());
mff = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(11);
in = new ByteArrayInputStream(mff.toByteArray());
assertEquals(1, getNumberOfRecordsFromStream(in));
mff.assertAttributeExists("fragment.identifier");
assertEquals(Integer.toString(11), mff.getAttribute("fragment.index"));
assertEquals("12", mff.getAttribute("fragment.count"));
runner.clearTransferState();
// Run again, this time no flowfiles/rows should be transferred
@ -434,7 +445,11 @@ public class QueryDatabaseTableTest {
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
mff = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0);
in = new ByteArrayInputStream(mff.toByteArray());
mff.assertAttributeExists("fragment.identifier");
assertEquals(Integer.toString(0), mff.getAttribute("fragment.index"));
assertEquals("1", mff.getAttribute("fragment.count"));
assertEquals(5, getNumberOfRecordsFromStream(in));
runner.clearTransferState();