mirror of https://github.com/apache/lucene.git
SOLR-8371: Try and prevent too many recovery requests from stacking up and clean up some faulty cancel recovery logic.
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1720718 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
46392dde41
commit
29f6dfe7ed
|
@ -129,6 +129,9 @@ Bug Fixes
|
||||||
* SOLR-8191: Guard against CloudSolrStream close method NullPointerException
|
* SOLR-8191: Guard against CloudSolrStream close method NullPointerException
|
||||||
(Kevin Risden, Joel Bernstein)
|
(Kevin Risden, Joel Bernstein)
|
||||||
|
|
||||||
|
* SOLR-8371: Try and prevent too many recovery requests from stacking up and clean up some faulty
|
||||||
|
cancel recovery logic. (Mark Miller)
|
||||||
|
|
||||||
|
|
||||||
Optimizations
|
Optimizations
|
||||||
----------------------
|
----------------------
|
||||||
|
|
|
@ -746,28 +746,22 @@ public class CoreAdminHandler extends RequestHandlerBase {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void handleRequestRecoveryAction(SolrQueryRequest req,
|
protected void handleRequestRecoveryAction(SolrQueryRequest req, SolrQueryResponse rsp) throws IOException {
|
||||||
SolrQueryResponse rsp) throws IOException {
|
|
||||||
final SolrParams params = req.getParams();
|
final SolrParams params = req.getParams();
|
||||||
log.info("It has been requested that we recover: core="+params.get(CoreAdminParams.CORE));
|
log.info("It has been requested that we recover: core=" + params.get(CoreAdminParams.CORE));
|
||||||
Thread thread = new Thread() {
|
|
||||||
@Override
|
String cname = params.get(CoreAdminParams.CORE);
|
||||||
public void run() {
|
if (cname == null) {
|
||||||
String cname = params.get(CoreAdminParams.CORE);
|
cname = "";
|
||||||
if (cname == null) {
|
}
|
||||||
cname = "";
|
try (SolrCore core = coreContainer.getCore(cname)) {
|
||||||
}
|
if (core != null) {
|
||||||
try (SolrCore core = coreContainer.getCore(cname)) {
|
core.getUpdateHandler().getSolrCoreState().doRecovery(coreContainer, core.getCoreDescriptor());
|
||||||
if (core != null) {
|
} else {
|
||||||
core.getUpdateHandler().getSolrCoreState().doRecovery(coreContainer, core.getCoreDescriptor());
|
SolrException.log(log, "Could not find core to call recovery:" + cname);
|
||||||
} else {
|
}
|
||||||
SolrException.log(log, "Could not find core to call recovery:" + cname);
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
thread.start();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void handleRequestSyncAction(SolrQueryRequest req,
|
protected void handleRequestSyncAction(SolrQueryRequest req,
|
||||||
|
|
|
@ -21,7 +21,9 @@ import java.io.IOException;
|
||||||
import java.lang.invoke.MethodHandles;
|
import java.lang.invoke.MethodHandles;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
@ -45,12 +47,14 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
|
||||||
|
|
||||||
private final boolean SKIP_AUTO_RECOVERY = Boolean.getBoolean("solrcloud.skip.autorecovery");
|
private final boolean SKIP_AUTO_RECOVERY = Boolean.getBoolean("solrcloud.skip.autorecovery");
|
||||||
|
|
||||||
private final Object recoveryLock = new Object();
|
private final ReentrantLock recoveryLock = new ReentrantLock();
|
||||||
|
|
||||||
private final ActionThrottle recoveryThrottle = new ActionThrottle("recovery", 10000);
|
private final ActionThrottle recoveryThrottle = new ActionThrottle("recovery", 10000);
|
||||||
|
|
||||||
private final ActionThrottle leaderThrottle = new ActionThrottle("leader", 5000);
|
private final ActionThrottle leaderThrottle = new ActionThrottle("leader", 5000);
|
||||||
|
|
||||||
|
private final AtomicInteger recoveryWaiting = new AtomicInteger();
|
||||||
|
|
||||||
// Use the readLock to retrieve the current IndexWriter (may be lazily opened)
|
// Use the readLock to retrieve the current IndexWriter (may be lazily opened)
|
||||||
// Use the writeLock for changing index writers
|
// Use the writeLock for changing index writers
|
||||||
private final ReentrantReadWriteLock iwLock = new ReentrantReadWriteLock();
|
private final ReentrantReadWriteLock iwLock = new ReentrantReadWriteLock();
|
||||||
|
@ -58,9 +62,8 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
|
||||||
private SolrIndexWriter indexWriter = null;
|
private SolrIndexWriter indexWriter = null;
|
||||||
private DirectoryFactory directoryFactory;
|
private DirectoryFactory directoryFactory;
|
||||||
|
|
||||||
private volatile boolean recoveryRunning;
|
private volatile RecoveryStrategy recoveryStrat;
|
||||||
private RecoveryStrategy recoveryStrat;
|
private volatile Future future;
|
||||||
private Future future;
|
|
||||||
private volatile boolean lastReplicationSuccess = true;
|
private volatile boolean lastReplicationSuccess = true;
|
||||||
|
|
||||||
// will we attempt recovery as if we just started up (i.e. use starting versions rather than recent versions for peersync
|
// will we attempt recovery as if we just started up (i.e. use starting versions rather than recent versions for peersync
|
||||||
|
@ -244,74 +247,97 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void doRecovery(CoreContainer cc, CoreDescriptor cd) {
|
public void doRecovery(CoreContainer cc, CoreDescriptor cd) {
|
||||||
MDCLoggingContext.setCoreDescriptor(cd);
|
|
||||||
try {
|
|
||||||
if (SKIP_AUTO_RECOVERY) {
|
|
||||||
log.warn("Skipping recovery according to sys prop solrcloud.skip.autorecovery");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// check before we grab the lock
|
|
||||||
if (cc.isShutDown()) {
|
|
||||||
log.warn("Skipping recovery because Solr is shutdown");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
synchronized (recoveryLock) {
|
|
||||||
// to be air tight we must also check after lock
|
|
||||||
if (cc.isShutDown()) {
|
|
||||||
log.warn("Skipping recovery because Solr is shutdown");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
log.info("Running recovery - first canceling any ongoing recovery");
|
|
||||||
cancelRecovery();
|
|
||||||
|
|
||||||
while (recoveryRunning) {
|
|
||||||
try {
|
|
||||||
recoveryLock.wait(1000);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
|
|
||||||
|
Thread thread = new Thread() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
MDCLoggingContext.setCoreDescriptor(cd);
|
||||||
|
try {
|
||||||
|
if (SKIP_AUTO_RECOVERY) {
|
||||||
|
log.warn("Skipping recovery according to sys prop solrcloud.skip.autorecovery");
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
// check again for those that were waiting
|
|
||||||
|
// check before we grab the lock
|
||||||
if (cc.isShutDown()) {
|
if (cc.isShutDown()) {
|
||||||
log.warn("Skipping recovery because Solr is shutdown");
|
log.warn("Skipping recovery because Solr is shutdown");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (closed) return;
|
|
||||||
|
// if we can't get the lock, another recovery is running
|
||||||
|
// we check to see if there is already one waiting to go
|
||||||
|
// after the current one, and if there is, bail
|
||||||
|
boolean locked = recoveryLock.tryLock();
|
||||||
|
try {
|
||||||
|
if (!locked) {
|
||||||
|
if (recoveryWaiting.get() > 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
recoveryWaiting.incrementAndGet();
|
||||||
|
} else {
|
||||||
|
recoveryWaiting.incrementAndGet();
|
||||||
|
cancelRecovery();
|
||||||
|
}
|
||||||
|
|
||||||
|
recoveryLock.lock();
|
||||||
|
try {
|
||||||
|
recoveryWaiting.decrementAndGet();
|
||||||
|
|
||||||
|
// to be air tight we must also check after lock
|
||||||
|
if (cc.isShutDown()) {
|
||||||
|
log.warn("Skipping recovery because Solr is shutdown");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
log.info("Running recovery");
|
||||||
|
|
||||||
|
recoveryThrottle.minimumWaitBetweenActions();
|
||||||
|
recoveryThrottle.markAttemptingAction();
|
||||||
|
|
||||||
|
recoveryStrat = new RecoveryStrategy(cc, cd, DefaultSolrCoreState.this);
|
||||||
|
recoveryStrat.setRecoveringAfterStartup(recoveringAfterStartup);
|
||||||
|
future = cc.getUpdateShardHandler().getRecoveryExecutor().submit(recoveryStrat);
|
||||||
|
try {
|
||||||
|
future.get();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw new SolrException(ErrorCode.SERVER_ERROR, e);
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
throw new SolrException(ErrorCode.SERVER_ERROR, e);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
recoveryLock.unlock();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (locked) recoveryLock.unlock();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
MDCLoggingContext.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
recoveryThrottle.minimumWaitBetweenActions();
|
|
||||||
recoveryThrottle.markAttemptingAction();
|
|
||||||
|
|
||||||
recoveryStrat = new RecoveryStrategy(cc, cd, this);
|
|
||||||
recoveryStrat.setRecoveringAfterStartup(recoveringAfterStartup);
|
|
||||||
future = cc.getUpdateShardHandler().getUpdateExecutor().submit(recoveryStrat);
|
|
||||||
recoveryRunning = true;
|
|
||||||
}
|
}
|
||||||
} finally {
|
};
|
||||||
MDCLoggingContext.clear();
|
try {
|
||||||
|
// we make recovery requests async - that async request may
|
||||||
|
// have to 'wait in line' a bit or bail if a recovery is
|
||||||
|
// already queued up - the recovery execution itself is run
|
||||||
|
// in another thread on another 'recovery' executor.
|
||||||
|
// The update executor is interrupted on shutdown and should
|
||||||
|
// not do disk IO.
|
||||||
|
// The recovery executor is not interrupted on shutdown.
|
||||||
|
//
|
||||||
|
// avoid deadlock: we can't use the recovery executor here
|
||||||
|
cc.getUpdateShardHandler().getUpdateExecutor().submit(thread);
|
||||||
|
} catch (RejectedExecutionException e) {
|
||||||
|
// fine, we are shutting down
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void cancelRecovery() {
|
public void cancelRecovery() {
|
||||||
synchronized (recoveryLock) {
|
if (recoveryStrat != null) {
|
||||||
if (recoveryStrat != null && recoveryRunning) {
|
try {
|
||||||
recoveryStrat.close();
|
recoveryStrat.close();
|
||||||
while (true) {
|
} catch (NullPointerException e) {
|
||||||
try {
|
// okay
|
||||||
future.get();
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
// not interruptible - keep waiting
|
|
||||||
continue;
|
|
||||||
} catch (ExecutionException e) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
recoveryRunning = false;
|
|
||||||
recoveryLock.notifyAll();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -320,14 +346,11 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
|
||||||
@Override
|
@Override
|
||||||
public void recovered() {
|
public void recovered() {
|
||||||
recoveringAfterStartup = false; // once we have successfully recovered, we no longer need to act as if we are recovering after startup
|
recoveringAfterStartup = false; // once we have successfully recovered, we no longer need to act as if we are recovering after startup
|
||||||
recoveryRunning = false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** called from recoveryStrat on a failed recovery */
|
/** called from recoveryStrat on a failed recovery */
|
||||||
@Override
|
@Override
|
||||||
public void failed() {
|
public void failed() {}
|
||||||
recoveryRunning = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void close(IndexWriterCloser closer) {
|
public synchronized void close(IndexWriterCloser closer) {
|
||||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.http.impl.conn.PoolingClientConnectionManager;
|
||||||
import org.apache.http.impl.conn.SchemeRegistryFactory;
|
import org.apache.http.impl.conn.SchemeRegistryFactory;
|
||||||
import org.apache.solr.client.solrj.impl.HttpClientConfigurer;
|
import org.apache.solr.client.solrj.impl.HttpClientConfigurer;
|
||||||
import org.apache.solr.client.solrj.impl.HttpClientUtil;
|
import org.apache.solr.client.solrj.impl.HttpClientUtil;
|
||||||
|
import org.apache.solr.cloud.RecoveryStrategy;
|
||||||
import org.apache.solr.common.SolrException;
|
import org.apache.solr.common.SolrException;
|
||||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||||
import org.apache.solr.common.util.ExecutorUtil;
|
import org.apache.solr.common.util.ExecutorUtil;
|
||||||
|
@ -36,7 +37,6 @@ import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.lang.invoke.MethodHandles;
|
import java.lang.invoke.MethodHandles;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
|
|
||||||
public class UpdateShardHandler {
|
public class UpdateShardHandler {
|
||||||
|
|
||||||
|
@ -52,6 +52,9 @@ public class UpdateShardHandler {
|
||||||
private ExecutorService updateExecutor = ExecutorUtil.newMDCAwareCachedThreadPool(
|
private ExecutorService updateExecutor = ExecutorUtil.newMDCAwareCachedThreadPool(
|
||||||
new SolrjNamedThreadFactory("updateExecutor"));
|
new SolrjNamedThreadFactory("updateExecutor"));
|
||||||
|
|
||||||
|
private ExecutorService recoveryExecutor = ExecutorUtil.newMDCAwareCachedThreadPool(
|
||||||
|
new SolrjNamedThreadFactory("recoveryExecutor"));
|
||||||
|
|
||||||
private PoolingClientConnectionManager clientConnectionManager;
|
private PoolingClientConnectionManager clientConnectionManager;
|
||||||
|
|
||||||
private final CloseableHttpClient client;
|
private final CloseableHttpClient client;
|
||||||
|
@ -105,13 +108,32 @@ public class UpdateShardHandler {
|
||||||
return clientConnectionManager;
|
return clientConnectionManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method returns an executor that is not meant for disk IO and that will
|
||||||
|
* be interrupted on shutdown.
|
||||||
|
*
|
||||||
|
* @return an executor for update related activities that do not do disk IO.
|
||||||
|
*/
|
||||||
public ExecutorService getUpdateExecutor() {
|
public ExecutorService getUpdateExecutor() {
|
||||||
return updateExecutor;
|
return updateExecutor;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* In general, RecoveryStrategy threads do not do disk IO, but they open and close SolrCores
|
||||||
|
* in async threads, among other things, and can trigger disk IO, so we use this alternate
|
||||||
|
* executor rather than the 'updateExecutor', which is interrupted on shutdown.
|
||||||
|
*
|
||||||
|
* @return executor for {@link RecoveryStrategy} thread which will not be interrupted on close.
|
||||||
|
*/
|
||||||
|
public ExecutorService getRecoveryExecutor() {
|
||||||
|
return recoveryExecutor;
|
||||||
|
}
|
||||||
|
|
||||||
public void close() {
|
public void close() {
|
||||||
try {
|
try {
|
||||||
|
// we interrupt on purpose here, but this executor should not run threads that do disk IO!
|
||||||
ExecutorUtil.shutdownWithInterruptAndAwaitTermination(updateExecutor);
|
ExecutorUtil.shutdownWithInterruptAndAwaitTermination(updateExecutor);
|
||||||
|
ExecutorUtil.shutdownAndAwaitTermination(recoveryExecutor);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
SolrException.log(log, e);
|
SolrException.log(log, e);
|
||||||
} finally {
|
} finally {
|
||||||
|
|
Loading…
Reference in New Issue