NIFI-3568: This closes #1577. Use a cached thread pool in order to allow ThreadPoolRequestReplicator to scale up the number of threads to some configurable max

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
Mark Payne 2017-03-08 10:04:53 -05:00 committed by joewitt
parent 86728bac7e
commit 5aa3baca79
8 changed files with 99 additions and 61 deletions

View File

@ -174,6 +174,7 @@ public abstract class NiFiProperties {
public static final String CLUSTER_NODE_ADDRESS = "nifi.cluster.node.address";
public static final String CLUSTER_NODE_PROTOCOL_PORT = "nifi.cluster.node.protocol.port";
public static final String CLUSTER_NODE_PROTOCOL_THREADS = "nifi.cluster.node.protocol.threads";
public static final String CLUSTER_NODE_PROTOCOL_MAX_THREADS = "nifi.cluster.node.protocol.max.threads";
public static final String CLUSTER_NODE_CONNECTION_TIMEOUT = "nifi.cluster.node.connection.timeout";
public static final String CLUSTER_NODE_READ_TIMEOUT = "nifi.cluster.node.read.timeout";
public static final String CLUSTER_FIREWALL_FILE = "nifi.cluster.firewall.file";
@ -245,7 +246,8 @@ public abstract class NiFiProperties {
public static final String DEFAULT_CLUSTER_NODE_CONNECTION_TIMEOUT = "5 sec";
// cluster node defaults
public static final int DEFAULT_CLUSTER_NODE_PROTOCOL_THREADS = 2;
public static final int DEFAULT_CLUSTER_NODE_PROTOCOL_THREADS = 10;
public static final int DEFAULT_CLUSTER_NODE_PROTOCOL_MAX_THREADS = 50;
public static final String DEFAULT_REQUEST_REPLICATION_CLAIM_TIMEOUT = "15 secs";
public static final String DEFAULT_FLOW_ELECTION_MAX_WAIT_TIME = "5 mins";
@ -678,7 +680,15 @@ public abstract class NiFiProperties {
}
}
/**
* @deprecated Use getClusterNodeProtocolCorePoolSize() and getClusterNodeProtocolMaxPoolSize() instead
*/
@Deprecated()
public int getClusterNodeProtocolThreads() {
return getClusterNodeProtocolCorePoolSize();
}
public int getClusterNodeProtocolCorePoolSize() {
try {
return Integer.parseInt(getProperty(CLUSTER_NODE_PROTOCOL_THREADS));
} catch (NumberFormatException nfe) {
@ -686,6 +696,14 @@ public abstract class NiFiProperties {
}
}
public int getClusterNodeProtocolMaxPoolSize() {
try {
return Integer.parseInt(getProperty(CLUSTER_NODE_PROTOCOL_MAX_THREADS));
} catch (NumberFormatException nfe) {
return DEFAULT_CLUSTER_NODE_PROTOCOL_MAX_THREADS;
}
}
public boolean isClustered() {
return Boolean.parseBoolean(getProperty(CLUSTER_IS_NODE));
}

View File

@ -1449,7 +1449,13 @@ For each Node, the minimum properties to configure are as follows:
** nifi.cluster.node.address - Set this to the fully qualified hostname of the node. If left blank, it defaults to "localhost".
** nifi.cluster.node.protocol.port - Set this to an open port that is higher than 1024 (anything lower requires root).
** nifi.cluster.node.protocol.threads - The number of threads that should be used to communicate with other nodes in the cluster. This property
defaults to 10, but for large clusters, this value may need to be larger.
defaults to 10. A thread pool is used for replicating requests to all nodes, and the
thread pool will never have fewer than this number of threads. It will grow as needed up to the maximum value set by the `nifi.cluster.node.protocol.max.threads`
property.
** nifi.cluster.node.protocol.max.threads - The maximum number of threads that should be used to communicate with other nodes in the cluster. This property
defaults to 50. A thread pool is used for replication requests to all nodes, and the thread pool will have a "core" size that is configured by the
`nifi.cluster.node.protocol.threads` property. However, if necessary, the thread pool will increase the number of active threads to the limit
set by this property.
** nifi.zookeeper.connect.string - The Connect String that is needed to connect to Apache ZooKeeper. This is a comma-separted list
of hostname:port pairs. For example, localhost:2181,localhost:2182,localhost:2183. This should contain a list of all ZooKeeper
instances in the ZooKeeper quorum.

View File

@ -23,6 +23,36 @@ import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.ClientConfig;
import com.sun.jersey.api.client.filter.GZIPContentEncodingFilter;
import com.sun.jersey.core.util.MultivaluedMapImpl;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response.Status;
import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
@ -49,35 +79,6 @@ import org.apache.nifi.web.security.jwt.JwtAuthenticationFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response.Status;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
public class ThreadPoolRequestReplicator implements RequestReplicator {
private static final Logger logger = LoggerFactory.getLogger(ThreadPoolRequestReplicator.class);
@ -104,22 +105,24 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
/**
* Creates an instance using a connection timeout and read timeout of 3 seconds
*
* @param numThreads the number of threads to use when parallelizing requests
* @param corePoolSize core size of the thread pool
* @param maxPoolSize the max number of threads in the thread pool
* @param client a client for making requests
* @param clusterCoordinator the cluster coordinator to use for interacting with node statuses
* @param callback a callback that will be called whenever all of the responses have been gathered for a request. May be null.
* @param eventReporter an EventReporter that can be used to notify users of interesting events. May be null.
* @param nifiProperties properties
*/
public ThreadPoolRequestReplicator(final int numThreads, final Client client, final ClusterCoordinator clusterCoordinator,
public ThreadPoolRequestReplicator(final int corePoolSize, final int maxPoolSize, final Client client, final ClusterCoordinator clusterCoordinator,
final RequestCompletionCallback callback, final EventReporter eventReporter, final NiFiProperties nifiProperties) {
this(numThreads, client, clusterCoordinator, "5 sec", "5 sec", callback, eventReporter, nifiProperties);
this(corePoolSize, maxPoolSize, client, clusterCoordinator, "5 sec", "5 sec", callback, eventReporter, nifiProperties);
}
/**
* Creates an instance.
*
* @param numThreads the number of threads to use when parallelizing requests
* @param corePoolSize core size of the thread pool
* @param maxPoolSize the max number of threads in the thread pool
* @param client a client for making requests
* @param clusterCoordinator the cluster coordinator to use for interacting with node statuses
* @param connectionTimeout the connection timeout specified in milliseconds
@ -128,11 +131,13 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
* @param eventReporter an EventReporter that can be used to notify users of interesting events. May be null.
* @param nifiProperties properties
*/
public ThreadPoolRequestReplicator(final int numThreads, final Client client, final ClusterCoordinator clusterCoordinator,
public ThreadPoolRequestReplicator(final int corePoolSize, final int maxPoolSize, final Client client, final ClusterCoordinator clusterCoordinator,
final String connectionTimeout, final String readTimeout, final RequestCompletionCallback callback,
final EventReporter eventReporter, final NiFiProperties nifiProperties) {
if (numThreads <= 0) {
throw new IllegalArgumentException("The number of threads must be greater than zero.");
if (corePoolSize <= 0) {
throw new IllegalArgumentException("The Core Pool Size must be greater than zero.");
} else if (maxPoolSize < corePoolSize) {
throw new IllegalArgumentException("Max Pool Size must be >= Core Pool Size.");
} else if (client == null) {
throw new IllegalArgumentException("Client may not be null.");
}
@ -150,12 +155,14 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
client.getProperties().put(ClientConfig.PROPERTY_FOLLOW_REDIRECTS, Boolean.TRUE);
final AtomicInteger threadId = new AtomicInteger(0);
executorService = Executors.newFixedThreadPool(numThreads, r -> {
final ThreadFactory threadFactory = r -> {
final Thread t = Executors.defaultThreadFactory().newThread(r);
t.setDaemon(true);
t.setName("Replicate Request Thread-" + threadId.incrementAndGet());
return t;
});
};
executorService = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory);
maintenanceExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override

View File

@ -44,12 +44,13 @@ public class ThreadPoolRequestReplicatorFactoryBean implements FactoryBean<Threa
final ClusterCoordinator clusterCoordinator = applicationContext.getBean("clusterCoordinator", ClusterCoordinator.class);
final RequestCompletionCallback requestCompletionCallback = applicationContext.getBean("clusterCoordinator", RequestCompletionCallback.class);
final int numThreads = nifiProperties.getClusterNodeProtocolThreads();
final int corePoolSize = nifiProperties.getClusterNodeProtocolCorePoolSize();
final int maxPoolSize = nifiProperties.getClusterNodeProtocolMaxPoolSize();
final Client jerseyClient = WebUtils.createClient(new DefaultClientConfig(), SslContextFactory.createSslContext(nifiProperties));
final String connectionTimeout = nifiProperties.getClusterNodeConnectionTimeout();
final String readTimeout = nifiProperties.getClusterNodeReadTimeout();
replicator = new ThreadPoolRequestReplicator(numThreads, jerseyClient, clusterCoordinator,
replicator = new ThreadPoolRequestReplicator(corePoolSize, maxPoolSize, jerseyClient, clusterCoordinator,
connectionTimeout, readTimeout, requestCompletionCallback, eventReporter, nifiProperties);
}

View File

@ -231,8 +231,8 @@ public class TestThreadPoolRequestReplicator {
Mockito.when(coordinator.getConnectionStatus(Mockito.any(NodeIdentifier.class))).thenReturn(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED));
final AtomicInteger requestCount = new AtomicInteger(0);
final ThreadPoolRequestReplicator replicator
= new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) {
final NiFiProperties props = NiFiProperties.createBasicNiFiProperties(null, null);
final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, new Client(), coordinator, "1 sec", "1 sec", null, null, props) {
@Override
protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method,
final URI uri, final String requestId, Map<String, String> givenHeaders, final StandardAsyncClusterResponse response) {
@ -303,8 +303,8 @@ public class TestThreadPoolRequestReplicator {
nodeMap.put(NodeConnectionState.CONNECTING, otherState);
Mockito.when(coordinator.getConnectionStates()).thenReturn(nodeMap);
final ThreadPoolRequestReplicator replicator
= new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) {
final NiFiProperties props = NiFiProperties.createBasicNiFiProperties(null, null);
final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, new Client(), coordinator, "1 sec", "1 sec", null, null, props) {
@Override
public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers,
boolean indicateReplicated, boolean verify) {
@ -361,8 +361,8 @@ public class TestThreadPoolRequestReplicator {
final ClusterCoordinator coordinator = createClusterCoordinator();
final AtomicInteger requestCount = new AtomicInteger(0);
final ThreadPoolRequestReplicator replicator
= new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) {
final NiFiProperties props = NiFiProperties.createBasicNiFiProperties(null, null);
final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, new Client(), coordinator, "1 sec", "1 sec", null, null, props) {
@Override
protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method,
final URI uri, final String requestId, Map<String, String> givenHeaders, final StandardAsyncClusterResponse response) {
@ -572,7 +572,7 @@ public class TestThreadPoolRequestReplicator {
private void withReplicator(final WithReplicator function, final Status status, final long delayMillis, final RuntimeException failure, final String expectedRequestChain) {
final ClusterCoordinator coordinator = createClusterCoordinator();
final NiFiProperties nifiProps = NiFiProperties.createBasicNiFiProperties(null, null);
final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, nifiProps) {
final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, new Client(), coordinator, "1 sec", "1 sec", null, null, nifiProps) {
@Override
protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method,
final URI uri, final String requestId, Map<String, String> givenHeaders, final StandardAsyncClusterResponse response) {

View File

@ -156,6 +156,7 @@
<nifi.cluster.node.address />
<nifi.cluster.node.protocol.port />
<nifi.cluster.node.protocol.threads>10</nifi.cluster.node.protocol.threads>
<nifi.cluster.node.protocol.max.threads>50</nifi.cluster.node.protocol.max.threads>
<nifi.cluster.node.event.history.size>25</nifi.cluster.node.event.history.size>
<nifi.cluster.node.connection.timeout>5 sec</nifi.cluster.node.connection.timeout>
<nifi.cluster.node.read.timeout>5 sec</nifi.cluster.node.read.timeout>

View File

@ -175,6 +175,7 @@ nifi.cluster.is.node=${nifi.cluster.is.node}
nifi.cluster.node.address=${nifi.cluster.node.address}
nifi.cluster.node.protocol.port=${nifi.cluster.node.protocol.port}
nifi.cluster.node.protocol.threads=${nifi.cluster.node.protocol.threads}
nifi.cluster.node.protocol.max.threads=${nifi.cluster.node.protocol.max.threads}
nifi.cluster.node.event.history.size=${nifi.cluster.node.event.history.size}
nifi.cluster.node.connection.timeout=${nifi.cluster.node.connection.timeout}
nifi.cluster.node.read.timeout=${nifi.cluster.node.read.timeout}

View File

@ -52,8 +52,12 @@ public class TimerFilter implements Filter {
} finally {
final long stop = System.nanoTime();
final String requestId = ((HttpServletRequest) req).getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
logger.debug("{} {} from {} request duration for Request ID {}: {} millis", request.getMethod(), request.getRequestURL().toString(),
req.getRemoteHost(), requestId, TimeUnit.MILLISECONDS.convert(stop - start, TimeUnit.NANOSECONDS));
final String replicationHeader = ((HttpServletRequest) req).getHeader(RequestReplicator.REPLICATION_INDICATOR_HEADER);
final boolean validationPhase = RequestReplicator.NODE_CONTINUE.equals(replicationHeader);
final String requestDescription = validationPhase ? "Validation Phase of Request " + requestId : "Request ID " + requestId;
logger.debug("{} {} from {} duration for {}: {} millis", request.getMethod(), request.getRequestURL().toString(),
req.getRemoteHost(), requestDescription, TimeUnit.MILLISECONDS.convert(stop - start, TimeUnit.NANOSECONDS));
}
}