diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 0b9af781ffc..20679a07d23 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -230,6 +230,8 @@ Bug Fixes * SOLR-13806: SolrJ QueryResponse._explainMap is incorrectly typed. (Guna Sekhar Dorai, ab) +* SOLR-13975, SOLR-13896: ConcurrentUpdateSolrClient connection stall prevention. (ab, caomanhdat) + Other Changes --------------------- diff --git a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java index b6232ae6c2a..b00dc16e574 100644 --- a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java +++ b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java @@ -228,6 +228,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState @Override public int addDoc(AddUpdateCommand cmd) throws IOException { + TestInjection.injectDirectUpdateLatch(); try { return addDoc0(cmd); } catch (SolrException e) { @@ -414,6 +415,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState // we don't return the number of docs deleted because it's not always possible to quickly know that info. @Override public void delete(DeleteUpdateCommand cmd) throws IOException { + TestInjection.injectDirectUpdateLatch(); deleteByIdCommands.increment(); deleteByIdCommandsCumulative.mark(); @@ -477,6 +479,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState // we don't return the number of docs deleted because it's not always possible to quickly know that info. 
@Override public void deleteByQuery(DeleteUpdateCommand cmd) throws IOException { + TestInjection.injectDirectUpdateLatch(); deleteByQueryCommands.increment(); deleteByQueryCommandsCumulative.mark(); boolean madeIt=false; @@ -542,6 +545,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState @Override public int mergeIndexes(MergeIndexesCommand cmd) throws IOException { + TestInjection.injectDirectUpdateLatch(); mergeIndexesCommands.mark(); int rc; @@ -605,6 +609,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState @Override public void commit(CommitUpdateCommand cmd) throws IOException { + TestInjection.injectDirectUpdateLatch(); if (cmd.prepareCommit) { prepareCommit(cmd); return; @@ -754,6 +759,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState */ @Override public void rollback(RollbackUpdateCommand cmd) throws IOException { + TestInjection.injectDirectUpdateLatch(); if (core.getCoreContainer().isZooKeeperAware()) { throw new UnsupportedOperationException("Rollback is currently not supported in SolrCloud mode. (SOLR-4895)"); } diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java index 5098cd1daa7..e4727dab2af 100644 --- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java +++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java @@ -93,10 +93,12 @@ public class SolrCmdDistributor implements Closeable { public void finish() { try { - assert ! 
finished : "lifecycle sanity check"; + assert !finished : "lifecycle sanity check"; finished = true; - + blockAndDoRetries(); + } catch (IOException e) { + log.warn("Unable to finish sending updates", e); } finally { clients.shutdown(); } @@ -106,7 +108,7 @@ public class SolrCmdDistributor implements Closeable { clients.shutdown(); } - private void doRetriesIfNeeded() { + private void doRetriesIfNeeded() throws IOException { // NOTE: retries will be forwards to a single url List errors = new ArrayList<>(this.errors); @@ -259,7 +261,7 @@ public class SolrCmdDistributor implements Closeable { } - public void blockAndDoRetries() { + public void blockAndDoRetries() throws IOException { clients.blockUntilFinished(); // wait for any async commits to complete @@ -284,7 +286,7 @@ public class SolrCmdDistributor implements Closeable { : AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher, cmd.maxOptimizeSegments, cmd.softCommit, cmd.expungeDeletes, cmd.openSearcher); } - private void submit(final Req req, boolean isCommit) { + private void submit(final Req req, boolean isCommit) throws IOException { // Copy user principal from the original request to the new update request, for later authentication interceptor use if (SolrRequestInfo.getRequestInfo() != null) { req.uReq.setUserPrincipal(SolrRequestInfo.getRequestInfo().getReq().getUserPrincipal()); diff --git a/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java b/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java index 9d06c95ab5e..2a22ee01785 100644 --- a/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java +++ b/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java @@ -16,6 +16,7 @@ */ package org.apache.solr.update; +import java.io.IOException; import java.io.InputStream; import java.lang.invoke.MethodHandles; import java.util.ArrayList; @@ -38,6 +39,8 @@ public class StreamingSolrClients { private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final int runnerCount = Integer.getInteger("solr.cloud.replication.runners", 1); + // should be less than solr.jetty.http.idleTimeout + private final int pollQueueTime = Integer.getInteger("solr.cloud.client.pollQueueTime", 10000); private Http2SolrClient httpClient; @@ -72,14 +75,14 @@ public class StreamingSolrClients { .withExecutorService(updateExecutor) .alwaysStreamDeletes() .build(); - client.setPollQueueTime(Integer.MAX_VALUE); // minimize connections created + client.setPollQueueTime(pollQueueTime); // minimize connections created solrClients.put(url, client); } return client; } - public synchronized void blockUntilFinished() { + public synchronized void blockUntilFinished() throws IOException { for (ConcurrentUpdateHttp2SolrClient client : solrClients.values()) { client.blockUntilFinished(); } diff --git a/solr/core/src/java/org/apache/solr/util/TestInjection.java b/solr/core/src/java/org/apache/solr/util/TestInjection.java index 39c849b7208..9af4a8f625a 100644 --- a/solr/core/src/java/org/apache/solr/util/TestInjection.java +++ b/solr/core/src/java/org/apache/solr/util/TestInjection.java @@ -126,6 +126,8 @@ public class TestInjection { public volatile static CountDownLatch splitLatch = null; + public volatile static CountDownLatch directUpdateLatch = null; + public volatile static CountDownLatch reindexLatch = null; public volatile static String reindexFailure = null; @@ -164,6 +166,7 @@ public class TestInjection { splitFailureBeforeReplicaCreation = null; splitFailureAfterReplicaCreation = null; splitLatch = null; + directUpdateLatch = null; reindexLatch = null; reindexFailure = null; prepRecoveryOpPauseForever = null; @@ -435,6 +438,18 @@ public class TestInjection { return true; } + public static boolean injectDirectUpdateLatch() { + if (directUpdateLatch != null) { + try { + log.info("Waiting in DirectUpdateHandler2 for up to 60s"); + return directUpdateLatch.await(60, 
TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + return true; + } + public static boolean injectReindexFailure() { if (reindexFailure != null) { Random rand = random(); diff --git a/solr/core/src/test/org/apache/solr/cloud/FullThrottleStoppableIndexingThread.java b/solr/core/src/test/org/apache/solr/cloud/FullThrottleStoppableIndexingThread.java index 78dc1dea540..1c5d470beb2 100644 --- a/solr/core/src/test/org/apache/solr/cloud/FullThrottleStoppableIndexingThread.java +++ b/solr/core/src/test/org/apache/solr/cloud/FullThrottleStoppableIndexingThread.java @@ -16,6 +16,7 @@ */ package org.apache.solr.cloud; +import java.io.IOException; import java.lang.invoke.MethodHandles; import java.net.ConnectException; import java.util.List; @@ -129,6 +130,8 @@ class FullThrottleStoppableIndexingThread extends StoppableIndexingThread { stop = true; try { cusc.blockUntilFinished(); + } catch (IOException e) { + log.warn("Exception waiting for the indexing client to finish", e); } finally { cusc.shutdownNow(); } diff --git a/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java b/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java index 6926c6fab94..f6d7087d1a9 100644 --- a/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java +++ b/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java @@ -24,6 +24,7 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import javax.xml.parsers.ParserConfigurationException; @@ -56,6 +57,7 @@ import org.apache.solr.update.SolrCmdDistributor.StdNode; import org.apache.solr.update.processor.DistributedUpdateProcessor; import org.apache.solr.update.processor.DistributedUpdateProcessor.LeaderRequestReplicationTracker; import 
org.apache.solr.update.processor.DistributedUpdateProcessor.RollupRequestReplicationTracker; +import org.apache.solr.util.TestInjection; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -73,11 +75,13 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase { // we can't use the Randomized merge policy because the test depends on // being able to call optimize to have all deletes expunged. systemSetPropertySolrTestsMergePolicyFactory(LogDocMergePolicyFactory.class.getName()); + System.setProperty("solr.cloud.client.pollQueueTime", "2000"); } @AfterClass public static void afterClass() { systemClearPropertySolrTestsMergePolicyFactory(); + System.clearProperty("solr.cloud.client.pollQueueTime"); } private UpdateShardHandler updateShardHandler; @@ -356,6 +360,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase { testDeletes(true, true); testDeletes(true, false); getRfFromResponseShouldNotCloseTheInputStream(); + testStuckUpdates(); } private void testDeletes(boolean dbq, boolean withFailures) throws Exception { @@ -859,4 +864,36 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase { assertFalse(openSearcher); } } + + private void testStuckUpdates() throws Exception { + TestInjection.directUpdateLatch = new CountDownLatch(1); + List nodes = new ArrayList<>(); + ModifiableSolrParams params; + try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(updateShardHandler)) { + for (int i = 0; i < 3; i++) { + nodes.clear(); + for (SolrClient c : clients) { + if (random().nextBoolean()) { + continue; + } + HttpSolrClient httpClient = (HttpSolrClient) c; + ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, + httpClient.getBaseURL(), ZkStateReader.CORE_NAME_PROP, ""); + StdNode node = new StdNode(new ZkCoreNodeProps(nodeProps)); + nodes.add(node); + } + AddUpdateCommand c = new AddUpdateCommand(null); + c.solrDoc = sdoc("id", id.incrementAndGet()); + if 
(nodes.size() > 0) { + params = new ModifiableSolrParams(); + cmdDistrib.distribAdd(c, nodes, params, false); + } + } + cmdDistrib.blockAndDoRetries(); + } catch (IOException e) { + assertTrue(e.toString(), e.toString().contains("processing has stalled")); + } finally { + TestInjection.directUpdateLatch.countDown(); + } + } } diff --git a/solr/solr-ref-guide/src/using-solrj.adoc b/solr/solr-ref-guide/src/using-solrj.adoc index f60664d34cc..8eb82c1597e 100644 --- a/solr/solr-ref-guide/src/using-solrj.adoc +++ b/solr/solr-ref-guide/src/using-solrj.adoc @@ -120,6 +120,12 @@ include::{example-source-dir}UsingSolrJRefGuideExamplesTest.java[tag=solrj-solrc When these values are not explicitly provided, SolrJ falls back to using the defaults for the OS/environment is running on. +`ConcurrentUpdateSolrClient` and its counterpart `ConcurrentUpdateHttp2SolrClient` also implement a stall-prevention +timeout that allows requests to non-responsive nodes to fail sooner than they would by waiting for a socket timeout. +The default value of this timeout is 15000 ms and can be adjusted with the system property `solr.cloud.client.stallTime`. +This value should be smaller than `solr.jetty.http.idleTimeout` (which is 120000 ms by default) and greater than the +processing time of the largest update request. + === Cloud Request Routing The SolrJ `CloudSolrClient` implementations (`CloudSolrClient` and `CloudHttp2SolrClient`) respect the <>. 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java index 7165e9b3a7e..bb0c5822f6e 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java @@ -66,6 +66,7 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient { private boolean shutdownClient; private boolean shutdownExecutor; private int pollQueueTime = 250; + private int stallTime; private final boolean streamDeletes; private volatile boolean closed; private volatile CountDownLatch lock = null; // used to block everything @@ -150,6 +151,7 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient { this.runners = new LinkedList<>(); this.streamDeletes = builder.streamDeletes; this.basePath = builder.baseSolrUrl; + this.stallTime = Integer.getInteger("solr.cloud.client.stallTime", 15000); if (builder.executorService != null) { this.scheduler = builder.executorService; @@ -212,6 +214,7 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient { try { Update update; notifyQueueAndRunnersIfEmptyQueue(); + //log.info("-- polling 1"); update = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS); if (update == null) { @@ -385,6 +388,8 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient { Update update = new Update(req, collection); boolean success = queue.offer(update); + long lastStallTime = -1; + int lastQueueSize = -1; for (;;) { synchronized (runners) { // see if queue is half full and we can add more runners @@ -418,6 +423,25 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient { if (!success) { success = queue.offer(update, 100, TimeUnit.MILLISECONDS); } + if (!success) { + // stall prevention + int currentQueueSize = queue.size(); + if (currentQueueSize != lastQueueSize) { + 
// there's still some progress in processing the queue - not stalled + lastQueueSize = currentQueueSize; + lastStallTime = -1; + } else { + if (lastStallTime == -1) { + // mark a stall but keep trying + lastStallTime = System.nanoTime(); + } else { + long currentStallTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastStallTime); + if (currentStallTime > stallTime) { + throw new IOException("Request processing has stalled for " + currentStallTime + "ms with " + queue.size() + " remaining elements in the queue."); + } + } + } + } } } catch (InterruptedException e) { log.error("interrupted", e); @@ -430,13 +454,16 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient { return dummy; } - public synchronized void blockUntilFinished() { + public synchronized void blockUntilFinished() throws IOException { lock = new CountDownLatch(1); try { waitForEmptyQueue(); interruptRunnerThreadsPolling(); + long lastStallTime = -1; + int lastQueueSize = -1; + synchronized (runners) { // NOTE: if the executor is shut down, runners may never become empty (a scheduled task may never be run, @@ -452,6 +479,23 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient { // Need to check if the queue is empty before really considering this is finished (SOLR-4260) int queueSize = queue.size(); + // stall prevention + if (lastQueueSize != queueSize) { + // init, or no stall + lastQueueSize = queueSize; + lastStallTime = -1; + } else { + if (lastStallTime == -1) { + lastStallTime = System.nanoTime(); + } else { + long currentStallTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastStallTime); + if (currentStallTime > stallTime) { + throw new IOException("Task queue processing has stalled for " + currentStallTime + " ms with " + queueSize + " remaining elements to process."); +// Thread.currentThread().interrupt(); +// break; + } + } + } if (queueSize > 0 && runners.isEmpty()) { // TODO: can this still happen? 
log.warn("No more runners, but queue still has " + @@ -485,9 +529,11 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient { } } - private void waitForEmptyQueue() { + private void waitForEmptyQueue() throws IOException { boolean threadInterrupted = Thread.currentThread().isInterrupted(); + long lastStallTime = -1; + int lastQueueSize = -1; while (!queue.isEmpty()) { if (scheduler.isTerminated()) { log.warn("The task queue still has elements but the update scheduler {} is terminated. Can't process any more tasks. " @@ -513,6 +559,24 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient { queue.size()); } } + int currentQueueSize = queue.size(); + // stall prevention + if (currentQueueSize != lastQueueSize) { + lastQueueSize = currentQueueSize; + lastStallTime = -1; + } else { + lastQueueSize = currentQueueSize; + if (lastStallTime == -1) { + lastStallTime = System.nanoTime(); + } else { + long currentStallTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastStallTime); + if (currentStallTime > stallTime) { + throw new IOException("Task queue processing has stalled for " + currentStallTime + " ms with " + currentQueueSize + " remaining elements to process."); +// threadInterrupted = true; +// break; + } + } + } } if (threadInterrupted) { Thread.currentThread().interrupt(); @@ -598,6 +662,7 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient { */ public void setPollQueueTime(int pollQueueTime) { this.pollQueueTime = pollQueueTime; + this.stallTime = this.pollQueueTime * 3 / 2; } /** diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java index e19d2783e0e..d921cb2114a 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java @@ -85,6 +85,7 @@ public class 
ConcurrentUpdateSolrClient extends SolrClient { final int threadCount; boolean shutdownExecutor = false; int pollQueueTime = 250; + int stallTime; private final boolean streamDeletes; private boolean internalHttpClient; private volatile Integer connectionTimeout; @@ -132,7 +133,9 @@ public class ConcurrentUpdateSolrClient extends SolrClient { this.streamDeletes = builder.streamDeletes; this.connectionTimeout = builder.connectionTimeoutMillis; this.soTimeout = builder.socketTimeoutMillis; - + this.stallTime = Integer.getInteger("solr.cloud.client.stallTime", 15000); + + if (builder.executorService != null) { this.scheduler = builder.executorService; this.shutdownExecutor = false; @@ -518,6 +521,8 @@ public class ConcurrentUpdateSolrClient extends SolrClient { Update update = new Update(req, collection); boolean success = queue.offer(update); + long lastStallTime = -1; + int lastQueueSize = -1; for (;;) { synchronized (runners) { // see if queue is half full and we can add more runners @@ -551,6 +556,25 @@ public class ConcurrentUpdateSolrClient extends SolrClient { if (!success) { success = queue.offer(update, 100, TimeUnit.MILLISECONDS); } + if (!success) { + // stall prevention + int currentQueueSize = queue.size(); + if (currentQueueSize != lastQueueSize) { + // there's still some progress in processing the queue - not stalled + lastQueueSize = currentQueueSize; + lastStallTime = -1; + } else { + if (lastStallTime == -1) { + // mark a stall but keep trying + lastStallTime = System.nanoTime(); + } else { + long currentStallTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastStallTime); + if (currentStallTime > stallTime) { + throw new IOException("Request processing has stalled for " + currentStallTime + "ms with " + queue.size() + " remaining elements in the queue."); + } + } + } + } } } catch (InterruptedException e) { log.error("interrupted", e); @@ -563,13 +587,16 @@ public class ConcurrentUpdateSolrClient extends SolrClient { return dummy; } - public 
synchronized void blockUntilFinished() { + public synchronized void blockUntilFinished() throws IOException { lock = new CountDownLatch(1); try { waitForEmptyQueue(); interruptRunnerThreadsPolling(); + long lastStallTime = -1; + int lastQueueSize = -1; + synchronized (runners) { // NOTE: if the executor is shut down, runners may never become empty (a scheduled task may never be run, @@ -587,6 +614,23 @@ public class ConcurrentUpdateSolrClient extends SolrClient { // Need to check if the queue is empty before really considering this is finished (SOLR-4260) int queueSize = queue.size(); + // stall prevention + if (lastQueueSize != queueSize) { + // init, or no stall + lastQueueSize = queueSize; + lastStallTime = -1; + } else { + if (lastStallTime == -1) { + lastStallTime = System.nanoTime(); + } else { + long currentStallTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastStallTime); + if (currentStallTime > stallTime) { + throw new IOException("Task queue processing has stalled for " + currentStallTime + " ms with " + queueSize + " remaining elements to process."); +// Thread.currentThread().interrupt(); +// break; + } + } + } if (queueSize > 0 && runners.isEmpty()) { // TODO: can this still happen? 
log.warn("No more runners, but queue still has " + @@ -620,9 +664,11 @@ public class ConcurrentUpdateSolrClient extends SolrClient { } } - private void waitForEmptyQueue() { + private void waitForEmptyQueue() throws IOException { boolean threadInterrupted = Thread.currentThread().isInterrupted(); + long lastStallTime = -1; + int lastQueueSize = -1; while (!queue.isEmpty()) { if (log.isDebugEnabled()) emptyQueueLoops.incrementAndGet(); if (scheduler.isTerminated()) { @@ -643,12 +689,30 @@ public class ConcurrentUpdateSolrClient extends SolrClient { try { queue.wait(250); } catch (InterruptedException e) { - // If we set the thread as interrupted again, the next time the wait it's called i t's going to return immediately + // If we set the thread as interrupted again, the next time the wait it's called it's going to return immediately threadInterrupted = true; log.warn("Thread interrupted while waiting for update queue to be empty. There are still {} elements in the queue.", queue.size()); } } + int currentQueueSize = queue.size(); + // stall prevention + if (currentQueueSize != lastQueueSize) { + lastQueueSize = currentQueueSize; + lastStallTime = -1; + } else { + lastQueueSize = currentQueueSize; + if (lastStallTime == -1) { + lastStallTime = System.nanoTime(); + } else { + long currentStallTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastStallTime); + if (currentStallTime > stallTime) { + throw new IOException("Task queue processing has stalled for " + currentStallTime + " ms with " + currentQueueSize + " remaining elements to process."); +// threadInterrupted = true; +// break; + } + } + } } if (threadInterrupted) { Thread.currentThread().interrupt();