diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 2d27e07616c..e7a838238d8 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -244,6 +244,8 @@ Optimizations * SOLR-9147: Upgrade commons-io to 2.5, avoid expensive array resizing in EmbeddedSolrServer (Mikhail Khludnev) +* SOLR-8744: Overseer operations performed with fine grained mutual exclusion (noble, Scott Blum) + Other Changes ---------------------- * SOLR-7516: Improve javadocs for JavaBinCodec, ObjectResolver and enforce the single-usage policy. diff --git a/solr/core/src/java/org/apache/solr/cloud/LockTree.java b/solr/core/src/java/org/apache/solr/cloud/LockTree.java new file mode 100644 index 00000000000..d629d1cdd87 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/LockTree.java @@ -0,0 +1,182 @@ +package org.apache.solr.cloud; + +import java.lang.invoke.MethodHandles; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.solr.cloud.OverseerMessageHandler.Lock; +import org.apache.solr.common.params.CollectionParams; +import org.apache.solr.common.params.CollectionParams.LockLevel; +import org.apache.solr.common.util.StrUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This is a utility class that offers fine grained locking for various Collection Operations + * This class is designed for single threaded operation. It's safe for multiple threads to use it + * but internally it is synchronized so that only one thread can perform any operation. + */ +public class LockTree { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final Node root = new Node(null, LockLevel.CLUSTER, null); + + public void clear() { + synchronized (this) { + root.clear(); + } + } + + private class LockImpl implements Lock { + final Node node; + + LockImpl( Node node) { + this.node = node; + } + + @Override + public void unlock() { + synchronized (LockTree.this) { + node.unlock(this); + } + } + + @Override + public String toString() { + return StrUtils.join(node.constructPath(new LinkedList<>()), '/'); + } + } + + + public class Session { + private SessionNode root = new SessionNode(LockLevel.CLUSTER); + + public Lock lock(CollectionParams.CollectionAction action, List path) { + synchronized (LockTree.this) { + if (action.lockLevel == LockLevel.NONE) return FREELOCK; + if (root.isBusy(action.lockLevel, path)) return null; + Lock lockObject = LockTree.this.root.lock(action.lockLevel, path); + if (lockObject == null) root.markBusy(path, 0); + return lockObject; + } + } + } + + private static class SessionNode { + final LockLevel level; + Map kids; + boolean busy = false; + + SessionNode(LockLevel level) { + this.level = level; + } + + void markBusy(List path, int depth) { + if (path.size() == depth) { + busy = true; + } else { + String s = path.get(depth); + if (kids == null) kids = new HashMap<>(); + SessionNode node = kids.get(s); + if (node == null) kids.put(s, node = new SessionNode(level.getChild())); + node.markBusy(path, depth + 1); + } + } + + boolean isBusy(LockLevel lockLevel, List path) { + if (lockLevel.isHigherOrEqual(level)) { + if (busy) return true; + String s = path.get(level.level); + if (kids == null || kids.get(s) == null) return false; + return kids.get(s).isBusy(lockLevel, path); + } else { + return false; + } + } + } + + public Session getSession() { + return new Session(); + } + + private class Node { + final String name; + final Node mom; + final LockLevel level; + HashMap children = new HashMap<>(); + LockImpl myLock; + + Node(String name, LockLevel level, Node mom) { + this.name = name; + this.level = level; + this.mom = mom; + } + + //if this or any of its children are locked + boolean isLocked() { + if (myLock != null) return true; + for (Node node : children.values()) if (node.isLocked()) return true; + return false; + } + + + void unlock(LockImpl lockObject) { + if (myLock == lockObject) myLock = null; + else { + LOG.info("Unlocked multiple times : {}", lockObject.toString()); + } + } + + + Lock lock(LockLevel lockLevel, List path) { + if (myLock != null) return null;//I'm already locked. no need to go any further + if (lockLevel == level) { + //lock is supposed to be acquired at this level + //If I am locked or any of my children or grandchildren are locked + // it is not possible to acquire a lock + if (isLocked()) return null; + return myLock = new LockImpl(this); + } else { + String childName = path.get(level.level); + Node child = children.get(childName); + if (child == null) + children.put(childName, child = new Node(childName, LockLevel.getLevel(level.level + 1), this)); + return child.lock(lockLevel, path); + } + } + + LinkedList constructPath(LinkedList collect) { + if (name != null) collect.addFirst(name); + if (mom != null) mom.constructPath(collect); + return collect; + } + + void clear() { + if (myLock != null) { + LOG.warn("lock_is_leaked at" + constructPath(new LinkedList<>())); + myLock = null; + } + for (Node node : children.values()) node.clear(); + } + } + static final Lock FREELOCK = () -> {}; + +} diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java index f1d0ab27359..f8f84462df0 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java @@ -16,10 +16,10 @@ */ package org.apache.solr.cloud; -import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.handler.component.ShardHandler; import org.apache.solr.handler.component.ShardHandlerFactory; + import static org.apache.solr.cloud.OverseerConfigSetMessageHandler.CONFIGSETS_ACTION_PREFIX; /** @@ -61,8 +61,6 @@ public class OverseerCollectionConfigSetProcessor extends OverseerTaskProcessor super( zkStateReader, myId, - shardHandlerFactory, - adminPath, stats, getOverseerMessageHandlerSelector(zkStateReader, myId, shardHandlerFactory, adminPath, stats, overseer, overseerNodePrioritizer), @@ -85,15 +83,12 @@ public class OverseerCollectionConfigSetProcessor extends OverseerTaskProcessor zkStateReader, myId, shardHandlerFactory, adminPath, stats, overseer, overseerNodePrioritizer); final OverseerConfigSetMessageHandler configMessageHandler = new OverseerConfigSetMessageHandler( zkStateReader); - return new OverseerMessageHandlerSelector() { - @Override - public OverseerMessageHandler selectOverseerMessageHandler(ZkNodeProps message) { - String operation = message.getStr(Overseer.QUEUE_OPERATION); - if (operation != null && operation.startsWith(CONFIGSETS_ACTION_PREFIX)) { - return configMessageHandler; - } - return collMessageHandler; + return message -> { + String operation = message.getStr(Overseer.QUEUE_OPERATION); + if (operation != null && operation.startsWith(CONFIGSETS_ACTION_PREFIX)) { + return configMessageHandler; } + return collMessageHandler; }; } } diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java index ed23e7754e1..54c0697bd62 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java @@ -179,7 +179,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler // Set that tracks collections that are currently being processed by a running task. // This is used for handling mutual exclusion of the tasks. - final private Set collectionWip; + + final private LockTree lockTree = new LockTree(); static final Random RANDOM; static { @@ -206,7 +207,6 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler this.stats = stats; this.overseer = overseer; this.overseerPrioritizer = overseerPrioritizer; - this.collectionWip = new HashSet(); } @Override @@ -216,10 +216,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler NamedList results = new NamedList(); try { - CollectionParams.CollectionAction action = CollectionParams.CollectionAction.get(operation); - if (action == null) { - throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:" + operation); - } + CollectionParams.CollectionAction action = getCollectionAction(operation); switch (action) { case CREATE: createCollection(zkStateReader.getClusterState(), message, results); @@ -287,6 +284,13 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler case RESTORE: processRestoreAction(message, results); break; + case MOCK_COLL_TASK: + case MOCK_SHARD_TASK: + case MOCK_REPLICA_TASK: { + //only for test purposes + Thread.sleep(message.getInt("sleep", 1)); + break; + } default: throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:" + operation); @@ -311,6 +315,14 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler return new OverseerSolrResponse(results); } + private CollectionParams.CollectionAction getCollectionAction(String operation) { + CollectionParams.CollectionAction action = CollectionParams.CollectionAction.get(operation); + if (action == null) { + throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:" + operation); + } + return action; + } + // // TODO DWS: this class has gone out of control (too big); refactor to break it up // @@ -2663,7 +2675,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler message.getStr(COLLECTION_PROP) : message.getStr(NAME); } - @Override + + /* @Override public void markExclusiveTask(String collectionName, ZkNodeProps message) { if (collectionName != null) { synchronized (collectionWip) { @@ -2679,8 +2692,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler collectionWip.remove(collectionName); } } - } - + }*/ +/* @Override public ExclusiveMarking checkExclusiveMarking(String collectionName, ZkNodeProps message) { synchronized (collectionWip) { @@ -2689,5 +2702,29 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler } return ExclusiveMarking.NOTDETERMINED; + }*/ + + private long sessionId = -1; + private LockTree.Session lockSession; + + @Override + public Lock lockTask(ZkNodeProps message, OverseerTaskProcessor.TaskBatch taskBatch) { + if (lockSession == null || sessionId != taskBatch.getId()) { + //this is always called in the same thread. + //Each batch is supposed to have a new taskBatch + //So if taskBatch changes we must create a new Session + // also check if the running tasks are empty. If yes, clear lockTree + // this will ensure that locks are not 'leaked' + if(taskBatch.getRunningTasks() == 0) lockTree.clear(); + lockSession = lockTree.getSession(); + } + return lockSession.lock(getCollectionAction(message.getStr(Overseer.QUEUE_OPERATION)), + Arrays.asList( + getTaskKey(message), + message.getStr(ZkStateReader.SHARD_ID_PROP), + message.getStr(ZkStateReader.REPLICA_PROP)) + + ); } + } diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerConfigSetMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerConfigSetMessageHandler.java index ba8f129811a..2f2859f3417 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerConfigSetMessageHandler.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerConfigSetMessageHandler.java @@ -45,8 +45,6 @@ import org.noggit.JSONUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.solr.cloud.OverseerMessageHandler.ExclusiveMarking.NONEXCLUSIVE; -import static org.apache.solr.cloud.OverseerMessageHandler.ExclusiveMarking.NOTDETERMINED; import static org.apache.solr.common.params.CommonParams.NAME; import static org.apache.solr.common.params.ConfigSetParams.ConfigSetAction.CREATE; @@ -146,13 +144,23 @@ public class OverseerConfigSetMessageHandler implements OverseerMessageHandler { return "configset_" + operation; } + @Override + public Lock lockTask(ZkNodeProps message, OverseerTaskProcessor.TaskBatch taskBatch) { + String configSetName = getTaskKey(message); + if (canExecute(configSetName, message)) { + markExclusiveTask(configSetName, message); + return () -> unmarkExclusiveTask(configSetName, message); + } + return null; + } + @Override public String getTaskKey(ZkNodeProps message) { return message.getStr(NAME); } - @Override - public void markExclusiveTask(String configSetName, ZkNodeProps message) { + + private void markExclusiveTask(String configSetName, ZkNodeProps message) { String baseConfigSet = getBaseConfigSetIfCreate(message); markExclusive(configSetName, baseConfigSet); } @@ -164,8 +172,7 @@ public class OverseerConfigSetMessageHandler implements OverseerMessageHandler { } } - @Override - public void unmarkExclusiveTask(String configSetName, String operation, ZkNodeProps message) { + private void unmarkExclusiveTask(String configSetName, ZkNodeProps message) { String baseConfigSet = getBaseConfigSetIfCreate(message); unmarkExclusiveConfigSet(configSetName, baseConfigSet); } @@ -177,28 +184,26 @@ public class OverseerConfigSetMessageHandler implements OverseerMessageHandler { } } - @Override - public ExclusiveMarking checkExclusiveMarking(String configSetName, ZkNodeProps message) { - String baseConfigSet = getBaseConfigSetIfCreate(message); - return checkExclusiveMarking(configSetName, baseConfigSet); - } - private ExclusiveMarking checkExclusiveMarking(String configSetName, String baseConfigSetName) { + private boolean canExecute(String configSetName, ZkNodeProps message) { + String baseConfigSetName = getBaseConfigSetIfCreate(message); + synchronized (configSetWriteWip) { // need to acquire: // 1) write lock on ConfigSet // 2) read lock on Base ConfigSet if (configSetWriteWip.contains(configSetName) || configSetReadWip.contains(configSetName)) { - return NONEXCLUSIVE; + return false; } if (baseConfigSetName != null && configSetWriteWip.contains(baseConfigSetName)) { - return NONEXCLUSIVE; + return false; } } - return NOTDETERMINED; + return true; } + private String getBaseConfigSetIfCreate(ZkNodeProps message) { String operation = message.getStr(Overseer.QUEUE_OPERATION); if (operation != null) { diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java index 2d2408f7218..c4027ccd5fd 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java @@ -44,6 +44,15 @@ public interface OverseerMessageHandler { */ String getTimerName(String operation); + interface Lock { + void unlock(); + } + + /**Try to provide an exclusive lock for this particular task + * return null if locking is not possible. If locking is not necessary + */ + Lock lockTask(ZkNodeProps message, OverseerTaskProcessor.TaskBatch taskBatch); + /** * @param message the message being processed * @@ -51,30 +60,4 @@ public interface OverseerMessageHandler { */ String getTaskKey(ZkNodeProps message); - /** - * @param taskKey the key associated with the task, cached from getTaskKey - * @param message the message being processed - */ - void markExclusiveTask(String taskKey, ZkNodeProps message); - - /** - * @param taskKey the key associated with the task - * @param operation the operation being processed - * @param message the message being processed - */ - void unmarkExclusiveTask(String taskKey, String operation, ZkNodeProps message); - - /** - * @param taskKey the key associated with the task - * @param message the message being processed - * - * @return the exclusive marking - */ - ExclusiveMarking checkExclusiveMarking(String taskKey, ZkNodeProps message); - - enum ExclusiveMarking { - NOTDETERMINED, // not enough context, fall back to the processor (i.e. look at running tasks) - EXCLUSIVE, - NONEXCLUSIVE - } } diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java index 26a90cb8059..93a7e6fee1b 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java @@ -81,10 +81,6 @@ public class OverseerTaskProcessor implements Runnable, Closeable { private String myId; - private final ShardHandlerFactory shardHandlerFactory; - - private String adminPath; - private ZkStateReader zkStateReader; private boolean isClosed; @@ -102,8 +98,6 @@ public class OverseerTaskProcessor implements Runnable, Closeable { private OverseerNodePrioritizer prioritizer; public OverseerTaskProcessor(ZkStateReader zkStateReader, String myId, - final ShardHandlerFactory shardHandlerFactory, - String adminPath, Overseer.Stats stats, OverseerMessageHandlerSelector selector, OverseerNodePrioritizer prioritizer, @@ -113,8 +107,6 @@ public class OverseerTaskProcessor implements Runnable, Closeable { DistributedMap failureMap) { this.zkStateReader = zkStateReader; this.myId = myId; - this.shardHandlerFactory = shardHandlerFactory; - this.adminPath = adminPath; this.stats = stats; this.selector = selector; this.prioritizer = prioritizer; @@ -206,10 +198,11 @@ public class OverseerTaskProcessor implements Runnable, Closeable { if (isClosed) break; + taskBatch.batchId++; for (QueueEvent head : heads) { + if (runningZKTasks.contains(head.getId())) continue; final ZkNodeProps message = ZkNodeProps.load(head.getBytes()); OverseerMessageHandler messageHandler = selector.selectOverseerMessageHandler(message); - String taskKey = messageHandler.getTaskKey(message); final String asyncId = message.getStr(ASYNC); if (hasLeftOverItems) { if (head.getId().equals(oldestItemInWorkQueue)) @@ -220,27 +213,29 @@ public class OverseerTaskProcessor implements Runnable, Closeable { continue; } } - - if (!checkExclusivity(messageHandler, message, head.getId())) { + String operation = message.getStr(Overseer.QUEUE_OPERATION); + OverseerMessageHandler.Lock lock = messageHandler.lockTask(message, taskBatch); + if (lock == null) { log.debug("Exclusivity check failed for [{}]", message.toString()); continue; } - try { - markTaskAsRunning(messageHandler, head, taskKey, asyncId, message); + markTaskAsRunning(head, asyncId); log.debug("Marked task [{}] as running", head.getId()); } catch (KeeperException.NodeExistsException e) { + lock.unlock(); // This should never happen log.error("Tried to pick up task [{}] when it was already running!", head.getId()); + continue; } catch (InterruptedException e) { + lock.unlock(); log.error("Thread interrupted while trying to pick task for execution.", head.getId()); Thread.currentThread().interrupt(); + continue; } - log.info(messageHandler.getName() + ": Get the message id:" + head.getId() + " message:" + message.toString()); - String operation = message.getStr(Overseer.QUEUE_OPERATION); Runner runner = new Runner(messageHandler, message, - operation, head); + operation, head, lock); tpe.execute(runner); } @@ -262,31 +257,6 @@ public class OverseerTaskProcessor implements Runnable, Closeable { } } - protected boolean checkExclusivity(OverseerMessageHandler messageHandler, ZkNodeProps message, String id) - throws KeeperException, InterruptedException { - String taskKey = messageHandler.getTaskKey(message); - - if (taskKey == null) - return true; - - OverseerMessageHandler.ExclusiveMarking marking = messageHandler.checkExclusiveMarking(taskKey, message); - switch (marking) { - case NOTDETERMINED: - break; - case EXCLUSIVE: - return true; - case NONEXCLUSIVE: - return false; - default: - throw new IllegalArgumentException("Undefined marking: " + marking); - } - - if (runningZKTasks.contains(id)) - return false; - - return true; - } - private void cleanUpWorkQueue() throws KeeperException, InterruptedException { synchronized (completedTasks) { for (String id : completedTasks.keySet()) { @@ -390,8 +360,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable { } @SuppressWarnings("unchecked") - private void markTaskAsRunning(OverseerMessageHandler messageHandler, QueueEvent head, String taskKey, - String asyncId, ZkNodeProps message) + private void markTaskAsRunning(QueueEvent head, String asyncId) throws KeeperException, InterruptedException { synchronized (runningZKTasks) { runningZKTasks.add(head.getId()); @@ -401,7 +370,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable { runningTasks.add(head.getId()); } - messageHandler.markExclusiveTask(taskKey, message); +// messageHandler.markExclusiveTask(taskKey, message); if (asyncId != null) runningMap.put(asyncId, null); @@ -413,12 +382,14 @@ public class OverseerTaskProcessor implements Runnable, Closeable { SolrResponse response; QueueEvent head; OverseerMessageHandler messageHandler; - - public Runner(OverseerMessageHandler messageHandler, ZkNodeProps message, String operation, QueueEvent head) { + private final OverseerMessageHandler.Lock lock; + + public Runner(OverseerMessageHandler messageHandler, ZkNodeProps message, String operation, QueueEvent head, OverseerMessageHandler.Lock lock) { this.message = message; this.operation = operation; this.head = head; this.messageHandler = messageHandler; + this.lock = lock; response = null; } @@ -454,7 +425,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable { log.debug("Completed task:[{}]", head.getId()); } - markTaskComplete(messageHandler, head.getId(), asyncId, taskKey, message); + markTaskComplete(head.getId(), asyncId); log.debug("Marked task [{}] as completed.", head.getId()); printTrackingMaps(); @@ -469,6 +440,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable { log.warn("Resetting task {} as the thread was interrupted.", head.getId()); Thread.currentThread().interrupt(); } finally { + lock.unlock(); if (!success) { // Reset task from tracking data structures so that it can be retried. resetTaskWithException(messageHandler, head.getId(), asyncId, taskKey, message); @@ -479,7 +451,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable { } } - private void markTaskComplete(OverseerMessageHandler messageHandler, String id, String asyncId, String taskKey, ZkNodeProps message) + private void markTaskComplete(String id, String asyncId) throws KeeperException, InterruptedException { synchronized (completedTasks) { completedTasks.put(id, head); @@ -494,9 +466,6 @@ public class OverseerTaskProcessor implements Runnable, Closeable { log.warn("Could not find and remove async call [" + asyncId + "] from the running map."); } } - - - messageHandler.unmarkExclusiveTask(taskKey, operation, message); } private void resetTaskWithException(OverseerMessageHandler messageHandler, String id, String asyncId, String taskKey, ZkNodeProps message) { @@ -512,7 +481,6 @@ public class OverseerTaskProcessor implements Runnable, Closeable { runningTasks.remove(id); } - messageHandler.unmarkExclusiveTask(taskKey, operation, message); } catch (KeeperException e) { SolrException.log(log, "", e); } catch (InterruptedException e) { @@ -568,4 +536,20 @@ public class OverseerTaskProcessor implements Runnable, Closeable { OverseerMessageHandler selectOverseerMessageHandler(ZkNodeProps message); } + final private TaskBatch taskBatch = new TaskBatch(); + + public class TaskBatch { + private long batchId = 0; + + public long getId() { + return batchId; + } + + public int getRunningTasks() { + synchronized (runningTasks) { + return runningTasks.size(); + } + } + } + } diff --git a/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java b/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java index 11955831bfa..c18b330b5ef 100644 --- a/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java @@ -30,8 +30,10 @@ import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.client.solrj.response.CollectionAdminResponse; import org.apache.solr.client.solrj.response.RequestStatusState; import org.apache.solr.common.params.CollectionParams; +import org.apache.solr.common.params.CommonAdminParams; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.Utils; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -99,27 +101,32 @@ public class MultiThreadedOCPTest extends AbstractFullDistribZkTestBase { } } - private void testTaskExclusivity() throws IOException, SolrServerException { + private void testTaskExclusivity() throws Exception, SolrServerException { + + DistributedQueue distributedQueue = new DistributedQueue(cloudClient.getZkStateReader().getZkClient(), + "/overseer/collection-queue-work", new Overseer.Stats()); try (SolrClient client = createNewSolrClient("", getBaseUrl((HttpSolrClient) clients.get(0)))) { + Create createCollectionRequest = new Create() .setCollectionName("ocptest_shardsplit") .setNumShards(4) .setConfigName("conf1") .setAsyncId("1000"); createCollectionRequest.process(client); - - SplitShard splitShardRequest = new SplitShard() - .setCollectionName("ocptest_shardsplit") - .setShardName(SHARD1) - .setAsyncId("1001"); - splitShardRequest.process(client); - - splitShardRequest = new SplitShard() - .setCollectionName("ocptest_shardsplit") - .setShardName(SHARD2) - .setAsyncId("1002"); - splitShardRequest.process(client); - + + distributedQueue.offer(Utils.toJSON(Utils.makeMap( + "collection", "ocptest_shardsplit", + Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MOCK_COLL_TASK.toLower(), + CommonAdminParams.ASYNC, "1001", + "sleep", "100" + ))); + distributedQueue.offer(Utils.toJSON(Utils.makeMap( + "collection", "ocptest_shardsplit", + Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MOCK_COLL_TASK.toLower(), + CommonAdminParams.ASYNC, "1002", + "sleep", "100" + ))); + int iterations = 0; while(true) { int runningTasks = 0; diff --git a/solr/core/src/test/org/apache/solr/cloud/TestLockTree.java b/solr/core/src/test/org/apache/solr/cloud/TestLockTree.java new file mode 100644 index 00000000000..7e4d9b764e3 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/TestLockTree.java @@ -0,0 +1,130 @@ +package org.apache.solr.cloud; + +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; + +import com.google.common.collect.ImmutableSet; +import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.cloud.OverseerMessageHandler.Lock; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.params.CollectionParams; +import org.apache.solr.common.params.CollectionParams.CollectionAction; +import org.apache.solr.common.util.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICAPROP; +import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA; +import static org.apache.solr.common.params.CollectionParams.CollectionAction.MODIFYCOLLECTION; +import static org.apache.solr.common.params.CollectionParams.CollectionAction.SPLITSHARD; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +public class TestLockTree extends SolrTestCaseJ4 { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + + public void testLocks() throws Exception { + LockTree lockTree = new LockTree(); + Lock coll1Lock = lockTree.getSession().lock(CollectionAction.CREATE, + Arrays.asList("coll1")); + assertNotNull(coll1Lock); + assertNull("Should not be able to lock coll1/shard1", lockTree.getSession().lock(CollectionAction.BALANCESHARDUNIQUE, + Arrays.asList("coll1", "shard1"))); + + assertNull(lockTree.getSession().lock(ADDREPLICAPROP, + Arrays.asList("coll1", "shard1", "core_node2"))); + coll1Lock.unlock(); + Lock shard1Lock = lockTree.getSession().lock(CollectionAction.BALANCESHARDUNIQUE, + Arrays.asList("coll1", "shard1")); + assertNotNull(shard1Lock); + shard1Lock.unlock(); + Lock replica1Lock = lockTree.getSession().lock(ADDREPLICAPROP, + Arrays.asList("coll1", "shard1", "core_node2")); + assertNotNull(replica1Lock); + + + List>> operations = new ArrayList<>(); + operations.add(new Pair<>(ADDREPLICAPROP, Arrays.asList("coll1", "shard1", "core_node2"))); + operations.add(new Pair<>(MODIFYCOLLECTION, Arrays.asList("coll1"))); + operations.add(new Pair<>(SPLITSHARD, Arrays.asList("coll1", "shard1"))); + operations.add(new Pair<>(SPLITSHARD, Arrays.asList("coll2", "shard2"))); + operations.add(new Pair<>(MODIFYCOLLECTION, Arrays.asList("coll2"))); + operations.add(new Pair<>(DELETEREPLICA, Arrays.asList("coll2", "shard1"))); + + List> orderOfExecution = Arrays.asList( + ImmutableSet.of("coll1/shard1/core_node2", "coll2/shard2"), + ImmutableSet.of("coll1", "coll2"), + ImmutableSet.of("coll1/shard1", "coll2/shard1")); + lockTree = new LockTree(); + for (int counter = 0; counter < orderOfExecution.size(); counter++) { + LockTree.Session session = lockTree.getSession(); + List>> completedOps = new CopyOnWriteArrayList<>(); + List locks = new CopyOnWriteArrayList<>(); + List threads = new ArrayList<>(); + for (int i = 0; i < operations.size(); i++) { + Pair> operation = operations.get(i); + final Lock lock = session.lock(operation.first(), operation.second()); + if (lock != null) { + Thread thread = new Thread(getRunnable(completedOps, operation, locks, lock)); + threads.add(thread); + thread.start(); + } + } + + + for (Thread thread : threads) thread.join(); + if (locks.isEmpty()) + throw new RuntimeException("Could not attain lock for anything " + operations); + + Set expectedOps = orderOfExecution.get(counter); + log.info("counter : {} , expected : {}, actual : {}", counter, expectedOps, locks); + assertEquals(expectedOps.size(), locks.size()); + for (Lock lock : locks) + assertTrue("locks : " + locks + " expectedOps : " + expectedOps, expectedOps.contains(lock.toString())); + locks.clear(); + for (Pair> completedOp : completedOps) { + operations.remove(completedOp); + } + } + } + + private Runnable getRunnable(List>> completedOps, Pair> operation, List locks, Lock lock) { + return () -> { + try { + Thread.sleep(1); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + lock.unlock(); + completedOps.add(operation); + locks.add(lock); + } + }; + } +} diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java index cc505f8cfc4..42cf37201ce 100644 --- a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java +++ b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java @@ -18,65 +18,104 @@ package org.apache.solr.common.params; import java.util.Locale; -public interface CollectionParams -{ - /** What action **/ - public final static String ACTION = "action"; - public final static String NAME = "name"; - +public interface CollectionParams { + /** + * What action + **/ + String ACTION = "action"; + String NAME = "name"; - public enum CollectionAction { - CREATE(true), - DELETE(true), - RELOAD(true), - SYNCSHARD(true), - CREATEALIAS(true), - DELETEALIAS(true), - SPLITSHARD(true), - DELETESHARD(true), - CREATESHARD(true), - DELETEREPLICA(true), - FORCELEADER(true), - MIGRATE(true), - ADDROLE(true), - REMOVEROLE(true), - CLUSTERPROP(true), - REQUESTSTATUS(false), - DELETESTATUS(false), - ADDREPLICA(true), - OVERSEERSTATUS(false), - LIST(false), - CLUSTERSTATUS(false), - ADDREPLICAPROP(true), - DELETEREPLICAPROP(true), - BALANCESHARDUNIQUE(true), - REBALANCELEADERS(true), - MODIFYCOLLECTION(true), - MIGRATESTATEFORMAT(true), - BACKUP(true), - RESTORE(true); - - public final boolean isWrite; + enum LockLevel { + CLUSTER(0), + COLLECTION(1), + SHARD(2), + REPLICA(3), + NONE(10); - CollectionAction(boolean isWrite) { - this.isWrite = isWrite; + public final int level; + + LockLevel(int i) { + this.level = i; } - public static CollectionAction get(String p) { - if( p != null ) { - try { - return CollectionAction.valueOf( p.toUpperCase(Locale.ROOT) ); - } - catch( Exception ex ) {} + public LockLevel getChild() { + return getLevel(level + 1); + } + + public static LockLevel getLevel(int i) { + for (LockLevel v : values()) { + if (v.level == i) return v; } return null; } - public boolean isEqual(String s){ - if(s == null) return false; + + public boolean isHigherOrEqual(LockLevel that) { + return that.level <= level; + } + } + + enum CollectionAction { + CREATE(true, LockLevel.COLLECTION), + DELETE(true, LockLevel.COLLECTION), + RELOAD(true, LockLevel.COLLECTION), + SYNCSHARD(true, LockLevel.SHARD), + CREATEALIAS(true, LockLevel.COLLECTION), + DELETEALIAS(true, LockLevel.COLLECTION), + SPLITSHARD(true, LockLevel.SHARD), + DELETESHARD(true, LockLevel.SHARD), + CREATESHARD(true, LockLevel.COLLECTION), + DELETEREPLICA(true, LockLevel.SHARD), + FORCELEADER(true, LockLevel.SHARD), + MIGRATE(true, LockLevel.SHARD), + ADDROLE(true, LockLevel.NONE), + REMOVEROLE(true, LockLevel.NONE), + CLUSTERPROP(true, LockLevel.NONE), + REQUESTSTATUS(false, LockLevel.NONE), + DELETESTATUS(false, LockLevel.NONE), + ADDREPLICA(true, LockLevel.SHARD), + OVERSEERSTATUS(false, LockLevel.NONE), + LIST(false, LockLevel.NONE), + CLUSTERSTATUS(false, LockLevel.NONE), + ADDREPLICAPROP(true, LockLevel.REPLICA), + DELETEREPLICAPROP(true, LockLevel.REPLICA), + BALANCESHARDUNIQUE(true, LockLevel.SHARD), + REBALANCELEADERS(true, LockLevel.COLLECTION), + MODIFYCOLLECTION(true, LockLevel.COLLECTION), + MIGRATESTATEFORMAT(true, LockLevel.CLUSTER), + BACKUP(true, LockLevel.COLLECTION), + RESTORE(true, LockLevel.COLLECTION), + //only for testing. it just waits for specified time + // these are not exposed via collection API commands + // but the overseer is aware of these tasks + MOCK_COLL_TASK(false, LockLevel.COLLECTION), + MOCK_SHARD_TASK(false, LockLevel.SHARD), + MOCK_REPLICA_TASK(false, LockLevel.REPLICA) + ; + public final boolean isWrite; + public final LockLevel lockLevel; + + CollectionAction(boolean isWrite, LockLevel level) { + this.isWrite = isWrite; + this.lockLevel = level; + } + + public static CollectionAction get(String p) { + if (p != null) { + try { + return CollectionAction.valueOf(p.toUpperCase(Locale.ROOT)); + } catch (Exception ex) { + } + } + return null; + } + + public boolean isEqual(String s) { + if (s == null) return false; return toString().equals(s.toUpperCase(Locale.ROOT)); } - public String toLower(){ + + public String toLower() { return toString().toLowerCase(Locale.ROOT); } diff --git a/solr/solrj/src/java/org/apache/solr/common/util/StrUtils.java b/solr/solrj/src/java/org/apache/solr/common/util/StrUtils.java index 5fa0fae5795..995e142e1c2 100644 --- a/solr/solrj/src/java/org/apache/solr/common/util/StrUtils.java +++ b/solr/solrj/src/java/org/apache/solr/common/util/StrUtils.java @@ -149,10 +149,11 @@ public class StrUtils { * @see #escapeTextWithSeparator */ public static String join(Collection items, char separator) { + if (items == null) return ""; StringBuilder sb = new StringBuilder(items.size() << 3); boolean first=true; for (Object o : items) { - String item = o.toString(); + String item = String.valueOf(o); if (first) { first = false; } else {