From d6337ac3e566c504766d69499ab470bd26744a29 Mon Sep 17 00:00:00 2001 From: markrmiller Date: Wed, 22 Feb 2017 13:00:42 -0500 Subject: [PATCH] SOLR-9824: Some bulk update paths could be very slow due to CUSC polling. --- solr/CHANGES.txt | 2 + .../handler/loader/ContentStreamLoader.java | 2 - .../solr/handler/loader/JavabinLoader.java | 3 - .../apache/solr/update/AddUpdateCommand.java | 2 - .../solr/update/SolrCmdDistributor.java | 15 +- .../solr/update/StreamingSolrClients.java | 2 +- .../processor/DistributedUpdateProcessor.java | 7 + .../impl/ConcurrentUpdateSolrClient.java | 319 +++++++++++++----- 8 files changed, 253 insertions(+), 99 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index a6b5504dfe2..dcea40c797c 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -178,6 +178,8 @@ Bug Fixes * SOLR-10168: ShardSplit can fail with NPE in OverseerCollectionMessageHandler#waitForCoreAdminAsyncCallToComplete. (Mark Miller) +* SOLR-9824: Some bulk update paths could be very slow due to CUSC polling. (David Smiley, Mark Miller) + Optimizations ---------------------- diff --git a/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java b/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java index 1dd038ffebe..7751b43f843 100644 --- a/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java +++ b/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java @@ -29,8 +29,6 @@ import org.apache.solr.update.processor.UpdateRequestProcessor; */ public abstract class ContentStreamLoader { - protected static final int pollQueueTime = Integer.getInteger("solr.cloud.replication.poll-queue-time-ms", 25); - /** * This should be called once for each RequestHandler */ diff --git a/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java b/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java index 6114280b833..873bcd15aa5 100644 --- a/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java +++ b/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java @@ -116,9 +116,6 @@ public class JavabinLoader extends ContentStreamLoader { private AddUpdateCommand getAddCommand(SolrQueryRequest req, SolrParams params) { AddUpdateCommand addCmd = new AddUpdateCommand(req); - // since we can give a hint to the leader that the end of a batch is being processed, it's OK to have a larger - // pollQueueTime than the default 0 since we can optimize around not waiting unnecessarily - addCmd.pollQueueTime = pollQueueTime; addCmd.overwrite = params.getBool(UpdateParams.OVERWRITE, true); addCmd.commitWithin = params.getInt(UpdateParams.COMMIT_WITHIN, -1); return addCmd; diff --git a/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java b/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java index 0ede72838ba..f5263973d55 100644 --- a/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java +++ b/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java @@ -60,8 +60,6 @@ public class AddUpdateCommand extends UpdateCommand implements Iterable queryParams = new HashSet<>(2); queryParams.add(DistributedUpdateProcessor.DISTRIB_FROM); queryParams.add(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM); diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java index c6ccb710a34..ec093cf5dfe 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java @@ -828,6 +828,13 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { // Given that, it may also make sense to move the version reporting out of this // processor too. } + + @Override + protected void doClose() { + if (cmdDistrib != null) { + cmdDistrib.close(); + } + } // TODO: optionally fail if n replicas are not reached... private void doFinish() { diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java index 5c3f289c4f5..4eac2a534a2 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java @@ -30,6 +30,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.http.HttpResponse; import org.apache.http.HttpStatus; @@ -87,7 +88,13 @@ public class ConcurrentUpdateSolrClient extends SolrClient { private boolean internalHttpClient; private volatile Integer connectionTimeout; private volatile Integer soTimeout; - + private volatile boolean closed; + + AtomicInteger pollInterrupts; + AtomicInteger pollExits; + AtomicInteger blockLoops; + AtomicInteger emptyQueueLoops; + /** * Uses an internally managed HttpClient instance. * @@ -156,6 +163,13 @@ public class ConcurrentUpdateSolrClient extends SolrClient { scheduler = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("concurrentUpdateScheduler")); shutdownExecutor = true; } + + if (log.isDebugEnabled()) { + pollInterrupts = new AtomicInteger(); + pollExits = new AtomicInteger(); + blockLoops = new AtomicInteger(); + emptyQueueLoops = new AtomicInteger(); + } } public Set getQueryParams() { @@ -174,13 +188,19 @@ public class ConcurrentUpdateSolrClient extends SolrClient { * Opens a connection and sends everything... */ class Runner implements Runnable { + volatile Thread thread = null; + volatile boolean inPoll = false; + + public Thread getThread() { + return thread; + } + @Override public void run() { + this.thread = Thread.currentThread(); log.debug("starting runner: {}", this); - // This loop is so we can continue if an element was added to the queue after the last runner exited. for (;;) { - try { sendUpdateStream(); @@ -191,7 +211,6 @@ public class ConcurrentUpdateSolrClient extends SolrClient { } handleError(e); } finally { - synchronized (runners) { // check to see if anything else was added to the queue if (runners.size() == 1 && !queue.isEmpty() && !scheduler.isShutdown()) { @@ -205,26 +224,42 @@ public class ConcurrentUpdateSolrClient extends SolrClient { break; } } - } } log.debug("finished: {}", this); } + public void interruptPoll() { + Thread lthread = thread; + if (inPoll && lthread != null) { + lthread.interrupt(); + } + } + // // Pull from the queue multiple times and streams over a single connection. // Exits on exception, interruption, or an empty queue to pull from. // void sendUpdateStream() throws Exception { + while (!queue.isEmpty()) { HttpPost method = null; HttpResponse response = null; - + InputStream rspBody = null; try { - final Update update = - queue.poll(pollQueueTime, TimeUnit.MILLISECONDS); + Update update; + notifyQueueAndRunnersIfEmptyQueue(); + try { + inPoll = true; + update = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + if (log.isDebugEnabled()) pollInterrupts.incrementAndGet(); + continue; + } finally { + inPoll = false; + } if (update == null) break; @@ -234,61 +269,73 @@ public class ConcurrentUpdateSolrClient extends SolrClient { final ModifiableSolrParams origParams = new ModifiableSolrParams(update.getRequest().getParams()); EntityTemplate template = new EntityTemplate(new ContentProducer() { - + @Override public void writeTo(OutputStream out) throws IOException { - try { - if (isXml) { - out.write("".getBytes(StandardCharsets.UTF_8)); // can be anything - } - Update upd = update; - while (upd != null) { - UpdateRequest req = upd.getRequest(); - SolrParams currentParams = new ModifiableSolrParams(req.getParams()); - if (!origParams.toNamedList().equals(currentParams.toNamedList())) { - queue.add(upd); // params are different, push back to queue - break; - } - client.requestWriter.write(req, out); - if (isXml) { - // check for commit or optimize - SolrParams params = req.getParams(); - if (params != null) { - String fmt = null; - if (params.getBool(UpdateParams.OPTIMIZE, false)) { - fmt = ""; - } else if (params.getBool(UpdateParams.COMMIT, false)) { - fmt = ""; - } - if (fmt != null) { - byte[] content = String.format(Locale.ROOT, - fmt, - params.getBool(UpdateParams.WAIT_SEARCHER, false) - + "").getBytes(StandardCharsets.UTF_8); - out.write(content); - } + if (isXml) { + out.write("".getBytes(StandardCharsets.UTF_8)); // can be anything + } + Update upd = update; + while (upd != null) { + UpdateRequest req = upd.getRequest(); + SolrParams currentParams = new ModifiableSolrParams(req.getParams()); + if (!origParams.toNamedList().equals(currentParams.toNamedList())) { + queue.add(upd); // params are different, push back to queue + break; + } + + client.requestWriter.write(req, out); + if (isXml) { + // check for commit or optimize + SolrParams params = req.getParams(); + if (params != null) { + String fmt = null; + if (params.getBool(UpdateParams.OPTIMIZE, false)) { + fmt = ""; + } else if (params.getBool(UpdateParams.COMMIT, false)) { + fmt = ""; + } + if (fmt != null) { + byte[] content = String.format(Locale.ROOT, + fmt, params.getBool(UpdateParams.WAIT_SEARCHER, false) + + "") + .getBytes(StandardCharsets.UTF_8); + out.write(content); } } - out.flush(); + } + out.flush(); - if (pollQueueTime > 0 && threadCount == 1 && req.isLastDocInBatch()) { - // no need to wait to see another doc in the queue if we've hit the last doc in a batch - upd = queue.poll(0, TimeUnit.MILLISECONDS); - } else { - upd = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS); + notifyQueueAndRunnersIfEmptyQueue(); + inPoll = true; + try { + while (true) { + try { + upd = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS); + break; + } catch (InterruptedException e) { + if (log.isDebugEnabled()) pollInterrupts.incrementAndGet(); + if (!queue.isEmpty()) { + continue; + } + if (log.isDebugEnabled()) pollExits.incrementAndGet(); + upd = null; + break; + } finally { + inPoll = false; + } } - + }finally { + inPoll = false; } - - if (isXml) { - out.write("".getBytes(StandardCharsets.UTF_8)); - } - - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.warn("", e); } + + if (isXml) { + out.write("".getBytes(StandardCharsets.UTF_8)); + } + + } }); @@ -318,10 +365,13 @@ public class ConcurrentUpdateSolrClient extends SolrClient { method.setEntity(template); method.addHeader("User-Agent", HttpSolrClient.AGENT); method.addHeader("Content-Type", contentType); - + + response = client.getHttpClient() .execute(method, HttpClientUtil.createNewHttpClientRequestContext()); + rspBody = response.getEntity().getContent(); + int statusCode = response.getStatusLine().getStatusCode(); if (statusCode != HttpStatus.SC_OK) { StringBuilder msg = new StringBuilder(); @@ -364,6 +414,7 @@ public class ConcurrentUpdateSolrClient extends SolrClient { } else { onSuccess(response); } + } finally { try { if (response != null) { @@ -372,10 +423,25 @@ public class ConcurrentUpdateSolrClient extends SolrClient { } catch (Exception e) { log.error("Error consuming and closing http response stream.", e); } + notifyQueueAndRunnersIfEmptyQueue(); } } } } + + private void notifyQueueAndRunnersIfEmptyQueue() { + if (queue.size() == 0) { + synchronized (queue) { + // queue may be empty + queue.notifyAll(); + } + synchronized (runners) { + // we notify runners too - if there is a high queue poll time and this is the update + // that emptied the queue, we make an attempt to avoid the 250ms timeout in blockUntilFinished + runners.notifyAll(); + } + } + } // *must* be called with runners monitor held, e.g. synchronized(runners){ addRunner() } private void addRunner() { @@ -383,7 +449,9 @@ public class ConcurrentUpdateSolrClient extends SolrClient { try { Runner r = new Runner(); runners.add(r); + scheduler.execute(r); // this can throw an exception if the scheduler has been shutdown, but that should be fine. + } finally { MDC.remove("ConcurrentUpdateSolrClient.url"); } @@ -517,29 +585,52 @@ public class ConcurrentUpdateSolrClient extends SolrClient { public synchronized void blockUntilFinished() { lock = new CountDownLatch(1); try { + + waitForEmptyQueue(); + interruptRunnerThreadsPolling(); + synchronized (runners) { // NOTE: if the executor is shut down, runners may never become empty (a scheduled task may never be run, - // which means it would never remove itself from the runners list. This is why we don't wait forever + // which means it would never remove itself from the runners list. This is why we don't wait forever // and periodically check if the scheduler is shutting down. + int loopCount = 0; while (!runners.isEmpty()) { - try { - runners.wait(250); - } catch (InterruptedException e) { - Thread.interrupted(); - } + + if (log.isDebugEnabled()) blockLoops.incrementAndGet(); if (scheduler.isShutdown()) break; - + + loopCount++; + // Need to check if the queue is empty before really considering this is finished (SOLR-4260) int queueSize = queue.size(); if (queueSize > 0 && runners.isEmpty()) { // TODO: can this still happen? - log.warn("No more runners, but queue still has "+ - queueSize+" adding more runners to process remaining requests on queue"); + log.warn("No more runners, but queue still has " + + queueSize + " adding more runners to process remaining requests on queue"); addRunner(); } + + interruptRunnerThreadsPolling(); + + // try to avoid the worst case wait timeout + // without bad spin + int timeout; + if (loopCount < 3) { + timeout = 10; + } else if (loopCount < 10) { + timeout = 25; + } else { + timeout = 250; + } + + try { + runners.wait(timeout); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } } } finally { @@ -548,6 +639,29 @@ public class ConcurrentUpdateSolrClient extends SolrClient { } } + private void waitForEmptyQueue() { + + while (!queue.isEmpty()) { + if (log.isDebugEnabled()) emptyQueueLoops.incrementAndGet(); + + synchronized (runners) { + int queueSize = queue.size(); + if (queueSize > 0 && runners.isEmpty()) { + log.warn("No more runners, but queue still has " + + queueSize + " adding more runners to process remaining requests on queue"); + addRunner(); + } + } + synchronized (queue) { + try { + queue.wait(250); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + } + public void handleError(Throwable ex) { log.error("error", ex); } @@ -560,19 +674,42 @@ public class ConcurrentUpdateSolrClient extends SolrClient { } @Override - public void close() { - if (internalHttpClient) IOUtils.closeQuietly(client); - if (shutdownExecutor) { - scheduler.shutdown(); - try { - if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) { + public synchronized void close() { + if (closed) { + interruptRunnerThreadsPolling(); + return; + } + closed = true; + + try { + if (shutdownExecutor) { + scheduler.shutdown(); + interruptRunnerThreadsPolling(); + try { + if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) log + .error("ExecutorService did not terminate"); + } + } catch (InterruptedException ie) { scheduler.shutdownNow(); - if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) log - .error("ExecutorService did not terminate"); + Thread.currentThread().interrupt(); } - } catch (InterruptedException ie) { - scheduler.shutdownNow(); - Thread.currentThread().interrupt(); + } else { + interruptRunnerThreadsPolling(); + } + } finally { + if (internalHttpClient) IOUtils.closeQuietly(client); + if (log.isDebugEnabled()) { + log.debug("STATS pollInteruppts={} pollExists={} blockLoops={} emptyQueueLoops={}", pollInterrupts.get(), pollExits.get(), blockLoops.get(), emptyQueueLoops.get()); + } + } + } + + private void interruptRunnerThreadsPolling() { + synchronized (runners) { + for (Runner runner : runners) { + runner.interruptPoll(); } } } @@ -590,17 +727,29 @@ public class ConcurrentUpdateSolrClient extends SolrClient { } public void shutdownNow() { - if (internalHttpClient) IOUtils.closeQuietly(client); - if (shutdownExecutor) { - scheduler.shutdownNow(); // Cancel currently executing tasks - try { - if (!scheduler.awaitTermination(30, TimeUnit.SECONDS)) - log.error("ExecutorService did not terminate"); - } catch (InterruptedException ie) { - scheduler.shutdownNow(); - Thread.currentThread().interrupt(); + if (closed) { + return; + } + closed = true; + try { + + if (shutdownExecutor) { + scheduler.shutdown(); + interruptRunnerThreadsPolling(); + scheduler.shutdownNow(); // Cancel currently executing tasks + try { + if (!scheduler.awaitTermination(30, TimeUnit.SECONDS)) + log.error("ExecutorService did not terminate"); + } catch (InterruptedException ie) { + scheduler.shutdownNow(); + Thread.currentThread().interrupt(); + } + } else { + interruptRunnerThreadsPolling(); } - } + } finally { + if (internalHttpClient) IOUtils.closeQuietly(client); + } } public void setParser(ResponseParser responseParser) {