mirror of https://github.com/apache/lucene.git
SOLR-5681: Make the OverseerCollectionProcessor multi-threaded
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1596089 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c4bdb2dad0
commit
c3692aa73d
|
@ -151,6 +151,8 @@ Other Changes
|
|||
|
||||
* SOLR-5340: Add support for named snapshots (Varun Thacker via Noble Paul)
|
||||
|
||||
* SOLR-5681: Make the processing of Collection API calls multi-threaded. (Anshum Gupta)
|
||||
|
||||
Build
|
||||
---------------------
|
||||
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.zookeeper.WatchedEvent;
|
|||
import org.apache.zookeeper.Watcher;
|
||||
import org.apache.zookeeper.ZooDefs;
|
||||
import org.apache.zookeeper.data.ACL;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -160,6 +161,12 @@ public class DistributedMap {
|
|||
return zookeeper.exists(dir + "/" + prefix + trackingId, true);
|
||||
}
|
||||
|
||||
public int size() throws KeeperException, InterruptedException {
|
||||
Stat stat = new Stat();
|
||||
zookeeper.getData(dir, null, stat, true);
|
||||
return stat.getNumChildren();
|
||||
}
|
||||
|
||||
public void remove(String trackingId) throws KeeperException, InterruptedException {
|
||||
zookeeper.delete(dir + "/" + prefix + trackingId, -1, true);
|
||||
}
|
||||
|
|
|
@ -18,10 +18,6 @@
|
|||
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.TreeMap;
|
||||
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrException.ErrorCode;
|
||||
import org.apache.solr.common.cloud.SolrZkClient;
|
||||
|
@ -36,6 +32,12 @@ import org.apache.zookeeper.data.ACL;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
|
||||
/**
|
||||
* A distributed queue from zk recipes.
|
||||
*/
|
||||
|
@ -364,6 +366,103 @@ public class DistributedQueue {
|
|||
}
|
||||
}
|
||||
|
||||
public List<QueueEvent> peekTopN(int n, Set<String> excludeSet, Long wait)
|
||||
throws KeeperException, InterruptedException {
|
||||
ArrayList<QueueEvent> topN = new ArrayList<>();
|
||||
|
||||
LOG.debug("Peeking for top {} elements. ExcludeSet: " + excludeSet.toString());
|
||||
boolean waitedEnough = false;
|
||||
TimerContext time = null;
|
||||
if (wait == Long.MAX_VALUE) time = stats.time(dir + "_peekTopN_wait_forever");
|
||||
else time = stats.time(dir + "_peekTopN_wait" + wait);
|
||||
|
||||
try {
|
||||
TreeMap<Long, String> orderedChildren;
|
||||
while (true) {
|
||||
LatchChildWatcher childWatcher = new LatchChildWatcher();
|
||||
try {
|
||||
orderedChildren = orderedChildren(childWatcher);
|
||||
} catch (KeeperException.NoNodeException e) {
|
||||
zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (orderedChildren.size() == 0) {
|
||||
if(waitedEnough) return null;
|
||||
childWatcher.await(wait == Long.MAX_VALUE ? DEFAULT_TIMEOUT : wait);
|
||||
waitedEnough = wait != Long.MAX_VALUE;
|
||||
continue;
|
||||
}
|
||||
|
||||
for (String headNode : orderedChildren.values()) {
|
||||
if (headNode != null && topN.size() < n) {
|
||||
try {
|
||||
String id = dir + "/" + headNode;
|
||||
if (excludeSet != null && excludeSet.contains(id)) continue;
|
||||
QueueEvent queueEvent = new QueueEvent(id,
|
||||
zookeeper.getData(dir + "/" + headNode, null, null, true), null);
|
||||
topN.add(queueEvent);
|
||||
} catch (KeeperException.NoNodeException e) {
|
||||
// Another client removed the node first, try next
|
||||
}
|
||||
} else {
|
||||
if (topN.size() >= 1) {
|
||||
printQueueEventsListElementIds(topN);
|
||||
return topN;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (topN.size() > 0 ) {
|
||||
printQueueEventsListElementIds(topN);
|
||||
return topN;
|
||||
}
|
||||
if (waitedEnough) {
|
||||
LOG.debug("Waited enough, returning null after peekTopN");
|
||||
return null;
|
||||
}
|
||||
childWatcher.await(wait == Long.MAX_VALUE ? DEFAULT_TIMEOUT : wait);
|
||||
waitedEnough = wait != Long.MAX_VALUE;
|
||||
}
|
||||
} finally {
|
||||
time.stop();
|
||||
}
|
||||
}
|
||||
|
||||
private void printQueueEventsListElementIds(ArrayList<QueueEvent> topN) {
|
||||
if(LOG.isDebugEnabled()) {
|
||||
StringBuffer sb = new StringBuffer("[");
|
||||
for(QueueEvent queueEvent: topN) {
|
||||
sb.append(queueEvent.getId()).append(", ");
|
||||
}
|
||||
sb.append("]");
|
||||
LOG.debug("Returning topN elements: {}", sb.toString());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
* Gets last element of the Queue without removing it.
|
||||
*/
|
||||
public String getTailId() throws KeeperException, InterruptedException {
|
||||
TreeMap<Long, String> orderedChildren = null;
|
||||
orderedChildren = orderedChildren(null);
|
||||
if(orderedChildren == null || orderedChildren.isEmpty()) return null;
|
||||
|
||||
for(String headNode : orderedChildren.descendingMap().values())
|
||||
if (headNode != null) {
|
||||
try {
|
||||
QueueEvent queueEvent = new QueueEvent(dir + "/" + headNode, zookeeper.getData(dir + "/" + headNode,
|
||||
null, null, true), null);
|
||||
return queueEvent.getId();
|
||||
} catch (KeeperException.NoNodeException e) {
|
||||
// Another client removed the node first, try next
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public static class QueueEvent {
|
||||
@Override
|
||||
public int hashCode() {
|
||||
|
|
|
@ -1200,7 +1200,6 @@ public class Overseer {
|
|||
}
|
||||
}
|
||||
} finally {
|
||||
|
||||
if (ccThread != null) {
|
||||
try {
|
||||
ccThread.close();
|
||||
|
@ -1309,16 +1308,19 @@ public class Overseer {
|
|||
|
||||
public void success(String operation) {
|
||||
String op = operation.toLowerCase(Locale.ROOT);
|
||||
Stat stat = stats.get(op);
|
||||
if (stat == null) {
|
||||
stat = new Stat();
|
||||
stats.put(op, stat);
|
||||
synchronized (stats) {
|
||||
Stat stat = stats.get(op);
|
||||
if (stat == null) {
|
||||
stat = new Stat();
|
||||
stats.put(op, stat);
|
||||
}
|
||||
stat.success.incrementAndGet();
|
||||
}
|
||||
stat.success.incrementAndGet();
|
||||
}
|
||||
|
||||
public void error(String operation) {
|
||||
String op = operation.toLowerCase(Locale.ROOT);
|
||||
synchronized (stats) {
|
||||
Stat stat = stats.get(op);
|
||||
if (stat == null) {
|
||||
stat = new Stat();
|
||||
|
@ -1326,20 +1328,26 @@ public class Overseer {
|
|||
}
|
||||
stat.errors.incrementAndGet();
|
||||
}
|
||||
}
|
||||
|
||||
public TimerContext time(String operation) {
|
||||
String op = operation.toLowerCase(Locale.ROOT);
|
||||
Stat stat = stats.get(op);
|
||||
Stat stat;
|
||||
synchronized (stats) {
|
||||
stat = stats.get(op);
|
||||
if (stat == null) {
|
||||
stat = new Stat();
|
||||
stats.put(op, stat);
|
||||
}
|
||||
}
|
||||
return stat.requestTime.time();
|
||||
}
|
||||
|
||||
public void storeFailureDetails(String operation, ZkNodeProps request, SolrResponse resp) {
|
||||
String op = operation.toLowerCase(Locale.ROOT);
|
||||
Stat stat = stats.get(op);
|
||||
Stat stat ;
|
||||
synchronized (stats) {
|
||||
stat = stats.get(op);
|
||||
if (stat == null) {
|
||||
stat = new Stat();
|
||||
stats.put(op, stat);
|
||||
|
@ -1352,6 +1360,7 @@ public class Overseer {
|
|||
failedOps.addLast(new FailedOp(request, resp));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public List<FailedOp> getFailureDetails(String operation) {
|
||||
Stat stat = stats.get(operation.toLowerCase(Locale.ROOT));
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -283,16 +283,16 @@ public class CollectionsHandler extends RequestHandlerBase {
|
|||
success.add("state", "completed");
|
||||
success.add("msg", "found " + requestId + " in completed tasks");
|
||||
results.add("status", success);
|
||||
} else if (coreContainer.getZkController().getOverseerRunningMap().contains(requestId)) {
|
||||
SimpleOrderedMap success = new SimpleOrderedMap();
|
||||
success.add("state", "running");
|
||||
success.add("msg", "found " + requestId + " in submitted tasks");
|
||||
results.add("status", success);
|
||||
} else if (coreContainer.getZkController().getOverseerFailureMap().contains(requestId)) {
|
||||
SimpleOrderedMap success = new SimpleOrderedMap();
|
||||
success.add("state", "failed");
|
||||
success.add("msg", "found " + requestId + " in failed tasks");
|
||||
results.add("status", success);
|
||||
} else if (coreContainer.getZkController().getOverseerRunningMap().contains(requestId)) {
|
||||
SimpleOrderedMap success = new SimpleOrderedMap();
|
||||
success.add("state", "running");
|
||||
success.add("msg", "found " + requestId + " in submitted tasks");
|
||||
results.add("status", success);
|
||||
} else {
|
||||
SimpleOrderedMap failure = new SimpleOrderedMap();
|
||||
failure.add("state", "notfound");
|
||||
|
|
|
@ -1345,11 +1345,13 @@ public class CoreAdminHandler extends RequestHandlerBase {
|
|||
* Helper method to add a task to a tracking map.
|
||||
*/
|
||||
protected void addTask(String map, TaskObject o, boolean limit) {
|
||||
if(limit && getMap(map).size() == MAX_TRACKED_REQUESTS) {
|
||||
String key = getMap(map).entrySet().iterator().next().getKey();
|
||||
getMap(map).remove(key);
|
||||
synchronized (getMap(map)) {
|
||||
if(limit && getMap(map).size() == MAX_TRACKED_REQUESTS) {
|
||||
String key = getMap(map).entrySet().iterator().next().getKey();
|
||||
getMap(map).remove(key);
|
||||
}
|
||||
addTask(map, o);
|
||||
}
|
||||
addTask(map, o);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -405,6 +405,10 @@ public class HttpShardHandler extends ShardHandler {
|
|||
ClientUtils.addSlices(target, collectionName, slices, multiCollection);
|
||||
}
|
||||
|
||||
public ShardHandlerFactory getShardHandlerFactory(){
|
||||
return httpShardHandlerFactory;
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -25,4 +25,5 @@ public abstract class ShardHandler {
|
|||
public abstract ShardResponse takeCompletedIncludingErrors();
|
||||
public abstract ShardResponse takeCompletedOrError();
|
||||
public abstract void cancelAll();
|
||||
public abstract ShardHandlerFactory getShardHandlerFactory();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,235 @@
|
|||
package org.apache.solr.cloud;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import org.apache.solr.client.solrj.SolrServer;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.impl.HttpSolrServer;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
|
||||
import org.apache.solr.common.params.CollectionParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.update.DirectUpdateHandler2;
|
||||
import org.junit.Before;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Random;
|
||||
|
||||
/**
|
||||
* Tests the Multi threaded Collections API.
|
||||
*/
|
||||
public class MultiThreadedOCPTest extends AbstractFullDistribZkTestBase {
|
||||
|
||||
private static Logger log = LoggerFactory
|
||||
.getLogger(MultiThreadedOCPTest.class);
|
||||
|
||||
private static int NUM_COLLECTIONS = 4;
|
||||
|
||||
@Before
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
|
||||
useJettyDataDir = false;
|
||||
|
||||
System.setProperty("numShards", Integer.toString(sliceCount));
|
||||
System.setProperty("solr.xml.persist", "true");
|
||||
}
|
||||
|
||||
public MultiThreadedOCPTest() {
|
||||
fixShardCount = true;
|
||||
sliceCount = 2;
|
||||
shardCount = 4;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doTest() throws Exception {
|
||||
|
||||
testParallelCollectionAPICalls();
|
||||
testTaskExclusivity();
|
||||
testLongAndShortRunningParallelApiCalls();
|
||||
}
|
||||
|
||||
private void testParallelCollectionAPICalls() throws IOException, SolrServerException {
|
||||
SolrServer server = createNewSolrServer("", getBaseUrl((HttpSolrServer) clients.get(0)));
|
||||
|
||||
for(int i = 1 ; i <= NUM_COLLECTIONS ; i++) {
|
||||
CollectionAdminRequest.createCollection("ocptest" + i, 4, "conf1", server, i + "");
|
||||
}
|
||||
|
||||
boolean pass = false;
|
||||
int counter = 0;
|
||||
while(true) {
|
||||
int numRunningTasks = 0;
|
||||
for (int i = 1; i <= NUM_COLLECTIONS; i++)
|
||||
if (getRequestState(i + "", server).equals("running"))
|
||||
numRunningTasks++;
|
||||
if(numRunningTasks > 1) {
|
||||
pass = true;
|
||||
break;
|
||||
} else if(counter++ > 100)
|
||||
break;
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
assertTrue("More than one tasks were supposed to be running in parallel but they weren't.", pass);
|
||||
for(int i=1;i<=NUM_COLLECTIONS;i++) {
|
||||
String state = getRequestStateAfterCompletion(i + "", 30, server);
|
||||
assertTrue("Task " + i + " did not complete, final state: " + state,state.equals("completed"));
|
||||
}
|
||||
}
|
||||
|
||||
private void testTaskExclusivity() throws IOException, SolrServerException {
|
||||
SolrServer server = createNewSolrServer("", getBaseUrl((HttpSolrServer) clients.get(0)));
|
||||
CollectionAdminRequest.createCollection("ocptest_shardsplit", 4, "conf1", server, "1000");
|
||||
|
||||
CollectionAdminRequest.splitShard("ocptest_shardsplit", SHARD1, server, "1001");
|
||||
CollectionAdminRequest.splitShard("ocptest_shardsplit", SHARD2, server, "1002");
|
||||
|
||||
int iterations = 0;
|
||||
while(true) {
|
||||
int runningTasks = 0;
|
||||
int completedTasks = 0;
|
||||
for (int i=1001;i<=1002;i++) {
|
||||
String state = getRequestState(i, server);
|
||||
if (state.equals("running"))
|
||||
runningTasks++;
|
||||
if (state.equals("completed"))
|
||||
completedTasks++;
|
||||
assertTrue("We have a failed SPLITSHARD task", !state.equals("failed"));
|
||||
}
|
||||
// TODO: REQUESTSTATUS might come back with more than 1 running tasks over multiple calls.
|
||||
// The only way to fix this is to support checking of multiple requestids in a single REQUESTSTATUS task.
|
||||
|
||||
assertTrue("Mutual exclusion failed. Found more than one task running for the same collection", runningTasks < 2);
|
||||
|
||||
if(completedTasks == 2 || iterations++ > 90)
|
||||
break;
|
||||
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
return;
|
||||
}
|
||||
}
|
||||
for (int i=1001;i<=1002;i++) {
|
||||
String state = getRequestStateAfterCompletion(i + "", 30, server);
|
||||
assertTrue("Task " + i + " did not complete, final state: " + state,state.equals("completed"));
|
||||
}
|
||||
}
|
||||
|
||||
private void testLongAndShortRunningParallelApiCalls() throws InterruptedException, IOException, SolrServerException {
|
||||
|
||||
Thread indexThread = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
Random random = random();
|
||||
int max = atLeast(random, 200);
|
||||
for (int id = 101; id < max; id++) {
|
||||
try {
|
||||
doAddDoc(String.valueOf(id));
|
||||
} catch (Exception e) {
|
||||
log.error("Exception while adding docs", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
indexThread.start();
|
||||
|
||||
try {
|
||||
Thread.sleep(5000);
|
||||
|
||||
SolrServer server = createNewSolrServer("", getBaseUrl((HttpSolrServer) clients.get(0)));
|
||||
CollectionAdminRequest.splitShard("collection1", SHARD1, server, "2000");
|
||||
|
||||
String state = getRequestState("2000", server);
|
||||
while (!state.equals("running")) {
|
||||
state = getRequestState("2000", server);
|
||||
if (state.equals("completed") || state.equals("failed"))
|
||||
break;
|
||||
Thread.sleep(100);
|
||||
}
|
||||
assertTrue("SplitShard task [2000] was supposed to be in [running] but isn't. It is [" + state + "]", state.equals("running"));
|
||||
|
||||
invokeCollectionApi("action", CollectionParams.CollectionAction.OVERSEERSTATUS.toLower());
|
||||
|
||||
state = getRequestState("2000", server);
|
||||
|
||||
assertTrue("After invoking OVERSEERSTATUS, SplitShard task [2000] was still supposed to be in [running] but isn't." +
|
||||
"It is [" + state + "]", state.equals("running"));
|
||||
|
||||
} finally {
|
||||
try {
|
||||
indexThread.join();
|
||||
} catch (InterruptedException e) {
|
||||
log.warn("Indexing thread interrupted.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void doAddDoc(String id) throws Exception {
|
||||
index("id", id);
|
||||
// todo - target diff servers and use cloud clients as well as non-cloud clients
|
||||
}
|
||||
|
||||
private String getRequestStateAfterCompletion(String requestId, int waitForSeconds, SolrServer server)
|
||||
throws IOException, SolrServerException {
|
||||
String state = null;
|
||||
while(waitForSeconds-- > 0) {
|
||||
state = getRequestState(requestId, server);
|
||||
if(state.equals("completed") || state.equals("failed"))
|
||||
return state;
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
return state;
|
||||
}
|
||||
|
||||
private String getRequestState(int requestId, SolrServer server) throws IOException, SolrServerException {
|
||||
return getRequestState(String.valueOf(requestId), server);
|
||||
}
|
||||
|
||||
private String getRequestState(String requestId, SolrServer server) throws IOException, SolrServerException {
|
||||
CollectionAdminResponse response = CollectionAdminRequest.requestStatus(requestId, server);
|
||||
NamedList innerResponse = (NamedList) response.getResponse().get("status");
|
||||
return (String) innerResponse.get("state");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
System.clearProperty("numShards");
|
||||
System.clearProperty("zkHost");
|
||||
System.clearProperty("solr.xml.persist");
|
||||
|
||||
// insurance
|
||||
DirectUpdateHandler2.commitOnClose = true;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -31,12 +31,13 @@ import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
|
|||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.util.StrUtils;
|
||||
import org.apache.solr.handler.component.ShardHandler;
|
||||
import org.apache.solr.handler.component.ShardHandlerFactory;
|
||||
import org.apache.solr.handler.component.ShardRequest;
|
||||
import org.apache.solr.handler.component.ShardResponse;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.easymock.Capture;
|
||||
import org.easymock.EasyMock;
|
||||
import org.easymock.IAnswer;
|
||||
import org.eclipse.jetty.util.BlockingArrayQueue;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
|
@ -44,14 +45,17 @@ import org.junit.BeforeClass;
|
|||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
|
||||
import static org.easymock.EasyMock.anyBoolean;
|
||||
import static org.easymock.EasyMock.anyObject;
|
||||
|
@ -74,36 +78,32 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
|
|||
private static DistributedMap runningMapMock;
|
||||
private static DistributedMap completedMapMock;
|
||||
private static DistributedMap failureMapMock;
|
||||
private static ShardHandlerFactory shardHandlerFactoryMock;
|
||||
private static ShardHandler shardHandlerMock;
|
||||
private static ZkStateReader zkStateReaderMock;
|
||||
private static ClusterState clusterStateMock;
|
||||
private static SolrZkClient solrZkClientMock;
|
||||
private final Map zkMap = new HashMap();
|
||||
private final Set collectionsSet = new HashSet();
|
||||
private SolrResponse lastProcessMessageResult;
|
||||
|
||||
|
||||
private OverseerCollectionProcessorToBeTested underTest;
|
||||
|
||||
private Thread thread;
|
||||
private Queue<QueueEvent> queue = new BlockingArrayQueue<>();
|
||||
private Queue<QueueEvent> queue = new ArrayBlockingQueue<>(10);
|
||||
|
||||
private class OverseerCollectionProcessorToBeTested extends
|
||||
OverseerCollectionProcessor {
|
||||
|
||||
private SolrResponse lastProcessMessageResult;
|
||||
|
||||
|
||||
public OverseerCollectionProcessorToBeTested(ZkStateReader zkStateReader,
|
||||
String myId, ShardHandler shardHandler, String adminPath,
|
||||
String myId, ShardHandlerFactory shardHandlerFactory,
|
||||
String adminPath,
|
||||
DistributedQueue workQueue, DistributedMap runningMap,
|
||||
DistributedMap completedMap,
|
||||
DistributedMap failureMap) {
|
||||
super(zkStateReader, myId, shardHandler, adminPath, new Overseer.Stats(), workQueue, runningMap, completedMap, failureMap);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SolrResponse processMessage(ZkNodeProps message, String operation) {
|
||||
lastProcessMessageResult = super.processMessage(message, operation);
|
||||
log.info("1 : "+System.currentTimeMillis());
|
||||
return lastProcessMessageResult;
|
||||
super(zkStateReader, myId, shardHandlerFactory, adminPath, new Overseer.Stats(), workQueue, runningMap, completedMap, failureMap);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -119,10 +119,12 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
|
|||
runningMapMock = createMock(DistributedMap.class);
|
||||
completedMapMock = createMock(DistributedMap.class);
|
||||
failureMapMock = createMock(DistributedMap.class);
|
||||
shardHandlerFactoryMock = createMock(ShardHandlerFactory.class);
|
||||
shardHandlerMock = createMock(ShardHandler.class);
|
||||
zkStateReaderMock = createMock(ZkStateReader.class);
|
||||
clusterStateMock = createMock(ClusterState.class);
|
||||
solrZkClientMock = createMock(SolrZkClient.class);
|
||||
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
@ -131,6 +133,7 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
|
|||
runningMapMock = null;
|
||||
completedMapMock = null;
|
||||
failureMapMock = null;
|
||||
shardHandlerFactoryMock = null;
|
||||
shardHandlerMock = null;
|
||||
zkStateReaderMock = null;
|
||||
clusterStateMock = null;
|
||||
|
@ -145,12 +148,13 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
|
|||
reset(runningMapMock);
|
||||
reset(completedMapMock);
|
||||
reset(failureMapMock);
|
||||
reset(shardHandlerFactoryMock);
|
||||
reset(shardHandlerMock);
|
||||
reset(zkStateReaderMock);
|
||||
reset(clusterStateMock);
|
||||
reset(solrZkClientMock);
|
||||
underTest = new OverseerCollectionProcessorToBeTested(zkStateReaderMock,
|
||||
"1234", shardHandlerMock, ADMIN_PATH, workQueueMock, runningMapMock,
|
||||
"1234", shardHandlerFactoryMock, ADMIN_PATH, workQueueMock, runningMapMock,
|
||||
completedMapMock, failureMapMock);
|
||||
zkMap.clear();
|
||||
collectionsSet.clear();
|
||||
|
@ -163,6 +167,44 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
|
|||
}
|
||||
|
||||
protected Set<String> commonMocks(int liveNodesCount) throws Exception {
|
||||
|
||||
shardHandlerFactoryMock.getShardHandler();
|
||||
expectLastCall().andAnswer(new IAnswer<ShardHandler>() {
|
||||
@Override
|
||||
public ShardHandler answer() throws Throwable {
|
||||
log.info("SHARDHANDLER");
|
||||
return shardHandlerMock;
|
||||
}
|
||||
}).anyTimes();
|
||||
workQueueMock.peekTopN(EasyMock.anyInt(), anyObject(Set.class), EasyMock.anyLong());
|
||||
expectLastCall().andAnswer(new IAnswer<List>() {
|
||||
@Override
|
||||
public List answer() throws Throwable {
|
||||
Object result;
|
||||
int count = 0;
|
||||
while ((result = queue.peek()) == null) {
|
||||
Thread.sleep(1000);
|
||||
count++;
|
||||
if (count > 1) return null;
|
||||
}
|
||||
|
||||
return Arrays.asList(result);
|
||||
}
|
||||
}).anyTimes();
|
||||
|
||||
workQueueMock.getTailId();
|
||||
expectLastCall().andAnswer(new IAnswer<Object>() {
|
||||
@Override
|
||||
public Object answer() throws Throwable {
|
||||
Object result = null;
|
||||
Iterator iter = queue.iterator();
|
||||
while(iter.hasNext()) {
|
||||
result = iter.next();
|
||||
}
|
||||
return result==null ? null : ((QueueEvent)result).getId();
|
||||
}
|
||||
}).anyTimes();
|
||||
|
||||
workQueueMock.peek(true);
|
||||
expectLastCall().andAnswer(new IAnswer<Object>() {
|
||||
@Override
|
||||
|
@ -191,7 +233,7 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
|
|||
return queue.poll();
|
||||
}
|
||||
}).anyTimes();
|
||||
|
||||
|
||||
zkStateReaderMock.getClusterState();
|
||||
expectLastCall().andAnswer(new IAnswer<Object>() {
|
||||
@Override
|
||||
|
@ -207,8 +249,8 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
|
|||
return solrZkClientMock;
|
||||
}
|
||||
}).anyTimes();
|
||||
|
||||
|
||||
|
||||
|
||||
clusterStateMock.getCollections();
|
||||
expectLastCall().andAnswer(new IAnswer<Object>() {
|
||||
@Override
|
||||
|
@ -239,7 +281,7 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
|
|||
return new HashMap();
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
solrZkClientMock.getZkClientTimeout();
|
||||
expectLastCall().andAnswer(new IAnswer<Object>() {
|
||||
@Override
|
||||
|
@ -378,7 +420,12 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
|
|||
OverseerCollectionProcessor.MAX_SHARDS_PER_NODE,
|
||||
maxShardsPerNode.toString());
|
||||
}
|
||||
QueueEvent qe = new QueueEvent("id", ZkStateReader.toJSON(props), null);
|
||||
QueueEvent qe = new QueueEvent("id", ZkStateReader.toJSON(props), null){
|
||||
@Override
|
||||
public void setBytes(byte[] bytes) {
|
||||
lastProcessMessageResult = SolrResponse.deserialize( bytes);
|
||||
}
|
||||
};
|
||||
queue.add(qe);
|
||||
}
|
||||
|
||||
|
@ -542,20 +589,23 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
|
|||
replay(solrZkClientMock);
|
||||
replay(zkStateReaderMock);
|
||||
replay(clusterStateMock);
|
||||
replay(shardHandlerFactoryMock);
|
||||
replay(shardHandlerMock);
|
||||
|
||||
log.info("clusterstate " +clusterStateMock.hashCode());
|
||||
|
||||
log.info("clusterstate " + clusterStateMock.hashCode());
|
||||
|
||||
startComponentUnderTest();
|
||||
|
||||
issueCreateJob(numberOfSlices, replicationFactor, maxShardsPerNode, (createNodeListOption != CreateNodeListOptions.SEND_NULL)?createNodeList:null, (createNodeListOption != CreateNodeListOptions.DONT_SEND));
|
||||
issueCreateJob(numberOfSlices, replicationFactor, maxShardsPerNode, (createNodeListOption != CreateNodeListOptions.SEND_NULL) ? createNodeList : null, (createNodeListOption != CreateNodeListOptions.DONT_SEND));
|
||||
waitForEmptyQueue(10000);
|
||||
|
||||
if (collectionExceptedToBeCreated) {
|
||||
assertNotNull(underTest.lastProcessMessageResult.getResponse().toString(), underTest.lastProcessMessageResult);
|
||||
assertNotNull(lastProcessMessageResult.getResponse().toString(), lastProcessMessageResult);
|
||||
}
|
||||
verify(shardHandlerFactoryMock);
|
||||
verify(shardHandlerMock);
|
||||
|
||||
|
||||
if (collectionExceptedToBeCreated) {
|
||||
verifySubmitCaptures(submitCaptures, numberOfSlices, replicationFactor,
|
||||
createNodeList);
|
||||
|
|
|
@ -123,22 +123,4 @@ public class OverseerStatusTest extends BasicDistributedZkTest {
|
|||
|
||||
waitForThingsToLevelOut(15);
|
||||
}
|
||||
|
||||
private NamedList<Object> invokeCollectionApi(String... args) throws SolrServerException, IOException {
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
SolrRequest request = new QueryRequest(params);
|
||||
for (int i = 0; i < args.length - 1; i+=2) {
|
||||
params.add(args[i], args[i+1]);
|
||||
}
|
||||
request.setPath("/admin/collections");
|
||||
|
||||
String baseUrl = ((HttpSolrServer) shardToJetty.get(SHARD1).get(0).client.solrClient)
|
||||
.getBaseURL();
|
||||
baseUrl = baseUrl.substring(0, baseUrl.length() - "collection1".length());
|
||||
|
||||
HttpSolrServer baseServer = new HttpSolrServer(baseUrl);
|
||||
baseServer.setConnectionTimeout(15000);
|
||||
baseServer.setSoTimeout(60000 * 5);
|
||||
return baseServer.request(request);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -57,6 +57,11 @@ public class MockShardHandlerFactory extends ShardHandlerFactory implements Plug
|
|||
|
||||
@Override
|
||||
public void cancelAll() {}
|
||||
|
||||
@Override
|
||||
public ShardHandlerFactory getShardHandlerFactory() {
|
||||
return MockShardHandlerFactory.this;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
@ -90,7 +90,7 @@ public class ZkStateReader {
|
|||
public static final String LEGACY_CLOUD = "legacyCloud";
|
||||
|
||||
public static final String URL_SCHEME = "urlScheme";
|
||||
|
||||
|
||||
private volatile ClusterState clusterState;
|
||||
|
||||
private static final long SOLRCLOUD_UPDATE_DELAY = Long.parseLong(System.getProperty("solrcloud.update.delay", "5000"));
|
||||
|
|
|
@ -68,6 +68,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
|
|||
import org.apache.solr.common.params.CollectionParams.CollectionAction;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.StrUtils;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
|
@ -1719,6 +1720,25 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
|
|||
}
|
||||
}
|
||||
|
||||
protected NamedList<Object> invokeCollectionApi(String... args) throws SolrServerException, IOException {
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
SolrRequest request = new QueryRequest(params);
|
||||
for (int i = 0; i < args.length - 1; i+=2) {
|
||||
params.add(args[i], args[i+1]);
|
||||
}
|
||||
request.setPath("/admin/collections");
|
||||
|
||||
String baseUrl = ((HttpSolrServer) shardToJetty.get(SHARD1).get(0).client.solrClient)
|
||||
.getBaseURL();
|
||||
baseUrl = baseUrl.substring(0, baseUrl.length() - "collection1".length());
|
||||
|
||||
HttpSolrServer baseServer = new HttpSolrServer(baseUrl);
|
||||
baseServer.setConnectionTimeout(15000);
|
||||
baseServer.setSoTimeout(60000 * 5);
|
||||
NamedList r = baseServer.request(request);
|
||||
baseServer.shutdown();
|
||||
return r;
|
||||
}
|
||||
|
||||
protected void createCollection(String collName,
|
||||
CloudSolrServer client,
|
||||
|
|
Loading…
Reference in New Issue