SOLR-5799: When registering as the leader, if an existing ephemeral registration exists, wait a short time to see if it goes away.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1573242 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2014-03-02 00:27:25 +00:00
parent b32e9d6036
commit f1cddb92ee
4 changed files with 89 additions and 23 deletions

View File

@ -129,6 +129,10 @@ Other Changes
* SOLR-5771: Add SolrTestCaseJ4.SuppressSSL annotation to disable SSL (instead of static boolean).
(Robert Muir)
* SOLR-5799: When registering as the leader, if an existing ephemeral
registration exists, wait a short time to see if it goes away.
(Mark Miller)
================== 4.7.0 ==================
Versions of Major Components

View File

@ -14,6 +14,8 @@ import org.apache.solr.common.cloud.ZkCmdExecutor;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.RetryUtil;
import org.apache.solr.common.util.RetryUtil.RetryCmd;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.search.SolrIndexSearcher;
@ -22,6 +24,7 @@ import org.apache.solr.util.RefCounted;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -79,24 +82,28 @@ public abstract class ElectionContext {
}
class ShardLeaderElectionContextBase extends ElectionContext {
private static Logger log = LoggerFactory.getLogger(ShardLeaderElectionContextBase.class);
private static Logger log = LoggerFactory
.getLogger(ShardLeaderElectionContextBase.class);
protected final SolrZkClient zkClient;
protected String shardId;
protected String collection;
protected LeaderElector leaderElector;
public ShardLeaderElectionContextBase(LeaderElector leaderElector, final String shardId,
final String collection, final String coreNodeName, ZkNodeProps props, ZkStateReader zkStateReader) {
super(coreNodeName, ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/leader_elect/"
+ shardId, ZkStateReader.getShardLeadersPath(collection, shardId),
props, zkStateReader.getZkClient());
public ShardLeaderElectionContextBase(LeaderElector leaderElector,
final String shardId, final String collection, final String coreNodeName,
ZkNodeProps props, ZkStateReader zkStateReader) {
super(coreNodeName, ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection
+ "/leader_elect/" + shardId, ZkStateReader.getShardLeadersPath(
collection, shardId), props, zkStateReader.getZkClient());
this.leaderElector = leaderElector;
this.zkClient = zkStateReader.getZkClient();
this.shardId = shardId;
this.collection = collection;
try {
new ZkCmdExecutor(zkStateReader.getZkClient().getZkClientTimeout()).ensureExists(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection, zkClient);
new ZkCmdExecutor(zkStateReader.getZkClient().getZkClientTimeout())
.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection,
zkClient);
} catch (KeeperException e) {
throw new SolrException(ErrorCode.SERVER_ERROR, e);
} catch (InterruptedException e) {
@ -104,24 +111,39 @@ class ShardLeaderElectionContextBase extends ElectionContext {
throw new SolrException(ErrorCode.SERVER_ERROR, e);
}
}
@Override
void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStart) throws KeeperException,
InterruptedException, IOException {
void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStart)
throws KeeperException, InterruptedException, IOException {
// register as leader - if an ephemeral is already there, wait just a bit
// to see if it goes away
RetryUtil.retryOnThrowable(NodeExistsException.class, 15000, 1000,
new RetryCmd() {
@Override
public void execute() throws InterruptedException {
try {
zkClient.makePath(leaderPath, ZkStateReader.toJSON(leaderProps),
CreateMode.EPHEMERAL, true);
} catch (KeeperException e) {
throw new SolrException(
ErrorCode.SERVER_ERROR,
"Could not register as the leader because creating the ephemeral registration node in ZooKeeper failed", e);
}
}
});
zkClient.makePath(leaderPath, ZkStateReader.toJSON(leaderProps),
CreateMode.EPHEMERAL, true);
assert shardId != null;
ZkNodeProps m = ZkNodeProps.fromKeyVals(Overseer.QUEUE_OPERATION, ZkStateReader.LEADER_PROP,
ZkStateReader.SHARD_ID_PROP, shardId, ZkStateReader.COLLECTION_PROP,
collection, ZkStateReader.BASE_URL_PROP, leaderProps.getProperties()
.get(ZkStateReader.BASE_URL_PROP), ZkStateReader.CORE_NAME_PROP,
ZkNodeProps m = ZkNodeProps.fromKeyVals(Overseer.QUEUE_OPERATION,
ZkStateReader.LEADER_PROP, ZkStateReader.SHARD_ID_PROP, shardId,
ZkStateReader.COLLECTION_PROP, collection, ZkStateReader.BASE_URL_PROP,
leaderProps.getProperties().get(ZkStateReader.BASE_URL_PROP),
ZkStateReader.CORE_NAME_PROP,
leaderProps.getProperties().get(ZkStateReader.CORE_NAME_PROP),
ZkStateReader.STATE_PROP, ZkStateReader.ACTIVE);
Overseer.getInQueue(zkClient).offer(ZkStateReader.toJSON(m));
}
}
// add core container and stop passing core around...

View File

@ -564,8 +564,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
mockController.publishState("core1", "core_node1", ZkStateReader.ACTIVE,
1);
while (version == getClusterStateVersion(controllerClient))
;
while (version == getClusterStateVersion(controllerClient));
verifyStatus(reader, ZkStateReader.ACTIVE);
version = getClusterStateVersion(controllerClient);
@ -593,9 +592,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
assertFalse("collection1 should be gone after publishing the null state",
reader.getClusterState().getCollections().contains("collection1"));
} finally {
close(mockController);
close(overseerClient);
close(controllerClient);
close(reader);

View File

@ -0,0 +1,43 @@
package org.apache.solr.common.util;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.concurrent.TimeUnit;
public class RetryUtil {
public static interface RetryCmd {
public void execute() throws InterruptedException;
}
public static void retryOnThrowable(Class clazz, long timeoutms, long intervalms, RetryCmd cmd) throws InterruptedException {
long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutms, TimeUnit.MILLISECONDS);
while (true) {
try {
cmd.execute();
} catch (Throwable t) {
if (clazz.isInstance(t) && System.nanoTime() < timeout) {
Thread.sleep(intervalms);
continue;
}
throw t;
}
// success
break;
}
}
}