From 29f6dfe7ed9065f5e8a743b5732a4cdb68518056 Mon Sep 17 00:00:00 2001 From: Mark Robert Miller Date: Fri, 18 Dec 2015 04:01:03 +0000 Subject: [PATCH] SOLR-8371: Try and prevent too many recovery requests from stacking up and clean up some faulty cancel recovery logic. git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1720718 13f79535-47bb-0310-9956-ffa450edef68 --- solr/CHANGES.txt | 3 + .../solr/handler/admin/CoreAdminHandler.java | 34 ++-- .../solr/update/DefaultSolrCoreState.java | 149 ++++++++++-------- .../solr/update/UpdateShardHandler.java | 24 ++- 4 files changed, 126 insertions(+), 84 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 1b0156bd04d..84b14afb8bf 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -128,6 +128,9 @@ Bug Fixes * SOLR-8191: Guard against CloudSolrStream close method NullPointerException (Kevin Risden, Joel Bernstein) + +* SOLR-8371: Try and prevent too many recovery requests from stacking up and clean up some faulty + cancel recovery logic. (Mark Miller) Optimizations diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java index 9e048a91a60..dad6384c5d2 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java @@ -746,28 +746,22 @@ public class CoreAdminHandler extends RequestHandlerBase { } - protected void handleRequestRecoveryAction(SolrQueryRequest req, - SolrQueryResponse rsp) throws IOException { + protected void handleRequestRecoveryAction(SolrQueryRequest req, SolrQueryResponse rsp) throws IOException { final SolrParams params = req.getParams(); - log.info("It has been requested that we recover: core="+params.get(CoreAdminParams.CORE)); - Thread thread = new Thread() { - @Override - public void run() { - String cname = params.get(CoreAdminParams.CORE); - if (cname == null) { - cname = ""; - } - try (SolrCore core = coreContainer.getCore(cname)) { - if (core != null) { - core.getUpdateHandler().getSolrCoreState().doRecovery(coreContainer, core.getCoreDescriptor()); - } else { - SolrException.log(log, "Could not find core to call recovery:" + cname); - } - } - } - }; + log.info("It has been requested that we recover: core=" + params.get(CoreAdminParams.CORE)); + + String cname = params.get(CoreAdminParams.CORE); + if (cname == null) { + cname = ""; + } + try (SolrCore core = coreContainer.getCore(cname)) { + if (core != null) { + core.getUpdateHandler().getSolrCoreState().doRecovery(coreContainer, core.getCoreDescriptor()); + } else { + SolrException.log(log, "Could not find core to call recovery:" + cname); + } + } - thread.start(); } protected void handleRequestSyncAction(SolrQueryRequest req, diff --git a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java index f9eb28c3817..c2d2365bf7c 100644 --- a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java +++ b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java @@ -21,7 +21,9 @@ import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -45,11 +47,13 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover private final boolean SKIP_AUTO_RECOVERY = Boolean.getBoolean("solrcloud.skip.autorecovery"); - private final Object recoveryLock = new Object(); + private final ReentrantLock recoveryLock = new ReentrantLock(); private final ActionThrottle recoveryThrottle = new ActionThrottle("recovery", 10000); private final ActionThrottle leaderThrottle = new ActionThrottle("leader", 5000); + + private final AtomicInteger recoveryWaiting = new AtomicInteger(); // Use the readLock to retrieve the current IndexWriter (may be lazily opened) // Use the writeLock for changing index writers @@ -58,9 +62,8 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover private SolrIndexWriter indexWriter = null; private DirectoryFactory directoryFactory; - private volatile boolean recoveryRunning; - private RecoveryStrategy recoveryStrat; - private Future future; + private volatile RecoveryStrategy recoveryStrat; + private volatile Future future; private volatile boolean lastReplicationSuccess = true; // will we attempt recovery as if we just started up (i.e. use starting versions rather than recent versions for peersync @@ -244,74 +247,97 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover @Override public void doRecovery(CoreContainer cc, CoreDescriptor cd) { - MDCLoggingContext.setCoreDescriptor(cd); - try { - if (SKIP_AUTO_RECOVERY) { - log.warn("Skipping recovery according to sys prop solrcloud.skip.autorecovery"); - return; - } - - // check before we grab the lock - if (cc.isShutDown()) { - log.warn("Skipping recovery because Solr is shutdown"); - return; - } - - synchronized (recoveryLock) { - // to be air tight we must also check after lock - if (cc.isShutDown()) { - log.warn("Skipping recovery because Solr is shutdown"); - return; - } - log.info("Running recovery - first canceling any ongoing recovery"); - cancelRecovery(); - - while (recoveryRunning) { - try { - recoveryLock.wait(1000); - } catch (InterruptedException e) { - + + Thread thread = new Thread() { + @Override + public void run() { + MDCLoggingContext.setCoreDescriptor(cd); + try { + if (SKIP_AUTO_RECOVERY) { + log.warn("Skipping recovery according to sys prop solrcloud.skip.autorecovery"); + return; } - // check again for those that were waiting + + // check before we grab the lock if (cc.isShutDown()) { log.warn("Skipping recovery because Solr is shutdown"); return; } - if (closed) return; + + // if we can't get the lock, another recovery is running + // we check to see if there is already one waiting to go + // after the current one, and if there is, bail + boolean locked = recoveryLock.tryLock(); + try { + if (!locked) { + if (recoveryWaiting.get() > 0) { + return; + } + recoveryWaiting.incrementAndGet(); + } else { + recoveryWaiting.incrementAndGet(); + cancelRecovery(); + } + + recoveryLock.lock(); + try { + recoveryWaiting.decrementAndGet(); + + // to be air tight we must also check after lock + if (cc.isShutDown()) { + log.warn("Skipping recovery because Solr is shutdown"); + return; + } + log.info("Running recovery"); + + recoveryThrottle.minimumWaitBetweenActions(); + recoveryThrottle.markAttemptingAction(); + + recoveryStrat = new RecoveryStrategy(cc, cd, DefaultSolrCoreState.this); + recoveryStrat.setRecoveringAfterStartup(recoveringAfterStartup); + future = cc.getUpdateShardHandler().getRecoveryExecutor().submit(recoveryStrat); + try { + future.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SolrException(ErrorCode.SERVER_ERROR, e); + } catch (ExecutionException e) { + throw new SolrException(ErrorCode.SERVER_ERROR, e); + } + } finally { + recoveryLock.unlock(); + } + } finally { + if (locked) recoveryLock.unlock(); + } + } finally { + MDCLoggingContext.clear(); } - - recoveryThrottle.minimumWaitBetweenActions(); - recoveryThrottle.markAttemptingAction(); - - recoveryStrat = new RecoveryStrategy(cc, cd, this); - recoveryStrat.setRecoveringAfterStartup(recoveringAfterStartup); - future = cc.getUpdateShardHandler().getUpdateExecutor().submit(recoveryStrat); - recoveryRunning = true; } - } finally { - MDCLoggingContext.clear(); + }; + try { + // we make recovery requests async - that async request may + // have to 'wait in line' a bit or bail if a recovery is + // already queued up - the recovery execution itself is run + // in another thread on another 'recovery' executor. + // The update executor is interrupted on shutdown and should + // not do disk IO. + // The recovery executor is not interrupted on shutdown. + // + // avoid deadlock: we can't use the recovery executor here + cc.getUpdateShardHandler().getUpdateExecutor().submit(thread); + } catch (RejectedExecutionException e) { + // fine, we are shutting down } } @Override public void cancelRecovery() { - synchronized (recoveryLock) { - if (recoveryStrat != null && recoveryRunning) { + if (recoveryStrat != null) { + try { recoveryStrat.close(); - while (true) { - try { - future.get(); - } catch (InterruptedException e) { - // not interruptible - keep waiting - continue; - } catch (ExecutionException e) { - break; - } - break; - } - - recoveryRunning = false; - recoveryLock.notifyAll(); + } catch (NullPointerException e) { + // okay } } } @@ -320,14 +346,11 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover @Override public void recovered() { recoveringAfterStartup = false; // once we have successfully recovered, we no longer need to act as if we are recovering after startup - recoveryRunning = false; } /** called from recoveryStrat on a failed recovery */ @Override - public void failed() { - recoveryRunning = false; - } + public void failed() {} @Override public synchronized void close(IndexWriterCloser closer) { diff --git a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java index ad9606db1ed..acf4ddecfa5 100644 --- a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java +++ b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java @@ -25,6 +25,7 @@ import org.apache.http.impl.conn.PoolingClientConnectionManager; import org.apache.http.impl.conn.SchemeRegistryFactory; import org.apache.solr.client.solrj.impl.HttpClientConfigurer; import org.apache.solr.client.solrj.impl.HttpClientUtil; +import org.apache.solr.cloud.RecoveryStrategy; import org.apache.solr.common.SolrException; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.util.ExecutorUtil; @@ -36,7 +37,6 @@ import org.slf4j.LoggerFactory; import java.lang.invoke.MethodHandles; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; public class UpdateShardHandler { @@ -52,6 +52,9 @@ public class UpdateShardHandler { private ExecutorService updateExecutor = ExecutorUtil.newMDCAwareCachedThreadPool( new SolrjNamedThreadFactory("updateExecutor")); + private ExecutorService recoveryExecutor = ExecutorUtil.newMDCAwareCachedThreadPool( + new SolrjNamedThreadFactory("recoveryExecutor")); + private PoolingClientConnectionManager clientConnectionManager; private final CloseableHttpClient client; @@ -105,13 +108,32 @@ public class UpdateShardHandler { return clientConnectionManager; } + /** + * This method returns an executor that is not meant for disk IO and that will + * be interrupted on shutdown. + * + * @return an executor for update related activities that do not do disk IO. + */ public ExecutorService getUpdateExecutor() { return updateExecutor; } + + /** + * In general, RecoveryStrategy threads do not do disk IO, but they open and close SolrCores + * in async threads, amoung other things, and can trigger disk IO, so we use this alternate + * executor rather than the 'updateExecutor', which is interrupted on shutdown. + * + * @return executor for {@link RecoveryStrategy} thread which will not be interrupted on close. + */ + public ExecutorService getRecoveryExecutor() { + return recoveryExecutor; + } public void close() { try { + // we interrupt on purpose here, but this exectuor should not run threads that do disk IO! ExecutorUtil.shutdownWithInterruptAndAwaitTermination(updateExecutor); + ExecutorUtil.shutdownAndAwaitTermination(recoveryExecutor); } catch (Exception e) { SolrException.log(log, e); } finally {