SOLR-8744: Overseer operations performed with fine grained mutual exclusion

This commit is contained in:
Noble Paul 2016-06-02 14:47:57 +05:30
parent 34d9f0a7a3
commit 459a9c77a6
11 changed files with 542 additions and 177 deletions

View File

@ -275,6 +275,8 @@ Optimizations
* SOLR-9147: Upgrade commons-io to 2.5, avoid expensive array resizing in EmbeddedSolrServer (Mikhail Khludnev) * SOLR-9147: Upgrade commons-io to 2.5, avoid expensive array resizing in EmbeddedSolrServer (Mikhail Khludnev)
* SOLR-8744: Overseer operations performed with fine grained mutual exclusion (noble, Scott Blum)
Other Changes Other Changes
---------------------- ----------------------
* SOLR-7516: Improve javadocs for JavaBinCodec, ObjectResolver and enforce the single-usage policy. * SOLR-7516: Improve javadocs for JavaBinCodec, ObjectResolver and enforce the single-usage policy.

View File

@ -0,0 +1,182 @@
package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.solr.cloud.OverseerMessageHandler.Lock;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CollectionParams.LockLevel;
import org.apache.solr.common.util.StrUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* This is a utility class that offers fine grained locking for various Collection Operations
* This class is designed for single threaded operation. It's safe for multiple threads to use it
* but internally it is synchronized so that only one thread can perform any operation.
*/
public class LockTree {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final Node root = new Node(null, LockLevel.CLUSTER, null);
public void clear() {
synchronized (this) {
root.clear();
}
}
private class LockImpl implements Lock {
final Node node;
LockImpl( Node node) {
this.node = node;
}
@Override
public void unlock() {
synchronized (LockTree.this) {
node.unlock(this);
}
}
@Override
public String toString() {
return StrUtils.join(node.constructPath(new LinkedList<>()), '/');
}
}
public class Session {
private SessionNode root = new SessionNode(LockLevel.CLUSTER);
public Lock lock(CollectionParams.CollectionAction action, List<String> path) {
synchronized (LockTree.this) {
if (action.lockLevel == LockLevel.NONE) return FREELOCK;
if (root.isBusy(action.lockLevel, path)) return null;
Lock lockObject = LockTree.this.root.lock(action.lockLevel, path);
if (lockObject == null) root.markBusy(path, 0);
return lockObject;
}
}
}
private static class SessionNode {
final LockLevel level;
Map<String, SessionNode> kids;
boolean busy = false;
SessionNode(LockLevel level) {
this.level = level;
}
void markBusy(List<String> path, int depth) {
if (path.size() == depth) {
busy = true;
} else {
String s = path.get(depth);
if (kids == null) kids = new HashMap<>();
SessionNode node = kids.get(s);
if (node == null) kids.put(s, node = new SessionNode(level.getChild()));
node.markBusy(path, depth + 1);
}
}
boolean isBusy(LockLevel lockLevel, List<String> path) {
if (lockLevel.isHigherOrEqual(level)) {
if (busy) return true;
String s = path.get(level.level);
if (kids == null || kids.get(s) == null) return false;
return kids.get(s).isBusy(lockLevel, path);
} else {
return false;
}
}
}
public Session getSession() {
return new Session();
}
private class Node {
final String name;
final Node mom;
final LockLevel level;
HashMap<String, Node> children = new HashMap<>();
LockImpl myLock;
Node(String name, LockLevel level, Node mom) {
this.name = name;
this.level = level;
this.mom = mom;
}
//if this or any of its children are locked
boolean isLocked() {
if (myLock != null) return true;
for (Node node : children.values()) if (node.isLocked()) return true;
return false;
}
void unlock(LockImpl lockObject) {
if (myLock == lockObject) myLock = null;
else {
LOG.info("Unlocked multiple times : {}", lockObject.toString());
}
}
Lock lock(LockLevel lockLevel, List<String> path) {
if (myLock != null) return null;//I'm already locked. no need to go any further
if (lockLevel == level) {
//lock is supposed to be acquired at this level
//If I am locked or any of my children or grandchildren are locked
// it is not possible to acquire a lock
if (isLocked()) return null;
return myLock = new LockImpl(this);
} else {
String childName = path.get(level.level);
Node child = children.get(childName);
if (child == null)
children.put(childName, child = new Node(childName, LockLevel.getLevel(level.level + 1), this));
return child.lock(lockLevel, path);
}
}
LinkedList<String> constructPath(LinkedList<String> collect) {
if (name != null) collect.addFirst(name);
if (mom != null) mom.constructPath(collect);
return collect;
}
void clear() {
if (myLock != null) {
LOG.warn("lock_is_leaked at" + constructPath(new LinkedList<>()));
myLock = null;
}
for (Node node : children.values()) node.clear();
}
}
static final Lock FREELOCK = () -> {};
}

