SOLR-5799: Fix impl.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1573401 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2014-03-02 23:09:13 +00:00
parent 3c0c0b4c38
commit a828f6ff05
4 changed files with 37 additions and 27 deletions

View File

@ -113,25 +113,26 @@ class ShardLeaderElectionContextBase extends ElectionContext {
}
@Override
void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStart)
void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStartMs)
throws KeeperException, InterruptedException, IOException {
// register as leader - if an ephemeral is already there, wait just a bit
// to see if it goes away
RetryUtil.retryOnThrowable(NodeExistsException.class, 15000, 1000,
new RetryCmd() {
@Override
public void execute() throws InterruptedException {
try {
try {
RetryUtil.retryOnThrowable(NodeExistsException.class, 15000, 1000,
new RetryCmd() {
@Override
public void execute() throws Throwable {
zkClient.makePath(leaderPath, ZkStateReader.toJSON(leaderProps),
CreateMode.EPHEMERAL, true);
} catch (KeeperException e) {
throw new SolrException(
ErrorCode.SERVER_ERROR,
"Could not register as the leader because creating the ephemeral registration node in ZooKeeper failed", e);
}
}
});
});
} catch (Throwable t) {
if (t instanceof OutOfMemoryError) {
throw (OutOfMemoryError) t;
}
throw new SolrException(ErrorCode.SERVER_ERROR, "Could not register as the leader because creating the ephemeral registration node in ZooKeeper failed", t);
}
assert shardId != null;
ZkNodeProps m = ZkNodeProps.fromKeyVals(Overseer.QUEUE_OPERATION,

View File

@ -1132,4 +1132,8 @@ public class Overseer {
return !"false".equals(clusterProps.get(ZkStateReader.LEGACY_CLOUD));
}
public ZkStateReader getZkStateReader() {
return reader;
}
}

View File

@ -532,19 +532,23 @@ public class OverseerTest extends SolrTestCaseJ4 {
+ "zookeeper/server1/data";
ZkTestServer server = new ZkTestServer(zkDir);
SolrZkClient controllerClient = null;
SolrZkClient overseerClient = null;
ZkStateReader reader = null;
MockZKController mockController = null;
SolrZkClient zkClient = null;
try {
server.run();
controllerClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
controllerClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true);
reader = new ZkStateReader(controllerClient);
zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
zkClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true);
reader = new ZkStateReader(zkClient);
reader.createClusterStateWatchersAndUpdate();
mockController = new MockZKController(server.getZkAddress(), "node1",
@ -559,25 +563,25 @@ public class OverseerTest extends SolrTestCaseJ4 {
waitForCollections(reader, "collection1");
verifyStatus(reader, ZkStateReader.RECOVERING);
int version = getClusterStateVersion(controllerClient);
int version = getClusterStateVersion(zkClient);
mockController.publishState("core1", "core_node1", ZkStateReader.ACTIVE,
1);
while (version == getClusterStateVersion(controllerClient));
while (version == getClusterStateVersion(zkClient));
verifyStatus(reader, ZkStateReader.ACTIVE);
version = getClusterStateVersion(controllerClient);
version = getClusterStateVersion(zkClient);
overseerClient.close();
Thread.sleep(1000); // wait for overseer to get killed
mockController.publishState("core1", "core_node1",
ZkStateReader.RECOVERING, 1);
version = getClusterStateVersion(controllerClient);
version = getClusterStateVersion(zkClient);
overseerClient = electNewOverseer(server.getZkAddress());
while (version == getClusterStateVersion(controllerClient));
while (version == getClusterStateVersion(zkClient));
verifyStatus(reader, ZkStateReader.RECOVERING);
@ -585,16 +589,16 @@ public class OverseerTest extends SolrTestCaseJ4 {
.getClusterState().getLiveNodes().size());
assertEquals("Shard count does not match", 1, reader.getClusterState()
.getSlice("collection1", "shard1").getReplicasMap().size());
version = getClusterStateVersion(controllerClient);
version = getClusterStateVersion(zkClient);
mockController.publishState("core1", "core_node1", null, 1);
while (version == getClusterStateVersion(controllerClient));
while (version == getClusterStateVersion(zkClient));
Thread.sleep(500);
assertFalse("collection1 should be gone after publishing the null state",
reader.getClusterState().getCollections().contains("collection1"));
} finally {
close(mockController);
close(overseerClient);
close(controllerClient);
close(zkClient);
close(reader);
server.shutdown();
}
@ -911,6 +915,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
LeaderElector overseerElector = new LeaderElector(zkClient);
if (overseers.size() > 0) {
overseers.get(overseers.size() -1).close();
overseers.get(overseers.size() -1).getZkStateReader().getZkClient().close();
}
Overseer overseer = new Overseer(
new HttpShardHandlerFactory().getShardHandler(), "/admin/cores", reader);

View File

@ -21,10 +21,10 @@ import java.util.concurrent.TimeUnit;
public class RetryUtil {
public static interface RetryCmd {
public void execute() throws InterruptedException;
public void execute() throws Throwable;
}
public static void retryOnThrowable(Class clazz, long timeoutms, long intervalms, RetryCmd cmd) throws InterruptedException {
public static void retryOnThrowable(Class clazz, long timeoutms, long intervalms, RetryCmd cmd) throws Throwable {
long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutms, TimeUnit.MILLISECONDS);
while (true) {
try {