From b24e035e9f97af962f49b08ed813dea1cbb6c5d6 Mon Sep 17 00:00:00 2001 From: Mark Robert Miller Date: Mon, 27 Feb 2012 01:09:14 +0000 Subject: [PATCH] remove some api ugliness around solrcloud - we don't actually need it anymore git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1293986 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/solr/cloud/ElectionContext.java | 22 ++--- .../org/apache/solr/cloud/LeaderElector.java | 17 ++-- .../org/apache/solr/cloud/ZkController.java | 91 ++++++++----------- .../org/apache/solr/core/CoreContainer.java | 2 +- .../solr/cloud/ChaosMonkeySafeLeaderTest.java | 2 +- .../apache/solr/cloud/LeaderElectionTest.java | 8 +- .../org/apache/solr/cloud/OverseerTest.java | 8 +- .../apache/solr/cloud/ZkControllerTest.java | 2 +- 8 files changed, 69 insertions(+), 83 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java index ff10bbcedb8..412892681ea 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java +++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java @@ -59,7 +59,7 @@ public abstract class ElectionContext { // the given core may or may not be null - if you need access to the current core, you must pass // the core container and core name to your context impl - then use this core ref if it is not null // else access it from the core container - abstract void runLeaderProcess(boolean weAreReplacement, SolrCore core) throws KeeperException, InterruptedException, IOException; + abstract void runLeaderProcess(boolean weAreReplacement) throws KeeperException, InterruptedException, IOException; } class ShardLeaderElectionContextBase extends ElectionContext { @@ -81,7 +81,7 @@ class ShardLeaderElectionContextBase extends ElectionContext { } @Override - void runLeaderProcess(boolean weAreReplacement, SolrCore core) + void runLeaderProcess(boolean weAreReplacement) throws KeeperException, InterruptedException, IOException { try { @@ -116,7 +116,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase { } @Override - void runLeaderProcess(boolean weAreReplacement, SolrCore startupCore) + void runLeaderProcess(boolean weAreReplacement) throws KeeperException, InterruptedException, IOException { if (cc != null) { String coreName = leaderProps.get(ZkStateReader.CORE_NAME_PROP); @@ -124,11 +124,9 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase { try { // the first time we are run, we will get a startupCore - after // we will get null and must use cc.getCore - if (startupCore == null) { - core = cc.getCore(coreName); - } else { - core = startupCore; - } + + core = cc.getCore(coreName); + if (core == null) { cancelElection(); throw new SolrException(ErrorCode.SERVER_ERROR, "Fatal Error, SolrCore not found:" + coreName + " in " + cc.getCoreNames()); @@ -159,14 +157,14 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase { zkController.publish(core.getCoreDescriptor(), ZkStateReader.ACTIVE); } finally { - if (core != null && startupCore == null) { + if (core != null ) { core.close(); } } } - super.runLeaderProcess(weAreReplacement, startupCore); + super.runLeaderProcess(weAreReplacement); } private void rejoinLeaderElection(String leaderSeqPath, SolrCore core) @@ -181,7 +179,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase { core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getName()); - leaderElector.joinElection(this, null); // don't pass core, pass null + leaderElector.joinElection(this); } private boolean shouldIBeLeader(ZkNodeProps leaderProps) { @@ -249,7 +247,7 @@ final class OverseerElectionContext extends ElectionContext { } @Override - void runLeaderProcess(boolean weAreReplacement, SolrCore firstCore) throws KeeperException, InterruptedException { + void runLeaderProcess(boolean weAreReplacement) throws KeeperException, InterruptedException { final String id = leaderSeqPath.substring(leaderSeqPath.lastIndexOf("/")+1); ZkNodeProps myProps = new ZkNodeProps("id", id); diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java index be4bda459ee..c6ac424b9fd 100644 --- a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java +++ b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java @@ -80,13 +80,12 @@ public class LeaderElector { * @param seq * @param context * @param replacement has someone else been the leader already? - * @param core * @throws KeeperException * @throws InterruptedException * @throws IOException * @throws UnsupportedEncodingException */ - private void checkIfIamLeader(final int seq, final ElectionContext context, boolean replacement, SolrCore core) throws KeeperException, + private void checkIfIamLeader(final int seq, final ElectionContext context, boolean replacement) throws KeeperException, InterruptedException, IOException { // get all other numbers... final String holdElectionPath = context.electionPath + ELECTION_NODE; @@ -95,7 +94,7 @@ public class LeaderElector { sortSeqs(seqs); List intSeqs = getSeqs(seqs); if (seq <= intSeqs.get(0)) { - runIamLeaderProcess(context, replacement, core); + runIamLeaderProcess(context, replacement); } else { // I am not the leader - watch the node below me int i = 1; @@ -119,7 +118,7 @@ public class LeaderElector { public void process(WatchedEvent event) { // am I the next leader? try { - checkIfIamLeader(seq, context, true, null); + checkIfIamLeader(seq, context, true); } catch (InterruptedException e) { // Restore the interrupted status Thread.currentThread().interrupt(); @@ -137,15 +136,15 @@ public class LeaderElector { } catch (KeeperException e) { // we couldn't set our watch - the node before us may already be down? // we need to check if we are the leader again - checkIfIamLeader(seq, context, true, null); + checkIfIamLeader(seq, context, true); } } } // TODO: get this core param out of here - protected void runIamLeaderProcess(final ElectionContext context, boolean weAreReplacement, SolrCore core) throws KeeperException, + protected void runIamLeaderProcess(final ElectionContext context, boolean weAreReplacement) throws KeeperException, InterruptedException, IOException { - context.runLeaderProcess(weAreReplacement, core); + context.runLeaderProcess(weAreReplacement); } /** @@ -206,7 +205,7 @@ public class LeaderElector { * @throws IOException * @throws UnsupportedEncodingException */ - public int joinElection(ElectionContext context, SolrCore core) throws KeeperException, InterruptedException, IOException { + public int joinElection(ElectionContext context) throws KeeperException, InterruptedException, IOException { final String shardsElectZkPath = context.electionPath + LeaderElector.ELECTION_NODE; long sessionId = zkClient.getSolrZooKeeper().getSessionId(); @@ -249,7 +248,7 @@ public class LeaderElector { } } int seq = getSeq(leaderSeqPath); - checkIfIamLeader(seq, context, false, core); + checkIfIamLeader(seq, context, false); return seq; } diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index 8fcaeb003fb..62fe8b99bc0 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -21,7 +21,6 @@ import java.io.File; import java.io.IOException; import java.net.InetAddress; import java.net.MalformedURLException; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -190,7 +189,7 @@ public final class ZkController { //Overseer.createClientNodes(zkClient, getNodeName()); ElectionContext context = new OverseerElectionContext(getNodeName(), zkClient, zkStateReader); - overseerElector.joinElection(context, null); + overseerElector.joinElection(context); zkStateReader.createClusterStateWatchersAndUpdate(); List descriptors = registerOnReconnect @@ -203,7 +202,7 @@ public final class ZkController { + descriptor.getName(); publishAsDown(getBaseUrl(), descriptor, coreZkNodeName, descriptor.getName()); - waitForLeaderToSeeDownState(descriptor, coreZkNodeName, true); + waitForLeaderToSeeDownState(descriptor, coreZkNodeName); } } @@ -367,7 +366,7 @@ public final class ZkController { overseerElector = new LeaderElector(zkClient); ElectionContext context = new OverseerElectionContext(getNodeName(), zkClient, zkStateReader); overseerElector.setup(context); - overseerElector.joinElection(context, null); + overseerElector.joinElection(context); zkStateReader.createClusterStateWatchersAndUpdate(); } catch (IOException e) { @@ -542,6 +541,18 @@ public final class ZkController { ZkNodeProps leaderProps = new ZkNodeProps(props); + try { + joinElection(desc); + } catch (InterruptedException e) { + // Restore the interrupted status + Thread.currentThread().interrupt(); + throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); + } catch (KeeperException e) { + throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); + } catch (IOException e) { + throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); + } + // rather than look in the cluster state file, we go straight to the zknodes // here, because on cluster restart there could be stale leader info in the // cluster state node that won't be updated for a moment @@ -572,7 +583,7 @@ public final class ZkController { try { core = cc.getCore(desc.getName()); - + // recover from local transaction log and wait for it to complete before // going active // TODO: should this be moved to another thread? To recoveryStrat? @@ -593,7 +604,6 @@ public final class ZkController { // TODO: in the future we could do peerync in parallel with recoverFromLog } } - boolean didRecovery = checkRecovery(coreName, desc, recoverReloadedCores, isLeader, cloudDesc, collection, coreZkNodeName, shardId, leaderProps, core, cc); @@ -642,14 +652,27 @@ public final class ZkController { } - private void joinElection(final String collection, - final String shardZkNodeName, String shardId, ZkNodeProps leaderProps, SolrCore core) throws InterruptedException, KeeperException, IOException { - ElectionContext context = new ShardLeaderElectionContext(leaderElector, shardId, - collection, shardZkNodeName, leaderProps, this, cc); + private void joinElection(CoreDescriptor cd) throws InterruptedException, KeeperException, IOException { + String shardId = cd.getCloudDescriptor().getShardId(); + + Map props = new HashMap(); + // we only put a subset of props into the leader node + props.put(ZkStateReader.BASE_URL_PROP, getBaseUrl()); + props.put(ZkStateReader.CORE_NAME_PROP, cd.getName()); + props.put(ZkStateReader.NODE_NAME_PROP, getNodeName()); + + final String coreZkNodeName = getNodeName() + "_" + cd.getName(); + ZkNodeProps ourProps = new ZkNodeProps(props); + String collection = cd.getCloudDescriptor() + .getCollectionName(); + + ElectionContext context = new ShardLeaderElectionContext(leaderElector, shardId, + collection, coreZkNodeName, ourProps, this, cc); + leaderElector.setup(context); - electionContexts.put(shardZkNodeName, context); - leaderElector.joinElection(context, core); + electionContexts.put(coreZkNodeName, context); + leaderElector.joinElection(context); } @@ -1037,43 +1060,14 @@ public final class ZkController { uploadToZK(zkClient, dir, ZkController.CONFIGS_ZKNODE + "/" + configName); } - public void preRegisterSetup(SolrCore core, CoreDescriptor cd, boolean waitForNotLive) { + public void preRegister(CoreDescriptor cd) { // before becoming available, make sure we are not live and active // this also gets us our assigned shard id if it was not specified - publish(cd, ZkStateReader.DOWN); - - String shardId = cd.getCloudDescriptor().getShardId(); - - Map props = new HashMap(); - // we only put a subset of props into the leader node - props.put(ZkStateReader.BASE_URL_PROP, getBaseUrl()); - props.put(ZkStateReader.CORE_NAME_PROP, cd.getName()); - props.put(ZkStateReader.NODE_NAME_PROP, getNodeName()); - - final String coreZkNodeName = getNodeName() + "_" + cd.getName(); - ZkNodeProps ourProps = new ZkNodeProps(props); - String collection = cd.getCloudDescriptor() - .getCollectionName(); - - try { - joinElection(collection, coreZkNodeName, shardId, ourProps, core); - } catch (InterruptedException e) { - // Restore the interrupted status - Thread.currentThread().interrupt(); - throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); - } catch (KeeperException e) { - throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); - } catch (IOException e) { - throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); - } - - - waitForLeaderToSeeDownState(cd, coreZkNodeName, waitForNotLive); - + publish(cd, ZkStateReader.DOWN); } private ZkCoreNodeProps waitForLeaderToSeeDownState( - CoreDescriptor descriptor, final String shardZkNodeName, boolean waitForNotLive) { + CoreDescriptor descriptor, final String coreZkNodeName) { CloudDescriptor cloudDesc = descriptor.getCloudDescriptor(); String collection = cloudDesc.getCollectionName(); String shard = cloudDesc.getShardId(); @@ -1097,8 +1091,6 @@ public final class ZkController { boolean isLeader = leaderProps.getCoreUrl().equals(ourUrl); if (!isLeader && !SKIP_AUTO_RECOVERY) { - // wait until the leader sees us as down before we are willing to accept - // updates. CommonsHttpSolrServer server = null; try { server = new CommonsHttpSolrServer(leaderBaseUrl); @@ -1111,12 +1103,9 @@ public final class ZkController { WaitForState prepCmd = new WaitForState(); prepCmd.setCoreName(leaderCoreName); prepCmd.setNodeName(getNodeName()); - prepCmd.setCoreNodeName(shardZkNodeName); + prepCmd.setCoreNodeName(coreZkNodeName); prepCmd.setState(ZkStateReader.DOWN); - prepCmd.setPauseFor(5000); - if (waitForNotLive){ - prepCmd.setCheckLive(false); - } + prepCmd.setPauseFor(0); // let's retry a couple times - perhaps the leader just went down, // or perhaps he is just not quite ready for us yet diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java index 86911ba9f2c..4f5b4895984 100644 --- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java +++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java @@ -542,7 +542,7 @@ public class CoreContainer if (zkController != null) { // this happens before we can receive requests - zkController.preRegisterSetup(core, core.getCoreDescriptor(), false); + zkController.preRegister(core.getCoreDescriptor()); } SolrCore old = null; diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java index 24635de636e..a932524e2dc 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java @@ -32,7 +32,7 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Ignore; -@Ignore("SOLR-3126") +//@Ignore("SOLR-3126") public class ChaosMonkeySafeLeaderTest extends FullSolrCloudTest { @BeforeClass diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java index 54d91c288d0..81a23530205 100644 --- a/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java @@ -110,7 +110,7 @@ public class LeaderElectionTest extends SolrTestCaseJ4 { try { elector.setup(context); - seq = elector.joinElection(context, null); + seq = elector.joinElection(context); electionDone = true; seqToThread.put(seq, this); } catch (InterruptedException e) { @@ -153,7 +153,7 @@ public class LeaderElectionTest extends SolrTestCaseJ4 { ElectionContext context = new ShardLeaderElectionContextBase(elector, "shard2", "collection1", "dummynode1", props, zkStateReader); elector.setup(context); - elector.joinElection(context, null); + elector.joinElection(context); assertEquals("http://127.0.0.1/solr/", getLeaderUrl("collection1", "shard2")); } @@ -166,7 +166,7 @@ public class LeaderElectionTest extends SolrTestCaseJ4 { ElectionContext firstContext = new ShardLeaderElectionContextBase(first, "slice1", "collection2", "dummynode1", props, zkStateReader); first.setup(firstContext); - first.joinElection(firstContext, null); + first.joinElection(firstContext); Thread.sleep(1000); assertEquals("original leader was not registered", "http://127.0.0.1/solr/1/", getLeaderUrl("collection2", "slice1")); @@ -177,7 +177,7 @@ public class LeaderElectionTest extends SolrTestCaseJ4 { ElectionContext context = new ShardLeaderElectionContextBase(second, "slice1", "collection2", "dummynode1", props, zkStateReader); second.setup(context); - second.joinElection(context, null); + second.joinElection(context); Thread.sleep(1000); assertEquals("original leader should have stayed leader", "http://127.0.0.1/solr/1/", getLeaderUrl("collection2", "slice1")); firstContext.cancelElection(); diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java index 136b3e97b5e..ecee92bdc4d 100644 --- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java @@ -142,7 +142,7 @@ public class OverseerTest extends SolrTestCaseJ4 { ShardLeaderElectionContextBase ctx = new ShardLeaderElectionContextBase( elector, shardId, collection, nodeName + "_" + coreName, props, zkStateReader); - elector.joinElection(ctx, null); + elector.joinElection(ctx); break; } Thread.sleep(200); @@ -218,7 +218,7 @@ public class OverseerTest extends SolrTestCaseJ4 { collection1Desc.setCollectionName("collection1"); CoreDescriptor desc1 = new CoreDescriptor(null, "core" + (i + 1), ""); desc1.setCloudDescriptor(collection1Desc); - zkController.preRegisterSetup(null, desc1, false); + zkController.preRegister(desc1); ids[i] = zkController.register("core" + (i + 1), desc1); } @@ -318,7 +318,7 @@ public class OverseerTest extends SolrTestCaseJ4 { final CoreDescriptor desc = new CoreDescriptor(null, coreName, ""); desc.setCloudDescriptor(collection1Desc); try { - controllers[slot % nodeCount].preRegisterSetup(null, desc, false); + controllers[slot % nodeCount].preRegister(desc); ids[slot] = controllers[slot % nodeCount] .register(coreName, desc); } catch (Throwable e) { @@ -870,7 +870,7 @@ public class OverseerTest extends SolrTestCaseJ4 { LeaderElector overseerElector = new LeaderElector(zkClient); ElectionContext ec = new OverseerElectionContext(address.replaceAll("/", "_"), zkClient, reader); overseerElector.setup(ec); - overseerElector.joinElection(ec, null); + overseerElector.joinElection(ec); return zkClient; } } \ No newline at end of file diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java index 364c793b5b2..92f59bb8c2c 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java @@ -190,7 +190,7 @@ public class ZkControllerTest extends SolrTestCaseJ4 { collection1Desc.setCollectionName("collection1"); CoreDescriptor desc1 = new CoreDescriptor(null, "core" + (i + 1), ""); desc1.setCloudDescriptor(collection1Desc); - zkController.preRegisterSetup(null, desc1, false); + zkController.preRegister(desc1); ids[i] = zkController.register("core" + (i + 1), desc1); }