mirror of https://github.com/apache/lucene.git
SOLR-14942: Reduce leader election time on node shutdown (#2004)
The shutdown process waits for all replicas/cores to be closed before removing the election node of the leader. This can take some time due to index flush or merge activities on the leader cores and delays new leaders from being elected. Moreover, jetty stops accepting new requests on receiving SIGTERM which means that even though a leader technically exists, no new indexing requests can be processed by the node. This commit waits for all in-flight indexing requests to complete, removes election nodes (thus triggering leader election) and then closes all replicas. Co-authored-by: Cao Manh Dat <datcm@apache.org>
This commit is contained in:
parent
840a353bc7
commit
706f284c46
|
@ -139,6 +139,31 @@ Bug Fixes
|
|||
* SOLR-14546: Fix for a relatively hard to hit issue in OverseerTaskProcessor that could lead to out of order execution
|
||||
of Collection API tasks competing for a lock (Ilan Ginzburg).
|
||||
|
||||
================== 8.8.0 ==================
|
||||
|
||||
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.
|
||||
|
||||
New Features
|
||||
---------------------
|
||||
(No changes)
|
||||
|
||||
Improvements
|
||||
---------------------
|
||||
* SOLR-14942: Reduce leader election time on node shutdown by removing election nodes before closing cores.
|
||||
(Cao Manh Dat, Mike Drob, hossman, shalin)
|
||||
|
||||
Optimizations
|
||||
---------------------
|
||||
(No changes)
|
||||
|
||||
Bug Fixes
|
||||
---------------------
|
||||
(No changes)
|
||||
|
||||
Other Changes
|
||||
---------------------
|
||||
(No changes)
|
||||
|
||||
================== 8.7.0 ==================
|
||||
|
||||
Consult the lucene/CHANGES.txt file for additional, low level, changes in this release.
|
||||
|
|
|
@ -648,18 +648,23 @@ public class ZkController implements Closeable {
|
|||
|
||||
ExecutorService customThreadPool = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("closeThreadPool"));
|
||||
|
||||
customThreadPool.submit(() -> Collections.singleton(overseerElector.getContext()).parallelStream().forEach(IOUtils::closeQuietly));
|
||||
customThreadPool.submit(() -> IOUtils.closeQuietly(overseerElector.getContext()));
|
||||
|
||||
customThreadPool.submit(() -> Collections.singleton(overseer).parallelStream().forEach(IOUtils::closeQuietly));
|
||||
customThreadPool.submit(() -> IOUtils.closeQuietly(overseer));
|
||||
|
||||
try {
|
||||
customThreadPool.submit(() -> electionContexts.values().parallelStream().forEach(IOUtils::closeQuietly));
|
||||
customThreadPool.submit(() -> {
|
||||
Collection<ElectionContext> values = electionContexts.values();
|
||||
synchronized (electionContexts) {
|
||||
values.forEach(IOUtils::closeQuietly);
|
||||
}
|
||||
});
|
||||
|
||||
} finally {
|
||||
|
||||
sysPropsCacher.close();
|
||||
customThreadPool.submit(() -> Collections.singleton(cloudSolrClient).parallelStream().forEach(IOUtils::closeQuietly));
|
||||
customThreadPool.submit(() -> Collections.singleton(cloudManager).parallelStream().forEach(IOUtils::closeQuietly));
|
||||
customThreadPool.submit(() -> IOUtils.closeQuietly(cloudSolrClient));
|
||||
customThreadPool.submit(() -> IOUtils.closeQuietly(cloudManager));
|
||||
|
||||
try {
|
||||
try {
|
||||
|
@ -1838,6 +1843,28 @@ public class ZkController implements Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to cancel all leader elections. This method should be called on node shutdown.
|
||||
*/
|
||||
public void tryCancelAllElections() {
|
||||
if (zkClient.isClosed()) {
|
||||
return;
|
||||
}
|
||||
Collection<ElectionContext> values = electionContexts.values();
|
||||
synchronized (electionContexts) {
|
||||
values.forEach(context -> {
|
||||
try {
|
||||
context.cancelElection();
|
||||
context.close();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
} catch (KeeperException e) {
|
||||
log.warn("Error on cancelling elections of {}", context.leaderPath, e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private ZkCoreNodeProps waitForLeaderToSeeDownState(
|
||||
CoreDescriptor descriptor, final String coreZkNodeName) throws SessionExpiredException {
|
||||
// try not to wait too long here - if we are waiting too long, we should probably
|
||||
|
|
|
@ -42,6 +42,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
@ -1017,6 +1018,27 @@ public class CoreContainer {
|
|||
if (isZooKeeperAware()) {
|
||||
cancelCoreRecoveries();
|
||||
zkSys.zkController.preClose();
|
||||
/*
|
||||
* Pause updates for all cores on this node and wait for all in-flight update requests to finish.
|
||||
* Here, we (slightly) delay leader election so that in-flight update requests succeed and we can preserve consistency.
|
||||
*
|
||||
* Jetty already allows a grace period for in-flight requests to complete and our solr cores, searchers etc
|
||||
* are reference counted to allow for graceful shutdown. So we don't worry about any other kind of requests.
|
||||
*
|
||||
* We do not need to unpause ever because the node is being shut down.
|
||||
*/
|
||||
getCores().parallelStream().forEach(solrCore -> {
|
||||
SolrCoreState solrCoreState = solrCore.getSolrCoreState();
|
||||
try {
|
||||
solrCoreState.pauseUpdatesAndAwaitInflightRequests();
|
||||
} catch (TimeoutException e) {
|
||||
log.warn("Timed out waiting for in-flight update requests to complete for core: {}", solrCore.getName());
|
||||
} catch (InterruptedException e) {
|
||||
log.warn("Interrupted while waiting for in-flight update requests to complete for core: {}", solrCore.getName());
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
});
|
||||
zkSys.zkController.tryCancelAllElections();
|
||||
}
|
||||
|
||||
ExecutorUtil.shutdownAndAwaitTermination(coreContainerWorkExecutor);
|
||||
|
|
|
@ -86,6 +86,7 @@ import org.apache.solr.core.CoreContainer;
|
|||
import org.apache.solr.core.SolrConfig;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.solr.handler.ContentStreamHandlerBase;
|
||||
import org.apache.solr.handler.UpdateRequestHandler;
|
||||
import org.apache.solr.logging.MDCLoggingContext;
|
||||
import org.apache.solr.request.LocalSolrQueryRequest;
|
||||
import org.apache.solr.request.SolrQueryRequest;
|
||||
|
@ -551,39 +552,57 @@ public class HttpSolrCall {
|
|||
remoteQuery(coreUrl + path, resp);
|
||||
return RETURN;
|
||||
case PROCESS:
|
||||
final Method reqMethod = Method.getMethod(req.getMethod());
|
||||
HttpCacheHeaderUtil.setCacheControlHeader(config, resp, reqMethod);
|
||||
// unless we have been explicitly told not to, do cache validation
|
||||
// if we fail cache validation, execute the query
|
||||
if (config.getHttpCachingConfig().isNever304() ||
|
||||
!HttpCacheHeaderUtil.doCacheHeaderValidation(solrReq, req, reqMethod, resp)) {
|
||||
SolrQueryResponse solrRsp = new SolrQueryResponse();
|
||||
/*
|
||||
We track update requests so that we can preserve consistency by waiting for them to complete
|
||||
on a node shutdown and then immediately trigger a leader election without waiting for the core to close.
|
||||
See how the SolrCoreState#pauseUpdatesAndAwaitInflightRequests() method is used in CoreContainer#shutdown()
|
||||
|
||||
Also see https://issues.apache.org/jira/browse/SOLR-14942 for details on why we do not care for
|
||||
other kinds of requests.
|
||||
*/
|
||||
if (handler instanceof UpdateRequestHandler && !core.getSolrCoreState().registerInFlightUpdate()) {
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR, "Updates are temporarily paused for core: " + core.getName());
|
||||
}
|
||||
try {
|
||||
final Method reqMethod = Method.getMethod(req.getMethod());
|
||||
HttpCacheHeaderUtil.setCacheControlHeader(config, resp, reqMethod);
|
||||
// unless we have been explicitly told not to, do cache validation
|
||||
// if we fail cache validation, execute the query
|
||||
if (config.getHttpCachingConfig().isNever304() ||
|
||||
!HttpCacheHeaderUtil.doCacheHeaderValidation(solrReq, req, reqMethod, resp)) {
|
||||
SolrQueryResponse solrRsp = new SolrQueryResponse();
|
||||
/* even for HEAD requests, we need to execute the handler to
|
||||
* ensure we don't get an error (and to make sure the correct
|
||||
* QueryResponseWriter is selected and we get the correct
|
||||
* Content-Type)
|
||||
*/
|
||||
SolrRequestInfo.setRequestInfo(new SolrRequestInfo(solrReq, solrRsp, action));
|
||||
mustClearSolrRequestInfo = true;
|
||||
execute(solrRsp);
|
||||
if (shouldAudit()) {
|
||||
EventType eventType = solrRsp.getException() == null ? EventType.COMPLETED : EventType.ERROR;
|
||||
if (shouldAudit(eventType)) {
|
||||
cores.getAuditLoggerPlugin().doAudit(
|
||||
new AuditEvent(eventType, req, getAuthCtx(), solrReq.getRequestTimer().getTime(), solrRsp.getException()));
|
||||
SolrRequestInfo.setRequestInfo(new SolrRequestInfo(solrReq, solrRsp, action));
|
||||
mustClearSolrRequestInfo = true;
|
||||
execute(solrRsp);
|
||||
if (shouldAudit()) {
|
||||
EventType eventType = solrRsp.getException() == null ? EventType.COMPLETED : EventType.ERROR;
|
||||
if (shouldAudit(eventType)) {
|
||||
cores.getAuditLoggerPlugin().doAudit(
|
||||
new AuditEvent(eventType, req, getAuthCtx(), solrReq.getRequestTimer().getTime(), solrRsp.getException()));
|
||||
}
|
||||
}
|
||||
HttpCacheHeaderUtil.checkHttpCachingVeto(solrRsp, resp, reqMethod);
|
||||
Iterator<Map.Entry<String, String>> headers = solrRsp.httpHeaders();
|
||||
while (headers.hasNext()) {
|
||||
Map.Entry<String, String> entry = headers.next();
|
||||
resp.addHeader(entry.getKey(), entry.getValue());
|
||||
}
|
||||
QueryResponseWriter responseWriter = getResponseWriter();
|
||||
if (invalidStates != null) solrReq.getContext().put(CloudSolrClient.STATE_VERSION, invalidStates);
|
||||
writeResponse(solrRsp, responseWriter, reqMethod);
|
||||
}
|
||||
HttpCacheHeaderUtil.checkHttpCachingVeto(solrRsp, resp, reqMethod);
|
||||
Iterator<Map.Entry<String, String>> headers = solrRsp.httpHeaders();
|
||||
while (headers.hasNext()) {
|
||||
Map.Entry<String, String> entry = headers.next();
|
||||
resp.addHeader(entry.getKey(), entry.getValue());
|
||||
return RETURN;
|
||||
} finally {
|
||||
if (handler instanceof UpdateRequestHandler) {
|
||||
// every registered request must also be de-registered
|
||||
core.getSolrCoreState().deregisterInFlightUpdate();
|
||||
}
|
||||
QueryResponseWriter responseWriter = getResponseWriter();
|
||||
if (invalidStates != null) solrReq.getContext().put(CloudSolrClient.STATE_VERSION, invalidStates);
|
||||
writeResponse(solrRsp, responseWriter, reqMethod);
|
||||
}
|
||||
return RETURN;
|
||||
default: return action;
|
||||
}
|
||||
} catch (Throwable ex) {
|
||||
|
|
|
@ -16,10 +16,6 @@
|
|||
*/
|
||||
package org.apache.solr.update;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
|
||||
import org.apache.lucene.index.IndexWriter;
|
||||
import org.apache.lucene.search.Sort;
|
||||
import org.apache.solr.cloud.ActionThrottle;
|
||||
|
@ -33,6 +29,14 @@ import org.apache.solr.util.RefCounted;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.concurrent.Phaser;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
|
||||
/**
|
||||
* The state in this class can be easily shared between SolrCores across
|
||||
* SolrCore reloads.
|
||||
|
@ -40,11 +44,37 @@ import org.slf4j.LoggerFactory;
|
|||
*/
|
||||
public abstract class SolrCoreState {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private static final int PAUSE_UPDATES_TIMEOUT_MILLIS = Integer.getInteger("solr.cloud.wait-for-updates-on-shutdown-millis", 2500);
|
||||
|
||||
protected boolean closed = false;
|
||||
private final Object updateLock = new Object();
|
||||
private final Object reloadLock = new Object();
|
||||
|
||||
|
||||
/**
|
||||
* If true then all update requests will be refused
|
||||
*/
|
||||
private final AtomicBoolean pauseUpdateRequests = new AtomicBoolean();
|
||||
|
||||
/**
|
||||
* Phaser is used to track in flight update requests and can be used
|
||||
* to wait for all in-flight requests to finish. A Phaser terminates
|
||||
* automatically when the number of registered parties reach zero.
|
||||
* Since we track requests with this phaser, we disable the automatic
|
||||
* termination by overriding the onAdvance method to return false.
|
||||
*
|
||||
* @see #registerInFlightUpdate()
|
||||
* @see #deregisterInFlightUpdate()
|
||||
* @see #pauseUpdatesAndAwaitInflightRequests()
|
||||
*/
|
||||
private final Phaser inflightUpdatesCounter = new Phaser() {
|
||||
@Override
|
||||
protected boolean onAdvance(int phase, int registeredParties) {
|
||||
// disable termination of phaser
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
public Object getUpdateLock() {
|
||||
return updateLock;
|
||||
}
|
||||
|
@ -86,7 +116,40 @@ public abstract class SolrCoreState {
|
|||
}
|
||||
return close;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Pauses all update requests to this core and waits (indefinitely) for all in-flight
|
||||
* update requests to finish
|
||||
*/
|
||||
public void pauseUpdatesAndAwaitInflightRequests() throws TimeoutException, InterruptedException {
|
||||
if (pauseUpdateRequests.compareAndSet(false, true)) {
|
||||
int arrivalNumber = inflightUpdatesCounter.register();
|
||||
assert arrivalNumber >= 0 : "Registration of in-flight request should have succeeded but got arrival phase number < 0";
|
||||
inflightUpdatesCounter.awaitAdvanceInterruptibly(inflightUpdatesCounter.arrive(), PAUSE_UPDATES_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers in-flight update requests to this core.
|
||||
*
|
||||
* @return true if request was registered, false if update requests are paused
|
||||
*/
|
||||
public boolean registerInFlightUpdate() {
|
||||
if (pauseUpdateRequests.get()) {
|
||||
return false;
|
||||
}
|
||||
inflightUpdatesCounter.register();
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* De-registers in-flight update requests to this core (marks them as completed)
|
||||
*/
|
||||
public void deregisterInFlightUpdate() {
|
||||
int arrivalPhaseNumber = inflightUpdatesCounter.arriveAndDeregister();
|
||||
assert arrivalPhaseNumber >= 0 : "inflightUpdatesCounter should not have been terminated";
|
||||
}
|
||||
|
||||
public abstract Lock getCommitLock();
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in New Issue