View File

@ -16,10 +16,10 @@
*/ */
package org.apache.solr.cloud; package org.apache.solr.cloud;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.handler.component.ShardHandler; import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardHandlerFactory; import org.apache.solr.handler.component.ShardHandlerFactory;
import static org.apache.solr.cloud.OverseerConfigSetMessageHandler.CONFIGSETS_ACTION_PREFIX; import static org.apache.solr.cloud.OverseerConfigSetMessageHandler.CONFIGSETS_ACTION_PREFIX;
/** /**
@ -61,8 +61,6 @@ public class OverseerCollectionConfigSetProcessor extends OverseerTaskProcessor
super( super(
zkStateReader, zkStateReader,
myId, myId,
shardHandlerFactory,
adminPath,
stats, stats,
getOverseerMessageHandlerSelector(zkStateReader, myId, shardHandlerFactory, getOverseerMessageHandlerSelector(zkStateReader, myId, shardHandlerFactory,
adminPath, stats, overseer, overseerNodePrioritizer), adminPath, stats, overseer, overseerNodePrioritizer),
@ -85,15 +83,12 @@ public class OverseerCollectionConfigSetProcessor extends OverseerTaskProcessor
zkStateReader, myId, shardHandlerFactory, adminPath, stats, overseer, overseerNodePrioritizer); zkStateReader, myId, shardHandlerFactory, adminPath, stats, overseer, overseerNodePrioritizer);
final OverseerConfigSetMessageHandler configMessageHandler = new OverseerConfigSetMessageHandler( final OverseerConfigSetMessageHandler configMessageHandler = new OverseerConfigSetMessageHandler(
zkStateReader); zkStateReader);
return new OverseerMessageHandlerSelector() { return message -> {
@Override String operation = message.getStr(Overseer.QUEUE_OPERATION);
public OverseerMessageHandler selectOverseerMessageHandler(ZkNodeProps message) { if (operation != null && operation.startsWith(CONFIGSETS_ACTION_PREFIX)) {
String operation = message.getStr(Overseer.QUEUE_OPERATION); return configMessageHandler;
if (operation != null && operation.startsWith(CONFIGSETS_ACTION_PREFIX)) {
return configMessageHandler;
}
return collMessageHandler;
} }
return collMessageHandler;
}; };
} }
} }

View File

@ -179,7 +179,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
// Set that tracks collections that are currently being processed by a running task. // Set that tracks collections that are currently being processed by a running task.
// This is used for handling mutual exclusion of the tasks. // This is used for handling mutual exclusion of the tasks.
final private Set collectionWip;
final private LockTree lockTree = new LockTree();
static final Random RANDOM; static final Random RANDOM;
static { static {
@ -206,7 +207,6 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
this.stats = stats; this.stats = stats;
this.overseer = overseer; this.overseer = overseer;
this.overseerPrioritizer = overseerPrioritizer; this.overseerPrioritizer = overseerPrioritizer;
this.collectionWip = new HashSet();
} }
@Override @Override
@ -216,10 +216,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
NamedList results = new NamedList(); NamedList results = new NamedList();
try { try {
CollectionParams.CollectionAction action = CollectionParams.CollectionAction.get(operation); CollectionParams.CollectionAction action = getCollectionAction(operation);
if (action == null) {
throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:" + operation);
}
switch (action) { switch (action) {
case CREATE: case CREATE:
createCollection(zkStateReader.getClusterState(), message, results); createCollection(zkStateReader.getClusterState(), message, results);
@ -287,6 +284,13 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
case RESTORE: case RESTORE:
processRestoreAction(message, results); processRestoreAction(message, results);
break; break;
case MOCK_COLL_TASK:
case MOCK_SHARD_TASK:
case MOCK_REPLICA_TASK: {
//only for test purposes
Thread.sleep(message.getInt("sleep", 1));
break;
}
default: default:
throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:" throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
+ operation); + operation);
@ -311,6 +315,14 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
return new OverseerSolrResponse(results); return new OverseerSolrResponse(results);
} }
private CollectionParams.CollectionAction getCollectionAction(String operation) {
CollectionParams.CollectionAction action = CollectionParams.CollectionAction.get(operation);
if (action == null) {
throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:" + operation);
}
return action;
}
// //
// TODO DWS: this class has gone out of control (too big); refactor to break it up // TODO DWS: this class has gone out of control (too big); refactor to break it up
// //
@ -2663,7 +2675,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
message.getStr(COLLECTION_PROP) : message.getStr(NAME); message.getStr(COLLECTION_PROP) : message.getStr(NAME);
} }
@Override
/* @Override
public void markExclusiveTask(String collectionName, ZkNodeProps message) { public void markExclusiveTask(String collectionName, ZkNodeProps message) {
if (collectionName != null) { if (collectionName != null) {
synchronized (collectionWip) { synchronized (collectionWip) {
@ -2679,8 +2692,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
collectionWip.remove(collectionName); collectionWip.remove(collectionName);
} }
} }
} }*/
/*
@Override @Override
public ExclusiveMarking checkExclusiveMarking(String collectionName, ZkNodeProps message) { public ExclusiveMarking checkExclusiveMarking(String collectionName, ZkNodeProps message) {
synchronized (collectionWip) { synchronized (collectionWip) {
@ -2689,5 +2702,29 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
} }
return ExclusiveMarking.NOTDETERMINED; return ExclusiveMarking.NOTDETERMINED;
}*/
private long sessionId = -1;
private LockTree.Session lockSession;
@Override
public Lock lockTask(ZkNodeProps message, OverseerTaskProcessor.TaskBatch taskBatch) {
if (lockSession == null || sessionId != taskBatch.getId()) {
//this is always called in the same thread.
//Each batch is supposed to have a new taskBatch
//So if taskBatch changes we must create a new Session
// also check if the running tasks are empty. If yes, clear lockTree
// this will ensure that locks are not 'leaked'
if(taskBatch.getRunningTasks() == 0) lockTree.clear();
lockSession = lockTree.getSession();
}
return lockSession.lock(getCollectionAction(message.getStr(Overseer.QUEUE_OPERATION)),
Arrays.asList(
getTaskKey(message),
message.getStr(ZkStateReader.SHARD_ID_PROP),
message.getStr(ZkStateReader.REPLICA_PROP))
);
} }
} }

