diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java new file mode 100644 index 0000000000..d594913fac --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StringUtils; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * A processor to retrieve a list of tables (and their metadata) from a database connection + */ +@TriggerSerially +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@Tags({"sql", "list", "jdbc", "table", "database"}) +@CapabilityDescription("Generates a set of flow files, each containing attributes corresponding to metadata about a table from a database connection. Once " + + "metadata about a table has been fetched, it will not be fetched again until the Refresh Interval (if set) has elapsed, or until state has been " + + "manually cleared.") +@WritesAttributes({ + @WritesAttribute(attribute = "db.table.name", description = "Contains the name of a database table from the connection"), + @WritesAttribute(attribute = "db.table.catalog", description = "Contains the name of the catalog to which the table belongs (may be null)"), + @WritesAttribute(attribute = "db.table.schema", description = "Contains the name of the schema to which the table belongs (may be null)"), + @WritesAttribute(attribute = "db.table.fullname", description = "Contains the fully-qualifed table name (possibly including catalog, schema, etc.)"), + @WritesAttribute(attribute = "db.table.type", + description = "Contains the type of the database table from the connection. Typical types are \"TABLE\", \"VIEW\", \"SYSTEM TABLE\", " + + "\"GLOBAL TEMPORARY\", \"LOCAL TEMPORARY\", \"ALIAS\", \"SYNONYM\""), + @WritesAttribute(attribute = "db.table.remarks", description = "Contains the name of a database table from the connection"), + @WritesAttribute(attribute = "db.table.count", description = "Contains the number of rows in the table") +}) +@Stateful(scopes = {Scope.CLUSTER}, description = "After performing a listing of tables, the timestamp of the query is stored. " + + "This allows the Processor to not re-list tables the next time that the Processor is run. Specifying the refresh interval in the processor properties will " + + "indicate that when the processor detects the interval has elapsed, the state will be reset and tables will be re-listed as a result. " + + "This processor is meant to be run on the primary node only.") +public class ListDatabaseTables extends AbstractProcessor { + + // Attribute names + public static final String DB_TABLE_NAME = "db.table.name"; + public static final String DB_TABLE_CATALOG = "db.table.catalog"; + public static final String DB_TABLE_SCHEMA = "db.table.schema"; + public static final String DB_TABLE_FULLNAME = "db.table.fullname"; + public static final String DB_TABLE_TYPE = "db.table.type"; + public static final String DB_TABLE_REMARKS = "db.table.remarks"; + public static final String DB_TABLE_COUNT = "db.table.count"; + + // Relationships + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All FlowFiles that are received are routed to success") + .build(); + + // Property descriptors + public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder() + .name("list-db-tables-db-connection") + .displayName("Database Connection Pooling Service") + .description("The Controller Service that is used to obtain connection to database") + .required(true) + .identifiesControllerService(DBCPService.class) + .build(); + + public static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder() + .name("list-db-tables-catalog") + .displayName("Catalog") + .description("The name of a catalog from which to list database tables. The name must match the catalog name as it is stored in the database. " + + "If the property is not set, the catalog name will not be used to narrow the search for tables. If the property is set to an empty string, " + + "tables without a catalog will be listed.") + .required(false) + .addValidator(Validator.VALID) + .build(); + + public static final PropertyDescriptor SCHEMA_PATTERN = new PropertyDescriptor.Builder() + .name("list-db-tables-schema-pattern") + .displayName("Schema Pattern") + .description("A pattern for matching schemas in the database. Within a pattern, \"%\" means match any substring of 0 or more characters, " + + "and \"_\" means match any one character. The pattern must match the schema name as it is stored in the database. " + + "If the property is not set, the schema name will not be used to narrow the search for tables. If the property is set to an empty string, " + + "tables without a schema will be listed.") + .required(false) + .addValidator(Validator.VALID) + .build(); + + public static final PropertyDescriptor TABLE_NAME_PATTERN = new PropertyDescriptor.Builder() + .name("list-db-tables-name-pattern") + .displayName("Table Name Pattern") + .description("A pattern for matching tables in the database. Within a pattern, \"%\" means match any substring of 0 or more characters, " + + "and \"_\" means match any one character. The pattern must match the table name as it is stored in the database. " + + "If the property is not set, all tables will be retrieved.") + .required(false) + .addValidator(Validator.VALID) + .build(); + + public static final PropertyDescriptor TABLE_TYPES = new PropertyDescriptor.Builder() + .name("list-db-tables-types") + .displayName("Table Types") + .description("A comma-separated list of table types to include. For example, some databases support TABLE and VIEW types. If the property is not set, " + + "tables of all types will be returned.") + .required(false) + .defaultValue("TABLE") + .addValidator(Validator.VALID) + .build(); + + public static final PropertyDescriptor INCLUDE_COUNT = new PropertyDescriptor.Builder() + .name("list-db-include-count") + .displayName("Include Count") + .description("Whether to include the table's row count as a flow file attribute. This affects performance as a database query will be generated " + + "for each table in the retrieved list.") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + + public static final PropertyDescriptor REFRESH_INTERVAL = new PropertyDescriptor.Builder() + .name("list-db-refresh-interval") + .displayName("Refresh Interval") + .description("The amount of time to elapse before resetting the processor state, thereby causing all current tables to be listed. " + + "During this interval, the processor may continue to run, but tables that have already been listed will not be re-listed. However new/added " + + "tables will be listed as the processor runs. A value of zero means the state will never be automatically reset, the user must " + + "Clear State manually.") + .required(true) + .defaultValue("0 sec") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + + private static final List propertyDescriptors; + private static final Set relationships; + + /* + * Will ensure that the list of property descriptors is build only once. + * Will also create a Set of relationships + */ + static { + List _propertyDescriptors = new ArrayList<>(); + _propertyDescriptors.add(DBCP_SERVICE); + _propertyDescriptors.add(CATALOG); + _propertyDescriptors.add(SCHEMA_PATTERN); + _propertyDescriptors.add(TABLE_NAME_PATTERN); + _propertyDescriptors.add(TABLE_TYPES); + _propertyDescriptors.add(INCLUDE_COUNT); + _propertyDescriptors.add(REFRESH_INTERVAL); + propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); + + Set _relationships = new HashSet<>(); + _relationships.add(REL_SUCCESS); + relationships = Collections.unmodifiableSet(_relationships); + } + + @Override + protected List getSupportedPropertyDescriptors() { + return propertyDescriptors; + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + final ComponentLog logger = getLogger(); + final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); + final String catalog = context.getProperty(CATALOG).getValue(); + final String schemaPattern = context.getProperty(SCHEMA_PATTERN).getValue(); + final String tableNamePattern = context.getProperty(TABLE_NAME_PATTERN).getValue(); + final String[] tableTypes = context.getProperty(TABLE_TYPES).isSet() + ? context.getProperty(TABLE_TYPES).getValue().split("\\s*,\\s*") + : null; + final boolean includeCount = context.getProperty(INCLUDE_COUNT).asBoolean(); + final long refreshInterval = context.getProperty(REFRESH_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS); + + final StateManager stateManager = context.getStateManager(); + final StateMap stateMap; + final Map stateMapProperties; + try { + stateMap = stateManager.getState(Scope.CLUSTER); + stateMapProperties = new HashMap<>(stateMap.toMap()); + } catch (IOException ioe) { + throw new ProcessException(ioe); + } + + try (final Connection con = dbcpService.getConnection()) { + + DatabaseMetaData dbMetaData = con.getMetaData(); + ResultSet rs = dbMetaData.getTables(catalog, schemaPattern, tableNamePattern, tableTypes); + while (rs.next()) { + final String tableCatalog = rs.getString(1); + final String tableSchema = rs.getString(2); + final String tableName = rs.getString(3); + final String tableType = rs.getString(4); + final String tableRemarks = rs.getString(5); + + // Build fully-qualified name + String fqn = Stream.of(tableCatalog, tableSchema, tableName) + .filter(segment -> !StringUtils.isEmpty(segment)) + .collect(Collectors.joining(".")); + + String lastTimestampForTable = stateMapProperties.get(fqn); + boolean refreshTable = true; + try { + // Refresh state if the interval has elapsed + long lastRefreshed = -1; + final long currentTime = System.currentTimeMillis(); + if (!StringUtils.isEmpty(lastTimestampForTable)) { + lastRefreshed = Long.parseLong(lastTimestampForTable); + } + if (lastRefreshed == -1 || (refreshInterval > 0 && currentTime >= (lastRefreshed + refreshInterval))) { + stateMapProperties.remove(lastTimestampForTable); + } else { + refreshTable = false; + } + } catch (final NumberFormatException nfe) { + getLogger().error("Failed to retrieve observed last table fetches from the State Manager. Will not perform " + + "query until this is accomplished.", nfe); + context.yield(); + return; + } + if (refreshTable) { + FlowFile flowFile = session.create(); + logger.info("Found {}: {}", new Object[]{tableType, fqn}); + if (includeCount) { + try (Statement st = con.createStatement()) { + final String countQuery = "SELECT COUNT(1) FROM " + fqn; + + logger.debug("Executing query: {}", new Object[]{countQuery}); + ResultSet countResult = st.executeQuery(countQuery); + if (countResult.next()) { + flowFile = session.putAttribute(flowFile, DB_TABLE_COUNT, Long.toString(countResult.getLong(1))); + } + } catch (SQLException se) { + logger.error("Couldn't get row count for {}", new Object[]{fqn}); + session.remove(flowFile); + continue; + } + } + if (tableCatalog != null) { + flowFile = session.putAttribute(flowFile, DB_TABLE_CATALOG, tableCatalog); + } + if (tableSchema != null) { + flowFile = session.putAttribute(flowFile, DB_TABLE_SCHEMA, tableSchema); + } + flowFile = session.putAttribute(flowFile, DB_TABLE_NAME, tableName); + flowFile = session.putAttribute(flowFile, DB_TABLE_FULLNAME, fqn); + flowFile = session.putAttribute(flowFile, DB_TABLE_TYPE, tableType); + if (tableRemarks != null) { + flowFile = session.putAttribute(flowFile, DB_TABLE_REMARKS, tableRemarks); + } + + String transitUri; + try { + transitUri = dbMetaData.getURL(); + } catch (SQLException sqle) { + transitUri = ""; + } + session.getProvenanceReporter().receive(flowFile, transitUri); + session.transfer(flowFile, REL_SUCCESS); + stateMapProperties.put(fqn, Long.toString(System.currentTimeMillis())); + } + } + // Update the timestamps for listed tables + if (stateMap.getVersion() == -1) { + stateManager.setState(stateMapProperties, Scope.CLUSTER); + } else { + stateManager.replace(stateMap, stateMapProperties, Scope.CLUSTER); + } + + } catch (final SQLException | IOException e) { + throw new ProcessException(e); + } + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 2ff466e076..d23d88dd19 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -46,6 +46,7 @@ org.apache.nifi.processors.standard.JoltTransformJSON org.apache.nifi.processors.standard.GenerateTableFetch org.apache.nifi.processors.standard.GetJMSQueue org.apache.nifi.processors.standard.GetJMSTopic +org.apache.nifi.processors.standard.ListDatabaseTables org.apache.nifi.processors.standard.ListFile org.apache.nifi.processors.standard.ListenHTTP org.apache.nifi.processors.standard.ListenRELP diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListDatabaseTables.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListDatabaseTables.java new file mode 100644 index 0000000000..1ff39f709e --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListDatabaseTables.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.nifi.util.file.FileUtils; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.SQLNonTransientConnectionException; +import java.sql.Statement; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + + +/** + * Unit tests for ListDatabaseTables processor. + */ +public class TestListDatabaseTables { + + TestRunner runner; + ListDatabaseTables processor; + + private final static String DB_LOCATION = "target/db_ldt"; + + @BeforeClass + public static void setupBeforeClass() throws IOException { + System.setProperty("derby.stream.error.file", "target/derby.log"); + + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + try { + FileUtils.deleteFile(dbLocation, true); + } catch (IOException ioe) { + // Do nothing, may not have existed + } + } + + @AfterClass + public static void cleanUpAfterClass() throws Exception { + try { + DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";shutdown=true"); + } catch (SQLNonTransientConnectionException e) { + // Do nothing, this is what happens at Derby shutdown + } + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + try { + FileUtils.deleteFile(dbLocation, true); + } catch (IOException ioe) { + // Do nothing, may not have existed + } + } + + @Before + public void setUp() throws Exception { + processor = new ListDatabaseTables(); + final DBCPService dbcp = new DBCPServiceSimpleImpl(); + final Map dbcpProperties = new HashMap<>(); + + runner = TestRunners.newTestRunner(ListDatabaseTables.class); + runner.addControllerService("dbcp", dbcp, dbcpProperties); + runner.enableControllerService(dbcp); + runner.setProperty(ListDatabaseTables.DBCP_SERVICE, "dbcp"); + } + + @Test + public void testListTablesNoCount() throws Exception { + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_TABLE1"); + stmt.execute("drop table TEST_TABLE2"); + } catch (final SQLException sqle) { + // Do nothing, may not have existed + } + + stmt.execute("create table TEST_TABLE1 (id integer not null, val1 integer, val2 integer, constraint my_pk1 primary key (id))"); + stmt.execute("create table TEST_TABLE2 (id integer not null, val1 integer, val2 integer, constraint my_pk2 primary key (id))"); + + runner.run(); + runner.assertTransferCount(ListDatabaseTables.REL_SUCCESS, 2); + // Already got these tables, shouldn't get them again + runner.clearTransferState(); + runner.run(); + runner.assertTransferCount(ListDatabaseTables.REL_SUCCESS, 0); + } + + @Test + public void testListTablesWithCount() throws Exception { + runner.setProperty(ListDatabaseTables.INCLUDE_COUNT, "true"); + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_TABLE1"); + stmt.execute("drop table TEST_TABLE2"); + } catch (final SQLException sqle) { + // Do nothing, may not have existed + } + + stmt.execute("create table TEST_TABLE1 (id integer not null, val1 integer, val2 integer, constraint my_pk1 primary key (id))"); + stmt.execute("insert into TEST_TABLE1 (id, val1, val2) VALUES (0, NULL, 1)"); + stmt.execute("insert into TEST_TABLE1 (id, val1, val2) VALUES (1, 1, 1)"); + stmt.execute("create table TEST_TABLE2 (id integer not null, val1 integer, val2 integer, constraint my_pk2 primary key (id))"); + + runner.run(); + runner.assertTransferCount(ListDatabaseTables.REL_SUCCESS, 2); + List results = runner.getFlowFilesForRelationship(ListDatabaseTables.REL_SUCCESS); + assertEquals("2", results.get(0).getAttribute(ListDatabaseTables.DB_TABLE_COUNT)); + assertEquals("0", results.get(1).getAttribute(ListDatabaseTables.DB_TABLE_COUNT)); + + } + + @Test + public void testListTablesAfterRefresh() throws Exception { + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_TABLE1"); + stmt.execute("drop table TEST_TABLE2"); + } catch (final SQLException sqle) { + // Do nothing, may not have existed + } + + stmt.execute("create table TEST_TABLE1 (id integer not null, val1 integer, val2 integer, constraint my_pk1 primary key (id))"); + stmt.execute("insert into TEST_TABLE1 (id, val1, val2) VALUES (0, NULL, 1)"); + stmt.execute("insert into TEST_TABLE1 (id, val1, val2) VALUES (1, 1, 1)"); + stmt.execute("create table TEST_TABLE2 (id integer not null, val1 integer, val2 integer, constraint my_pk2 primary key (id))"); + stmt.close(); + + runner.setProperty(ListDatabaseTables.INCLUDE_COUNT, "true"); + runner.setProperty(ListDatabaseTables.REFRESH_INTERVAL, "100 millis"); + runner.run(); + runner.assertTransferCount(ListDatabaseTables.REL_SUCCESS, 2); + List results = runner.getFlowFilesForRelationship(ListDatabaseTables.REL_SUCCESS); + assertEquals("2", results.get(0).getAttribute(ListDatabaseTables.DB_TABLE_COUNT)); + assertEquals("0", results.get(1).getAttribute(ListDatabaseTables.DB_TABLE_COUNT)); + runner.clearTransferState(); + runner.run(); + runner.assertTransferCount(ListDatabaseTables.REL_SUCCESS, 0); + + // Now wait longer than 100 millis and assert the refresh has happened (the two tables are re-listed) + Thread.sleep(200); + runner.run(); + runner.assertTransferCount(ListDatabaseTables.REL_SUCCESS, 2); + } + + @Test + public void testListTablesMultipleRefresh() throws Exception { + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_TABLE1"); + stmt.execute("drop table TEST_TABLE2"); + } catch (final SQLException sqle) { + // Do nothing, may not have existed + } + + stmt.execute("create table TEST_TABLE1 (id integer not null, val1 integer, val2 integer, constraint my_pk1 primary key (id))"); + stmt.execute("insert into TEST_TABLE1 (id, val1, val2) VALUES (0, NULL, 1)"); + stmt.execute("insert into TEST_TABLE1 (id, val1, val2) VALUES (1, 1, 1)"); + + runner.setProperty(ListDatabaseTables.INCLUDE_COUNT, "true"); + runner.setProperty(ListDatabaseTables.REFRESH_INTERVAL, "200 millis"); + runner.run(); + runner.assertTransferCount(ListDatabaseTables.REL_SUCCESS, 1); + List results = runner.getFlowFilesForRelationship(ListDatabaseTables.REL_SUCCESS); + assertEquals("2", results.get(0).getAttribute(ListDatabaseTables.DB_TABLE_COUNT)); + runner.clearTransferState(); + + // Add another table immediately, the first table should not be listed again but the second should + stmt.execute("create table TEST_TABLE2 (id integer not null, val1 integer, val2 integer, constraint my_pk2 primary key (id))"); + stmt.close(); + runner.run(); + runner.assertTransferCount(ListDatabaseTables.REL_SUCCESS, 1); + results = runner.getFlowFilesForRelationship(ListDatabaseTables.REL_SUCCESS); + assertEquals("0", results.get(0).getAttribute(ListDatabaseTables.DB_TABLE_COUNT)); + runner.clearTransferState(); + + // Now wait longer than the refresh interval and assert the refresh has happened (i.e. the two tables are re-listed) + Thread.sleep(500); + runner.run(); + runner.assertTransferCount(ListDatabaseTables.REL_SUCCESS, 2); + } + + /** + * Simple implementation only for ListDatabaseTables processor testing. + */ + private class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService { + + @Override + public String getIdentifier() { + return "dbcp"; + } + + @Override + public Connection getConnection() throws ProcessException { + try { + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + return DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true"); + } catch (final Exception e) { + throw new ProcessException("getConnection failed: " + e); + } + } + } +} \ No newline at end of file