remove some api ugliness around solrcloud - we don't actually need it anymore

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1293986 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2012-02-27 01:09:14 +00:00
parent 3ef57d09df
commit b24e035e9f
8 changed files with 69 additions and 83 deletions

View File

@ -59,7 +59,7 @@ public abstract class ElectionContext {
// the given core may or may not be null - if you need access to the current core, you must pass // the given core may or may not be null - if you need access to the current core, you must pass
// the core container and core name to your context impl - then use this core ref if it is not null // the core container and core name to your context impl - then use this core ref if it is not null
// else access it from the core container // else access it from the core container
abstract void runLeaderProcess(boolean weAreReplacement, SolrCore core) throws KeeperException, InterruptedException, IOException; abstract void runLeaderProcess(boolean weAreReplacement) throws KeeperException, InterruptedException, IOException;
} }
class ShardLeaderElectionContextBase extends ElectionContext { class ShardLeaderElectionContextBase extends ElectionContext {
@ -81,7 +81,7 @@ class ShardLeaderElectionContextBase extends ElectionContext {
} }
@Override @Override
void runLeaderProcess(boolean weAreReplacement, SolrCore core) void runLeaderProcess(boolean weAreReplacement)
throws KeeperException, InterruptedException, IOException { throws KeeperException, InterruptedException, IOException {
try { try {
@ -116,7 +116,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
} }
@Override @Override
void runLeaderProcess(boolean weAreReplacement, SolrCore startupCore) void runLeaderProcess(boolean weAreReplacement)
throws KeeperException, InterruptedException, IOException { throws KeeperException, InterruptedException, IOException {
if (cc != null) { if (cc != null) {
String coreName = leaderProps.get(ZkStateReader.CORE_NAME_PROP); String coreName = leaderProps.get(ZkStateReader.CORE_NAME_PROP);
@ -124,11 +124,9 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
try { try {
// the first time we are run, we will get a startupCore - after // the first time we are run, we will get a startupCore - after
// we will get null and must use cc.getCore // we will get null and must use cc.getCore
if (startupCore == null) {
core = cc.getCore(coreName); core = cc.getCore(coreName);
} else {
core = startupCore;
}
if (core == null) { if (core == null) {
cancelElection(); cancelElection();
throw new SolrException(ErrorCode.SERVER_ERROR, "Fatal Error, SolrCore not found:" + coreName + " in " + cc.getCoreNames()); throw new SolrException(ErrorCode.SERVER_ERROR, "Fatal Error, SolrCore not found:" + coreName + " in " + cc.getCoreNames());
@ -159,14 +157,14 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
zkController.publish(core.getCoreDescriptor(), ZkStateReader.ACTIVE); zkController.publish(core.getCoreDescriptor(), ZkStateReader.ACTIVE);
} finally { } finally {
if (core != null && startupCore == null) { if (core != null ) {
core.close(); core.close();
} }
} }
} }
super.runLeaderProcess(weAreReplacement, startupCore); super.runLeaderProcess(weAreReplacement);
} }
private void rejoinLeaderElection(String leaderSeqPath, SolrCore core) private void rejoinLeaderElection(String leaderSeqPath, SolrCore core)
@ -181,7 +179,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getName()); core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getName());
leaderElector.joinElection(this, null); // don't pass core, pass null leaderElector.joinElection(this);
} }
private boolean shouldIBeLeader(ZkNodeProps leaderProps) { private boolean shouldIBeLeader(ZkNodeProps leaderProps) {
@ -249,7 +247,7 @@ final class OverseerElectionContext extends ElectionContext {
} }
@Override @Override
void runLeaderProcess(boolean weAreReplacement, SolrCore firstCore) throws KeeperException, InterruptedException { void runLeaderProcess(boolean weAreReplacement) throws KeeperException, InterruptedException {
final String id = leaderSeqPath.substring(leaderSeqPath.lastIndexOf("/")+1); final String id = leaderSeqPath.substring(leaderSeqPath.lastIndexOf("/")+1);
ZkNodeProps myProps = new ZkNodeProps("id", id); ZkNodeProps myProps = new ZkNodeProps("id", id);

View File

@ -80,13 +80,12 @@ public class LeaderElector {
* @param seq * @param seq
* @param context * @param context
* @param replacement has someone else been the leader already? * @param replacement has someone else been the leader already?
* @param core
* @throws KeeperException * @throws KeeperException
* @throws InterruptedException * @throws InterruptedException
* @throws IOException * @throws IOException
* @throws UnsupportedEncodingException * @throws UnsupportedEncodingException
*/ */
private void checkIfIamLeader(final int seq, final ElectionContext context, boolean replacement, SolrCore core) throws KeeperException, private void checkIfIamLeader(final int seq, final ElectionContext context, boolean replacement) throws KeeperException,
InterruptedException, IOException { InterruptedException, IOException {
// get all other numbers... // get all other numbers...
final String holdElectionPath = context.electionPath + ELECTION_NODE; final String holdElectionPath = context.electionPath + ELECTION_NODE;
@ -95,7 +94,7 @@ public class LeaderElector {
sortSeqs(seqs); sortSeqs(seqs);
List<Integer> intSeqs = getSeqs(seqs); List<Integer> intSeqs = getSeqs(seqs);
if (seq <= intSeqs.get(0)) { if (seq <= intSeqs.get(0)) {
runIamLeaderProcess(context, replacement, core); runIamLeaderProcess(context, replacement);
} else { } else {
// I am not the leader - watch the node below me // I am not the leader - watch the node below me
int i = 1; int i = 1;
@ -119,7 +118,7 @@ public class LeaderElector {
public void process(WatchedEvent event) { public void process(WatchedEvent event) {
// am I the next leader? // am I the next leader?
try { try {
checkIfIamLeader(seq, context, true, null); checkIfIamLeader(seq, context, true);
} catch (InterruptedException e) { } catch (InterruptedException e) {
// Restore the interrupted status // Restore the interrupted status
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
@ -137,15 +136,15 @@ public class LeaderElector {
} catch (KeeperException e) { } catch (KeeperException e) {
// we couldn't set our watch - the node before us may already be down? // we couldn't set our watch - the node before us may already be down?
// we need to check if we are the leader again // we need to check if we are the leader again
checkIfIamLeader(seq, context, true, null); checkIfIamLeader(seq, context, true);
} }
} }
} }
// TODO: get this core param out of here // TODO: get this core param out of here
protected void runIamLeaderProcess(final ElectionContext context, boolean weAreReplacement, SolrCore core) throws KeeperException, protected void runIamLeaderProcess(final ElectionContext context, boolean weAreReplacement) throws KeeperException,
InterruptedException, IOException { InterruptedException, IOException {
context.runLeaderProcess(weAreReplacement, core); context.runLeaderProcess(weAreReplacement);
} }
/** /**
@ -206,7 +205,7 @@ public class LeaderElector {
* @throws IOException * @throws IOException
* @throws UnsupportedEncodingException * @throws UnsupportedEncodingException
*/ */
public int joinElection(ElectionContext context, SolrCore core) throws KeeperException, InterruptedException, IOException { public int joinElection(ElectionContext context) throws KeeperException, InterruptedException, IOException {
final String shardsElectZkPath = context.electionPath + LeaderElector.ELECTION_NODE; final String shardsElectZkPath = context.electionPath + LeaderElector.ELECTION_NODE;
long sessionId = zkClient.getSolrZooKeeper().getSessionId(); long sessionId = zkClient.getSolrZooKeeper().getSessionId();
@ -249,7 +248,7 @@ public class LeaderElector {
} }
} }
int seq = getSeq(leaderSeqPath); int seq = getSeq(leaderSeqPath);
checkIfIamLeader(seq, context, false, core); checkIfIamLeader(seq, context, false);
return seq; return seq;
} }

View File

@ -21,7 +21,6 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
@ -190,7 +189,7 @@ public final class ZkController {
//Overseer.createClientNodes(zkClient, getNodeName()); //Overseer.createClientNodes(zkClient, getNodeName());
ElectionContext context = new OverseerElectionContext(getNodeName(), zkClient, zkStateReader); ElectionContext context = new OverseerElectionContext(getNodeName(), zkClient, zkStateReader);
overseerElector.joinElection(context, null); overseerElector.joinElection(context);
zkStateReader.createClusterStateWatchersAndUpdate(); zkStateReader.createClusterStateWatchersAndUpdate();
List<CoreDescriptor> descriptors = registerOnReconnect List<CoreDescriptor> descriptors = registerOnReconnect
@ -203,7 +202,7 @@ public final class ZkController {
+ descriptor.getName(); + descriptor.getName();
publishAsDown(getBaseUrl(), descriptor, coreZkNodeName, publishAsDown(getBaseUrl(), descriptor, coreZkNodeName,
descriptor.getName()); descriptor.getName());
waitForLeaderToSeeDownState(descriptor, coreZkNodeName, true); waitForLeaderToSeeDownState(descriptor, coreZkNodeName);
} }
} }
@ -367,7 +366,7 @@ public final class ZkController {
overseerElector = new LeaderElector(zkClient); overseerElector = new LeaderElector(zkClient);
ElectionContext context = new OverseerElectionContext(getNodeName(), zkClient, zkStateReader); ElectionContext context = new OverseerElectionContext(getNodeName(), zkClient, zkStateReader);
overseerElector.setup(context); overseerElector.setup(context);
overseerElector.joinElection(context, null); overseerElector.joinElection(context);
zkStateReader.createClusterStateWatchersAndUpdate(); zkStateReader.createClusterStateWatchersAndUpdate();
} catch (IOException e) { } catch (IOException e) {
@ -542,6 +541,18 @@ public final class ZkController {
ZkNodeProps leaderProps = new ZkNodeProps(props); ZkNodeProps leaderProps = new ZkNodeProps(props);
try {
joinElection(desc);
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
} catch (KeeperException e) {
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
} catch (IOException e) {
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
}
// rather than look in the cluster state file, we go straight to the zknodes // rather than look in the cluster state file, we go straight to the zknodes
// here, because on cluster restart there could be stale leader info in the // here, because on cluster restart there could be stale leader info in the
// cluster state node that won't be updated for a moment // cluster state node that won't be updated for a moment
@ -572,7 +583,7 @@ public final class ZkController {
try { try {
core = cc.getCore(desc.getName()); core = cc.getCore(desc.getName());
// recover from local transaction log and wait for it to complete before // recover from local transaction log and wait for it to complete before
// going active // going active
// TODO: should this be moved to another thread? To recoveryStrat? // TODO: should this be moved to another thread? To recoveryStrat?
@ -593,7 +604,6 @@ public final class ZkController {
// TODO: in the future we could do peerync in parallel with recoverFromLog // TODO: in the future we could do peerync in parallel with recoverFromLog
} }
} }
boolean didRecovery = checkRecovery(coreName, desc, recoverReloadedCores, isLeader, cloudDesc, boolean didRecovery = checkRecovery(coreName, desc, recoverReloadedCores, isLeader, cloudDesc,
collection, coreZkNodeName, shardId, leaderProps, core, cc); collection, coreZkNodeName, shardId, leaderProps, core, cc);
@ -642,14 +652,27 @@ public final class ZkController {
} }
private void joinElection(final String collection, private void joinElection(CoreDescriptor cd) throws InterruptedException, KeeperException, IOException {
final String shardZkNodeName, String shardId, ZkNodeProps leaderProps, SolrCore core) throws InterruptedException, KeeperException, IOException {
ElectionContext context = new ShardLeaderElectionContext(leaderElector, shardId,
collection, shardZkNodeName, leaderProps, this, cc);
String shardId = cd.getCloudDescriptor().getShardId();
Map<String,String> props = new HashMap<String,String>();
// we only put a subset of props into the leader node
props.put(ZkStateReader.BASE_URL_PROP, getBaseUrl());
props.put(ZkStateReader.CORE_NAME_PROP, cd.getName());
props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
final String coreZkNodeName = getNodeName() + "_" + cd.getName();
ZkNodeProps ourProps = new ZkNodeProps(props);
String collection = cd.getCloudDescriptor()
.getCollectionName();
ElectionContext context = new ShardLeaderElectionContext(leaderElector, shardId,
collection, coreZkNodeName, ourProps, this, cc);
leaderElector.setup(context); leaderElector.setup(context);
electionContexts.put(shardZkNodeName, context); electionContexts.put(coreZkNodeName, context);
leaderElector.joinElection(context, core); leaderElector.joinElection(context);
} }
@ -1037,43 +1060,14 @@ public final class ZkController {
uploadToZK(zkClient, dir, ZkController.CONFIGS_ZKNODE + "/" + configName); uploadToZK(zkClient, dir, ZkController.CONFIGS_ZKNODE + "/" + configName);
} }
public void preRegisterSetup(SolrCore core, CoreDescriptor cd, boolean waitForNotLive) { public void preRegister(CoreDescriptor cd) {
// before becoming available, make sure we are not live and active // before becoming available, make sure we are not live and active
// this also gets us our assigned shard id if it was not specified // this also gets us our assigned shard id if it was not specified
publish(cd, ZkStateReader.DOWN); publish(cd, ZkStateReader.DOWN);
String shardId = cd.getCloudDescriptor().getShardId();
Map<String,String> props = new HashMap<String,String>();
// we only put a subset of props into the leader node
props.put(ZkStateReader.BASE_URL_PROP, getBaseUrl());
props.put(ZkStateReader.CORE_NAME_PROP, cd.getName());
props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
final String coreZkNodeName = getNodeName() + "_" + cd.getName();
ZkNodeProps ourProps = new ZkNodeProps(props);
String collection = cd.getCloudDescriptor()
.getCollectionName();
try {
joinElection(collection, coreZkNodeName, shardId, ourProps, core);
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
} catch (KeeperException e) {
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
} catch (IOException e) {
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
}
waitForLeaderToSeeDownState(cd, coreZkNodeName, waitForNotLive);
} }
private ZkCoreNodeProps waitForLeaderToSeeDownState( private ZkCoreNodeProps waitForLeaderToSeeDownState(
CoreDescriptor descriptor, final String shardZkNodeName, boolean waitForNotLive) { CoreDescriptor descriptor, final String coreZkNodeName) {
CloudDescriptor cloudDesc = descriptor.getCloudDescriptor(); CloudDescriptor cloudDesc = descriptor.getCloudDescriptor();
String collection = cloudDesc.getCollectionName(); String collection = cloudDesc.getCollectionName();
String shard = cloudDesc.getShardId(); String shard = cloudDesc.getShardId();
@ -1097,8 +1091,6 @@ public final class ZkController {
boolean isLeader = leaderProps.getCoreUrl().equals(ourUrl); boolean isLeader = leaderProps.getCoreUrl().equals(ourUrl);
if (!isLeader && !SKIP_AUTO_RECOVERY) { if (!isLeader && !SKIP_AUTO_RECOVERY) {
// wait until the leader sees us as down before we are willing to accept
// updates.
CommonsHttpSolrServer server = null; CommonsHttpSolrServer server = null;
try { try {
server = new CommonsHttpSolrServer(leaderBaseUrl); server = new CommonsHttpSolrServer(leaderBaseUrl);
@ -1111,12 +1103,9 @@ public final class ZkController {
WaitForState prepCmd = new WaitForState(); WaitForState prepCmd = new WaitForState();
prepCmd.setCoreName(leaderCoreName); prepCmd.setCoreName(leaderCoreName);
prepCmd.setNodeName(getNodeName()); prepCmd.setNodeName(getNodeName());
prepCmd.setCoreNodeName(shardZkNodeName); prepCmd.setCoreNodeName(coreZkNodeName);
prepCmd.setState(ZkStateReader.DOWN); prepCmd.setState(ZkStateReader.DOWN);
prepCmd.setPauseFor(5000); prepCmd.setPauseFor(0);
if (waitForNotLive){
prepCmd.setCheckLive(false);
}
// let's retry a couple times - perhaps the leader just went down, // let's retry a couple times - perhaps the leader just went down,
// or perhaps he is just not quite ready for us yet // or perhaps he is just not quite ready for us yet

View File

@ -542,7 +542,7 @@ public class CoreContainer
if (zkController != null) { if (zkController != null) {
// this happens before we can receive requests // this happens before we can receive requests
zkController.preRegisterSetup(core, core.getCoreDescriptor(), false); zkController.preRegister(core.getCoreDescriptor());
} }
SolrCore old = null; SolrCore old = null;

View File

@ -32,7 +32,7 @@ import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Ignore; import org.junit.Ignore;
@Ignore("SOLR-3126") //@Ignore("SOLR-3126")
public class ChaosMonkeySafeLeaderTest extends FullSolrCloudTest { public class ChaosMonkeySafeLeaderTest extends FullSolrCloudTest {
@BeforeClass @BeforeClass

View File

@ -110,7 +110,7 @@ public class LeaderElectionTest extends SolrTestCaseJ4 {
try { try {
elector.setup(context); elector.setup(context);
seq = elector.joinElection(context, null); seq = elector.joinElection(context);
electionDone = true; electionDone = true;
seqToThread.put(seq, this); seqToThread.put(seq, this);
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -153,7 +153,7 @@ public class LeaderElectionTest extends SolrTestCaseJ4 {
ElectionContext context = new ShardLeaderElectionContextBase(elector, ElectionContext context = new ShardLeaderElectionContextBase(elector,
"shard2", "collection1", "dummynode1", props, zkStateReader); "shard2", "collection1", "dummynode1", props, zkStateReader);
elector.setup(context); elector.setup(context);
elector.joinElection(context, null); elector.joinElection(context);
assertEquals("http://127.0.0.1/solr/", assertEquals("http://127.0.0.1/solr/",
getLeaderUrl("collection1", "shard2")); getLeaderUrl("collection1", "shard2"));
} }
@ -166,7 +166,7 @@ public class LeaderElectionTest extends SolrTestCaseJ4 {
ElectionContext firstContext = new ShardLeaderElectionContextBase(first, ElectionContext firstContext = new ShardLeaderElectionContextBase(first,
"slice1", "collection2", "dummynode1", props, zkStateReader); "slice1", "collection2", "dummynode1", props, zkStateReader);
first.setup(firstContext); first.setup(firstContext);
first.joinElection(firstContext, null); first.joinElection(firstContext);
Thread.sleep(1000); Thread.sleep(1000);
assertEquals("original leader was not registered", "http://127.0.0.1/solr/1/", getLeaderUrl("collection2", "slice1")); assertEquals("original leader was not registered", "http://127.0.0.1/solr/1/", getLeaderUrl("collection2", "slice1"));
@ -177,7 +177,7 @@ public class LeaderElectionTest extends SolrTestCaseJ4 {
ElectionContext context = new ShardLeaderElectionContextBase(second, ElectionContext context = new ShardLeaderElectionContextBase(second,
"slice1", "collection2", "dummynode1", props, zkStateReader); "slice1", "collection2", "dummynode1", props, zkStateReader);
second.setup(context); second.setup(context);
second.joinElection(context, null); second.joinElection(context);
Thread.sleep(1000); Thread.sleep(1000);
assertEquals("original leader should have stayed leader", "http://127.0.0.1/solr/1/", getLeaderUrl("collection2", "slice1")); assertEquals("original leader should have stayed leader", "http://127.0.0.1/solr/1/", getLeaderUrl("collection2", "slice1"));
firstContext.cancelElection(); firstContext.cancelElection();

View File

@ -142,7 +142,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
ShardLeaderElectionContextBase ctx = new ShardLeaderElectionContextBase( ShardLeaderElectionContextBase ctx = new ShardLeaderElectionContextBase(
elector, shardId, collection, nodeName + "_" + coreName, props, elector, shardId, collection, nodeName + "_" + coreName, props,
zkStateReader); zkStateReader);
elector.joinElection(ctx, null); elector.joinElection(ctx);
break; break;
} }
Thread.sleep(200); Thread.sleep(200);
@ -218,7 +218,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
collection1Desc.setCollectionName("collection1"); collection1Desc.setCollectionName("collection1");
CoreDescriptor desc1 = new CoreDescriptor(null, "core" + (i + 1), ""); CoreDescriptor desc1 = new CoreDescriptor(null, "core" + (i + 1), "");
desc1.setCloudDescriptor(collection1Desc); desc1.setCloudDescriptor(collection1Desc);
zkController.preRegisterSetup(null, desc1, false); zkController.preRegister(desc1);
ids[i] = zkController.register("core" + (i + 1), desc1); ids[i] = zkController.register("core" + (i + 1), desc1);
} }
@ -318,7 +318,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
final CoreDescriptor desc = new CoreDescriptor(null, coreName, ""); final CoreDescriptor desc = new CoreDescriptor(null, coreName, "");
desc.setCloudDescriptor(collection1Desc); desc.setCloudDescriptor(collection1Desc);
try { try {
controllers[slot % nodeCount].preRegisterSetup(null, desc, false); controllers[slot % nodeCount].preRegister(desc);
ids[slot] = controllers[slot % nodeCount] ids[slot] = controllers[slot % nodeCount]
.register(coreName, desc); .register(coreName, desc);
} catch (Throwable e) { } catch (Throwable e) {
@ -870,7 +870,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
LeaderElector overseerElector = new LeaderElector(zkClient); LeaderElector overseerElector = new LeaderElector(zkClient);
ElectionContext ec = new OverseerElectionContext(address.replaceAll("/", "_"), zkClient, reader); ElectionContext ec = new OverseerElectionContext(address.replaceAll("/", "_"), zkClient, reader);
overseerElector.setup(ec); overseerElector.setup(ec);
overseerElector.joinElection(ec, null); overseerElector.joinElection(ec);
return zkClient; return zkClient;
} }
} }

View File

@ -190,7 +190,7 @@ public class ZkControllerTest extends SolrTestCaseJ4 {
collection1Desc.setCollectionName("collection1"); collection1Desc.setCollectionName("collection1");
CoreDescriptor desc1 = new CoreDescriptor(null, "core" + (i + 1), ""); CoreDescriptor desc1 = new CoreDescriptor(null, "core" + (i + 1), "");
desc1.setCloudDescriptor(collection1Desc); desc1.setCloudDescriptor(collection1Desc);
zkController.preRegisterSetup(null, desc1, false); zkController.preRegister(desc1);
ids[i] = zkController.register("core" + (i + 1), desc1); ids[i] = zkController.register("core" + (i + 1), desc1);
} }