SOLR-3075: Overseer does not check cloudstate for previously assigned shardId but generates a new one

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1237803 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2012-01-30 17:16:54 +00:00
parent 6a2652a331
commit 76fc82a199
2 changed files with 176 additions and 36 deletions

View File

@ -176,20 +176,24 @@ public class Overseer implements NodeStateChangeListener, ShardLeaderListener {
return false;
}
/**
* Try to assign core to the cluster
* Try to assign core to the cluster.
* @throws KeeperException
* @throws InterruptedException
*/
private CloudState updateState(CloudState state, String nodeName, CoreState coreState) throws KeeperException, InterruptedException {
String collection = coreState.getCollectionName();
String zkCoreNodeName = coreState.getCoreNodeName();
String shardId;
if (coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP) == null) {
shardId = AssignShard.assignShard(collection, state);
} else {
shardId = coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP);
}
// use the provided non null shardId
String shardId = coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP);
if(shardId==null) {
//use shardId from CloudState
shardId = getAssignedId(state, nodeName, coreState);
}
if(shardId==null) {
//request new shardId
shardId = AssignShard.assignShard(collection, state);
}
Map<String,String> props = new HashMap<String,String>();
for (Entry<String,String> entry : coreState.getProperties().entrySet()) {
@ -209,6 +213,23 @@ public class Overseer implements NodeStateChangeListener, ShardLeaderListener {
CloudState newCloudState = updateSlice(state, collection, slice);
return newCloudState;
}
/*
* Return an already assigned id or null if not assigned
*/
private String getAssignedId(final CloudState state, final String nodeName,
final CoreState coreState) {
final String key = coreState.getProperties().get(ZkStateReader.NODE_NAME_PROP) + "_" + coreState.getProperties().get(ZkStateReader.CORE_NAME_PROP);
Map<String, Slice> slices = state.getSlices(coreState.getCollectionName());
if (slices != null) {
for (Slice slice : slices.values()) {
if (slice.getShards().get(key) != null) {
return slice.getName();
}
}
}
return null;
}
private CloudState updateSlice(CloudState state, String collection, Slice slice) {
@ -480,6 +501,7 @@ public class Overseer implements NodeStateChangeListener, ShardLeaderListener {
Set<String> downNodes = complement(oldLiveNodes, liveNodes);
for(String node: downNodes) {
NodeStateWatcher watcher = nodeStateWatches.remove(node);
log.debug("Removed NodeStateWatcher for node:" + node);
}
}
@ -491,6 +513,7 @@ public class Overseer implements NodeStateChangeListener, ShardLeaderListener {
if (!nodeStateWatches.containsKey(nodeName)) {
zkCmdExecutor.ensureExists(path, zkClient);
nodeStateWatches.put(nodeName, new NodeStateWatcher(zkClient, nodeName, path, this));
log.debug("Added NodeStateWatcher for node " + nodeName);
} else {
log.debug("watch already added");
}

View File

@ -40,6 +40,7 @@ import org.apache.solr.core.CoreDescriptor;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.data.Stat;
import org.junit.BeforeClass;
import org.junit.Test;
@ -49,6 +50,54 @@ public class OverseerTest extends SolrTestCaseJ4 {
private static final boolean DEBUG = false;
private static class MockZKController{
private final SolrZkClient zkClient;
private final String nodeName;
public MockZKController(String zkAddress, String nodeName) throws InterruptedException, TimeoutException, IOException, KeeperException {
this.nodeName = nodeName;
zkClient = new SolrZkClient(zkAddress, TIMEOUT);
Overseer.createClientNodes(zkClient, nodeName);
// live node
final String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + "node1";
zkClient.makePath(nodePath, CreateMode.EPHEMERAL, true);
}
private void deleteNode(final String path) {
try {
Stat stat = zkClient.exists(path, null, false);
zkClient.delete(path, stat.getVersion(), false);
} catch (KeeperException e) {
fail("Unexpected KeeperException!" + e);
} catch (InterruptedException e) {
fail("Unexpected InterruptedException!" + e);
}
}
public void close(){
try {
deleteNode(ZkStateReader.LIVE_NODES_ZKNODE + "/" + "node1");
zkClient.close();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void publishState(String coreName, String stateName) throws KeeperException, InterruptedException{
HashMap<String,String> coreProps = new HashMap<String,String>();
coreProps.put(ZkStateReader.STATE_PROP, stateName);
coreProps.put(ZkStateReader.NODE_NAME_PROP, nodeName);
coreProps.put(ZkStateReader.CORE_NAME_PROP, coreName);
CoreState state = new CoreState(coreName, "collection1", coreProps);
final String statePath = Overseer.STATES_NODE + "/" + nodeName;
zkClient.setData(statePath, ZkStateReader.toJSON(new CoreState[] {state}), true);
}
}
@BeforeClass
public static void beforeClass() throws Exception {
initCore();
@ -438,11 +487,11 @@ public class OverseerTest extends SolrTestCaseJ4 {
SolrZkClient controllerClient = null;
SolrZkClient overseerClient = null;
ZkStateReader reader = null;
MockZKController mockController = null;
try {
server.run();
controllerClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
controllerClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true);
@ -450,45 +499,35 @@ public class OverseerTest extends SolrTestCaseJ4 {
reader = new ZkStateReader(controllerClient);
reader.createClusterStateWatchersAndUpdate();
Overseer.createClientNodes(controllerClient, "node1");
mockController = new MockZKController(server.getZkAddress(), "node1");
overseerClient = electNewOverseer(server.getZkAddress());
// live node
final String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + "node1";
controllerClient.makePath(nodePath, CreateMode.EPHEMERAL, true);
HashMap<String,String> coreProps = new HashMap<String,String>();
coreProps.put(ZkStateReader.STATE_PROP, ZkStateReader.RECOVERING);
coreProps.put(ZkStateReader.NODE_NAME_PROP, "node1");
CoreState state = new CoreState("core1", "collection1", coreProps);
final String statePath = Overseer.STATES_NODE + "/node1";
// publish node state (recovering)
controllerClient.setData(statePath, ZkStateReader.toJSON(new CoreState[] {state}), true);
mockController.publishState("core1", ZkStateReader.RECOVERING);
// wait overseer assignment
waitForSliceCount(reader, "collection1", 1);
verifyStatus(reader, ZkStateReader.RECOVERING);
// publish node state (active)
coreProps.put(ZkStateReader.STATE_PROP, ZkStateReader.ACTIVE);
coreProps.put(ZkStateReader.SHARD_ID_PROP, "shard1");
state = new CoreState("core1", "collection1", coreProps);
controllerClient.setData(statePath,
ZkStateReader.toJSON(new CoreState[] {state}), true);
int version = getCloudStateVersion(controllerClient);
mockController.publishState("core1", ZkStateReader.ACTIVE);
while(version == getCloudStateVersion(controllerClient));
verifyStatus(reader, ZkStateReader.ACTIVE);
version = getCloudStateVersion(controllerClient);
overseerClient.close();
coreProps.put(ZkStateReader.STATE_PROP, ZkStateReader.RECOVERING);
state = new CoreState("core1", "collection1", coreProps);
controllerClient.setData(statePath,
ZkStateReader.toJSON(new CoreState[] {state}), true);
Thread.sleep(1000); //wait for overseer to get killed
overseerClient = electNewOverseer(server.getZkAddress());
mockController.publishState("core1", ZkStateReader.RECOVERING);
version = getCloudStateVersion(controllerClient);
overseerClient = electNewOverseer(server.getZkAddress());
while(version == getCloudStateVersion(controllerClient));
verifyStatus(reader, ZkStateReader.RECOVERING);
assertEquals("Live nodes count does not match", 1, reader.getCloudState()
@ -497,6 +536,10 @@ public class OverseerTest extends SolrTestCaseJ4 {
.getSlice("collection1", "shard1").getShards().size());
} finally {
if (mockController != null) {
mockController.close();
}
if (overseerClient != null) {
overseerClient.close();
}
@ -509,6 +552,80 @@ public class OverseerTest extends SolrTestCaseJ4 {
server.shutdown();
}
}
@Test
public void testDoubleAssignment() throws Exception {
String zkDir = dataDir.getAbsolutePath() + File.separator
+ "zookeeper/server1/data";
System.setProperty(ZkStateReader.NUM_SHARDS_PROP, "2");
ZkTestServer server = new ZkTestServer(zkDir);
SolrZkClient controllerClient = null;
SolrZkClient overseerClient = null;
ZkStateReader reader = null;
MockZKController mockController = null;
try {
server.run();
controllerClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
controllerClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true);
reader = new ZkStateReader(controllerClient);
reader.createClusterStateWatchersAndUpdate();
mockController = new MockZKController(server.getZkAddress(), "node1");
overseerClient = electNewOverseer(server.getZkAddress());
mockController.publishState("core1", ZkStateReader.RECOVERING);
// wait overseer assignment
waitForSliceCount(reader, "collection1", 1);
verifyStatus(reader, ZkStateReader.RECOVERING);
mockController.close();
int version = getCloudStateVersion(controllerClient);
mockController = new MockZKController(server.getZkAddress(), "node1");
mockController.publishState("core1", ZkStateReader.RECOVERING);
while (version == getCloudStateVersion(controllerClient));
reader.updateCloudState(true);
CloudState state = reader.getCloudState();
assertEquals("more than 1 shard id was assigned to same core", 1, state.getSlices("collection1").size());
} finally {
System.clearProperty(ZkStateReader.NUM_SHARDS_PROP);
if (overseerClient != null) {
overseerClient.close();
}
if (mockController != null) {
mockController.close();
}
if (controllerClient != null) {
controllerClient.close();
}
if (reader != null) {
reader.close();
}
server.shutdown();
}
}
private int getCloudStateVersion(SolrZkClient controllerClient)
throws KeeperException, InterruptedException {
return controllerClient.exists(ZkStateReader.CLUSTER_STATE, null, false).getVersion();
}
private SolrZkClient electNewOverseer(String address) throws InterruptedException,
TimeoutException, IOException, KeeperException {