mirror of https://github.com/apache/lucene.git
SOLR-3655: A restarted node can briefly appear live and active before it really is in some cases.
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1444239 13f79535-47bb-0310-9956-ffa450edef68
commit e729e57a8e (parent 3f511d259c)
@@ -110,6 +110,9 @@ Bug Fixes

* SOLR-4400: Deadlock can occur in a rare race between committing and
  closing a SolrIndexWriter. (Erick Erickson, Mark Miller)

* SOLR-3655: A restarted node can briefly appear live and active before it really
  is in some cases. (Mark Miller)

Optimizations
----------------------
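The SOLR-3655 entry covers a race where a restarted node could show up as live while its cores still carried stale "active" state. The ZkController changes below close that window by publishing the node's replicas as down and waiting until the shared cluster state reflects it before start-up continues. The following is a minimal, self-contained sketch of that sequence only; ClusterStateView, StatePublisher, and publishDown are hypothetical stand-ins, not Solr APIs.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class RestartDownStateSketch {

  // Stand-in for "what the shared cluster state currently says about a core".
  interface ClusterStateView {
    String stateOf(String coreName); // e.g. "down", "recovering", "active"
  }

  // Stand-in for "ask the overseer to record this core as down".
  interface StatePublisher {
    void publishDown(String coreName);
  }

  // Publish every local core as down, then wait (bounded) until the cluster
  // state shows them all as down. Only after that should the node advertise
  // itself as live; skipping the wait is what let a restarted node briefly
  // appear live and active.
  static boolean publishAndWaitForDownStates(List<String> localCores,
      StatePublisher publisher, ClusterStateView view, long timeoutMs)
      throws InterruptedException {
    List<String> pending = new ArrayList<>(localCores);
    for (String core : pending) {
      publisher.publishDown(core);
    }
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() < deadline) {
      pending.removeIf(core -> "down".equals(view.stateOf(core)));
      if (pending.isEmpty()) {
        return true; // safe to proceed with live-node registration
      }
      TimeUnit.MILLISECONDS.sleep(250);
    }
    return false; // timed out; the real code logs a warning and carries on
  }
}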
@@ -24,6 +24,8 @@ import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.URLEncoder;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
@@ -31,6 +33,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
@@ -48,6 +51,8 @@ import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.OnReconnect;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCmdExecutor;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
@@ -475,9 +480,10 @@ public final class ZkController {
  }

  private void init(CurrentCoreDescriptorProvider registerOnReconnect) {
    registerAllCoresAsDown(registerOnReconnect, true);

    boolean alreadyCreatedZkReader = false;
    try {
      alreadyCreatedZkReader = publishAndWaitForDownStates(alreadyCreatedZkReader);

      // makes nodes zkNode
      cmdExecutor.ensureExists(ZkStateReader.LIVE_NODES_ZKNODE, zkClient);
@@ -494,7 +500,10 @@ public final class ZkController {
      ElectionContext context = new OverseerElectionContext(zkClient, overseer, getNodeName());
      overseerElector.setup(context);
      overseerElector.joinElection(context, false);
      zkStateReader.createClusterStateWatchersAndUpdate();

      if (!alreadyCreatedZkReader) {
        zkStateReader.createClusterStateWatchersAndUpdate();
      }

    } catch (IOException e) {
      log.error("", e);
@@ -514,6 +523,78 @@

  }

  private boolean publishAndWaitForDownStates(boolean alreadyCreatedZkReader)
      throws KeeperException, InterruptedException {
    if (zkClient.exists(ZkStateReader.LIVE_NODES_ZKNODE, true)) {
      alreadyCreatedZkReader = true;
      // try and publish anyone from our node as down
      zkStateReader.createClusterStateWatchersAndUpdate();
      ClusterState clusterState = zkStateReader.getClusterState();
      Set<String> collections = clusterState.getCollections();
      List<String> updatedNodes = new ArrayList<String>();
      for (String collectionName : collections) {
        DocCollection collection = clusterState.getCollection(collectionName);
        Collection<Slice> slices = collection.getSlices();
        for (Slice slice : slices) {
          Collection<Replica> replicas = slice.getReplicas();
          for (Replica replica : replicas) {
            if (replica.getNodeName().equals(getNodeName())
                && !(replica.getStr(ZkStateReader.STATE_PROP)
                    .equals(ZkStateReader.DOWN))) {
              ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
                  "state", ZkStateReader.STATE_PROP, ZkStateReader.DOWN,
                  ZkStateReader.BASE_URL_PROP, getBaseUrl(),
                  ZkStateReader.CORE_NAME_PROP, replica.getStr(ZkStateReader.CORE_NAME_PROP),
                  ZkStateReader.ROLES_PROP,
                  replica.getStr(ZkStateReader.ROLES_PROP),
                  ZkStateReader.NODE_NAME_PROP, getNodeName(),
                  ZkStateReader.SHARD_ID_PROP,
                  replica.getStr(ZkStateReader.SHARD_ID_PROP),
                  ZkStateReader.COLLECTION_PROP,
                  replica.getStr(ZkStateReader.COLLECTION_PROP));
              updatedNodes.add(replica.getStr(ZkStateReader.CORE_NAME_PROP));
              overseerJobQueue.offer(ZkStateReader.toJSON(m));
            }
          }
        }
      }

      // now wait till the updates are in our state
      long now = System.currentTimeMillis();
      long timeout = now + 1000 * 300;
      boolean foundStates = false;
      while (System.currentTimeMillis() < timeout) {
        clusterState = zkStateReader.getClusterState();
        collections = clusterState.getCollections();
        for (String collectionName : collections) {
          DocCollection collection = clusterState
              .getCollection(collectionName);
          Collection<Slice> slices = collection.getSlices();
          for (Slice slice : slices) {
            Collection<Replica> replicas = slice.getReplicas();
            for (Replica replica : replicas) {
              if (replica.getStr(ZkStateReader.STATE_PROP).equals(
                  ZkStateReader.DOWN)) {
                updatedNodes.remove(replica
                    .getStr(ZkStateReader.CORE_NAME_PROP));
              }
            }
          }
        }

        if (updatedNodes.size() == 0) {
          foundStates = true;
          break;
        }
      }
      if (!foundStates) {
        log.warn("Timed out waiting to see all nodes published as DOWN in our cluster state.");
      }
    }
    return alreadyCreatedZkReader;
  }

  /**
   * Validates if the chroot exists in zk (or if it is successfully created). Optionally, if create is set to true this method will create the path
   * in case it doesn't exist
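The ZkNodeProps offered to overseerJobQueue above is a flat key/value map that is JSON-encoded before being placed on the queue. A standalone illustration of its shape follows; the literal key strings and the example values are approximations supplied for illustration, not taken from this patch.

import java.util.LinkedHashMap;
import java.util.Map;

public class DownStateMessageSketch {
  public static void main(String[] args) {
    // Keys stand in for the constants used above (Overseer.QUEUE_OPERATION,
    // ZkStateReader.STATE_PROP, and so on); treat the literal strings and the
    // sample values as guesses, not as Solr's exact wire format.
    Map<String, String> m = new LinkedHashMap<>();
    m.put("operation", "state");                     // Overseer.QUEUE_OPERATION
    m.put("state", "down");                          // ZkStateReader.STATE_PROP / DOWN
    m.put("base_url", "http://127.0.0.1:8983/solr"); // BASE_URL_PROP (example value)
    m.put("core", "collection1_shard1_replica1");    // CORE_NAME_PROP (example value)
    m.put("node_name", "127.0.0.1:8983_solr");       // NODE_NAME_PROP (example value)
    m.put("shard", "shard1");                        // SHARD_ID_PROP (example value)
    m.put("collection", "collection1");              // COLLECTION_PROP (example value)
    System.out.println(m); // Solr serializes the equivalent map to JSON for the queue
  }
}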
@@ -903,7 +984,7 @@
        ZkStateReader.NODE_NAME_PROP, getNodeName(),
        ZkStateReader.SHARD_ID_PROP, cd.getCloudDescriptor().getShardId(),
        ZkStateReader.COLLECTION_PROP, cd.getCloudDescriptor()
            .getCollectionName(), ZkStateReader.STATE_PROP, state,
            .getCollectionName(),
        ZkStateReader.NUM_SHARDS_PROP, numShards != null ? numShards.toString()
            : null);
    cd.getCloudDescriptor().lastPublished = state;
@@ -19,6 +19,7 @@ package org.apache.solr.cloud;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -31,6 +32,11 @@ import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.junit.After;
@@ -180,16 +186,7 @@ public class SyncSliceTest extends AbstractFullDistribZkTestBase {
    // bring back dead node
    ChaosMonkey.start(deadJetty.jetty); // he is not the leader anymore

    // give a moment to be sure it has started recovering
    Thread.sleep(2000);

    waitForThingsToLevelOut(15);
    waitForRecoveriesToFinish(false);

    Thread.sleep(3000);

    waitForThingsToLevelOut(15);
    waitForRecoveriesToFinish(false);
    waitTillRecovered();

    skipServers = getRandomOtherJetty(leaderJetty, null);
    skipServers.addAll( getRandomOtherJetty(leaderJetty, null));
@@ -241,6 +238,32 @@ public class SyncSliceTest extends AbstractFullDistribZkTestBase {

  }

  private void waitTillRecovered() throws Exception {
    for (int i = 0; i < 30; i++) {
      Thread.sleep(1000);
      ZkStateReader zkStateReader = cloudClient.getZkStateReader();
      zkStateReader.updateClusterState(true);
      ClusterState clusterState = zkStateReader.getClusterState();
      DocCollection collection1 = clusterState.getCollection("collection1");
      Slice slice = collection1.getSlice("shard1");
      Collection<Replica> replicas = slice.getReplicas();
      boolean allActive = true;
      for (Replica replica : replicas) {
        if (!clusterState.liveNodesContain(replica.getNodeName())
            || !replica.get(ZkStateReader.STATE_PROP).equals(
                ZkStateReader.ACTIVE)) {
          allActive = false;
          break;
        }
      }
      if (allActive) {
        return;
      }
    }
    printLayout();
    fail("timeout waiting to see recovered node");
  }

  private String waitTillInconsistent() throws Exception, InterruptedException {
    String shardFailMessage = null;
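The test change above replaces fixed Thread.sleep calls with waitTillRecovered(), which actively polls the cluster state until every replica of shard1 is live and ACTIVE, and fails with printLayout() diagnostics on timeout. As a generic illustration of that poll-with-deadline pattern (not part of the patch; the class and method names are made up):

import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public final class AwaitSketch {
  // Poll a condition once a second until it holds or the deadline passes.
  static void await(BooleanSupplier condition, long timeoutSeconds)
      throws InterruptedException {
    long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(timeoutSeconds);
    while (System.nanoTime() < deadline) {
      if (condition.getAsBoolean()) {
        return; // condition met; no extra sleeping on the fast path
      }
      TimeUnit.SECONDS.sleep(1);
    }
    throw new AssertionError("timed out waiting for condition");
  }
}

Polling like this keeps the happy path fast and turns a flaky, timing-dependent sleep into an explicit, bounded wait that fails loudly when recovery never completes.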