From fba21341575de740ad515ee9709091883776be0a Mon Sep 17 00:00:00 2001 From: Noble Paul Date: Thu, 14 May 2015 16:25:52 +0000 Subject: [PATCH] SOLR-7544: CollectionsHandler refactored to be more modular git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1679397 13f79535-47bb-0310-9956-ffa450edef68 --- solr/CHANGES.txt | 2 + .../handler/admin/CollectionsHandler.java | 1312 ++++++----------- .../solr/handler/admin/RebalanceLeaders.java | 327 ++++ .../apache/solr/common/params/SolrParams.java | 47 +- 4 files changed, 848 insertions(+), 840 deletions(-) create mode 100644 solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 0493ded04d0..c0ee6ec1e0a 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -360,6 +360,8 @@ Other Changes the actual ErrorCode is used when available. (Hrishikesh Gadre via Shawn Heisey) +* SOLR-7544: CollectionsHandler refactored to be more modular (Noble Paul) + ================== 5.1.0 ================== Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java index e5df46ab500..03e07c0db0c 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java @@ -17,45 +17,30 @@ package org.apache.solr.handler.admin; * limitations under the License. */ -import static org.apache.solr.cloud.Overseer.*; -import static org.apache.solr.cloud.OverseerCollectionProcessor.*; -import static org.apache.solr.common.cloud.DocCollection.*; -import static org.apache.solr.common.cloud.ZkStateReader.*; -import static org.apache.solr.common.params.CollectionParams.CollectionAction.*; -import static org.apache.solr.common.params.CommonParams.*; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import com.google.common.collect.ImmutableSet; import org.apache.commons.lang.StringUtils; import org.apache.solr.client.solrj.SolrResponse; -import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.request.CoreAdminRequest; import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestSyncShard; import org.apache.solr.cloud.DistributedQueue; import org.apache.solr.cloud.DistributedQueue.QueueEvent; -import org.apache.solr.cloud.LeaderElector; -import org.apache.solr.cloud.Overseer; -import org.apache.solr.cloud.OverseerCollectionProcessor; import org.apache.solr.cloud.OverseerSolrResponse; import org.apache.solr.cloud.overseer.SliceMutator; import org.apache.solr.cloud.rule.Rule; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.cloud.ClusterState; -import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.ImplicitDocRouter; -import org.apache.solr.common.cloud.Replica; -import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkCoreNodeProps; import org.apache.solr.common.cloud.ZkNodeProps; @@ -63,7 +48,6 @@ import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.CollectionParams.CollectionAction; import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.common.params.ModifiableSolrParams; -import org.apache.solr.common.params.ShardParams; import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.SimpleOrderedMap; @@ -77,7 +61,36 @@ import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.ImmutableSet; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION; +import static org.apache.solr.cloud.OverseerCollectionProcessor.ASYNC; +import static org.apache.solr.cloud.OverseerCollectionProcessor.COLL_CONF; +import static org.apache.solr.cloud.OverseerCollectionProcessor.COLL_PROP_PREFIX; +import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET; +import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET_SHUFFLE; +import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES; +import static org.apache.solr.cloud.OverseerCollectionProcessor.ONLY_ACTIVE_NODES; +import static org.apache.solr.cloud.OverseerCollectionProcessor.ONLY_IF_DOWN; +import static org.apache.solr.cloud.OverseerCollectionProcessor.REQUESTID; +import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP; +import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARD_UNIQUE; +import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER; +import static org.apache.solr.common.cloud.DocCollection.STATE_FORMAT; +import static org.apache.solr.common.cloud.ZkStateReader.AUTO_ADD_REPLICAS; +import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.CONFIGS_ZKNODE; +import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE; +import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR; +import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; +import static org.apache.solr.common.params.CollectionParams.CollectionAction.*; +import static org.apache.solr.common.params.CommonParams.NAME; +import static org.apache.solr.common.params.CommonParams.VALUE_LONG; +import static org.apache.solr.common.params.CoreAdminParams.DATA_DIR; +import static org.apache.solr.common.params.CoreAdminParams.INSTANCE_DIR; +import static org.apache.solr.common.params.ShardParams._ROUTE_; public class CollectionsHandler extends RequestHandlerBase { protected static Logger log = LoggerFactory.getLogger(CollectionsHandler.class); @@ -85,8 +98,8 @@ public class CollectionsHandler extends RequestHandlerBase { public CollectionsHandler() { super(); - // Unlike most request handlers, CoreContainer initialization - // should happen in the constructor... + // Unlike most request handlers, CoreContainer initialization + // should happen in the constructor... this.coreContainer = null; } @@ -133,528 +146,51 @@ public class CollectionsHandler extends RequestHandlerBase { // Pick the action SolrParams params = req.getParams(); - CollectionAction action = null; String a = params.get(CoreAdminParams.ACTION); if (a != null) { - action = CollectionAction.get(a); - } - if (action == null) { - throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown action: " + a); - } + CollectionAction action = CollectionAction.get(a); + if (action == null) + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown action: " + a); + CollectionOperation operation = CollectionOperation.get(action); + log.info("Invoked Collection Action :{} with params{} ", action.toLower(), req.getParamString()); + Map result = operation.call(req, rsp, this); + if (result != null) { + result.put(QUEUE_OPERATION, operation.action.toLower()); + ZkNodeProps props = new ZkNodeProps(result); + handleResponse(operation.action.toLower(), props, rsp, operation.timeOut); + } + } else { + throw new SolrException(ErrorCode.BAD_REQUEST, "action is a required param"); - switch (action) { - case CREATE: { - this.handleCreateAction(req, rsp); - break; - } - case DELETE: { - this.handleDeleteAction(req, rsp); - break; - } - case RELOAD: { - this.handleReloadAction(req, rsp); - break; - } - case SYNCSHARD: { - this.handleSyncShardAction(req, rsp); - break; - } - case CREATEALIAS: { - this.handleCreateAliasAction(req, rsp); - break; - } - case DELETEALIAS: { - this.handleDeleteAliasAction(req, rsp); - break; - } - case SPLITSHARD: { - this.handleSplitShardAction(req, rsp); - break; - } - case DELETESHARD: { - this.handleDeleteShardAction(req, rsp); - break; - } - case CREATESHARD: { - this.handleCreateShard(req, rsp); - break; - } - case DELETEREPLICA: { - this.handleRemoveReplica(req, rsp); - break; - } - case MIGRATE: { - this.handleMigrate(req, rsp); - break; - } - case ADDROLE: { - handleRole(ADDROLE, req, rsp); - break; - } - case REMOVEROLE: { - handleRole(REMOVEROLE, req, rsp); - break; - } - case CLUSTERPROP: { - this.handleProp(req, rsp); - break; - } - case ADDREPLICA: { - this.handleAddReplica(req, rsp); - break; - } - case REQUESTSTATUS: { - this.handleRequestStatus(req, rsp); - break; - } - case OVERSEERSTATUS: { - this.handleOverseerStatus(req, rsp); - break; - } - case LIST: { - this.handleListAction(req, rsp); - break; - } - case CLUSTERSTATUS: { - this.handleClusterStatus(req, rsp); - break; - } - case ADDREPLICAPROP: { - this.handleAddReplicaProp(req, rsp); - break; - } - case DELETEREPLICAPROP: { - this.handleDeleteReplicaProp(req, rsp); - break; - } - case BALANCESHARDUNIQUE: { - this.handleBalanceShardUnique(req, rsp); - break; - } - case REBALANCELEADERS: { - this.handleBalanceLeaders(req, rsp); - break; - } - default: { - throw new RuntimeException("Unknown action: " + action); - } } rsp.setHttpCaching(false); } - private void handleBalanceLeaders(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException { - req.getParams().required().check(COLLECTION_PROP); - - String collectionName = req.getParams().get(COLLECTION_PROP); - if (StringUtils.isBlank(collectionName)) { - throw new SolrException(ErrorCode.BAD_REQUEST, - String.format(Locale.ROOT, "The " + COLLECTION_PROP + " is required for the REASSIGNLEADERS command.")); - } - coreContainer.getZkController().getZkStateReader().updateClusterState(true); - ClusterState clusterState = coreContainer.getZkController().getClusterState(); - DocCollection dc = clusterState.getCollection(collectionName); - if (dc == null) { - throw new SolrException(ErrorCode.BAD_REQUEST, "Collection '" + collectionName + "' does not exist, no action taken."); - } - Map currentRequests = new HashMap<>(); - int max = req.getParams().getInt(MAX_AT_ONCE_PROP, Integer.MAX_VALUE); - if (max <= 0) max = Integer.MAX_VALUE; - int maxWaitSecs = req.getParams().getInt(MAX_WAIT_SECONDS_PROP, 60); - NamedList results = new NamedList<>(); - - boolean keepGoing = true; - for (Slice slice : dc.getSlices()) { - insurePreferredIsLeader(req, results, slice, currentRequests); - if (currentRequests.size() == max) { - log.info("Queued " + max + " leader reassignments, waiting for some to complete."); - keepGoing = waitForLeaderChange(currentRequests, maxWaitSecs, false, results); - if (keepGoing == false) { - break; // If we've waited longer than specified, don't continue to wait! - } - } - } - if (keepGoing == true) { - keepGoing = waitForLeaderChange(currentRequests, maxWaitSecs, true, results); - } - if (keepGoing == true) { - log.info("All leader reassignments completed."); - } else { - log.warn("Exceeded specified timeout of ." + maxWaitSecs + "' all leaders may not have been reassigned"); - } - - rsp.getValues().addAll(results); - } - - private void insurePreferredIsLeader(SolrQueryRequest req, NamedList results, - Slice slice, Map currentRequests) throws KeeperException, InterruptedException { - final String inactivePreferreds = "inactivePreferreds"; - final String alreadyLeaders = "alreadyLeaders"; - String collectionName = req.getParams().get(COLLECTION_PROP); - - for (Replica replica : slice.getReplicas()) { - // Tell the replica to become the leader if we're the preferred leader AND active AND not the leader already - if (replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false) == false) { - continue; - } - // OK, we are the preferred leader, are we the actual leader? - if (replica.getBool(LEADER_PROP, false)) { - //We're a preferred leader, but we're _also_ the leader, don't need to do anything. - NamedList noops = (NamedList) results.get(alreadyLeaders); - if (noops == null) { - noops = new NamedList<>(); - results.add(alreadyLeaders, noops); - } - NamedList res = new NamedList<>(); - res.add("status", "success"); - res.add("msg", "Already leader"); - res.add("shard", slice.getName()); - res.add("nodeName", replica.getNodeName()); - noops.add(replica.getName(), res); - return; // already the leader, do nothing. - } - - // We're the preferred leader, but someone else is leader. Only become leader if we're active. - if (replica.getState() != Replica.State.ACTIVE) { - NamedList inactives = (NamedList) results.get(inactivePreferreds); - if (inactives == null) { - inactives = new NamedList<>(); - results.add(inactivePreferreds, inactives); - } - NamedList res = new NamedList<>(); - res.add("status", "skipped"); - res.add("msg", "Node is a referredLeader, but it's inactive. Skipping"); - res.add("shard", slice.getName()); - res.add("nodeName", replica.getNodeName()); - inactives.add(replica.getName(), res); - return; // Don't try to become the leader if we're not active! - } - - // Replica is the preferred leader but not the actual leader, do something about that. - // "Something" is - // 1> if the preferred leader isn't first in line, tell it to re-queue itself. - // 2> tell the actual leader to re-queue itself. - - ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader(); - - List electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(), - ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName())); - - if (electionNodes.size() < 2) { // if there's only one node in the queue, should already be leader and we shouldn't be here anyway. - log.warn("Rebalancing leaders and slice " + slice.getName() + " has less than two elements in the leader " + - "election queue, but replica " + replica.getName() + " doesn't think it's the leader. Do nothing"); - return; - } - - // Ok, the sorting for election nodes is a bit strange. If the sequence numbers are the same, then the whole - // string is used, but that sorts nodes with the same sequence number by their session IDs from ZK. - // While this is determinate, it's not quite what we need, so re-queue nodes that aren't us and are - // watching the leader node.. - - String firstWatcher = electionNodes.get(1); - - if (LeaderElector.getNodeName(firstWatcher).equals(replica.getName()) == false) { - makeReplicaFirstWatcher(collectionName, slice, replica); - } - - String coreName = slice.getReplica(LeaderElector.getNodeName(electionNodes.get(0))).getStr(CORE_NAME_PROP); - rejoinElection(collectionName, slice, electionNodes.get(0), coreName, false); - waitForNodeChange(collectionName, slice, electionNodes.get(0)); - return; // Done with this slice, skip the rest of the replicas. - } - } - // Put the replica in at the head of the queue and send all nodes with the same sequence number to the back of the list - void makeReplicaFirstWatcher(String collectionName, Slice slice, Replica replica) - throws KeeperException, InterruptedException { - - ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader(); - List electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(), - ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName())); - - // First, queue up the preferred leader at the head of the queue. - int newSeq = -1; - for (String electionNode : electionNodes) { - if (LeaderElector.getNodeName(electionNode).equals(replica.getName())) { - String coreName = slice.getReplica(LeaderElector.getNodeName(electionNode)).getStr(CORE_NAME_PROP); - rejoinElection(collectionName, slice, electionNode, coreName, true); - newSeq = waitForNodeChange(collectionName, slice, electionNode); - break; - } - } - if (newSeq == -1) { - return; // let's not continue if we didn't get what we expect. Possibly we're offline etc.. - } - - List electionNodesTmp = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(), - ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName())); - - - // Now find other nodes that have the same sequence number as this node and re-queue them at the end of the queue. - electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(), - ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName())); - - for (String thisNode : electionNodes) { - if (LeaderElector.getSeq(thisNode) > newSeq) { - break; - } - if (LeaderElector.getNodeName(thisNode).equals(replica.getName())) { - continue; - } - if (LeaderElector.getSeq(thisNode) == newSeq) { - String coreName = slice.getReplica(LeaderElector.getNodeName(thisNode)).getStr(CORE_NAME_PROP); - rejoinElection(collectionName, slice, thisNode, coreName, false); - waitForNodeChange(collectionName, slice, thisNode); - } - } - } - - int waitForNodeChange(String collectionName, Slice slice, String electionNode) throws InterruptedException, KeeperException { - String nodeName = LeaderElector.getNodeName(electionNode); - int oldSeq = LeaderElector.getSeq(electionNode); - for (int idx = 0; idx < 600; ++idx) { - ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader(); - List electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(), - ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName())); - for (String testNode : electionNodes) { - if (LeaderElector.getNodeName(testNode).equals(nodeName) && oldSeq != LeaderElector.getSeq(testNode)) { - return LeaderElector.getSeq(testNode); - } - } - - Thread.sleep(100); - } - return -1; - } - private void rejoinElection(String collectionName, Slice slice, String electionNode, String core, - boolean rejoinAtHead) throws KeeperException, InterruptedException { - Replica replica = slice.getReplica(LeaderElector.getNodeName(electionNode)); - Map propMap = new HashMap<>(); - propMap.put(COLLECTION_PROP, collectionName); - propMap.put(SHARD_ID_PROP, slice.getName()); - propMap.put(Overseer.QUEUE_OPERATION, REBALANCELEADERS.toLower()); - propMap.put(CORE_NAME_PROP, core); - propMap.put(NODE_NAME_PROP, replica.getName()); - propMap.put(ZkStateReader.BASE_URL_PROP, replica.getProperties().get(ZkStateReader.BASE_URL_PROP)); - propMap.put(REJOIN_AT_HEAD_PROP, Boolean.toString(rejoinAtHead)); // Get ourselves to be first in line. - propMap.put(ELECTION_NODE_PROP, electionNode); - String asyncId = REBALANCELEADERS.toLower() + "_" + core + "_" + Math.abs(System.nanoTime()); - propMap.put(ASYNC, asyncId); - ZkNodeProps m = new ZkNodeProps(propMap); - SolrQueryResponse rspIgnore = new SolrQueryResponse(); // I'm constructing my own response - handleResponse(REBALANCELEADERS.toLower(), m, rspIgnore); // Want to construct my own response here. - } - - // currentAsyncIds - map of request IDs and reporting data (value) - // maxWaitSecs - How long are we going to wait? Defaults to 30 seconds. - // waitForAll - if true, do not return until all assignments have been made. - // results - a place to stash results for reporting back to the user. - // - private boolean waitForLeaderChange(Map currentAsyncIds, final int maxWaitSecs, - Boolean waitForAll, NamedList results) - throws KeeperException, InterruptedException { - - if (currentAsyncIds.size() == 0) return true; - - for (int idx = 0; idx < maxWaitSecs * 10; ++idx) { - Iterator> iter = currentAsyncIds.entrySet().iterator(); - boolean foundChange = false; - while (iter.hasNext()) { - Map.Entry pair = iter.next(); - String asyncId = pair.getKey(); - if (coreContainer.getZkController().getOverseerFailureMap().contains(asyncId)) { - coreContainer.getZkController().getOverseerFailureMap().remove(asyncId); - NamedList fails = (NamedList) results.get("failures"); - if (fails == null) { - fails = new NamedList<>(); - results.add("failures", fails); - } - NamedList res = new NamedList<>(); - res.add("status", "failed"); - res.add("msg", "Failed to assign '" + pair.getValue() + "' to be leader"); - fails.add(asyncId.substring(REBALANCELEADERS.toLower().length()), res); - iter.remove(); - foundChange = true; - } else if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId)) { - coreContainer.getZkController().getOverseerCompletedMap().remove(asyncId); - NamedList successes = (NamedList) results.get("successes"); - if (successes == null) { - successes = new NamedList<>(); - results.add("successes", successes); - } - NamedList res = new NamedList<>(); - res.add("status", "success"); - res.add("msg", "Assigned '" + pair.getValue() + "' to be leader"); - successes.add(asyncId.substring(REBALANCELEADERS.toLower().length()), res); - iter.remove(); - foundChange = true; - } - } - // We're done if we're processing a few at a time or all requests are processed. - if ((foundChange && waitForAll == false) || currentAsyncIds.size() == 0) { - return true; - } - Thread.sleep(100); //TODO: Is there a better thing to do than sleep here? - } - return false; - } - private void handleAddReplicaProp(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException { - req.getParams().required().check(COLLECTION_PROP, PROPERTY_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_VALUE_PROP); - - - Map map = ZkNodeProps.makeMap(Overseer.QUEUE_OPERATION, ADDREPLICAPROP.toLower()); - copyIfNotNull(req.getParams(), map, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP, - SHARD_UNIQUE, PROPERTY_VALUE_PROP); - - String property = (String) map.get(PROPERTY_PROP); - if (property.startsWith(OverseerCollectionProcessor.COLL_PROP_PREFIX) == false) { - property = OverseerCollectionProcessor.COLL_PROP_PREFIX + property; - } - - boolean uniquePerSlice = Boolean.parseBoolean((String) map.get(SHARD_UNIQUE)); - - // Check if we're trying to set a property with parameters that allow us to set the property on multiple replicas - // in a slice on properties that are known to only be one-per-slice and error out if so. - if (StringUtils.isNotBlank((String)map.get(SHARD_UNIQUE)) && - SliceMutator.SLICE_UNIQUE_BOOLEAN_PROPERTIES.contains(property.toLowerCase(Locale.ROOT)) && - uniquePerSlice == false) { - throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, - "Overseer replica property command received for property " + property + - " with the " + SHARD_UNIQUE + - " parameter set to something other than 'true'. No action taken."); - } - handleResponse(ADDREPLICAPROP.toLower(), new ZkNodeProps(map), rsp); - } - - private void handleDeleteReplicaProp(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException { - req.getParams().required().check(COLLECTION_PROP, PROPERTY_PROP, SHARD_ID_PROP, REPLICA_PROP); - - Map map = ZkNodeProps.makeMap(Overseer.QUEUE_OPERATION, DELETEREPLICAPROP.toLower()); - copyIfNotNull(req.getParams(), map, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP); - - handleResponse(DELETEREPLICAPROP.toLower(), new ZkNodeProps(map), rsp); - } - - - - private void handleBalanceShardUnique(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException { - req.getParams().required().check(COLLECTION_PROP, PROPERTY_PROP); - Boolean shardUnique = Boolean.parseBoolean(req.getParams().get(SHARD_UNIQUE)); - String prop = req.getParams().get(PROPERTY_PROP).toLowerCase(Locale.ROOT); - if (StringUtils.startsWith(prop, OverseerCollectionProcessor.COLL_PROP_PREFIX) == false) { - prop = OverseerCollectionProcessor.COLL_PROP_PREFIX + prop; - } - - if (shardUnique == false && - SliceMutator.SLICE_UNIQUE_BOOLEAN_PROPERTIES.contains(prop) == false) { - throw new SolrException(ErrorCode.BAD_REQUEST, "Balancing properties amongst replicas in a slice requires that" - + " the property be pre-defined as a unique property (e.g. 'preferredLeader') or that 'shardUnique' be set to 'true'. " + - " Property: " + prop + " shardUnique: " + Boolean.toString(shardUnique)); - } - - Map map = ZkNodeProps.makeMap(Overseer.QUEUE_OPERATION, BALANCESHARDUNIQUE.toLower()); - copyIfNotNull(req.getParams(), map, COLLECTION_PROP, PROPERTY_PROP, ONLY_ACTIVE_NODES, SHARD_UNIQUE); - - handleResponse(BALANCESHARDUNIQUE.toLower(), new ZkNodeProps(map), rsp); - } - - private void handleOverseerStatus(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException { - Map props = ZkNodeProps.makeMap( - Overseer.QUEUE_OPERATION, OVERSEERSTATUS.toLower()); - handleResponse(OVERSEERSTATUS.toLower(), new ZkNodeProps(props), rsp); - } - - private void handleProp(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException { - req.getParams().required().check(NAME); - String name = req.getParams().get(NAME); - String val = req.getParams().get(VALUE_LONG); - coreContainer.getZkController().getZkStateReader().setClusterProperty(name, val); - } - - static Set KNOWN_ROLES = ImmutableSet.of("overseer"); - - private void handleRole(CollectionAction action, SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException { - req.getParams().required().check("role", "node"); - Map map = ZkNodeProps.makeMap(Overseer.QUEUE_OPERATION, action.toLower()); - copyIfNotNull(req.getParams(), map,"role", "node"); - ZkNodeProps m = new ZkNodeProps(map); - if(!KNOWN_ROLES.contains(m.getStr("role"))) throw new SolrException(ErrorCode.BAD_REQUEST,"Unknown role. Supported roles are ,"+ KNOWN_ROLES); - handleResponse(action.toString().toLowerCase(Locale.ROOT), m, rsp); - } + static final Set KNOWN_ROLES = ImmutableSet.of("overseer"); public static long DEFAULT_ZK_TIMEOUT = 180*1000; - private void handleRequestStatus(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException { - log.debug("REQUESTSTATUS action invoked: " + req.getParamString()); - req.getParams().required().check(REQUESTID); - - String requestId = req.getParams().get(REQUESTID); - - if (requestId.equals("-1")) { - // Special taskId (-1), clears up the request state maps. - if(requestId.equals("-1")) { - coreContainer.getZkController().getOverseerCompletedMap().clear(); - coreContainer.getZkController().getOverseerFailureMap().clear(); - return; - } - } else { - NamedList results = new NamedList<>(); - if (coreContainer.getZkController().getOverseerCompletedMap().contains(requestId)) { - SimpleOrderedMap success = new SimpleOrderedMap(); - success.add("state", "completed"); - success.add("msg", "found " + requestId + " in completed tasks"); - results.add("status", success); - } else if (coreContainer.getZkController().getOverseerFailureMap().contains(requestId)) { - SimpleOrderedMap success = new SimpleOrderedMap(); - success.add("state", "failed"); - success.add("msg", "found " + requestId + " in failed tasks"); - results.add("status", success); - } else if (coreContainer.getZkController().getOverseerRunningMap().contains(requestId)) { - SimpleOrderedMap success = new SimpleOrderedMap(); - success.add("state", "running"); - success.add("msg", "found " + requestId + " in running tasks"); - results.add("status", success); - } else if(overseerCollectionQueueContains(requestId)){ - SimpleOrderedMap success = new SimpleOrderedMap(); - success.add("state", "submitted"); - success.add("msg", "found " + requestId + " in submitted tasks"); - results.add("status", success); - } else { - SimpleOrderedMap failure = new SimpleOrderedMap(); - failure.add("state", "notfound"); - failure.add("msg", "Did not find taskid [" + requestId + "] in any tasks queue"); - results.add("status", failure); - } - SolrResponse response = new OverseerSolrResponse(results); - - rsp.getValues().addAll(response.getResponse()); - } - } - - private boolean overseerCollectionQueueContains(String asyncId) throws KeeperException, InterruptedException { - DistributedQueue collectionQueue = coreContainer.getZkController().getOverseerCollectionQueue(); - return collectionQueue.containsTaskWithRequestId(asyncId); - } - - private void handleResponse(String operation, ZkNodeProps m, + void handleResponse(String operation, ZkNodeProps m, SolrQueryResponse rsp) throws KeeperException, InterruptedException { handleResponse(operation, m, rsp, DEFAULT_ZK_TIMEOUT); } - + private void handleResponse(String operation, ZkNodeProps m, SolrQueryResponse rsp, long timeout) throws KeeperException, InterruptedException { long time = System.nanoTime(); if(m.containsKey(ASYNC) && m.get(ASYNC) != null) { - + String asyncId = m.getStr(ASYNC); - + if(asyncId.equals("-1")) { throw new SolrException(ErrorCode.BAD_REQUEST, "requestid can not be -1. It is reserved for cleanup purposes."); } - + NamedList r = new NamedList<>(); if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId) || @@ -662,16 +198,16 @@ public class CollectionsHandler extends RequestHandlerBase { coreContainer.getZkController().getOverseerRunningMap().contains(asyncId) || overseerCollectionQueueContains(asyncId)) { r.add("error", "Task with the same requestid already exists."); - + } else { coreContainer.getZkController().getOverseerCollectionQueue() .offer(ZkStateReader.toJSON(m)); } r.add(CoreAdminParams.REQUESTID, (String) m.get(ASYNC)); SolrResponse response = new OverseerSolrResponse(r); - + rsp.getValues().addAll(response.getResponse()); - + return; } @@ -702,139 +238,10 @@ public class CollectionsHandler extends RequestHandlerBase { } } } - - private void handleReloadAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException { - log.info("Reloading Collection : " + req.getParamString()); - String name = req.getParams().required().get(NAME); - - ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, - RELOAD.toLower(), NAME, name); - handleResponse(RELOAD.toLower(), m, rsp); - } - - private void handleSyncShardAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException, SolrServerException, IOException { - log.info("Syncing shard : " + req.getParamString()); - String collection = req.getParams().required().get("collection"); - String shard = req.getParams().required().get("shard"); - - ClusterState clusterState = coreContainer.getZkController().getClusterState(); - - ZkNodeProps leaderProps = clusterState.getLeader(collection, shard); - ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(leaderProps); - - ; - try (HttpSolrClient client = new HttpSolrClient(nodeProps.getBaseUrl())) { - client.setConnectionTimeout(15000); - client.setSoTimeout(60000); - RequestSyncShard reqSyncShard = new CoreAdminRequest.RequestSyncShard(); - reqSyncShard.setCollection(collection); - reqSyncShard.setShard(shard); - reqSyncShard.setCoreName(nodeProps.getCoreName()); - client.request(reqSyncShard); - } - } - - private void handleCreateAliasAction(SolrQueryRequest req, - SolrQueryResponse rsp) throws Exception { - log.info("Create alias action : " + req.getParamString()); - String name = req.getParams().required().get(NAME); - String collections = req.getParams().required().get("collections"); - - ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, - CREATEALIAS.toLower(), NAME, name, "collections", - collections); - - handleResponse(CREATEALIAS.toLower(), m, rsp); - } - - private void handleDeleteAliasAction(SolrQueryRequest req, - SolrQueryResponse rsp) throws Exception { - log.info("Delete alias action : " + req.getParamString()); - String name = req.getParams().required().get(NAME); - - ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, - DELETEALIAS.toLower(), NAME, name); - - handleResponse(DELETEALIAS.toLower(), m, rsp); - } - - private void handleDeleteAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException { - log.info("Deleting Collection : " + req.getParamString()); - - String name = req.getParams().required().get(NAME); - - ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, - DELETE.toLower(), NAME, name); - - handleResponse(DELETE.toLower(), m, rsp); - } - - // very simple currently, you can pass a template collection, and the new collection is created on - // every node the template collection is on - // there is a lot more to add - you should also be able to create with an explicit server list - // we might also want to think about error handling (add the request to a zk queue and involve overseer?) - // as well as specific replicas= options - private void handleCreateAction(SolrQueryRequest req, - SolrQueryResponse rsp) throws InterruptedException, KeeperException { - log.info("Creating Collection : " + req.getParamString()); - String name = req.getParams().required().get(NAME); - if (name == null) { - log.error("Collection name is required to create a new collection"); - throw new SolrException(ErrorCode.BAD_REQUEST, - "Collection name is required to create a new collection"); - } - - Map props = ZkNodeProps.makeMap( - Overseer.QUEUE_OPERATION, - CREATE.toLower(), - "fromApi","true"); - copyIfNotNull(req.getParams(),props, - NAME, - REPLICATION_FACTOR, - COLL_CONF, - NUM_SLICES, - MAX_SHARDS_PER_NODE, - CREATE_NODE_SET, CREATE_NODE_SET_SHUFFLE, - SHARDS_PROP, - ASYNC, - DocCollection.STATE_FORMAT, - AUTO_ADD_REPLICAS, - "router."); - if(props.get(DocCollection.STATE_FORMAT) == null){ - props.put(DocCollection.STATE_FORMAT,"2"); - } - addRuleMap(req.getParams(), props, "rule"); - addRuleMap(req.getParams(), props, "snitch"); - - if(SYSTEM_COLL.equals(name)){ - //We must always create asystem collection with only a single shard - props.put(NUM_SLICES,1); - props.remove(SHARDS_PROP); - createSysConfigSet(); - - } - copyPropertiesIfNotNull(req.getParams(), props); - - ZkNodeProps m = new ZkNodeProps(props); - handleResponse(CREATE.toLower(), m, rsp); - } - - private void addRuleMap(SolrParams params, Map props, String key) { - String[] rules = params.getParams(key); - if(rules!= null && rules.length >0){ - ArrayList l = new ArrayList<>(); - for (String rule : rules) l.add(Rule.parseRule(rule)); - props.put(key, l); - } - } - - private void createSysConfigSet() throws KeeperException, InterruptedException { - SolrZkClient zk = coreContainer.getZkController().getZkStateReader().getZkClient(); - createNodeIfNotExists(zk,ZkStateReader.CONFIGS_ZKNODE, null); - createNodeIfNotExists(zk,ZkStateReader.CONFIGS_ZKNODE+"/"+SYSTEM_COLL, null); - createNodeIfNotExists(zk,ZkStateReader.CONFIGS_ZKNODE+"/"+SYSTEM_COLL+"/schema.xml", BlobHandler.SCHEMA.replaceAll("'","\"").getBytes(StandardCharsets.UTF_8)); - createNodeIfNotExists(zk, ZkStateReader.CONFIGS_ZKNODE + "/" + SYSTEM_COLL + "/solrconfig.xml", BlobHandler.CONF.replaceAll("'", "\"").getBytes(StandardCharsets.UTF_8)); + private boolean overseerCollectionQueueContains(String asyncId) throws KeeperException, InterruptedException { + DistributedQueue collectionQueue = coreContainer.getZkController().getOverseerCollectionQueue(); + return collectionQueue.containsTaskWithRequestId(asyncId); } public static void createNodeIfNotExists(SolrZkClient zk, String path, byte[] data) throws KeeperException, InterruptedException { @@ -848,189 +255,21 @@ public class CollectionsHandler extends RequestHandlerBase { } } - - private void handleRemoveReplica(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException { - log.info("Remove replica: " + req.getParamString()); - req.getParams().required().check(COLLECTION_PROP, SHARD_ID_PROP, "replica"); - Map map = makeMap(QUEUE_OPERATION, DELETEREPLICA.toLower()); - copyIfNotNull(req.getParams(),map,COLLECTION_PROP,SHARD_ID_PROP,"replica", ASYNC, ONLY_IF_DOWN); - ZkNodeProps m = new ZkNodeProps(map); - handleResponse(DELETEREPLICA.toLower(), m, rsp); - } - - - private void handleCreateShard(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException { - log.info("Create shard: " + req.getParamString()); - req.getParams().required().check(COLLECTION_PROP, SHARD_ID_PROP); - ClusterState clusterState = coreContainer.getZkController().getClusterState(); - if (!ImplicitDocRouter.NAME.equals(((Map) clusterState.getCollection(req.getParams().get(COLLECTION_PROP)).get(DOC_ROUTER)).get(NAME))) - throw new SolrException(ErrorCode.BAD_REQUEST, "shards can be added only to 'implicit' collections" ); - - Map map = makeMap(QUEUE_OPERATION, CREATESHARD.toLower()); - copyIfNotNull(req.getParams(),map,COLLECTION_PROP, SHARD_ID_PROP, ZkStateReader.REPLICATION_FACTOR, CREATE_NODE_SET, ASYNC); - copyPropertiesIfNotNull(req.getParams(), map); - ZkNodeProps m = new ZkNodeProps(map); - handleResponse(CREATESHARD.toLower(), m, rsp); - } - - private static void copyIfNotNull(SolrParams params, Map props, String... keys) { - ArrayList prefixes = new ArrayList<>(1); - if(keys !=null){ - for (String key : keys) { - if(key.endsWith(".")) { - prefixes.add(key); - continue; - } - String v = params.get(key); - if(v != null) props.put(key,v); - } - } - if(prefixes.isEmpty()) return; - Iterator it = params.getParameterNamesIterator(); - String prefix = null; - for(;it.hasNext();){ - String name = it.next(); - for (int i = 0; i < prefixes.size(); i++) { - if(name.startsWith(prefixes.get(i))){ - String val = params.get(name); - if(val !=null) props.put(name,val); - } - } - } - - } - - private void copyPropertiesIfNotNull(SolrParams params, Map props) { + private static Map copyPropertiesWithPrefix(SolrParams params, Map props, String prefix) { Iterator iter = params.getParameterNamesIterator(); while (iter.hasNext()) { String param = iter.next(); - if (param.startsWith(OverseerCollectionProcessor.COLL_PROP_PREFIX)) { + if (param.startsWith(prefix)) { props.put(param, params.get(param)); } } + return props; } - - private void handleDeleteShardAction(SolrQueryRequest req, - SolrQueryResponse rsp) throws InterruptedException, KeeperException { - log.info("Deleting Shard : " + req.getParamString()); - String name = req.getParams().required().get(ZkStateReader.COLLECTION_PROP); - String shard = req.getParams().required().get(ZkStateReader.SHARD_ID_PROP); - - Map props = new HashMap<>(); - props.put(ZkStateReader.COLLECTION_PROP, name); - props.put(Overseer.QUEUE_OPERATION, DELETESHARD.toLower()); - props.put(ZkStateReader.SHARD_ID_PROP, shard); - - ZkNodeProps m = new ZkNodeProps(props); - handleResponse(DELETESHARD.toLower(), m, rsp); - } - - private void handleSplitShardAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException { - log.info("Splitting shard : " + req.getParamString()); - String name = req.getParams().required().get("collection"); - // TODO : add support for multiple shards - String shard = req.getParams().get("shard"); - String rangesStr = req.getParams().get(CoreAdminParams.RANGES); - String splitKey = req.getParams().get("split.key"); - - if (splitKey == null && shard == null) { - throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, "Missing required parameter: shard"); - } - if (splitKey != null && shard != null) { - throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, - "Only one of 'shard' or 'split.key' should be specified"); - } - if (splitKey != null && rangesStr != null) { - throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, - "Only one of 'ranges' or 'split.key' should be specified"); - } - - Map props = new HashMap<>(); - props.put(Overseer.QUEUE_OPERATION, SPLITSHARD.toLower()); - props.put("collection", name); - if (shard != null) { - props.put(ZkStateReader.SHARD_ID_PROP, shard); - } - if (splitKey != null) { - props.put("split.key", splitKey); - } - if (rangesStr != null) { - props.put(CoreAdminParams.RANGES, rangesStr); - } - - if (req.getParams().get(ASYNC) != null) - props.put(ASYNC, req.getParams().get(ASYNC)); - - copyPropertiesIfNotNull(req.getParams(), props); - - ZkNodeProps m = new ZkNodeProps(props); - - handleResponse(SPLITSHARD.toLower(), m, rsp, DEFAULT_ZK_TIMEOUT * 5); - } - - private void handleMigrate(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException { - log.info("Migrate action invoked: " + req.getParamString()); - req.getParams().required().check("collection", "split.key", "target.collection"); - Map props = new HashMap<>(); - props.put(Overseer.QUEUE_OPERATION, MIGRATE.toLower()); - copyIfNotNull(req.getParams(), props, "collection", "split.key", "target.collection", "forward.timeout", ASYNC); - ZkNodeProps m = new ZkNodeProps(props); - handleResponse(MIGRATE.toLower(), m, rsp, DEFAULT_ZK_TIMEOUT * 20); - } - - private void handleAddReplica(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException { - log.info("Add replica action invoked: " + req.getParamString()); - Map props = new HashMap<>(); - props.put(Overseer.QUEUE_OPERATION, CollectionAction.ADDREPLICA.toString()); - copyIfNotNull(req.getParams(), props, COLLECTION_PROP, "node", SHARD_ID_PROP, ShardParams._ROUTE_, - CoreAdminParams.NAME, CoreAdminParams.INSTANCE_DIR, CoreAdminParams.DATA_DIR, ASYNC); - copyPropertiesIfNotNull(req.getParams(), props); - ZkNodeProps m = new ZkNodeProps(props); - handleResponse(CollectionAction.ADDREPLICA.toString(), m, rsp); - } - - /** - * Handle cluster status request. - * Can return status per specific collection/shard or per all collections. - * - * @param req solr request - * @param rsp solr response - */ - private void handleClusterStatus(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException { - Map props = new HashMap<>(); - props.put(Overseer.QUEUE_OPERATION, CollectionAction.CLUSTERSTATUS.toLower()); - copyIfNotNull(req.getParams(), props, COLLECTION_PROP, SHARD_ID_PROP, ShardParams._ROUTE_); - handleResponse(CollectionAction.CLUSTERSTATUS.toString(), new ZkNodeProps(props), rsp); - } - - /** - * Handled list collection request. - * Do list collection request to zk host - * - * @param req solr request - * @param rsp solr response - * @throws KeeperException zk connection failed - * @throws InterruptedException connection interrupted - */ - private void handleListAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException { - NamedList results = new NamedList<>(); - Set collections = coreContainer.getZkController().getZkStateReader().getClusterState().getCollections(); - List collectionList = new ArrayList<>(); - for (String collection : collections) { - collectionList.add(collection); - } - results.add("collections", collectionList); - SolrResponse response = new OverseerSolrResponse(results); - - rsp.getValues().addAll(response.getResponse()); - } - - public static ModifiableSolrParams params(String... params) { ModifiableSolrParams msp = new ModifiableSolrParams(); - for (int i=0; i call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) + throws KeeperException, InterruptedException { + Map props = req.getParams().required().getAll(null, NAME); + props.put("fromApi", "true"); + req.getParams().getAll(props, + NAME, + REPLICATION_FACTOR, + COLL_CONF, + NUM_SLICES, + MAX_SHARDS_PER_NODE, + CREATE_NODE_SET, CREATE_NODE_SET_SHUFFLE, + SHARDS_PROP, + ASYNC, + STATE_FORMAT, + AUTO_ADD_REPLICAS); + + if (props.get(STATE_FORMAT) == null) { + props.put(STATE_FORMAT, "2"); + } + addRuleMap(req.getParams(), props, "rule"); + addRuleMap(req.getParams(), props, "snitch"); + + if (SYSTEM_COLL.equals(props.get(NAME))) { + //We must always create asystem collection with only a single shard + props.put(NUM_SLICES, 1); + props.remove(SHARDS_PROP); + createSysConfigSet(h.coreContainer); + + } + copyPropertiesWithPrefix(req.getParams(), props, COLL_PROP_PREFIX); + return copyPropertiesWithPrefix(req.getParams(), props, "router."); + + } + + private void addRuleMap(SolrParams params, Map props, String key) { + String[] rules = params.getParams(key); + if (rules != null && rules.length > 0) { + ArrayList l = new ArrayList<>(); + for (String rule : rules) l.add(Rule.parseRule(rule)); + props.put(key, l); + } + } + + private void createSysConfigSet(CoreContainer coreContainer) throws KeeperException, InterruptedException { + SolrZkClient zk = coreContainer.getZkController().getZkStateReader().getZkClient(); + createNodeIfNotExists(zk, CONFIGS_ZKNODE, null); + createNodeIfNotExists(zk, CONFIGS_ZKNODE + "/" + SYSTEM_COLL, null); + createNodeIfNotExists(zk, CONFIGS_ZKNODE + "/" + SYSTEM_COLL + "/schema.xml", + BlobHandler.SCHEMA.replaceAll("'", "\"").getBytes(UTF_8)); + createNodeIfNotExists(zk, CONFIGS_ZKNODE + "/" + SYSTEM_COLL + "/solrconfig.xml", + BlobHandler.CONF.replaceAll("'", "\"").getBytes(UTF_8)); + } + }, + DELETE_OP(DELETE) { + @Override + Map call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler) + throws Exception { + return req.getParams().required().getAll(null, NAME); + } + }, + RELOAD_OP(RELOAD) { + @Override + Map call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler) + throws Exception { + return req.getParams().required().getAll(null, NAME); + + } + }, + SYNCSHARD_OP(SYNCSHARD) { + @Override + Map call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) + throws Exception { + String collection = req.getParams().required().get("collection"); + String shard = req.getParams().required().get("shard"); + + ClusterState clusterState = h.coreContainer.getZkController().getClusterState(); + + ZkNodeProps leaderProps = clusterState.getLeader(collection, shard); + ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(leaderProps); + + ; + try (HttpSolrClient client = new HttpSolrClient(nodeProps.getBaseUrl())) { + client.setConnectionTimeout(15000); + client.setSoTimeout(60000); + RequestSyncShard reqSyncShard = new CoreAdminRequest.RequestSyncShard(); + reqSyncShard.setCollection(collection); + reqSyncShard.setShard(shard); + reqSyncShard.setCoreName(nodeProps.getCoreName()); + client.request(reqSyncShard); + } + return null; + } + + }, + CREATEALIAS_OP(CREATEALIAS) { + @Override + Map call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler) + throws Exception { + return req.getParams().required().getAll(null, NAME, "collections"); + } + }, + DELETEALIAS_OP(DELETEALIAS) { + @Override + Map call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler) + throws Exception { + return req.getParams().required().getAll(null, NAME); + } + + }, + SPLITSHARD_OP(SPLITSHARD, DEFAULT_ZK_TIMEOUT * 5) { + @Override + Map call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) + throws Exception { + String name = req.getParams().required().get(COLLECTION_PROP); + // TODO : add support for multiple shards + String shard = req.getParams().get(SHARD_ID_PROP); + String rangesStr = req.getParams().get(CoreAdminParams.RANGES); + String splitKey = req.getParams().get("split.key"); + + if (splitKey == null && shard == null) { + throw new SolrException(ErrorCode.BAD_REQUEST, "Missing required parameter: shard"); + } + if (splitKey != null && shard != null) { + throw new SolrException(ErrorCode.BAD_REQUEST, + "Only one of 'shard' or 'split.key' should be specified"); + } + if (splitKey != null && rangesStr != null) { + throw new SolrException(ErrorCode.BAD_REQUEST, + "Only one of 'ranges' or 'split.key' should be specified"); + } + + Map map = req.getParams().getAll(null, + COLLECTION_PROP, + SHARD_ID_PROP, + "split.key", + CoreAdminParams.RANGES, + ASYNC); + return copyPropertiesWithPrefix(req.getParams(), map, COLL_PROP_PREFIX); + } + }, + DELETESHARD_OP(DELETESHARD) { + @Override + Map call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler) throws Exception { + return req.getParams().required().getAll(null, + COLLECTION_PROP, + SHARD_ID_PROP); + } + }, + CREATESHARD_OP(CREATESHARD) { + @Override + Map call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler) throws Exception { + Map map = req.getParams().required().getAll(null, + COLLECTION_PROP, + SHARD_ID_PROP); + ClusterState clusterState = handler.coreContainer.getZkController().getClusterState(); + if (!ImplicitDocRouter.NAME.equals(((Map) clusterState.getCollection(req.getParams().get(COLLECTION_PROP)).get(DOC_ROUTER)).get(NAME))) + throw new SolrException(ErrorCode.BAD_REQUEST, "shards can be added only to 'implicit' collections"); + req.getParams().getAll(map, + REPLICATION_FACTOR, + CREATE_NODE_SET, ASYNC); + return copyPropertiesWithPrefix(req.getParams(), map, COLL_PROP_PREFIX); + } + }, + DELETEREPLICA_OP(DELETEREPLICA) { + @Override + Map call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler) throws Exception { + Map map = req.getParams().required().getAll(null, + COLLECTION_PROP, + SHARD_ID_PROP, + REPLICA_PROP); + return req.getParams().getAll(map, ASYNC, ONLY_IF_DOWN); + } + }, + MIGRATE_OP(MIGRATE) { + @Override + Map call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception { + Map map = req.getParams().required().getAll(null, COLLECTION_PROP, "split.key", "target.collection"); + return req.getParams().getAll(map, "forward.timeout", ASYNC); + } + }, + ADDROLE_OP(ADDROLE) { + @Override + Map call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler) throws Exception { + Map map = req.getParams().required().getAll(null, "role", "node"); + if (!KNOWN_ROLES.contains(map.get("role"))) + throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown role. Supported roles are ," + KNOWN_ROLES); + return map; + } + }, + REMOVEROLE_OP(REMOVEROLE) { + @Override + Map call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception { + Map map = req.getParams().required().getAll(null, "role", "node"); + if (!KNOWN_ROLES.contains(map.get("role"))) + throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown role. Supported roles are ," + KNOWN_ROLES); + return map; + } + }, + CLUSTERPROP_OP(CLUSTERPROP) { + @Override + Map call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception { + String name = req.getParams().required().get(NAME); + String val = req.getParams().get(VALUE_LONG); + h.coreContainer.getZkController().getZkStateReader().setClusterProperty(name, val); + return null; + } + }, + REQUESTSTATUS_OP(REQUESTSTATUS) { + @Override + Map call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception { + CoreContainer coreContainer = h.coreContainer; + req.getParams().required().check(REQUESTID); + + String requestId = req.getParams().get(REQUESTID); + + if (requestId.equals("-1")) { + // Special taskId (-1), clears up the request state maps. + if (requestId.equals("-1")) { + coreContainer.getZkController().getOverseerCompletedMap().clear(); + coreContainer.getZkController().getOverseerFailureMap().clear(); + return null; + } + } else { + NamedList results = new NamedList<>(); + if (coreContainer.getZkController().getOverseerCompletedMap().contains(requestId)) { + SimpleOrderedMap success = new SimpleOrderedMap(); + success.add("state", "completed"); + success.add("msg", "found " + requestId + " in completed tasks"); + results.add("status", success); + } else if (coreContainer.getZkController().getOverseerFailureMap().contains(requestId)) { + SimpleOrderedMap success = new SimpleOrderedMap(); + success.add("state", "failed"); + success.add("msg", "found " + requestId + " in failed tasks"); + results.add("status", success); + } else if (coreContainer.getZkController().getOverseerRunningMap().contains(requestId)) { + SimpleOrderedMap success = new SimpleOrderedMap(); + success.add("state", "running"); + success.add("msg", "found " + requestId + " in running tasks"); + results.add("status", success); + } else if (h.overseerCollectionQueueContains(requestId)) { + SimpleOrderedMap success = new SimpleOrderedMap(); + success.add("state", "submitted"); + success.add("msg", "found " + requestId + " in submitted tasks"); + results.add("status", success); + } else { + SimpleOrderedMap failure = new SimpleOrderedMap(); + failure.add("state", "notfound"); + failure.add("msg", "Did not find taskid [" + requestId + "] in any tasks queue"); + results.add("status", failure); + } + SolrResponse response = new OverseerSolrResponse(results); + + rsp.getValues().addAll(response.getResponse()); + } + return null; + } + + }, + ADDREPLICA_OP(ADDREPLICA) { + @Override + Map call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) + throws Exception { + Map props = req.getParams().getAll(null, + COLLECTION_PROP, + "node", + SHARD_ID_PROP, + _ROUTE_, + CoreAdminParams.NAME, + INSTANCE_DIR, + DATA_DIR, + ASYNC); + return copyPropertiesWithPrefix(req.getParams(), props, COLL_PROP_PREFIX); + } + }, + OVERSEERSTATUS_OP(OVERSEERSTATUS) { + @Override + Map call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception { + return new LinkedHashMap<>(); + } + }, + + /** + * Handle list collection request. + * Do list collection request to zk host + */ + LIST_OP(LIST) { + @Override + Map call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler) throws Exception { + NamedList results = new NamedList<>(); + Set collections = handler.coreContainer.getZkController().getZkStateReader().getClusterState().getCollections(); + List collectionList = new ArrayList<>(); + for (String collection : collections) { + collectionList.add(collection); + } + results.add("collections", collectionList); + SolrResponse response = new OverseerSolrResponse(results); + rsp.getValues().addAll(response.getResponse()); + return null; + } + }, + /** + * Handle cluster status request. + * Can return status per specific collection/shard or per all collections. + */ + CLUSTERSTATUS_OP(CLUSTERSTATUS) { + @Override + Map call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler) + throws KeeperException, InterruptedException { + return req.getParams().getAll(null, + COLLECTION_PROP, + SHARD_ID_PROP, + _ROUTE_); + } + }, + ADDREPLICAPROP_OP(ADDREPLICAPROP) { + @Override + Map call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception { + Map map = req.getParams().required().getAll(null, + COLLECTION_PROP, + PROPERTY_PROP, + SHARD_ID_PROP, + REPLICA_PROP, + PROPERTY_VALUE_PROP); + req.getParams().getAll(map, SHARD_UNIQUE); + String property = (String) map.get(PROPERTY_PROP); + if (!property.startsWith(COLL_PROP_PREFIX)) { + property = COLL_PROP_PREFIX + property; + } + + boolean uniquePerSlice = Boolean.parseBoolean((String) map.get(SHARD_UNIQUE)); + + // Check if we're trying to set a property with parameters that allow us to set the property on multiple replicas + // in a slice on properties that are known to only be one-per-slice and error out if so. + if (StringUtils.isNotBlank((String) map.get(SHARD_UNIQUE)) && + SliceMutator.SLICE_UNIQUE_BOOLEAN_PROPERTIES.contains(property.toLowerCase(Locale.ROOT)) && + uniquePerSlice == false) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, + "Overseer replica property command received for property " + property + + " with the " + SHARD_UNIQUE + + " parameter set to something other than 'true'. No action taken."); + } + return map; + } + }, + DELETEREPLICAPROP_OP(DELETEREPLICAPROP) { + @Override + Map call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception { + Map map = req.getParams().required().getAll(null, + COLLECTION_PROP, + PROPERTY_PROP, + SHARD_ID_PROP, + REPLICA_PROP); + return req.getParams().getAll(map, PROPERTY_PROP); + } + }, + BALANCESHARDUNIQUE_OP(BALANCESHARDUNIQUE) { + @Override + Map call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception { + Map map = req.getParams().required().getAll(null, + COLLECTION_PROP, + PROPERTY_PROP); + Boolean shardUnique = Boolean.parseBoolean(req.getParams().get(SHARD_UNIQUE)); + String prop = req.getParams().get(PROPERTY_PROP).toLowerCase(Locale.ROOT); + if (!StringUtils.startsWith(prop, COLL_PROP_PREFIX)) { + prop = COLL_PROP_PREFIX + prop; + } + + if (!shardUnique && + !SliceMutator.SLICE_UNIQUE_BOOLEAN_PROPERTIES.contains(prop)) { + throw new SolrException(ErrorCode.BAD_REQUEST, "Balancing properties amongst replicas in a slice requires that" + + " the property be pre-defined as a unique property (e.g. 'preferredLeader') or that 'shardUnique' be set to 'true'. " + + " Property: " + prop + " shardUnique: " + Boolean.toString(shardUnique)); + } + + return req.getParams().getAll(map, ONLY_ACTIVE_NODES, SHARD_UNIQUE); + } + }, + REBALANCELEADERS_OP(REBALANCELEADERS) { + @Override + Map call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception { + new RebalanceLeaders(req,rsp,h).execute(); + return null; + } + }; + CollectionAction action; + long timeOut; + + CollectionOperation(CollectionAction action) { + this(action, DEFAULT_ZK_TIMEOUT); + } + + CollectionOperation(CollectionAction action, long timeOut) { + this.action = action; + this.timeOut = timeOut; + } + + /** + * All actions must implement this method. If a non null map is returned , the action name is added to + * the map and sent to overseer for processing. If it returns a null, the call returns immediately + */ + abstract Map call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception; + + public static CollectionOperation get(CollectionAction action) { + for (CollectionOperation op : values()) { + if (op.action == action) return op; + } + throw new SolrException(ErrorCode.SERVER_ERROR, "No such action" + action); + } + } } diff --git a/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java b/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java new file mode 100644 index 00000000000..17096e1ae57 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java @@ -0,0 +1,327 @@ +package org.apache.solr.handler.admin; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import org.apache.commons.lang.StringUtils; +import org.apache.solr.cloud.LeaderElector; +import org.apache.solr.cloud.OverseerCollectionProcessor; +import org.apache.solr.cloud.overseer.SliceMutator; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.core.CoreContainer; +import org.apache.solr.request.SolrQueryRequest; +import org.apache.solr.response.SolrQueryResponse; +import org.apache.zookeeper.KeeperException; + +import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION; +import static org.apache.solr.cloud.OverseerCollectionProcessor.ASYNC; +import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.LEADER_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.MAX_AT_ONCE_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.MAX_WAIT_SECONDS_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; +import static org.apache.solr.common.params.CollectionParams.CollectionAction.REBALANCELEADERS; + +class RebalanceLeaders { + final SolrQueryRequest req; + final SolrQueryResponse rsp; + final CollectionsHandler collectionsHandler; + final CoreContainer coreContainer; + + RebalanceLeaders(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler collectionsHandler) { + this.req = req; + this.rsp = rsp; + this.collectionsHandler = collectionsHandler; + coreContainer = collectionsHandler.getCoreContainer(); + } + + void execute() throws KeeperException, InterruptedException { + req.getParams().required().check(COLLECTION_PROP); + + String collectionName = req.getParams().get(COLLECTION_PROP); + if (StringUtils.isBlank(collectionName)) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, + String.format(Locale.ROOT, "The " + COLLECTION_PROP + " is required for the REASSIGNLEADERS command.")); + } + coreContainer.getZkController().getZkStateReader().updateClusterState(true); + ClusterState clusterState = coreContainer.getZkController().getClusterState(); + DocCollection dc = clusterState.getCollection(collectionName); + if (dc == null) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection '" + collectionName + "' does not exist, no action taken."); + } + Map currentRequests = new HashMap<>(); + int max = req.getParams().getInt(MAX_AT_ONCE_PROP, Integer.MAX_VALUE); + if (max <= 0) max = Integer.MAX_VALUE; + int maxWaitSecs = req.getParams().getInt(MAX_WAIT_SECONDS_PROP, 60); + NamedList results = new NamedList<>(); + + boolean keepGoing = true; + for (Slice slice : dc.getSlices()) { + insurePreferredIsLeader(results, slice, currentRequests); + if (currentRequests.size() == max) { + CollectionsHandler.log.info("Queued " + max + " leader reassignments, waiting for some to complete."); + keepGoing = waitForLeaderChange(currentRequests, maxWaitSecs, false, results); + if (keepGoing == false) { + break; // If we've waited longer than specified, don't continue to wait! + } + } + } + if (keepGoing == true) { + keepGoing = waitForLeaderChange(currentRequests, maxWaitSecs, true, results); + } + if (keepGoing == true) { + CollectionsHandler.log.info("All leader reassignments completed."); + } else { + CollectionsHandler.log.warn("Exceeded specified timeout of ." + maxWaitSecs + "' all leaders may not have been reassigned"); + } + + rsp.getValues().addAll(results); + } + + private void insurePreferredIsLeader(NamedList results, + Slice slice, Map currentRequests) throws KeeperException, InterruptedException { + final String inactivePreferreds = "inactivePreferreds"; + final String alreadyLeaders = "alreadyLeaders"; + String collectionName = req.getParams().get(COLLECTION_PROP); + + for (Replica replica : slice.getReplicas()) { + // Tell the replica to become the leader if we're the preferred leader AND active AND not the leader already + if (replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false) == false) { + continue; + } + // OK, we are the preferred leader, are we the actual leader? + if (replica.getBool(LEADER_PROP, false)) { + //We're a preferred leader, but we're _also_ the leader, don't need to do anything. + NamedList noops = (NamedList) results.get(alreadyLeaders); + if (noops == null) { + noops = new NamedList<>(); + results.add(alreadyLeaders, noops); + } + NamedList res = new NamedList<>(); + res.add("status", "success"); + res.add("msg", "Already leader"); + res.add("shard", slice.getName()); + res.add("nodeName", replica.getNodeName()); + noops.add(replica.getName(), res); + return; // already the leader, do nothing. + } + + // We're the preferred leader, but someone else is leader. Only become leader if we're active. + if (replica.getState() != Replica.State.ACTIVE) { + NamedList inactives = (NamedList) results.get(inactivePreferreds); + if (inactives == null) { + inactives = new NamedList<>(); + results.add(inactivePreferreds, inactives); + } + NamedList res = new NamedList<>(); + res.add("status", "skipped"); + res.add("msg", "Node is a referredLeader, but it's inactive. Skipping"); + res.add("shard", slice.getName()); + res.add("nodeName", replica.getNodeName()); + inactives.add(replica.getName(), res); + return; // Don't try to become the leader if we're not active! + } + + // Replica is the preferred leader but not the actual leader, do something about that. + // "Something" is + // 1> if the preferred leader isn't first in line, tell it to re-queue itself. + // 2> tell the actual leader to re-queue itself. + + ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader(); + + List electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(), + ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName())); + + if (electionNodes.size() < 2) { // if there's only one node in the queue, should already be leader and we shouldn't be here anyway. + CollectionsHandler.log.warn("Rebalancing leaders and slice " + slice.getName() + " has less than two elements in the leader " + + "election queue, but replica " + replica.getName() + " doesn't think it's the leader. Do nothing"); + return; + } + + // Ok, the sorting for election nodes is a bit strange. If the sequence numbers are the same, then the whole + // string is used, but that sorts nodes with the same sequence number by their session IDs from ZK. + // While this is determinate, it's not quite what we need, so re-queue nodes that aren't us and are + // watching the leader node.. + + String firstWatcher = electionNodes.get(1); + + if (LeaderElector.getNodeName(firstWatcher).equals(replica.getName()) == false) { + makeReplicaFirstWatcher(collectionName, slice, replica); + } + + String coreName = slice.getReplica(LeaderElector.getNodeName(electionNodes.get(0))).getStr(CORE_NAME_PROP); + rejoinElection(collectionName, slice, electionNodes.get(0), coreName, false); + waitForNodeChange(collectionName, slice, electionNodes.get(0)); + + + return; // Done with this slice, skip the rest of the replicas. + } + } + // Put the replica in at the head of the queue and send all nodes with the same sequence number to the back of the list + void makeReplicaFirstWatcher(String collectionName, Slice slice, Replica replica) + throws KeeperException, InterruptedException { + + ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader(); + List electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(), + ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName())); + + // First, queue up the preferred leader at the head of the queue. + int newSeq = -1; + for (String electionNode : electionNodes) { + if (LeaderElector.getNodeName(electionNode).equals(replica.getName())) { + String coreName = slice.getReplica(LeaderElector.getNodeName(electionNode)).getStr(CORE_NAME_PROP); + rejoinElection(collectionName, slice, electionNode, coreName, true); + newSeq = waitForNodeChange(collectionName, slice, electionNode); + break; + } + } + if (newSeq == -1) { + return; // let's not continue if we didn't get what we expect. Possibly we're offline etc.. + } + + List electionNodesTmp = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(), + ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName())); + + + // Now find other nodes that have the same sequence number as this node and re-queue them at the end of the queue. + electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(), + ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName())); + + for (String thisNode : electionNodes) { + if (LeaderElector.getSeq(thisNode) > newSeq) { + break; + } + if (LeaderElector.getNodeName(thisNode).equals(replica.getName())) { + continue; + } + if (LeaderElector.getSeq(thisNode) == newSeq) { + String coreName = slice.getReplica(LeaderElector.getNodeName(thisNode)).getStr(CORE_NAME_PROP); + rejoinElection(collectionName, slice, thisNode, coreName, false); + waitForNodeChange(collectionName, slice, thisNode); + } + } + } + + int waitForNodeChange(String collectionName, Slice slice, String electionNode) throws InterruptedException, KeeperException { + String nodeName = LeaderElector.getNodeName(electionNode); + int oldSeq = LeaderElector.getSeq(electionNode); + for (int idx = 0; idx < 600; ++idx) { + ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader(); + List electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(), + ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName())); + for (String testNode : electionNodes) { + if (LeaderElector.getNodeName(testNode).equals(nodeName) && oldSeq != LeaderElector.getSeq(testNode)) { + return LeaderElector.getSeq(testNode); + } + } + + Thread.sleep(100); + } + return -1; + } + private void rejoinElection(String collectionName, Slice slice, String electionNode, String core, + boolean rejoinAtHead) throws KeeperException, InterruptedException { + Replica replica = slice.getReplica(LeaderElector.getNodeName(electionNode)); + Map propMap = new HashMap<>(); + propMap.put(COLLECTION_PROP, collectionName); + propMap.put(SHARD_ID_PROP, slice.getName()); + propMap.put(QUEUE_OPERATION, REBALANCELEADERS.toLower()); + propMap.put(CORE_NAME_PROP, core); + propMap.put(NODE_NAME_PROP, replica.getName()); + propMap.put(ZkStateReader.BASE_URL_PROP, replica.getProperties().get(ZkStateReader.BASE_URL_PROP)); + propMap.put(REJOIN_AT_HEAD_PROP, Boolean.toString(rejoinAtHead)); // Get ourselves to be first in line. + propMap.put(ELECTION_NODE_PROP, electionNode); + String asyncId = REBALANCELEADERS.toLower() + "_" + core + "_" + Math.abs(System.nanoTime()); + propMap.put(ASYNC, asyncId); + ZkNodeProps m = new ZkNodeProps(propMap); + SolrQueryResponse rspIgnore = new SolrQueryResponse(); // I'm constructing my own response + collectionsHandler.handleResponse(REBALANCELEADERS.toLower(), m, rspIgnore); // Want to construct my own response here. + } + + // currentAsyncIds - map of request IDs and reporting data (value) + // maxWaitSecs - How long are we going to wait? Defaults to 30 seconds. + // waitForAll - if true, do not return until all assignments have been made. + // results - a place to stash results for reporting back to the user. + // + private boolean waitForLeaderChange(Map currentAsyncIds, final int maxWaitSecs, + Boolean waitForAll, NamedList results) + throws KeeperException, InterruptedException { + + if (currentAsyncIds.size() == 0) return true; + + for (int idx = 0; idx < maxWaitSecs * 10; ++idx) { + Iterator> iter = currentAsyncIds.entrySet().iterator(); + boolean foundChange = false; + while (iter.hasNext()) { + Map.Entry pair = iter.next(); + String asyncId = pair.getKey(); + if (coreContainer.getZkController().getOverseerFailureMap().contains(asyncId)) { + coreContainer.getZkController().getOverseerFailureMap().remove(asyncId); + NamedList fails = (NamedList) results.get("failures"); + if (fails == null) { + fails = new NamedList<>(); + results.add("failures", fails); + } + NamedList res = new NamedList<>(); + res.add("status", "failed"); + res.add("msg", "Failed to assign '" + pair.getValue() + "' to be leader"); + fails.add(asyncId.substring(REBALANCELEADERS.toLower().length()), res); + iter.remove(); + foundChange = true; + } else if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId)) { + coreContainer.getZkController().getOverseerCompletedMap().remove(asyncId); + NamedList successes = (NamedList) results.get("successes"); + if (successes == null) { + successes = new NamedList<>(); + results.add("successes", successes); + } + NamedList res = new NamedList<>(); + res.add("status", "success"); + res.add("msg", "Assigned '" + pair.getValue() + "' to be leader"); + successes.add(asyncId.substring(REBALANCELEADERS.toLower().length()), res); + iter.remove(); + foundChange = true; + } + } + // We're done if we're processing a few at a time or all requests are processed. + if ((foundChange && waitForAll == false) || currentAsyncIds.size() == 0) { + return true; + } + Thread.sleep(100); //TODO: Is there a better thing to do than sleep here? + } + return false; + } + + +} diff --git a/solr/solrj/src/java/org/apache/solr/common/params/SolrParams.java b/solr/solrj/src/java/org/apache/solr/common/params/SolrParams.java index 6a798dede35..70b23eb798b 100644 --- a/solr/solrj/src/java/org/apache/solr/common/params/SolrParams.java +++ b/solr/solrj/src/java/org/apache/solr/common/params/SolrParams.java @@ -25,6 +25,7 @@ import org.apache.solr.common.util.StrUtils; import java.io.Serializable; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -48,14 +49,14 @@ public abstract class SolrParams implements Serializable { String val = get(param); return val==null ? def : val; } - + /** returns a RequiredSolrParams wrapping this */ public RequiredSolrParams required() { // TODO? should we want to stash a reference? return new RequiredSolrParams(this); } - + protected String fpname(String field, String param) { return "f."+field+'.'+param; } @@ -75,7 +76,7 @@ public abstract class SolrParams implements Serializable { String val = get(fpname(field,param)); return val!=null ? val : get(param, def); } - + /** returns the String values of the field parameter, "f.field.param", or * the values for "param" if that is not set. */ @@ -95,15 +96,15 @@ public abstract class SolrParams implements Serializable { String val = get(param); return val==null ? def : StrUtils.parseBool(val); } - - /** Returns the Boolean value of the field param, + + /** Returns the Boolean value of the field param, or the value for param, or null if neither is set. */ public Boolean getFieldBool(String field, String param) { String val = getFieldParam(field, param); return val==null ? null : StrUtils.parseBool(val); } - - /** Returns the boolean value of the field param, + + /** Returns the boolean value of the field param, or the value for param, or def if neither is set. */ public boolean getFieldBool(String field, String param, boolean def) { String val = getFieldParam(field, param); @@ -165,8 +166,8 @@ public abstract class SolrParams implements Serializable { /** - * @return The int value of the field param, or the value for param - * or null if neither is set. + * @return The int value of the field param, or the value for param + * or null if neither is set. **/ public Integer getFieldInt(String field, String param) { String val = getFieldParam(field, param); @@ -177,8 +178,8 @@ public abstract class SolrParams implements Serializable { throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, ex.getMessage(), ex ); } } - - /** Returns the int value of the field param, + + /** Returns the int value of the field param, or the value for param, or def if neither is set. */ public int getFieldInt(String field, String param, int def) { String val = getFieldParam(field, param); @@ -323,7 +324,7 @@ public abstract class SolrParams implements Serializable { // always use MultiMap for easier processing further down the chain return new MultiMapSolrParams(toMultiMap(params)); } - + /** Create filtered SolrParams. */ public SolrParams toFilteredSolrParams(List names) { NamedList nl = new NamedList<>(); @@ -338,11 +339,11 @@ public abstract class SolrParams implements Serializable { } return toSolrParams(nl); } - + /** Convert this to a NamedList */ public NamedList toNamedList() { final SimpleOrderedMap result = new SimpleOrderedMap<>(); - + for(Iterator it=getParameterNamesIterator(); it.hasNext(); ) { final String name = it.next(); final String [] values = getParams(name); @@ -355,4 +356,22 @@ public abstract class SolrParams implements Serializable { } return result; } + + /**Copy all params to the given map or if the given map is null + * create a new one + */ + public Map getAll(Map sink, String... params){ + if(sink == null) sink = new LinkedHashMap<>(); + for (String param : params) { + String[] v = getParams(param); + if(v != null && v.length>0 ) { + if(v.length == 1) { + sink.put(param, v[0]); + } else { + sink.put(param,v); + } + } + } + return sink; + } }