From d47a11f077281c7f691332f4fd864a011ccf9919 Mon Sep 17 00:00:00 2001 From: Sami Siren Date: Thu, 23 Feb 2012 07:19:37 +0000 Subject: [PATCH] SOLR-3080: make election cancellable through EC git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1292680 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/solr/cloud/ElectionContext.java | 27 ++++++++++------- .../org/apache/solr/cloud/LeaderElector.java | 16 +++++----- .../apache/solr/cloud/LeaderElectionTest.java | 29 ++++++++++++++++++- 3 files changed, 53 insertions(+), 19 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java index 50371c798d0..ff10bbcedb8 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java +++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java @@ -8,6 +8,7 @@ import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.cloud.CloudState; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.SolrZkClient; +import org.apache.solr.common.cloud.ZkClientConnectionStrategy; import org.apache.solr.common.cloud.ZkCoreNodeProps; import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkStateReader; @@ -40,19 +41,25 @@ public abstract class ElectionContext { final ZkNodeProps leaderProps; final String id; final String leaderPath; + String leaderSeqPath; + private SolrZkClient zkClient; public ElectionContext(final String shardZkNodeName, - final String electionPath, final String leaderPath, final ZkNodeProps leaderProps) { + final String electionPath, final String leaderPath, final ZkNodeProps leaderProps, final SolrZkClient zkClient) { this.id = shardZkNodeName; this.electionPath = electionPath; this.leaderPath = leaderPath; this.leaderProps = leaderProps; + this.zkClient = zkClient; } + public void cancelElection() throws InterruptedException, KeeperException { + zkClient.delete(leaderSeqPath, -1, true); + } // the given core may or may not be null - if you need access to the current core, you must pass // the core container and core name to your context impl - then use this core ref if it is not null // else access it from the core container - abstract void runLeaderProcess(String leaderSeqPath, boolean weAreReplacement, SolrCore core) throws KeeperException, InterruptedException, IOException; + abstract void runLeaderProcess(boolean weAreReplacement, SolrCore core) throws KeeperException, InterruptedException, IOException; } class ShardLeaderElectionContextBase extends ElectionContext { @@ -66,7 +73,7 @@ class ShardLeaderElectionContextBase extends ElectionContext { final String collection, final String shardZkNodeName, ZkNodeProps props, ZkStateReader zkStateReader) { super(shardZkNodeName, ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/leader_elect/" + shardId, ZkStateReader.getShardLeadersPath(collection, shardId), - props); + props, zkStateReader.getZkClient()); this.leaderElector = leaderElector; this.zkClient = zkStateReader.getZkClient(); this.shardId = shardId; @@ -74,7 +81,7 @@ class ShardLeaderElectionContextBase extends ElectionContext { } @Override - void runLeaderProcess(String leaderSeqPath, boolean weAreReplacement, SolrCore core) + void runLeaderProcess(boolean weAreReplacement, SolrCore core) throws KeeperException, InterruptedException, IOException { try { @@ -109,7 +116,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase { } @Override - void runLeaderProcess(String leaderSeqPath, boolean weAreReplacement, SolrCore startupCore) + void runLeaderProcess(boolean weAreReplacement, SolrCore startupCore) throws KeeperException, InterruptedException, IOException { if (cc != null) { String coreName = leaderProps.get(ZkStateReader.CORE_NAME_PROP); @@ -123,7 +130,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase { core = startupCore; } if (core == null) { - zkClient.delete(leaderSeqPath, -1, true); + cancelElection(); throw new SolrException(ErrorCode.SERVER_ERROR, "Fatal Error, SolrCore not found:" + coreName + " in " + cc.getCoreNames()); } // should I be leader? @@ -159,7 +166,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase { } - super.runLeaderProcess(leaderSeqPath, weAreReplacement, startupCore); + super.runLeaderProcess(weAreReplacement, startupCore); } private void rejoinLeaderElection(String leaderSeqPath, SolrCore core) @@ -170,7 +177,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase { zkController.publish(core.getCoreDescriptor(), ZkStateReader.DOWN); - zkClient.delete(leaderSeqPath, -1, true); + cancelElection(); core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getName()); @@ -236,13 +243,13 @@ final class OverseerElectionContext extends ElectionContext { private final ZkStateReader stateReader; public OverseerElectionContext(final String zkNodeName, SolrZkClient zkClient, ZkStateReader stateReader) { - super(zkNodeName, "/overseer_elect", "/overseer_elect/leader", null); + super(zkNodeName, "/overseer_elect", "/overseer_elect/leader", null, stateReader.getZkClient()); this.zkClient = zkClient; this.stateReader = stateReader; } @Override - void runLeaderProcess(String leaderSeqPath, boolean weAreReplacement, SolrCore firstCore) throws KeeperException, InterruptedException { + void runLeaderProcess(boolean weAreReplacement, SolrCore firstCore) throws KeeperException, InterruptedException { final String id = leaderSeqPath.substring(leaderSeqPath.lastIndexOf("/")+1); ZkNodeProps myProps = new ZkNodeProps("id", id); diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java index 9a9b085814e..be4bda459ee 100644 --- a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java +++ b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java @@ -86,7 +86,7 @@ public class LeaderElector { * @throws IOException * @throws UnsupportedEncodingException */ - private void checkIfIamLeader(final String leaderSeqPath, final int seq, final ElectionContext context, boolean replacement, SolrCore core) throws KeeperException, + private void checkIfIamLeader(final int seq, final ElectionContext context, boolean replacement, SolrCore core) throws KeeperException, InterruptedException, IOException { // get all other numbers... final String holdElectionPath = context.electionPath + ELECTION_NODE; @@ -95,7 +95,7 @@ public class LeaderElector { sortSeqs(seqs); List intSeqs = getSeqs(seqs); if (seq <= intSeqs.get(0)) { - runIamLeaderProcess(leaderSeqPath, context, replacement, core); + runIamLeaderProcess(context, replacement, core); } else { // I am not the leader - watch the node below me int i = 1; @@ -119,7 +119,7 @@ public class LeaderElector { public void process(WatchedEvent event) { // am I the next leader? try { - checkIfIamLeader(leaderSeqPath, seq, context, true, null); + checkIfIamLeader(seq, context, true, null); } catch (InterruptedException e) { // Restore the interrupted status Thread.currentThread().interrupt(); @@ -137,16 +137,15 @@ public class LeaderElector { } catch (KeeperException e) { // we couldn't set our watch - the node before us may already be down? // we need to check if we are the leader again - checkIfIamLeader(leaderSeqPath, seq, context, true, null); + checkIfIamLeader(seq, context, true, null); } } } // TODO: get this core param out of here - protected void runIamLeaderProcess(String leaderSeqPath, final ElectionContext context, boolean weAreReplacement, SolrCore core) throws KeeperException, + protected void runIamLeaderProcess(final ElectionContext context, boolean weAreReplacement, SolrCore core) throws KeeperException, InterruptedException, IOException { - - context.runLeaderProcess(leaderSeqPath, weAreReplacement, core); + context.runLeaderProcess(weAreReplacement, core); } /** @@ -219,6 +218,7 @@ public class LeaderElector { try { leaderSeqPath = zkClient.create(shardsElectZkPath + "/" + id + "-n_", null, CreateMode.EPHEMERAL_SEQUENTIAL, false); + context.leaderSeqPath = leaderSeqPath; cont = false; } catch (ConnectionLossException e) { // we don't know if we made our node or not... @@ -249,7 +249,7 @@ public class LeaderElector { } } int seq = getSeq(leaderSeqPath); - checkIfIamLeader(leaderSeqPath, seq, context, false, core); + checkIfIamLeader(seq, context, false, core); return seq; } diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java index 929bff7eeae..54d91c288d0 100644 --- a/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java @@ -157,7 +157,34 @@ public class LeaderElectionTest extends SolrTestCaseJ4 { assertEquals("http://127.0.0.1/solr/", getLeaderUrl("collection1", "shard2")); } - + + @Test + public void testCancelElection() throws Exception { + LeaderElector first = new LeaderElector(zkClient); + ZkNodeProps props = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, + "http://127.0.0.1/solr/", ZkStateReader.CORE_NAME_PROP, "1"); + ElectionContext firstContext = new ShardLeaderElectionContextBase(first, + "slice1", "collection2", "dummynode1", props, zkStateReader); + first.setup(firstContext); + first.joinElection(firstContext, null); + + Thread.sleep(1000); + assertEquals("original leader was not registered", "http://127.0.0.1/solr/1/", getLeaderUrl("collection2", "slice1")); + + LeaderElector second = new LeaderElector(zkClient); + props = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, + "http://127.0.0.1/solr/", ZkStateReader.CORE_NAME_PROP, "2"); + ElectionContext context = new ShardLeaderElectionContextBase(second, + "slice1", "collection2", "dummynode1", props, zkStateReader); + second.setup(context); + second.joinElection(context, null); + Thread.sleep(1000); + assertEquals("original leader should have stayed leader", "http://127.0.0.1/solr/1/", getLeaderUrl("collection2", "slice1")); + firstContext.cancelElection(); + Thread.sleep(1000); + assertEquals("new leader was not registered", "http://127.0.0.1/solr/2/", getLeaderUrl("collection2", "slice1")); + } + private String getLeaderUrl(final String collection, final String slice) throws KeeperException, InterruptedException { int iterCount = 60;