View File

@ -45,8 +45,6 @@ import org.noggit.JSONUtil;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.OverseerMessageHandler.ExclusiveMarking.NONEXCLUSIVE;
import static org.apache.solr.cloud.OverseerMessageHandler.ExclusiveMarking.NOTDETERMINED;
import static org.apache.solr.common.params.CommonParams.NAME; import static org.apache.solr.common.params.CommonParams.NAME;
import static org.apache.solr.common.params.ConfigSetParams.ConfigSetAction.CREATE; import static org.apache.solr.common.params.ConfigSetParams.ConfigSetAction.CREATE;
@ -146,13 +144,23 @@ public class OverseerConfigSetMessageHandler implements OverseerMessageHandler {
return "configset_" + operation; return "configset_" + operation;
} }
@Override
public Lock lockTask(ZkNodeProps message, OverseerTaskProcessor.TaskBatch taskBatch) {
String configSetName = getTaskKey(message);
if (canExecute(configSetName, message)) {
markExclusiveTask(configSetName, message);
return () -> unmarkExclusiveTask(configSetName, message);
}
return null;
}
@Override @Override
public String getTaskKey(ZkNodeProps message) { public String getTaskKey(ZkNodeProps message) {
return message.getStr(NAME); return message.getStr(NAME);
} }
@Override
public void markExclusiveTask(String configSetName, ZkNodeProps message) { private void markExclusiveTask(String configSetName, ZkNodeProps message) {
String baseConfigSet = getBaseConfigSetIfCreate(message); String baseConfigSet = getBaseConfigSetIfCreate(message);
markExclusive(configSetName, baseConfigSet); markExclusive(configSetName, baseConfigSet);
} }
@ -164,8 +172,7 @@ public class OverseerConfigSetMessageHandler implements OverseerMessageHandler {
} }
} }
@Override private void unmarkExclusiveTask(String configSetName, ZkNodeProps message) {
public void unmarkExclusiveTask(String configSetName, String operation, ZkNodeProps message) {
String baseConfigSet = getBaseConfigSetIfCreate(message); String baseConfigSet = getBaseConfigSetIfCreate(message);
unmarkExclusiveConfigSet(configSetName, baseConfigSet); unmarkExclusiveConfigSet(configSetName, baseConfigSet);
} }
@ -177,28 +184,26 @@ public class OverseerConfigSetMessageHandler implements OverseerMessageHandler {
} }
} }
@Override
public ExclusiveMarking checkExclusiveMarking(String configSetName, ZkNodeProps message) {
String baseConfigSet = getBaseConfigSetIfCreate(message);
return checkExclusiveMarking(configSetName, baseConfigSet);
}
private ExclusiveMarking checkExclusiveMarking(String configSetName, String baseConfigSetName) { private boolean canExecute(String configSetName, ZkNodeProps message) {
String baseConfigSetName = getBaseConfigSetIfCreate(message);
synchronized (configSetWriteWip) { synchronized (configSetWriteWip) {
// need to acquire: // need to acquire:
// 1) write lock on ConfigSet // 1) write lock on ConfigSet
// 2) read lock on Base ConfigSet // 2) read lock on Base ConfigSet
if (configSetWriteWip.contains(configSetName) || configSetReadWip.contains(configSetName)) { if (configSetWriteWip.contains(configSetName) || configSetReadWip.contains(configSetName)) {
return NONEXCLUSIVE; return false;
} }
if (baseConfigSetName != null && configSetWriteWip.contains(baseConfigSetName)) { if (baseConfigSetName != null && configSetWriteWip.contains(baseConfigSetName)) {
return NONEXCLUSIVE; return false;
} }
} }
return NOTDETERMINED; return true;
} }
private String getBaseConfigSetIfCreate(ZkNodeProps message) { private String getBaseConfigSetIfCreate(ZkNodeProps message) {
String operation = message.getStr(Overseer.QUEUE_OPERATION); String operation = message.getStr(Overseer.QUEUE_OPERATION);
if (operation != null) { if (operation != null) {

View File

@ -44,6 +44,15 @@ public interface OverseerMessageHandler {
*/ */
String getTimerName(String operation); String getTimerName(String operation);
interface Lock {
void unlock();
}
/**Try to provide an exclusive lock for this particular task
* return null if locking is not possible. If locking is not necessary
*/
Lock lockTask(ZkNodeProps message, OverseerTaskProcessor.TaskBatch taskBatch);
/** /**
* @param message the message being processed * @param message the message being processed
* *
@ -51,30 +60,4 @@ public interface OverseerMessageHandler {
*/ */
String getTaskKey(ZkNodeProps message); String getTaskKey(ZkNodeProps message);
/**
* @param taskKey the key associated with the task, cached from getTaskKey
* @param message the message being processed
*/
void markExclusiveTask(String taskKey, ZkNodeProps message);
/**
* @param taskKey the key associated with the task
* @param operation the operation being processed
* @param message the message being processed
*/
void unmarkExclusiveTask(String taskKey, String operation, ZkNodeProps message);
/**
* @param taskKey the key associated with the task
* @param message the message being processed
*
* @return the exclusive marking
*/
ExclusiveMarking checkExclusiveMarking(String taskKey, ZkNodeProps message);
enum ExclusiveMarking {
NOTDETERMINED, // not enough context, fall back to the processor (i.e. look at running tasks)
EXCLUSIVE,
NONEXCLUSIVE
}
} }

View File

@ -81,10 +81,6 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
private String myId; private String myId;
private final ShardHandlerFactory shardHandlerFactory;
private String adminPath;
private ZkStateReader zkStateReader; private ZkStateReader zkStateReader;
private boolean isClosed; private boolean isClosed;
@ -102,8 +98,6 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
private OverseerNodePrioritizer prioritizer; private OverseerNodePrioritizer prioritizer;
public OverseerTaskProcessor(ZkStateReader zkStateReader, String myId, public OverseerTaskProcessor(ZkStateReader zkStateReader, String myId,
final ShardHandlerFactory shardHandlerFactory,
String adminPath,
Overseer.Stats stats, Overseer.Stats stats,
OverseerMessageHandlerSelector selector, OverseerMessageHandlerSelector selector,
OverseerNodePrioritizer prioritizer, OverseerNodePrioritizer prioritizer,
@ -113,8 +107,6 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
DistributedMap failureMap) { DistributedMap failureMap) {
this.zkStateReader = zkStateReader; this.zkStateReader = zkStateReader;
this.myId = myId; this.myId = myId;
this.shardHandlerFactory = shardHandlerFactory;
this.adminPath = adminPath;
this.stats = stats; this.stats = stats;
this.selector = selector; this.selector = selector;
this.prioritizer = prioritizer; this.prioritizer = prioritizer;
@ -206,10 +198,11 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
if (isClosed) break; if (isClosed) break;
taskBatch.batchId++;
for (QueueEvent head : heads) { for (QueueEvent head : heads) {
if (runningZKTasks.contains(head.getId())) continue;
final ZkNodeProps message = ZkNodeProps.load(head.getBytes()); final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
OverseerMessageHandler messageHandler = selector.selectOverseerMessageHandler(message); OverseerMessageHandler messageHandler = selector.selectOverseerMessageHandler(message);
String taskKey = messageHandler.getTaskKey(message);
final String asyncId = message.getStr(ASYNC); final String asyncId = message.getStr(ASYNC);
if (hasLeftOverItems) { if (hasLeftOverItems) {
if (head.getId().equals(oldestItemInWorkQueue)) if (head.getId().equals(oldestItemInWorkQueue))
@ -220,27 +213,29 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
continue; continue;
} }
} }
String operation = message.getStr(Overseer.QUEUE_OPERATION);
if (!checkExclusivity(messageHandler, message, head.getId())) { OverseerMessageHandler.Lock lock = messageHandler.lockTask(message, taskBatch);
if (lock == null) {
log.debug("Exclusivity check failed for [{}]", message.toString()); log.debug("Exclusivity check failed for [{}]", message.toString());
continue; continue;
} }
try { try {
markTaskAsRunning(messageHandler, head, taskKey, asyncId, message); markTaskAsRunning(head, asyncId);
log.debug("Marked task [{}] as running", head.getId()); log.debug("Marked task [{}] as running", head.getId());
} catch (KeeperException.NodeExistsException e) { } catch (KeeperException.NodeExistsException e) {
lock.unlock();
// This should never happen // This should never happen
log.error("Tried to pick up task [{}] when it was already running!", head.getId()); log.error("Tried to pick up task [{}] when it was already running!", head.getId());
continue;
} catch (InterruptedException e) { } catch (InterruptedException e) {
lock.unlock();
log.error("Thread interrupted while trying to pick task for execution.", head.getId()); log.error("Thread interrupted while trying to pick task for execution.", head.getId());
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
continue;
} }
log.info(messageHandler.getName() + ": Get the message id:" + head.getId() + " message:" + message.toString()); log.info(messageHandler.getName() + ": Get the message id:" + head.getId() + " message:" + message.toString());
String operation = message.getStr(Overseer.QUEUE_OPERATION);
Runner runner = new Runner(messageHandler, message, Runner runner = new Runner(messageHandler, message,
operation, head); operation, head, lock);
tpe.execute(runner); tpe.execute(runner);
} }
@ -262,31 +257,6 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
} }
} }
protected boolean checkExclusivity(OverseerMessageHandler messageHandler, ZkNodeProps message, String id)
throws KeeperException, InterruptedException {
String taskKey = messageHandler.getTaskKey(message);
if (taskKey == null)
return true;
OverseerMessageHandler.ExclusiveMarking marking = messageHandler.checkExclusiveMarking(taskKey, message);
switch (marking) {
case NOTDETERMINED:
break;
case EXCLUSIVE:
return true;
case NONEXCLUSIVE:
return false;
default:
throw new IllegalArgumentException("Undefined marking: " + marking);
}
if (runningZKTasks.contains(id))
return false;
return true;
}
private void cleanUpWorkQueue() throws KeeperException, InterruptedException { private void cleanUpWorkQueue() throws KeeperException, InterruptedException {
synchronized (completedTasks) { synchronized (completedTasks) {
for (String id : completedTasks.keySet()) { for (String id : completedTasks.keySet()) {
@ -390,8 +360,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void markTaskAsRunning(OverseerMessageHandler messageHandler, QueueEvent head, String taskKey, private void markTaskAsRunning(QueueEvent head, String asyncId)
String asyncId, ZkNodeProps message)
throws KeeperException, InterruptedException { throws KeeperException, InterruptedException {
synchronized (runningZKTasks) { synchronized (runningZKTasks) {
runningZKTasks.add(head.getId()); runningZKTasks.add(head.getId());
@ -401,7 +370,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
runningTasks.add(head.getId()); runningTasks.add(head.getId());
} }
messageHandler.markExclusiveTask(taskKey, message); // messageHandler.markExclusiveTask(taskKey, message);
if (asyncId != null) if (asyncId != null)
runningMap.put(asyncId, null); runningMap.put(asyncId, null);
@ -413,12 +382,14 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
SolrResponse response; SolrResponse response;
QueueEvent head; QueueEvent head;
OverseerMessageHandler messageHandler; OverseerMessageHandler messageHandler;
private final OverseerMessageHandler.Lock lock;
public Runner(OverseerMessageHandler messageHandler, ZkNodeProps message, String operation, QueueEvent head) {
public Runner(OverseerMessageHandler messageHandler, ZkNodeProps message, String operation, QueueEvent head, OverseerMessageHandler.Lock lock) {
this.message = message; this.message = message;
this.operation = operation; this.operation = operation;
this.head = head; this.head = head;
this.messageHandler = messageHandler; this.messageHandler = messageHandler;
this.lock = lock;
response = null; response = null;
} }
@ -454,7 +425,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
log.debug("Completed task:[{}]", head.getId()); log.debug("Completed task:[{}]", head.getId());
} }
markTaskComplete(messageHandler, head.getId(), asyncId, taskKey, message); markTaskComplete(head.getId(), asyncId);
log.debug("Marked task [{}] as completed.", head.getId()); log.debug("Marked task [{}] as completed.", head.getId());
printTrackingMaps(); printTrackingMaps();
@ -469,6 +440,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
log.warn("Resetting task {} as the thread was interrupted.", head.getId()); log.warn("Resetting task {} as the thread was interrupted.", head.getId());
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} finally { } finally {
lock.unlock();
if (!success) { if (!success) {
// Reset task from tracking data structures so that it can be retried. // Reset task from tracking data structures so that it can be retried.
resetTaskWithException(messageHandler, head.getId(), asyncId, taskKey, message); resetTaskWithException(messageHandler, head.getId(), asyncId, taskKey, message);
@ -479,7 +451,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
} }
} }
private void markTaskComplete(OverseerMessageHandler messageHandler, String id, String asyncId, String taskKey, ZkNodeProps message) private void markTaskComplete(String id, String asyncId)
throws KeeperException, InterruptedException { throws KeeperException, InterruptedException {
synchronized (completedTasks) { synchronized (completedTasks) {
completedTasks.put(id, head); completedTasks.put(id, head);
@ -494,9 +466,6 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
log.warn("Could not find and remove async call [" + asyncId + "] from the running map."); log.warn("Could not find and remove async call [" + asyncId + "] from the running map.");
} }
} }
messageHandler.unmarkExclusiveTask(taskKey, operation, message);
} }
private void resetTaskWithException(OverseerMessageHandler messageHandler, String id, String asyncId, String taskKey, ZkNodeProps message) { private void resetTaskWithException(OverseerMessageHandler messageHandler, String id, String asyncId, String taskKey, ZkNodeProps message) {
@ -512,7 +481,6 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
runningTasks.remove(id); runningTasks.remove(id);
} }
messageHandler.unmarkExclusiveTask(taskKey, operation, message);
} catch (KeeperException e) { } catch (KeeperException e) {
SolrException.log(log, "", e); SolrException.log(log, "", e);
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -568,4 +536,20 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
OverseerMessageHandler selectOverseerMessageHandler(ZkNodeProps message); OverseerMessageHandler selectOverseerMessageHandler(ZkNodeProps message);
} }
final private TaskBatch taskBatch = new TaskBatch();
public class TaskBatch {
private long batchId = 0;
public long getId() {
return batchId;
}
public int getRunningTasks() {
synchronized (runningTasks) {
return runningTasks.size();
}
}
}
} }

