NIFI-4143 - externalize MAX_CONCURRENT_REQUESTS. This closes #1962

This commit is contained in:
Pierre Villard 2017-06-29 20:27:55 +02:00 committed by Matt Gilman
parent ba56774fa1
commit a3b72f1bb7
No known key found for this signature in database
GPG Key ID: DF61EC19432AEE37
6 changed files with 23 additions and 11 deletions

View File

@ -177,6 +177,7 @@ public abstract class NiFiProperties {
public static final String CLUSTER_NODE_PROTOCOL_MAX_THREADS = "nifi.cluster.node.protocol.max.threads"; public static final String CLUSTER_NODE_PROTOCOL_MAX_THREADS = "nifi.cluster.node.protocol.max.threads";
public static final String CLUSTER_NODE_CONNECTION_TIMEOUT = "nifi.cluster.node.connection.timeout"; public static final String CLUSTER_NODE_CONNECTION_TIMEOUT = "nifi.cluster.node.connection.timeout";
public static final String CLUSTER_NODE_READ_TIMEOUT = "nifi.cluster.node.read.timeout"; public static final String CLUSTER_NODE_READ_TIMEOUT = "nifi.cluster.node.read.timeout";
public static final String CLUSTER_NODE_MAX_CONCURRENT_REQUESTS = "nifi.cluster.node.max.concurrent.requests";
public static final String CLUSTER_FIREWALL_FILE = "nifi.cluster.firewall.file"; public static final String CLUSTER_FIREWALL_FILE = "nifi.cluster.firewall.file";
public static final String FLOW_ELECTION_MAX_WAIT_TIME = "nifi.cluster.flow.election.max.wait.time"; public static final String FLOW_ELECTION_MAX_WAIT_TIME = "nifi.cluster.flow.election.max.wait.time";
public static final String FLOW_ELECTION_MAX_CANDIDATES = "nifi.cluster.flow.election.max.candidates"; public static final String FLOW_ELECTION_MAX_CANDIDATES = "nifi.cluster.flow.election.max.candidates";
@ -244,6 +245,7 @@ public abstract class NiFiProperties {
public static final String DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS_DELAY = "1 sec"; public static final String DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS_DELAY = "1 sec";
public static final String DEFAULT_CLUSTER_NODE_READ_TIMEOUT = "5 sec"; public static final String DEFAULT_CLUSTER_NODE_READ_TIMEOUT = "5 sec";
public static final String DEFAULT_CLUSTER_NODE_CONNECTION_TIMEOUT = "5 sec"; public static final String DEFAULT_CLUSTER_NODE_CONNECTION_TIMEOUT = "5 sec";
public static final int DEFAULT_CLUSTER_NODE_MAX_CONCURRENT_REQUESTS = 100;
// cluster node defaults // cluster node defaults
public static final int DEFAULT_CLUSTER_NODE_PROTOCOL_THREADS = 10; public static final int DEFAULT_CLUSTER_NODE_PROTOCOL_THREADS = 10;
@ -561,6 +563,10 @@ public abstract class NiFiProperties {
return getIntegerProperty(WEB_THREADS, DEFAULT_WEB_THREADS); return getIntegerProperty(WEB_THREADS, DEFAULT_WEB_THREADS);
} }
public int getClusterNodeMaxConcurrentRequests() {
return getIntegerProperty(CLUSTER_NODE_MAX_CONCURRENT_REQUESTS, DEFAULT_CLUSTER_NODE_MAX_CONCURRENT_REQUESTS);
}
public File getWebWorkingDirectory() { public File getWebWorkingDirectory() {
return new File(getProperty(WEB_WORKING_DIR, DEFAULT_WEB_WORKING_DIR)); return new File(getProperty(WEB_WORKING_DIR, DEFAULT_WEB_WORKING_DIR));
} }

View File

@ -84,11 +84,11 @@ import com.sun.jersey.core.util.MultivaluedMapImpl;
public class ThreadPoolRequestReplicator implements RequestReplicator { public class ThreadPoolRequestReplicator implements RequestReplicator {
private static final Logger logger = LoggerFactory.getLogger(ThreadPoolRequestReplicator.class); private static final Logger logger = LoggerFactory.getLogger(ThreadPoolRequestReplicator.class);
private static final int MAX_CONCURRENT_REQUESTS = 100;
private final Client client; // the client to use for issuing requests private final Client client; // the client to use for issuing requests
private final int connectionTimeoutMs; // connection timeout per node request private final int connectionTimeoutMs; // connection timeout per node request
private final int readTimeoutMs; // read timeout per node request private final int readTimeoutMs; // read timeout per node request
private final int maxConcurrentRequests; // maximum number of concurrent requests
private final HttpResponseMapper responseMapper; private final HttpResponseMapper responseMapper;
private final EventReporter eventReporter; private final EventReporter eventReporter;
private final RequestCompletionCallback callback; private final RequestCompletionCallback callback;
@ -109,15 +109,16 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
* *
* @param corePoolSize core size of the thread pool * @param corePoolSize core size of the thread pool
* @param maxPoolSize the max number of threads in the thread pool * @param maxPoolSize the max number of threads in the thread pool
* @param maxConcurrentRequests maximum number of concurrent requests
* @param client a client for making requests * @param client a client for making requests
* @param clusterCoordinator the cluster coordinator to use for interacting with node statuses * @param clusterCoordinator the cluster coordinator to use for interacting with node statuses
* @param callback a callback that will be called whenever all of the responses have been gathered for a request. May be null. * @param callback a callback that will be called whenever all of the responses have been gathered for a request. May be null.
* @param eventReporter an EventReporter that can be used to notify users of interesting events. May be null. * @param eventReporter an EventReporter that can be used to notify users of interesting events. May be null.
* @param nifiProperties properties * @param nifiProperties properties
*/ */
public ThreadPoolRequestReplicator(final int corePoolSize, final int maxPoolSize, final Client client, final ClusterCoordinator clusterCoordinator, public ThreadPoolRequestReplicator(final int corePoolSize, final int maxPoolSize, final int maxConcurrentRequests, final Client client, final ClusterCoordinator clusterCoordinator,
final RequestCompletionCallback callback, final EventReporter eventReporter, final NiFiProperties nifiProperties) { final RequestCompletionCallback callback, final EventReporter eventReporter, final NiFiProperties nifiProperties) {
this(corePoolSize, maxPoolSize, client, clusterCoordinator, "5 sec", "5 sec", callback, eventReporter, nifiProperties); this(corePoolSize, maxPoolSize, maxConcurrentRequests, client, clusterCoordinator, "5 sec", "5 sec", callback, eventReporter, nifiProperties);
} }
/** /**
@ -125,6 +126,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
* *
* @param corePoolSize core size of the thread pool * @param corePoolSize core size of the thread pool
* @param maxPoolSize the max number of threads in the thread pool * @param maxPoolSize the max number of threads in the thread pool
* @param maxConcurrentRequests maximum number of concurrent requests
* @param client a client for making requests * @param client a client for making requests
* @param clusterCoordinator the cluster coordinator to use for interacting with node statuses * @param clusterCoordinator the cluster coordinator to use for interacting with node statuses
* @param connectionTimeout the connection timeout specified in milliseconds * @param connectionTimeout the connection timeout specified in milliseconds
@ -133,7 +135,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
* @param eventReporter an EventReporter that can be used to notify users of interesting events. May be null. * @param eventReporter an EventReporter that can be used to notify users of interesting events. May be null.
* @param nifiProperties properties * @param nifiProperties properties
*/ */
public ThreadPoolRequestReplicator(final int corePoolSize, final int maxPoolSize, final Client client, final ClusterCoordinator clusterCoordinator, public ThreadPoolRequestReplicator(final int corePoolSize, final int maxPoolSize, final int maxConcurrentRequests, final Client client, final ClusterCoordinator clusterCoordinator,
final String connectionTimeout, final String readTimeout, final RequestCompletionCallback callback, final String connectionTimeout, final String readTimeout, final RequestCompletionCallback callback,
final EventReporter eventReporter, final NiFiProperties nifiProperties) { final EventReporter eventReporter, final NiFiProperties nifiProperties) {
if (corePoolSize <= 0) { if (corePoolSize <= 0) {
@ -148,6 +150,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
this.clusterCoordinator = clusterCoordinator; this.clusterCoordinator = clusterCoordinator;
this.connectionTimeoutMs = (int) FormatUtils.getTimeDuration(connectionTimeout, TimeUnit.MILLISECONDS); this.connectionTimeoutMs = (int) FormatUtils.getTimeDuration(connectionTimeout, TimeUnit.MILLISECONDS);
this.readTimeoutMs = (int) FormatUtils.getTimeDuration(readTimeout, TimeUnit.MILLISECONDS); this.readTimeoutMs = (int) FormatUtils.getTimeDuration(readTimeout, TimeUnit.MILLISECONDS);
this.maxConcurrentRequests = maxConcurrentRequests;
this.responseMapper = new StandardHttpResponseMapper(nifiProperties); this.responseMapper = new StandardHttpResponseMapper(nifiProperties);
this.eventReporter = eventReporter; this.eventReporter = eventReporter;
this.callback = callback; this.callback = callback;
@ -361,11 +364,11 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
} }
int numRequests = responseMap.size(); int numRequests = responseMap.size();
if (numRequests >= MAX_CONCURRENT_REQUESTS) { if (numRequests >= maxConcurrentRequests) {
numRequests = purgeExpiredRequests(); numRequests = purgeExpiredRequests();
} }
if (numRequests >= MAX_CONCURRENT_REQUESTS) { if (numRequests >= maxConcurrentRequests) {
final Map<String, Long> countsByUri = responseMap.values().stream().collect( final Map<String, Long> countsByUri = responseMap.values().stream().collect(
Collectors.groupingBy( Collectors.groupingBy(
StandardAsyncClusterResponse::getURIPath, StandardAsyncClusterResponse::getURIPath,

View File

@ -46,11 +46,12 @@ public class ThreadPoolRequestReplicatorFactoryBean implements FactoryBean<Threa
final int corePoolSize = nifiProperties.getClusterNodeProtocolCorePoolSize(); final int corePoolSize = nifiProperties.getClusterNodeProtocolCorePoolSize();
final int maxPoolSize = nifiProperties.getClusterNodeProtocolMaxPoolSize(); final int maxPoolSize = nifiProperties.getClusterNodeProtocolMaxPoolSize();
final int maxConcurrentRequests = nifiProperties.getClusterNodeMaxConcurrentRequests();
final Client jerseyClient = WebUtils.createClient(new DefaultClientConfig(), SslContextFactory.createSslContext(nifiProperties)); final Client jerseyClient = WebUtils.createClient(new DefaultClientConfig(), SslContextFactory.createSslContext(nifiProperties));
final String connectionTimeout = nifiProperties.getClusterNodeConnectionTimeout(); final String connectionTimeout = nifiProperties.getClusterNodeConnectionTimeout();
final String readTimeout = nifiProperties.getClusterNodeReadTimeout(); final String readTimeout = nifiProperties.getClusterNodeReadTimeout();
replicator = new ThreadPoolRequestReplicator(corePoolSize, maxPoolSize, jerseyClient, clusterCoordinator, replicator = new ThreadPoolRequestReplicator(corePoolSize, maxPoolSize, maxConcurrentRequests, jerseyClient, clusterCoordinator,
connectionTimeout, readTimeout, requestCompletionCallback, eventReporter, nifiProperties); connectionTimeout, readTimeout, requestCompletionCallback, eventReporter, nifiProperties);
} }

View File

@ -233,7 +233,7 @@ public class TestThreadPoolRequestReplicator {
final AtomicInteger requestCount = new AtomicInteger(0); final AtomicInteger requestCount = new AtomicInteger(0);
final NiFiProperties props = NiFiProperties.createBasicNiFiProperties(null, null); final NiFiProperties props = NiFiProperties.createBasicNiFiProperties(null, null);
final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, new Client(), coordinator, "1 sec", "1 sec", null, null, props) { final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, 100, new Client(), coordinator, "1 sec", "1 sec", null, null, props) {
@Override @Override
protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method,
final URI uri, final String requestId, Map<String, String> givenHeaders, final StandardAsyncClusterResponse response) { final URI uri, final String requestId, Map<String, String> givenHeaders, final StandardAsyncClusterResponse response) {
@ -305,7 +305,7 @@ public class TestThreadPoolRequestReplicator {
Mockito.when(coordinator.getConnectionStates()).thenReturn(nodeMap); Mockito.when(coordinator.getConnectionStates()).thenReturn(nodeMap);
final NiFiProperties props = NiFiProperties.createBasicNiFiProperties(null, null); final NiFiProperties props = NiFiProperties.createBasicNiFiProperties(null, null);
final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, new Client(), coordinator, "1 sec", "1 sec", null, null, props) { final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, 100, new Client(), coordinator, "1 sec", "1 sec", null, null, props) {
@Override @Override
public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers,
boolean indicateReplicated, boolean verify) { boolean indicateReplicated, boolean verify) {
@ -363,7 +363,7 @@ public class TestThreadPoolRequestReplicator {
final ClusterCoordinator coordinator = createClusterCoordinator(); final ClusterCoordinator coordinator = createClusterCoordinator();
final AtomicInteger requestCount = new AtomicInteger(0); final AtomicInteger requestCount = new AtomicInteger(0);
final NiFiProperties props = NiFiProperties.createBasicNiFiProperties(null, null); final NiFiProperties props = NiFiProperties.createBasicNiFiProperties(null, null);
final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, new Client(), coordinator, "1 sec", "1 sec", null, null, props) { final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, 100, new Client(), coordinator, "1 sec", "1 sec", null, null, props) {
@Override @Override
protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method,
final URI uri, final String requestId, Map<String, String> givenHeaders, final StandardAsyncClusterResponse response) { final URI uri, final String requestId, Map<String, String> givenHeaders, final StandardAsyncClusterResponse response) {
@ -573,7 +573,7 @@ public class TestThreadPoolRequestReplicator {
private void withReplicator(final WithReplicator function, final Status status, final long delayMillis, final RuntimeException failure, final String expectedRequestChain) { private void withReplicator(final WithReplicator function, final Status status, final long delayMillis, final RuntimeException failure, final String expectedRequestChain) {
final ClusterCoordinator coordinator = createClusterCoordinator(); final ClusterCoordinator coordinator = createClusterCoordinator();
final NiFiProperties nifiProps = NiFiProperties.createBasicNiFiProperties(null, null); final NiFiProperties nifiProps = NiFiProperties.createBasicNiFiProperties(null, null);
final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, new Client(), coordinator, "1 sec", "1 sec", null, null, nifiProps) { final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, 100, new Client(), coordinator, "1 sec", "1 sec", null, null, nifiProps) {
@Override @Override
protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method,
final URI uri, final String requestId, Map<String, String> givenHeaders, final StandardAsyncClusterResponse response) { final URI uri, final String requestId, Map<String, String> givenHeaders, final StandardAsyncClusterResponse response) {

View File

@ -160,6 +160,7 @@
<nifi.cluster.node.event.history.size>25</nifi.cluster.node.event.history.size> <nifi.cluster.node.event.history.size>25</nifi.cluster.node.event.history.size>
<nifi.cluster.node.connection.timeout>5 sec</nifi.cluster.node.connection.timeout> <nifi.cluster.node.connection.timeout>5 sec</nifi.cluster.node.connection.timeout>
<nifi.cluster.node.read.timeout>5 sec</nifi.cluster.node.read.timeout> <nifi.cluster.node.read.timeout>5 sec</nifi.cluster.node.read.timeout>
<nifi.cluster.node.max.concurrent.requests>100</nifi.cluster.node.max.concurrent.requests>
<nifi.cluster.firewall.file /> <nifi.cluster.firewall.file />
<nifi.cluster.flow.election.max.wait.time>5 mins</nifi.cluster.flow.election.max.wait.time> <nifi.cluster.flow.election.max.wait.time>5 mins</nifi.cluster.flow.election.max.wait.time>
<nifi.cluster.flow.election.max.candidates /> <nifi.cluster.flow.election.max.candidates />

View File

@ -179,6 +179,7 @@ nifi.cluster.node.protocol.max.threads=${nifi.cluster.node.protocol.max.threads}
nifi.cluster.node.event.history.size=${nifi.cluster.node.event.history.size} nifi.cluster.node.event.history.size=${nifi.cluster.node.event.history.size}
nifi.cluster.node.connection.timeout=${nifi.cluster.node.connection.timeout} nifi.cluster.node.connection.timeout=${nifi.cluster.node.connection.timeout}
nifi.cluster.node.read.timeout=${nifi.cluster.node.read.timeout} nifi.cluster.node.read.timeout=${nifi.cluster.node.read.timeout}
nifi.cluster.node.max.concurrent.requests=${nifi.cluster.node.max.concurrent.requests}
nifi.cluster.firewall.file=${nifi.cluster.firewall.file} nifi.cluster.firewall.file=${nifi.cluster.firewall.file}
nifi.cluster.flow.election.max.wait.time=${nifi.cluster.flow.election.max.wait.time} nifi.cluster.flow.election.max.wait.time=${nifi.cluster.flow.election.max.wait.time}
nifi.cluster.flow.election.max.candidates=${nifi.cluster.flow.election.max.candidates} nifi.cluster.flow.election.max.candidates=${nifi.cluster.flow.election.max.candidates}