diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java index c1fc0bc244..4c9320512c 100644 --- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -177,6 +177,7 @@ public abstract class NiFiProperties { public static final String CLUSTER_NODE_PROTOCOL_MAX_THREADS = "nifi.cluster.node.protocol.max.threads"; public static final String CLUSTER_NODE_CONNECTION_TIMEOUT = "nifi.cluster.node.connection.timeout"; public static final String CLUSTER_NODE_READ_TIMEOUT = "nifi.cluster.node.read.timeout"; + public static final String CLUSTER_NODE_MAX_CONCURRENT_REQUESTS = "nifi.cluster.node.max.concurrent.requests"; public static final String CLUSTER_FIREWALL_FILE = "nifi.cluster.firewall.file"; public static final String FLOW_ELECTION_MAX_WAIT_TIME = "nifi.cluster.flow.election.max.wait.time"; public static final String FLOW_ELECTION_MAX_CANDIDATES = "nifi.cluster.flow.election.max.candidates"; @@ -244,6 +245,7 @@ public abstract class NiFiProperties { public static final String DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS_DELAY = "1 sec"; public static final String DEFAULT_CLUSTER_NODE_READ_TIMEOUT = "5 sec"; public static final String DEFAULT_CLUSTER_NODE_CONNECTION_TIMEOUT = "5 sec"; + public static final int DEFAULT_CLUSTER_NODE_MAX_CONCURRENT_REQUESTS = 100; // cluster node defaults public static final int DEFAULT_CLUSTER_NODE_PROTOCOL_THREADS = 10; @@ -561,6 +563,10 @@ public abstract class NiFiProperties { return getIntegerProperty(WEB_THREADS, DEFAULT_WEB_THREADS); } + public int getClusterNodeMaxConcurrentRequests() { + return getIntegerProperty(CLUSTER_NODE_MAX_CONCURRENT_REQUESTS, DEFAULT_CLUSTER_NODE_MAX_CONCURRENT_REQUESTS); + } + public File getWebWorkingDirectory() { return new File(getProperty(WEB_WORKING_DIR, DEFAULT_WEB_WORKING_DIR)); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java index c38b3e9c9f..7bdf6fa15d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java @@ -84,11 +84,11 @@ import com.sun.jersey.core.util.MultivaluedMapImpl; public class ThreadPoolRequestReplicator implements RequestReplicator { private static final Logger logger = LoggerFactory.getLogger(ThreadPoolRequestReplicator.class); - private static final int MAX_CONCURRENT_REQUESTS = 100; private final Client client; // the client to use for issuing requests private final int connectionTimeoutMs; // connection timeout per node request private final int readTimeoutMs; // read timeout per node request + private final int maxConcurrentRequests; // maximum number of concurrent requests private final HttpResponseMapper responseMapper; private final EventReporter eventReporter; private final RequestCompletionCallback callback; @@ -109,15 +109,16 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { * * @param corePoolSize core size of the thread pool * @param maxPoolSize the max number of threads in the thread pool + * @param maxConcurrentRequests maximum number of concurrent requests * @param client a client for making requests * @param clusterCoordinator the cluster coordinator to use for interacting with node statuses * @param callback a callback that will be called whenever all of the responses have been gathered for a request. May be null. * @param eventReporter an EventReporter that can be used to notify users of interesting events. May be null. * @param nifiProperties properties */ - public ThreadPoolRequestReplicator(final int corePoolSize, final int maxPoolSize, final Client client, final ClusterCoordinator clusterCoordinator, + public ThreadPoolRequestReplicator(final int corePoolSize, final int maxPoolSize, final int maxConcurrentRequests, final Client client, final ClusterCoordinator clusterCoordinator, final RequestCompletionCallback callback, final EventReporter eventReporter, final NiFiProperties nifiProperties) { - this(corePoolSize, maxPoolSize, client, clusterCoordinator, "5 sec", "5 sec", callback, eventReporter, nifiProperties); + this(corePoolSize, maxPoolSize, maxConcurrentRequests, client, clusterCoordinator, "5 sec", "5 sec", callback, eventReporter, nifiProperties); } /** @@ -125,6 +126,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { * * @param corePoolSize core size of the thread pool * @param maxPoolSize the max number of threads in the thread pool + * @param maxConcurrentRequests maximum number of concurrent requests * @param client a client for making requests * @param clusterCoordinator the cluster coordinator to use for interacting with node statuses * @param connectionTimeout the connection timeout specified in milliseconds @@ -133,7 +135,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { * @param eventReporter an EventReporter that can be used to notify users of interesting events. May be null. * @param nifiProperties properties */ - public ThreadPoolRequestReplicator(final int corePoolSize, final int maxPoolSize, final Client client, final ClusterCoordinator clusterCoordinator, + public ThreadPoolRequestReplicator(final int corePoolSize, final int maxPoolSize, final int maxConcurrentRequests, final Client client, final ClusterCoordinator clusterCoordinator, final String connectionTimeout, final String readTimeout, final RequestCompletionCallback callback, final EventReporter eventReporter, final NiFiProperties nifiProperties) { if (corePoolSize <= 0) { @@ -148,6 +150,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { this.clusterCoordinator = clusterCoordinator; this.connectionTimeoutMs = (int) FormatUtils.getTimeDuration(connectionTimeout, TimeUnit.MILLISECONDS); this.readTimeoutMs = (int) FormatUtils.getTimeDuration(readTimeout, TimeUnit.MILLISECONDS); + this.maxConcurrentRequests = maxConcurrentRequests; this.responseMapper = new StandardHttpResponseMapper(nifiProperties); this.eventReporter = eventReporter; this.callback = callback; @@ -361,11 +364,11 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { } int numRequests = responseMap.size(); - if (numRequests >= MAX_CONCURRENT_REQUESTS) { + if (numRequests >= maxConcurrentRequests) { numRequests = purgeExpiredRequests(); } - if (numRequests >= MAX_CONCURRENT_REQUESTS) { + if (numRequests >= maxConcurrentRequests) { final Map countsByUri = responseMap.values().stream().collect( Collectors.groupingBy( StandardAsyncClusterResponse::getURIPath, diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java index 128075e07c..8943276a37 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java @@ -46,11 +46,12 @@ public class ThreadPoolRequestReplicatorFactoryBean implements FactoryBean givenHeaders, final StandardAsyncClusterResponse response) { @@ -305,7 +305,7 @@ public class TestThreadPoolRequestReplicator { Mockito.when(coordinator.getConnectionStates()).thenReturn(nodeMap); final NiFiProperties props = NiFiProperties.createBasicNiFiProperties(null, null); - final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, new Client(), coordinator, "1 sec", "1 sec", null, null, props) { + final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, 100, new Client(), coordinator, "1 sec", "1 sec", null, null, props) { @Override public AsyncClusterResponse replicate(Set nodeIds, String method, URI uri, Object entity, Map headers, boolean indicateReplicated, boolean verify) { @@ -363,7 +363,7 @@ public class TestThreadPoolRequestReplicator { final ClusterCoordinator coordinator = createClusterCoordinator(); final AtomicInteger requestCount = new AtomicInteger(0); final NiFiProperties props = NiFiProperties.createBasicNiFiProperties(null, null); - final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, new Client(), coordinator, "1 sec", "1 sec", null, null, props) { + final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, 100, new Client(), coordinator, "1 sec", "1 sec", null, null, props) { @Override protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId, Map givenHeaders, final StandardAsyncClusterResponse response) { @@ -573,7 +573,7 @@ public class TestThreadPoolRequestReplicator { private void withReplicator(final WithReplicator function, final Status status, final long delayMillis, final RuntimeException failure, final String expectedRequestChain) { final ClusterCoordinator coordinator = createClusterCoordinator(); final NiFiProperties nifiProps = NiFiProperties.createBasicNiFiProperties(null, null); - final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, new Client(), coordinator, "1 sec", "1 sec", null, null, nifiProps) { + final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, 100, new Client(), coordinator, "1 sec", "1 sec", null, null, nifiProps) { @Override protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId, Map givenHeaders, final StandardAsyncClusterResponse response) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml index 8e740a9915..5cfe17c3e0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml @@ -160,6 +160,7 @@ 25 5 sec 5 sec + 100 5 mins diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties index 8167c4944b..f96d1672b8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties @@ -179,6 +179,7 @@ nifi.cluster.node.protocol.max.threads=${nifi.cluster.node.protocol.max.threads} nifi.cluster.node.event.history.size=${nifi.cluster.node.event.history.size} nifi.cluster.node.connection.timeout=${nifi.cluster.node.connection.timeout} nifi.cluster.node.read.timeout=${nifi.cluster.node.read.timeout} +nifi.cluster.node.max.concurrent.requests=${nifi.cluster.node.max.concurrent.requests} nifi.cluster.firewall.file=${nifi.cluster.firewall.file} nifi.cluster.flow.election.max.wait.time=${nifi.cluster.flow.election.max.wait.time} nifi.cluster.flow.election.max.candidates=${nifi.cluster.flow.election.max.candidates}