SOLR-13975, SOLR-13896: ConcurrentUpdateSolrClient connection stall prevention.

This commit is contained in:
Andrzej Bialecki 2019-12-10 21:45:43 +01:00
parent 6d03baa485
commit c4f0c33638
10 changed files with 216 additions and 13 deletions

View File

@ -230,6 +230,8 @@ Bug Fixes
* SOLR-13806: SolrJ QueryResponse._explainMap is incorrectly typed. (Guna Sekhar Dorai, ab) * SOLR-13806: SolrJ QueryResponse._explainMap is incorrectly typed. (Guna Sekhar Dorai, ab)
* SOLR-13975, SOLR-13896: ConcurrentUpdateSolrClient connection stall prevention. (ab, caomanhdat)
Other Changes Other Changes
--------------------- ---------------------

View File

@ -228,6 +228,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
@Override @Override
public int addDoc(AddUpdateCommand cmd) throws IOException { public int addDoc(AddUpdateCommand cmd) throws IOException {
TestInjection.injectDirectUpdateLatch();
try { try {
return addDoc0(cmd); return addDoc0(cmd);
} catch (SolrException e) { } catch (SolrException e) {
@ -414,6 +415,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
// we don't return the number of docs deleted because it's not always possible to quickly know that info. // we don't return the number of docs deleted because it's not always possible to quickly know that info.
@Override @Override
public void delete(DeleteUpdateCommand cmd) throws IOException { public void delete(DeleteUpdateCommand cmd) throws IOException {
TestInjection.injectDirectUpdateLatch();
deleteByIdCommands.increment(); deleteByIdCommands.increment();
deleteByIdCommandsCumulative.mark(); deleteByIdCommandsCumulative.mark();
@ -477,6 +479,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
// we don't return the number of docs deleted because it's not always possible to quickly know that info. // we don't return the number of docs deleted because it's not always possible to quickly know that info.
@Override @Override
public void deleteByQuery(DeleteUpdateCommand cmd) throws IOException { public void deleteByQuery(DeleteUpdateCommand cmd) throws IOException {
TestInjection.injectDirectUpdateLatch();
deleteByQueryCommands.increment(); deleteByQueryCommands.increment();
deleteByQueryCommandsCumulative.mark(); deleteByQueryCommandsCumulative.mark();
boolean madeIt=false; boolean madeIt=false;
@ -542,6 +545,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
@Override @Override
public int mergeIndexes(MergeIndexesCommand cmd) throws IOException { public int mergeIndexes(MergeIndexesCommand cmd) throws IOException {
TestInjection.injectDirectUpdateLatch();
mergeIndexesCommands.mark(); mergeIndexesCommands.mark();
int rc; int rc;
@ -605,6 +609,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
@Override @Override
public void commit(CommitUpdateCommand cmd) throws IOException { public void commit(CommitUpdateCommand cmd) throws IOException {
TestInjection.injectDirectUpdateLatch();
if (cmd.prepareCommit) { if (cmd.prepareCommit) {
prepareCommit(cmd); prepareCommit(cmd);
return; return;
@ -754,6 +759,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
*/ */
@Override @Override
public void rollback(RollbackUpdateCommand cmd) throws IOException { public void rollback(RollbackUpdateCommand cmd) throws IOException {
TestInjection.injectDirectUpdateLatch();
if (core.getCoreContainer().isZooKeeperAware()) { if (core.getCoreContainer().isZooKeeperAware()) {
throw new UnsupportedOperationException("Rollback is currently not supported in SolrCloud mode. (SOLR-4895)"); throw new UnsupportedOperationException("Rollback is currently not supported in SolrCloud mode. (SOLR-4895)");
} }

View File

@ -93,10 +93,12 @@ public class SolrCmdDistributor implements Closeable {
public void finish() { public void finish() {
try { try {
assert ! finished : "lifecycle sanity check"; assert !finished : "lifecycle sanity check";
finished = true; finished = true;
blockAndDoRetries(); blockAndDoRetries();
} catch (IOException e) {
log.warn("Unable to finish sending updates", e);
} finally { } finally {
clients.shutdown(); clients.shutdown();
} }
@ -106,7 +108,7 @@ public class SolrCmdDistributor implements Closeable {
clients.shutdown(); clients.shutdown();
} }
private void doRetriesIfNeeded() { private void doRetriesIfNeeded() throws IOException {
// NOTE: retries will be forwards to a single url // NOTE: retries will be forwards to a single url
List<Error> errors = new ArrayList<>(this.errors); List<Error> errors = new ArrayList<>(this.errors);
@ -259,7 +261,7 @@ public class SolrCmdDistributor implements Closeable {
} }
public void blockAndDoRetries() { public void blockAndDoRetries() throws IOException {
clients.blockUntilFinished(); clients.blockUntilFinished();
// wait for any async commits to complete // wait for any async commits to complete
@ -284,7 +286,7 @@ public class SolrCmdDistributor implements Closeable {
: AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher, cmd.maxOptimizeSegments, cmd.softCommit, cmd.expungeDeletes, cmd.openSearcher); : AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher, cmd.maxOptimizeSegments, cmd.softCommit, cmd.expungeDeletes, cmd.openSearcher);
} }
private void submit(final Req req, boolean isCommit) { private void submit(final Req req, boolean isCommit) throws IOException {
// Copy user principal from the original request to the new update request, for later authentication interceptor use // Copy user principal from the original request to the new update request, for later authentication interceptor use
if (SolrRequestInfo.getRequestInfo() != null) { if (SolrRequestInfo.getRequestInfo() != null) {
req.uReq.setUserPrincipal(SolrRequestInfo.getRequestInfo().getReq().getUserPrincipal()); req.uReq.setUserPrincipal(SolrRequestInfo.getRequestInfo().getReq().getUserPrincipal());

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.solr.update; package org.apache.solr.update;
import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.util.ArrayList; import java.util.ArrayList;
@ -38,6 +39,8 @@ public class StreamingSolrClients {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final int runnerCount = Integer.getInteger("solr.cloud.replication.runners", 1); private final int runnerCount = Integer.getInteger("solr.cloud.replication.runners", 1);
// should be less than solr.jetty.http.idleTimeout
private final int pollQueueTime = Integer.getInteger("solr.cloud.client.pollQueueTime", 10000);
private Http2SolrClient httpClient; private Http2SolrClient httpClient;
@ -72,14 +75,14 @@ public class StreamingSolrClients {
.withExecutorService(updateExecutor) .withExecutorService(updateExecutor)
.alwaysStreamDeletes() .alwaysStreamDeletes()
.build(); .build();
client.setPollQueueTime(Integer.MAX_VALUE); // minimize connections created client.setPollQueueTime(pollQueueTime); // minimize connections created
solrClients.put(url, client); solrClients.put(url, client);
} }
return client; return client;
} }
public synchronized void blockUntilFinished() { public synchronized void blockUntilFinished() throws IOException {
for (ConcurrentUpdateHttp2SolrClient client : solrClients.values()) { for (ConcurrentUpdateHttp2SolrClient client : solrClients.values()) {
client.blockUntilFinished(); client.blockUntilFinished();
} }

View File

@ -126,6 +126,8 @@ public class TestInjection {
public volatile static CountDownLatch splitLatch = null; public volatile static CountDownLatch splitLatch = null;
public volatile static CountDownLatch directUpdateLatch = null;
public volatile static CountDownLatch reindexLatch = null; public volatile static CountDownLatch reindexLatch = null;
public volatile static String reindexFailure = null; public volatile static String reindexFailure = null;
@ -164,6 +166,7 @@ public class TestInjection {
splitFailureBeforeReplicaCreation = null; splitFailureBeforeReplicaCreation = null;
splitFailureAfterReplicaCreation = null; splitFailureAfterReplicaCreation = null;
splitLatch = null; splitLatch = null;
directUpdateLatch = null;
reindexLatch = null; reindexLatch = null;
reindexFailure = null; reindexFailure = null;
prepRecoveryOpPauseForever = null; prepRecoveryOpPauseForever = null;
@ -435,6 +438,18 @@ public class TestInjection {
return true; return true;
} }
public static boolean injectDirectUpdateLatch() {
if (directUpdateLatch != null) {
try {
log.info("Waiting in DirectUpdateHandler2 for up to 60s");
return directUpdateLatch.await(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
return true;
}
public static boolean injectReindexFailure() { public static boolean injectReindexFailure() {
if (reindexFailure != null) { if (reindexFailure != null) {
Random rand = random(); Random rand = random();

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.solr.cloud; package org.apache.solr.cloud;
import java.io.IOException;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.net.ConnectException; import java.net.ConnectException;
import java.util.List; import java.util.List;
@ -129,6 +130,8 @@ class FullThrottleStoppableIndexingThread extends StoppableIndexingThread {
stop = true; stop = true;
try { try {
cusc.blockUntilFinished(); cusc.blockUntilFinished();
} catch (IOException e) {
log.warn("Exception waiting for the indexing client to finish", e);
} finally { } finally {
cusc.shutdownNow(); cusc.shutdownNow();
} }

View File

@ -24,6 +24,7 @@ import java.nio.file.Path;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import javax.xml.parsers.ParserConfigurationException; import javax.xml.parsers.ParserConfigurationException;
@ -56,6 +57,7 @@ import org.apache.solr.update.SolrCmdDistributor.StdNode;
import org.apache.solr.update.processor.DistributedUpdateProcessor; import org.apache.solr.update.processor.DistributedUpdateProcessor;
import org.apache.solr.update.processor.DistributedUpdateProcessor.LeaderRequestReplicationTracker; import org.apache.solr.update.processor.DistributedUpdateProcessor.LeaderRequestReplicationTracker;
import org.apache.solr.update.processor.DistributedUpdateProcessor.RollupRequestReplicationTracker; import org.apache.solr.update.processor.DistributedUpdateProcessor.RollupRequestReplicationTracker;
import org.apache.solr.util.TestInjection;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
@ -73,11 +75,13 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
// we can't use the Randomized merge policy because the test depends on // we can't use the Randomized merge policy because the test depends on
// being able to call optimize to have all deletes expunged. // being able to call optimize to have all deletes expunged.
systemSetPropertySolrTestsMergePolicyFactory(LogDocMergePolicyFactory.class.getName()); systemSetPropertySolrTestsMergePolicyFactory(LogDocMergePolicyFactory.class.getName());
System.setProperty("solr.cloud.client.pollQueueTime", "2000");
} }
@AfterClass @AfterClass
public static void afterClass() { public static void afterClass() {
systemClearPropertySolrTestsMergePolicyFactory(); systemClearPropertySolrTestsMergePolicyFactory();
System.clearProperty("solr.cloud.client.pollQueueTime");
} }
private UpdateShardHandler updateShardHandler; private UpdateShardHandler updateShardHandler;
@ -356,6 +360,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
testDeletes(true, true); testDeletes(true, true);
testDeletes(true, false); testDeletes(true, false);
getRfFromResponseShouldNotCloseTheInputStream(); getRfFromResponseShouldNotCloseTheInputStream();
testStuckUpdates();
} }
private void testDeletes(boolean dbq, boolean withFailures) throws Exception { private void testDeletes(boolean dbq, boolean withFailures) throws Exception {
@ -859,4 +864,36 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
assertFalse(openSearcher); assertFalse(openSearcher);
} }
} }
private void testStuckUpdates() throws Exception {
TestInjection.directUpdateLatch = new CountDownLatch(1);
List<Node> nodes = new ArrayList<>();
ModifiableSolrParams params;
try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(updateShardHandler)) {
for (int i = 0; i < 3; i++) {
nodes.clear();
for (SolrClient c : clients) {
if (random().nextBoolean()) {
continue;
}
HttpSolrClient httpClient = (HttpSolrClient) c;
ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
httpClient.getBaseURL(), ZkStateReader.CORE_NAME_PROP, "");
StdNode node = new StdNode(new ZkCoreNodeProps(nodeProps));
nodes.add(node);
}
AddUpdateCommand c = new AddUpdateCommand(null);
c.solrDoc = sdoc("id", id.incrementAndGet());
if (nodes.size() > 0) {
params = new ModifiableSolrParams();
cmdDistrib.distribAdd(c, nodes, params, false);
}
}
cmdDistrib.blockAndDoRetries();
} catch (IOException e) {
assertTrue(e.toString(), e.toString().contains("processing has stalled"));
} finally {
TestInjection.directUpdateLatch.countDown();
}
}
} }

View File

@ -120,6 +120,12 @@ include::{example-source-dir}UsingSolrJRefGuideExamplesTest.java[tag=solrj-solrc
When these values are not explicitly provided, SolrJ falls back to using the defaults for the OS/environment is running on. When these values are not explicitly provided, SolrJ falls back to using the defaults for the OS/environment is running on.
`ConcurrentUpdateSolrClient` and its counterpart `ConcurrentUpdateHttp2SolrClient` implement also a stall prevention
timeout that allows requests to non-responsive nodes to fail quicker than waiting for a socket timeout.
The default value of this timeout is set to 15000 ms and can be adjusted by a system property `solr.cloud.client.stallTime`.
This value should be smaller than `solr.jetty.http.idleTimeout` (Which is 120000 ms by default) and greater than the
processing time of the largest update request.
=== Cloud Request Routing === Cloud Request Routing
The SolrJ `CloudSolrClient` implementations (`CloudSolrClient` and `CloudHttp2SolrClient`) respect the <<distributed-requests.adoc#shards-preference-parameter,shards.preference parameter>>. The SolrJ `CloudSolrClient` implementations (`CloudSolrClient` and `CloudHttp2SolrClient`) respect the <<distributed-requests.adoc#shards-preference-parameter,shards.preference parameter>>.

View File

@ -66,6 +66,7 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
private boolean shutdownClient; private boolean shutdownClient;
private boolean shutdownExecutor; private boolean shutdownExecutor;
private int pollQueueTime = 250; private int pollQueueTime = 250;
private int stallTime;
private final boolean streamDeletes; private final boolean streamDeletes;
private volatile boolean closed; private volatile boolean closed;
private volatile CountDownLatch lock = null; // used to block everything private volatile CountDownLatch lock = null; // used to block everything
@ -150,6 +151,7 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
this.runners = new LinkedList<>(); this.runners = new LinkedList<>();
this.streamDeletes = builder.streamDeletes; this.streamDeletes = builder.streamDeletes;
this.basePath = builder.baseSolrUrl; this.basePath = builder.baseSolrUrl;
this.stallTime = Integer.getInteger("solr.cloud.client.stallTime", 15000);
if (builder.executorService != null) { if (builder.executorService != null) {
this.scheduler = builder.executorService; this.scheduler = builder.executorService;
@ -212,6 +214,7 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
try { try {
Update update; Update update;
notifyQueueAndRunnersIfEmptyQueue(); notifyQueueAndRunnersIfEmptyQueue();
//log.info("-- polling 1");
update = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS); update = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
if (update == null) { if (update == null) {
@ -385,6 +388,8 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
Update update = new Update(req, collection); Update update = new Update(req, collection);
boolean success = queue.offer(update); boolean success = queue.offer(update);
long lastStallTime = -1;
int lastQueueSize = -1;
for (;;) { for (;;) {
synchronized (runners) { synchronized (runners) {
// see if queue is half full and we can add more runners // see if queue is half full and we can add more runners
@ -418,6 +423,25 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
if (!success) { if (!success) {
success = queue.offer(update, 100, TimeUnit.MILLISECONDS); success = queue.offer(update, 100, TimeUnit.MILLISECONDS);
} }
if (!success) {
// stall prevention
int currentQueueSize = queue.size();
if (currentQueueSize != lastQueueSize) {
// there's still some progress in processing the queue - not stalled
lastQueueSize = currentQueueSize;
lastStallTime = -1;
} else {
if (lastStallTime == -1) {
// mark a stall but keep trying
lastStallTime = System.nanoTime();
} else {
long currentStallTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastStallTime);
if (currentStallTime > stallTime) {
throw new IOException("Request processing has stalled for " + currentStallTime + "ms with " + queue.size() + " remaining elements in the queue.");
}
}
}
}
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.error("interrupted", e); log.error("interrupted", e);
@ -430,13 +454,16 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
return dummy; return dummy;
} }
public synchronized void blockUntilFinished() { public synchronized void blockUntilFinished() throws IOException {
lock = new CountDownLatch(1); lock = new CountDownLatch(1);
try { try {
waitForEmptyQueue(); waitForEmptyQueue();
interruptRunnerThreadsPolling(); interruptRunnerThreadsPolling();
long lastStallTime = -1;
int lastQueueSize = -1;
synchronized (runners) { synchronized (runners) {
// NOTE: if the executor is shut down, runners may never become empty (a scheduled task may never be run, // NOTE: if the executor is shut down, runners may never become empty (a scheduled task may never be run,
@ -452,6 +479,23 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
// Need to check if the queue is empty before really considering this is finished (SOLR-4260) // Need to check if the queue is empty before really considering this is finished (SOLR-4260)
int queueSize = queue.size(); int queueSize = queue.size();
// stall prevention
if (lastQueueSize != queueSize) {
// init, or no stall
lastQueueSize = queueSize;
lastStallTime = -1;
} else {
if (lastStallTime == -1) {
lastStallTime = System.nanoTime();
} else {
long currentStallTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastStallTime);
if (currentStallTime > stallTime) {
throw new IOException("Task queue processing has stalled for " + currentStallTime + " ms with " + queueSize + " remaining elements to process.");
// Thread.currentThread().interrupt();
// break;
}
}
}
if (queueSize > 0 && runners.isEmpty()) { if (queueSize > 0 && runners.isEmpty()) {
// TODO: can this still happen? // TODO: can this still happen?
log.warn("No more runners, but queue still has " + log.warn("No more runners, but queue still has " +
@ -485,9 +529,11 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
} }
} }
private void waitForEmptyQueue() { private void waitForEmptyQueue() throws IOException {
boolean threadInterrupted = Thread.currentThread().isInterrupted(); boolean threadInterrupted = Thread.currentThread().isInterrupted();
long lastStallTime = -1;
int lastQueueSize = -1;
while (!queue.isEmpty()) { while (!queue.isEmpty()) {
if (scheduler.isTerminated()) { if (scheduler.isTerminated()) {
log.warn("The task queue still has elements but the update scheduler {} is terminated. Can't process any more tasks. " log.warn("The task queue still has elements but the update scheduler {} is terminated. Can't process any more tasks. "
@ -513,6 +559,24 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
queue.size()); queue.size());
} }
} }
int currentQueueSize = queue.size();
// stall prevention
if (currentQueueSize != lastQueueSize) {
lastQueueSize = currentQueueSize;
lastStallTime = -1;
} else {
lastQueueSize = currentQueueSize;
if (lastStallTime == -1) {
lastStallTime = System.nanoTime();
} else {
long currentStallTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastStallTime);
if (currentStallTime > stallTime) {
throw new IOException("Task queue processing has stalled for " + currentStallTime + " ms with " + currentQueueSize + " remaining elements to process.");
// threadInterrupted = true;
// break;
}
}
}
} }
if (threadInterrupted) { if (threadInterrupted) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
@ -598,6 +662,7 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
*/ */
public void setPollQueueTime(int pollQueueTime) { public void setPollQueueTime(int pollQueueTime) {
this.pollQueueTime = pollQueueTime; this.pollQueueTime = pollQueueTime;
this.stallTime = this.pollQueueTime * 3 / 2;
} }
/** /**

View File

@ -85,6 +85,7 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
final int threadCount; final int threadCount;
boolean shutdownExecutor = false; boolean shutdownExecutor = false;
int pollQueueTime = 250; int pollQueueTime = 250;
int stallTime;
private final boolean streamDeletes; private final boolean streamDeletes;
private boolean internalHttpClient; private boolean internalHttpClient;
private volatile Integer connectionTimeout; private volatile Integer connectionTimeout;
@ -132,7 +133,9 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
this.streamDeletes = builder.streamDeletes; this.streamDeletes = builder.streamDeletes;
this.connectionTimeout = builder.connectionTimeoutMillis; this.connectionTimeout = builder.connectionTimeoutMillis;
this.soTimeout = builder.socketTimeoutMillis; this.soTimeout = builder.socketTimeoutMillis;
this.stallTime = Integer.getInteger("solr.cloud.client.stallTime", 15000);
if (builder.executorService != null) { if (builder.executorService != null) {
this.scheduler = builder.executorService; this.scheduler = builder.executorService;
this.shutdownExecutor = false; this.shutdownExecutor = false;
@ -518,6 +521,8 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
Update update = new Update(req, collection); Update update = new Update(req, collection);
boolean success = queue.offer(update); boolean success = queue.offer(update);
long lastStallTime = -1;
int lastQueueSize = -1;
for (;;) { for (;;) {
synchronized (runners) { synchronized (runners) {
// see if queue is half full and we can add more runners // see if queue is half full and we can add more runners
@ -551,6 +556,25 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
if (!success) { if (!success) {
success = queue.offer(update, 100, TimeUnit.MILLISECONDS); success = queue.offer(update, 100, TimeUnit.MILLISECONDS);
} }
if (!success) {
// stall prevention
int currentQueueSize = queue.size();
if (currentQueueSize != lastQueueSize) {
// there's still some progress in processing the queue - not stalled
lastQueueSize = currentQueueSize;
lastStallTime = -1;
} else {
if (lastStallTime == -1) {
// mark a stall but keep trying
lastStallTime = System.nanoTime();
} else {
long currentStallTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastStallTime);
if (currentStallTime > stallTime) {
throw new IOException("Request processing has stalled for " + currentStallTime + "ms with " + queue.size() + " remaining elements in the queue.");
}
}
}
}
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.error("interrupted", e); log.error("interrupted", e);
@ -563,13 +587,16 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
return dummy; return dummy;
} }
public synchronized void blockUntilFinished() { public synchronized void blockUntilFinished() throws IOException {
lock = new CountDownLatch(1); lock = new CountDownLatch(1);
try { try {
waitForEmptyQueue(); waitForEmptyQueue();
interruptRunnerThreadsPolling(); interruptRunnerThreadsPolling();
long lastStallTime = -1;
int lastQueueSize = -1;
synchronized (runners) { synchronized (runners) {
// NOTE: if the executor is shut down, runners may never become empty (a scheduled task may never be run, // NOTE: if the executor is shut down, runners may never become empty (a scheduled task may never be run,
@ -587,6 +614,23 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
// Need to check if the queue is empty before really considering this is finished (SOLR-4260) // Need to check if the queue is empty before really considering this is finished (SOLR-4260)
int queueSize = queue.size(); int queueSize = queue.size();
// stall prevention
if (lastQueueSize != queueSize) {
// init, or no stall
lastQueueSize = queueSize;
lastStallTime = -1;
} else {
if (lastStallTime == -1) {
lastStallTime = System.nanoTime();
} else {
long currentStallTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastStallTime);
if (currentStallTime > stallTime) {
throw new IOException("Task queue processing has stalled for " + currentStallTime + " ms with " + queueSize + " remaining elements to process.");
// Thread.currentThread().interrupt();
// break;
}
}
}
if (queueSize > 0 && runners.isEmpty()) { if (queueSize > 0 && runners.isEmpty()) {
// TODO: can this still happen? // TODO: can this still happen?
log.warn("No more runners, but queue still has " + log.warn("No more runners, but queue still has " +
@ -620,9 +664,11 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
} }
} }
private void waitForEmptyQueue() { private void waitForEmptyQueue() throws IOException {
boolean threadInterrupted = Thread.currentThread().isInterrupted(); boolean threadInterrupted = Thread.currentThread().isInterrupted();
long lastStallTime = -1;
int lastQueueSize = -1;
while (!queue.isEmpty()) { while (!queue.isEmpty()) {
if (log.isDebugEnabled()) emptyQueueLoops.incrementAndGet(); if (log.isDebugEnabled()) emptyQueueLoops.incrementAndGet();
if (scheduler.isTerminated()) { if (scheduler.isTerminated()) {
@ -643,12 +689,30 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
try { try {
queue.wait(250); queue.wait(250);
} catch (InterruptedException e) { } catch (InterruptedException e) {
// If we set the thread as interrupted again, the next time the wait it's called i t's going to return immediately // If we set the thread as interrupted again, the next time the wait it's called it's going to return immediately
threadInterrupted = true; threadInterrupted = true;
log.warn("Thread interrupted while waiting for update queue to be empty. There are still {} elements in the queue.", log.warn("Thread interrupted while waiting for update queue to be empty. There are still {} elements in the queue.",
queue.size()); queue.size());
} }
} }
int currentQueueSize = queue.size();
// stall prevention
if (currentQueueSize != lastQueueSize) {
lastQueueSize = currentQueueSize;
lastStallTime = -1;
} else {
lastQueueSize = currentQueueSize;
if (lastStallTime == -1) {
lastStallTime = System.nanoTime();
} else {
long currentStallTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastStallTime);
if (currentStallTime > stallTime) {
throw new IOException("Task queue processing has stalled for " + currentStallTime + " ms with " + currentQueueSize + " remaining elements to process.");
// threadInterrupted = true;
// break;
}
}
}
} }
if (threadInterrupted) { if (threadInterrupted) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();