mirror of https://github.com/apache/nifi.git
NIFI-4153: Use a LinkedBlockingQueue instead of a SynchronousQueue for Request Replicator's thread pool so that requests will queue when all threads are active, instead of throwing an Exception. This closes #1980
This commit is contained in:
parent
e6b166a3a2
commit
cff81c0cd2
|
@ -17,12 +17,6 @@
|
||||||
|
|
||||||
package org.apache.nifi.cluster.coordination.http.replication;
|
package org.apache.nifi.cluster.coordination.http.replication;
|
||||||
|
|
||||||
import com.sun.jersey.api.client.Client;
|
|
||||||
import com.sun.jersey.api.client.ClientResponse;
|
|
||||||
import com.sun.jersey.api.client.WebResource;
|
|
||||||
import com.sun.jersey.api.client.config.ClientConfig;
|
|
||||||
import com.sun.jersey.api.client.filter.GZIPContentEncodingFilter;
|
|
||||||
import com.sun.jersey.core.util.MultivaluedMapImpl;
|
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -36,10 +30,9 @@ import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.SynchronousQueue;
|
|
||||||
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadFactory;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
@ -49,10 +42,12 @@ import java.util.concurrent.locks.ReadWriteLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import javax.ws.rs.HttpMethod;
|
import javax.ws.rs.HttpMethod;
|
||||||
import javax.ws.rs.core.MediaType;
|
import javax.ws.rs.core.MediaType;
|
||||||
import javax.ws.rs.core.MultivaluedMap;
|
import javax.ws.rs.core.MultivaluedMap;
|
||||||
import javax.ws.rs.core.Response.Status;
|
import javax.ws.rs.core.Response.Status;
|
||||||
|
|
||||||
import org.apache.nifi.authorization.AccessDeniedException;
|
import org.apache.nifi.authorization.AccessDeniedException;
|
||||||
import org.apache.nifi.authorization.user.NiFiUser;
|
import org.apache.nifi.authorization.user.NiFiUser;
|
||||||
import org.apache.nifi.authorization.user.NiFiUserUtils;
|
import org.apache.nifi.authorization.user.NiFiUserUtils;
|
||||||
|
@ -79,6 +74,13 @@ import org.apache.nifi.web.security.jwt.JwtAuthenticationFilter;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.sun.jersey.api.client.Client;
|
||||||
|
import com.sun.jersey.api.client.ClientResponse;
|
||||||
|
import com.sun.jersey.api.client.WebResource;
|
||||||
|
import com.sun.jersey.api.client.config.ClientConfig;
|
||||||
|
import com.sun.jersey.api.client.filter.GZIPContentEncodingFilter;
|
||||||
|
import com.sun.jersey.core.util.MultivaluedMapImpl;
|
||||||
|
|
||||||
public class ThreadPoolRequestReplicator implements RequestReplicator {
|
public class ThreadPoolRequestReplicator implements RequestReplicator {
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(ThreadPoolRequestReplicator.class);
|
private static final Logger logger = LoggerFactory.getLogger(ThreadPoolRequestReplicator.class);
|
||||||
|
@ -92,7 +94,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
|
||||||
private final RequestCompletionCallback callback;
|
private final RequestCompletionCallback callback;
|
||||||
private final ClusterCoordinator clusterCoordinator;
|
private final ClusterCoordinator clusterCoordinator;
|
||||||
|
|
||||||
private ExecutorService executorService;
|
private ThreadPoolExecutor executorService;
|
||||||
private ScheduledExecutorService maintenanceExecutor;
|
private ScheduledExecutorService maintenanceExecutor;
|
||||||
|
|
||||||
private final ConcurrentMap<String, StandardAsyncClusterResponse> responseMap = new ConcurrentHashMap<>();
|
private final ConcurrentMap<String, StandardAsyncClusterResponse> responseMap = new ConcurrentHashMap<>();
|
||||||
|
@ -162,7 +164,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
|
||||||
return t;
|
return t;
|
||||||
};
|
};
|
||||||
|
|
||||||
executorService = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory);
|
executorService = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
|
||||||
|
|
||||||
maintenanceExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
|
maintenanceExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -444,6 +446,11 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
|
||||||
logger.debug("Notified monitor {} because request {} {} has failed with Throwable {}", monitor, method, uri, t);
|
logger.debug("Notified monitor {} because request {} {} has failed with Throwable {}", monitor, method, uri, t);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (response != null) {
|
||||||
|
final RuntimeException failure = (t instanceof RuntimeException) ? (RuntimeException) t : new RuntimeException("Failed to submit Replication Request to background thread", t);
|
||||||
|
response.setFailure(failure, new NodeIdentifier());
|
||||||
|
}
|
||||||
|
|
||||||
throw t;
|
throw t;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue