From 1470a52b1bf3a1c2f9101306c70db0b08692301f Mon Sep 17 00:00:00 2001 From: Peter Wicks Date: Tue, 16 Aug 2016 19:10:16 -0600 Subject: [PATCH] NIFI-2582 NIFI-2583 Signed-off-by: Matt Burgess NIFI-2583 Tweaks Signed-off-by: Matt Burgess This closes #877 --- .../standard/QueryDatabaseTable.java | 166 ++++++++++++----- .../processors/standard/util/JdbcCommon.java | 9 +- .../standard/QueryDatabaseTableTest.java | 167 +++++++++++++++++- 3 files changed, 289 insertions(+), 53 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java index ea24dd006c..4f852829fd 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java @@ -17,11 +17,13 @@ package org.apache.nifi.processors.standard; import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; @@ -30,6 +32,7 @@ import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateManager; import org.apache.nifi.components.state.StateMap; import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.expression.AttributeExpression; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessContext; @@ -50,14 +53,7 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; import java.text.ParseException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -75,10 +71,19 @@ import java.util.concurrent.atomic.AtomicLong; + "to fetch only those records that have max values greater than the retained values. This can be used for " + "incremental fetching, fetching of newly added rows, etc. To clear the maximum values, clear the state of the processor " + "per the State Management documentation") -@WritesAttribute(attribute = "querydbtable.row.count") +@WritesAttributes({ + @WritesAttribute(attribute = "querydbtable.row.count"), + @WritesAttribute(attribute="fragment.identifier", description="If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set " + + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."), + @WritesAttribute(attribute="fragment.index", description="If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of outgoing FlowFiles that were all derived from the same result set FlowFile. This can be " + + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order " + + "FlowFiles were produced")}) +@DynamicProperty(name = "Initial Max Value", value = "Attribute Expression Language", supportsExpressionLanguage = false, description = "Specifies an initial " + + "max value for max value columns. Properties should be added in the format `initial.maxvalue.{max_value_column}`.") public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { public static final String RESULT_ROW_COUNT = "querydbtable.row.count"; + public static final String INTIIAL_MAX_VALUE_PROP_START = "initial.maxvalue."; public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder() @@ -90,6 +95,16 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) .build(); + public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder() + .name("qdbt-max-rows") + .displayName("Max Rows Per Flow File") + .description("The maximum number of result rows that will be included in a single FlowFile. " + + "This will allow you to break up very large result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.") + .defaultValue("0") + .required(true) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + public QueryDatabaseTable() { final Set r = new HashSet<>(); r.add(REL_SUCCESS); @@ -103,6 +118,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { pds.add(MAX_VALUE_COLUMN_NAMES); pds.add(QUERY_TIMEOUT); pds.add(FETCH_SIZE); + pds.add(MAX_ROWS_PER_FLOW_FILE); propDescriptors = Collections.unmodifiableList(pds); } @@ -116,6 +132,18 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { return propDescriptors; } + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .required(false) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) + .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) + .expressionLanguageSupported(true) + .dynamic(true) + .build(); + } + @OnScheduled public void setup(final ProcessContext context) { super.setup(context); @@ -124,7 +152,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { ProcessSession session = sessionFactory.createSession(); - FlowFile fileToProcess = null; + final Set resultSetFlowFiles = new HashSet<>(); final ComponentLog logger = getLogger(); @@ -134,6 +162,9 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { final String columnNames = context.getProperty(COLUMN_NAMES).getValue(); final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue(); final Integer fetchSize = context.getProperty(FETCH_SIZE).asInteger(); + final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).asInteger(); + + final Map maxValueProperties = getDefaultMaxValueProperties(context.getProperties()); final StateManager stateManager = context.getStateManager(); final StateMap stateMap; @@ -150,11 +181,19 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { // set as the current state map (after the session has been committed) final Map statePropertyMap = new HashMap<>(stateMap.toMap()); + //If an initial max value for column(s) has been specified using properties, and this column is not in the state manager, sync them to the state property map + for(final Map.Entry maxProp : maxValueProperties.entrySet()){ + if(!statePropertyMap.containsKey(maxProp.getKey())){ + statePropertyMap.put(maxProp.getKey(), maxProp.getValue()); + } + } + List maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames) ? null : Arrays.asList(maxValueColumnNames.split("\\s*,\\s*")); - final String selectQuery = getQuery(dbAdapter, tableName, columnNames, maxValueColumnNameList, stateMap); + final String selectQuery = getQuery(dbAdapter, tableName, columnNames, maxValueColumnNameList, statePropertyMap); final StopWatch stopWatch = new StopWatch(true); + final String fragmentIdentifier = UUID.randomUUID().toString(); try (final Connection con = dbcpService.getConnection(); final Statement st = con.createStatement()) { @@ -168,52 +207,70 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { } } + String jdbcURL = "DBCPService"; + try { + DatabaseMetaData databaseMetaData = con.getMetaData(); + if (databaseMetaData != null) { + jdbcURL = databaseMetaData.getURL(); + } + } catch (SQLException se) { + // Ignore and use default JDBC URL. This shouldn't happen unless the driver doesn't implement getMetaData() properly + } + final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue(); st.setQueryTimeout(queryTimeout); // timeout in seconds + try { + logger.debug("Executing query {}", new Object[]{selectQuery}); + final ResultSet resultSet = st.executeQuery(selectQuery); + int fragmentIndex=0; + while(true) { + final AtomicLong nrOfRows = new AtomicLong(0L); - final AtomicLong nrOfRows = new AtomicLong(0L); + FlowFile fileToProcess = session.create(); + fileToProcess = session.write(fileToProcess, out -> { + // Max values will be updated in the state property map by the callback + final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(statePropertyMap, dbAdapter); + try { + nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, tableName, maxValCollector, maxRowsPerFlowFile)); + } catch (SQLException e) { + throw new ProcessException("Error during database query or conversion of records to Avro.", e); + } + }); - fileToProcess = session.create(); - fileToProcess = session.write(fileToProcess, out -> { - try { - logger.debug("Executing query {}", new Object[]{selectQuery}); - final ResultSet resultSet = st.executeQuery(selectQuery); - // Max values will be updated in the state property map by the callback - final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(statePropertyMap, dbAdapter); - nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, tableName, maxValCollector)); + if (nrOfRows.get() > 0) { + // set attribute how many rows were selected + fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get())); - } catch (final SQLException e) { - throw new ProcessException("Error during database query or conversion of records to Avro", e); - } - }); + if(maxRowsPerFlowFile > 0) { + fileToProcess = session.putAttribute(fileToProcess, "fragment.identifier", fragmentIdentifier); + fileToProcess = session.putAttribute(fileToProcess, "fragment.index", String.valueOf(fragmentIndex)); + } - if (nrOfRows.get() > 0) { - // set attribute how many rows were selected - fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get())); + logger.info("{} contains {} Avro records; transferring to 'success'", + new Object[]{fileToProcess, nrOfRows.get()}); - logger.info("{} contains {} Avro records; transferring to 'success'", - new Object[]{fileToProcess, nrOfRows.get()}); - String jdbcURL = "DBCPService"; - try { - DatabaseMetaData databaseMetaData = con.getMetaData(); - if (databaseMetaData != null) { - jdbcURL = databaseMetaData.getURL(); + session.getProvenanceReporter().receive(fileToProcess, jdbcURL, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + + resultSetFlowFiles.add(fileToProcess); + } else { + // If there were no rows returned, don't send the flowfile + session.remove(fileToProcess); + context.yield(); + break; } - } catch (SQLException se) { - // Ignore and use default JDBC URL. This shouldn't happen unless the driver doesn't implement getMetaData() properly + + fragmentIndex++; } - session.getProvenanceReporter().receive(fileToProcess, jdbcURL, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); - session.transfer(fileToProcess, REL_SUCCESS); - } else { - // If there were no rows returned, don't send the flowfile - session.remove(fileToProcess); - context.yield(); + } catch (final SQLException e) { + throw e; } + session.transfer(resultSetFlowFiles, REL_SUCCESS); + } catch (final ProcessException | SQLException e) { logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectQuery, e}); - if (fileToProcess != null) { - session.remove(fileToProcess); + if (!resultSetFlowFiles.isEmpty()) { + session.remove(resultSetFlowFiles); } context.yield(); } finally { @@ -228,18 +285,17 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { } protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String columnNames, List maxValColumnNames, - StateMap stateMap) { + Map stateMap) { if (StringUtils.isEmpty(tableName)) { throw new IllegalArgumentException("Table name must be specified"); } final StringBuilder query = new StringBuilder(dbAdapter.getSelectStatement(tableName, columnNames, null, null, null, null)); // Check state map for last max values - if (stateMap != null && stateMap.getVersion() != -1 && maxValColumnNames != null) { - Map stateProperties = stateMap.toMap(); + if (stateMap != null && !stateMap.isEmpty() && maxValColumnNames != null) { List whereClauses = new ArrayList<>(maxValColumnNames.size()); for (String colName : maxValColumnNames) { - String maxValue = stateProperties.get(colName.toLowerCase()); + String maxValue = stateMap.get(colName.toLowerCase()); if (!StringUtils.isEmpty(maxValue)) { Integer type = columnTypeMap.get(colName.toLowerCase()); if (type == null) { @@ -260,6 +316,20 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { } + protected Map getDefaultMaxValueProperties(final Map properties){ + final Map defaultMaxValues = new HashMap(); + + for (final Map.Entry entry : properties.entrySet()) { + final String key = entry.getKey().getName(); + + if(!key.startsWith(INTIIAL_MAX_VALUE_PROP_START)) { continue; } + + defaultMaxValues.put(key.substring(INTIIAL_MAX_VALUE_PROP_START.length()), entry.getValue()); + } + + return defaultMaxValues; + } + protected class MaxValueResultSetRowCollector implements JdbcCommon.ResultSetRowCallback { DatabaseAdapter dbAdapter; Map newColMap; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java index 5ece1490ba..8d81b34a01 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java @@ -79,7 +79,11 @@ public class JdbcCommon { return convertToAvroStream(rs, outStream, recordName, null); } - public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback) + public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback) throws IOException, SQLException { + return convertToAvroStream(rs, outStream, recordName, callback, 0); + } + + public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback, final int maxRows) throws SQLException, IOException { final Schema schema = createSchema(rs, recordName); final GenericRecord rec = new GenericData.Record(schema); @@ -155,6 +159,9 @@ public class JdbcCommon { } dataFileWriter.append(rec); nrOfRows += 1; + + if(maxRows > 0 && nrOfRows == maxRows) + break; } return nrOfRows; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java index a373f8fe59..4279ca34de 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java @@ -43,6 +43,7 @@ import org.junit.Test; import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.text.SimpleDateFormat; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; @@ -50,9 +51,11 @@ import java.sql.SQLNonTransientConnectionException; import java.sql.Statement; import java.sql.Types; import java.util.Arrays; +import java.util.Calendar; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.TimeZone; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -133,18 +136,18 @@ public class QueryDatabaseTableTest { StateManager stateManager = runner.getStateManager(); stateManager.setState(maxValues, Scope.CLUSTER); processor.putColumnType("id", Types.INTEGER); - query = processor.getQuery(dbAdapter, "myTable", null, Collections.singletonList("id"), stateManager.getState(Scope.CLUSTER)); + query = processor.getQuery(dbAdapter, "myTable", null, Collections.singletonList("id"), stateManager.getState(Scope.CLUSTER).toMap()); assertEquals("SELECT * FROM myTable WHERE id > 509", query); maxValues.put("date_created", "2016-03-07 12:34:56"); stateManager.setState(maxValues, Scope.CLUSTER); processor.putColumnType("date_created", Types.TIMESTAMP); - query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER)); + query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER).toMap()); assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED > '2016-03-07 12:34:56'", query); // Test Oracle strategy dbAdapter = new OracleDatabaseAdapter(); - query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER)); + query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER).toMap()); assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED > to_date('2016-03-07 12:34:56', 'yyyy-mm-dd HH24:MI:SS')", query); } @@ -329,7 +332,7 @@ public class QueryDatabaseTableTest { runner.clearTransferState(); } - @Test + @Test public void testWithNullIntColumn() throws SQLException { // load test data to database final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); @@ -377,6 +380,162 @@ public class QueryDatabaseTableTest { assertTrue(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).isEmpty()); } + @Test + public void testMaxRowsPerFlowFile() throws ClassNotFoundException, SQLException, InitializationException, IOException { + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + InputStream in; + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)"); + int rowCount=0; + //create larger row set + for(int batch=0;batch<100;batch++){ + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')"); + rowCount++; + } + + runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE"); + runner.setIncomingConnection(false); + runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID"); + runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE, "9");//Using a non-round number to make sure the last file is ragged + + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 12); + + //ensure all but the last file have 9 records each + for(int ff=0;ff<11;ff++) { + in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(ff).toByteArray()); + assertEquals(9, getNumberOfRecordsFromStream(in)); + } + + //last file should have 1 record + in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(11).toByteArray()); + assertEquals(1, getNumberOfRecordsFromStream(in)); + runner.clearTransferState(); + + // Run again, this time no flowfiles/rows should be transferred + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0); + runner.clearTransferState(); + + // Run again, this time should be a single partial flow file + for(int batch=0;batch<5;batch++){ + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')"); + rowCount++; + } + + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); + in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); + assertEquals(5, getNumberOfRecordsFromStream(in)); + runner.clearTransferState(); + + // Run again, this time should be a full batch and a partial + for(int batch=0;batch<14;batch++){ + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')"); + rowCount++; + } + + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 2); + in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); + assertEquals(9, getNumberOfRecordsFromStream(in)); + in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(1).toByteArray()); + assertEquals(5, getNumberOfRecordsFromStream(in)); + runner.clearTransferState(); + + // Run again with a cleaned state. Should get all rows split into batches + int ffCount = (int) Math.ceil((double)rowCount / 9D); + runner.getStateManager().clear(Scope.CLUSTER); + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, ffCount); + + //ensure all but the last file have 9 records each + for(int ff=0;ff datumReader = new GenericDatumReader<>(); try (DataFileStream dataFileReader = new DataFileStream<>(in, datumReader)) {