diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java index 4a252e78d9b..9264b14e384 100644 --- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java +++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java @@ -176,20 +176,24 @@ public class Overseer implements NodeStateChangeListener, ShardLeaderListener { return false; } /** - * Try to assign core to the cluster + * Try to assign core to the cluster. * @throws KeeperException * @throws InterruptedException */ private CloudState updateState(CloudState state, String nodeName, CoreState coreState) throws KeeperException, InterruptedException { String collection = coreState.getCollectionName(); String zkCoreNodeName = coreState.getCoreNodeName(); - - String shardId; - if (coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP) == null) { - shardId = AssignShard.assignShard(collection, state); - } else { - shardId = coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP); - } + + // use the provided non null shardId + String shardId = coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP); + if(shardId==null) { + //use shardId from CloudState + shardId = getAssignedId(state, nodeName, coreState); + } + if(shardId==null) { + //request new shardId + shardId = AssignShard.assignShard(collection, state); + } Map props = new HashMap(); for (Entry entry : coreState.getProperties().entrySet()) { @@ -209,6 +213,23 @@ public class Overseer implements NodeStateChangeListener, ShardLeaderListener { CloudState newCloudState = updateSlice(state, collection, slice); return newCloudState; } + + /* + * Return an already assigned id or null if not assigned + */ + private String getAssignedId(final CloudState state, final String nodeName, + final CoreState coreState) { + final String key = coreState.getProperties().get(ZkStateReader.NODE_NAME_PROP) + "_" + coreState.getProperties().get(ZkStateReader.CORE_NAME_PROP); + Map slices = state.getSlices(coreState.getCollectionName()); + if (slices != null) { + for (Slice slice : slices.values()) { + if (slice.getShards().get(key) != null) { + return slice.getName(); + } + } + } + return null; + } private CloudState updateSlice(CloudState state, String collection, Slice slice) { @@ -480,6 +501,7 @@ public class Overseer implements NodeStateChangeListener, ShardLeaderListener { Set downNodes = complement(oldLiveNodes, liveNodes); for(String node: downNodes) { NodeStateWatcher watcher = nodeStateWatches.remove(node); + log.debug("Removed NodeStateWatcher for node:" + node); } } @@ -491,6 +513,7 @@ public class Overseer implements NodeStateChangeListener, ShardLeaderListener { if (!nodeStateWatches.containsKey(nodeName)) { zkCmdExecutor.ensureExists(path, zkClient); nodeStateWatches.put(nodeName, new NodeStateWatcher(zkClient, nodeName, path, this)); + log.debug("Added NodeStateWatcher for node " + nodeName); } else { log.debug("watch already added"); } diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java index c4826fb2cc4..cd99aebc4fa 100644 --- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java @@ -40,6 +40,7 @@ import org.apache.solr.core.CoreDescriptor; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; +import org.apache.zookeeper.data.Stat; import org.junit.BeforeClass; import org.junit.Test; @@ -49,6 +50,54 @@ public class OverseerTest extends SolrTestCaseJ4 { private static final boolean DEBUG = false; + private static class MockZKController{ + + private final SolrZkClient zkClient; + private final String nodeName; + + public MockZKController(String zkAddress, String nodeName) throws InterruptedException, TimeoutException, IOException, KeeperException { + this.nodeName = nodeName; + zkClient = new SolrZkClient(zkAddress, TIMEOUT); + Overseer.createClientNodes(zkClient, nodeName); + + // live node + final String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + "node1"; + zkClient.makePath(nodePath, CreateMode.EPHEMERAL, true); + } + + private void deleteNode(final String path) { + try { + Stat stat = zkClient.exists(path, null, false); + zkClient.delete(path, stat.getVersion(), false); + } catch (KeeperException e) { + fail("Unexpected KeeperException!" + e); + } catch (InterruptedException e) { + fail("Unexpected InterruptedException!" + e); + } + } + + public void close(){ + try { + deleteNode(ZkStateReader.LIVE_NODES_ZKNODE + "/" + "node1"); + zkClient.close(); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + public void publishState(String coreName, String stateName) throws KeeperException, InterruptedException{ + HashMap coreProps = new HashMap(); + coreProps.put(ZkStateReader.STATE_PROP, stateName); + coreProps.put(ZkStateReader.NODE_NAME_PROP, nodeName); + coreProps.put(ZkStateReader.CORE_NAME_PROP, coreName); + CoreState state = new CoreState(coreName, "collection1", coreProps); + final String statePath = Overseer.STATES_NODE + "/" + nodeName; + zkClient.setData(statePath, ZkStateReader.toJSON(new CoreState[] {state}), true); + } + + } + @BeforeClass public static void beforeClass() throws Exception { initCore(); @@ -438,11 +487,11 @@ public class OverseerTest extends SolrTestCaseJ4 { SolrZkClient controllerClient = null; SolrZkClient overseerClient = null; ZkStateReader reader = null; + MockZKController mockController = null; try { server.run(); controllerClient = new SolrZkClient(server.getZkAddress(), TIMEOUT); - AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost()); AbstractZkTestCase.makeSolrZkNode(server.getZkHost()); controllerClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true); @@ -450,45 +499,35 @@ public class OverseerTest extends SolrTestCaseJ4 { reader = new ZkStateReader(controllerClient); reader.createClusterStateWatchersAndUpdate(); - Overseer.createClientNodes(controllerClient, "node1"); + mockController = new MockZKController(server.getZkAddress(), "node1"); + overseerClient = electNewOverseer(server.getZkAddress()); - - // live node - final String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + "node1"; - controllerClient.makePath(nodePath, CreateMode.EPHEMERAL, true); - - HashMap coreProps = new HashMap(); - coreProps.put(ZkStateReader.STATE_PROP, ZkStateReader.RECOVERING); - coreProps.put(ZkStateReader.NODE_NAME_PROP, "node1"); - CoreState state = new CoreState("core1", "collection1", coreProps); - - final String statePath = Overseer.STATES_NODE + "/node1"; - // publish node state (recovering) - controllerClient.setData(statePath, ZkStateReader.toJSON(new CoreState[] {state}), true); + + mockController.publishState("core1", ZkStateReader.RECOVERING); // wait overseer assignment waitForSliceCount(reader, "collection1", 1); verifyStatus(reader, ZkStateReader.RECOVERING); - // publish node state (active) - coreProps.put(ZkStateReader.STATE_PROP, ZkStateReader.ACTIVE); - coreProps.put(ZkStateReader.SHARD_ID_PROP, "shard1"); - state = new CoreState("core1", "collection1", coreProps); - controllerClient.setData(statePath, - ZkStateReader.toJSON(new CoreState[] {state}), true); + int version = getCloudStateVersion(controllerClient); + + mockController.publishState("core1", ZkStateReader.ACTIVE); + + while(version == getCloudStateVersion(controllerClient)); verifyStatus(reader, ZkStateReader.ACTIVE); + version = getCloudStateVersion(controllerClient); overseerClient.close(); - - coreProps.put(ZkStateReader.STATE_PROP, ZkStateReader.RECOVERING); - state = new CoreState("core1", "collection1", coreProps); - - controllerClient.setData(statePath, - ZkStateReader.toJSON(new CoreState[] {state}), true); + Thread.sleep(1000); //wait for overseer to get killed - overseerClient = electNewOverseer(server.getZkAddress()); + mockController.publishState("core1", ZkStateReader.RECOVERING); + version = getCloudStateVersion(controllerClient); + overseerClient = electNewOverseer(server.getZkAddress()); + + while(version == getCloudStateVersion(controllerClient)); + verifyStatus(reader, ZkStateReader.RECOVERING); assertEquals("Live nodes count does not match", 1, reader.getCloudState() @@ -497,6 +536,10 @@ public class OverseerTest extends SolrTestCaseJ4 { .getSlice("collection1", "shard1").getShards().size()); } finally { + if (mockController != null) { + mockController.close(); + } + if (overseerClient != null) { overseerClient.close(); } @@ -509,6 +552,80 @@ public class OverseerTest extends SolrTestCaseJ4 { server.shutdown(); } } + + @Test + public void testDoubleAssignment() throws Exception { + String zkDir = dataDir.getAbsolutePath() + File.separator + + "zookeeper/server1/data"; + + System.setProperty(ZkStateReader.NUM_SHARDS_PROP, "2"); + + ZkTestServer server = new ZkTestServer(zkDir); + + SolrZkClient controllerClient = null; + SolrZkClient overseerClient = null; + ZkStateReader reader = null; + MockZKController mockController = null; + + try { + server.run(); + controllerClient = new SolrZkClient(server.getZkAddress(), TIMEOUT); + + AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost()); + AbstractZkTestCase.makeSolrZkNode(server.getZkHost()); + controllerClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true); + + reader = new ZkStateReader(controllerClient); + reader.createClusterStateWatchersAndUpdate(); + + mockController = new MockZKController(server.getZkAddress(), "node1"); + + overseerClient = electNewOverseer(server.getZkAddress()); + + mockController.publishState("core1", ZkStateReader.RECOVERING); + + // wait overseer assignment + waitForSliceCount(reader, "collection1", 1); + + verifyStatus(reader, ZkStateReader.RECOVERING); + + mockController.close(); + + int version = getCloudStateVersion(controllerClient); + + mockController = new MockZKController(server.getZkAddress(), "node1"); + mockController.publishState("core1", ZkStateReader.RECOVERING); + + while (version == getCloudStateVersion(controllerClient)); + + reader.updateCloudState(true); + CloudState state = reader.getCloudState(); + assertEquals("more than 1 shard id was assigned to same core", 1, state.getSlices("collection1").size()); + + } finally { + System.clearProperty(ZkStateReader.NUM_SHARDS_PROP); + if (overseerClient != null) { + overseerClient.close(); + } + if (mockController != null) { + mockController.close(); + } + + if (controllerClient != null) { + controllerClient.close(); + } + if (reader != null) { + reader.close(); + } + server.shutdown(); + } + } + + private int getCloudStateVersion(SolrZkClient controllerClient) + throws KeeperException, InterruptedException { + return controllerClient.exists(ZkStateReader.CLUSTER_STATE, null, false).getVersion(); + } + private SolrZkClient electNewOverseer(String address) throws InterruptedException, TimeoutException, IOException, KeeperException {