mirror of https://github.com/apache/lucene.git
SOLR-3080: remove shard info from zk when unload is called
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1292739 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
20d2cb9701
commit
565777a7b3
|
@ -18,6 +18,7 @@ package org.apache.solr.cloud;
|
|||
*/
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
|
@ -42,6 +43,8 @@ public class NodeStateWatcher implements Watcher {
|
|||
public static interface NodeStateChangeListener {
|
||||
void coreChanged(String nodeName, Set<CoreState> states)
|
||||
throws KeeperException, InterruptedException;
|
||||
void coreDeleted(String nodeName, Collection<CoreState> states)
|
||||
throws KeeperException, InterruptedException;
|
||||
}
|
||||
|
||||
private final SolrZkClient zkClient;
|
||||
|
@ -104,6 +107,19 @@ public class NodeStateWatcher implements Watcher {
|
|||
}
|
||||
}
|
||||
|
||||
HashMap<String, CoreState> deletedCores = new HashMap<String, CoreState>();
|
||||
for(CoreState state: currentState) {
|
||||
deletedCores.put(state.getCoreNodeName(), state);
|
||||
}
|
||||
|
||||
for(CoreState state: stateList) {
|
||||
deletedCores.remove(state.getCoreNodeName());
|
||||
}
|
||||
|
||||
if (deletedCores.size() > 0) {
|
||||
listener.coreDeleted(nodeName, deletedCores.values());
|
||||
}
|
||||
|
||||
currentState = Collections.unmodifiableSet(newState);
|
||||
|
||||
if (modifiedCores.size() > 0) {
|
||||
|
|
|
@ -57,7 +57,7 @@ public class Overseer implements NodeStateChangeListener, ShardLeaderListener {
|
|||
private static final int STATE_UPDATE_DELAY = 500; // delay between cloud state updates
|
||||
|
||||
static enum Op {
|
||||
LeaderChange, StateChange;
|
||||
LeaderChange, StateChange, CoreDeleted;
|
||||
}
|
||||
|
||||
private final class CloudStateUpdateRequest {
|
||||
|
@ -135,6 +135,9 @@ public class Overseer implements NodeStateChangeListener, ShardLeaderListener {
|
|||
(String) request.args[0], (CoreState) request.args[1]);
|
||||
break;
|
||||
|
||||
case CoreDeleted:
|
||||
cloudState = removeCore(cloudState, (String) request.args[0], (String) request.args[1]);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -294,7 +297,6 @@ public class Overseer implements NodeStateChangeListener, ShardLeaderListener {
|
|||
|
||||
private CloudState setShardLeader(CloudState state, String collection, String sliceName, String leaderUrl) {
|
||||
|
||||
boolean updated = false;
|
||||
final Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
|
||||
newStates.putAll(state.getCollectionStates());
|
||||
|
||||
|
@ -314,32 +316,49 @@ public class Overseer implements NodeStateChangeListener, ShardLeaderListener {
|
|||
Map<String, String> newShardProps = new LinkedHashMap<String,String>();
|
||||
newShardProps.putAll(shard.getValue().getProperties());
|
||||
|
||||
String wasLeader = newShardProps.remove(ZkStateReader.LEADER_PROP); //clean any previously existed flag
|
||||
|
||||
newShardProps.remove(ZkStateReader.LEADER_PROP); //clean any previously existed flag
|
||||
|
||||
ZkCoreNodeProps zkCoreNodeProps = new ZkCoreNodeProps(new ZkNodeProps(newShardProps));
|
||||
if(leaderUrl!=null && leaderUrl.equals(zkCoreNodeProps.getCoreUrl())) {
|
||||
newShardProps.put(ZkStateReader.LEADER_PROP,"true");
|
||||
if (wasLeader == null) {
|
||||
updated = true;
|
||||
}
|
||||
} else {
|
||||
if (wasLeader != null) {
|
||||
updated = true;
|
||||
}
|
||||
}
|
||||
newShards.put(shard.getKey(), new ZkNodeProps(newShardProps));
|
||||
}
|
||||
Slice slice = new Slice(sliceName, newShards);
|
||||
slices.put(sliceName, slice);
|
||||
}
|
||||
if (updated) {
|
||||
return new CloudState(state.getLiveNodes(), newStates);
|
||||
} else {
|
||||
return state;
|
||||
}
|
||||
return new CloudState(state.getLiveNodes(), newStates);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/*
|
||||
* Remove core from cloudstate
|
||||
*/
|
||||
private CloudState removeCore(final CloudState cloudState, final String collection, final String coreNodeName) {
|
||||
final LinkedHashMap<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
|
||||
for(String collectionName: cloudState.getCollections()) {
|
||||
if(collection.equals(collectionName)) {
|
||||
Map<String, Slice> slices = cloudState.getSlices(collection);
|
||||
LinkedHashMap<String, Slice> newSlices = new LinkedHashMap<String, Slice>();
|
||||
for(Slice slice: slices.values()) {
|
||||
if(slice.getShards().containsKey(coreNodeName)) {
|
||||
LinkedHashMap<String, ZkNodeProps> newShards = new LinkedHashMap<String, ZkNodeProps>();
|
||||
newShards.putAll(slice.getShards());
|
||||
newShards.remove(coreNodeName);
|
||||
Slice newSlice = new Slice(slice.getName(), newShards);
|
||||
newSlices.put(slice.getName(), newSlice);
|
||||
} else {
|
||||
newSlices.put(slice.getName(), slice);
|
||||
}
|
||||
}
|
||||
newStates.put(collectionName, newSlices);
|
||||
} else {
|
||||
newStates.put(collectionName, cloudState.getSlices(collectionName));
|
||||
}
|
||||
}
|
||||
CloudState newState = new CloudState(cloudState.getLiveNodes(), newStates);
|
||||
return newState;
|
||||
}
|
||||
}
|
||||
|
||||
public Overseer(final SolrZkClient zkClient, final ZkStateReader reader, String id) throws KeeperException, InterruptedException {
|
||||
log.info("Constructing new Overseer id=" + id);
|
||||
|
@ -462,7 +481,6 @@ public class Overseer implements NodeStateChangeListener, ShardLeaderListener {
|
|||
ShardLeaderWatcher watcher = watches.remove(shardId);
|
||||
if (watcher != null) {
|
||||
watcher.close();
|
||||
announceLeader(collection, shardId, new ZkCoreNodeProps(new ZkNodeProps())); //removes loeader for shard
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -567,7 +585,15 @@ public class Overseer implements NodeStateChangeListener, ShardLeaderListener {
|
|||
fifo.add(new CloudStateUpdateRequest(Op.StateChange, nodeName, state));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void coreDeleted(String nodeName, Collection<CoreState> states)
|
||||
throws KeeperException, InterruptedException {
|
||||
for (CoreState state : states) {
|
||||
fifo.add(new CloudStateUpdateRequest(Op.CoreDeleted, state.getCollectionName(), state.getCoreNodeName()));
|
||||
}
|
||||
}
|
||||
|
||||
public static void createClientNodes(SolrZkClient zkClient, String nodeName) throws KeeperException, InterruptedException {
|
||||
final String node = STATES_NODE + "/" + nodeName;
|
||||
if (log.isInfoEnabled()) {
|
||||
|
@ -585,4 +611,4 @@ public class Overseer implements NodeStateChangeListener, ShardLeaderListener {
|
|||
fifo.add(new CloudStateUpdateRequest(Op.LeaderChange, collection, shardId, coreUrl));
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,8 @@ import java.io.File;
|
|||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.MalformedURLException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
@ -85,7 +87,8 @@ public final class ZkController {
|
|||
public final static String COLLECTION_PARAM_PREFIX="collection.";
|
||||
public final static String CONFIGNAME_PROP="configName";
|
||||
|
||||
private final Map<String, CoreState> coreStates = new HashMap<String, CoreState>();
|
||||
private Map<String, CoreState> coreStates = null;
|
||||
private final Map<String, ElectionContext> electionContexts = Collections.synchronizedMap(new HashMap<String, ElectionContext>());
|
||||
|
||||
private SolrZkClient zkClient;
|
||||
private ZkCmdExecutor cmdExecutor;
|
||||
|
@ -340,6 +343,8 @@ public final class ZkController {
|
|||
createEphemeralLiveNode();
|
||||
cmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE, zkClient);
|
||||
|
||||
syncNodeState();
|
||||
|
||||
overseerElector = new LeaderElector(zkClient);
|
||||
ElectionContext context = new OverseerElectionContext(getNodeName(), zkClient, zkStateReader);
|
||||
overseerElector.setup(context);
|
||||
|
@ -364,6 +369,27 @@ public final class ZkController {
|
|||
|
||||
}
|
||||
|
||||
/*
|
||||
* sync internal state with zk on startup
|
||||
*/
|
||||
private void syncNodeState() throws KeeperException, InterruptedException {
|
||||
log.debug("Syncing internal state with zk. Current: " + coreStates);
|
||||
final String path = Overseer.STATES_NODE + "/" + getNodeName();
|
||||
|
||||
final byte[] data = zkClient.getData(path, null, null, true);
|
||||
|
||||
coreStates = new HashMap<String,CoreState>();
|
||||
|
||||
if (data != null) {
|
||||
CoreState[] states = CoreState.load(data);
|
||||
List<CoreState> stateList = Arrays.asList(states);
|
||||
for(CoreState coreState: stateList) {
|
||||
coreStates.put(coreState.getCoreName(), coreState);
|
||||
}
|
||||
}
|
||||
log.debug("after sync: " + coreStates);
|
||||
}
|
||||
|
||||
public boolean isConnected() {
|
||||
return zkClient.isConnected();
|
||||
}
|
||||
|
@ -604,6 +630,7 @@ public final class ZkController {
|
|||
collection, shardZkNodeName, leaderProps, this, cc);
|
||||
|
||||
leaderElector.setup(context);
|
||||
electionContexts.put(shardZkNodeName, context);
|
||||
leaderElector.joinElection(context, core);
|
||||
}
|
||||
|
||||
|
@ -722,9 +749,20 @@ public final class ZkController {
|
|||
/**
|
||||
* @param coreName
|
||||
* @param cloudDesc
|
||||
* @throws KeeperException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
public void unregister(String coreName, CloudDescriptor cloudDesc) {
|
||||
// TODO : perhaps mark the core down in zk?
|
||||
public void unregister(String coreName, CloudDescriptor cloudDesc)
|
||||
throws InterruptedException, KeeperException {
|
||||
final String zkNodeName = getNodeName() + "_" + coreName;
|
||||
synchronized (coreStates) {
|
||||
coreStates.remove(zkNodeName);
|
||||
}
|
||||
publishState();
|
||||
ElectionContext context = electionContexts.remove(zkNodeName);
|
||||
if (context != null) {
|
||||
context.cancelElection();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -902,10 +940,18 @@ public final class ZkController {
|
|||
}
|
||||
CoreState coreState = new CoreState(coreName,
|
||||
cloudDesc.getCollectionName(), props, numShards);
|
||||
final String nodePath = "/node_states/" + getNodeName();
|
||||
|
||||
synchronized (coreStates) {
|
||||
coreStates.put(shardZkNodeName, coreState);
|
||||
}
|
||||
|
||||
publishState();
|
||||
}
|
||||
|
||||
private void publishState() {
|
||||
final String nodePath = "/node_states/" + getNodeName();
|
||||
|
||||
synchronized (coreStates) {
|
||||
try {
|
||||
zkClient.setData(nodePath, ZkStateReader.toJSON(coreStates.values()),
|
||||
true);
|
||||
|
@ -920,7 +966,6 @@ public final class ZkController {
|
|||
"could not publish node state", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private String doGetShardIdProcess(String coreName, CloudDescriptor descriptor)
|
||||
|
|
|
@ -62,6 +62,7 @@ import org.apache.solr.update.processor.UpdateRequestProcessor;
|
|||
import org.apache.solr.update.processor.UpdateRequestProcessorChain;
|
||||
import org.apache.solr.util.NumberUtils;
|
||||
import org.apache.solr.util.RefCounted;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -319,6 +320,17 @@ public class CoreAdminHandler extends RequestHandlerBase {
|
|||
try {
|
||||
SolrParams params = req.getParams();
|
||||
String name = params.get(CoreAdminParams.NAME);
|
||||
|
||||
//for now, do not allow creating new core with same name when in cloud mode
|
||||
//XXX perhaps it should just be unregistered from cloud before readding it?,
|
||||
//XXX perhaps we should also check that cores are of same type before adding new core to collection?
|
||||
if (coreContainer.getZkController() != null) {
|
||||
if (coreContainer.getCore(name) != null) {
|
||||
log.info("Re-creating a core with existing name is not allowed in cloud mode");
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||
"Core with name '" + name + "' already exists.");
|
||||
}
|
||||
}
|
||||
|
||||
String instanceDir = params.get(CoreAdminParams.INSTANCE_DIR);
|
||||
if (instanceDir == null) {
|
||||
|
@ -454,7 +466,23 @@ public class CoreAdminHandler extends RequestHandlerBase {
|
|||
SolrCore core = coreContainer.remove(cname);
|
||||
if(core == null){
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||
"No such core exists '"+cname+"'");
|
||||
"No such core exists '" + cname + "'");
|
||||
} else {
|
||||
if (coreContainer.getZkController() != null) {
|
||||
log.info("Unregistering core " + cname + " from cloudstate.");
|
||||
try {
|
||||
coreContainer.getZkController().unregister(cname, core.getCoreDescriptor().getCloudDescriptor());
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
|
||||
"Could not unregister core " + cname + " from cloudstate: "
|
||||
+ e.getMessage(), e);
|
||||
} catch (KeeperException e) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
|
||||
"Could not unregister core " + cname + " from cloudstate: "
|
||||
+ e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (params.getBool(CoreAdminParams.DELETE_INDEX, false)) {
|
||||
core.addCloseHook(new CloseHook() {
|
||||
|
|
|
@ -0,0 +1,124 @@
|
|||
package org.apache.solr.cloud;
|
||||
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to You under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
import java.io.File;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.solr.SolrTestCaseJ4;
|
||||
import org.apache.solr.cloud.NodeStateWatcher.NodeStateChangeListener;
|
||||
import org.apache.solr.cloud.OverseerTest.MockZKController;
|
||||
import org.apache.solr.common.cloud.CoreState;
|
||||
import org.apache.solr.common.cloud.SolrZkClient;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.junit.BeforeClass;
|
||||
import java.util.Collection;
|
||||
|
||||
public class NodeStateWatcherTest extends SolrTestCaseJ4 {
|
||||
|
||||
private int TIMEOUT = 10000;
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() throws Exception {
|
||||
initCore();
|
||||
}
|
||||
|
||||
public void testCoreAddDelete() throws Exception {
|
||||
String zkDir = dataDir.getAbsolutePath() + File.separator
|
||||
+ "zookeeper/server1/data";
|
||||
|
||||
ZkTestServer server = new ZkTestServer(zkDir);
|
||||
|
||||
SolrZkClient zkClient = null;
|
||||
ZkStateReader reader = null;
|
||||
SolrZkClient overseerClient = null;
|
||||
MockZKController controller = null;
|
||||
|
||||
try {
|
||||
final String NODE_NAME = "node1";
|
||||
server.run();
|
||||
zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
|
||||
|
||||
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
|
||||
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
|
||||
zkClient.makePath("/live_nodes", true);
|
||||
|
||||
System.setProperty(ZkStateReader.NUM_SHARDS_PROP, "2");
|
||||
|
||||
reader = new ZkStateReader(zkClient);
|
||||
reader.createClusterStateWatchersAndUpdate();
|
||||
|
||||
controller = new MockZKController(server.getZkAddress(), NODE_NAME, "collection1");
|
||||
|
||||
final String path = Overseer.STATES_NODE + "/" + NODE_NAME;
|
||||
|
||||
final AtomicInteger callCounter = new AtomicInteger();
|
||||
NodeStateWatcher watcher = new NodeStateWatcher(zkClient, NODE_NAME, path, new NodeStateChangeListener() {
|
||||
|
||||
@Override
|
||||
public void coreChanged(String nodeName, Set<CoreState> states)
|
||||
throws KeeperException, InterruptedException {
|
||||
callCounter.incrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void coreDeleted(String nodeName, Collection<CoreState> states)
|
||||
throws KeeperException, InterruptedException {
|
||||
callCounter.incrementAndGet();
|
||||
}
|
||||
});
|
||||
|
||||
controller.publishState("core1", "state1", 2);
|
||||
waitForCall(1, callCounter);
|
||||
assertEquals(1, watcher.getCurrentState().size());
|
||||
controller.publishState("core2", "state1", 2);
|
||||
waitForCall(2, callCounter);
|
||||
assertEquals(2, watcher.getCurrentState().size());
|
||||
controller.publishState("core1", null, 2);
|
||||
waitForCall(3, callCounter);
|
||||
assertEquals(1, watcher.getCurrentState().size());
|
||||
controller.publishState("core2", null, 2);
|
||||
waitForCall(4, callCounter);
|
||||
assertEquals(0, watcher.getCurrentState().size());
|
||||
} finally {
|
||||
|
||||
if (zkClient != null) {
|
||||
zkClient.close();
|
||||
}
|
||||
if (controller != null) {
|
||||
controller.close();
|
||||
}
|
||||
if (overseerClient != null) {
|
||||
overseerClient.close();
|
||||
}
|
||||
if (reader != null) {
|
||||
reader.close();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void waitForCall(int i, AtomicInteger callCounter) throws InterruptedException {
|
||||
while (i > callCounter.get()) {
|
||||
Thread.sleep(10);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -20,6 +20,7 @@ package org.apache.solr.cloud;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -35,11 +36,13 @@ import org.apache.solr.common.cloud.CloudState;
|
|||
import org.apache.solr.common.cloud.CoreState;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.SolrZkClient;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.core.CoreDescriptor;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.KeeperException.Code;
|
||||
import org.apache.zookeeper.KeeperException.NodeExistsException;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
@ -51,25 +54,36 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
private static final boolean DEBUG = false;
|
||||
|
||||
|
||||
private static class MockZKController{
|
||||
public static class MockZKController{
|
||||
|
||||
private final SolrZkClient zkClient;
|
||||
private final ZkStateReader zkStateReader;
|
||||
private final String nodeName;
|
||||
private final String collection;
|
||||
private final LeaderElector elector;
|
||||
private final Map<String, CoreState> coreStates = Collections.synchronizedMap(new HashMap<String, CoreState>());
|
||||
private final Map<String, ElectionContext> electionContext = Collections.synchronizedMap(new HashMap<String, ElectionContext>());
|
||||
|
||||
public MockZKController(String zkAddress, String nodeName) throws InterruptedException, TimeoutException, IOException, KeeperException {
|
||||
public MockZKController(String zkAddress, String nodeName, String collection) throws InterruptedException, TimeoutException, IOException, KeeperException {
|
||||
this.nodeName = nodeName;
|
||||
this.collection = collection;
|
||||
zkClient = new SolrZkClient(zkAddress, TIMEOUT);
|
||||
zkStateReader = new ZkStateReader(zkClient);
|
||||
zkStateReader.createClusterStateWatchersAndUpdate();
|
||||
Overseer.createClientNodes(zkClient, nodeName);
|
||||
|
||||
// live node
|
||||
final String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + "node1";
|
||||
final String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName;
|
||||
zkClient.makePath(nodePath, CreateMode.EPHEMERAL, true);
|
||||
elector = new LeaderElector(zkClient);
|
||||
}
|
||||
|
||||
private void deleteNode(final String path) {
|
||||
try {
|
||||
Stat stat = zkClient.exists(path, null, false);
|
||||
zkClient.delete(path, stat.getVersion(), false);
|
||||
if (stat != null) {
|
||||
zkClient.delete(path, stat.getVersion(), false);
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
fail("Unexpected KeeperException!" + e);
|
||||
} catch (InterruptedException e) {
|
||||
|
@ -82,22 +96,72 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
deleteNode(ZkStateReader.LIVE_NODES_ZKNODE + "/" + "node1");
|
||||
zkClient.close();
|
||||
} catch (InterruptedException e) {
|
||||
// TODO Auto-generated catch block
|
||||
e.printStackTrace();
|
||||
//e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
public void publishState(String coreName, String stateName, int numShards) throws KeeperException, InterruptedException{
|
||||
HashMap<String,String> coreProps = new HashMap<String,String>();
|
||||
coreProps.put(ZkStateReader.STATE_PROP, stateName);
|
||||
coreProps.put(ZkStateReader.NODE_NAME_PROP, nodeName);
|
||||
coreProps.put(ZkStateReader.CORE_NAME_PROP, coreName);
|
||||
CoreState state = new CoreState(coreName, "collection1", coreProps, numShards);
|
||||
public void publishState(String coreName, String stateName, int numShards)
|
||||
throws KeeperException, InterruptedException, IOException {
|
||||
if (stateName == null) {
|
||||
coreStates.remove(coreName);
|
||||
ElectionContext ec = electionContext.remove(coreName);
|
||||
if (ec != null) {
|
||||
ec.cancelElection();
|
||||
}
|
||||
} else {
|
||||
HashMap<String,String> coreProps = new HashMap<String,String>();
|
||||
coreProps.put(ZkStateReader.STATE_PROP, stateName);
|
||||
coreProps.put(ZkStateReader.NODE_NAME_PROP, nodeName);
|
||||
coreProps.put(ZkStateReader.CORE_NAME_PROP, coreName);
|
||||
coreProps.put(ZkStateReader.COLLECTION_PROP, collection);
|
||||
coreProps.put(ZkStateReader.BASE_URL_PROP, "http://" + nodeName
|
||||
+ "/solr/");
|
||||
CoreState state = new CoreState(coreName, collection, coreProps,
|
||||
numShards);
|
||||
coreStates.remove(coreName);
|
||||
coreStates.put(coreName, state);
|
||||
}
|
||||
final String statePath = Overseer.STATES_NODE + "/" + nodeName;
|
||||
zkClient.setData(statePath, ZkStateReader.toJSON(new CoreState[] {state}), true);
|
||||
zkClient.setData(
|
||||
statePath,
|
||||
ZkStateReader.toJSON(coreStates.values().toArray(
|
||||
new CoreState[coreStates.size()])), true);
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
String shardId = getShardId(coreName);
|
||||
if (shardId != null) {
|
||||
try {
|
||||
zkClient.makePath("/collections/" + collection + "/leader_elect/"
|
||||
+ shardId + "/election", true);
|
||||
} catch (NodeExistsException nee) {}
|
||||
ZkNodeProps props = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
|
||||
"http://" + nodeName + "/solr/", ZkStateReader.NODE_NAME_PROP,
|
||||
nodeName, ZkStateReader.CORE_NAME_PROP, coreName,
|
||||
ZkStateReader.SHARD_ID_PROP, shardId,
|
||||
ZkStateReader.COLLECTION_PROP, collection);
|
||||
ShardLeaderElectionContextBase ctx = new ShardLeaderElectionContextBase(
|
||||
elector, shardId, collection, nodeName + "_" + coreName, props,
|
||||
zkStateReader);
|
||||
elector.joinElection(ctx, null);
|
||||
break;
|
||||
}
|
||||
Thread.sleep(200);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
private String getShardId(final String coreName) {
|
||||
Map<String,Slice> slices = zkStateReader.getCloudState().getSlices(
|
||||
collection);
|
||||
if (slices != null) {
|
||||
for (Slice slice : slices.values()) {
|
||||
if (slice.getShards().containsKey(nodeName + "_" + coreName))
|
||||
;
|
||||
return slice.getName();
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() throws Exception {
|
||||
|
@ -126,7 +190,10 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
|
||||
|
||||
zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
|
||||
zkClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true);
|
||||
|
||||
ZkStateReader reader = new ZkStateReader(zkClient);
|
||||
reader.createClusterStateWatchersAndUpdate();
|
||||
|
||||
System.setProperty(ZkStateReader.NUM_SHARDS_PROP, "3");
|
||||
|
||||
|
@ -208,8 +275,11 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
|
||||
|
||||
zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
|
||||
zkClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true);
|
||||
|
||||
reader = new ZkStateReader(zkClient);
|
||||
|
||||
reader.createClusterStateWatchersAndUpdate();
|
||||
|
||||
System.setProperty(ZkStateReader.NUM_SHARDS_PROP, Integer.valueOf(sliceCount).toString());
|
||||
|
||||
for (int i = 0; i < nodeCount; i++) {
|
||||
|
@ -464,6 +534,21 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
fail("Illegal state, was:" + coreState + " expected:" + expectedState + "cloudState:" + reader.getCloudState());
|
||||
}
|
||||
|
||||
private void verifyShardLeader(ZkStateReader reader, String collection, String shard, String expectedCore) throws InterruptedException, KeeperException {
|
||||
int maxIterations = 100;
|
||||
while(maxIterations-->0) {
|
||||
ZkNodeProps props = reader.getCloudState().getLeader(collection, shard);
|
||||
if(props!=null) {
|
||||
if(expectedCore.equals(props.get(ZkStateReader.CORE_NAME_PROP))) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
Thread.sleep(100);
|
||||
}
|
||||
|
||||
assertEquals("Unexpected shard leader coll:" + collection + " shard:" + shard, expectedCore, (reader.getCloudState().getLeader(collection, shard)!=null)?reader.getCloudState().getLeader(collection, shard).get(ZkStateReader.CORE_NAME_PROP):null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOverseerFailure() throws Exception {
|
||||
String zkDir = dataDir.getAbsolutePath() + File.separator
|
||||
|
@ -485,7 +570,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
reader = new ZkStateReader(controllerClient);
|
||||
reader.createClusterStateWatchersAndUpdate();
|
||||
|
||||
mockController = new MockZKController(server.getZkAddress(), "node1");
|
||||
mockController = new MockZKController(server.getZkAddress(), "node1", "collection1");
|
||||
|
||||
overseerClient = electNewOverseer(server.getZkAddress());
|
||||
|
||||
|
@ -519,6 +604,12 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
.getLiveNodes().size());
|
||||
assertEquals("Shard count does not match", 1, reader.getCloudState()
|
||||
.getSlice("collection1", "shard1").getShards().size());
|
||||
version = getCloudStateVersion(controllerClient);
|
||||
mockController.publishState("core1", null,1);
|
||||
while(version == getCloudStateVersion(controllerClient));
|
||||
Thread.sleep(100);
|
||||
assertEquals("Shard count does not match", 0, reader.getCloudState()
|
||||
.getSlice("collection1", "shard1").getShards().size());
|
||||
} finally {
|
||||
|
||||
if (mockController != null) {
|
||||
|
@ -537,7 +628,111 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
server.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
private class OverseerRestarter implements Runnable{
|
||||
SolrZkClient overseerClient = null;
|
||||
public volatile boolean run = true;
|
||||
private final String zkAddress;
|
||||
|
||||
public OverseerRestarter(String zkAddress) {
|
||||
this.zkAddress = zkAddress;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
overseerClient = electNewOverseer(zkAddress);
|
||||
} catch (Throwable t) {
|
||||
//t.printStackTrace();
|
||||
}
|
||||
while (run) {
|
||||
if(random.nextInt(20)==1){
|
||||
try {
|
||||
overseerClient.close();
|
||||
overseerClient = electNewOverseer(zkAddress);
|
||||
} catch (Throwable e) {
|
||||
//e.printStackTrace();
|
||||
}
|
||||
}
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (Throwable e) {
|
||||
//e.printStackTrace();
|
||||
}
|
||||
}
|
||||
try {
|
||||
overseerClient.close();
|
||||
} catch (Throwable e) {
|
||||
//e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testShardLeaderChange() throws Exception {
|
||||
String zkDir = dataDir.getAbsolutePath() + File.separator
|
||||
+ "zookeeper/server1/data";
|
||||
final ZkTestServer server = new ZkTestServer(zkDir);
|
||||
SolrZkClient controllerClient = null;
|
||||
ZkStateReader reader = null;
|
||||
MockZKController mockController = null;
|
||||
MockZKController mockController2 = null;
|
||||
OverseerRestarter killer = null;
|
||||
Thread killerThread = null;
|
||||
try {
|
||||
server.run();
|
||||
controllerClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
|
||||
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
|
||||
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
|
||||
controllerClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true);
|
||||
|
||||
killer = new OverseerRestarter(server.getZkAddress());
|
||||
killerThread = new Thread(killer);
|
||||
killerThread.start();
|
||||
|
||||
reader = new ZkStateReader(controllerClient);
|
||||
reader.createClusterStateWatchersAndUpdate();
|
||||
|
||||
for (int i = 0; i < 20; i++) {
|
||||
mockController = new MockZKController(server.getZkAddress(), "node1", "collection1");
|
||||
mockController.publishState("core1", "state1",1);
|
||||
if(mockController2!=null) {
|
||||
mockController2.close();
|
||||
mockController2 = null;
|
||||
}
|
||||
mockController.publishState("core1", "state2",1);
|
||||
mockController2 = new MockZKController(server.getZkAddress(), "node2", "collection1");
|
||||
mockController.publishState("core1", "state1",1);
|
||||
verifyShardLeader(reader, "collection1", "shard1", "core1");
|
||||
mockController2.publishState("core4", "state2" ,1);
|
||||
mockController.close();
|
||||
mockController = null;
|
||||
verifyShardLeader(reader, "collection1", "shard1", "core4");
|
||||
}
|
||||
} finally {
|
||||
if (killer != null) {
|
||||
killer.run = false;
|
||||
if (killerThread != null) {
|
||||
killerThread.join();
|
||||
}
|
||||
}
|
||||
if (mockController != null) {
|
||||
mockController.close();
|
||||
}
|
||||
if (mockController2 != null) {
|
||||
mockController2.close();
|
||||
}
|
||||
if (controllerClient != null) {
|
||||
controllerClient.close();
|
||||
}
|
||||
if (reader != null) {
|
||||
reader.close();
|
||||
}
|
||||
server.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDoubleAssignment() throws Exception {
|
||||
String zkDir = dataDir.getAbsolutePath() + File.separator
|
||||
|
@ -561,7 +756,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
reader = new ZkStateReader(controllerClient);
|
||||
reader.createClusterStateWatchersAndUpdate();
|
||||
|
||||
mockController = new MockZKController(server.getZkAddress(), "node1");
|
||||
mockController = new MockZKController(server.getZkAddress(), "node1", "collection1");
|
||||
|
||||
overseerClient = electNewOverseer(server.getZkAddress());
|
||||
|
||||
|
@ -575,7 +770,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
|
||||
int version = getCloudStateVersion(controllerClient);
|
||||
|
||||
mockController = new MockZKController(server.getZkAddress(), "node1");
|
||||
mockController = new MockZKController(server.getZkAddress(), "node1", "collection1");
|
||||
mockController.publishState("core1", ZkStateReader.RECOVERING, 1);
|
||||
|
||||
while (version == getCloudStateVersion(controllerClient));
|
||||
|
@ -593,7 +788,6 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
}
|
||||
assertEquals("Shard was found in more than 1 times in CloudState", 1,
|
||||
numFound);
|
||||
|
||||
} finally {
|
||||
if (overseerClient != null) {
|
||||
overseerClient.close();
|
||||
|
@ -635,7 +829,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
reader = new ZkStateReader(controllerClient);
|
||||
reader.createClusterStateWatchersAndUpdate();
|
||||
|
||||
mockController = new MockZKController(server.getZkAddress(), "node1");
|
||||
mockController = new MockZKController(server.getZkAddress(), "node1", "collection1");
|
||||
|
||||
overseerClient = electNewOverseer(server.getZkAddress());
|
||||
|
||||
|
|
|
@ -27,7 +27,6 @@ import org.apache.solr.common.cloud.SolrZkClient;
|
|||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.core.CoreDescriptor;
|
||||
import org.apache.solr.core.SolrConfig;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
@ -55,6 +54,7 @@ public class ZkControllerTest extends SolrTestCaseJ4 {
|
|||
try {
|
||||
server.run();
|
||||
|
||||
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
|
||||
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
|
||||
|
||||
SolrZkClient zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
|
||||
|
@ -142,6 +142,96 @@ public class ZkControllerTest extends SolrTestCaseJ4 {
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCoreUnload() throws Exception {
|
||||
|
||||
String zkDir = dataDir.getAbsolutePath() + File.separator
|
||||
+ "zookeeper/server1/data";
|
||||
|
||||
ZkTestServer server = new ZkTestServer(zkDir);
|
||||
|
||||
ZkController zkController = null;
|
||||
SolrZkClient zkClient = null;
|
||||
try {
|
||||
server.run();
|
||||
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
|
||||
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
|
||||
|
||||
zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
|
||||
zkClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true);
|
||||
|
||||
ZkStateReader reader = new ZkStateReader(zkClient);
|
||||
reader.createClusterStateWatchersAndUpdate();
|
||||
|
||||
System.setProperty(ZkStateReader.NUM_SHARDS_PROP, "1");
|
||||
System.setProperty("solrcloud.skip.autorecovery", "true");
|
||||
|
||||
zkController = new ZkController(null, server.getZkAddress(), TIMEOUT,
|
||||
10000, "localhost", "8983", "solr",
|
||||
new CurrentCoreDescriptorProvider() {
|
||||
|
||||
@Override
|
||||
public List<CoreDescriptor> getCurrentDescriptors() {
|
||||
// do nothing
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
System.setProperty("bootstrap_confdir", getFile("solr/conf")
|
||||
.getAbsolutePath());
|
||||
|
||||
final int numShards = 2;
|
||||
final String[] ids = new String[numShards];
|
||||
|
||||
for (int i = 0; i < numShards; i++) {
|
||||
CloudDescriptor collection1Desc = new CloudDescriptor();
|
||||
collection1Desc.setCollectionName("collection1");
|
||||
CoreDescriptor desc1 = new CoreDescriptor(null, "core" + (i + 1), "");
|
||||
desc1.setCloudDescriptor(collection1Desc);
|
||||
zkController.preRegisterSetup(null, desc1, false);
|
||||
ids[i] = zkController.register("core" + (i + 1), desc1);
|
||||
}
|
||||
|
||||
assertEquals("shard1", ids[0]);
|
||||
assertEquals("shard1", ids[1]);
|
||||
|
||||
assertNotNull(reader.getLeaderUrl("collection1", "shard1", 15000));
|
||||
|
||||
// unregister current leader
|
||||
final ZkNodeProps shard1LeaderProps = reader.getLeaderProps(
|
||||
"collection1", "shard1");
|
||||
final String leaderUrl = reader.getLeaderUrl("collection1", "shard1",
|
||||
15000);
|
||||
|
||||
final CloudDescriptor collection1Desc = new CloudDescriptor();
|
||||
collection1Desc.setCollectionName("collection1");
|
||||
final CoreDescriptor desc1 = new CoreDescriptor(null,
|
||||
shard1LeaderProps.get(ZkStateReader.CORE_NAME_PROP), "");
|
||||
desc1.setCloudDescriptor(collection1Desc);
|
||||
zkController.unregister(
|
||||
shard1LeaderProps.get(ZkStateReader.CORE_NAME_PROP), collection1Desc);
|
||||
assertNotSame(
|
||||
"New leader was not promoted after unregistering the current leader.",
|
||||
leaderUrl, reader.getLeaderUrl("collection1", "shard1", 15000));
|
||||
assertNotNull("New leader was null.",
|
||||
reader.getLeaderUrl("collection1", "shard1", 15000));
|
||||
} finally {
|
||||
if (DEBUG) {
|
||||
if (zkController != null) {
|
||||
zkClient.printLayoutToStdOut();
|
||||
}
|
||||
}
|
||||
if (zkClient != null) {
|
||||
zkClient.close();
|
||||
}
|
||||
if (zkController != null) {
|
||||
zkController.close();
|
||||
}
|
||||
server.shutdown();
|
||||
System.clearProperty(ZkStateReader.NUM_SHARDS_PROP);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
|
|
Loading…
Reference in New Issue