CollectionsAPI call REBALANCELEADERS

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1632594 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Erick Erickson 2014-10-17 14:50:14 +00:00
parent c4f4d6efab
commit f1759bc6ad
12 changed files with 291 additions and 29 deletions

View File

@ -178,6 +178,9 @@ New Features
* SOLR-4715: Add CloudSolrServer constructors which accept a HttpClient instance. * SOLR-4715: Add CloudSolrServer constructors which accept a HttpClient instance.
(Hardik Upadhyay, Shawn Heisey, shalin) (Hardik Upadhyay, Shawn Heisey, shalin)
* SOLR-6517: CollectionsAPI call REBALANCELEADERS. Used to balance leaders
across nodes for a particular collection
Bug Fixes Bug Fixes
---------------------- ----------------------

View File

@ -212,7 +212,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
int leaderVoteWait = cc.getZkController().getLeaderVoteWait(); int leaderVoteWait = cc.getZkController().getLeaderVoteWait();
if (!weAreReplacement) { if (!weAreReplacement) {
waitForReplicasToComeUp(weAreReplacement, leaderVoteWait); waitForReplicasToComeUp(leaderVoteWait);
} }
try (SolrCore core = cc.getCore(coreName)) { try (SolrCore core = cc.getCore(coreName)) {
@ -226,7 +226,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
// should I be leader? // should I be leader?
if (weAreReplacement && !shouldIBeLeader(leaderProps, core, weAreReplacement)) { if (weAreReplacement && !shouldIBeLeader(leaderProps, core, weAreReplacement)) {
rejoinLeaderElection(leaderSeqPath, core); rejoinLeaderElection(core);
return; return;
} }
@ -297,7 +297,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
} }
} }
if (!success) { if (!success) {
rejoinLeaderElection(leaderSeqPath, core); rejoinLeaderElection(core);
return; return;
} }
@ -323,7 +323,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
core.getCoreDescriptor().getCloudDescriptor().setLeader(false); core.getCoreDescriptor().getCloudDescriptor().setLeader(false);
// we could not publish ourselves as leader - try and rejoin election // we could not publish ourselves as leader - try and rejoin election
rejoinLeaderElection(leaderSeqPath, core); rejoinLeaderElection(core);
} }
} }
@ -401,7 +401,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
} // core gets closed automagically } // core gets closed automagically
} }
private void waitForReplicasToComeUp(boolean weAreReplacement, int timeoutms) throws InterruptedException { private void waitForReplicasToComeUp(int timeoutms) throws InterruptedException {
long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutms, TimeUnit.MILLISECONDS); long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutms, TimeUnit.MILLISECONDS);
final String shardsElectZkPath = electionPath + LeaderElector.ELECTION_NODE; final String shardsElectZkPath = electionPath + LeaderElector.ELECTION_NODE;
@ -448,11 +448,11 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
} }
} }
private void rejoinLeaderElection(String leaderSeqPath, SolrCore core) private void rejoinLeaderElection(SolrCore core)
throws InterruptedException, KeeperException, IOException { throws InterruptedException, KeeperException, IOException {
// remove our ephemeral and re join the election // remove our ephemeral and re join the election
if (cc.isShutDown()) { if (cc.isShutDown()) {
log.info("Not rejoining election because CoreContainer is close"); log.info("Not rejoining election because CoreContainer is closed");
return; return;
} }

View File

@ -106,7 +106,6 @@ public class LeaderElector {
return; return;
} }
// first we delete the node advertising the old leader in case the ephem is still there // first we delete the node advertising the old leader in case the ephem is still there
// first we delete the node advertising the old leader in case the ephem is still there
try { try {
zkClient.delete(context.leaderPath, -1, true); zkClient.delete(context.leaderPath, -1, true);
}catch (KeeperException.NoNodeException nne){ }catch (KeeperException.NoNodeException nne){
@ -244,7 +243,7 @@ public class LeaderElector {
try { try {
if(joinAtHead){ if(joinAtHead){
log.info("node {} Trying to join election at the head ", id); log.info("node {} Trying to join election at the head ", id);
List<String> nodes = OverseerCollectionProcessor.getSortedElectionNodes(zkClient); List<String> nodes = OverseerCollectionProcessor.getSortedElectionNodes(zkClient, shardsElectZkPath);
if(nodes.size() <2){ if(nodes.size() <2){
leaderSeqPath = zkClient.create(shardsElectZkPath + "/" + id + "-n_", null, leaderSeqPath = zkClient.create(shardsElectZkPath + "/" + id + "-n_", null,
CreateMode.EPHEMERAL_SEQUENTIAL, false); CreateMode.EPHEMERAL_SEQUENTIAL, false);

View File

@ -130,7 +130,9 @@ public class Overseer implements Closeable {
static enum LeaderStatus {DONT_KNOW, NO, YES} static enum LeaderStatus {DONT_KNOW, NO, YES}
public static final Set<String> sliceUniqueBooleanProperties = ImmutableSet.of("property.preferredleader"); public static final String preferredLeaderProp = COLL_PROP_PREFIX + "preferredleader";
public static final Set<String> sliceUniqueBooleanProperties = ImmutableSet.of(preferredLeaderProp);
private long lastUpdatedTime = 0; private long lastUpdatedTime = 0;
@ -1169,7 +1171,7 @@ public class Overseer implements Closeable {
return null; return null;
} }
ClusterState updateSlice(ClusterState state, String collectionName, Slice slice) { private ClusterState updateSlice(ClusterState state, String collectionName, Slice slice) {
// System.out.println("###!!!### OLD CLUSTERSTATE: " + JSONUtil.toJSON(state.getCollectionStates())); // System.out.println("###!!!### OLD CLUSTERSTATE: " + JSONUtil.toJSON(state.getCollectionStates()));
// System.out.println("Updating slice:" + slice); // System.out.println("Updating slice:" + slice);
DocCollection newCollection = null; DocCollection newCollection = null;
@ -1396,7 +1398,6 @@ public class Overseer implements Closeable {
} }
} }
// Class to encapsulate processing replica properties that have at most one replica hosting a property per slice. // Class to encapsulate processing replica properties that have at most one replica hosting a property per slice.
private class ExclusiveSliceProperty { private class ExclusiveSliceProperty {
private ClusterStateUpdater updater; private ClusterStateUpdater updater;
@ -1698,6 +1699,7 @@ public class Overseer implements Closeable {
this.replica = replica; this.replica = replica;
} }
} }
static void getShardNames(Integer numShards, List<String> shardNames) { static void getShardNames(Integer numShards, List<String> shardNames) {
if(numShards == null) if(numShards == null)
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "numShards" + " is a required param"); throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "numShards" + " is a required param");

View File

@ -18,7 +18,9 @@ package org.apache.solr.cloud;
*/ */
import static org.apache.solr.cloud.Assign.getNodesForNewShard; import static org.apache.solr.cloud.Assign.getNodesForNewShard;
import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP; import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP; import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP; import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP; import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
@ -31,9 +33,9 @@ import static org.apache.solr.common.params.CollectionParams.CollectionAction.CL
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE; import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD; import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE; import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICAPROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD; import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE; import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICAPROP;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
@ -441,7 +443,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
String ldr = getLeaderNode(zk); String ldr = getLeaderNode(zk);
if(overseerDesignates.contains(ldr)) return; if(overseerDesignates.contains(ldr)) return;
log.info("prioritizing overseer nodes at {} overseer designates are {}", myId, overseerDesignates); log.info("prioritizing overseer nodes at {} overseer designates are {}", myId, overseerDesignates);
List<String> electionNodes = getSortedElectionNodes(zk); List<String> electionNodes = getSortedElectionNodes(zk, OverseerElectionContext.PATH + LeaderElector.ELECTION_NODE);
if(electionNodes.size()<2) return; if(electionNodes.size()<2) return;
log.info("sorted nodes {}", electionNodes); log.info("sorted nodes {}", electionNodes);
@ -484,10 +486,10 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
return nodeNames; return nodeNames;
} }
public static List<String> getSortedElectionNodes(SolrZkClient zk) throws KeeperException, InterruptedException { public static List<String> getSortedElectionNodes(SolrZkClient zk, String path) throws KeeperException, InterruptedException {
List<String> children = null; List<String> children = null;
try { try {
children = zk.getChildren(OverseerElectionContext.PATH + LeaderElector.ELECTION_NODE, null, true); children = zk.getChildren(path, null, true);
LeaderElector.sortSeqs(children); LeaderElector.sortSeqs(children);
return children; return children;
} catch (Exception e) { } catch (Exception e) {
@ -651,6 +653,9 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
case BALANCESLICEUNIQUE: case BALANCESLICEUNIQUE:
balanceProperty(message); balanceProperty(message);
break; break;
case REBALANCELEADERS:
processAssignLeaders(message);
break;
default: default:
throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:" throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
+ operation); + operation);
@ -676,6 +681,32 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
return new OverseerSolrResponse(results); return new OverseerSolrResponse(results);
} }
@SuppressWarnings("unchecked")
// re-purpose BALANCELEADERS to reassign a single leader over here
private void processAssignLeaders(ZkNodeProps message) throws KeeperException, InterruptedException {
String collectionName = message.getStr(COLLECTION_PROP);
String shardId = message.getStr(SHARD_ID_PROP);
String baseURL = message.getStr(BASE_URL_PROP);
String coreName = message.getStr(CORE_NAME_PROP);
if (StringUtils.isBlank(collectionName) || StringUtils.isBlank(shardId) || StringUtils.isBlank(baseURL) ||
StringUtils.isBlank(coreName)) {
throw new SolrException(ErrorCode.BAD_REQUEST,
String.format(Locale.ROOT, "The '%s', '%s', '%s' and '%s' parameters are required when assigning a leader",
COLLECTION_PROP, SHARD_ID_PROP, BASE_URL_PROP, CORE_NAME_PROP));
}
SolrZkClient zkClient = zkStateReader.getZkClient();
DistributedQueue inQueue = Overseer.getInQueue(zkClient);
Map<String, Object> propMap = new HashMap<>();
propMap.put(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.LEADER.toLower());
propMap.put(COLLECTION_PROP, collectionName);
propMap.put(SHARD_ID_PROP, shardId);
propMap.put(BASE_URL_PROP, baseURL);
propMap.put(CORE_NAME_PROP, coreName);
inQueue.offer(zkStateReader.toJSON(propMap));
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void processReplicaAddPropertyCommand(ZkNodeProps message) throws KeeperException, InterruptedException { private void processReplicaAddPropertyCommand(ZkNodeProps message) throws KeeperException, InterruptedException {
if (StringUtils.isBlank(message.getStr(COLLECTION_PROP)) || if (StringUtils.isBlank(message.getStr(COLLECTION_PROP)) ||
@ -684,7 +715,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
StringUtils.isBlank(message.getStr(PROPERTY_PROP)) || StringUtils.isBlank(message.getStr(PROPERTY_PROP)) ||
StringUtils.isBlank(message.getStr(PROPERTY_VALUE_PROP))) { StringUtils.isBlank(message.getStr(PROPERTY_VALUE_PROP))) {
throw new SolrException(ErrorCode.BAD_REQUEST, throw new SolrException(ErrorCode.BAD_REQUEST,
String.format(Locale.ROOT, "The '%s', '%s', '%s', '%s', and '%s' parameters are required for all replica properties add/delete' operations", String.format(Locale.ROOT, "The '%s', '%s', '%s', '%s', and '%s' parameters are required for all replica properties add/delete operations",
COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP, PROPERTY_VALUE_PROP)); COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP, PROPERTY_VALUE_PROP));
} }
SolrZkClient zkClient = zkStateReader.getZkClient(); SolrZkClient zkClient = zkStateReader.getZkClient();
@ -702,7 +733,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
StringUtils.isBlank(message.getStr(REPLICA_PROP)) || StringUtils.isBlank(message.getStr(REPLICA_PROP)) ||
StringUtils.isBlank(message.getStr(PROPERTY_PROP))) { StringUtils.isBlank(message.getStr(PROPERTY_PROP))) {
throw new SolrException(ErrorCode.BAD_REQUEST, throw new SolrException(ErrorCode.BAD_REQUEST,
String.format(Locale.ROOT, "The '%s', '%s', '%s', and '%s' parameters are required for all replica properties add/delete' operations", String.format(Locale.ROOT, "The '%s', '%s', '%s', and '%s' parameters are required for all replica properties add/delete operations",
COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP)); COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP));
} }
SolrZkClient zkClient = zkStateReader.getZkClient(); SolrZkClient zkClient = zkStateReader.getZkClient();

View File

@ -834,7 +834,13 @@ public final class ZkController {
ZkNodeProps leaderProps = new ZkNodeProps(props); ZkNodeProps leaderProps = new ZkNodeProps(props);
try { try {
joinElection(desc, afterExpiration); // If we're a preferred leader, insert ourselves at the head of the queue
boolean joinAtHead = false;
Replica replica = zkStateReader.getClusterState().getReplica(desc.getCloudDescriptor().getCollectionName(), coreZkNodeName);
if (replica != null) {
joinAtHead = replica.getBool(Overseer.preferredLeaderProp, false);
}
joinElection(desc, afterExpiration, joinAtHead);
} catch (InterruptedException e) { } catch (InterruptedException e) {
// Restore the interrupted status // Restore the interrupted status
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
@ -988,7 +994,8 @@ public final class ZkController {
} }
private void joinElection(CoreDescriptor cd, boolean afterExpiration) throws InterruptedException, KeeperException, IOException { private void joinElection(CoreDescriptor cd, boolean afterExpiration, boolean joinAtHead)
throws InterruptedException, KeeperException, IOException {
// look for old context - if we find it, cancel it // look for old context - if we find it, cancel it
String collection = cd.getCloudDescriptor().getCollectionName(); String collection = cd.getCloudDescriptor().getCollectionName();
final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName(); final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
@ -1018,7 +1025,7 @@ public final class ZkController {
leaderElector.setup(context); leaderElector.setup(context);
electionContexts.put(contextKey, context); electionContexts.put(contextKey, context);
leaderElector.joinElection(context, false); leaderElector.joinElection(context, false, joinAtHead);
} }

View File

@ -30,16 +30,23 @@ import static org.apache.solr.cloud.OverseerCollectionProcessor.REQUESTID;
import static org.apache.solr.cloud.OverseerCollectionProcessor.ROUTER; import static org.apache.solr.cloud.OverseerCollectionProcessor.ROUTER;
import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP; import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
import static org.apache.solr.common.cloud.ZkNodeProps.makeMap; import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
import static org.apache.solr.common.cloud.ZkStateReader.ACTIVE;
import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP; import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.LEADER_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP; import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP; import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE; import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
import static org.apache.solr.common.cloud.ZkStateReader.AUTO_ADD_REPLICAS; import static org.apache.solr.common.cloud.ZkStateReader.AUTO_ADD_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP; import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_AT_ONCE_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_WAIT_SECONDS_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.STATE_PROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE; import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCESLICEUNIQUE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICAPROP; import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICAPROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCESLICEUNIQUE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CLUSTERPROP; import static org.apache.solr.common.params.CollectionParams.CollectionAction.CLUSTERPROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE; import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATEALIAS; import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATEALIAS;
@ -51,6 +58,7 @@ import static org.apache.solr.common.params.CollectionParams.CollectionAction.DE
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD; import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATE; import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.OVERSEERSTATUS; import static org.apache.solr.common.params.CollectionParams.CollectionAction.OVERSEERSTATUS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.REBALANCELEADERS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.RELOAD; import static org.apache.solr.common.params.CollectionParams.CollectionAction.RELOAD;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE; import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.SPLITSHARD; import static org.apache.solr.common.params.CollectionParams.CollectionAction.SPLITSHARD;
@ -80,6 +88,8 @@ import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ImplicitDocRouter; import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps; import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.cloud.ZkStateReader;
@ -252,6 +262,10 @@ public class CollectionsHandler extends RequestHandlerBase {
this.handleBalanceSliceUnique(req, rsp); this.handleBalanceSliceUnique(req, rsp);
break; break;
} }
case REBALANCELEADERS: {
this.handleBalanceLeaders(req, rsp);
break;
}
default: { default: {
throw new RuntimeException("Unknown action: " + action); throw new RuntimeException("Unknown action: " + action);
} }
@ -260,6 +274,156 @@ public class CollectionsHandler extends RequestHandlerBase {
rsp.setHttpCaching(false); rsp.setHttpCaching(false);
} }
private void handleBalanceLeaders(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
req.getParams().required().check(COLLECTION_PROP);
String collectionName = req.getParams().get(COLLECTION_PROP);
if (StringUtils.isBlank(collectionName)) {
throw new SolrException(ErrorCode.BAD_REQUEST,
String.format(Locale.ROOT, "The " + COLLECTION_PROP + " is required for the REASSIGNLEADERS command."));
}
coreContainer.getZkController().getZkStateReader().updateClusterState(true);
ClusterState clusterState = coreContainer.getZkController().getClusterState();
DocCollection dc = clusterState.getCollection(collectionName);
if (dc == null) {
throw new SolrException(ErrorCode.BAD_REQUEST, "Collection '" + collectionName + "' does not exist, no action taken.");
}
Map<String, String> current = new HashMap<>();
int max = req.getParams().getInt(MAX_AT_ONCE_PROP, Integer.MAX_VALUE);
if (max <= 0) max = Integer.MAX_VALUE;
int maxWaitSecs = req.getParams().getInt(MAX_WAIT_SECONDS_PROP, 60);
NamedList<Object> results = new NamedList<>();
SolrQueryResponse rspIgnore = new SolrQueryResponse();
final String inactivePreferreds = "inactivePreferreds";
final String alreadyLeaders = "alreadyLeaders";
boolean keepGoing = true;
for (Slice slice : dc.getSlices()) {
for (Replica replica : slice.getReplicas()) {
// Tell the replica to become the leader if we're the preferred leader AND active AND not the leader already
if (replica.getBool(Overseer.preferredLeaderProp, false) == false) {
continue;
}
if (StringUtils.equalsIgnoreCase(replica.getStr(STATE_PROP), ACTIVE) == false) {
NamedList<Object> inactives = (NamedList<Object>) results.get(inactivePreferreds);
if (inactives == null) {
inactives = new NamedList<>();
results.add(inactivePreferreds, inactives);
}
NamedList<Object> res = new NamedList<>();
res.add("status", "skipped");
res.add("msg", "Node is a referredLeader, but it's inactive. Skipping");
res.add("nodeName", replica.getNodeName());
inactives.add(replica.getName(), res);
break; // Don't try to assign if we're not active!
} // OK, we're the one, get in the queue to become the leader.
if (replica.getBool(LEADER_PROP, false)) {
NamedList<Object> noops = (NamedList<Object>) results.get(alreadyLeaders);
if (noops == null) {
noops = new NamedList<>();
results.add(alreadyLeaders, noops);
}
NamedList<Object> res = new NamedList<>();
res.add("status", "success");
res.add("msg", "Already leader");
res.add("nodeName", replica.getNodeName());
noops.add(replica.getName(), res);
break; // already the leader, do nothing.
}
Map<String, Object> propMap = new HashMap<>();
propMap.put(Overseer.QUEUE_OPERATION, REBALANCELEADERS.toLower());
propMap.put(COLLECTION_PROP, collectionName);
propMap.put(SHARD_ID_PROP, slice.getName());
propMap.put(BASE_URL_PROP, replica.get(BASE_URL_PROP));
String coreName = (String) replica.get(CORE_NAME_PROP);
// Put it in the waiting list.
String asyncId = REBALANCELEADERS.toLower() + "_" + coreName;
current.put(asyncId, String.format(Locale.ROOT, "Collection: '%s', Shard: '%s', Core: '%s', BaseUrl: '%s'",
collectionName, slice.getName(), coreName, replica.get(BASE_URL_PROP)));
propMap.put(CORE_NAME_PROP, coreName);
propMap.put(ASYNC, asyncId);
ZkNodeProps m = new ZkNodeProps(propMap);
log.info("Queueing collection '" + collectionName + "' slice '" + slice.getName() + "' replica '" +
coreName + "' to become leader.");
handleResponse(REBALANCELEADERS.toLower(), m, rspIgnore); // Want to construct my own response here.
break; // Done with this slice, skip the rest of the replicas.
}
if (current.size() == max) {
log.info("Queued " + max + " leader reassgnments, waiting for some to complete.");
keepGoing = waitForLeaderChange(current, maxWaitSecs, false, results);
if (keepGoing == false) {
break; // If we've waited longer than specified, don't continue to wait!
}
}
}
if (keepGoing == true) {
keepGoing = waitForLeaderChange(current, maxWaitSecs, true, results);
}
if (keepGoing == true) {
log.info("All leader reassignments completed.");
} else {
log.warn("Exceeded specified timeout of ." + maxWaitSecs + "' all leaders may not have been reassigned");
}
rsp.getValues().addAll(results);
}
// currentAsyncIds - map of request IDs and reporting data (value)
// maxWaitSecs - How long are we going to wait? Defaults to 30 seconds.
// waitForAll - if true, do not return until all assignments have been made.
// results - a place to stash results for reporting back to the user.
//
private boolean waitForLeaderChange(Map<String, String> currentAsyncIds, final int maxWaitSecs,
Boolean waitForAll, NamedList<Object> results)
throws KeeperException, InterruptedException {
if (currentAsyncIds.size() == 0) return true;
for (int idx = 0; idx < maxWaitSecs * 10; ++idx) {
Iterator<Map.Entry<String, String>> iter = currentAsyncIds.entrySet().iterator();
boolean foundChange = false;
while (iter.hasNext()) {
Map.Entry<String, String> pair = iter.next();
String asyncId = pair.getKey();
if (coreContainer.getZkController().getOverseerFailureMap().contains(asyncId)) {
coreContainer.getZkController().getOverseerFailureMap().remove(asyncId);
NamedList<Object> fails = (NamedList<Object>) results.get("failures");
if (fails == null) {
fails = new NamedList<>();
results.add("failures", fails);
}
NamedList<Object> res = new NamedList<>();
res.add("status", "failed");
res.add("msg", "Failed to assign '" + pair.getValue() + "' to be leader");
fails.add(asyncId.substring(REBALANCELEADERS.toLower().length()), res);
iter.remove();
foundChange = true;
} else if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId)) {
coreContainer.getZkController().getOverseerCompletedMap().remove(asyncId);
NamedList<Object> successes = (NamedList<Object>) results.get("successes");
if (successes == null) {
successes = new NamedList<>();
results.add("successes", successes);
}
NamedList<Object> res = new NamedList<>();
res.add("status", "success");
res.add("msg", "Assigned '" + pair.getValue() + "' to be leader");
successes.add(asyncId.substring(REBALANCELEADERS.toLower().length()), res);
iter.remove();
foundChange = true;
}
}
// We're done if we're processing a few at a time or all requests are processed.
if ((foundChange && waitForAll == false) || currentAsyncIds.size() == 0) {
return true;
}
Thread.sleep(100); //TODO: Is there a better thing to do than sleep here?
}
return false;
}
private void handleAddReplicaProp(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException { private void handleAddReplicaProp(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
req.getParams().required().check(COLLECTION_PROP, PROPERTY_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_VALUE_PROP); req.getParams().required().check(COLLECTION_PROP, PROPERTY_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_VALUE_PROP);
@ -425,7 +589,7 @@ public class CollectionsHandler extends RequestHandlerBase {
} }
NamedList<String> r = new NamedList<>(); NamedList<String> r = new NamedList<>();
if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId) || if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId) ||
coreContainer.getZkController().getOverseerFailureMap().contains(asyncId) || coreContainer.getZkController().getOverseerFailureMap().contains(asyncId) ||
coreContainer.getZkController().getOverseerRunningMap().contains(asyncId) || coreContainer.getZkController().getOverseerRunningMap().contains(asyncId) ||

View File

@ -197,7 +197,9 @@ public class OverseerRolesTest extends AbstractFullDistribZkTestBase{
assertNotNull("Could not find a jetty2 kill", leaderJetty); assertNotNull("Could not find a jetty2 kill", leaderJetty);
log.info("leader node {}", leaderJetty.getBaseUrl()); log.info("leader node {}", leaderJetty.getBaseUrl());
log.info ("current election Queue", OverseerCollectionProcessor.getSortedElectionNodes(client.getZkStateReader().getZkClient())); log.info ("current election Queue",
OverseerCollectionProcessor.getSortedElectionNodes(client.getZkStateReader().getZkClient(),
OverseerElectionContext.PATH + LeaderElector.ELECTION_NODE));
ChaosMonkey.stop(leaderJetty); ChaosMonkey.stop(leaderJetty);
timeout = System.currentTimeMillis() + 10000; timeout = System.currentTimeMillis() + 10000;
leaderchanged = false; leaderchanged = false;

View File

@ -106,7 +106,9 @@ public class RollingRestartTest extends AbstractFullDistribZkTestBase {
if (!success) { if (!success) {
leader = OverseerCollectionProcessor.getLeaderNode(cloudClient.getZkStateReader().getZkClient()); leader = OverseerCollectionProcessor.getLeaderNode(cloudClient.getZkStateReader().getZkClient());
if (leader == null) if (leader == null)
log.error("NOOVERSEER election queue is :" + OverseerCollectionProcessor.getSortedElectionNodes(cloudClient.getZkStateReader().getZkClient())); log.error("NOOVERSEER election queue is :" +
OverseerCollectionProcessor.getSortedElectionNodes(cloudClient.getZkStateReader().getZkClient(),
OverseerElectionContext.PATH + LeaderElector.ELECTION_NODE));
fail("No overseer designate as leader found after restart #" + (i + 1) + ": " + leader); fail("No overseer designate as leader found after restart #" + (i + 1) + ": " + leader);
} }
} }
@ -115,7 +117,9 @@ public class RollingRestartTest extends AbstractFullDistribZkTestBase {
if (!success) { if (!success) {
leader = OverseerCollectionProcessor.getLeaderNode(cloudClient.getZkStateReader().getZkClient()); leader = OverseerCollectionProcessor.getLeaderNode(cloudClient.getZkStateReader().getZkClient());
if (leader == null) if (leader == null)
log.error("NOOVERSEER election queue is :" + OverseerCollectionProcessor.getSortedElectionNodes(cloudClient.getZkStateReader().getZkClient())); log.error("NOOVERSEER election queue is :" +
OverseerCollectionProcessor.getSortedElectionNodes(cloudClient.getZkStateReader().getZkClient(),
OverseerElectionContext.PATH + LeaderElector.ELECTION_NODE));
fail("No overseer leader found after restart #" + (i + 1) + ": " + leader); fail("No overseer leader found after restart #" + (i + 1) + ": " + leader);
} }

View File

@ -29,10 +29,13 @@ import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrServer; import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.CollectionParams; import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.NamedList;
import org.apache.zookeeper.KeeperException;
import org.junit.Before; import org.junit.Before;
@Slow @Slow
@ -61,7 +64,7 @@ public class TestReplicaProperties extends ReplicaPropertiesBase {
// shards, replicationfactor, maxreplicaspernode // shards, replicationfactor, maxreplicaspernode
int shards = random().nextInt(7); int shards = random().nextInt(7);
if (shards < 2) shards = 2; if (shards < 2) shards = 2;
int rFactor = random().nextInt(3); int rFactor = random().nextInt(4);
if (rFactor < 2) rFactor = 2; if (rFactor < 2) rFactor = 2;
createCollection(null, COLLECTION_NAME, shards, rFactor, shards * rFactor + 1, client, null, "conf1"); createCollection(null, COLLECTION_NAME, shards, rFactor, shards * rFactor + 1, client, null, "conf1");
} finally { } finally {
@ -186,11 +189,56 @@ public class TestReplicaProperties extends ReplicaPropertiesBase {
verifyPropertyVal(client, COLLECTION_NAME, verifyPropertyVal(client, COLLECTION_NAME,
c1_s1_r2, "property.bogus1", "whatever"); c1_s1_r2, "property.bogus1", "whatever");
// At this point we've assigned a preferred leader. Make it happen and check that all the nodes that are
// leaders _also_ have the preferredLeader property set.
doPropertyAction(client,
"action", CollectionParams.CollectionAction.REBALANCELEADERS.toString(),
"collection", COLLECTION_NAME);
verifyLeaderAssignment(client, COLLECTION_NAME);
} finally { } finally {
client.shutdown(); client.shutdown();
} }
} }
private void verifyLeaderAssignment(CloudSolrServer client, String collectionName)
throws InterruptedException, KeeperException {
String lastFailMsg = "";
for (int idx = 0; idx < 300; ++idx) { // Keep trying while Overseer writes the ZK state for up to 30 seconds.
lastFailMsg = "";
client.getZkStateReader().updateClusterState(true);
ClusterState clusterState = client.getZkStateReader().getClusterState();
for (Slice slice : clusterState.getSlices(collectionName)) {
Boolean foundLeader = false;
Boolean foundPreferred = false;
for (Replica replica : slice.getReplicas()) {
Boolean isLeader = replica.getBool("leader", false);
Boolean isPreferred = replica.getBool("property.preferredleader", false);
if (isLeader != isPreferred) {
lastFailMsg = "Replica should NOT have preferredLeader != leader. Preferred: " + isPreferred.toString() +
" leader is " + isLeader.toString();
}
if (foundLeader && isLeader) {
lastFailMsg = "There should only be a single leader in _any_ shard! Replica " + replica.getName() +
" is the second leader in slice " + slice.getName();
}
if (foundPreferred && isPreferred) {
lastFailMsg = "There should only be a single preferredLeader in _any_ shard! Replica " + replica.getName() +
" is the second preferredLeader in slice " + slice.getName();
}
foundLeader = foundLeader ? foundLeader : isLeader;
foundPreferred = foundPreferred ? foundPreferred : isPreferred;
}
}
if (lastFailMsg.length() == 0) return;
Thread.sleep(100);
}
fail(lastFailMsg);
}
private void addProperty(CloudSolrServer client, String... paramsIn) throws IOException, SolrServerException { private void addProperty(CloudSolrServer client, String... paramsIn) throws IOException, SolrServerException {
assertTrue("paramsIn must be an even multiple of 2, it is: " + paramsIn.length, (paramsIn.length % 2) == 0); assertTrue("paramsIn must be an even multiple of 2, it is: " + paramsIn.length, (paramsIn.length % 2) == 0);
ModifiableSolrParams params = new ModifiableSolrParams(); ModifiableSolrParams params = new ModifiableSolrParams();

View File

@ -71,7 +71,8 @@ public class ZkStateReader implements Closeable {
public static final String LEADER_PROP = "leader"; public static final String LEADER_PROP = "leader";
public static final String PROPERTY_PROP = "property"; public static final String PROPERTY_PROP = "property";
public static final String PROPERTY_VALUE_PROP = "property.value"; public static final String PROPERTY_VALUE_PROP = "property.value";
public static final String MAX_AT_ONCE_PROP = "maxAtOnce";
public static final String MAX_WAIT_SECONDS_PROP = "maxWaitSeconds";
public static final String COLLECTIONS_ZKNODE = "/collections"; public static final String COLLECTIONS_ZKNODE = "/collections";
public static final String LIVE_NODES_ZKNODE = "/live_nodes"; public static final String LIVE_NODES_ZKNODE = "/live_nodes";
public static final String ALIASES = "/aliases.json"; public static final String ALIASES = "/aliases.json";

View File

@ -49,7 +49,8 @@ public interface CollectionParams
CLUSTERSTATUS, CLUSTERSTATUS,
ADDREPLICAPROP, ADDREPLICAPROP,
DELETEREPLICAPROP, DELETEREPLICAPROP,
BALANCESLICEUNIQUE; BALANCESLICEUNIQUE,
REBALANCELEADERS;
public static CollectionAction get( String p ) public static CollectionAction get( String p )
{ {