From cff81c0cd24aa044602bbcc5ffc4c85918e9d484 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 5 Jul 2017 16:35:04 -0400 Subject: [PATCH] NIFI-4153: Use a LinkedBlockingQueue instead of a SynchronousQueue for Request Replicator's thread pool so that requests will queue when all threads are active, instead of throwing an Exception. This closes #1980 --- .../ThreadPoolRequestReplicator.java | 27 ++++++++++++------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java index e5a46f894b..c38b3e9c9f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java @@ -17,12 +17,6 @@ package org.apache.nifi.cluster.coordination.http.replication; -import com.sun.jersey.api.client.Client; -import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.api.client.WebResource; -import com.sun.jersey.api.client.config.ClientConfig; -import com.sun.jersey.api.client.filter.GZIPContentEncodingFilter; -import com.sun.jersey.core.util.MultivaluedMapImpl; import java.net.URI; import java.net.URISyntaxException; import java.util.Collections; @@ -36,10 +30,9 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -49,10 +42,12 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; import java.util.stream.Collectors; + import javax.ws.rs.HttpMethod; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response.Status; + import org.apache.nifi.authorization.AccessDeniedException; import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUserUtils; @@ -79,6 +74,13 @@ import org.apache.nifi.web.security.jwt.JwtAuthenticationFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.api.client.config.ClientConfig; +import com.sun.jersey.api.client.filter.GZIPContentEncodingFilter; +import com.sun.jersey.core.util.MultivaluedMapImpl; + public class ThreadPoolRequestReplicator implements RequestReplicator { private static final Logger logger = LoggerFactory.getLogger(ThreadPoolRequestReplicator.class); @@ -92,7 +94,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { private final RequestCompletionCallback callback; private final ClusterCoordinator clusterCoordinator; - private ExecutorService executorService; + private ThreadPoolExecutor executorService; private ScheduledExecutorService maintenanceExecutor; private final ConcurrentMap responseMap = new ConcurrentHashMap<>(); @@ -162,7 +164,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { return t; }; - executorService = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 5, TimeUnit.SECONDS, new SynchronousQueue(), threadFactory); + executorService = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 5, TimeUnit.SECONDS, new LinkedBlockingQueue(), threadFactory); maintenanceExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() { @Override @@ -444,6 +446,11 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { logger.debug("Notified monitor {} because request {} {} has failed with Throwable {}", monitor, method, uri, t); } + if (response != null) { + final RuntimeException failure = (t instanceof RuntimeException) ? (RuntimeException) t : new RuntimeException("Failed to submit Replication Request to background thread", t); + response.setFailure(failure, new NodeIdentifier()); + } + throw t; } }