SOLR-8744: Minimize the impact on ZK when there are a lot of blocked tasks

This commit is contained in:
Noble Paul 2016-06-12 13:11:07 +05:30
parent 844ca4a348
commit 232b44e283
7 changed files with 139 additions and 19 deletions

View File

@ -82,6 +82,7 @@ import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.SuppressForbidden;
import org.apache.solr.common.util.Utils;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardHandlerFactory;
@ -210,6 +211,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
}
@Override
@SuppressForbidden(reason = "Needs currentTimeMillis for mock requests")
@SuppressWarnings("unchecked")
public SolrResponse processMessage(ZkNodeProps message, String operation) {
log.info("OverseerCollectionMessageHandler.processMessage : "+ operation + " , "+ message.toString());
@ -289,6 +291,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
case MOCK_REPLICA_TASK: {
//only for test purposes
Thread.sleep(message.getInt("sleep", 1));
log.info("MOCK_TASK_EXECUTED time {} data {}",System.currentTimeMillis(), Utils.toJSONString(message));
results.add("MOCK_FINISHED", System.currentTimeMillis());
break;
}
default:

View File

@ -19,15 +19,19 @@ package org.apache.solr.cloud;
import java.io.Closeable;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import com.google.common.collect.ImmutableSet;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
import org.apache.solr.cloud.Overseer.LeaderStatus;
@ -36,6 +40,7 @@ import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.Utils;
import org.apache.solr.handler.component.ShardHandlerFactory;
import org.apache.solr.util.DefaultSolrThreadFactory;
@ -63,6 +68,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
* executed concurrently
*/
public static final int MAX_PARALLEL_TASKS = 100;
public static final int MAX_BLOCKED_TASKS = 1000;
public ExecutorService tpe;
@ -74,7 +80,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
private DistributedMap failureMap;
// Set that maintains a list of all the tasks that are running. This is keyed on zk id of the task.
final private Set runningTasks;
final private Set<String> runningTasks;
// List of completed tasks. This is used to clean up workQueue in zk.
final private HashMap<String, QueueEvent> completedTasks;
@ -91,6 +97,24 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
// It may contain tasks that have completed execution, have been entered into the completed/failed map in zk but not
// deleted from the work-queue as that is a batched operation.
final private Set<String> runningZKTasks;
// This map may contain tasks which are read from work queue but could not
// be executed because they are blocked or the execution queue is full
// This is an optimization to ensure that we do not read the same tasks
// again and again from ZK.
final private Map<String, QueueEvent> blockedTasks = new LinkedHashMap<>();
final private Predicate<String> excludedTasks = new Predicate<String>() {
@Override
public boolean test(String s) {
return runningTasks.contains(s) || blockedTasks.containsKey(s);
}
@Override
public String toString() {
return StrUtils.join(ImmutableSet.of(runningTasks, blockedTasks.keySet()), ',');
}
};
private final Object waitLock = new Object();
private OverseerMessageHandlerSelector selector;
@ -115,7 +139,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
this.completedMap = completedMap;
this.failureMap = failureMap;
this.runningZKTasks = new HashSet<>();
this.runningTasks = new HashSet();
this.runningTasks = new HashSet<>();
this.completedTasks = new HashMap<>();
}
@ -189,17 +213,46 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
if (waited)
cleanUpWorkQueue();
List<QueueEvent> heads = workQueue.peekTopN(MAX_PARALLEL_TASKS, runningZKTasks, 2000L);
ArrayList<QueueEvent> heads = new ArrayList<>(blockedTasks.size() + MAX_PARALLEL_TASKS);
heads.addAll(blockedTasks.values());
//If we have enough items in the blocked tasks already, it makes
// no sense to read more items from the work queue. it makes sense
// to clear out at least a few items in the queue before we read more items
if (heads.size() < MAX_BLOCKED_TASKS) {
//instead of reading MAX_PARALLEL_TASKS items always, we should only fetch as much as we can execute
int toFetch = Math.min(MAX_BLOCKED_TASKS - heads.size(), MAX_PARALLEL_TASKS - runningTasks.size());
List<QueueEvent> newTasks = workQueue.peekTopN(toFetch, excludedTasks, 2000L);
log.debug("Got {} tasks from work-queue : [{}]", newTasks.size(), newTasks);
heads.addAll(newTasks);
} else {
// Prevent free-spinning this loop.
Thread.sleep(1000);
}
if (isClosed) break;
if (heads.isEmpty()) {
continue;
}
log.debug("Got {} tasks from work-queue : [{}]", heads.size(), heads.toString());
if (isClosed) break;
blockedTasks.clear(); // clear it now; may get refilled below.
taskBatch.batchId++;
boolean tooManyTasks = false;
for (QueueEvent head : heads) {
if (!tooManyTasks) {
synchronized (runningTasks) {
tooManyTasks = runningTasks.size() >= MAX_PARALLEL_TASKS;
}
}
if (tooManyTasks) {
// Too many tasks are running, just shove the rest into the "blocked" queue.
if(blockedTasks.size() < MAX_BLOCKED_TASKS)
blockedTasks.put(head.getId(), head);
continue;
}
if (runningZKTasks.contains(head.getId())) continue;
final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
OverseerMessageHandler messageHandler = selector.selectOverseerMessageHandler(message);
@ -217,6 +270,9 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
OverseerMessageHandler.Lock lock = messageHandler.lockTask(message, taskBatch);
if (lock == null) {
log.debug("Exclusivity check failed for [{}]", message.toString());
//we may end crossing the size of the MAX_BLOCKED_TASKS. They are fine
if (blockedTasks.size() < MAX_BLOCKED_TASKS)
blockedTasks.put(head.getId(), head);
continue;
}
try {
@ -370,7 +426,6 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
runningTasks.add(head.getId());
}
// messageHandler.markExclusiveTask(taskKey, message);
if (asyncId != null)
runningMap.put(asyncId, null);
@ -512,6 +567,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
synchronized (runningTasks) {
log.debug("RunningTasks: {}", runningTasks.toString());
}
log.debug("BlockedTasks: {}", blockedTasks.keySet().toString());
synchronized (completedTasks) {
log.debug("CompletedTasks: {}", completedTasks.keySet().toString());
}

View File

@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Predicate;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
@ -222,7 +223,7 @@ public class OverseerTaskQueue extends DistributedQueue {
}
public List<QueueEvent> peekTopN(int n, Set<String> excludeSet, long waitMillis)
public List<QueueEvent> peekTopN(int n, Predicate<String> excludeSet, long waitMillis)
throws KeeperException, InterruptedException {
ArrayList<QueueEvent> topN = new ArrayList<>();
@ -232,7 +233,7 @@ public class OverseerTaskQueue extends DistributedQueue {
else time = stats.time(dir + "_peekTopN_wait" + waitMillis);
try {
for (Pair<String, byte[]> element : peekElements(n, waitMillis, child -> !excludeSet.contains(dir + "/" + child))) {
for (Pair<String, byte[]> element : peekElements(n, waitMillis, child -> !excludeSet.test(dir + "/" + child))) {
topN.add(new QueueEvent(dir + "/" + element.first(),
element.second(), null));
}

View File

@ -38,6 +38,11 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
import static org.apache.solr.cloud.OverseerTaskProcessor.MAX_PARALLEL_TASKS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_COLL_TASK;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
/**
* Tests the Multi threaded Collections API.
*/
@ -55,11 +60,58 @@ public class MultiThreadedOCPTest extends AbstractFullDistribZkTestBase {
@Test
@ShardsFixed(num = 4)
public void test() throws Exception {
testParallelCollectionAPICalls();
testTaskExclusivity();
testDeduplicationOfSubmittedTasks();
testLongAndShortRunningParallelApiCalls();
testFillWorkQueue();
}
private void testFillWorkQueue() throws Exception {
try (SolrClient client = createNewSolrClient("", getBaseUrl((HttpSolrClient) clients.get(0)))) {
DistributedQueue distributedQueue = new DistributedQueue(cloudClient.getZkStateReader().getZkClient(),
"/overseer/collection-queue-work", new Overseer.Stats());
//fill the work queue with blocked tasks by adding more than the no:of parallel tasks
for (int i = 0; i < MAX_PARALLEL_TASKS+5; i++) {
distributedQueue.offer(Utils.toJSON(Utils.makeMap(
"collection", "A_COLL",
QUEUE_OPERATION, MOCK_COLL_TASK.toLower(),
ASYNC, String.valueOf(i),
"sleep", (i == 0 ? "1000" : "1") //first task waits for 1 second, and thus blocking
// all other tasks. Subsequent tasks only wait for 1ms
)));
log.info("MOCK task added {}", i);
}
Thread.sleep(10);//wait and post the next message
//this is not going to be blocked because it operates on another collection
distributedQueue.offer(Utils.toJSON(Utils.makeMap(
"collection", "B_COLL",
QUEUE_OPERATION, MOCK_COLL_TASK.toLower(),
ASYNC, "200",
"sleep", "1"
)));
Long acoll = null, bcoll = null;
for (int i = 0; i < 100; i++) {
if (bcoll == null) {
CollectionAdminResponse statusResponse = getStatusResponse("200", client);
bcoll = (Long) statusResponse.getResponse().get("MOCK_FINISHED");
}
if (acoll == null) {
CollectionAdminResponse statusResponse = getStatusResponse("2", client);
acoll = (Long) statusResponse.getResponse().get("MOCK_FINISHED");
}
if (acoll != null && bcoll != null) break;
Thread.sleep(100);
}
assertTrue(acoll != null && bcoll != null);
assertTrue(acoll > bcoll);
}
}
private void testParallelCollectionAPICalls() throws IOException, SolrServerException {
@ -116,14 +168,14 @@ public class MultiThreadedOCPTest extends AbstractFullDistribZkTestBase {
distributedQueue.offer(Utils.toJSON(Utils.makeMap(
"collection", "ocptest_shardsplit",
Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MOCK_COLL_TASK.toLower(),
CommonAdminParams.ASYNC, "1001",
QUEUE_OPERATION, MOCK_COLL_TASK.toLower(),
ASYNC, "1001",
"sleep", "100"
)));
distributedQueue.offer(Utils.toJSON(Utils.makeMap(
"collection", "ocptest_shardsplit",
Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MOCK_COLL_TASK.toLower(),
CommonAdminParams.ASYNC, "1002",
QUEUE_OPERATION, MOCK_COLL_TASK.toLower(),
ASYNC, "1002",
"sleep", "100"
)));

View File

@ -21,6 +21,7 @@ import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrResponse;
@ -163,7 +164,7 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
log.info("SHARDHANDLER");
return shardHandlerMock;
}).anyTimes();
workQueueMock.peekTopN(EasyMock.anyInt(), anyObject(Set.class), EasyMock.anyLong());
workQueueMock.peekTopN(EasyMock.anyInt(), anyObject(Predicate.class), EasyMock.anyLong());
expectLastCall().andAnswer(() -> {
Object result;
int count = 0;

View File

@ -20,6 +20,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.response.SolrResponseBase;
@ -75,7 +77,7 @@ public class OverseerTaskQueueTest extends DistributedQueueTest {
tq.createRequestNode(Utils.toJSON(props), watchID);
// Set a SolrResponse as the response node by removing the QueueEvent, as done in OverseerTaskProcessor
List<OverseerTaskQueue.QueueEvent> queueEvents = tq.peekTopN(2, Collections.emptySet(), 1000);
List<OverseerTaskQueue.QueueEvent> queueEvents = tq.peekTopN(2, s -> false, 1000);
OverseerTaskQueue.QueueEvent requestId2Event = null;
for (OverseerTaskQueue.QueueEvent queueEvent : queueEvents) {
Map<String, Object> eventProps = (Map<String, Object>) Utils.fromJSON(queueEvent.getBytes());

View File

@ -1988,12 +1988,16 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
}
static RequestStatusState getRequestState(String requestId, SolrClient client) throws IOException, SolrServerException {
CollectionAdminRequest.RequestStatus requestStatusRequest = new CollectionAdminRequest.RequestStatus();
requestStatusRequest.setRequestId(requestId);
CollectionAdminResponse response = requestStatusRequest.process(client);
CollectionAdminResponse response = getStatusResponse(requestId, client);
NamedList innerResponse = (NamedList) response.getResponse().get("status");
return RequestStatusState.fromKey((String) innerResponse.get("state"));
}
static CollectionAdminResponse getStatusResponse(String requestId, SolrClient client) throws SolrServerException, IOException {
CollectionAdminRequest.RequestStatus requestStatusRequest = new CollectionAdminRequest.RequestStatus();
requestStatusRequest.setRequestId(requestId);
return requestStatusRequest.process(client);
}
}