SOLR-3782: A leader going down while updates are coming in can cause shard inconsistency.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1380974 13f79535-47bb-0310-9956-ffa450edef68
Mark Robert Miller 2012-09-05 04:03:55 +00:00
parent 99c26e9955
commit 678e35aa58
10 changed files with 100 additions and 103 deletions

JettySolrRunner.java

@@ -238,10 +238,9 @@ public class JettySolrRunner {
server.getServer().stop();
server.stop();
if (threadPool instanceof QueuedThreadPool) {
-       ((QueuedThreadPool) threadPool).setMaxStopTimeMs(15000);
-       ((QueuedThreadPool) threadPool).stop();
-       ((QueuedThreadPool) threadPool).stop();
+       ((QueuedThreadPool) threadPool).setMaxStopTimeMs(30000);
+       ((QueuedThreadPool) threadPool).stop();
+       ((QueuedThreadPool) threadPool).join();
}
//server.destroy();
if (server.getState().equals(Server.FAILED)) {

RecoveryStrategy.java

@@ -452,13 +452,25 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
retries++;
if (retries >= MAX_RETRIES) {
if (retries == INTERRUPTED) {
SolrException.log(log, "Recovery failed - interrupted. core=" + coreName);
recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
core.getCoreDescriptor());
SolrException.log(log, "Recovery failed - interrupted. core="
+ coreName);
try {
recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
core.getCoreDescriptor());
} catch (Throwable t) {
SolrException.log(log,
"Could not publish that recovery failed", t);
}
} else {
SolrException.log(log, "Recovery failed - max retries exceeded. core=" + coreName);
recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
core.getCoreDescriptor());
SolrException.log(log,
"Recovery failed - max retries exceeded. core=" + coreName);
try {
recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
core.getCoreDescriptor());
} catch (Throwable t) {
SolrException.log(log,
"Could not publish that recovery failed", t);
}
}
break;
}
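
The change above wraps each recoveryFailed() publish in its own try/catch so that a failure to publish the failed-recovery state (for example, ZooKeeper being unreachable) cannot escape the retry loop and kill the recovery thread. A minimal, self-contained sketch of that best-effort pattern, using an illustrative helper rather than Solr's API:

final class BestEffortPublish {
  // Best-effort publish: the status update is advisory, so an exception
  // here is logged and swallowed instead of propagating out of recovery.
  static void publishRecoveryFailed(Runnable publish) {
    try {
      publish.run(); // e.g. write the failed state to ZooKeeper
    } catch (Throwable t) {
      System.err.println("Could not publish that recovery failed: " + t);
    }
  }

  public static void main(String[] args) {
    // simulated ZooKeeper outage: logged, not rethrown
    publishRecoveryFailed(() -> { throw new RuntimeException("zk down"); });
  }
}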

ZkController.java

@@ -48,6 +48,7 @@ import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.params.SolrParams;
+ import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.core.Config;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
@@ -192,6 +193,8 @@ public final class ZkController {
String adminPath;
shardHandler = cc.getShardHandlerFactory().getShardHandler();
adminPath = cc.getAdminPath();
+ ExecutorUtil.shutdownAndAwaitTermination(cc.getCmdDistribExecutor());
+ cc.newCmdDistribExecutor();
ZkController.this.overseer = new Overseer(shardHandler, adminPath, zkStateReader);
ElectionContext context = new OverseerElectionContext(zkClient, overseer, getNodeName());
overseerElector.joinElection(context);
@@ -234,44 +237,6 @@ public final class ZkController {
});
- zkClient.getZkClientConnectionStrategy().addDisconnectedListener(new ZkClientConnectionStrategy.DisconnectedListener() {
-   @Override
-   public void disconnected() {
-     List<CoreDescriptor> descriptors = registerOnReconnect.getCurrentDescriptors();
-     // re register all descriptors
-     if (descriptors != null) {
-       for (CoreDescriptor descriptor : descriptors) {
-         descriptor.getCloudDescriptor().isLeader = false;
-       }
-     }
-   }
- });
- zkClient.getZkClientConnectionStrategy().addConnectedListener(new ZkClientConnectionStrategy.ConnectedListener() {
-   @Override
-   public void connected() {
-     List<CoreDescriptor> descriptors = registerOnReconnect.getCurrentDescriptors();
-     if (descriptors != null) {
-       for (CoreDescriptor descriptor : descriptors) {
-         CloudDescriptor cloudDesc = descriptor.getCloudDescriptor();
-         String leaderUrl;
-         try {
-           leaderUrl = getLeaderProps(cloudDesc.getCollectionName(), cloudDesc.getShardId())
-               .getCoreUrl();
-         } catch (InterruptedException e) {
-           throw new RuntimeException();
-         }
-         String ourUrl = ZkCoreNodeProps.getCoreUrl(getBaseUrl(), descriptor.getName());
-         boolean isLeader = leaderUrl.equals(ourUrl);
-         log.info("SolrCore connected to ZooKeeper - we are " + ourUrl + " and leader is " + leaderUrl);
-         cloudDesc.isLeader = isLeader;
-       }
-     }
-   }
- });
this.overseerJobQueue = Overseer.getInQueue(zkClient);
this.overseerCollectionQueue = Overseer.getCollectionQueue(zkClient);
cmdExecutor = new ZkCmdExecutor();
@@ -296,6 +261,7 @@ public final class ZkController {
final String coreZkNodeName = getNodeName() + "_"
+ descriptor.getName();
try {
+ descriptor.getCloudDescriptor().isLeader = false;
publish(descriptor, ZkStateReader.DOWN);
waitForLeaderToSeeDownState(descriptor, coreZkNodeName);
} catch (Exception e) {
@@ -309,17 +275,6 @@ public final class ZkController {
* Closes the underlying ZooKeeper client.
*/
public void close() {
- try {
-   String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName;
-   // we don't retry if there is a problem - count on ephem timeout
-   zkClient.delete(nodePath, -1, false);
- } catch (KeeperException.NoNodeException e) {
-   // fine
- } catch (InterruptedException e) {
-   Thread.currentThread().interrupt();
- } catch (KeeperException e) {
-   SolrException.log(log, "Error trying to remove our ephem live node", e);
- }
for (ElectionContext context : electionContexts.values()) {
context.close();

CoreContainer.java

@@ -145,7 +145,7 @@ public class CoreContainer
private Map<SolrCore,String> coreToOrigName = new ConcurrentHashMap<SolrCore,String>();
private String leaderVoteWait;
- private ThreadPoolExecutor cmdDistribExecutor;
+ private volatile ThreadPoolExecutor cmdDistribExecutor;
{
log.info("New CoreContainer " + System.identityHashCode(this));
@@ -190,9 +190,7 @@ public class CoreContainer
}
protected void initZooKeeper(String zkHost, int zkClientTimeout) {
- cmdDistribExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5,
-     TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
-     new DefaultSolrThreadFactory("cmdDistribExecutor"));
+ newCmdDistribExecutor();
// if zkHost sys property is not set, we are not using ZooKeeper
String zookeeperHost;
@@ -296,6 +294,12 @@ public class CoreContainer
}
+ public void newCmdDistribExecutor() {
+   cmdDistribExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5,
+       TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+       new DefaultSolrThreadFactory("cmdDistribExecutor"));
+ }
// may return null if not in zk mode
public ThreadPoolExecutor getCmdDistribExecutor() {
return cmdDistribExecutor;
@@ -600,6 +604,14 @@ public class CoreContainer
log.info("Shutting down CoreContainer instance="+System.identityHashCode(this));
isShutDown = true;
+ if (cmdDistribExecutor != null) {
+   try {
+     ExecutorUtil.shutdownAndAwaitTermination(cmdDistribExecutor);
+   } catch (Throwable e) {
+     SolrException.log(log, e);
+   }
+ }
if (isZooKeeperAware()) {
cancelCoreRecoveries();
}
@@ -618,13 +630,7 @@ public class CoreContainer
if (shardHandlerFactory != null) {
shardHandlerFactory.close();
}
- if (cmdDistribExecutor != null) {
-   try {
-     ExecutorUtil.shutdownAndAwaitTermination(cmdDistribExecutor);
-   } catch (Throwable e) {
-     SolrException.log(log, e);
-   }
- }
// we want to close zk stuff last
if(zkController != null) {
zkController.close();
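
Together with the ZkController hunk above, the pattern here is: mark the executor field volatile, shut the old pool down when the ZooKeeper session expires, and build a fresh one before re-registering cores, so distributed updates queued against the dead session cannot leak into the new one. A rough, self-contained sketch of that lifecycle under those assumptions, using plain java.util.concurrent rather than the Solr classes:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class CmdDistribLifecycle {
  // volatile: threads reading the field after a reconnect see the new pool
  private volatile ExecutorService cmdDistribExecutor = newExecutor();

  private static ExecutorService newExecutor() {
    // unbounded cached pool, mirroring the ThreadPoolExecutor in the diff
    return Executors.newCachedThreadPool();
  }

  // invoked from the reconnect path: drain the old pool, then replace it
  void onSessionExpired() throws InterruptedException {
    ExecutorService old = cmdDistribExecutor;
    old.shutdownNow();                        // cancel in-flight fan-out work
    old.awaitTermination(60, TimeUnit.SECONDS);
    cmdDistribExecutor = newExecutor();       // fresh pool for the new session
  }
}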

SolrDispatchFilter.java

@@ -138,7 +138,7 @@ public class SolrDispatchFilter implements Filter
}
if (this.cores == null) {
- ((HttpServletResponse)response).sendError( 403, "Server is shutting down" );
+ ((HttpServletResponse)response).sendError( 503, "Server is shutting down" );
return;
}
CoreContainer cores = this.cores;

SolrCmdDistributor.java

@@ -40,12 +40,12 @@ import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.UpdateRequestExt;
import org.apache.solr.common.SolrException;
+ import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.SolrCore;
import org.apache.solr.util.AdjustableSemaphore;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -323,6 +323,12 @@ public class SolrCmdDistributor {
HttpSolrServer server = new HttpSolrServer(fullUrl,
client);
+ if (Thread.currentThread().isInterrupted()) {
+   clonedRequest.rspCode = 503;
+   clonedRequest.exception = new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Shutting down.");
+   return clonedRequest;
+ }
clonedRequest.ursp = server.request(clonedRequest.ureq);
// currently no way to get the request body.
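
The added check is cooperative cancellation: before the potentially slow HTTP call, the worker inspects its interrupt flag (set by shutdownNow() on the executor above) and converts shutdown into a retryable 503 result instead of firing a doomed request. A small sketch of the idea; the names here are illustrative, not Solr's:

import java.util.concurrent.Callable;

final class GuardedSend implements Callable<Integer> {
  @Override
  public Integer call() {
    // the interrupt flag is set by shutdownNow() during container shutdown
    if (Thread.currentThread().isInterrupted()) {
      return 503; // SERVICE_UNAVAILABLE: let the caller retry elsewhere
    }
    // ... perform the actual HTTP request here ...
    return 200;
  }
}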

DistributedUpdateProcessor.java

@@ -251,29 +251,30 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
boolean localIsLeader = req.getCore().getCoreDescriptor().getCloudDescriptor().isLeader();
if (DistribPhase.FROMLEADER == phase && localIsLeader && from != null) { // from will be null on log replay
log.error("Request says it is coming from leader, but we are the leader: " + req.getParamString());
- throw new SolrException(ErrorCode.BAD_REQUEST, "Request says it is coming from leader, but we are the leader");
+ throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Request says it is coming from leader, but we are the leader");
}
- if (DistribPhase.FROMLEADER == phase && from != null) { // from will be null on log replay
-   ZkCoreNodeProps clusterStateLeader = new ZkCoreNodeProps(zkController
-       .getClusterState().getLeader(collection, shardId));
-   if (clusterStateLeader.getNodeProps() == null
-       || !clusterStateLeader.getCoreUrl().equals(from)) {
-     String coreUrl = null;
-     if (clusterStateLeader.getNodeProps() != null) {
-       coreUrl = clusterStateLeader.getCoreUrl();
-     }
-     log.error("We got a request from the leader, but it's not who our cluster state says is the leader :"
-         + req.getParamString()
-         + " : "
-         + coreUrl);
-     new SolrException(ErrorCode.BAD_REQUEST, "We got a request from the leader, but it's not who our cluster state says is the leader.");
-   }
- }
+ // this is too restrictive - cluster state can be stale - can cause shard inconsistency
+ // if (DistribPhase.FROMLEADER == phase && from != null) { // from will be null on log replay
+ //
+ //   ZkCoreNodeProps clusterStateLeader = new ZkCoreNodeProps(zkController
+ //       .getClusterState().getLeader(collection, shardId));
+ //
+ //   if (clusterStateLeader.getNodeProps() == null
+ //       || !clusterStateLeader.getCoreUrl().equals(from)) {
+ //     String coreUrl = null;
+ //     if (clusterStateLeader.getNodeProps() != null) {
+ //       coreUrl = clusterStateLeader.getCoreUrl();
+ //     }
+ //     log.error("We got a request from the leader, but it's not who our cluster state says is the leader :"
+ //         + req.getParamString()
+ //         + " : "
+ //         + coreUrl);
+ //
+ //     new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "We got a request from the leader, but it's not who our cluster state says is the leader.");
+ //   }
+ //
+ // }
}
@@ -348,11 +349,20 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
ModifiableSolrParams params = null;
if (nodes != null) {
+ if (isLeader && !req.getCore().getCoreDescriptor().getCloudDescriptor().isLeader()) {
+   log.error("Abort sending request to replicas, we are no longer leader");
+   throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Abort sending request to replicas, we are no longer leader");
+ }
params = new ModifiableSolrParams(req.getParams());
params.set(DISTRIB_UPDATE_PARAM,
(isLeader ?
DistribPhase.FROMLEADER.toString() :
DistribPhase.TOLEADER.toString()));
+ if (isLeader) {
+   params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(
+       zkController.getBaseUrl(), req.getCore().getName()));
+ }
params.remove("commit"); // this will be distributed from the local commit
params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(
zkController.getBaseUrl(), req.getCore().getName()));
@@ -682,6 +692,11 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
ModifiableSolrParams params = null;
if (nodes != null) {
+ if (isLeader && !req.getCore().getCoreDescriptor().getCloudDescriptor().isLeader()) {
+   log.error("Abort sending request to replicas, we are no longer leader");
+   throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Abort sending request to replicas, we are no longer leader");
+ }
params = new ModifiableSolrParams(req.getParams());
params.set(DISTRIB_UPDATE_PARAM,
(isLeader ?
@@ -851,7 +866,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
if (leaderLogic && replicas != null) {
if (!req.getCore().getCoreDescriptor().getCloudDescriptor().isLeader()) {
log.error("Abort sending request to replicas, we are no longer leader");
- throw new SolrException(ErrorCode.BAD_REQUEST, "Abort sending request to replicas, we are no longer leader");
+ throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Abort sending request to replicas, we are no longer leader");
}
ModifiableSolrParams params = new ModifiableSolrParams(req.getParams());
params.set(VERSION_FIELD, Long.toString(cmd.getVersion()));
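
The recurring guard in these hunks is the heart of the fix: the stale cluster-state comparison is commented out, and instead each fan-out path re-reads the locally maintained isLeader flag (cleared in ZkController.preRegister above when the session is lost) immediately before forwarding, failing with SERVICE_UNAVAILABLE (503) so the client retries against the new leader. A compact sketch of that re-check-before-fan-out pattern, in plain Java rather than Solr's classes:

final class LeaderFanout {
  // Leadership can be lost between accepting an update and forwarding it;
  // re-check the volatile flag at the last moment and fail retryably.
  private volatile boolean isLeader; // cleared on ZooKeeper disconnect

  void setLeader(boolean leader) { this.isLeader = leader; }

  void forwardToReplicas(Runnable send, boolean acceptedAsLeader) {
    if (acceptedAsLeader && !isLeader) {
      // maps to SolrException(ErrorCode.SERVICE_UNAVAILABLE, ...) in the diff
      throw new IllegalStateException(
          "503: no longer leader, aborting request to replicas");
    }
    send.run(); // still the leader as far as we can tell: fan out
  }
}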

ChaosMonkeyNothingIsSafeTest.java

@@ -135,10 +135,12 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
indexThread.join();
}
- // fails will happen...
- // for (StopableIndexingThread indexThread : threads) {
- //   assertEquals(0, indexThread.getFails());
- // }
+ // we expect full throttle fails, but not cloud client...
+ for (StopableThread indexThread : threads) {
+   if (indexThread instanceof StopableIndexingThread && !(indexThread instanceof FullThrottleStopableIndexingThread)) {
+     assertEquals(0, ((StopableIndexingThread) indexThread).getFails());
+   }
+ }
// try and wait for any replications and what not to finish...

ExecutorUtil.java

@@ -30,17 +30,20 @@ public class ExecutorUtil {
public static void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
- pool.shutdownNow(); // Cancel currently executing tasks
try {
+ // Wait a while for existing tasks to terminate
+ if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
+   pool.shutdownNow(); // Cancel currently executing tasks
+   // Wait a while for tasks to respond to being cancelled
+   if (!pool.awaitTermination(60, TimeUnit.SECONDS))
+     SolrException.log(log, "Executor still has running tasks.");
+ }
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
try {
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
SolrException.log(log, "Executor still has running tasks.");
} catch (InterruptedException e) {
}
// Preserve interrupt status
Thread.currentThread().interrupt();
}
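
The restructured method now follows the standard two-phase shutdown idiom from the java.util.concurrent.ExecutorService javadoc: shutdown() and wait first, and only if tasks do not finish call shutdownNow(), instead of cancelling everything up front. For reference, a self-contained version of that idiom (System.err stands in for the Solr logger):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class Pools {
  static void shutdownAndAwaitTermination(ExecutorService pool) {
    pool.shutdown(); // reject new tasks, but let submitted ones run
    try {
      // phase 1: give existing tasks a chance to finish normally
      if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
        pool.shutdownNow(); // phase 2: interrupt stragglers
        if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
          System.err.println("Executor still has running tasks.");
        }
      }
    } catch (InterruptedException ie) {
      pool.shutdownNow();
      Thread.currentThread().interrupt(); // preserve interrupt status
    }
  }
}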

ChaosMonkey.java

@@ -126,7 +126,7 @@ public class ChaosMonkey {
if (cores != null) {
SolrZkClient zkClient = cores.getZkController().getZkClient();
// must be at least double tick time...
- zkClient.getSolrZooKeeper().pauseCnxn(ZkTestServer.TICK_TIME * 2);
+ zkClient.getSolrZooKeeper().pauseCnxn(ZkTestServer.TICK_TIME * 2 + 200);
}
}
}
@@ -381,7 +381,6 @@ public class ChaosMonkey {
if (causeConnectionLoss && rnd < CONLOSS_PERCENT) {
randomConnectionLoss();
- randomConnectionLoss();
}
CloudJettyRunner cjetty;