mirror of https://github.com/apache/lucene.git
SOLR-7855: OverseerCollectionProcessor: separate general task management from collection message handling
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1694406 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7cdc63f3d4
commit
589a27eb27
|
@ -37,17 +37,13 @@ import org.apache.solr.common.cloud.ClusterState;
|
|||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.util.StrUtils;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
|
||||
|
||||
|
||||
public class Assign {
|
||||
|
|
|
@ -280,7 +280,7 @@ public class LeaderElector {
|
|||
try {
|
||||
if(joinAtHead){
|
||||
log.info("Node {} trying to join election at the head", id);
|
||||
List<String> nodes = OverseerCollectionProcessor.getSortedElectionNodes(zkClient, shardsElectZkPath);
|
||||
List<String> nodes = OverseerProcessor.getSortedElectionNodes(zkClient, shardsElectZkPath);
|
||||
if(nodes.size() <2){
|
||||
leaderSeqPath = zkClient.create(shardsElectZkPath + "/" + id + "-n_", null,
|
||||
CreateMode.EPHEMERAL_SEQUENTIAL, false);
|
||||
|
|
|
@ -65,8 +65,8 @@ import org.apache.zookeeper.KeeperException;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.ONLY_ACTIVE_NODES;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARD_UNIQUE;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.ONLY_ACTIVE_NODES;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARD_UNIQUE;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCESHARDUNIQUE;
|
||||
|
||||
/**
|
||||
|
@ -464,8 +464,8 @@ public class Overseer implements Closeable {
|
|||
ExclusiveSliceProperty(ClusterState clusterState, ZkNodeProps message) {
|
||||
this.clusterState = clusterState;
|
||||
String tmp = message.getStr(ZkStateReader.PROPERTY_PROP);
|
||||
if (StringUtils.startsWith(tmp, OverseerCollectionProcessor.COLL_PROP_PREFIX) == false) {
|
||||
tmp = OverseerCollectionProcessor.COLL_PROP_PREFIX + tmp;
|
||||
if (StringUtils.startsWith(tmp, OverseerCollectionMessageHandler.COLL_PROP_PREFIX) == false) {
|
||||
tmp = OverseerCollectionMessageHandler.COLL_PROP_PREFIX + tmp;
|
||||
}
|
||||
this.property = tmp.toLowerCase(Locale.ROOT);
|
||||
collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
|
||||
|
@ -817,7 +817,8 @@ public class Overseer implements Closeable {
|
|||
|
||||
ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process.");
|
||||
|
||||
overseerCollectionProcessor = new OverseerCollectionProcessor(reader, id, shardHandler, adminPath, stats, Overseer.this);
|
||||
OverseerNodePrioritizer overseerPrioritizer = new OverseerNodePrioritizer(reader, adminPath, shardHandler.getShardHandlerFactory());
|
||||
overseerCollectionProcessor = new OverseerCollectionProcessor(reader, id, shardHandler, adminPath, stats, Overseer.this, overseerPrioritizer);
|
||||
ccThread = new OverseerThread(ccTg, overseerCollectionProcessor, "OverseerCollectionProcessor-" + id);
|
||||
ccThread.setDaemon(true);
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,80 @@
|
|||
package org.apache.solr.cloud;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import org.apache.solr.client.solrj.SolrResponse;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
|
||||
/**
|
||||
* Interface for processing messages received by an {@link OverseerProcessor}
|
||||
*/
|
||||
public interface OverseerMessageHandler {
|
||||
|
||||
/**
|
||||
* @param message the message to process
|
||||
* @param operation the operation to process
|
||||
*
|
||||
* @return response
|
||||
*/
|
||||
SolrResponse processMessage(ZkNodeProps message, String operation);
|
||||
|
||||
/**
|
||||
* @return the name of the OverseerMessageHandler
|
||||
*/
|
||||
String getName();
|
||||
|
||||
/**
|
||||
* @param operation the operation to be timed
|
||||
*
|
||||
* @return the name of the timer to use for the operation
|
||||
*/
|
||||
String getTimerName(String operation);
|
||||
|
||||
/**
|
||||
* @param message the message being processed
|
||||
*
|
||||
* @return the taskKey for the message for handling task exclusivity
|
||||
*/
|
||||
String getTaskKey(ZkNodeProps message);
|
||||
|
||||
/**
|
||||
* @param taskKey the key associated with the task, cached from getTaskKey
|
||||
* @param message the message being processed
|
||||
*/
|
||||
void markExclusiveTask(String taskKey, ZkNodeProps message);
|
||||
|
||||
/**
|
||||
* @param taskKey the key associated with the task
|
||||
* @param operation the operation being processed
|
||||
*/
|
||||
void unmarkExclusiveTask(String taskKey, String operation);
|
||||
|
||||
/**
|
||||
* @param taskKey the key associated with the task
|
||||
* @param message the message being processed
|
||||
*
|
||||
* @return the exclusive marking
|
||||
*/
|
||||
ExclusiveMarking checkExclusiveMarking(String taskKey, ZkNodeProps message);
|
||||
|
||||
enum ExclusiveMarking {
|
||||
NOTDETERMINED, // not enough context, fall back to the processor (i.e. look at running tasks)
|
||||
EXCLUSIVE,
|
||||
NONEXCLUSIVE
|
||||
}
|
||||
}
|
|
@ -0,0 +1,112 @@
|
|||
package org.apache.solr.cloud;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.solr.common.cloud.SolrZkClient;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.cloud.overseer.OverseerAction;
|
||||
import org.apache.solr.common.params.CoreAdminParams;
|
||||
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.handler.component.ShardHandler;
|
||||
import org.apache.solr.handler.component.ShardHandlerFactory;
|
||||
import org.apache.solr.handler.component.ShardRequest;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Responsible for prioritization of Overseer nodes, for example with the
|
||||
* ADDROLE collection command.
|
||||
*/
|
||||
public class OverseerNodePrioritizer {
|
||||
|
||||
private static Logger log = LoggerFactory.getLogger(OverseerNodePrioritizer.class);
|
||||
|
||||
private final ZkStateReader zkStateReader;
|
||||
private final String adminPath;
|
||||
private final ShardHandlerFactory shardHandlerFactory;
|
||||
|
||||
public OverseerNodePrioritizer(ZkStateReader zkStateReader, String adminPath, ShardHandlerFactory shardHandlerFactory) {
|
||||
this.zkStateReader = zkStateReader;
|
||||
this.adminPath = adminPath;
|
||||
this.shardHandlerFactory = shardHandlerFactory;
|
||||
}
|
||||
|
||||
public synchronized void prioritizeOverseerNodes(String overseerId) throws KeeperException, InterruptedException {
|
||||
SolrZkClient zk = zkStateReader.getZkClient();
|
||||
if(!zk.exists(ZkStateReader.ROLES,true))return;
|
||||
Map m = (Map) Utils.fromJSON(zk.getData(ZkStateReader.ROLES, null, new Stat(), true));
|
||||
|
||||
List overseerDesignates = (List) m.get("overseer");
|
||||
if(overseerDesignates==null || overseerDesignates.isEmpty()) return;
|
||||
String ldr = OverseerProcessor.getLeaderNode(zk);
|
||||
if(overseerDesignates.contains(ldr)) return;
|
||||
log.info("prioritizing overseer nodes at {} overseer designates are {}", overseerId, overseerDesignates);
|
||||
List<String> electionNodes = OverseerProcessor.getSortedElectionNodes(zk, OverseerElectionContext.PATH + LeaderElector.ELECTION_NODE);
|
||||
if(electionNodes.size()<2) return;
|
||||
log.info("sorted nodes {}", electionNodes);
|
||||
|
||||
String designateNodeId = null;
|
||||
for (String electionNode : electionNodes) {
|
||||
if(overseerDesignates.contains( LeaderElector.getNodeName(electionNode))){
|
||||
designateNodeId = electionNode;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if(designateNodeId == null){
|
||||
log.warn("No live overseer designate ");
|
||||
return;
|
||||
}
|
||||
if(!designateNodeId.equals( electionNodes.get(1))) { //checking if it is already at no:1
|
||||
log.info("asking node {} to come join election at head", designateNodeId);
|
||||
invokeOverseerOp(designateNodeId, "rejoinAtHead"); //ask designate to come first
|
||||
log.info("asking the old first in line {} to rejoin election ",electionNodes.get(1) );
|
||||
invokeOverseerOp(electionNodes.get(1), "rejoin");//ask second inline to go behind
|
||||
}
|
||||
//now ask the current leader to QUIT , so that the designate can takeover
|
||||
Overseer.getInQueue(zkStateReader.getZkClient()).offer(
|
||||
Utils.toJSON(new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.QUIT.toLower(),
|
||||
"id", OverseerProcessor.getLeaderId(zkStateReader.getZkClient()))));
|
||||
|
||||
}
|
||||
|
||||
private void invokeOverseerOp(String electionNode, String op) {
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
|
||||
params.set(CoreAdminParams.ACTION, CoreAdminAction.OVERSEEROP.toString());
|
||||
params.set("op", op);
|
||||
params.set("qt", adminPath);
|
||||
params.set("electionNode", electionNode);
|
||||
ShardRequest sreq = new ShardRequest();
|
||||
sreq.purpose = 1;
|
||||
String replica = zkStateReader.getBaseUrlForNodeName(LeaderElector.getNodeName(electionNode));
|
||||
sreq.shards = new String[]{replica};
|
||||
sreq.actualShards = sreq.shards;
|
||||
sreq.params = params;
|
||||
shardHandler.submit(sreq, replica, sreq.params);
|
||||
shardHandler.takeCompletedOrError();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,571 @@
|
|||
package org.apache.solr.cloud;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.solr.client.solrj.SolrResponse;
|
||||
import org.apache.solr.cloud.DistributedQueue.QueueEvent;
|
||||
import org.apache.solr.cloud.Overseer.LeaderStatus;
|
||||
import org.apache.solr.cloud.overseer.OverseerAction;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.SolrZkClient;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.util.ExecutorUtil;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.handler.component.ShardHandlerFactory;
|
||||
import org.apache.solr.util.DefaultSolrThreadFactory;
|
||||
import org.apache.solr.util.stats.TimerContext;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
|
||||
|
||||
/**
|
||||
* A generic processor run in the Overseer, used for handling items added
|
||||
* to a distributed work queue. Has support for handling exclusive tasks
|
||||
* (i.e. tasks that should not run in parallel with each other).
|
||||
*
|
||||
* An {@link OverseerMessageHandlerSelector} determines which
|
||||
* {@link OverseerMessageHandler} handles specific messages in the
|
||||
* queue.
|
||||
*/
|
||||
public class OverseerProcessor implements Runnable, Closeable {
|
||||
|
||||
public int maxParallelThreads = 10;
|
||||
|
||||
public ExecutorService tpe ;
|
||||
|
||||
private static Logger log = LoggerFactory
|
||||
.getLogger(OverseerProcessor.class);
|
||||
|
||||
private DistributedQueue workQueue;
|
||||
private DistributedMap runningMap;
|
||||
private DistributedMap completedMap;
|
||||
private DistributedMap failureMap;
|
||||
|
||||
// Set that maintains a list of all the tasks that are running. This is keyed on zk id of the task.
|
||||
final private Set runningTasks;
|
||||
|
||||
// List of completed tasks. This is used to clean up workQueue in zk.
|
||||
final private HashMap<String, QueueEvent> completedTasks;
|
||||
|
||||
private String myId;
|
||||
|
||||
private final ShardHandlerFactory shardHandlerFactory;
|
||||
|
||||
private String adminPath;
|
||||
|
||||
private ZkStateReader zkStateReader;
|
||||
|
||||
private boolean isClosed;
|
||||
|
||||
private Overseer.Stats stats;
|
||||
|
||||
// Set of tasks that have been picked up for processing but not cleaned up from zk work-queue.
|
||||
// It may contain tasks that have completed execution, have been entered into the completed/failed map in zk but not
|
||||
// deleted from the work-queue as that is a batched operation.
|
||||
final private Set<String> runningZKTasks;
|
||||
private final Object waitLock = new Object();
|
||||
|
||||
private OverseerMessageHandlerSelector selector;
|
||||
|
||||
private OverseerNodePrioritizer prioritizer;
|
||||
|
||||
public OverseerProcessor(ZkStateReader zkStateReader, String myId,
|
||||
final ShardHandlerFactory shardHandlerFactory,
|
||||
String adminPath,
|
||||
Overseer.Stats stats,
|
||||
OverseerMessageHandlerSelector selector,
|
||||
OverseerNodePrioritizer prioritizer,
|
||||
DistributedQueue workQueue,
|
||||
DistributedMap runningMap,
|
||||
DistributedMap completedMap,
|
||||
DistributedMap failureMap) {
|
||||
this.zkStateReader = zkStateReader;
|
||||
this.myId = myId;
|
||||
this.shardHandlerFactory = shardHandlerFactory;
|
||||
this.adminPath = adminPath;
|
||||
this.stats = stats;
|
||||
this.selector = selector;
|
||||
this.prioritizer = prioritizer;
|
||||
this.workQueue = workQueue;
|
||||
this.runningMap = runningMap;
|
||||
this.completedMap = completedMap;
|
||||
this.failureMap = failureMap;
|
||||
this.runningZKTasks = new HashSet<>();
|
||||
this.runningTasks = new HashSet();
|
||||
this.completedTasks = new HashMap<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
log.info("Process current queue of overseer operations");
|
||||
LeaderStatus isLeader = amILeader();
|
||||
while (isLeader == LeaderStatus.DONT_KNOW) {
|
||||
log.debug("am_i_leader unclear {}", isLeader);
|
||||
isLeader = amILeader(); // not a no, not a yes, try ask again
|
||||
}
|
||||
|
||||
String oldestItemInWorkQueue = null;
|
||||
// hasLeftOverItems - used for avoiding re-execution of async tasks that were processed by a previous Overseer.
|
||||
// This variable is set in case there's any task found on the workQueue when the OCP starts up and
|
||||
// the id for the queue tail is used as a marker to check for the task in completed/failed map in zk.
|
||||
// Beyond the marker, all tasks can safely be assumed to have never been executed.
|
||||
boolean hasLeftOverItems = true;
|
||||
|
||||
try {
|
||||
oldestItemInWorkQueue = workQueue.getTailId();
|
||||
} catch (KeeperException e) {
|
||||
// We don't need to handle this. This is just a fail-safe which comes in handy in skipping already processed
|
||||
// async calls.
|
||||
SolrException.log(log, "", e);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
|
||||
if (oldestItemInWorkQueue == null)
|
||||
hasLeftOverItems = false;
|
||||
else
|
||||
log.debug("Found already existing elements in the work-queue. Last element: {}", oldestItemInWorkQueue);
|
||||
|
||||
try {
|
||||
prioritizer.prioritizeOverseerNodes(myId);
|
||||
} catch (Exception e) {
|
||||
log.error("Unable to prioritize overseer ", e);
|
||||
}
|
||||
|
||||
// TODO: Make maxThreads configurable.
|
||||
|
||||
this.tpe = new ExecutorUtil.MDCAwareThreadPoolExecutor(5, 100, 0L, TimeUnit.MILLISECONDS,
|
||||
new SynchronousQueue<Runnable>(),
|
||||
new DefaultSolrThreadFactory("OverseerThreadFactory"));
|
||||
try {
|
||||
while (!this.isClosed) {
|
||||
try {
|
||||
isLeader = amILeader();
|
||||
if (LeaderStatus.NO == isLeader) {
|
||||
break;
|
||||
} else if (LeaderStatus.YES != isLeader) {
|
||||
log.debug("am_i_leader unclear {}", isLeader);
|
||||
continue; // not a no, not a yes, try asking again
|
||||
}
|
||||
|
||||
log.debug("Cleaning up work-queue. #Running tasks: {}", runningTasks.size());
|
||||
cleanUpWorkQueue();
|
||||
|
||||
printTrackingMaps();
|
||||
|
||||
boolean waited = false;
|
||||
|
||||
while (runningTasks.size() > maxParallelThreads) {
|
||||
synchronized (waitLock) {
|
||||
waitLock.wait(100);//wait for 100 ms or till a task is complete
|
||||
}
|
||||
waited = true;
|
||||
}
|
||||
|
||||
if (waited)
|
||||
cleanUpWorkQueue();
|
||||
|
||||
List<QueueEvent> heads = workQueue.peekTopN(maxParallelThreads, runningZKTasks, 2000L);
|
||||
|
||||
if (heads == null)
|
||||
continue;
|
||||
|
||||
log.debug("Got {} tasks from work-queue : [{}]", heads.size(), heads.toString());
|
||||
|
||||
if (isClosed) break;
|
||||
|
||||
for (QueueEvent head : heads) {
|
||||
final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
|
||||
OverseerMessageHandler messageHandler = selector.selectOverseerMessageHandler(message);
|
||||
String taskKey = messageHandler.getTaskKey(message);
|
||||
final String asyncId = message.getStr(ASYNC);
|
||||
if (hasLeftOverItems) {
|
||||
if (head.getId().equals(oldestItemInWorkQueue))
|
||||
hasLeftOverItems = false;
|
||||
if (asyncId != null && (completedMap.contains(asyncId) || failureMap.contains(asyncId))) {
|
||||
log.debug("Found already processed task in workQueue, cleaning up. AsyncId [{}]",asyncId );
|
||||
workQueue.remove(head);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (!checkExclusivity(messageHandler, message, head.getId())) {
|
||||
log.debug("Exclusivity check failed for [{}]", message.toString());
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
markTaskAsRunning(messageHandler, head, taskKey, asyncId, message);
|
||||
log.debug("Marked task [{}] as running", head.getId());
|
||||
} catch (KeeperException.NodeExistsException e) {
|
||||
// This should never happen
|
||||
log.error("Tried to pick up task [{}] when it was already running!", head.getId());
|
||||
} catch (InterruptedException e) {
|
||||
log.error("Thread interrupted while trying to pick task for execution.", head.getId());
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
|
||||
log.info(messageHandler.getName() + ": Get the message id:" + head.getId() + " message:" + message.toString());
|
||||
String operation = message.getStr(Overseer.QUEUE_OPERATION);
|
||||
Runner runner = new Runner(messageHandler, message,
|
||||
operation, head);
|
||||
tpe.execute(runner);
|
||||
}
|
||||
|
||||
} catch (KeeperException e) {
|
||||
if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
|
||||
log.warn("Overseer cannot talk to ZK");
|
||||
return;
|
||||
}
|
||||
SolrException.log(log, "", e);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
return;
|
||||
} catch (Exception e) {
|
||||
SolrException.log(log, "", e);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
this.close();
|
||||
}
|
||||
}
|
||||
|
||||
protected boolean checkExclusivity(OverseerMessageHandler messageHandler, ZkNodeProps message, String id)
|
||||
throws KeeperException, InterruptedException {
|
||||
String taskKey = messageHandler.getTaskKey(message);
|
||||
|
||||
if(taskKey == null)
|
||||
return true;
|
||||
|
||||
OverseerMessageHandler.ExclusiveMarking marking = messageHandler.checkExclusiveMarking(taskKey, message);
|
||||
switch (marking) {
|
||||
case NOTDETERMINED:
|
||||
break;
|
||||
case EXCLUSIVE:
|
||||
return true;
|
||||
case NONEXCLUSIVE:
|
||||
return false;
|
||||
default:
|
||||
throw new IllegalArgumentException("Undefined marking: " + marking);
|
||||
}
|
||||
|
||||
if(runningZKTasks.contains(id))
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private void cleanUpWorkQueue() throws KeeperException, InterruptedException {
|
||||
synchronized (completedTasks) {
|
||||
for (String id : completedTasks.keySet()) {
|
||||
workQueue.remove(completedTasks.get(id));
|
||||
runningZKTasks.remove(id);
|
||||
}
|
||||
completedTasks.clear();
|
||||
}
|
||||
}
|
||||
|
||||
public void close() {
|
||||
isClosed = true;
|
||||
if(tpe != null) {
|
||||
if (!tpe.isShutdown()) {
|
||||
tpe.shutdown();
|
||||
try {
|
||||
tpe.awaitTermination(60, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
log.warn("Thread interrupted while waiting for OCP threadpool close.");
|
||||
Thread.currentThread().interrupt();
|
||||
} finally {
|
||||
if (!tpe.isShutdown())
|
||||
tpe.shutdownNow();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static List<String> getSortedOverseerNodeNames(SolrZkClient zk) throws KeeperException, InterruptedException {
|
||||
List<String> children = null;
|
||||
try {
|
||||
children = zk.getChildren(OverseerElectionContext.PATH + LeaderElector.ELECTION_NODE, null, true);
|
||||
} catch (Exception e) {
|
||||
log.warn("error ", e);
|
||||
return new ArrayList<>();
|
||||
}
|
||||
LeaderElector.sortSeqs(children);
|
||||
ArrayList<String> nodeNames = new ArrayList<>(children.size());
|
||||
for (String c : children) nodeNames.add(LeaderElector.getNodeName(c));
|
||||
return nodeNames;
|
||||
}
|
||||
|
||||
public static List<String> getSortedElectionNodes(SolrZkClient zk, String path) throws KeeperException, InterruptedException {
|
||||
List<String> children = null;
|
||||
try {
|
||||
children = zk.getChildren(path, null, true);
|
||||
LeaderElector.sortSeqs(children);
|
||||
return children;
|
||||
} catch (Exception e) {
|
||||
throw e;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static String getLeaderNode(SolrZkClient zkClient) throws KeeperException, InterruptedException {
|
||||
String id = getLeaderId(zkClient);
|
||||
return id==null ?
|
||||
null:
|
||||
LeaderElector.getNodeName( id);
|
||||
}
|
||||
|
||||
public static String getLeaderId(SolrZkClient zkClient) throws KeeperException,InterruptedException{
|
||||
byte[] data = null;
|
||||
try {
|
||||
data = zkClient.getData("/overseer_elect/leader", null, new Stat(), true);
|
||||
} catch (KeeperException.NoNodeException e) {
|
||||
return null;
|
||||
}
|
||||
Map m = (Map) Utils.fromJSON(data);
|
||||
return (String) m.get("id");
|
||||
}
|
||||
|
||||
protected LeaderStatus amILeader() {
|
||||
String statsName = "collection_am_i_leader";
|
||||
TimerContext timerContext = stats.time(statsName);
|
||||
boolean success = true;
|
||||
try {
|
||||
ZkNodeProps props = ZkNodeProps.load(zkStateReader.getZkClient().getData(
|
||||
"/overseer_elect/leader", null, null, true));
|
||||
if (myId.equals(props.getStr("id"))) {
|
||||
return LeaderStatus.YES;
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
success = false;
|
||||
if (e.code() == KeeperException.Code.CONNECTIONLOSS) {
|
||||
log.error("", e);
|
||||
return LeaderStatus.DONT_KNOW;
|
||||
} else if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
|
||||
log.info("", e);
|
||||
} else {
|
||||
log.warn("", e);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
success = false;
|
||||
Thread.currentThread().interrupt();
|
||||
} finally {
|
||||
timerContext.stop();
|
||||
if (success) {
|
||||
stats.success(statsName);
|
||||
} else {
|
||||
stats.error(statsName);
|
||||
}
|
||||
}
|
||||
log.info("According to ZK I (id=" + myId + ") am no longer a leader.");
|
||||
return LeaderStatus.NO;
|
||||
}
|
||||
|
||||
public boolean isClosed() {
|
||||
return isClosed;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void markTaskAsRunning(OverseerMessageHandler messageHandler, QueueEvent head, String taskKey,
|
||||
String asyncId, ZkNodeProps message)
|
||||
throws KeeperException, InterruptedException {
|
||||
synchronized (runningZKTasks) {
|
||||
runningZKTasks.add(head.getId());
|
||||
}
|
||||
|
||||
synchronized (runningTasks) {
|
||||
runningTasks.add(head.getId());
|
||||
}
|
||||
|
||||
messageHandler.markExclusiveTask(taskKey, message);
|
||||
|
||||
if(asyncId != null)
|
||||
runningMap.put(asyncId, null);
|
||||
}
|
||||
|
||||
protected class Runner implements Runnable {
|
||||
ZkNodeProps message;
|
||||
String operation;
|
||||
SolrResponse response;
|
||||
QueueEvent head;
|
||||
OverseerMessageHandler messageHandler;
|
||||
|
||||
public Runner(OverseerMessageHandler messageHandler, ZkNodeProps message, String operation, QueueEvent head) {
|
||||
this.message = message;
|
||||
this.operation = operation;
|
||||
this.head = head;
|
||||
this.messageHandler = messageHandler;
|
||||
response = null;
|
||||
}
|
||||
|
||||
|
||||
public void run() {
|
||||
String statsName = messageHandler.getTimerName(operation);
|
||||
final TimerContext timerContext = stats.time(statsName);
|
||||
|
||||
boolean success = false;
|
||||
final String asyncId = message.getStr(ASYNC);
|
||||
String taskKey = messageHandler.getTaskKey(message);
|
||||
|
||||
try {
|
||||
try {
|
||||
log.debug("Runner processing {}", head.getId());
|
||||
response = messageHandler.processMessage(message, operation);
|
||||
} finally {
|
||||
timerContext.stop();
|
||||
updateStats(statsName);
|
||||
}
|
||||
|
||||
if(asyncId != null) {
|
||||
if (response != null && (response.getResponse().get("failure") != null
|
||||
|| response.getResponse().get("exception") != null)) {
|
||||
failureMap.put(asyncId, SolrResponse.serializable(response));
|
||||
log.debug("Updated failed map for task with zkid:[{}]", head.getId());
|
||||
} else {
|
||||
completedMap.put(asyncId, SolrResponse.serializable(response));
|
||||
log.debug("Updated completed map for task with zkid:[{}]", head.getId());
|
||||
}
|
||||
} else {
|
||||
head.setBytes(SolrResponse.serializable(response));
|
||||
log.debug("Completed task:[{}]", head.getId());
|
||||
}
|
||||
|
||||
markTaskComplete(messageHandler, head.getId(), asyncId, taskKey);
|
||||
log.debug("Marked task [{}] as completed.", head.getId());
|
||||
printTrackingMaps();
|
||||
|
||||
log.info(messageHandler.getName() + ": Message id:" + head.getId() +
|
||||
" complete, response:" + response.getResponse().toString());
|
||||
success = true;
|
||||
} catch (KeeperException e) {
|
||||
SolrException.log(log, "", e);
|
||||
} catch (InterruptedException e) {
|
||||
// Reset task from tracking data structures so that it can be retried.
|
||||
resetTaskWithException(messageHandler, head.getId(), asyncId, taskKey);
|
||||
log.warn("Resetting task {} as the thread was interrupted.", head.getId());
|
||||
Thread.currentThread().interrupt();
|
||||
} finally {
|
||||
if(!success) {
|
||||
// Reset task from tracking data structures so that it can be retried.
|
||||
resetTaskWithException(messageHandler, head.getId(), asyncId, taskKey);
|
||||
}
|
||||
synchronized (waitLock){
|
||||
waitLock.notifyAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void markTaskComplete(OverseerMessageHandler messageHandler, String id, String asyncId, String taskKey)
|
||||
throws KeeperException, InterruptedException {
|
||||
synchronized (completedTasks) {
|
||||
completedTasks.put(id, head);
|
||||
}
|
||||
|
||||
synchronized (runningTasks) {
|
||||
runningTasks.remove(id);
|
||||
}
|
||||
|
||||
if(asyncId != null)
|
||||
runningMap.remove(asyncId);
|
||||
|
||||
messageHandler.unmarkExclusiveTask(taskKey, operation);
|
||||
}
|
||||
|
||||
private void resetTaskWithException(OverseerMessageHandler messageHandler, String id, String asyncId, String taskKey) {
|
||||
log.warn("Resetting task: {}, requestid: {}, taskKey: {}", id, asyncId, taskKey);
|
||||
try {
|
||||
if (asyncId != null)
|
||||
runningMap.remove(asyncId);
|
||||
|
||||
synchronized (runningTasks) {
|
||||
runningTasks.remove(id);
|
||||
}
|
||||
|
||||
messageHandler.unmarkExclusiveTask(taskKey, operation);
|
||||
} catch (KeeperException e) {
|
||||
SolrException.log(log, "", e);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void updateStats(String statsName) {
|
||||
if (isSuccessful()) {
|
||||
stats.success(statsName);
|
||||
} else {
|
||||
stats.error(statsName);
|
||||
stats.storeFailureDetails(statsName, message, response);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isSuccessful() {
|
||||
if(response == null)
|
||||
return false;
|
||||
return !(response.getResponse().get("failure") != null || response.getResponse().get("exception") != null);
|
||||
}
|
||||
}
|
||||
|
||||
private void printTrackingMaps() {
|
||||
if(log.isDebugEnabled()) {
|
||||
synchronized (runningTasks) {
|
||||
log.debug("RunningTasks: {}", runningTasks.toString());
|
||||
}
|
||||
synchronized (completedTasks) {
|
||||
log.debug("CompletedTasks: {}", completedTasks.keySet().toString());
|
||||
}
|
||||
synchronized (runningZKTasks) {
|
||||
log.debug("RunningZKTasks: {}", runningZKTasks.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
String getId(){
|
||||
return myId;
|
||||
}
|
||||
|
||||
/**
|
||||
* An interface to determine which {@link OverseerMessageHandler}
|
||||
* handles a given message. This could be a single OverseerMessageHandler
|
||||
* for the case where a single type of message is handled (e.g. collection
|
||||
* messages only) , or a different handler could be selected based on the
|
||||
* contents of the message.
|
||||
*/
|
||||
public interface OverseerMessageHandlerSelector {
|
||||
OverseerMessageHandler selectOverseerMessageHandler(ZkNodeProps message);
|
||||
}
|
||||
|
||||
}
|
|
@ -24,7 +24,7 @@ import java.util.LinkedHashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.solr.cloud.OverseerCollectionProcessor;
|
||||
import org.apache.solr.cloud.OverseerCollectionMessageHandler;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
|
@ -87,10 +87,10 @@ public class ClusterStateMutator {
|
|||
|
||||
Map<String, Object> collectionProps = new HashMap<>();
|
||||
|
||||
for (Map.Entry<String, Object> e : OverseerCollectionProcessor.COLL_PROPS.entrySet()) {
|
||||
for (Map.Entry<String, Object> e : OverseerCollectionMessageHandler.COLL_PROPS.entrySet()) {
|
||||
Object val = message.get(e.getKey());
|
||||
if (val == null) {
|
||||
val = OverseerCollectionProcessor.COLL_PROPS.get(e.getKey());
|
||||
val = OverseerCollectionMessageHandler.COLL_PROPS.get(e.getKey());
|
||||
}
|
||||
if (val != null) collectionProps.put(e.getKey(), val);
|
||||
}
|
||||
|
|
|
@ -28,7 +28,7 @@ import java.util.Set;
|
|||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.solr.cloud.Assign;
|
||||
import org.apache.solr.cloud.Overseer;
|
||||
import org.apache.solr.cloud.OverseerCollectionProcessor;
|
||||
import org.apache.solr.cloud.OverseerCollectionMessageHandler;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
|
@ -40,7 +40,7 @@ import org.apache.solr.common.util.Utils;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.COLL_PROP_PREFIX;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_PROP_PREFIX;
|
||||
import static org.apache.solr.cloud.overseer.CollectionMutator.checkCollectionKeyExistence;
|
||||
import static org.apache.solr.cloud.overseer.CollectionMutator.checkKeyExistence;
|
||||
import static org.apache.solr.common.params.CommonParams.NAME;
|
||||
|
@ -107,18 +107,18 @@ public class ReplicaMutator {
|
|||
String replicaName = message.getStr(ZkStateReader.REPLICA_PROP);
|
||||
String property = message.getStr(ZkStateReader.PROPERTY_PROP).toLowerCase(Locale.ROOT);
|
||||
if (StringUtils.startsWith(property, COLL_PROP_PREFIX) == false) {
|
||||
property = OverseerCollectionProcessor.COLL_PROP_PREFIX + property;
|
||||
property = OverseerCollectionMessageHandler.COLL_PROP_PREFIX + property;
|
||||
}
|
||||
property = property.toLowerCase(Locale.ROOT);
|
||||
String propVal = message.getStr(ZkStateReader.PROPERTY_VALUE_PROP);
|
||||
String shardUnique = message.getStr(OverseerCollectionProcessor.SHARD_UNIQUE);
|
||||
String shardUnique = message.getStr(OverseerCollectionMessageHandler.SHARD_UNIQUE);
|
||||
|
||||
boolean isUnique = false;
|
||||
|
||||
if (SliceMutator.SLICE_UNIQUE_BOOLEAN_PROPERTIES.contains(property)) {
|
||||
if (StringUtils.isNotBlank(shardUnique) && Boolean.parseBoolean(shardUnique) == false) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Overseer ADDREPLICAPROP for " +
|
||||
property + " cannot have " + OverseerCollectionProcessor.SHARD_UNIQUE + " set to anything other than" +
|
||||
property + " cannot have " + OverseerCollectionMessageHandler.SHARD_UNIQUE + " set to anything other than" +
|
||||
"'true'. No action taken");
|
||||
}
|
||||
isUnique = true;
|
||||
|
@ -170,7 +170,7 @@ public class ReplicaMutator {
|
|||
String replicaName = message.getStr(ZkStateReader.REPLICA_PROP);
|
||||
String property = message.getStr(ZkStateReader.PROPERTY_PROP).toLowerCase(Locale.ROOT);
|
||||
if (StringUtils.startsWith(property, COLL_PROP_PREFIX) == false) {
|
||||
property = OverseerCollectionProcessor.COLL_PROP_PREFIX + property;
|
||||
property = OverseerCollectionMessageHandler.COLL_PROP_PREFIX + property;
|
||||
}
|
||||
|
||||
Replica replica = clusterState.getReplica(collectionName, replicaName);
|
||||
|
|
|
@ -37,7 +37,7 @@ import org.apache.solr.common.util.Utils;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.COLL_PROP_PREFIX;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_PROP_PREFIX;
|
||||
import static org.apache.solr.cloud.overseer.CollectionMutator.checkCollectionKeyExistence;
|
||||
import static org.apache.solr.common.util.Utils.makeMap;
|
||||
import static org.apache.solr.common.params.CommonParams.NAME;
|
||||
|
|
|
@ -71,16 +71,16 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.COLL_CONF;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.COLL_PROP_PREFIX;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET_SHUFFLE;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.ONLY_ACTIVE_NODES;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.ONLY_IF_DOWN;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.REQUESTID;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARD_UNIQUE;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_CONF;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_PROP_PREFIX;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET_SHUFFLE;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.ONLY_ACTIVE_NODES;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.ONLY_IF_DOWN;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.REQUESTID;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARDS_PROP;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARD_UNIQUE;
|
||||
import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
|
||||
import static org.apache.solr.common.cloud.DocCollection.RULE;
|
||||
import static org.apache.solr.common.cloud.DocCollection.SNITCH;
|
||||
|
|
|
@ -25,7 +25,7 @@ import java.util.Map;
|
|||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.solr.cloud.LeaderElector;
|
||||
import org.apache.solr.cloud.OverseerCollectionProcessor;
|
||||
import org.apache.solr.cloud.OverseerProcessor;
|
||||
import org.apache.solr.cloud.overseer.SliceMutator;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
|
@ -160,7 +160,7 @@ class RebalanceLeaders {
|
|||
|
||||
ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
|
||||
|
||||
List<String> electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
|
||||
List<String> electionNodes = OverseerProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
|
||||
ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
|
||||
|
||||
if (electionNodes.size() < 2) { // if there's only one node in the queue, should already be leader and we shouldn't be here anyway.
|
||||
|
@ -193,7 +193,7 @@ class RebalanceLeaders {
|
|||
throws KeeperException, InterruptedException {
|
||||
|
||||
ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
|
||||
List<String> electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
|
||||
List<String> electionNodes = OverseerProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
|
||||
ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
|
||||
|
||||
// First, queue up the preferred leader at the head of the queue.
|
||||
|
@ -210,12 +210,12 @@ class RebalanceLeaders {
|
|||
return; // let's not continue if we didn't get what we expect. Possibly we're offline etc..
|
||||
}
|
||||
|
||||
List<String> electionNodesTmp = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
|
||||
List<String> electionNodesTmp = OverseerProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
|
||||
ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
|
||||
|
||||
|
||||
// Now find other nodes that have the same sequence number as this node and re-queue them at the end of the queue.
|
||||
electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
|
||||
electionNodes = OverseerProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
|
||||
ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
|
||||
|
||||
for (String thisNode : electionNodes) {
|
||||
|
@ -238,7 +238,7 @@ class RebalanceLeaders {
|
|||
int oldSeq = LeaderElector.getSeq(electionNode);
|
||||
for (int idx = 0; idx < 600; ++idx) {
|
||||
ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
|
||||
List<String> electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
|
||||
List<String> electionNodes = OverseerProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
|
||||
ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
|
||||
for (String testNode : electionNodes) {
|
||||
if (LeaderElector.getNodeName(testNode).equals(nodeName) && oldSeq != LeaderElector.getSeq(testNode)) {
|
||||
|
|
|
@ -50,7 +50,7 @@ public class AsyncMigrateRouteKeyTest extends MigrateRouteKeyTest {
|
|||
String message;
|
||||
params = new ModifiableSolrParams();
|
||||
params.set("action", CollectionParams.CollectionAction.REQUESTSTATUS.toString());
|
||||
params.set(OverseerCollectionProcessor.REQUESTID, asyncId);
|
||||
params.set(OverseerCollectionMessageHandler.REQUESTID, asyncId);
|
||||
// This task takes long enough to run. Also check for the current state of the task to be running.
|
||||
message = sendStatusRequestWithRetry(params, 5);
|
||||
assertEquals("found " + asyncId + " in running tasks", message);
|
||||
|
|
|
@ -45,7 +45,7 @@ import java.io.File;
|
|||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.*;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.*;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
|
||||
|
||||
|
@ -398,7 +398,7 @@ public class BaseCdcrDistributedZkTest extends AbstractDistribZkTestBase {
|
|||
}
|
||||
Integer replicationFactor = (Integer) collectionProps.get(REPLICATION_FACTOR);
|
||||
if (replicationFactor == null) {
|
||||
replicationFactor = (Integer) OverseerCollectionProcessor.COLL_PROPS.get(REPLICATION_FACTOR);
|
||||
replicationFactor = (Integer) OverseerCollectionMessageHandler.COLL_PROPS.get(REPLICATION_FACTOR);
|
||||
}
|
||||
|
||||
if (confSetName != null) {
|
||||
|
|
|
@ -571,10 +571,10 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
|
|||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set("action", CollectionAction.CREATE.toString());
|
||||
|
||||
params.set(OverseerCollectionProcessor.NUM_SLICES, numShards);
|
||||
params.set(OverseerCollectionMessageHandler.NUM_SLICES, numShards);
|
||||
params.set(ZkStateReader.REPLICATION_FACTOR, numReplicas);
|
||||
params.set(ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode);
|
||||
if (createNodeSetStr != null) params.set(OverseerCollectionProcessor.CREATE_NODE_SET, createNodeSetStr);
|
||||
if (createNodeSetStr != null) params.set(OverseerCollectionMessageHandler.CREATE_NODE_SET, createNodeSetStr);
|
||||
|
||||
int clientIndex = clients.size() > 1 ? random().nextInt(2) : 0;
|
||||
List<Integer> list = new ArrayList<>();
|
||||
|
|
|
@ -82,7 +82,7 @@ import org.apache.solr.core.SolrInfoMBean.Category;
|
|||
import org.apache.solr.servlet.SolrDispatchFilter;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
|
||||
import static org.apache.solr.common.util.Utils.makeMap;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
|
||||
|
@ -442,7 +442,7 @@ public class CollectionsAPIDistributedZkTest extends AbstractFullDistribZkTestBa
|
|||
String nn1 = ((SolrDispatchFilter) jettys.get(0).getDispatchFilter().getFilter()).getCores().getZkController().getNodeName();
|
||||
String nn2 = ((SolrDispatchFilter) jettys.get(1).getDispatchFilter().getFilter()).getCores().getZkController().getNodeName();
|
||||
|
||||
params.set(OverseerCollectionProcessor.CREATE_NODE_SET, nn1 + "," + nn2);
|
||||
params.set(OverseerCollectionMessageHandler.CREATE_NODE_SET, nn1 + "," + nn2);
|
||||
request = new QueryRequest(params);
|
||||
request.setPath("/admin/collections");
|
||||
gotExp = false;
|
||||
|
|
|
@ -47,8 +47,8 @@ import java.util.Map;
|
|||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARDS_PROP;
|
||||
import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
|
||||
|
|
|
@ -36,8 +36,8 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARDS_PROP;
|
||||
import static org.apache.solr.common.util.Utils.makeMap;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
|
||||
|
||||
|
|
|
@ -40,8 +40,8 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.ONLY_IF_DOWN;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.ONLY_IF_DOWN;
|
||||
import static org.apache.solr.common.util.Utils.makeMap;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
|
||||
|
|
|
@ -39,7 +39,7 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
|
||||
|
||||
|
|
|
@ -106,7 +106,7 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
|
|||
DistributedQueue workQueue, DistributedMap runningMap,
|
||||
DistributedMap completedMap,
|
||||
DistributedMap failureMap) {
|
||||
super(zkStateReader, myId, shardHandlerFactory, adminPath, new Overseer.Stats(), workQueue, runningMap, completedMap, failureMap);
|
||||
super(zkStateReader, myId, shardHandlerFactory, adminPath, new Overseer.Stats(), null, new OverseerNodePrioritizer(zkStateReader, adminPath, shardHandlerFactory), workQueue, runningMap, completedMap, failureMap);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -407,14 +407,14 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
|
|||
ZkStateReader.REPLICATION_FACTOR, replicationFactor.toString(),
|
||||
"name", COLLECTION_NAME,
|
||||
"collection.configName", CONFIG_NAME,
|
||||
OverseerCollectionProcessor.NUM_SLICES, numberOfSlices.toString(),
|
||||
OverseerCollectionMessageHandler.NUM_SLICES, numberOfSlices.toString(),
|
||||
ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode.toString()
|
||||
);
|
||||
if (sendCreateNodeList) {
|
||||
propMap.put(OverseerCollectionProcessor.CREATE_NODE_SET,
|
||||
propMap.put(OverseerCollectionMessageHandler.CREATE_NODE_SET,
|
||||
(createNodeList != null)?StrUtils.join(createNodeList, ','):null);
|
||||
if (OverseerCollectionProcessor.CREATE_NODE_SET_SHUFFLE_DEFAULT != createNodeSetShuffle || random().nextBoolean()) {
|
||||
propMap.put(OverseerCollectionProcessor.CREATE_NODE_SET_SHUFFLE, createNodeSetShuffle);
|
||||
if (OverseerCollectionMessageHandler.CREATE_NODE_SET_SHUFFLE_DEFAULT != createNodeSetShuffle || random().nextBoolean()) {
|
||||
propMap.put(OverseerCollectionMessageHandler.CREATE_NODE_SET_SHUFFLE, createNodeSetShuffle);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -590,7 +590,7 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
|
|||
}
|
||||
}
|
||||
|
||||
if (random().nextBoolean()) Collections.shuffle(createNodeList, OverseerCollectionProcessor.RANDOM);
|
||||
if (random().nextBoolean()) Collections.shuffle(createNodeList, OverseerCollectionMessageHandler.RANDOM);
|
||||
|
||||
List<SubmitCapture> submitCaptures = null;
|
||||
if (collectionExceptedToBeCreated) {
|
||||
|
|
|
@ -41,9 +41,9 @@ import java.util.List;
|
|||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.getLeaderNode;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.getSortedOverseerNodeNames;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
|
||||
import static org.apache.solr.common.util.Utils.makeMap;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
|
||||
|
|
|
@ -50,7 +50,7 @@ import java.util.Map;
|
|||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
|
||||
|
||||
|
|
|
@ -43,7 +43,7 @@ import java.util.List;
|
|||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARD_UNIQUE;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARD_UNIQUE;
|
||||
|
||||
public class TestCollectionAPI extends ReplicaPropertiesBase {
|
||||
|
||||
|
|
|
@ -295,7 +295,7 @@ public class TestMiniSolrCloudCluster extends LuceneTestCase {
|
|||
|
||||
// create collection
|
||||
final String asyncId = (random().nextBoolean() ? null : "asyncId("+collectionName+".create)="+random().nextInt());
|
||||
createCollection(miniCluster, collectionName, OverseerCollectionProcessor.CREATE_NODE_SET_EMPTY, asyncId);
|
||||
createCollection(miniCluster, collectionName, OverseerCollectionMessageHandler.CREATE_NODE_SET_EMPTY, asyncId);
|
||||
if (asyncId != null) {
|
||||
assertEquals("did not see async createCollection completion", "completed", AbstractFullDistribZkTestBase.getRequestStateAfterCompletion(asyncId, 330, cloudSolrClient));
|
||||
}
|
||||
|
|
|
@ -63,7 +63,7 @@ public class TestRequestStatusCollectionAPI extends BasicDistributedZkTest {
|
|||
params = new ModifiableSolrParams();
|
||||
|
||||
params.set("action", CollectionParams.CollectionAction.REQUESTSTATUS.toString());
|
||||
params.set(OverseerCollectionProcessor.REQUESTID, "1000");
|
||||
params.set(OverseerCollectionMessageHandler.REQUESTID, "1000");
|
||||
|
||||
try {
|
||||
message = sendStatusRequestWithRetry(params, MAX_WAIT_TIMEOUT_SECONDS);
|
||||
|
@ -76,7 +76,7 @@ public class TestRequestStatusCollectionAPI extends BasicDistributedZkTest {
|
|||
// Check for a random (hopefully non-existent request id
|
||||
params = new ModifiableSolrParams();
|
||||
params.set(CollectionParams.ACTION, CollectionParams.CollectionAction.REQUESTSTATUS.toString());
|
||||
params.set(OverseerCollectionProcessor.REQUESTID, "9999999");
|
||||
params.set(OverseerCollectionMessageHandler.REQUESTID, "9999999");
|
||||
try {
|
||||
r = sendRequest(params);
|
||||
status = (NamedList) r.get("status");
|
||||
|
@ -101,7 +101,7 @@ public class TestRequestStatusCollectionAPI extends BasicDistributedZkTest {
|
|||
// Check for the request to be completed.
|
||||
params = new ModifiableSolrParams();
|
||||
params.set("action", CollectionParams.CollectionAction.REQUESTSTATUS.toString());
|
||||
params.set(OverseerCollectionProcessor.REQUESTID, "1001");
|
||||
params.set(OverseerCollectionMessageHandler.REQUESTID, "1001");
|
||||
try {
|
||||
message = sendStatusRequestWithRetry(params, MAX_WAIT_TIMEOUT_SECONDS);
|
||||
} catch (SolrServerException | IOException e) {
|
||||
|
@ -128,7 +128,7 @@ public class TestRequestStatusCollectionAPI extends BasicDistributedZkTest {
|
|||
params = new ModifiableSolrParams();
|
||||
|
||||
params.set("action", CollectionParams.CollectionAction.REQUESTSTATUS.toString());
|
||||
params.set(OverseerCollectionProcessor.REQUESTID, "1002");
|
||||
params.set(OverseerCollectionMessageHandler.REQUESTID, "1002");
|
||||
|
||||
try {
|
||||
message = sendStatusRequestWithRetry(params, MAX_WAIT_TIMEOUT_SECONDS);
|
||||
|
|
|
@ -66,7 +66,7 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
|
||||
import static org.apache.solr.common.util.Utils.makeMap;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
|
||||
|
|
|
@ -81,9 +81,9 @@ import org.noggit.JSONWriter;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARDS_PROP;
|
||||
import static org.apache.solr.common.util.Utils.makeMap;
|
||||
|
||||
/**
|
||||
|
@ -1540,7 +1540,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
|
|||
}
|
||||
Integer replicationFactor = (Integer) collectionProps.get(ZkStateReader.REPLICATION_FACTOR);
|
||||
if(replicationFactor==null){
|
||||
replicationFactor = (Integer) OverseerCollectionProcessor.COLL_PROPS.get(ZkStateReader.REPLICATION_FACTOR);
|
||||
replicationFactor = (Integer) OverseerCollectionMessageHandler.COLL_PROPS.get(ZkStateReader.REPLICATION_FACTOR);
|
||||
}
|
||||
|
||||
if (confSetName != null) {
|
||||
|
|
|
@ -319,7 +319,7 @@ public class MiniSolrCloudCluster {
|
|||
params.set("replicationFactor", replicationFactor);
|
||||
params.set("collection.configName", configName);
|
||||
if (null != createNodeSet) {
|
||||
params.set(OverseerCollectionProcessor.CREATE_NODE_SET, createNodeSet);
|
||||
params.set(OverseerCollectionMessageHandler.CREATE_NODE_SET, createNodeSet);
|
||||
}
|
||||
if (null != asyncId) {
|
||||
params.set(CommonAdminParams.ASYNC, asyncId);
|
||||
|
|
Loading…
Reference in New Issue