diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java index d69a280fd5..c1fc0bc244 100644 --- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -174,6 +174,7 @@ public abstract class NiFiProperties { public static final String CLUSTER_NODE_ADDRESS = "nifi.cluster.node.address"; public static final String CLUSTER_NODE_PROTOCOL_PORT = "nifi.cluster.node.protocol.port"; public static final String CLUSTER_NODE_PROTOCOL_THREADS = "nifi.cluster.node.protocol.threads"; + public static final String CLUSTER_NODE_PROTOCOL_MAX_THREADS = "nifi.cluster.node.protocol.max.threads"; public static final String CLUSTER_NODE_CONNECTION_TIMEOUT = "nifi.cluster.node.connection.timeout"; public static final String CLUSTER_NODE_READ_TIMEOUT = "nifi.cluster.node.read.timeout"; public static final String CLUSTER_FIREWALL_FILE = "nifi.cluster.firewall.file"; @@ -245,7 +246,8 @@ public abstract class NiFiProperties { public static final String DEFAULT_CLUSTER_NODE_CONNECTION_TIMEOUT = "5 sec"; // cluster node defaults - public static final int DEFAULT_CLUSTER_NODE_PROTOCOL_THREADS = 2; + public static final int DEFAULT_CLUSTER_NODE_PROTOCOL_THREADS = 10; + public static final int DEFAULT_CLUSTER_NODE_PROTOCOL_MAX_THREADS = 50; public static final String DEFAULT_REQUEST_REPLICATION_CLAIM_TIMEOUT = "15 secs"; public static final String DEFAULT_FLOW_ELECTION_MAX_WAIT_TIME = "5 mins"; @@ -678,7 +680,15 @@ public abstract class NiFiProperties { } } + /** + * @deprecated Use getClusterNodeProtocolCorePoolSize() and getClusterNodeProtocolMaxPoolSize() instead + */ + @Deprecated() public int getClusterNodeProtocolThreads() { + return getClusterNodeProtocolCorePoolSize(); + } + + public int getClusterNodeProtocolCorePoolSize() { try { return Integer.parseInt(getProperty(CLUSTER_NODE_PROTOCOL_THREADS)); } catch (NumberFormatException nfe) { @@ -686,6 +696,14 @@ public abstract class NiFiProperties { } } + public int getClusterNodeProtocolMaxPoolSize() { + try { + return Integer.parseInt(getProperty(CLUSTER_NODE_PROTOCOL_MAX_THREADS)); + } catch (NumberFormatException nfe) { + return DEFAULT_CLUSTER_NODE_PROTOCOL_MAX_THREADS; + } + } + public boolean isClustered() { return Boolean.parseBoolean(getProperty(CLUSTER_IS_NODE)); } diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc index be7b267e9b..bc823bcc6a 100644 --- a/nifi-docs/src/main/asciidoc/administration-guide.adoc +++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc @@ -1449,7 +1449,13 @@ For each Node, the minimum properties to configure are as follows: ** nifi.cluster.node.address - Set this to the fully qualified hostname of the node. If left blank, it defaults to "localhost". ** nifi.cluster.node.protocol.port - Set this to an open port that is higher than 1024 (anything lower requires root). ** nifi.cluster.node.protocol.threads - The number of threads that should be used to communicate with other nodes in the cluster. This property - defaults to 10, but for large clusters, this value may need to be larger. + defaults to 10. A thread pool is used for replicating requests to all nodes, and the + thread pool will never have fewer than this number of threads. It will grow as needed up to the maximum value set by the `nifi.cluster.node.protocol.max.threads` + property. +** nifi.cluster.node.protocol.max.threads - The maximum number of threads that should be used to communicate with other nodes in the cluster. This property + defaults to 50. A thread pool is used for replication requests to all nodes, and the thread pool will have a "core" size that is configured by the + `nifi.cluster.node.protocol.threads` property. However, if necessary, the thread pool will increase the number of active threads to the limit + set by this property. ** nifi.zookeeper.connect.string - The Connect String that is needed to connect to Apache ZooKeeper. This is a comma-separted list of hostname:port pairs. For example, localhost:2181,localhost:2182,localhost:2183. This should contain a list of all ZooKeeper instances in the ZooKeeper quorum. diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java index a8f9a7da09..d2c7b381b8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java @@ -23,6 +23,36 @@ import com.sun.jersey.api.client.WebResource; import com.sun.jersey.api.client.config.ClientConfig; import com.sun.jersey.api.client.filter.GZIPContentEncodingFilter; import com.sun.jersey.core.util.MultivaluedMapImpl; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.LongSummaryStatistics; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; +import java.util.stream.Collectors; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.core.Response.Status; import org.apache.nifi.authorization.AccessDeniedException; import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUserUtils; @@ -49,35 +79,6 @@ import org.apache.nifi.web.security.jwt.JwtAuthenticationFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.MultivaluedMap; -import javax.ws.rs.core.Response.Status; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.LongSummaryStatistics; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Function; -import java.util.stream.Collectors; - public class ThreadPoolRequestReplicator implements RequestReplicator { private static final Logger logger = LoggerFactory.getLogger(ThreadPoolRequestReplicator.class); @@ -104,35 +105,39 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { /** * Creates an instance using a connection timeout and read timeout of 3 seconds * - * @param numThreads the number of threads to use when parallelizing requests - * @param client a client for making requests + * @param corePoolSize core size of the thread pool + * @param maxPoolSize the max number of threads in the thread pool + * @param client a client for making requests * @param clusterCoordinator the cluster coordinator to use for interacting with node statuses - * @param callback a callback that will be called whenever all of the responses have been gathered for a request. May be null. - * @param eventReporter an EventReporter that can be used to notify users of interesting events. May be null. - * @param nifiProperties properties + * @param callback a callback that will be called whenever all of the responses have been gathered for a request. May be null. + * @param eventReporter an EventReporter that can be used to notify users of interesting events. May be null. + * @param nifiProperties properties */ - public ThreadPoolRequestReplicator(final int numThreads, final Client client, final ClusterCoordinator clusterCoordinator, + public ThreadPoolRequestReplicator(final int corePoolSize, final int maxPoolSize, final Client client, final ClusterCoordinator clusterCoordinator, final RequestCompletionCallback callback, final EventReporter eventReporter, final NiFiProperties nifiProperties) { - this(numThreads, client, clusterCoordinator, "5 sec", "5 sec", callback, eventReporter, nifiProperties); + this(corePoolSize, maxPoolSize, client, clusterCoordinator, "5 sec", "5 sec", callback, eventReporter, nifiProperties); } /** * Creates an instance. * - * @param numThreads the number of threads to use when parallelizing requests - * @param client a client for making requests + * @param corePoolSize core size of the thread pool + * @param maxPoolSize the max number of threads in the thread pool + * @param client a client for making requests * @param clusterCoordinator the cluster coordinator to use for interacting with node statuses - * @param connectionTimeout the connection timeout specified in milliseconds - * @param readTimeout the read timeout specified in milliseconds - * @param callback a callback that will be called whenever all of the responses have been gathered for a request. May be null. - * @param eventReporter an EventReporter that can be used to notify users of interesting events. May be null. - * @param nifiProperties properties + * @param connectionTimeout the connection timeout specified in milliseconds + * @param readTimeout the read timeout specified in milliseconds + * @param callback a callback that will be called whenever all of the responses have been gathered for a request. May be null. + * @param eventReporter an EventReporter that can be used to notify users of interesting events. May be null. + * @param nifiProperties properties */ - public ThreadPoolRequestReplicator(final int numThreads, final Client client, final ClusterCoordinator clusterCoordinator, + public ThreadPoolRequestReplicator(final int corePoolSize, final int maxPoolSize, final Client client, final ClusterCoordinator clusterCoordinator, final String connectionTimeout, final String readTimeout, final RequestCompletionCallback callback, final EventReporter eventReporter, final NiFiProperties nifiProperties) { - if (numThreads <= 0) { - throw new IllegalArgumentException("The number of threads must be greater than zero."); + if (corePoolSize <= 0) { + throw new IllegalArgumentException("The Core Pool Size must be greater than zero."); + } else if (maxPoolSize < corePoolSize) { + throw new IllegalArgumentException("Max Pool Size must be >= Core Pool Size."); } else if (client == null) { throw new IllegalArgumentException("Client may not be null."); } @@ -150,12 +155,14 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { client.getProperties().put(ClientConfig.PROPERTY_FOLLOW_REDIRECTS, Boolean.TRUE); final AtomicInteger threadId = new AtomicInteger(0); - executorService = Executors.newFixedThreadPool(numThreads, r -> { + final ThreadFactory threadFactory = r -> { final Thread t = Executors.defaultThreadFactory().newThread(r); t.setDaemon(true); t.setName("Replicate Request Thread-" + threadId.incrementAndGet()); return t; - }); + }; + + executorService = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 5, TimeUnit.SECONDS, new SynchronousQueue(), threadFactory); maintenanceExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() { @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java index 31c3b1d433..128075e07c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java @@ -44,12 +44,13 @@ public class ThreadPoolRequestReplicatorFactoryBean implements FactoryBean givenHeaders, final StandardAsyncClusterResponse response) { @@ -303,8 +303,8 @@ public class TestThreadPoolRequestReplicator { nodeMap.put(NodeConnectionState.CONNECTING, otherState); Mockito.when(coordinator.getConnectionStates()).thenReturn(nodeMap); - final ThreadPoolRequestReplicator replicator - = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) { + final NiFiProperties props = NiFiProperties.createBasicNiFiProperties(null, null); + final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, new Client(), coordinator, "1 sec", "1 sec", null, null, props) { @Override public AsyncClusterResponse replicate(Set nodeIds, String method, URI uri, Object entity, Map headers, boolean indicateReplicated, boolean verify) { @@ -361,8 +361,8 @@ public class TestThreadPoolRequestReplicator { final ClusterCoordinator coordinator = createClusterCoordinator(); final AtomicInteger requestCount = new AtomicInteger(0); - final ThreadPoolRequestReplicator replicator - = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) { + final NiFiProperties props = NiFiProperties.createBasicNiFiProperties(null, null); + final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, new Client(), coordinator, "1 sec", "1 sec", null, null, props) { @Override protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId, Map givenHeaders, final StandardAsyncClusterResponse response) { @@ -572,7 +572,7 @@ public class TestThreadPoolRequestReplicator { private void withReplicator(final WithReplicator function, final Status status, final long delayMillis, final RuntimeException failure, final String expectedRequestChain) { final ClusterCoordinator coordinator = createClusterCoordinator(); final NiFiProperties nifiProps = NiFiProperties.createBasicNiFiProperties(null, null); - final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, nifiProps) { + final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, new Client(), coordinator, "1 sec", "1 sec", null, null, nifiProps) { @Override protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId, Map givenHeaders, final StandardAsyncClusterResponse response) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml index 03d13ae4db..7dd24de83e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml @@ -156,6 +156,7 @@ 10 + 50 25 5 sec 5 sec diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties index dadc5e6dd1..8167c4944b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties @@ -175,6 +175,7 @@ nifi.cluster.is.node=${nifi.cluster.is.node} nifi.cluster.node.address=${nifi.cluster.node.address} nifi.cluster.node.protocol.port=${nifi.cluster.node.protocol.port} nifi.cluster.node.protocol.threads=${nifi.cluster.node.protocol.threads} +nifi.cluster.node.protocol.max.threads=${nifi.cluster.node.protocol.max.threads} nifi.cluster.node.event.history.size=${nifi.cluster.node.event.history.size} nifi.cluster.node.connection.timeout=${nifi.cluster.node.connection.timeout} nifi.cluster.node.read.timeout=${nifi.cluster.node.read.timeout} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/filter/TimerFilter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/filter/TimerFilter.java index a5bf7e5555..a522fa5083 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/filter/TimerFilter.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/filter/TimerFilter.java @@ -52,8 +52,12 @@ public class TimerFilter implements Filter { } finally { final long stop = System.nanoTime(); final String requestId = ((HttpServletRequest) req).getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER); - logger.debug("{} {} from {} request duration for Request ID {}: {} millis", request.getMethod(), request.getRequestURL().toString(), - req.getRemoteHost(), requestId, TimeUnit.MILLISECONDS.convert(stop - start, TimeUnit.NANOSECONDS)); + final String replicationHeader = ((HttpServletRequest) req).getHeader(RequestReplicator.REPLICATION_INDICATOR_HEADER); + final boolean validationPhase = RequestReplicator.NODE_CONTINUE.equals(replicationHeader); + final String requestDescription = validationPhase ? "Validation Phase of Request " + requestId : "Request ID " + requestId; + + logger.debug("{} {} from {} duration for {}: {} millis", request.getMethod(), request.getRequestURL().toString(), + req.getRemoteHost(), requestDescription, TimeUnit.MILLISECONDS.convert(stop - start, TimeUnit.NANOSECONDS)); } }