SOLR-3080: make election cancellable through EC

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1292680 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sami Siren 2012-02-23 07:19:37 +00:00
parent 5713824147
commit d47a11f077
3 changed files with 53 additions and 19 deletions

View File

@ -8,6 +8,7 @@ import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.CloudState;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkClientConnectionStrategy;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
@ -40,19 +41,25 @@ public abstract class ElectionContext {
final ZkNodeProps leaderProps;
final String id;
final String leaderPath;
String leaderSeqPath;
private SolrZkClient zkClient;
public ElectionContext(final String shardZkNodeName,
final String electionPath, final String leaderPath, final ZkNodeProps leaderProps) {
final String electionPath, final String leaderPath, final ZkNodeProps leaderProps, final SolrZkClient zkClient) {
this.id = shardZkNodeName;
this.electionPath = electionPath;
this.leaderPath = leaderPath;
this.leaderProps = leaderProps;
this.zkClient = zkClient;
}
public void cancelElection() throws InterruptedException, KeeperException {
zkClient.delete(leaderSeqPath, -1, true);
}
// the given core may or may not be null - if you need access to the current core, you must pass
// the core container and core name to your context impl - then use this core ref if it is not null
// else access it from the core container
abstract void runLeaderProcess(String leaderSeqPath, boolean weAreReplacement, SolrCore core) throws KeeperException, InterruptedException, IOException;
abstract void runLeaderProcess(boolean weAreReplacement, SolrCore core) throws KeeperException, InterruptedException, IOException;
}
class ShardLeaderElectionContextBase extends ElectionContext {
@ -66,7 +73,7 @@ class ShardLeaderElectionContextBase extends ElectionContext {
final String collection, final String shardZkNodeName, ZkNodeProps props, ZkStateReader zkStateReader) {
super(shardZkNodeName, ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/leader_elect/"
+ shardId, ZkStateReader.getShardLeadersPath(collection, shardId),
props);
props, zkStateReader.getZkClient());
this.leaderElector = leaderElector;
this.zkClient = zkStateReader.getZkClient();
this.shardId = shardId;
@ -74,7 +81,7 @@ class ShardLeaderElectionContextBase extends ElectionContext {
}
@Override
void runLeaderProcess(String leaderSeqPath, boolean weAreReplacement, SolrCore core)
void runLeaderProcess(boolean weAreReplacement, SolrCore core)
throws KeeperException, InterruptedException, IOException {
try {
@ -109,7 +116,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
}
@Override
void runLeaderProcess(String leaderSeqPath, boolean weAreReplacement, SolrCore startupCore)
void runLeaderProcess(boolean weAreReplacement, SolrCore startupCore)
throws KeeperException, InterruptedException, IOException {
if (cc != null) {
String coreName = leaderProps.get(ZkStateReader.CORE_NAME_PROP);
@ -123,7 +130,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
core = startupCore;
}
if (core == null) {
zkClient.delete(leaderSeqPath, -1, true);
cancelElection();
throw new SolrException(ErrorCode.SERVER_ERROR, "Fatal Error, SolrCore not found:" + coreName + " in " + cc.getCoreNames());
}
// should I be leader?
@ -159,7 +166,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
}
super.runLeaderProcess(leaderSeqPath, weAreReplacement, startupCore);
super.runLeaderProcess(weAreReplacement, startupCore);
}
private void rejoinLeaderElection(String leaderSeqPath, SolrCore core)
@ -170,7 +177,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
zkController.publish(core.getCoreDescriptor(), ZkStateReader.DOWN);
zkClient.delete(leaderSeqPath, -1, true);
cancelElection();
core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getName());
@ -236,13 +243,13 @@ final class OverseerElectionContext extends ElectionContext {
private final ZkStateReader stateReader;
public OverseerElectionContext(final String zkNodeName, SolrZkClient zkClient, ZkStateReader stateReader) {
super(zkNodeName, "/overseer_elect", "/overseer_elect/leader", null);
super(zkNodeName, "/overseer_elect", "/overseer_elect/leader", null, stateReader.getZkClient());
this.zkClient = zkClient;
this.stateReader = stateReader;
}
@Override
void runLeaderProcess(String leaderSeqPath, boolean weAreReplacement, SolrCore firstCore) throws KeeperException, InterruptedException {
void runLeaderProcess(boolean weAreReplacement, SolrCore firstCore) throws KeeperException, InterruptedException {
final String id = leaderSeqPath.substring(leaderSeqPath.lastIndexOf("/")+1);
ZkNodeProps myProps = new ZkNodeProps("id", id);

View File

@ -86,7 +86,7 @@ public class LeaderElector {
* @throws IOException
* @throws UnsupportedEncodingException
*/
private void checkIfIamLeader(final String leaderSeqPath, final int seq, final ElectionContext context, boolean replacement, SolrCore core) throws KeeperException,
private void checkIfIamLeader(final int seq, final ElectionContext context, boolean replacement, SolrCore core) throws KeeperException,
InterruptedException, IOException {
// get all other numbers...
final String holdElectionPath = context.electionPath + ELECTION_NODE;
@ -95,7 +95,7 @@ public class LeaderElector {
sortSeqs(seqs);
List<Integer> intSeqs = getSeqs(seqs);
if (seq <= intSeqs.get(0)) {
runIamLeaderProcess(leaderSeqPath, context, replacement, core);
runIamLeaderProcess(context, replacement, core);
} else {
// I am not the leader - watch the node below me
int i = 1;
@ -119,7 +119,7 @@ public class LeaderElector {
public void process(WatchedEvent event) {
// am I the next leader?
try {
checkIfIamLeader(leaderSeqPath, seq, context, true, null);
checkIfIamLeader(seq, context, true, null);
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
@ -137,16 +137,15 @@ public class LeaderElector {
} catch (KeeperException e) {
// we couldn't set our watch - the node before us may already be down?
// we need to check if we are the leader again
checkIfIamLeader(leaderSeqPath, seq, context, true, null);
checkIfIamLeader(seq, context, true, null);
}
}
}
// TODO: get this core param out of here
protected void runIamLeaderProcess(String leaderSeqPath, final ElectionContext context, boolean weAreReplacement, SolrCore core) throws KeeperException,
protected void runIamLeaderProcess(final ElectionContext context, boolean weAreReplacement, SolrCore core) throws KeeperException,
InterruptedException, IOException {
context.runLeaderProcess(leaderSeqPath, weAreReplacement, core);
context.runLeaderProcess(weAreReplacement, core);
}
/**
@ -219,6 +218,7 @@ public class LeaderElector {
try {
leaderSeqPath = zkClient.create(shardsElectZkPath + "/" + id + "-n_", null,
CreateMode.EPHEMERAL_SEQUENTIAL, false);
context.leaderSeqPath = leaderSeqPath;
cont = false;
} catch (ConnectionLossException e) {
// we don't know if we made our node or not...
@ -249,7 +249,7 @@ public class LeaderElector {
}
}
int seq = getSeq(leaderSeqPath);
checkIfIamLeader(leaderSeqPath, seq, context, false, core);
checkIfIamLeader(seq, context, false, core);
return seq;
}

View File

@ -157,7 +157,34 @@ public class LeaderElectionTest extends SolrTestCaseJ4 {
assertEquals("http://127.0.0.1/solr/",
getLeaderUrl("collection1", "shard2"));
}
@Test
public void testCancelElection() throws Exception {
LeaderElector first = new LeaderElector(zkClient);
ZkNodeProps props = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
"http://127.0.0.1/solr/", ZkStateReader.CORE_NAME_PROP, "1");
ElectionContext firstContext = new ShardLeaderElectionContextBase(first,
"slice1", "collection2", "dummynode1", props, zkStateReader);
first.setup(firstContext);
first.joinElection(firstContext, null);
Thread.sleep(1000);
assertEquals("original leader was not registered", "http://127.0.0.1/solr/1/", getLeaderUrl("collection2", "slice1"));
LeaderElector second = new LeaderElector(zkClient);
props = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
"http://127.0.0.1/solr/", ZkStateReader.CORE_NAME_PROP, "2");
ElectionContext context = new ShardLeaderElectionContextBase(second,
"slice1", "collection2", "dummynode1", props, zkStateReader);
second.setup(context);
second.joinElection(context, null);
Thread.sleep(1000);
assertEquals("original leader should have stayed leader", "http://127.0.0.1/solr/1/", getLeaderUrl("collection2", "slice1"));
firstContext.cancelElection();
Thread.sleep(1000);
assertEquals("new leader was not registered", "http://127.0.0.1/solr/2/", getLeaderUrl("collection2", "slice1"));
}
private String getLeaderUrl(final String collection, final String slice)
throws KeeperException, InterruptedException {
int iterCount = 60;