diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 46c397897d6..12a939ded17 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -110,6 +110,9 @@ Bug Fixes * SOLR-4400: Deadlock can occur in a rare race between committing and closing a SolrIndexWriter. (Erick Erickson, Mark Miller) + +* SOLR-3655: A restarted node can briefly appear live and active before it really + is in some cases. (Mark Miller) Optimizations ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index ac388b9824c..8b70a30ad16 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -24,6 +24,8 @@ import java.net.InetAddress; import java.net.NetworkInterface; import java.net.URLEncoder; import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Enumeration; import java.util.HashMap; @@ -31,6 +33,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.TimeoutException; import java.util.regex.Matcher; @@ -48,6 +51,8 @@ import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.DocRouter; import org.apache.solr.common.cloud.ImplicitDocRouter; import org.apache.solr.common.cloud.OnReconnect; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkCmdExecutor; import org.apache.solr.common.cloud.ZkCoreNodeProps; @@ -475,9 +480,10 @@ public final class ZkController { } private void init(CurrentCoreDescriptorProvider registerOnReconnect) { - registerAllCoresAsDown(registerOnReconnect, true); - + boolean alreadyCreatedZkReader = false; try { + alreadyCreatedZkReader = publishAndWaitForDownStates(alreadyCreatedZkReader); + // makes nodes zkNode cmdExecutor.ensureExists(ZkStateReader.LIVE_NODES_ZKNODE, zkClient); @@ -494,7 +500,10 @@ public final class ZkController { ElectionContext context = new OverseerElectionContext(zkClient, overseer, getNodeName()); overseerElector.setup(context); overseerElector.joinElection(context, false); - zkStateReader.createClusterStateWatchersAndUpdate(); + + if (!alreadyCreatedZkReader) { + zkStateReader.createClusterStateWatchersAndUpdate(); + } } catch (IOException e) { log.error("", e); @@ -514,6 +523,78 @@ public final class ZkController { } + private boolean publishAndWaitForDownStates(boolean alreadyCreatedZkReader) + throws KeeperException, InterruptedException { + if (zkClient.exists(ZkStateReader.LIVE_NODES_ZKNODE, true)) { + alreadyCreatedZkReader = true; + // try and publish anyone from our node as down + zkStateReader.createClusterStateWatchersAndUpdate(); + ClusterState clusterState = zkStateReader.getClusterState(); + Set collections = clusterState.getCollections(); + List updatedNodes = new ArrayList(); + for (String collectionName : collections) { + DocCollection collection = clusterState.getCollection(collectionName); + Collection slices = collection.getSlices(); + for (Slice slice : slices) { + Collection replicas = slice.getReplicas(); + for (Replica replica : replicas) { + if (replica.getNodeName().equals(getNodeName()) + && !(replica.getStr(ZkStateReader.STATE_PROP) + .equals(ZkStateReader.DOWN))) { + ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, + "state", ZkStateReader.STATE_PROP, ZkStateReader.DOWN, + ZkStateReader.BASE_URL_PROP, getBaseUrl(), + ZkStateReader.CORE_NAME_PROP, replica.getStr(ZkStateReader.CORE_NAME_PROP), + ZkStateReader.ROLES_PROP, + replica.getStr(ZkStateReader.ROLES_PROP), + ZkStateReader.NODE_NAME_PROP, getNodeName(), + ZkStateReader.SHARD_ID_PROP, + replica.getStr(ZkStateReader.SHARD_ID_PROP), + ZkStateReader.COLLECTION_PROP, + replica.getStr(ZkStateReader.COLLECTION_PROP)); + updatedNodes.add(replica.getStr(ZkStateReader.CORE_NAME_PROP)); + overseerJobQueue.offer(ZkStateReader.toJSON(m)); + } + } + } + } + + // now wait till the updates are in our state + long now = System.currentTimeMillis(); + long timeout = now + 1000 * 300; + boolean foundStates = false; + while (System.currentTimeMillis() < timeout) { + clusterState = zkStateReader.getClusterState(); + collections = clusterState.getCollections(); + for (String collectionName : collections) { + DocCollection collection = clusterState + .getCollection(collectionName); + Collection slices = collection.getSlices(); + for (Slice slice : slices) { + Collection replicas = slice.getReplicas(); + for (Replica replica : replicas) { + if (replica.getStr(ZkStateReader.STATE_PROP).equals( + ZkStateReader.DOWN)) { + updatedNodes.remove(replica + .getStr(ZkStateReader.CORE_NAME_PROP)); + + } + } + } + } + + if (updatedNodes.size() == 0) { + foundStates = true; + break; + } + } + if (!foundStates) { + log.warn("Timed out waiting to see all nodes published as DOWN in our cluster state."); + } + } + return alreadyCreatedZkReader; + } + /** * Validates if the chroot exists in zk (or if it is successfully created). Optionally, if create is set to true this method will create the path * in case it doesn't exist @@ -903,7 +984,7 @@ public final class ZkController { ZkStateReader.NODE_NAME_PROP, getNodeName(), ZkStateReader.SHARD_ID_PROP, cd.getCloudDescriptor().getShardId(), ZkStateReader.COLLECTION_PROP, cd.getCloudDescriptor() - .getCollectionName(), ZkStateReader.STATE_PROP, state, + .getCollectionName(), ZkStateReader.NUM_SHARDS_PROP, numShards != null ? numShards.toString() : null); cd.getCloudDescriptor().lastPublished = state; diff --git a/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java b/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java index 7101ddce979..f962567fdb8 100644 --- a/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java @@ -19,6 +19,7 @@ package org.apache.solr.cloud; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -31,6 +32,11 @@ import org.apache.solr.client.solrj.impl.HttpSolrServer; import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.CollectionParams.CollectionAction; import org.apache.solr.common.params.ModifiableSolrParams; import org.junit.After; @@ -180,16 +186,7 @@ public class SyncSliceTest extends AbstractFullDistribZkTestBase { // bring back dead node ChaosMonkey.start(deadJetty.jetty); // he is not the leader anymore - // give a moment to be sure it has started recovering - Thread.sleep(2000); - - waitForThingsToLevelOut(15); - waitForRecoveriesToFinish(false); - - Thread.sleep(3000); - - waitForThingsToLevelOut(15); - waitForRecoveriesToFinish(false); + waitTillRecovered(); skipServers = getRandomOtherJetty(leaderJetty, null); skipServers.addAll( getRandomOtherJetty(leaderJetty, null)); @@ -241,6 +238,32 @@ public class SyncSliceTest extends AbstractFullDistribZkTestBase { } + private void waitTillRecovered() throws Exception { + for (int i = 0; i < 30; i++) { + Thread.sleep(1000); + ZkStateReader zkStateReader = cloudClient.getZkStateReader(); + zkStateReader.updateClusterState(true); + ClusterState clusterState = zkStateReader.getClusterState(); + DocCollection collection1 = clusterState.getCollection("collection1"); + Slice slice = collection1.getSlice("shard1"); + Collection replicas = slice.getReplicas(); + boolean allActive = true; + for (Replica replica : replicas) { + if (!clusterState.liveNodesContain(replica.getNodeName()) + || !replica.get(ZkStateReader.STATE_PROP).equals( + ZkStateReader.ACTIVE)) { + allActive = false; + break; + } + } + if (allActive) { + return; + } + } + printLayout(); + fail("timeout waiting to see recovered node"); + } + private String waitTillInconsistent() throws Exception, InterruptedException { String shardFailMessage = null;