NIFI-5121: Added DBCPService API method for passing in flow file attributes when available

This closes #2658

Signed-off-by: Mike Thomsen <mikerthomsen@gmail.com>
This commit is contained in:
Matthew Burgess 2018-04-25 14:36:16 -04:00 committed by Mike Thomsen
parent 500a254e3f
commit 099bfcdf3a
15 changed files with 41 additions and 18 deletions

View File

@ -32,7 +32,7 @@ public class PartialFunctions {
@FunctionalInterface @FunctionalInterface
public interface InitConnection<FC, C> { public interface InitConnection<FC, C> {
C apply(ProcessContext context, ProcessSession session, FC functionContext) throws ProcessException; C apply(ProcessContext context, ProcessSession session, FC functionContext, FlowFile flowFile) throws ProcessException;
} }
@FunctionalInterface @FunctionalInterface

View File

@ -93,7 +93,8 @@ public class Put<FC, C extends AutoCloseable> {
return; return;
} }
try (C connection = initConnection.apply(context, session, functionContext)) { // Only pass in a flow file if there is a single one present
try (C connection = initConnection.apply(context, session, functionContext, flowFiles.size() == 1 ? flowFiles.get(0) : null)) {
try { try {
// Execute the core function. // Execute the core function.

View File

@ -336,7 +336,7 @@ public class ExecuteGroovyScript extends AbstractProcessor {
private void onInitSQL(HashMap SQL) throws SQLException { private void onInitSQL(HashMap SQL) throws SQLException {
for (Map.Entry e : (Set<Map.Entry>) SQL.entrySet()) { for (Map.Entry e : (Set<Map.Entry>) SQL.entrySet()) {
DBCPService s = (DBCPService) e.getValue(); DBCPService s = (DBCPService) e.getValue();
OSql sql = new OSql(s.getConnection()); OSql sql = new OSql(s.getConnection(Collections.emptyMap()));
//try to set autocommit to false //try to set autocommit to false
try { try {
if (sql.getConnection().getAutoCommit()) { if (sql.getConnection().getAutoCommit()) {

View File

@ -17,7 +17,7 @@
//just check that it's possible to access controller services //just check that it's possible to access controller services
def ff=session.create() def ff=session.create()
def con=CTL.mydbcp.getConnection() def con=CTL.mydbcp.getConnection([:])
assert con instanceof java.sql.Connection assert con instanceof java.sql.Connection
con.close(); con.close();
ff.write('UTF-8', 'OK') ff.write('UTF-8', 'OK')

View File

@ -201,9 +201,9 @@ public class PutHiveQL extends AbstractHiveQLProcessor {
} }
} }
private InitConnection<FunctionContext, Connection> initConnection = (context, session, fc) -> { private InitConnection<FunctionContext, Connection> initConnection = (context, session, fc, ff) -> {
final HiveDBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(HiveDBCPService.class); final HiveDBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(HiveDBCPService.class);
final Connection connection = dbcpService.getConnection(); final Connection connection = dbcpService.getConnection(ff == null ? Collections.emptyMap() : ff.getAttributes());
fc.connectionUrl = dbcpService.getConnectionURL(); fc.connectionUrl = dbcpService.getConnectionURL();
return connection; return connection;
}; };

View File

@ -308,7 +308,7 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
final boolean escape = context.getProperty(HIVEQL_CSV_HEADER).asBoolean(); final boolean escape = context.getProperty(HIVEQL_CSV_HEADER).asBoolean();
final String fragmentIdentifier = UUID.randomUUID().toString(); final String fragmentIdentifier = UUID.randomUUID().toString();
try (final Connection con = dbcpService.getConnection(); try (final Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes());
final Statement st = (flowbased ? con.prepareStatement(selectQuery) : con.createStatement()) final Statement st = (flowbased ? con.prepareStatement(selectQuery) : con.createStatement())
) { ) {

View File

@ -46,6 +46,7 @@ import java.text.SimpleDateFormat;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -260,7 +261,7 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
final String sqlQuery = context.getProperty(SQL_QUERY).evaluateAttributeExpressions().getValue(); final String sqlQuery = context.getProperty(SQL_QUERY).evaluateAttributeExpressions().getValue();
final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue()); final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
try (final Connection con = dbcpService.getConnection(); try (final Connection con = dbcpService.getConnection(flowFile == null ? Collections.emptyMap() : flowFile.getAttributes());
final Statement st = con.createStatement()) { final Statement st = con.createStatement()) {
// Try a query that returns no rows, for the purposes of getting metadata about the columns. It is possible // Try a query that returns no rows, for the purposes of getting metadata about the columns. It is possible

View File

@ -28,6 +28,7 @@ import java.sql.ResultSetMetaData;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Types; import java.sql.Types;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
@ -315,7 +316,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
if (schema == null) { if (schema == null) {
// No schema exists for this table yet. Query the database to determine the schema and put it into the cache. // No schema exists for this table yet. Query the database to determine the schema and put it into the cache.
final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class); final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class);
try (final Connection conn = dbcpService.getConnection()) { try (final Connection conn = dbcpService.getConnection(flowFile == null ? Collections.emptyMap() : flowFile.getAttributes())) {
schema = TableSchema.from(conn, catalog, schemaName, tableName, translateFieldNames, includePrimaryKeys); schema = TableSchema.from(conn, catalog, schemaName, tableName, translateFieldNames, includePrimaryKeys);
schemaCache.put(schemaKey, schema); schemaCache.put(schemaKey, schema);
} catch (final SQLException e) { } catch (final SQLException e) {

View File

@ -216,7 +216,7 @@ public class ExecuteSQL extends AbstractProcessor {
} }
int resultCount=0; int resultCount=0;
try (final Connection con = dbcpService.getConnection(); try (final Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes());
final PreparedStatement st = con.prepareStatement(selectQuery)) { final PreparedStatement st = con.prepareStatement(selectQuery)) {
st.setQueryTimeout(queryTimeout); // timeout in seconds st.setQueryTimeout(queryTimeout); // timeout in seconds

View File

@ -277,7 +277,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
final String selectQuery = dbAdapter.getSelectStatement(tableName, columnsClause, whereClause, null, null, null); final String selectQuery = dbAdapter.getSelectStatement(tableName, columnsClause, whereClause, null, null, null);
long rowCount = 0; long rowCount = 0;
try (final Connection con = dbcpService.getConnection(); try (final Connection con = dbcpService.getConnection(finalFileToProcess == null ? Collections.emptyMap() : finalFileToProcess.getAttributes());
final Statement st = con.createStatement()) { final Statement st = con.createStatement()) {
final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.SECONDS).intValue(); final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.SECONDS).intValue();

View File

@ -225,7 +225,7 @@ public class ListDatabaseTables extends AbstractProcessor {
throw new ProcessException(ioe); throw new ProcessException(ioe);
} }
try (final Connection con = dbcpService.getConnection()) { try (final Connection con = dbcpService.getConnection(Collections.emptyMap())) {
DatabaseMetaData dbMetaData = con.getMetaData(); DatabaseMetaData dbMetaData = con.getMetaData();
ResultSet rs = dbMetaData.getTables(catalog, schemaPattern, tableNamePattern, tableTypes); ResultSet rs = dbMetaData.getTables(catalog, schemaPattern, tableNamePattern, tableTypes);

View File

@ -329,8 +329,9 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
.build(); .build();
} }
private final PartialFunctions.InitConnection<FunctionContext, Connection> initConnection = (c, s, fc) -> { private final PartialFunctions.InitConnection<FunctionContext, Connection> initConnection = (c, s, fc, ff) -> {
final Connection connection = c.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class).getConnection(); final Connection connection = c.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class)
.getConnection(ff == null ? Collections.emptyMap() : ff.getAttributes());
try { try {
fc.originalAutoCommit = connection.getAutoCommit(); fc.originalAutoCommit = connection.getAutoCommit();
connection.setAutoCommit(false); connection.setAutoCommit(false);

View File

@ -64,6 +64,7 @@ import java.sql.SQLNonTransientException;
import java.sql.Statement; import java.sql.Statement;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.BitSet; import java.util.BitSet;
import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
@ -233,8 +234,9 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
return poll.getFlowFiles(); return poll.getFlowFiles();
}; };
private final PartialFunctions.InitConnection<FunctionContext, Connection> initConnection = (c, s, fc) -> { private final PartialFunctions.InitConnection<FunctionContext, Connection> initConnection = (c, s, fc, ff) -> {
final Connection connection = c.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class).getConnection(); final Connection connection = c.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class)
.getConnection(ff == null ? Collections.emptyMap() : ff.getAttributes());
try { try {
fc.originalAutoCommit = connection.getAutoCommit(); fc.originalAutoCommit = connection.getAutoCommit();
connection.setAutoCommit(false); connection.setAutoCommit(false);

View File

@ -286,7 +286,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
final StopWatch stopWatch = new StopWatch(true); final StopWatch stopWatch = new StopWatch(true);
final String fragmentIdentifier = UUID.randomUUID().toString(); final String fragmentIdentifier = UUID.randomUUID().toString();
try (final Connection con = dbcpService.getConnection(); try (final Connection con = dbcpService.getConnection(Collections.emptyMap());
final Statement st = con.createStatement()) { final Statement st = con.createStatement()) {
if (fetchSize != null && fetchSize > 0) { if (fetchSize != null && fetchSize > 0) {

View File

@ -17,6 +17,7 @@
package org.apache.nifi.dbcp; package org.apache.nifi.dbcp;
import java.sql.Connection; import java.sql.Connection;
import java.util.Map;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
@ -30,5 +31,21 @@ import org.apache.nifi.processor.exception.ProcessException;
@Tags({"dbcp", "jdbc", "database", "connection", "pooling", "store"}) @Tags({"dbcp", "jdbc", "database", "connection", "pooling", "store"})
@CapabilityDescription("Provides Database Connection Pooling Service. Connections can be asked from pool and returned after usage.") @CapabilityDescription("Provides Database Connection Pooling Service. Connections can be asked from pool and returned after usage.")
public interface DBCPService extends ControllerService { public interface DBCPService extends ControllerService {
public Connection getConnection() throws ProcessException; Connection getConnection() throws ProcessException;
/**
* Allows a Map of attributes to be passed to the DBCPService for use in configuration, etc.
* An implementation will want to override getConnection() to return getConnection(Collections.emptyMap()),
* and override this method (possibly with its existing getConnection() implementation).
* @param attributes a Map of attributes to be passed to the DBCPService. The use of these
* attributes is implementation-specific, and the source of the attributes
* is processor-specific
* @return a Connection from the specifed/configured connection pool(s)
* @throws ProcessException if an error occurs while getting a connection
*/
default Connection getConnection(Map<String,String> attributes) throws ProcessException {
// default implementation (for backwards compatibility) is to call getConnection()
// without attributes
return getConnection();
}
} }