mirror of https://github.com/apache/nifi.git
NIFI-5970 Handle multiple input FlowFiles at Put.initConnection
Signed-off-by: Matthew Burgess <mattyb149@apache.org> This closes #3583
This commit is contained in:
parent
efff1b41f0
commit
b3880a4a06
|
@ -32,7 +32,7 @@ public class PartialFunctions {
|
||||||
|
|
||||||
@FunctionalInterface
|
@FunctionalInterface
|
||||||
public interface InitConnection<FC, C> {
|
public interface InitConnection<FC, C> {
|
||||||
C apply(ProcessContext context, ProcessSession session, FC functionContext, FlowFile flowFile) throws ProcessException;
|
C apply(ProcessContext context, ProcessSession session, FC functionContext, List<FlowFile> flowFiles) throws ProcessException;
|
||||||
}
|
}
|
||||||
|
|
||||||
@FunctionalInterface
|
@FunctionalInterface
|
||||||
|
|
|
@ -93,8 +93,8 @@ public class Put<FC, C extends AutoCloseable> {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Only pass in a flow file if there is a single one present
|
// Pass the FlowFiles to initialize a connection
|
||||||
try (C connection = initConnection.apply(context, session, functionContext, flowFiles.size() == 1 ? flowFiles.get(0) : null)) {
|
try (C connection = initConnection.apply(context, session, functionContext, flowFiles)) {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Execute the core function.
|
// Execute the core function.
|
||||||
|
|
|
@ -204,9 +204,9 @@ public class PutHiveQL extends AbstractHiveQLProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private InitConnection<FunctionContext, Connection> initConnection = (context, session, fc, ff) -> {
|
private InitConnection<FunctionContext, Connection> initConnection = (context, session, fc, ffs) -> {
|
||||||
final HiveDBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(HiveDBCPService.class);
|
final HiveDBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(HiveDBCPService.class);
|
||||||
final Connection connection = dbcpService.getConnection(ff == null ? Collections.emptyMap() : ff.getAttributes());
|
final Connection connection = dbcpService.getConnection(ffs == null || ffs.isEmpty() ? Collections.emptyMap() : ffs.get(0).getAttributes());
|
||||||
fc.connectionUrl = dbcpService.getConnectionURL();
|
fc.connectionUrl = dbcpService.getConnectionURL();
|
||||||
return connection;
|
return connection;
|
||||||
};
|
};
|
||||||
|
|
|
@ -205,7 +205,7 @@ public class PutHive3QL extends AbstractHive3QLProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private InitConnection<FunctionContext, Connection> initConnection = (context, session, fc, ff) -> {
|
private InitConnection<FunctionContext, Connection> initConnection = (context, session, fc, ffs) -> {
|
||||||
final Hive3DBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(Hive3DBCPService.class);
|
final Hive3DBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(Hive3DBCPService.class);
|
||||||
final Connection connection = dbcpService.getConnection();
|
final Connection connection = dbcpService.getConnection();
|
||||||
fc.connectionUrl = dbcpService.getConnectionURL();
|
fc.connectionUrl = dbcpService.getConnectionURL();
|
||||||
|
|
|
@ -204,9 +204,9 @@ public class PutHive_1_1QL extends AbstractHive_1_1QLProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private InitConnection<FunctionContext, Connection> initConnection = (context, session, fc, ff) -> {
|
private InitConnection<FunctionContext, Connection> initConnection = (context, session, fc, ffs) -> {
|
||||||
final Hive_1_1DBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(Hive_1_1DBCPService.class);
|
final Hive_1_1DBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(Hive_1_1DBCPService.class);
|
||||||
final Connection connection = dbcpService.getConnection(ff == null ? Collections.emptyMap() : ff.getAttributes());
|
final Connection connection = dbcpService.getConnection(ffs == null || ffs.isEmpty() ? Collections.emptyMap() : ffs.get(0).getAttributes());
|
||||||
fc.connectionUrl = dbcpService.getConnectionURL();
|
fc.connectionUrl = dbcpService.getConnectionURL();
|
||||||
return connection;
|
return connection;
|
||||||
};
|
};
|
||||||
|
|
|
@ -358,9 +358,9 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
private final PartialFunctions.InitConnection<FunctionContext, Connection> initConnection = (c, s, fc, ff) -> {
|
private final PartialFunctions.InitConnection<FunctionContext, Connection> initConnection = (c, s, fc, ffs) -> {
|
||||||
final Connection connection = c.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class)
|
final Connection connection = c.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class)
|
||||||
.getConnection(ff == null ? Collections.emptyMap() : ff.getAttributes());
|
.getConnection(ffs == null || ffs.isEmpty() ? Collections.emptyMap() : ffs.get(0).getAttributes());
|
||||||
try {
|
try {
|
||||||
fc.originalAutoCommit = connection.getAutoCommit();
|
fc.originalAutoCommit = connection.getAutoCommit();
|
||||||
connection.setAutoCommit(false);
|
connection.setAutoCommit(false);
|
||||||
|
|
|
@ -275,9 +275,9 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
|
||||||
return poll.getFlowFiles();
|
return poll.getFlowFiles();
|
||||||
};
|
};
|
||||||
|
|
||||||
private final PartialFunctions.InitConnection<FunctionContext, Connection> initConnection = (c, s, fc, ff) -> {
|
private final PartialFunctions.InitConnection<FunctionContext, Connection> initConnection = (c, s, fc, ffs) -> {
|
||||||
final Connection connection = c.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class)
|
final Connection connection = c.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class)
|
||||||
.getConnection(ff == null ? Collections.emptyMap() : ff.getAttributes());
|
.getConnection(ffs == null || ffs.isEmpty() ? Collections.emptyMap() : ffs.get(0).getAttributes());
|
||||||
try {
|
try {
|
||||||
fc.originalAutoCommit = connection.getAutoCommit();
|
fc.originalAutoCommit = connection.getAutoCommit();
|
||||||
final boolean autocommit = c.getProperty(AUTO_COMMIT).asBoolean();
|
final boolean autocommit = c.getProperty(AUTO_COMMIT).asBoolean();
|
||||||
|
@ -621,13 +621,18 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
|
||||||
boolean fragmentedTransaction = false;
|
boolean fragmentedTransaction = false;
|
||||||
|
|
||||||
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
|
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
|
||||||
|
final FlowFileFilter dbcpServiceFlowFileFilter = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class).getFlowFileFilter(batchSize);
|
||||||
List<FlowFile> flowFiles;
|
List<FlowFile> flowFiles;
|
||||||
if (useTransactions) {
|
if (useTransactions) {
|
||||||
final TransactionalFlowFileFilter filter = new TransactionalFlowFileFilter();
|
final TransactionalFlowFileFilter filter = new TransactionalFlowFileFilter(dbcpServiceFlowFileFilter);
|
||||||
flowFiles = session.get(filter);
|
flowFiles = session.get(filter);
|
||||||
fragmentedTransaction = filter.isFragmentedTransaction();
|
fragmentedTransaction = filter.isFragmentedTransaction();
|
||||||
} else {
|
} else {
|
||||||
|
if (dbcpServiceFlowFileFilter == null) {
|
||||||
flowFiles = session.get(batchSize);
|
flowFiles = session.get(batchSize);
|
||||||
|
} else {
|
||||||
|
flowFiles = session.get(dbcpServiceFlowFileFilter);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (flowFiles.isEmpty()) {
|
if (flowFiles.isEmpty()) {
|
||||||
|
@ -804,14 +809,28 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
|
||||||
* across multiple FlowFiles) or that none of the FlowFiles belongs to a fragmented transaction
|
* across multiple FlowFiles) or that none of the FlowFiles belongs to a fragmented transaction
|
||||||
*/
|
*/
|
||||||
static class TransactionalFlowFileFilter implements FlowFileFilter {
|
static class TransactionalFlowFileFilter implements FlowFileFilter {
|
||||||
|
private final FlowFileFilter nonFragmentedTransactionFilter;
|
||||||
private String selectedId = null;
|
private String selectedId = null;
|
||||||
private int numSelected = 0;
|
private int numSelected = 0;
|
||||||
private boolean ignoreFragmentIdentifiers = false;
|
private boolean ignoreFragmentIdentifiers = false;
|
||||||
|
|
||||||
|
public TransactionalFlowFileFilter(FlowFileFilter nonFragmentedTransactionFilter) {
|
||||||
|
this.nonFragmentedTransactionFilter = nonFragmentedTransactionFilter;
|
||||||
|
}
|
||||||
|
|
||||||
public boolean isFragmentedTransaction() {
|
public boolean isFragmentedTransaction() {
|
||||||
return !ignoreFragmentIdentifiers;
|
return !ignoreFragmentIdentifiers;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private FlowFileFilterResult filterNonFragmentedTransaction(final FlowFile flowFile) {
|
||||||
|
if (nonFragmentedTransactionFilter == null) {
|
||||||
|
return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
|
||||||
|
} else {
|
||||||
|
// Use non-fragmented tx filter for further filtering.
|
||||||
|
return nonFragmentedTransactionFilter.filter(flowFile);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public FlowFileFilterResult filter(final FlowFile flowFile) {
|
public FlowFileFilterResult filter(final FlowFile flowFile) {
|
||||||
final String fragmentId = flowFile.getAttribute(FRAGMENT_ID_ATTR);
|
final String fragmentId = flowFile.getAttribute(FRAGMENT_ID_ATTR);
|
||||||
|
@ -821,7 +840,7 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
|
||||||
// we accept any FlowFile that is also not part of a fragmented transaction.
|
// we accept any FlowFile that is also not part of a fragmented transaction.
|
||||||
if (ignoreFragmentIdentifiers) {
|
if (ignoreFragmentIdentifiers) {
|
||||||
if (fragmentId == null || "1".equals(fragCount)) {
|
if (fragmentId == null || "1".equals(fragCount)) {
|
||||||
return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
|
return filterNonFragmentedTransaction(flowFile);
|
||||||
} else {
|
} else {
|
||||||
return FlowFileFilterResult.REJECT_AND_CONTINUE;
|
return FlowFileFilterResult.REJECT_AND_CONTINUE;
|
||||||
}
|
}
|
||||||
|
@ -831,7 +850,7 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
|
||||||
if (selectedId == null) {
|
if (selectedId == null) {
|
||||||
// Only one FlowFile in the transaction.
|
// Only one FlowFile in the transaction.
|
||||||
ignoreFragmentIdentifiers = true;
|
ignoreFragmentIdentifiers = true;
|
||||||
return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
|
return filterNonFragmentedTransaction(flowFile);
|
||||||
} else {
|
} else {
|
||||||
// we've already selected 1 FlowFile, and this one doesn't match.
|
// we've already selected 1 FlowFile, and this one doesn't match.
|
||||||
return FlowFileFilterResult.REJECT_AND_CONTINUE;
|
return FlowFileFilterResult.REJECT_AND_CONTINUE;
|
||||||
|
|
|
@ -16,6 +16,9 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.standard;
|
package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
|
import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE;
|
||||||
|
import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_TERMINATE;
|
||||||
|
import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
@ -38,6 +41,7 @@ import java.time.LocalTime;
|
||||||
import java.time.ZoneId;
|
import java.time.ZoneId;
|
||||||
import java.time.format.DateTimeFormatter;
|
import java.time.format.DateTimeFormatter;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -47,6 +51,7 @@ import javax.xml.bind.DatatypeConverter;
|
||||||
import org.apache.commons.lang3.RandomUtils;
|
import org.apache.commons.lang3.RandomUtils;
|
||||||
import org.apache.nifi.controller.AbstractControllerService;
|
import org.apache.nifi.controller.AbstractControllerService;
|
||||||
import org.apache.nifi.dbcp.DBCPService;
|
import org.apache.nifi.dbcp.DBCPService;
|
||||||
|
import org.apache.nifi.processor.FlowFileFilter;
|
||||||
import org.apache.nifi.processor.exception.ProcessException;
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
|
import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
|
||||||
import org.apache.nifi.reporting.InitializationException;
|
import org.apache.nifi.reporting.InitializationException;
|
||||||
|
@ -1458,6 +1463,72 @@ public class TestPutSQL {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Map<String, String> createFragmentedTransactionAttributes(String id, int count, int index) {
|
||||||
|
final Map<String, String> attributes = new HashMap<>();
|
||||||
|
attributes.put("fragment.identifier", id);
|
||||||
|
attributes.put("fragment.count", String.valueOf(count));
|
||||||
|
attributes.put("fragment.index", String.valueOf(index));
|
||||||
|
return attributes;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTransactionalFlowFileFilter() {
|
||||||
|
final MockFlowFile ff0 = new MockFlowFile(0);
|
||||||
|
final MockFlowFile ff1 = new MockFlowFile(1);
|
||||||
|
final MockFlowFile ff2 = new MockFlowFile(2);
|
||||||
|
final MockFlowFile ff3 = new MockFlowFile(3);
|
||||||
|
final MockFlowFile ff4 = new MockFlowFile(4);
|
||||||
|
|
||||||
|
ff0.putAttributes(createFragmentedTransactionAttributes("tx-1", 3, 0));
|
||||||
|
ff1.putAttributes(Collections.singletonMap("accept", "false"));
|
||||||
|
ff2.putAttributes(createFragmentedTransactionAttributes("tx-1", 3, 1));
|
||||||
|
ff3.putAttributes(Collections.singletonMap("accept", "true"));
|
||||||
|
ff4.putAttributes(createFragmentedTransactionAttributes("tx-1", 3, 2));
|
||||||
|
|
||||||
|
// TEST 1: Fragmented TX with null service filter
|
||||||
|
// Even if the controller service does not have filtering rule, tx filter should work.
|
||||||
|
FlowFileFilter txFilter = new PutSQL.TransactionalFlowFileFilter(null);
|
||||||
|
// Should perform a fragmented tx.
|
||||||
|
assertEquals(ACCEPT_AND_CONTINUE, txFilter.filter(ff0));
|
||||||
|
assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff1));
|
||||||
|
assertEquals(ACCEPT_AND_CONTINUE, txFilter.filter(ff2));
|
||||||
|
assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff3));
|
||||||
|
assertEquals(ACCEPT_AND_TERMINATE, txFilter.filter(ff4));
|
||||||
|
|
||||||
|
// TEST 2: Non-Fragmented TX with null service filter
|
||||||
|
txFilter = new PutSQL.TransactionalFlowFileFilter(null);
|
||||||
|
// Should perform a non-fragmented tx.
|
||||||
|
assertEquals(ACCEPT_AND_CONTINUE, txFilter.filter(ff1));
|
||||||
|
assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff0));
|
||||||
|
assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff2));
|
||||||
|
assertEquals(ACCEPT_AND_CONTINUE, txFilter.filter(ff3));
|
||||||
|
assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff4));
|
||||||
|
|
||||||
|
|
||||||
|
final FlowFileFilter nonTxFilter = flowFile -> "true".equals(flowFile.getAttribute("accept"))
|
||||||
|
? ACCEPT_AND_CONTINUE
|
||||||
|
: REJECT_AND_CONTINUE;
|
||||||
|
|
||||||
|
// TEST 3: Fragmented TX with a service filter
|
||||||
|
// Even if the controller service does not have filtering rule, tx filter should work.
|
||||||
|
txFilter = new PutSQL.TransactionalFlowFileFilter(nonTxFilter);
|
||||||
|
// Should perform a fragmented tx. The nonTxFilter doesn't affect in this case.
|
||||||
|
assertEquals(ACCEPT_AND_CONTINUE, txFilter.filter(ff0));
|
||||||
|
assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff1));
|
||||||
|
assertEquals(ACCEPT_AND_CONTINUE, txFilter.filter(ff2));
|
||||||
|
assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff3));
|
||||||
|
assertEquals(ACCEPT_AND_TERMINATE, txFilter.filter(ff4));
|
||||||
|
|
||||||
|
// TEST 4: Non-Fragmented TX with a service filter
|
||||||
|
txFilter = new PutSQL.TransactionalFlowFileFilter(nonTxFilter);
|
||||||
|
// Should perform a non-fragmented tx and use the nonTxFilter.
|
||||||
|
assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff1));
|
||||||
|
assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff0));
|
||||||
|
assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff2));
|
||||||
|
assertEquals(ACCEPT_AND_CONTINUE, txFilter.filter(ff3));
|
||||||
|
assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff4));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Simple implementation only for testing purposes
|
* Simple implementation only for testing purposes
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -18,12 +18,18 @@ package org.apache.nifi.dbcp;
|
||||||
|
|
||||||
import java.sql.Connection;
|
import java.sql.Connection;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
import org.apache.nifi.annotation.documentation.Tags;
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
import org.apache.nifi.controller.ControllerService;
|
import org.apache.nifi.controller.ControllerService;
|
||||||
|
import org.apache.nifi.processor.FlowFileFilter;
|
||||||
|
import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult;
|
||||||
import org.apache.nifi.processor.exception.ProcessException;
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
|
|
||||||
|
import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE;
|
||||||
|
import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.REJECT_AND_TERMINATE;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Definition for Database Connection Pooling Service.
|
* Definition for Database Connection Pooling Service.
|
||||||
*
|
*
|
||||||
|
@ -48,4 +54,45 @@ public interface DBCPService extends ControllerService {
|
||||||
// without attributes
|
// without attributes
|
||||||
return getConnection();
|
return getConnection();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implementation classes should override this method to provide DBCPService specific FlowFile filtering rule.
|
||||||
|
* For example, when processing multiple incoming FlowFiles at the same time, every FlowFile should have the same attribute value.
|
||||||
|
* Components using this service and also accepting multiple incoming FlowFiles should use
|
||||||
|
* the FlowFileFilter returned by this method to get target FlowFiles from a process session.
|
||||||
|
* @return a FlowFileFilter or null if no service specific filtering is required
|
||||||
|
*/
|
||||||
|
default FlowFileFilter getFlowFileFilter() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An utility default method to composite DBCPService specific filtering provided by {@link #getFlowFileFilter()} and batch size limitation.
|
||||||
|
* Implementation classes do not have to override this method. Instead, override {@link #getFlowFileFilter()} to provide service specific filtering.
|
||||||
|
* Components using this service and also accepting multiple incoming FlowFiles should use
|
||||||
|
* the FlowFileFilter returned by this method to get target FlowFiles from a process session.
|
||||||
|
* @param batchSize the maximum number of FlowFiles to accept
|
||||||
|
* @return a composited FlowFileFilter having service specific filtering and batch size limitation, or null if no service specific filtering is required.
|
||||||
|
*/
|
||||||
|
default FlowFileFilter getFlowFileFilter(int batchSize) {
|
||||||
|
final FlowFileFilter filter = getFlowFileFilter();
|
||||||
|
if (filter == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
final AtomicInteger count = new AtomicInteger(0);
|
||||||
|
return flowFile -> {
|
||||||
|
if (count.get() >= batchSize) {
|
||||||
|
return REJECT_AND_TERMINATE;
|
||||||
|
}
|
||||||
|
|
||||||
|
final FlowFileFilterResult result = filter.filter(flowFile);
|
||||||
|
if (ACCEPT_AND_CONTINUE.equals(result)) {
|
||||||
|
count.incrementAndGet();
|
||||||
|
return ACCEPT_AND_CONTINUE;
|
||||||
|
} else {
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,16 +16,21 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.dbcp;
|
package org.apache.nifi.dbcp;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
import org.apache.nifi.annotation.documentation.Tags;
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||||
|
import org.apache.nifi.processor.FlowFileFilter;
|
||||||
import org.apache.nifi.processor.exception.ProcessException;
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
import org.apache.nifi.service.lookup.AbstractSingleAttributeBasedControllerServiceLookup;
|
import org.apache.nifi.service.lookup.AbstractSingleAttributeBasedControllerServiceLookup;
|
||||||
|
|
||||||
import java.sql.Connection;
|
import java.sql.Connection;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE;
|
||||||
|
import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE;
|
||||||
|
|
||||||
@Tags({ "dbcp", "jdbc", "database", "connection", "pooling", "store" })
|
@Tags({ "dbcp", "jdbc", "database", "connection", "pooling", "store" })
|
||||||
@CapabilityDescription("Provides a DBCPService that can be used to dynamically select another DBCPService. This service " +
|
@CapabilityDescription("Provides a DBCPService that can be used to dynamically select another DBCPService. This service " +
|
||||||
|
@ -61,4 +66,17 @@ public class DBCPConnectionPoolLookup
|
||||||
public Connection getConnection(Map<String, String> attributes) {
|
public Connection getConnection(Map<String, String> attributes) {
|
||||||
return lookupService(attributes).getConnection(attributes);
|
return lookupService(attributes).getConnection(attributes);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FlowFileFilter getFlowFileFilter() {
|
||||||
|
final AtomicReference<String> ref = new AtomicReference<>();
|
||||||
|
return flowFile -> {
|
||||||
|
final String flowFileDBName = flowFile.getAttribute(DATABASE_NAME_ATTRIBUTE);
|
||||||
|
if (StringUtils.isEmpty(flowFileDBName)) {
|
||||||
|
throw new ProcessException("FlowFile attributes must contain an attribute name '" + DATABASE_NAME_ATTRIBUTE + "'");
|
||||||
|
}
|
||||||
|
final String databaseName = ref.compareAndSet(null, flowFileDBName) ? flowFileDBName : ref.get();
|
||||||
|
return flowFileDBName.equals(databaseName) ? ACCEPT_AND_CONTINUE : REJECT_AND_CONTINUE;
|
||||||
|
};
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,14 +17,17 @@
|
||||||
package org.apache.nifi.dbcp;
|
package org.apache.nifi.dbcp;
|
||||||
|
|
||||||
import org.apache.nifi.controller.AbstractControllerService;
|
import org.apache.nifi.controller.AbstractControllerService;
|
||||||
|
import org.apache.nifi.processor.FlowFileFilter;
|
||||||
import org.apache.nifi.processor.exception.ProcessException;
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
import org.apache.nifi.reporting.InitializationException;
|
import org.apache.nifi.reporting.InitializationException;
|
||||||
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
import org.apache.nifi.util.TestRunner;
|
import org.apache.nifi.util.TestRunner;
|
||||||
import org.apache.nifi.util.TestRunners;
|
import org.apache.nifi.util.TestRunners;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.sql.Connection;
|
import java.sql.Connection;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
@ -144,6 +147,54 @@ public class TestDBCPConnectionPoolLookup {
|
||||||
runner.assertNotValid(dbcpLookupService);
|
runner.assertNotValid(dbcpLookupService);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFlowFileFiltering() {
|
||||||
|
final FlowFileFilter filter = dbcpLookupService.getFlowFileFilter();
|
||||||
|
assertNotNull(filter);
|
||||||
|
|
||||||
|
final MockFlowFile ff0 = new MockFlowFile(0);
|
||||||
|
final MockFlowFile ff1 = new MockFlowFile(1);
|
||||||
|
final MockFlowFile ff2 = new MockFlowFile(2);
|
||||||
|
final MockFlowFile ff3 = new MockFlowFile(3);
|
||||||
|
final MockFlowFile ff4 = new MockFlowFile(4);
|
||||||
|
|
||||||
|
ff0.putAttributes(Collections.singletonMap(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "db-A"));
|
||||||
|
ff1.putAttributes(Collections.singletonMap(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "db-B"));
|
||||||
|
ff2.putAttributes(Collections.singletonMap(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "db-A"));
|
||||||
|
ff3.putAttributes(Collections.singletonMap(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "db-B"));
|
||||||
|
ff4.putAttributes(Collections.singletonMap(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "db-A"));
|
||||||
|
|
||||||
|
assertEquals(FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE, filter.filter(ff0));
|
||||||
|
assertEquals(FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE, filter.filter(ff1));
|
||||||
|
assertEquals(FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE, filter.filter(ff2));
|
||||||
|
assertEquals(FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE, filter.filter(ff3));
|
||||||
|
assertEquals(FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE, filter.filter(ff4));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFlowFileFilteringWithBatchSize() {
|
||||||
|
final FlowFileFilter filter = dbcpLookupService.getFlowFileFilter(2);
|
||||||
|
assertNotNull(filter);
|
||||||
|
|
||||||
|
final MockFlowFile ff0 = new MockFlowFile(0);
|
||||||
|
final MockFlowFile ff1 = new MockFlowFile(1);
|
||||||
|
final MockFlowFile ff2 = new MockFlowFile(2);
|
||||||
|
final MockFlowFile ff3 = new MockFlowFile(3);
|
||||||
|
final MockFlowFile ff4 = new MockFlowFile(4);
|
||||||
|
|
||||||
|
ff0.putAttributes(Collections.singletonMap(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "db-A"));
|
||||||
|
ff1.putAttributes(Collections.singletonMap(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "db-B"));
|
||||||
|
ff2.putAttributes(Collections.singletonMap(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "db-A"));
|
||||||
|
ff3.putAttributes(Collections.singletonMap(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "db-B"));
|
||||||
|
ff4.putAttributes(Collections.singletonMap(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "db-A"));
|
||||||
|
|
||||||
|
assertEquals(FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE, filter.filter(ff0));
|
||||||
|
assertEquals(FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE, filter.filter(ff1));
|
||||||
|
assertEquals(FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE, filter.filter(ff2));
|
||||||
|
assertEquals(FlowFileFilter.FlowFileFilterResult.REJECT_AND_TERMINATE, filter.filter(ff3));
|
||||||
|
assertEquals(FlowFileFilter.FlowFileFilterResult.REJECT_AND_TERMINATE, filter.filter(ff4));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A mock DBCPService that will always return the passed in MockConnection.
|
* A mock DBCPService that will always return the passed in MockConnection.
|
||||||
*/
|
*/
|
||||||
|
|
Loading…
Reference in New Issue