mirror of https://github.com/apache/lucene.git
SOLR-5908: Make the REQUESTSTATUS Collection API call non-blocking and non-blocked.
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1583532 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
225a9966b6
commit
aacd7ee80f
|
@ -471,8 +471,6 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
|
|||
processRoleCommand(message, operation);
|
||||
} else if (ADDREPLICA.isEqual(operation)) {
|
||||
addReplica(zkStateReader.getClusterState(), message, results);
|
||||
} else if (REQUESTSTATUS.equals(operation)) {
|
||||
requestStatus(message, results);
|
||||
} else if (OVERSEERSTATUS.isEqual(operation)) {
|
||||
getOverseerStatus(message, results);
|
||||
} else if(LIST.isEqual(operation)) {
|
||||
|
@ -1494,40 +1492,6 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
|
|||
} while (srsp != null);
|
||||
}
|
||||
|
||||
private void requestStatus(ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
|
||||
log.info("Request status invoked");
|
||||
String requestId = message.getStr(REQUESTID);
|
||||
|
||||
// Special taskId (-1), clears up the request state maps.
|
||||
if(requestId.equals("-1")) {
|
||||
completedMap.clear();
|
||||
failureMap.clear();
|
||||
return;
|
||||
}
|
||||
|
||||
if(completedMap.contains(requestId)) {
|
||||
SimpleOrderedMap success = new SimpleOrderedMap();
|
||||
success.add("state", "completed");
|
||||
success.add("msg", "found " + requestId + " in completed tasks");
|
||||
results.add("status", success);
|
||||
} else if (runningMap.contains(requestId)) {
|
||||
SimpleOrderedMap success = new SimpleOrderedMap();
|
||||
success.add("state", "running");
|
||||
success.add("msg", "found " + requestId + " in submitted tasks");
|
||||
results.add("status", success);
|
||||
} else if (failureMap.contains(requestId)) {
|
||||
SimpleOrderedMap success = new SimpleOrderedMap();
|
||||
success.add("state", "failed");
|
||||
success.add("msg", "found " + requestId + " in failed tasks");
|
||||
results.add("status", success);
|
||||
} else {
|
||||
SimpleOrderedMap failure = new SimpleOrderedMap();
|
||||
failure.add("state", "notfound");
|
||||
failure.add("msg", "Did not find taskid [" + requestId + "] in any tasks queue");
|
||||
results.add("status", failure);
|
||||
}
|
||||
}
|
||||
|
||||
private void deleteShard(ClusterState clusterState, ZkNodeProps message, NamedList results) {
|
||||
log.info("Delete shard invoked");
|
||||
String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
|
||||
|
|
|
@ -265,11 +265,43 @@ public class CollectionsHandler extends RequestHandlerBase {
|
|||
private void handleRequestStatus(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
|
||||
log.debug("REQUESTSTATUS action invoked: " + req.getParamString());
|
||||
req.getParams().required().check(REQUESTID);
|
||||
Map<String, Object> props = new HashMap<String, Object>();
|
||||
props.put(Overseer.QUEUE_OPERATION, OverseerCollectionProcessor.REQUESTSTATUS);
|
||||
props.put(REQUESTID, req.getParams().get(REQUESTID));
|
||||
ZkNodeProps m = new ZkNodeProps(props);
|
||||
handleResponse(OverseerCollectionProcessor.REQUESTSTATUS, m, rsp);
|
||||
|
||||
String requestId = req.getParams().get(REQUESTID);
|
||||
|
||||
if (requestId.equals("-1")) {
|
||||
// Special taskId (-1), clears up the request state maps.
|
||||
if(requestId.equals("-1")) {
|
||||
coreContainer.getZkController().getOverseerCompletedMap().clear();
|
||||
coreContainer.getZkController().getOverseerFailureMap().clear();
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
NamedList<Object> results = new NamedList<>();
|
||||
if (coreContainer.getZkController().getOverseerCompletedMap().contains(requestId)) {
|
||||
SimpleOrderedMap success = new SimpleOrderedMap();
|
||||
success.add("state", "completed");
|
||||
success.add("msg", "found " + requestId + " in completed tasks");
|
||||
results.add("status", success);
|
||||
} else if (coreContainer.getZkController().getOverseerRunningMap().contains(requestId)) {
|
||||
SimpleOrderedMap success = new SimpleOrderedMap();
|
||||
success.add("state", "running");
|
||||
success.add("msg", "found " + requestId + " in submitted tasks");
|
||||
results.add("status", success);
|
||||
} else if (coreContainer.getZkController().getOverseerFailureMap().contains(requestId)) {
|
||||
SimpleOrderedMap success = new SimpleOrderedMap();
|
||||
success.add("state", "failed");
|
||||
success.add("msg", "found " + requestId + " in failed tasks");
|
||||
results.add("status", success);
|
||||
} else {
|
||||
SimpleOrderedMap failure = new SimpleOrderedMap();
|
||||
failure.add("state", "notfound");
|
||||
failure.add("msg", "Did not find taskid [" + requestId + "] in any tasks queue");
|
||||
results.add("status", failure);
|
||||
}
|
||||
SolrResponse response = new OverseerSolrResponse(results);
|
||||
|
||||
rsp.getValues().addAll(response.getResponse());
|
||||
}
|
||||
}
|
||||
|
||||
private void handleResponse(String operation, ZkNodeProps m,
|
||||
|
|
|
@ -53,7 +53,11 @@ public class AsyncMigrateRouteKeyTest extends MigrateRouteKeyTest {
|
|||
params = new ModifiableSolrParams();
|
||||
params.set("action", CollectionParams.CollectionAction.REQUESTSTATUS.toString());
|
||||
params.set(OverseerCollectionProcessor.REQUESTID, asyncId);
|
||||
message = sendStatusRequestWithRetry(params, 10);
|
||||
// This task takes long enough to run. Also check for the current state of the task to be running.
|
||||
message = sendStatusRequestWithRetry(params, 2);
|
||||
assertEquals("found " + asyncId + " in submitted tasks", message);
|
||||
// Now wait until the task actually completes successfully/fails.
|
||||
message = sendStatusRequestWithRetry(params, 20);
|
||||
assertEquals("Task " + asyncId + " not found in completed tasks.",
|
||||
"found " + asyncId + " in completed tasks", message);
|
||||
}
|
||||
|
@ -92,7 +96,6 @@ public class AsyncMigrateRouteKeyTest extends MigrateRouteKeyTest {
|
|||
|
||||
if (state.equals("completed") || state.equals("failed"))
|
||||
return (String) status.get("msg");
|
||||
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
|
|
Loading…
Reference in New Issue