View File

@ -30,8 +30,10 @@ import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse; import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.client.solrj.response.RequestStatusState; import org.apache.solr.client.solrj.response.RequestStatusState;
import org.apache.solr.common.params.CollectionParams; import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CommonAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -99,27 +101,32 @@ public class MultiThreadedOCPTest extends AbstractFullDistribZkTestBase {
} }
} }
private void testTaskExclusivity() throws IOException, SolrServerException { private void testTaskExclusivity() throws Exception, SolrServerException {
DistributedQueue distributedQueue = new DistributedQueue(cloudClient.getZkStateReader().getZkClient(),
"/overseer/collection-queue-work", new Overseer.Stats());
try (SolrClient client = createNewSolrClient("", getBaseUrl((HttpSolrClient) clients.get(0)))) { try (SolrClient client = createNewSolrClient("", getBaseUrl((HttpSolrClient) clients.get(0)))) {
Create createCollectionRequest = new Create() Create createCollectionRequest = new Create()
.setCollectionName("ocptest_shardsplit") .setCollectionName("ocptest_shardsplit")
.setNumShards(4) .setNumShards(4)
.setConfigName("conf1") .setConfigName("conf1")
.setAsyncId("1000"); .setAsyncId("1000");
createCollectionRequest.process(client); createCollectionRequest.process(client);
SplitShard splitShardRequest = new SplitShard() distributedQueue.offer(Utils.toJSON(Utils.makeMap(
.setCollectionName("ocptest_shardsplit") "collection", "ocptest_shardsplit",
.setShardName(SHARD1) Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MOCK_COLL_TASK.toLower(),
.setAsyncId("1001"); CommonAdminParams.ASYNC, "1001",
splitShardRequest.process(client); "sleep", "100"
)));
splitShardRequest = new SplitShard() distributedQueue.offer(Utils.toJSON(Utils.makeMap(
.setCollectionName("ocptest_shardsplit") "collection", "ocptest_shardsplit",
.setShardName(SHARD2) Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MOCK_COLL_TASK.toLower(),
.setAsyncId("1002"); CommonAdminParams.ASYNC, "1002",
splitShardRequest.process(client); "sleep", "100"
)));
int iterations = 0; int iterations = 0;
while(true) { while(true) {
int runningTasks = 0; int runningTasks = 0;

View File

@ -0,0 +1,130 @@
package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import com.google.common.collect.ImmutableSet;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.cloud.OverseerMessageHandler.Lock;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICAPROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MODIFYCOLLECTION;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.SPLITSHARD;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
public class TestLockTree extends SolrTestCaseJ4 {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public void testLocks() throws Exception {
LockTree lockTree = new LockTree();
Lock coll1Lock = lockTree.getSession().lock(CollectionAction.CREATE,
Arrays.asList("coll1"));
assertNotNull(coll1Lock);
assertNull("Should not be able to lock coll1/shard1", lockTree.getSession().lock(CollectionAction.BALANCESHARDUNIQUE,
Arrays.asList("coll1", "shard1")));
assertNull(lockTree.getSession().lock(ADDREPLICAPROP,
Arrays.asList("coll1", "shard1", "core_node2")));
coll1Lock.unlock();
Lock shard1Lock = lockTree.getSession().lock(CollectionAction.BALANCESHARDUNIQUE,
Arrays.asList("coll1", "shard1"));
assertNotNull(shard1Lock);
shard1Lock.unlock();
Lock replica1Lock = lockTree.getSession().lock(ADDREPLICAPROP,
Arrays.asList("coll1", "shard1", "core_node2"));
assertNotNull(replica1Lock);
List<Pair<CollectionAction, List<String>>> operations = new ArrayList<>();
operations.add(new Pair<>(ADDREPLICAPROP, Arrays.asList("coll1", "shard1", "core_node2")));
operations.add(new Pair<>(MODIFYCOLLECTION, Arrays.asList("coll1")));
operations.add(new Pair<>(SPLITSHARD, Arrays.asList("coll1", "shard1")));
operations.add(new Pair<>(SPLITSHARD, Arrays.asList("coll2", "shard2")));
operations.add(new Pair<>(MODIFYCOLLECTION, Arrays.asList("coll2")));
operations.add(new Pair<>(DELETEREPLICA, Arrays.asList("coll2", "shard1")));
List<Set<String>> orderOfExecution = Arrays.asList(
ImmutableSet.of("coll1/shard1/core_node2", "coll2/shard2"),
ImmutableSet.of("coll1", "coll2"),
ImmutableSet.of("coll1/shard1", "coll2/shard1"));
lockTree = new LockTree();
for (int counter = 0; counter < orderOfExecution.size(); counter++) {
LockTree.Session session = lockTree.getSession();
List<Pair<CollectionAction, List<String>>> completedOps = new CopyOnWriteArrayList<>();
List<Lock> locks = new CopyOnWriteArrayList<>();
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < operations.size(); i++) {
Pair<CollectionAction, List<String>> operation = operations.get(i);
final Lock lock = session.lock(operation.first(), operation.second());
if (lock != null) {
Thread thread = new Thread(getRunnable(completedOps, operation, locks, lock));
threads.add(thread);
thread.start();
}
}
for (Thread thread : threads) thread.join();
if (locks.isEmpty())
throw new RuntimeException("Could not attain lock for anything " + operations);
Set<String> expectedOps = orderOfExecution.get(counter);
log.info("counter : {} , expected : {}, actual : {}", counter, expectedOps, locks);
assertEquals(expectedOps.size(), locks.size());
for (Lock lock : locks)
assertTrue("locks : " + locks + " expectedOps : " + expectedOps, expectedOps.contains(lock.toString()));
locks.clear();
for (Pair<CollectionAction, List<String>> completedOp : completedOps) {
operations.remove(completedOp);
}
}
}
private Runnable getRunnable(List<Pair<CollectionAction, List<String>>> completedOps, Pair<CollectionAction,
List<String>> operation, List<Lock> locks, Lock lock) {
return () -> {
try {
Thread.sleep(1);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
completedOps.add(operation);
locks.add(lock);
}
};
}
}

View File

@ -18,65 +18,104 @@ package org.apache.solr.common.params;
import java.util.Locale; import java.util.Locale;
public interface CollectionParams public interface CollectionParams {
{ /**
/** What action **/ * What action
public final static String ACTION = "action"; **/
public final static String NAME = "name"; String ACTION = "action";
String NAME = "name";
public enum CollectionAction { enum LockLevel {
CREATE(true), CLUSTER(0),
DELETE(true), COLLECTION(1),
RELOAD(true), SHARD(2),
SYNCSHARD(true), REPLICA(3),
CREATEALIAS(true), NONE(10);
DELETEALIAS(true),
SPLITSHARD(true),
DELETESHARD(true),
CREATESHARD(true),
DELETEREPLICA(true),
FORCELEADER(true),
MIGRATE(true),
ADDROLE(true),
REMOVEROLE(true),
CLUSTERPROP(true),
REQUESTSTATUS(false),
DELETESTATUS(false),
ADDREPLICA(true),
OVERSEERSTATUS(false),
LIST(false),
CLUSTERSTATUS(false),
ADDREPLICAPROP(true),
DELETEREPLICAPROP(true),
BALANCESHARDUNIQUE(true),
REBALANCELEADERS(true),
MODIFYCOLLECTION(true),
MIGRATESTATEFORMAT(true),
BACKUP(true),
RESTORE(true);
public final boolean isWrite;
CollectionAction(boolean isWrite) { public final int level;
this.isWrite = isWrite;
LockLevel(int i) {
this.level = i;
} }
public static CollectionAction get(String p) { public LockLevel getChild() {
if( p != null ) { return getLevel(level + 1);
try { }
return CollectionAction.valueOf( p.toUpperCase(Locale.ROOT) );
} public static LockLevel getLevel(int i) {
catch( Exception ex ) {} for (LockLevel v : values()) {
if (v.level == i) return v;
} }
return null; return null;
} }
public boolean isEqual(String s){
if(s == null) return false; public boolean isHigherOrEqual(LockLevel that) {
return that.level <= level;
}
}
enum CollectionAction {
CREATE(true, LockLevel.COLLECTION),
DELETE(true, LockLevel.COLLECTION),
RELOAD(true, LockLevel.COLLECTION),
SYNCSHARD(true, LockLevel.SHARD),
CREATEALIAS(true, LockLevel.COLLECTION),
DELETEALIAS(true, LockLevel.COLLECTION),
SPLITSHARD(true, LockLevel.SHARD),
DELETESHARD(true, LockLevel.SHARD),
CREATESHARD(true, LockLevel.COLLECTION),
DELETEREPLICA(true, LockLevel.SHARD),
FORCELEADER(true, LockLevel.SHARD),
MIGRATE(true, LockLevel.SHARD),
ADDROLE(true, LockLevel.NONE),
REMOVEROLE(true, LockLevel.NONE),
CLUSTERPROP(true, LockLevel.NONE),
REQUESTSTATUS(false, LockLevel.NONE),
DELETESTATUS(false, LockLevel.NONE),
ADDREPLICA(true, LockLevel.SHARD),
OVERSEERSTATUS(false, LockLevel.NONE),
LIST(false, LockLevel.NONE),
CLUSTERSTATUS(false, LockLevel.NONE),
ADDREPLICAPROP(true, LockLevel.REPLICA),
DELETEREPLICAPROP(true, LockLevel.REPLICA),
BALANCESHARDUNIQUE(true, LockLevel.SHARD),
REBALANCELEADERS(true, LockLevel.COLLECTION),
MODIFYCOLLECTION(true, LockLevel.COLLECTION),
MIGRATESTATEFORMAT(true, LockLevel.CLUSTER),
BACKUP(true, LockLevel.COLLECTION),
RESTORE(true, LockLevel.COLLECTION),
//only for testing. it just waits for specified time
// these are not exposed via collection API commands
// but the overseer is aware of these tasks
MOCK_COLL_TASK(false, LockLevel.COLLECTION),
MOCK_SHARD_TASK(false, LockLevel.SHARD),
MOCK_REPLICA_TASK(false, LockLevel.REPLICA)
;
public final boolean isWrite;
public final LockLevel lockLevel;
CollectionAction(boolean isWrite, LockLevel level) {
this.isWrite = isWrite;
this.lockLevel = level;
}
public static CollectionAction get(String p) {
if (p != null) {
try {
return CollectionAction.valueOf(p.toUpperCase(Locale.ROOT));
} catch (Exception ex) {
}
}
return null;
}
public boolean isEqual(String s) {
if (s == null) return false;
return toString().equals(s.toUpperCase(Locale.ROOT)); return toString().equals(s.toUpperCase(Locale.ROOT));
} }
public String toLower(){
public String toLower() {
return toString().toLowerCase(Locale.ROOT); return toString().toLowerCase(Locale.ROOT);
} }

View File

@ -149,10 +149,11 @@ public class StrUtils {
* @see #escapeTextWithSeparator * @see #escapeTextWithSeparator
*/ */
public static String join(Collection<?> items, char separator) { public static String join(Collection<?> items, char separator) {
if (items == null) return "";
StringBuilder sb = new StringBuilder(items.size() << 3); StringBuilder sb = new StringBuilder(items.size() << 3);
boolean first=true; boolean first=true;
for (Object o : items) { for (Object o : items) {
String item = o.toString(); String item = String.valueOf(o);
if (first) { if (first) {
first = false; first = false;
} else { } else {