From 678e35aa588d99e6a8b8e3664d714634be21deb6 Mon Sep 17 00:00:00 2001 From: Mark Robert Miller Date: Wed, 5 Sep 2012 04:03:55 +0000 Subject: [PATCH] SOLR-3782: A leader going down while updates are coming in can cause shard inconsistency. git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1380974 13f79535-47bb-0310-9956-ffa450edef68 --- .../solrj/embedded/JettySolrRunner.java | 5 +- .../apache/solr/cloud/RecoveryStrategy.java | 24 ++++++-- .../org/apache/solr/cloud/ZkController.java | 53 ++--------------- .../org/apache/solr/core/CoreContainer.java | 28 +++++---- .../solr/servlet/SolrDispatchFilter.java | 2 +- .../solr/update/SolrCmdDistributor.java | 8 ++- .../processor/DistributedUpdateProcessor.java | 59 ++++++++++++------- .../cloud/ChaosMonkeyNothingIsSafeTest.java | 10 ++-- .../apache/solr/common/util/ExecutorUtil.java | 11 ++-- .../org/apache/solr/cloud/ChaosMonkey.java | 3 +- 10 files changed, 100 insertions(+), 103 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java index b5f4050f6a4..0f52abf4e02 100644 --- a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java +++ b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java @@ -238,10 +238,9 @@ public class JettySolrRunner { server.getServer().stop(); server.stop(); if (threadPool instanceof QueuedThreadPool) { - ((QueuedThreadPool) threadPool).setMaxStopTimeMs(15000); - ((QueuedThreadPool) threadPool).stop(); - ((QueuedThreadPool) threadPool).stop(); + ((QueuedThreadPool) threadPool).setMaxStopTimeMs(30000); ((QueuedThreadPool) threadPool).stop(); + ((QueuedThreadPool) threadPool).join(); } //server.destroy(); if (server.getState().equals(Server.FAILED)) { diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java index 69f8577ae09..737c8d92783 100644 --- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java +++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java @@ -452,13 +452,25 @@ public class RecoveryStrategy extends Thread implements ClosableThread { retries++; if (retries >= MAX_RETRIES) { if (retries == INTERRUPTED) { - SolrException.log(log, "Recovery failed - interrupted. core=" + coreName); - recoveryFailed(core, zkController, baseUrl, coreZkNodeName, - core.getCoreDescriptor()); + SolrException.log(log, "Recovery failed - interrupted. core=" + + coreName); + try { + recoveryFailed(core, zkController, baseUrl, coreZkNodeName, + core.getCoreDescriptor()); + } catch (Throwable t) { + SolrException.log(log, + "Could not publish that recovery failed", t); + } } else { - SolrException.log(log, "Recovery failed - max retries exceeded. core=" + coreName); - recoveryFailed(core, zkController, baseUrl, coreZkNodeName, - core.getCoreDescriptor()); + SolrException.log(log, + "Recovery failed - max retries exceeded. core=" + coreName); + try { + recoveryFailed(core, zkController, baseUrl, coreZkNodeName, + core.getCoreDescriptor()); + } catch (Throwable t) { + SolrException.log(log, + "Could not publish that recovery failed", t); + } } break; } diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index d02227a3c3b..6b3e3567748 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -48,6 +48,7 @@ import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.cloud.ZooKeeperException; import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.core.Config; import org.apache.solr.core.CoreContainer; import org.apache.solr.core.CoreDescriptor; @@ -192,6 +193,8 @@ public final class ZkController { String adminPath; shardHandler = cc.getShardHandlerFactory().getShardHandler(); adminPath = cc.getAdminPath(); + ExecutorUtil.shutdownAndAwaitTermination(cc.getCmdDistribExecutor()); + cc.newCmdDistribExecutor(); ZkController.this.overseer = new Overseer(shardHandler, adminPath, zkStateReader); ElectionContext context = new OverseerElectionContext(zkClient, overseer, getNodeName()); overseerElector.joinElection(context); @@ -234,44 +237,6 @@ public final class ZkController { }); - zkClient.getZkClientConnectionStrategy().addDisconnectedListener(new ZkClientConnectionStrategy.DisconnectedListener() { - - @Override - public void disconnected() { - List descriptors = registerOnReconnect.getCurrentDescriptors(); - // re register all descriptors - if (descriptors != null) { - for (CoreDescriptor descriptor : descriptors) { - descriptor.getCloudDescriptor().isLeader = false; - } - } - } - }); - - zkClient.getZkClientConnectionStrategy().addConnectedListener(new ZkClientConnectionStrategy.ConnectedListener() { - - @Override - public void connected() { - List descriptors = registerOnReconnect.getCurrentDescriptors(); - if (descriptors != null) { - for (CoreDescriptor descriptor : descriptors) { - CloudDescriptor cloudDesc = descriptor.getCloudDescriptor(); - String leaderUrl; - try { - leaderUrl = getLeaderProps(cloudDesc.getCollectionName(), cloudDesc.getShardId()) - .getCoreUrl(); - } catch (InterruptedException e) { - throw new RuntimeException(); - } - String ourUrl = ZkCoreNodeProps.getCoreUrl(getBaseUrl(), descriptor.getName()); - boolean isLeader = leaderUrl.equals(ourUrl); - log.info("SolrCore connected to ZooKeeper - we are " + ourUrl + " and leader is " + leaderUrl); - cloudDesc.isLeader = isLeader; - } - } - } - }); - this.overseerJobQueue = Overseer.getInQueue(zkClient); this.overseerCollectionQueue = Overseer.getCollectionQueue(zkClient); cmdExecutor = new ZkCmdExecutor(); @@ -296,6 +261,7 @@ public final class ZkController { final String coreZkNodeName = getNodeName() + "_" + descriptor.getName(); try { + descriptor.getCloudDescriptor().isLeader = false; publish(descriptor, ZkStateReader.DOWN); waitForLeaderToSeeDownState(descriptor, coreZkNodeName); } catch (Exception e) { @@ -309,17 +275,6 @@ public final class ZkController { * Closes the underlying ZooKeeper client. */ public void close() { - try { - String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName; - // we don't retry if there is a problem - count on ephem timeout - zkClient.delete(nodePath, -1, false); - } catch (KeeperException.NoNodeException e) { - // fine - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (KeeperException e) { - SolrException.log(log, "Error trying to remove our ephem live node", e); - } for (ElectionContext context : electionContexts.values()) { context.close(); diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java index a4b6ebeac7b..605dee7bbb0 100644 --- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java +++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java @@ -145,7 +145,7 @@ public class CoreContainer private Map coreToOrigName = new ConcurrentHashMap(); private String leaderVoteWait; - private ThreadPoolExecutor cmdDistribExecutor; + private volatile ThreadPoolExecutor cmdDistribExecutor; { log.info("New CoreContainer " + System.identityHashCode(this)); @@ -190,9 +190,7 @@ public class CoreContainer } protected void initZooKeeper(String zkHost, int zkClientTimeout) { - cmdDistribExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5, - TimeUnit.SECONDS, new SynchronousQueue(), - new DefaultSolrThreadFactory("cmdDistribExecutor")); + newCmdDistribExecutor(); // if zkHost sys property is not set, we are not using ZooKeeper String zookeeperHost; @@ -296,6 +294,12 @@ public class CoreContainer } + public void newCmdDistribExecutor() { + cmdDistribExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5, + TimeUnit.SECONDS, new SynchronousQueue(), + new DefaultSolrThreadFactory("cmdDistribExecutor")); + } + // may return null if not in zk mode public ThreadPoolExecutor getCmdDistribExecutor() { return cmdDistribExecutor; @@ -600,6 +604,14 @@ public class CoreContainer log.info("Shutting down CoreContainer instance="+System.identityHashCode(this)); isShutDown = true; + if (cmdDistribExecutor != null) { + try { + ExecutorUtil.shutdownAndAwaitTermination(cmdDistribExecutor); + } catch (Throwable e) { + SolrException.log(log, e); + } + } + if (isZooKeeperAware()) { cancelCoreRecoveries(); } @@ -618,13 +630,7 @@ public class CoreContainer if (shardHandlerFactory != null) { shardHandlerFactory.close(); } - if (cmdDistribExecutor != null) { - try { - ExecutorUtil.shutdownAndAwaitTermination(cmdDistribExecutor); - } catch (Throwable e) { - SolrException.log(log, e); - } - } + // we want to close zk stuff last if(zkController != null) { zkController.close(); diff --git a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java index bd1021770b0..e59fa526efc 100644 --- a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java +++ b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java @@ -138,7 +138,7 @@ public class SolrDispatchFilter implements Filter } if (this.cores == null) { - ((HttpServletResponse)response).sendError( 403, "Server is shutting down" ); + ((HttpServletResponse)response).sendError( 503, "Server is shutting down" ); return; } CoreContainer cores = this.cores; diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java index 9f9adab283f..bf1ef624e07 100644 --- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java +++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java @@ -40,12 +40,12 @@ import org.apache.solr.client.solrj.impl.HttpSolrServer; import org.apache.solr.client.solrj.request.AbstractUpdateRequest; import org.apache.solr.client.solrj.request.UpdateRequestExt; import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.cloud.ZkCoreNodeProps; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.util.NamedList; import org.apache.solr.core.SolrCore; import org.apache.solr.util.AdjustableSemaphore; -import org.apache.solr.util.DefaultSolrThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -323,6 +323,12 @@ public class SolrCmdDistributor { HttpSolrServer server = new HttpSolrServer(fullUrl, client); + if (Thread.currentThread().isInterrupted()) { + clonedRequest.rspCode = 503; + clonedRequest.exception = new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Shutting down."); + return clonedRequest; + } + clonedRequest.ursp = server.request(clonedRequest.ureq); // currently no way to get the request body. diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java index 81c2b138751..3779040c6ce 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java @@ -251,29 +251,30 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { boolean localIsLeader = req.getCore().getCoreDescriptor().getCloudDescriptor().isLeader(); if (DistribPhase.FROMLEADER == phase && localIsLeader && from != null) { // from will be null on log replay log.error("Request says it is coming from leader, but we are the leader: " + req.getParamString()); - throw new SolrException(ErrorCode.BAD_REQUEST, "Request says it is coming from leader, but we are the leader"); + throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Request says it is coming from leader, but we are the leader"); } - if (DistribPhase.FROMLEADER == phase && from != null) { // from will be null on log replay - - ZkCoreNodeProps clusterStateLeader = new ZkCoreNodeProps(zkController - .getClusterState().getLeader(collection, shardId)); - - if (clusterStateLeader.getNodeProps() == null - || !clusterStateLeader.getCoreUrl().equals(from)) { - String coreUrl = null; - if (clusterStateLeader.getNodeProps() != null) { - coreUrl = clusterStateLeader.getCoreUrl(); - } - log.error("We got a request from the leader, but it's not who our cluster state says is the leader :" - + req.getParamString() - + " : " - + coreUrl); - - new SolrException(ErrorCode.BAD_REQUEST, "We got a request from the leader, but it's not who our cluster state says is the leader."); - } - - } + // this is too restrictive - cluster state can be stale - can cause shard inconsistency +// if (DistribPhase.FROMLEADER == phase && from != null) { // from will be null on log replay +// +// ZkCoreNodeProps clusterStateLeader = new ZkCoreNodeProps(zkController +// .getClusterState().getLeader(collection, shardId)); +// +// if (clusterStateLeader.getNodeProps() == null +// || !clusterStateLeader.getCoreUrl().equals(from)) { +// String coreUrl = null; +// if (clusterStateLeader.getNodeProps() != null) { +// coreUrl = clusterStateLeader.getCoreUrl(); +// } +// log.error("We got a request from the leader, but it's not who our cluster state says is the leader :" +// + req.getParamString() +// + " : " +// + coreUrl); +// +// new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "We got a request from the leader, but it's not who our cluster state says is the leader."); +// } +// +// } } @@ -348,11 +349,20 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { ModifiableSolrParams params = null; if (nodes != null) { + if (isLeader && !req.getCore().getCoreDescriptor().getCloudDescriptor().isLeader()) { + log.error("Abort sending request to replicas, we are no longer leader"); + throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Abort sending request to replicas, we are no longer leader"); + } + params = new ModifiableSolrParams(req.getParams()); params.set(DISTRIB_UPDATE_PARAM, (isLeader ? DistribPhase.FROMLEADER.toString() : DistribPhase.TOLEADER.toString())); + if (isLeader) { + params.set("distrib.from", ZkCoreNodeProps.getCoreUrl( + zkController.getBaseUrl(), req.getCore().getName())); + } params.remove("commit"); // this will be distributed from the local commit params.set("distrib.from", ZkCoreNodeProps.getCoreUrl( zkController.getBaseUrl(), req.getCore().getName())); @@ -682,6 +692,11 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { ModifiableSolrParams params = null; if (nodes != null) { + if (isLeader && !req.getCore().getCoreDescriptor().getCloudDescriptor().isLeader()) { + log.error("Abort sending request to replicas, we are no longer leader"); + throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Abort sending request to replicas, we are no longer leader"); + } + params = new ModifiableSolrParams(req.getParams()); params.set(DISTRIB_UPDATE_PARAM, (isLeader ? @@ -851,7 +866,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { if (leaderLogic && replicas != null) { if (!req.getCore().getCoreDescriptor().getCloudDescriptor().isLeader()) { log.error("Abort sending request to replicas, we are no longer leader"); - throw new SolrException(ErrorCode.BAD_REQUEST, "Abort sending request to replicas, we are no longer leader"); + throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Abort sending request to replicas, we are no longer leader"); } ModifiableSolrParams params = new ModifiableSolrParams(req.getParams()); params.set(VERSION_FIELD, Long.toString(cmd.getVersion())); diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java index 2987fa56f8b..c4c703224dd 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java @@ -135,10 +135,12 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase indexThread.join(); } - // fails will happen... - // for (StopableIndexingThread indexThread : threads) { - // assertEquals(0, indexThread.getFails()); - // } + // we expect full throttle fails, but not cloud client... + for (StopableThread indexThread : threads) { + if (indexThread instanceof StopableIndexingThread && !(indexThread instanceof FullThrottleStopableIndexingThread)) { + assertEquals(0, ((StopableIndexingThread) indexThread).getFails()); + } + } // try and wait for any replications and what not to finish... diff --git a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java index 6ff31875f84..a43f9a3ab8b 100644 --- a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java +++ b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java @@ -30,17 +30,20 @@ public class ExecutorUtil { public static void shutdownAndAwaitTermination(ExecutorService pool) { pool.shutdown(); // Disable new tasks from being submitted + pool.shutdownNow(); // Cancel currently executing tasks try { // Wait a while for existing tasks to terminate - if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { - pool.shutdownNow(); // Cancel currently executing tasks - // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination(60, TimeUnit.SECONDS)) SolrException.log(log, "Executor still has running tasks."); - } } catch (InterruptedException ie) { // (Re-)Cancel if current thread also interrupted pool.shutdownNow(); + try { + if (!pool.awaitTermination(60, TimeUnit.SECONDS)) + SolrException.log(log, "Executor still has running tasks."); + } catch (InterruptedException e) { + + } // Preserve interrupt status Thread.currentThread().interrupt(); } diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java b/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java index 8bf3a0e8bce..94bb2b33024 100644 --- a/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java +++ b/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java @@ -126,7 +126,7 @@ public class ChaosMonkey { if (cores != null) { SolrZkClient zkClient = cores.getZkController().getZkClient(); // must be at least double tick time... - zkClient.getSolrZooKeeper().pauseCnxn(ZkTestServer.TICK_TIME * 2); + zkClient.getSolrZooKeeper().pauseCnxn(ZkTestServer.TICK_TIME * 2 + 200); } } } @@ -381,7 +381,6 @@ public class ChaosMonkey { if (causeConnectionLoss && rnd < CONLOSS_PERCENT) { randomConnectionLoss(); - randomConnectionLoss(); } CloudJettyRunner cjetty;