mirror of https://github.com/apache/nifi.git

NIFI-5601: Add fragment.* attributes to GenerateTableFetch

Signed-off-by: Peter Wicks <patricker@gmail.com>
This closes #3074

commit d8d220ccb8 (parent fdd8cdbb31)
@@ -23,6 +23,7 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
 import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Relationship;
@@ -89,6 +90,9 @@ import static java.sql.Types.VARCHAR;
 public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFactoryProcessor {

     public static final String INITIAL_MAX_VALUE_PROP_START = "initial.maxvalue.";
+    public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
+    public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
+    public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key();

     // Relationships
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
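For context: FragmentAttributes is NiFi's shared enum of fragment-attribute keys, so the three constants above resolve to the canonical names that the @WritesAttribute annotations later in this diff document. A minimal sketch (not from this commit) that just prints the resolved keys:

import org.apache.nifi.flowfile.attributes.FragmentAttributes;

// Prints the canonical fragment attribute names behind the new constants.
public class FragmentKeys {
    public static void main(String[] args) {
        System.out.println(FragmentAttributes.FRAGMENT_ID.key());    // fragment.identifier
        System.out.println(FragmentAttributes.FRAGMENT_INDEX.key()); // fragment.index
        System.out.println(FragmentAttributes.FRAGMENT_COUNT.key()); // fragment.count
    }
}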
@@ -27,7 +27,6 @@ import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.expression.AttributeExpression;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.FragmentAttributes;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -66,9 +65,6 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchProcessor {
     public static final String RESULT_TABLENAME = "tablename";
     public static final String RESULT_ROW_COUNT = "querydbtable.row.count";

-    public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
-    public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
-
     public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
             .name("Fetch Size")
             .description("The number of result rows to be fetched from the result set at a time. This is a hint to the database driver and may not be "
@@ -338,7 +334,7 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchProcessor {
                     //set count on all FlowFiles
                     if (maxRowsPerFlowFile > 0) {
                         resultSetFlowFiles.set(i,
-                                session.putAttribute(resultSetFlowFiles.get(i), "fragment.count", Integer.toString(fragmentIndex)));
+                                session.putAttribute(resultSetFlowFiles.get(i), FRAGMENT_COUNT, Integer.toString(fragmentIndex)));
                     }
                 }
             }
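Judging from the surrounding loop, fragmentIndex has been incremented once per FlowFile by this point, so its final value equals the fragment total; the change only swaps the "fragment.count" literal for the FRAGMENT_COUNT constant now inherited from AbstractDatabaseFetchProcessor. A minimal sketch of that share-one-constant pattern, with hypothetical class names:

import java.util.Collections;
import java.util.Map;

// Hypothetical sketch: declare the attribute key once in the base class so
// subclasses (here, anything query- or fetch-like) cannot drift apart.
abstract class AbstractFetchSketch {
    static final String FRAGMENT_COUNT = "fragment.count";
}

class QueryLikeSketch extends AbstractFetchSketch {
    Map<String, String> countAttribute(int totalFragments) {
        return Collections.singletonMap(FRAGMENT_COUNT, Integer.toString(totalFragments));
    }
}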
@@ -65,6 +65,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;

@@ -97,7 +98,16 @@ import java.util.stream.IntStream;
         @WritesAttribute(attribute = "generatetablefetch.maxColumnNames", description = "The comma-separated list of column names used to keep track of data "
                 + "that has been returned since the processor started running."),
         @WritesAttribute(attribute = "generatetablefetch.limit", description = "The number of result rows to be fetched by the SQL statement."),
-        @WritesAttribute(attribute = "generatetablefetch.offset", description = "Offset to be used to retrieve the corresponding partition.")
+        @WritesAttribute(attribute = "generatetablefetch.offset", description = "Offset to be used to retrieve the corresponding partition."),
+        @WritesAttribute(attribute="fragment.identifier", description="All FlowFiles generated from the same query result set "
+                + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
+        @WritesAttribute(attribute = "fragment.count", description = "This is the total number of "
+                + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the "
+                + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet."),
+        @WritesAttribute(attribute="fragment.index", description="This is the position of this FlowFile in the list of "
+                + "outgoing FlowFiles that were all generated from the same execution. This can be "
+                + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same execution and in what order "
+                + "FlowFiles were produced"),
 })
 @DynamicProperty(name = "initial.maxvalue.<max_value_column>", value = "Initial maximum value for the specified column",
         expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, description = "Specifies an initial "
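These are the same fragment.* names that NiFi's split-style processors write, so downstream components that correlate fragments (for example MergeContent's Defragment strategy, assuming its standard behavior) can group and order GenerateTableFetch's output. A minimal sketch (my illustration, not NiFi code) of that correlation, operating on plain attribute maps:

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Groups FlowFile attribute maps by fragment.identifier and orders each
// group by fragment.index; a complete group has fragment.count members.
public class FragmentCorrelator {
    public static Map<String, List<Map<String, String>>> correlate(List<Map<String, String>> attrs) {
        return attrs.stream()
                .collect(Collectors.groupingBy(a -> a.get("fragment.identifier")))
                .entrySet().stream()
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        e -> e.getValue().stream()
                                .sorted(Comparator.comparingInt(a -> Integer.parseInt(a.get("fragment.index"))))
                                .collect(Collectors.toList())));
    }
}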
@@ -426,6 +436,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {

            // Generate SQL statements to read "pages" of data
            Long limit = partitionSize == 0 ? null : (long) partitionSize;
+           final String fragmentIdentifier = UUID.randomUUID().toString();
            for (long i = 0; i < numberOfFetches; i++) {
                // Add a right bounding for the partitioning column if necessary (only on last partition, meaning we don't need the limit)
                if ((i == numberOfFetches - 1) && useColumnValsForPaging && (maxValueClauses.isEmpty() || customWhereClause != null)) {
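Note the placement: the UUID is generated once per onTrigger invocation, before the paging loop, so every page produced by one execution shares a single fragment.identifier while the loop variable i supplies fragment.index. A stripped-down sketch (hypothetical standalone class) of that structure:

import java.util.UUID;

// One identifier per execution, one index per page, count shared by all pages.
public class PagingIds {
    public static void main(String[] args) {
        final long numberOfFetches = 3;
        final String fragmentIdentifier = UUID.randomUUID().toString(); // shared by all pages
        for (long i = 0; i < numberOfFetches; i++) {
            System.out.println(fragmentIdentifier + " index=" + i + " count=" + numberOfFetches);
        }
    }
}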
@@ -442,20 +453,28 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
                final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, maxColumnNames, limit, offset, columnForPartitioning);
                FlowFile sqlFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess);
                sqlFlowFile = session.write(sqlFlowFile, out -> out.write(query.getBytes()));
-               sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.tableName", tableName);
+               Map<String,String> attributesToAdd = new HashMap<>();
+
+               attributesToAdd.put("generatetablefetch.tableName", tableName);
                if (columnNames != null) {
-                   sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.columnNames", columnNames);
+                   attributesToAdd.put("generatetablefetch.columnNames", columnNames);
                }
                if (StringUtils.isNotBlank(whereClause)) {
-                   sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.whereClause", whereClause);
+                   attributesToAdd.put("generatetablefetch.whereClause", whereClause);
                }
                if (StringUtils.isNotBlank(maxColumnNames)) {
-                   sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.maxColumnNames", maxColumnNames);
+                   attributesToAdd.put("generatetablefetch.maxColumnNames", maxColumnNames);
                }
-               sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.limit", String.valueOf(limit));
+               attributesToAdd.put("generatetablefetch.limit", String.valueOf(limit));
                if (partitionSize != 0) {
-                   sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.offset", String.valueOf(offset));
+                   attributesToAdd.put("generatetablefetch.offset", String.valueOf(offset));
                }
+               // Add fragment attributes
+               attributesToAdd.put(FRAGMENT_ID, fragmentIdentifier);
+               attributesToAdd.put(FRAGMENT_INDEX, String.valueOf(i));
+               attributesToAdd.put(FRAGMENT_COUNT, String.valueOf(numberOfFetches));
+
+               sqlFlowFile = session.putAllAttributes(sqlFlowFile, attributesToAdd);
                session.transfer(sqlFlowFile, REL_SUCCESS);
            }

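Two things worth noting here. First, collecting the attributes into one map and applying them with a single session.putAllAttributes call is the idiomatic NiFi pattern: each putAttribute call returns a new FlowFile reference, so batching avoids repeated reassignment. Second, an illustrative attribute set (my example, with placeholder values, not output from the commit) for the second of three generated pages:

generatetablefetch.tableName = TEST_QUERY_DB_TABLE   (example value)
generatetablefetch.limit     = 2
generatetablefetch.offset    = 2
fragment.identifier          = <same UUID on all three FlowFiles>
fragment.index               = 1
fragment.count               = 3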
@@ -44,9 +44,13 @@ import java.sql.SQLNonTransientConnectionException;
 import java.sql.Statement;
 import java.sql.Types;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;

 import static org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor.DB_TYPE;
+import static org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor.FRAGMENT_COUNT;
+import static org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor.FRAGMENT_ID;
+import static org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor.FRAGMENT_INDEX;
 import static org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor.REL_SUCCESS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -114,7 +118,7 @@ public class TestGenerateTableFetch {
     }

     @Test
-    public void testAddedRows() throws ClassNotFoundException, SQLException, InitializationException, IOException {
+    public void testAddedRows() throws SQLException, IOException {

         // load test data to database
         final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
@@ -140,6 +144,8 @@ public class TestGenerateTableFetch {
         MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
         String query = new String(flowFile.toByteArray());
         assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID <= 2 ORDER BY ID FETCH NEXT 10000 ROWS ONLY", query);
+        flowFile.assertAttributeEquals(FRAGMENT_INDEX, "0");
+        flowFile.assertAttributeEquals(FRAGMENT_COUNT, "1");
         ResultSet resultSet = stmt.executeQuery(query);
         // Should be three records
         assertTrue(resultSet.next());
@@ -160,6 +166,15 @@ public class TestGenerateTableFetch {
         runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "2");
         runner.run();
         runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
+        // Check fragment attributes
+        List<MockFlowFile> resultFFs = runner.getFlowFilesForRelationship(REL_SUCCESS);
+        MockFlowFile ff1 = resultFFs.get(0);
+        MockFlowFile ff2 = resultFFs.get(1);
+        assertEquals(ff1.getAttribute(FRAGMENT_ID), ff2.getAttribute(FRAGMENT_ID));
+        assertEquals(ff1.getAttribute(FRAGMENT_INDEX), "0");
+        assertEquals(ff1.getAttribute(FRAGMENT_COUNT), "2");
+        assertEquals(ff2.getAttribute(FRAGMENT_INDEX), "1");
+        assertEquals(ff2.getAttribute(FRAGMENT_COUNT), "2");
+
         // Verify first flow file's contents
         flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
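A usage note (my sketch, not part of the commit): the same checks can also be written with MockFlowFile's assertAttributeEquals helper, as the earlier TestGenerateTableFetch hunk does, which yields clearer failure messages than raw assertEquals on getAttribute results:

// Equivalent fragment-attribute assertions, assuming ff1/ff2 from the hunk above.
ff1.assertAttributeEquals(FRAGMENT_INDEX, "0");
ff1.assertAttributeEquals(FRAGMENT_COUNT, "2");
ff2.assertAttributeEquals(FRAGMENT_INDEX, "1");
ff2.assertAttributeEquals(FRAGMENT_COUNT, "2");
assertEquals(ff1.getAttribute(FRAGMENT_ID), ff2.getAttribute(FRAGMENT_ID));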