SOLR-6026: Also check work-queue while processing a REQUESTSTATUS Collection API Call

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1599248 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Anshum Gupta 2014-06-02 15:47:26 +00:00
parent 0d89f3f6cf
commit 22174ac432
5 changed files with 73 additions and 9 deletions

View File

@ -179,6 +179,9 @@ Other Changes
* SOLR-6116: Refactor DocRouter.getDocRouter to accept routerName as a String. (shalin)
* SOLR-6026: REQUESTSTATUS Collection API now also checks for submitted tasks which are
yet to begin execution.
Optimizations
----------------------

View File

@ -22,6 +22,7 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCmdExecutor;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.util.stats.TimerContext;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@ -118,7 +119,39 @@ public class DistributedQueue {
return orderedChildren;
}
/**
* Returns true if the queue contains a task with the specified async id.
*/
public boolean containsTaskWithRequestId(String requestId)
throws KeeperException, InterruptedException {
List<String> childNames = null;
try {
childNames = zookeeper.getChildren(dir, null, true);
} catch (KeeperException.NoNodeException e) {
throw e;
}
for (String childName : childNames) {
if (childName != null) {
try {
ZkNodeProps message = ZkNodeProps.load(zookeeper.getData(dir + "/" + childName, null, null, true));
if (message.containsKey(OverseerCollectionProcessor.ASYNC)) {
LOG.info(">>>> {}", message.get(OverseerCollectionProcessor.ASYNC));
if(message.get(OverseerCollectionProcessor.ASYNC).equals(requestId)) return true;
}
} catch (KeeperException.NoNodeException e) {
// Another client removed the node first, try next
}
}
}
return false;
}
/**
* Return the head of the queue without modifying the queue.
*

View File

@ -51,6 +51,7 @@ import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestSyncShard;
import org.apache.solr.cloud.DistributedQueue;
import org.apache.solr.cloud.DistributedQueue.QueueEvent;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.OverseerCollectionProcessor;
@ -291,6 +292,11 @@ public class CollectionsHandler extends RequestHandlerBase {
} else if (coreContainer.getZkController().getOverseerRunningMap().contains(requestId)) {
SimpleOrderedMap success = new SimpleOrderedMap();
success.add("state", "running");
success.add("msg", "found " + requestId + " in running tasks");
results.add("status", success);
} else if(overseerCollectionQueueContains(requestId)){
SimpleOrderedMap success = new SimpleOrderedMap();
success.add("state", "submitted");
success.add("msg", "found " + requestId + " in submitted tasks");
results.add("status", success);
} else {
@ -305,6 +311,11 @@ public class CollectionsHandler extends RequestHandlerBase {
}
}
private boolean overseerCollectionQueueContains(String asyncId) throws KeeperException, InterruptedException {
DistributedQueue collectionQueue = coreContainer.getZkController().getOverseerCollectionQueue();
return collectionQueue.containsTaskWithRequestId(asyncId);
}
private void handleResponse(String operation, ZkNodeProps m,
SolrQueryResponse rsp) throws KeeperException, InterruptedException {
handleResponse(operation, m, rsp, DEFAULT_ZK_TIMEOUT);
@ -326,13 +337,13 @@ public class CollectionsHandler extends RequestHandlerBase {
if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId) ||
coreContainer.getZkController().getOverseerFailureMap().contains(asyncId) ||
coreContainer.getZkController().getOverseerRunningMap().contains(asyncId)) {
coreContainer.getZkController().getOverseerRunningMap().contains(asyncId) ||
overseerCollectionQueueContains(asyncId)) {
r.add("error", "Task with the same requestid already exists.");
} else {
coreContainer.getZkController().getOverseerCollectionQueue()
.offer(ZkStateReader.toJSON(m));
}
r.add(CoreAdminParams.REQUESTID, (String) m.get(ASYNC));
SolrResponse response = new OverseerSolrResponse(r);

View File

@ -55,7 +55,7 @@ public class AsyncMigrateRouteKeyTest extends MigrateRouteKeyTest {
params.set(OverseerCollectionProcessor.REQUESTID, asyncId);
// This task takes long enough to run. Also check for the current state of the task to be running.
message = sendStatusRequestWithRetry(params, 2);
assertEquals("found " + asyncId + " in submitted tasks", message);
assertEquals("found " + asyncId + " in running tasks", message);
// Now wait until the task actually completes successfully/fails.
message = sendStatusRequestWithRetry(params, 20);
assertEquals("Task " + asyncId + " not found in completed tasks.",

View File

@ -64,6 +64,7 @@ public class MultiThreadedOCPTest extends AbstractFullDistribZkTestBase {
testParallelCollectionAPICalls();
testTaskExclusivity();
testDeduplicationOfSubmittedTasks();
testLongAndShortRunningParallelApiCalls();
}
@ -139,6 +140,25 @@ public class MultiThreadedOCPTest extends AbstractFullDistribZkTestBase {
}
}
private void testDeduplicationOfSubmittedTasks() throws IOException, SolrServerException {
SolrServer server = createNewSolrServer("", getBaseUrl((HttpSolrServer) clients.get(0)));
CollectionAdminRequest.createCollection("ocptest_shardsplit2", 4, "conf1", server, "3000");
CollectionAdminRequest.splitShard("ocptest_shardsplit2", SHARD1, server, "3001");
CollectionAdminRequest.splitShard("ocptest_shardsplit2", SHARD2, server, "3002");
// Now submit another task with the same id. At this time, hopefully the previous 2002 should still be in the queue.
CollectionAdminResponse response = CollectionAdminRequest.splitShard("ocptest_shardsplit2", SHARD1, server, "3002");
NamedList r = response.getResponse();
assertEquals("Duplicate request was supposed to exist but wasn't found. De-duplication of submitted task failed.",
"Task with the same requestid already exists.", r.get("error"));
for (int i=3001;i<=3002;i++) {
String state = getRequestStateAfterCompletion(i + "", 30, server);
assertTrue("Task " + i + " did not complete, final state: " + state,state.equals("completed"));
}
}
private void testLongAndShortRunningParallelApiCalls() throws InterruptedException, IOException, SolrServerException {
Thread indexThread = new Thread() {
@ -158,17 +178,14 @@ public class MultiThreadedOCPTest extends AbstractFullDistribZkTestBase {
indexThread.start();
try {
Thread.sleep(5000);
SolrServer server = createNewSolrServer("", getBaseUrl((HttpSolrServer) clients.get(0)));
CollectionAdminRequest.splitShard("collection1", SHARD1, server, "2000");
String state = getRequestState("2000", server);
while (!state.equals("running")) {
while (state.equals("submitted")) {
state = getRequestState("2000", server);
if (state.equals("completed") || state.equals("failed"))
break;
Thread.sleep(100);
Thread.sleep(10);
}
assertTrue("SplitShard task [2000] was supposed to be in [running] but isn't. It is [" + state + "]", state.equals("running"));