mirror of https://github.com/apache/lucene.git
SOLR-3126: hardening around peer sync and tests
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1245836 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d15af1ccbd
commit
458ff3ead7
|
@ -49,7 +49,10 @@ public abstract class ElectionContext {
|
|||
this.leaderProps = leaderProps;
|
||||
}
|
||||
|
||||
abstract void runLeaderProcess(String leaderSeqPath, boolean weAreReplacement) throws KeeperException, InterruptedException, IOException;
|
||||
// the given core may or may not be null - if you need access to the current core, you must pass
|
||||
// the core container and core name to your context impl - then use this core ref if it is not null
|
||||
// else access it from the core container
|
||||
abstract void runLeaderProcess(String leaderSeqPath, boolean weAreReplacement, SolrCore core) throws KeeperException, InterruptedException, IOException;
|
||||
}
|
||||
|
||||
class ShardLeaderElectionContextBase extends ElectionContext {
|
||||
|
@ -71,7 +74,7 @@ class ShardLeaderElectionContextBase extends ElectionContext {
|
|||
}
|
||||
|
||||
@Override
|
||||
void runLeaderProcess(String leaderSeqPath, boolean weAreReplacement)
|
||||
void runLeaderProcess(String leaderSeqPath, boolean weAreReplacement, SolrCore core)
|
||||
throws KeeperException, InterruptedException, IOException {
|
||||
|
||||
try {
|
||||
|
@ -106,13 +109,19 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
|
|||
}
|
||||
|
||||
@Override
|
||||
void runLeaderProcess(String leaderSeqPath, boolean weAreReplacement)
|
||||
void runLeaderProcess(String leaderSeqPath, boolean weAreReplacement, SolrCore startupCore)
|
||||
throws KeeperException, InterruptedException, IOException {
|
||||
if (cc != null) {
|
||||
String coreName = leaderProps.get(ZkStateReader.CORE_NAME_PROP);
|
||||
SolrCore core = null;
|
||||
try {
|
||||
core = cc.getCore(coreName);
|
||||
// the first time we are run, we will get a startupCore - after
|
||||
// we will get null and must use cc.getCore
|
||||
if (startupCore == null) {
|
||||
core = cc.getCore(coreName);
|
||||
} else {
|
||||
core = startupCore;
|
||||
}
|
||||
if (core == null) {
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR, "Core not found:" + coreName);
|
||||
}
|
||||
|
@ -144,17 +153,17 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
|
|||
// If I am going to be the leader I have to be active
|
||||
|
||||
core.getUpdateHandler().getSolrCoreState().cancelRecovery();
|
||||
zkController.publish(core, ZkStateReader.ACTIVE);
|
||||
zkController.publish(core.getCoreDescriptor(), ZkStateReader.ACTIVE);
|
||||
|
||||
} finally {
|
||||
if (core != null) {
|
||||
if (core != null && startupCore == null) {
|
||||
core.close();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
super.runLeaderProcess(leaderSeqPath, weAreReplacement);
|
||||
super.runLeaderProcess(leaderSeqPath, weAreReplacement, startupCore);
|
||||
}
|
||||
|
||||
private void rejoinLeaderElection(String leaderSeqPath, SolrCore core)
|
||||
|
@ -162,12 +171,12 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
|
|||
// remove our ephemeral and re join the election
|
||||
// System.out.println("sync failed, delete our election node:"
|
||||
// + leaderSeqPath);
|
||||
zkController.publish(core, ZkStateReader.DOWN);
|
||||
zkController.publish(core.getCoreDescriptor(), ZkStateReader.DOWN);
|
||||
zkClient.delete(leaderSeqPath, -1, true);
|
||||
|
||||
core.getUpdateHandler().getSolrCoreState().doRecovery(core);
|
||||
|
||||
leaderElector.joinElection(this);
|
||||
leaderElector.joinElection(this, null);
|
||||
}
|
||||
|
||||
private boolean shouldIBeLeader(ZkNodeProps leaderProps) {
|
||||
|
@ -215,7 +224,7 @@ final class OverseerElectionContext extends ElectionContext {
|
|||
}
|
||||
|
||||
@Override
|
||||
void runLeaderProcess(String leaderSeqPath, boolean weAreReplacement) throws KeeperException, InterruptedException {
|
||||
void runLeaderProcess(String leaderSeqPath, boolean weAreReplacement, SolrCore firstCore) throws KeeperException, InterruptedException {
|
||||
|
||||
final String id = leaderSeqPath.substring(leaderSeqPath.lastIndexOf("/")+1);
|
||||
ZkNodeProps myProps = new ZkNodeProps("id", id);
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.solr.common.SolrException;
|
|||
import org.apache.solr.common.cloud.SolrZkClient;
|
||||
import org.apache.solr.common.cloud.ZkCmdExecutor;
|
||||
import org.apache.solr.common.cloud.ZooKeeperException;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.KeeperException.ConnectionLossException;
|
||||
|
@ -79,12 +80,13 @@ public class LeaderElector {
|
|||
* @param seq
|
||||
* @param context
|
||||
* @param replacement has someone else been the leader already?
|
||||
* @param core
|
||||
* @throws KeeperException
|
||||
* @throws InterruptedException
|
||||
* @throws IOException
|
||||
* @throws UnsupportedEncodingException
|
||||
*/
|
||||
private void checkIfIamLeader(final String leaderSeqPath, final int seq, final ElectionContext context, boolean replacement) throws KeeperException,
|
||||
private void checkIfIamLeader(final String leaderSeqPath, final int seq, final ElectionContext context, boolean replacement, SolrCore core) throws KeeperException,
|
||||
InterruptedException, IOException {
|
||||
// get all other numbers...
|
||||
final String holdElectionPath = context.electionPath + ELECTION_NODE;
|
||||
|
@ -93,7 +95,7 @@ public class LeaderElector {
|
|||
sortSeqs(seqs);
|
||||
List<Integer> intSeqs = getSeqs(seqs);
|
||||
if (seq <= intSeqs.get(0)) {
|
||||
runIamLeaderProcess(leaderSeqPath, context, replacement);
|
||||
runIamLeaderProcess(leaderSeqPath, context, replacement, core);
|
||||
} else {
|
||||
// I am not the leader - watch the node below me
|
||||
int i = 1;
|
||||
|
@ -117,7 +119,7 @@ public class LeaderElector {
|
|||
public void process(WatchedEvent event) {
|
||||
// am I the next leader?
|
||||
try {
|
||||
checkIfIamLeader(leaderSeqPath, seq, context, true);
|
||||
checkIfIamLeader(leaderSeqPath, seq, context, true, null);
|
||||
} catch (InterruptedException e) {
|
||||
// Restore the interrupted status
|
||||
Thread.currentThread().interrupt();
|
||||
|
@ -135,16 +137,16 @@ public class LeaderElector {
|
|||
} catch (KeeperException e) {
|
||||
// we couldn't set our watch - the node before us may already be down?
|
||||
// we need to check if we are the leader again
|
||||
checkIfIamLeader(leaderSeqPath, seq, context, true);
|
||||
checkIfIamLeader(leaderSeqPath, seq, context, true, null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: get this core param out of here
|
||||
protected void runIamLeaderProcess(String leaderSeqPath, final ElectionContext context, boolean weAreReplacement) throws KeeperException,
|
||||
protected void runIamLeaderProcess(String leaderSeqPath, final ElectionContext context, boolean weAreReplacement, SolrCore core) throws KeeperException,
|
||||
InterruptedException, IOException {
|
||||
|
||||
context.runLeaderProcess(leaderSeqPath, weAreReplacement);
|
||||
context.runLeaderProcess(leaderSeqPath, weAreReplacement, core);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -205,7 +207,7 @@ public class LeaderElector {
|
|||
* @throws IOException
|
||||
* @throws UnsupportedEncodingException
|
||||
*/
|
||||
public int joinElection(ElectionContext context) throws KeeperException, InterruptedException, IOException {
|
||||
public int joinElection(ElectionContext context, SolrCore core) throws KeeperException, InterruptedException, IOException {
|
||||
final String shardsElectZkPath = context.electionPath + LeaderElector.ELECTION_NODE;
|
||||
|
||||
long sessionId = zkClient.getSolrZooKeeper().getSessionId();
|
||||
|
@ -247,7 +249,7 @@ public class LeaderElector {
|
|||
}
|
||||
}
|
||||
int seq = getSeq(leaderSeqPath);
|
||||
checkIfIamLeader(leaderSeqPath, seq, context, false);
|
||||
checkIfIamLeader(leaderSeqPath, seq, context, false, core);
|
||||
|
||||
return seq;
|
||||
}
|
||||
|
|
|
@ -28,7 +28,7 @@ import java.util.concurrent.TimeoutException;
|
|||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
|
||||
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
|
||||
import org.apache.solr.client.solrj.request.CoreAdminRequest.PrepRecovery;
|
||||
import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
|
||||
import org.apache.solr.client.solrj.request.UpdateRequest;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrException.ErrorCode;
|
||||
|
@ -173,10 +173,13 @@ public class RecoveryStrategy extends Thread implements SafeStopThread {
|
|||
CommonsHttpSolrServer server = new CommonsHttpSolrServer(leaderBaseUrl);
|
||||
server.setConnectionTimeout(45000);
|
||||
server.setSoTimeout(45000);
|
||||
PrepRecovery prepCmd = new PrepRecovery();
|
||||
WaitForState prepCmd = new WaitForState();
|
||||
prepCmd.setCoreName(leaderCoreName);
|
||||
prepCmd.setNodeName(zkController.getNodeName());
|
||||
prepCmd.setCoreNodeName(coreZkNodeName);
|
||||
prepCmd.setState(ZkStateReader.RECOVERING);
|
||||
prepCmd.setCheckLive(true);
|
||||
prepCmd.setPauseFor(4000);
|
||||
|
||||
server.request(prepCmd);
|
||||
server.shutdown();
|
||||
|
@ -206,7 +209,7 @@ public class RecoveryStrategy extends Thread implements SafeStopThread {
|
|||
while (!succesfulRecovery && !close && !isInterrupted()) { // don't use interruption or it will close channels though
|
||||
try {
|
||||
// first thing we just try to sync
|
||||
zkController.publish(core, ZkStateReader.RECOVERING);
|
||||
zkController.publish(core.getCoreDescriptor(), ZkStateReader.RECOVERING);
|
||||
|
||||
CloudDescriptor cloudDesc = core.getCoreDescriptor()
|
||||
.getCloudDescriptor();
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.solr.cloud;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.MalformedURLException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
|
@ -32,6 +33,8 @@ import java.util.concurrent.TimeoutException;
|
|||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
|
||||
import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrException.ErrorCode;
|
||||
import org.apache.solr.common.cloud.CloudState;
|
||||
|
@ -177,22 +180,24 @@ public final class ZkController {
|
|||
//Overseer.createClientNodes(zkClient, getNodeName());
|
||||
|
||||
ElectionContext context = new OverseerElectionContext(getNodeName(), zkClient, zkStateReader);
|
||||
overseerElector.joinElection(context);
|
||||
overseerElector.joinElection(context, null);
|
||||
zkStateReader.createClusterStateWatchersAndUpdate();
|
||||
|
||||
List<CoreDescriptor> descriptors = registerOnReconnect
|
||||
.getCurrentDescriptors();
|
||||
if (descriptors != null) {
|
||||
// before registering as live, make sure everyone is in a
|
||||
// recovery state
|
||||
// down state
|
||||
for (CoreDescriptor descriptor : descriptors) {
|
||||
final String shardZkNodeName = getNodeName() + "_"
|
||||
final String coreZkNodeName = getNodeName() + "_"
|
||||
+ descriptor.getName();
|
||||
publishAsDown(getBaseUrl(), descriptor, shardZkNodeName,
|
||||
publishAsDown(getBaseUrl(), descriptor, coreZkNodeName,
|
||||
descriptor.getName());
|
||||
waitForLeaderToSeeDownState(descriptor, coreZkNodeName);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// we have to register as live first to pick up docs in the buffer
|
||||
createEphemeralLiveNode();
|
||||
|
||||
|
@ -218,6 +223,8 @@ public final class ZkController {
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
});
|
||||
cmdExecutor = new ZkCmdExecutor();
|
||||
leaderElector = new LeaderElector(zkClient);
|
||||
|
@ -337,7 +344,7 @@ public final class ZkController {
|
|||
overseerElector = new LeaderElector(zkClient);
|
||||
ElectionContext context = new OverseerElectionContext(getNodeName(), zkClient, zkStateReader);
|
||||
overseerElector.setup(context);
|
||||
overseerElector.joinElection(context);
|
||||
overseerElector.joinElection(context, null);
|
||||
zkStateReader.createClusterStateWatchersAndUpdate();
|
||||
|
||||
} catch (IOException e) {
|
||||
|
@ -478,30 +485,23 @@ public final class ZkController {
|
|||
String shardId = cloudDesc.getShardId();
|
||||
|
||||
Map<String,String> props = new HashMap<String,String>();
|
||||
// we only put a subset of props into the leader node
|
||||
props.put(ZkStateReader.BASE_URL_PROP, baseUrl);
|
||||
props.put(ZkStateReader.CORE_NAME_PROP, coreName);
|
||||
props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
|
||||
props.put(ZkStateReader.ROLES_PROP, cloudDesc.getRoles());
|
||||
props.put(ZkStateReader.STATE_PROP, ZkStateReader.DOWN);
|
||||
|
||||
|
||||
if (log.isInfoEnabled()) {
|
||||
log.info("Register shard - core:" + coreName + " address:"
|
||||
+ baseUrl + " shardId:" + shardId);
|
||||
}
|
||||
|
||||
// we only put a subset of props into the leader node
|
||||
ZkNodeProps leaderProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
|
||||
props.get(ZkStateReader.BASE_URL_PROP), ZkStateReader.CORE_NAME_PROP,
|
||||
props.get(ZkStateReader.CORE_NAME_PROP), ZkStateReader.NODE_NAME_PROP,
|
||||
props.get(ZkStateReader.NODE_NAME_PROP));
|
||||
|
||||
|
||||
joinElection(collection, coreZkNodeName, shardId, leaderProps);
|
||||
ZkNodeProps leaderProps = new ZkNodeProps(props);
|
||||
|
||||
// rather than look in the cluster state file, we go straight to the zknodes
|
||||
// here, because on cluster restart there could be stale leader info in the
|
||||
// cluster state node that won't be updated for a moment
|
||||
String leaderUrl = getLeaderUrl(collection, cloudDesc.getShardId());
|
||||
String leaderUrl = getLeaderProps(collection, cloudDesc.getShardId()).getCoreUrl();
|
||||
|
||||
// now wait until our currently cloud state contains the latest leader
|
||||
String cloudStateLeader = zkStateReader.getLeaderUrl(collection, cloudDesc.getShardId(), 30000);
|
||||
|
@ -573,7 +573,7 @@ public final class ZkController {
|
|||
}
|
||||
|
||||
/**
|
||||
* Get leader URL directly from zk nodes.
|
||||
* Get leader props directly from zk nodes.
|
||||
*
|
||||
* @param collection
|
||||
* @param slice
|
||||
|
@ -581,7 +581,7 @@ public final class ZkController {
|
|||
* @throws KeeperException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
private String getLeaderUrl(final String collection, final String slice)
|
||||
private ZkCoreNodeProps getLeaderProps(final String collection, final String slice)
|
||||
throws KeeperException, InterruptedException {
|
||||
int iterCount = 60;
|
||||
while (iterCount-- > 0)
|
||||
|
@ -591,7 +591,7 @@ public final class ZkController {
|
|||
true);
|
||||
ZkCoreNodeProps leaderProps = new ZkCoreNodeProps(
|
||||
ZkNodeProps.load(data));
|
||||
return leaderProps.getCoreUrl();
|
||||
return leaderProps;
|
||||
} catch (NoNodeException e) {
|
||||
Thread.sleep(500);
|
||||
}
|
||||
|
@ -600,12 +600,12 @@ public final class ZkController {
|
|||
|
||||
|
||||
private void joinElection(final String collection,
|
||||
final String shardZkNodeName, String shardId, ZkNodeProps leaderProps) throws InterruptedException, KeeperException, IOException {
|
||||
final String shardZkNodeName, String shardId, ZkNodeProps leaderProps, SolrCore core) throws InterruptedException, KeeperException, IOException {
|
||||
ElectionContext context = new ShardLeaderElectionContext(leaderElector, shardId,
|
||||
collection, shardZkNodeName, leaderProps, this, cc);
|
||||
|
||||
leaderElector.setup(context);
|
||||
leaderElector.joinElection(context);
|
||||
leaderElector.joinElection(context, core);
|
||||
}
|
||||
|
||||
|
||||
|
@ -671,15 +671,14 @@ public final class ZkController {
|
|||
publishState(cd, shardZkNodeName, coreName, finalProps);
|
||||
}
|
||||
|
||||
public void publish(SolrCore core, String state) {
|
||||
CoreDescriptor cd = core.getCoreDescriptor();
|
||||
public void publish(CoreDescriptor cd, String state) {
|
||||
Map<String,String> finalProps = new HashMap<String,String>();
|
||||
finalProps.put(ZkStateReader.BASE_URL_PROP, getBaseUrl());
|
||||
finalProps.put(ZkStateReader.CORE_NAME_PROP, core.getName());
|
||||
finalProps.put(ZkStateReader.CORE_NAME_PROP, cd.getName());
|
||||
finalProps.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
|
||||
finalProps.put(ZkStateReader.STATE_PROP, state);
|
||||
publishState(cd, getNodeName() + "_" + core.getName(),
|
||||
core.getName(), finalProps);
|
||||
publishState(cd, getNodeName() + "_" + cd.getName(),
|
||||
cd.getName(), finalProps);
|
||||
}
|
||||
|
||||
void publishAsDown(String baseUrl,
|
||||
|
@ -959,4 +958,93 @@ public final class ZkController {
|
|||
uploadToZK(zkClient, dir, ZkController.CONFIGS_ZKNODE + "/" + configName);
|
||||
}
|
||||
|
||||
public void preRegisterSetup(SolrCore core, CoreDescriptor cd) {
|
||||
// before becoming available, make sure we are not live and active
|
||||
// this also gets us our assigned shard id if it was not specified
|
||||
publish(cd, ZkStateReader.DOWN);
|
||||
|
||||
String shardId = cd.getCloudDescriptor().getShardId();
|
||||
|
||||
Map<String,String> props = new HashMap<String,String>();
|
||||
// we only put a subset of props into the leader node
|
||||
props.put(ZkStateReader.BASE_URL_PROP, getBaseUrl());
|
||||
props.put(ZkStateReader.CORE_NAME_PROP, cd.getName());
|
||||
props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
|
||||
|
||||
final String coreZkNodeName = getNodeName() + "_" + cd.getName();
|
||||
ZkNodeProps ourProps = new ZkNodeProps(props);
|
||||
String collection = cd.getCloudDescriptor()
|
||||
.getCollectionName();
|
||||
|
||||
try {
|
||||
joinElection(collection, coreZkNodeName, shardId, ourProps, core);
|
||||
} catch (InterruptedException e) {
|
||||
// Restore the interrupted status
|
||||
Thread.currentThread().interrupt();
|
||||
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
|
||||
} catch (KeeperException e) {
|
||||
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
|
||||
} catch (IOException e) {
|
||||
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
|
||||
}
|
||||
|
||||
|
||||
waitForLeaderToSeeDownState(cd, coreZkNodeName);
|
||||
|
||||
}
|
||||
|
||||
private ZkCoreNodeProps waitForLeaderToSeeDownState(
|
||||
CoreDescriptor descriptor, final String shardZkNodeName) {
|
||||
CloudDescriptor cloudDesc = descriptor.getCloudDescriptor();
|
||||
String collection = cloudDesc.getCollectionName();
|
||||
String shard = cloudDesc.getShardId();
|
||||
ZkCoreNodeProps leaderProps;
|
||||
try {
|
||||
// go straight to zk, not the cloud state - we must have current info
|
||||
leaderProps = getLeaderProps(collection, shard);
|
||||
} catch (InterruptedException e) {
|
||||
// Restore the interrupted status
|
||||
Thread.currentThread().interrupt();
|
||||
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
|
||||
} catch (KeeperException e) {
|
||||
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
|
||||
}
|
||||
|
||||
String leaderBaseUrl = leaderProps.getBaseUrl();
|
||||
String leaderCoreName = leaderProps.getCoreName();
|
||||
|
||||
String ourUrl = ZkCoreNodeProps.getCoreUrl(getBaseUrl(),
|
||||
descriptor.getName());
|
||||
|
||||
boolean isLeader = leaderProps.getCoreUrl().equals(ourUrl);
|
||||
if (!isLeader && !SKIP_AUTO_RECOVERY) {
|
||||
// wait until the leader sees us as down before we are willing to accept
|
||||
// updates.
|
||||
CommonsHttpSolrServer server = null;
|
||||
try {
|
||||
server = new CommonsHttpSolrServer(leaderBaseUrl);
|
||||
} catch (MalformedURLException e) {
|
||||
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
|
||||
e);
|
||||
}
|
||||
server.setConnectionTimeout(45000);
|
||||
server.setSoTimeout(45000);
|
||||
WaitForState prepCmd = new WaitForState();
|
||||
prepCmd.setCoreName(leaderCoreName);
|
||||
prepCmd.setNodeName(getNodeName());
|
||||
prepCmd.setCoreNodeName(shardZkNodeName);
|
||||
prepCmd.setState(ZkStateReader.DOWN);
|
||||
prepCmd.setCheckLive(false);
|
||||
|
||||
try {
|
||||
server.request(prepCmd);
|
||||
} catch (Exception e) {
|
||||
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
|
||||
"Could not talk to the leader", e);
|
||||
}
|
||||
server.shutdown();
|
||||
}
|
||||
return leaderProps;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -539,9 +539,8 @@ public class CoreContainer
|
|||
}
|
||||
|
||||
if (zkController != null) {
|
||||
// before becoming available, make sure we are not live and active
|
||||
// this also gets us our assigned shard id if it was not specified
|
||||
zkController.publish(core, ZkStateReader.DOWN);
|
||||
// this happens before we can receive requests
|
||||
zkController.preRegisterSetup(core, core.getCoreDescriptor());
|
||||
}
|
||||
|
||||
SolrCore old = null;
|
||||
|
@ -587,7 +586,7 @@ public class CoreContainer
|
|||
} catch (Exception e) {
|
||||
// if register fails, this is really bad - close the zkController to
|
||||
// minimize any damage we can cause
|
||||
zkController.publish(core, ZkStateReader.DOWN);
|
||||
zkController.publish(core.getCoreDescriptor(), ZkStateReader.DOWN);
|
||||
log.error("", e);
|
||||
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
|
||||
e);
|
||||
|
|
|
@ -178,7 +178,7 @@ public class CoreAdminHandler extends RequestHandlerBase {
|
|||
}
|
||||
|
||||
case PREPRECOVERY: {
|
||||
this.handlePrepRecoveryAction(req, rsp);
|
||||
this.handleWaitForStateAction(req, rsp);
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -614,7 +614,7 @@ public class CoreAdminHandler extends RequestHandlerBase {
|
|||
}
|
||||
}
|
||||
|
||||
protected void handlePrepRecoveryAction(SolrQueryRequest req,
|
||||
protected void handleWaitForStateAction(SolrQueryRequest req,
|
||||
SolrQueryResponse rsp) throws IOException, InterruptedException {
|
||||
final SolrParams params = req.getParams();
|
||||
|
||||
|
@ -625,8 +625,9 @@ public class CoreAdminHandler extends RequestHandlerBase {
|
|||
|
||||
String nodeName = params.get("nodeName");
|
||||
String coreNodeName = params.get("coreNodeName");
|
||||
|
||||
|
||||
String waitForState = params.get("state");
|
||||
boolean checkLive = params.getBool("checkLive", true);
|
||||
int pauseFor = params.getInt("pauseFor", 0);
|
||||
SolrCore core = null;
|
||||
|
||||
try {
|
||||
|
@ -653,16 +654,19 @@ public class CoreAdminHandler extends RequestHandlerBase {
|
|||
|
||||
state = nodeProps.get(ZkStateReader.STATE_PROP);
|
||||
live = cloudState.liveNodesContain(nodeName);
|
||||
if (nodeProps != null && state.equals(ZkStateReader.RECOVERING)
|
||||
&& live) {
|
||||
break;
|
||||
if (nodeProps != null && state.equals(waitForState)) {
|
||||
if (checkLive && live) {
|
||||
break;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (retry++ == 30) {
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST,
|
||||
"I was asked to prep for recovery for " + nodeName
|
||||
+ " but she is not live or not in a recovery state - state: " + state + " live:" + live);
|
||||
"I was asked to wait on state " + waitForState + " for " + nodeName
|
||||
+ " but I still do not see the request state. I see state: " + state + " live:" + live);
|
||||
}
|
||||
|
||||
Thread.sleep(1000);
|
||||
|
@ -672,21 +676,7 @@ public class CoreAdminHandler extends RequestHandlerBase {
|
|||
// kept it from sending the update to be buffered -
|
||||
// pause for a while to let any outstanding updates finish
|
||||
|
||||
Thread.sleep(4000);
|
||||
|
||||
UpdateRequestProcessorChain processorChain = core
|
||||
.getUpdateProcessingChain(params.get(UpdateParams.UPDATE_CHAIN));
|
||||
|
||||
ModifiableSolrParams reqParams = new ModifiableSolrParams(req.getParams());
|
||||
reqParams.set(DistributedUpdateProcessor.COMMIT_END_POINT, "true");
|
||||
|
||||
SolrQueryRequest sqr = new LocalSolrQueryRequest(core, reqParams);
|
||||
UpdateRequestProcessor processor = processorChain.createProcessor(sqr,
|
||||
new SolrQueryResponse());
|
||||
CommitUpdateCommand cuc = new CommitUpdateCommand(req, false);
|
||||
|
||||
processor.processCommit(cuc);
|
||||
processor.finish();
|
||||
Thread.sleep(pauseFor);
|
||||
|
||||
// solrcloud_debug
|
||||
// try {
|
||||
|
|
|
@ -170,7 +170,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
|
|||
forwardToLeader = false;
|
||||
List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader()
|
||||
.getReplicaProps(collection, shardId, zkController.getNodeName(),
|
||||
coreName);
|
||||
coreName, null, ZkStateReader.DOWN);
|
||||
if (replicaProps != null) {
|
||||
nodes = new ArrayList<Node>(replicaProps.size());
|
||||
for (ZkCoreNodeProps props : replicaProps) {
|
||||
|
|
|
@ -44,7 +44,6 @@ public abstract class AbstractDistributedZkTestCase extends BaseDistributedSearc
|
|||
super.setUp();
|
||||
log.info("####SETUP_START " + getName());
|
||||
createTempDir();
|
||||
ignoreException("java.nio.channels.ClosedChannelException");
|
||||
|
||||
String zkDir = testDir.getAbsolutePath() + File.separator
|
||||
+ "zookeeper/server1/data";
|
||||
|
@ -84,13 +83,13 @@ public abstract class AbstractDistributedZkTestCase extends BaseDistributedSearc
|
|||
}
|
||||
|
||||
protected void waitForRecoveriesToFinish(String collection, ZkStateReader zkStateReader, boolean verbose)
|
||||
throws KeeperException, InterruptedException {
|
||||
waitForRecoveriesToFinish(collection, zkStateReader, verbose, false);
|
||||
throws Exception {
|
||||
waitForRecoveriesToFinish(collection, zkStateReader, verbose, true);
|
||||
}
|
||||
|
||||
protected void waitForRecoveriesToFinish(String collection,
|
||||
ZkStateReader zkStateReader, boolean verbose, boolean failOnTimeout)
|
||||
throws KeeperException, InterruptedException {
|
||||
throws Exception {
|
||||
boolean cont = true;
|
||||
int cnt = 0;
|
||||
|
||||
|
@ -117,12 +116,13 @@ public abstract class AbstractDistributedZkTestCase extends BaseDistributedSearc
|
|||
}
|
||||
}
|
||||
}
|
||||
if (!sawLiveRecovering || cnt == 15) {
|
||||
if (!sawLiveRecovering || cnt == 120) {
|
||||
if (!sawLiveRecovering) {
|
||||
if (verbose) System.out.println("no one is recoverying");
|
||||
} else {
|
||||
if (failOnTimeout) {
|
||||
fail("There are still nodes recoverying");
|
||||
printLayout();
|
||||
return;
|
||||
}
|
||||
if (verbose) System.out
|
||||
|
@ -130,7 +130,7 @@ public abstract class AbstractDistributedZkTestCase extends BaseDistributedSearc
|
|||
}
|
||||
cont = false;
|
||||
} else {
|
||||
Thread.sleep(2000);
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
cnt++;
|
||||
}
|
||||
|
|
|
@ -48,7 +48,7 @@ public class ChaosMonkeySafeLeaderTest extends FullSolrCloudTest {
|
|||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
// we expect this time of exception as shards go up and down...
|
||||
ignoreException(".*");
|
||||
//ignoreException(".*");
|
||||
|
||||
// sometimes we cannot get the same port
|
||||
ignoreException("java\\.net\\.BindException: Address already in use");
|
||||
|
|
|
@ -648,7 +648,7 @@ public class FullSolrCloudTest extends AbstractDistributedZkTestCase {
|
|||
}
|
||||
|
||||
protected void waitForRecoveriesToFinish(boolean verbose)
|
||||
throws KeeperException, InterruptedException {
|
||||
throws Exception {
|
||||
super.waitForRecoveriesToFinish(DEFAULT_COLLECTION, zkStateReader, verbose);
|
||||
}
|
||||
|
||||
|
|
|
@ -110,7 +110,7 @@ public class LeaderElectionTest extends SolrTestCaseJ4 {
|
|||
|
||||
try {
|
||||
elector.setup(context);
|
||||
seq = elector.joinElection(context);
|
||||
seq = elector.joinElection(context, null);
|
||||
electionDone = true;
|
||||
seqToThread.put(seq, this);
|
||||
} catch (InterruptedException e) {
|
||||
|
@ -153,7 +153,7 @@ public class LeaderElectionTest extends SolrTestCaseJ4 {
|
|||
ElectionContext context = new ShardLeaderElectionContextBase(elector,
|
||||
"shard2", "collection1", "dummynode1", props, zkStateReader);
|
||||
elector.setup(context);
|
||||
elector.joinElection(context);
|
||||
elector.joinElection(context, null);
|
||||
assertEquals("http://127.0.0.1/solr/",
|
||||
getLeaderUrl("collection1", "shard2"));
|
||||
}
|
||||
|
|
|
@ -41,6 +41,7 @@ import org.apache.zookeeper.CreateMode;
|
|||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.KeeperException.Code;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -100,6 +101,13 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
|
||||
@BeforeClass
|
||||
public static void beforeClass() throws Exception {
|
||||
System.setProperty("solrcloud.skip.autorecovery", "true");
|
||||
initCore();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterClass() throws Exception {
|
||||
System.clearProperty("solrcloud.skip.autorecovery");
|
||||
initCore();
|
||||
}
|
||||
|
||||
|
@ -143,9 +151,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
collection1Desc.setCollectionName("collection1");
|
||||
CoreDescriptor desc1 = new CoreDescriptor(null, "core" + (i + 1), "");
|
||||
desc1.setCloudDescriptor(collection1Desc);
|
||||
zkController.publishAsDown(zkController.getBaseUrl(), desc1,
|
||||
zkController.getNodeName() + "_" + "core" + (i + 1), "core"
|
||||
+ (i + 1));
|
||||
zkController.preRegisterSetup(null, desc1);
|
||||
ids[i] = zkController.register("core" + (i + 1), desc1);
|
||||
}
|
||||
|
||||
|
@ -242,10 +248,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
final CoreDescriptor desc = new CoreDescriptor(null, coreName, "");
|
||||
desc.setCloudDescriptor(collection1Desc);
|
||||
try {
|
||||
controllers[slot % nodeCount].publishAsDown(controllers[slot
|
||||
% nodeCount].getBaseUrl(), desc, controllers[slot
|
||||
% nodeCount].getNodeName()
|
||||
+ "_" + coreName, coreName);
|
||||
controllers[slot % nodeCount].preRegisterSetup(null, desc);
|
||||
ids[slot] = controllers[slot % nodeCount]
|
||||
.register(coreName, desc);
|
||||
} catch (Throwable e) {
|
||||
|
@ -673,7 +676,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
LeaderElector overseerElector = new LeaderElector(zkClient);
|
||||
ElectionContext ec = new OverseerElectionContext(address.replaceAll("/", "_"), zkClient, reader);
|
||||
overseerElector.setup(ec);
|
||||
overseerElector.joinElection(ec);
|
||||
overseerElector.joinElection(ec, null);
|
||||
return zkClient;
|
||||
}
|
||||
}
|
|
@ -115,11 +115,14 @@ public class CoreAdminRequest extends SolrRequest
|
|||
|
||||
}
|
||||
|
||||
public static class PrepRecovery extends CoreAdminRequest {
|
||||
public static class WaitForState extends CoreAdminRequest {
|
||||
protected String nodeName;
|
||||
protected String coreNodeName;
|
||||
|
||||
public PrepRecovery() {
|
||||
protected String state;
|
||||
protected Boolean checkLive;
|
||||
protected Integer pauseFor;
|
||||
|
||||
public WaitForState() {
|
||||
action = CoreAdminAction.PREPRECOVERY;
|
||||
}
|
||||
|
||||
|
@ -139,6 +142,30 @@ public class CoreAdminRequest extends SolrRequest
|
|||
this.coreNodeName = coreNodeName;
|
||||
}
|
||||
|
||||
public String getState() {
|
||||
return state;
|
||||
}
|
||||
|
||||
public void setState(String state) {
|
||||
this.state = state;
|
||||
}
|
||||
|
||||
public Boolean getCheckLive() {
|
||||
return checkLive;
|
||||
}
|
||||
|
||||
public void setCheckLive(Boolean checkLive) {
|
||||
this.checkLive = checkLive;
|
||||
}
|
||||
|
||||
public Integer getPauseFor() {
|
||||
return pauseFor;
|
||||
}
|
||||
|
||||
public void setPauseFor(Integer pauseFor) {
|
||||
this.pauseFor = pauseFor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SolrParams getParams() {
|
||||
if( action == null ) {
|
||||
|
@ -156,6 +183,18 @@ public class CoreAdminRequest extends SolrRequest
|
|||
if (coreNodeName != null) {
|
||||
params.set( "coreNodeName", coreNodeName);
|
||||
}
|
||||
|
||||
if (state != null) {
|
||||
params.set( "state", state);
|
||||
}
|
||||
|
||||
if (checkLive != null) {
|
||||
params.set( "checkLive", checkLive);
|
||||
}
|
||||
|
||||
if (pauseFor != null) {
|
||||
params.set( "pauseFor", pauseFor);
|
||||
}
|
||||
|
||||
return params;
|
||||
}
|
||||
|
|
|
@ -420,7 +420,12 @@ public class ZkStateReader {
|
|||
}
|
||||
|
||||
public List<ZkCoreNodeProps> getReplicaProps(String collection,
|
||||
String shardId, String thisNodeName, String coreName, String stateFilter) {
|
||||
String shardId, String thisNodeName, String coreName, String mustMatchStateFilter) {
|
||||
return getReplicaProps(collection, shardId, thisNodeName, coreName, mustMatchStateFilter, null);
|
||||
}
|
||||
|
||||
public List<ZkCoreNodeProps> getReplicaProps(String collection,
|
||||
String shardId, String thisNodeName, String coreName, String mustMatchStateFilter, String mustNotMatchStateFilter) {
|
||||
CloudState cloudState = this.cloudState;
|
||||
if (cloudState == null) {
|
||||
return null;
|
||||
|
@ -444,8 +449,10 @@ public class ZkStateReader {
|
|||
ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue());
|
||||
String coreNodeName = nodeProps.getNodeName() + "_" + nodeProps.getCoreName();
|
||||
if (cloudState.liveNodesContain(nodeProps.getNodeName()) && !coreNodeName.equals(filterNodeName)) {
|
||||
if (stateFilter == null || stateFilter.equals(nodeProps.getState())) {
|
||||
nodes.add(nodeProps);
|
||||
if (mustMatchStateFilter == null || mustMatchStateFilter.equals(nodeProps.getState())) {
|
||||
if (mustNotMatchStateFilter == null || !mustNotMatchStateFilter.equals(nodeProps.getState())) {
|
||||
nodes.add(nodeProps);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue