SOLR-7294: Migrate API fails with 'Invalid status request: notfoundretried 6times' message

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1668956 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Shalin Shekhar Mangar 2015-03-24 19:04:40 +00:00
parent 2cba11bfa6
commit ad482ef4a5
4 changed files with 43 additions and 70 deletions

View File

@ -289,6 +289,9 @@ Bug Fixes
* SOLR-7248: In legacyCloud=false mode we should check if the core was hosted on the same node before registering it
(Varun Thacker, Noble Paul, Mark Miller)
* SOLR-7294: Migrate API fails with 'Invalid status request: notfoundretried 6times' message.
(Jessica Cheng Mallet, shalin)
Optimizations
----------------------

View File

@ -1594,9 +1594,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
cmd.setOnlyIfLeader(true);
ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams());
setupAsyncRequest(asyncId, requestMap, p, nodeName);
sendShardRequest(nodeName, p, shardHandler);
sendShardRequest(nodeName, p, shardHandler, asyncId, requestMap);
}
collectShardResponses(results, true,
@ -1619,9 +1617,8 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
params.add(CoreAdminParams.TARGET_CORE, subShardName);
}
params.set(CoreAdminParams.RANGES, rangesStr);
setupAsyncRequest(asyncId, requestMap, params, parentShardLeader.getNodeName());
sendShardRequest(parentShardLeader.getNodeName(), params, shardHandler);
sendShardRequest(parentShardLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
collectShardResponses(results, true, "SPLITSHARD failed to invoke SPLIT core admin command",
shardHandler);
@ -1639,9 +1636,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTAPPLYUPDATES.toString());
params.set(CoreAdminParams.NAME, subShardName);
setupAsyncRequest(asyncId, requestMap, params, nodeName);
sendShardRequest(nodeName, params, shardHandler);
sendShardRequest(nodeName, params, shardHandler, asyncId, requestMap);
}
collectShardResponses(results, true,
@ -1722,9 +1717,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
cmd.setOnlyIfLeader(true);
ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams());
setupAsyncRequest(asyncId, requestMap, p, nodeName);
sendShardRequest(nodeName, p, shardHandler);
sendShardRequest(nodeName, p, shardHandler, asyncId, requestMap);
}
}
@ -2031,9 +2024,8 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTBUFFERUPDATES.toString());
params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
String nodeName = targetLeader.getNodeName();
setupAsyncRequest(asyncId, requestMap, params, nodeName);
sendShardRequest(targetLeader.getNodeName(), params, shardHandler);
sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
collectShardResponses(results, true, "MIGRATE failed to request node to buffer updates",
shardHandler);
@ -2110,7 +2102,8 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
cmd.setState(ZkStateReader.ACTIVE);
cmd.setCheckLive(true);
cmd.setOnlyIfLeader(true);
sendShardRequest(tempSourceLeader.getNodeName(), new ModifiableSolrParams(cmd.getParams()), shardHandler);
// we don't want this to happen asynchronously
sendShardRequest(tempSourceLeader.getNodeName(), new ModifiableSolrParams(cmd.getParams()), shardHandler, null, null);
collectShardResponses(results, true,
"MIGRATE failed to create temp collection leader or timed out waiting for it to come up",
@ -2126,9 +2119,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
String tempNodeName = sourceLeader.getNodeName();
setupAsyncRequest(asyncId, requestMap, params, tempNodeName);
sendShardRequest(tempNodeName, params, shardHandler);
sendShardRequest(tempNodeName, params, shardHandler, asyncId, requestMap);
collectShardResponses(results, true, "MIGRATE failed to invoke SPLIT core admin command", shardHandler);
completeAsyncRequest(asyncId, requestMap, results);
@ -2172,9 +2163,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
cmd.setOnlyIfLeader(true);
params = new ModifiableSolrParams(cmd.getParams());
setupAsyncRequest(asyncId, requestMap, params, tempSourceLeader.getNodeName());
sendShardRequest(tempSourceLeader.getNodeName(), params, shardHandler);
sendShardRequest(tempSourceLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
collectShardResponses(results, true,
"MIGRATE failed to create temp collection replica or timed out waiting for them to come up",
@ -2189,9 +2178,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
params.set(CoreAdminParams.CORE, targetLeader.getStr("core"));
params.set(CoreAdminParams.SRC_CORE, tempCollectionReplica2);
setupAsyncRequest(asyncId, requestMap, params, sourceLeader.getNodeName());
sendShardRequest(targetLeader.getNodeName(), params, shardHandler);
sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
collectShardResponses(results, true,
"MIGRATE failed to merge " + tempCollectionReplica2 +
" to " + targetLeader.getStr("core") + " on node: " + targetLeader.getNodeName(),
@ -2203,9 +2190,8 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTAPPLYUPDATES.toString());
params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
setupAsyncRequest(asyncId, requestMap, params, targetLeader.getNodeName());
sendShardRequest(targetLeader.getNodeName(), params, shardHandler);
sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
collectShardResponses(results, true,
"MIGRATE failed to request node to apply buffered updates",
shardHandler);
@ -2231,14 +2217,6 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
}
}
private void setupAsyncRequest(String asyncId, HashMap<String, String> requestMap, ModifiableSolrParams params, String nodeName) {
if(asyncId != null) {
String coreAdminAsyncId = asyncId + Math.abs(System.nanoTime());
params.set(ASYNC, coreAdminAsyncId);
requestMap.put(nodeName, coreAdminAsyncId);
}
}
private DocRouter.Range intersect(DocRouter.Range a, DocRouter.Range b) {
if (a == null || b == null || !a.overlaps(b)) {
return null;
@ -2253,7 +2231,13 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
}
}
private void sendShardRequest(String nodeName, ModifiableSolrParams params, ShardHandler shardHandler) {
private void sendShardRequest(String nodeName, ModifiableSolrParams params, ShardHandler shardHandler, String asyncId, Map<String, String> requestMap) {
if (asyncId != null) {
String coreAdminAsyncId = asyncId + Math.abs(System.nanoTime());
params.set(ASYNC, coreAdminAsyncId);
requestMap.put(nodeName, coreAdminAsyncId);
}
ShardRequest sreq = new ShardRequest();
params.set("qt", adminPath);
sreq.purpose = 1;
@ -2431,8 +2415,11 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
params.set(CoreAdminParams.SHARD, sliceName);
params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
setupAsyncRequest(async, requestMap, params, nodeName);
if (async != null) {
String coreAdminAsyncId = async + Math.abs(System.nanoTime());
params.add(ASYNC, coreAdminAsyncId);
requestMap.put(nodeName, coreAdminAsyncId);
}
addPropertyParams(message, params);
ShardRequest sreq = new ShardRequest();
@ -2593,8 +2580,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
// For tracking async calls.
HashMap<String, String> requestMap = new HashMap<>();
setupAsyncRequest(asyncId, requestMap, params, node);
sendShardRequest(node, params, shardHandler);
sendShardRequest(node, params, shardHandler, asyncId, requestMap);
collectShardResponses(results, true,
"ADDREPLICA failed to create replica", shardHandler);
@ -2779,7 +2765,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
NamedList results = new NamedList();
processResponse(results, srsp);
String r = (String) srsp.getSolrResponse().getResponse().get("STATUS");
if(r.equals("running")) {
if (r.equals("running")) {
log.debug("The task is still RUNNING, continuing to wait.");
try {
Thread.sleep(1000);
@ -2788,7 +2774,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
}
continue;
} else if(r.equals("completed")) {
} else if (r.equals("completed")) {
log.debug("The task is COMPLETED, returning");
return srsp.getSolrResponse().getResponse();
} else if (r.equals("failed")) {
@ -2797,15 +2783,15 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
return srsp.getSolrResponse().getResponse();
} else if (r.equals("notfound")) {
log.debug("The task is notfound, retry");
if(counter++ < 5) {
if (counter++ < 5) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
break;
}
throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid status request: " + srsp.getSolrResponse().getResponse().get("STATUS") +
"retried " + counter + "times");
throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid status request for requestId: " + requestId + "" + srsp.getSolrResponse().getResponse().get("STATUS") +
"retried " + counter + "times");
} else {
throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid status request " + srsp.getSolrResponse().getResponse().get("STATUS"));
}

View File

@ -94,8 +94,7 @@ import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
public class CoreAdminHandler extends RequestHandlerBase {
protected static Logger log = LoggerFactory.getLogger(CoreAdminHandler.class);
protected final CoreContainer coreContainer;
protected static HashMap<String, Map<String, TaskObject>> requestStatusMap =
new HashMap<String,Map<String, TaskObject>>();
protected final Map<String, Map<String, TaskObject>> requestStatusMap;
protected final ExecutorService parallelExecutor = Executors.newFixedThreadPool(50,
new DefaultSolrThreadFactory("parallelCoreAdminExecutor"));
@ -108,17 +107,16 @@ public class CoreAdminHandler extends RequestHandlerBase {
public static String RESPONSE_STATUS = "STATUS";
public static String RESPONSE_MESSAGE = "msg";
static {
requestStatusMap.put(RUNNING, Collections.synchronizedMap(new LinkedHashMap<String, TaskObject>()));
requestStatusMap.put(COMPLETED, Collections.synchronizedMap(new LinkedHashMap<String, TaskObject>()));
requestStatusMap.put(FAILED, Collections.synchronizedMap(new LinkedHashMap<String, TaskObject>()));
}
public CoreAdminHandler() {
super();
// Unlike most request handlers, CoreContainer initialization
// should happen in the constructor...
this.coreContainer = null;
HashMap<String, Map<String, TaskObject>> map = new HashMap<>(3, 1.0f);
map.put(RUNNING, Collections.synchronizedMap(new LinkedHashMap<String, TaskObject>()));
map.put(COMPLETED, Collections.synchronizedMap(new LinkedHashMap<String, TaskObject>()));
map.put(FAILED, Collections.synchronizedMap(new LinkedHashMap<String, TaskObject>()));
requestStatusMap = Collections.unmodifiableMap(map);
}
@ -129,6 +127,11 @@ public class CoreAdminHandler extends RequestHandlerBase {
*/
public CoreAdminHandler(final CoreContainer coreContainer) {
this.coreContainer = coreContainer;
HashMap<String, Map<String, TaskObject>> map = new HashMap<>(3, 1.0f);
map.put(RUNNING, Collections.synchronizedMap(new LinkedHashMap<String, TaskObject>()));
map.put(COMPLETED, Collections.synchronizedMap(new LinkedHashMap<String, TaskObject>()));
map.put(FAILED, Collections.synchronizedMap(new LinkedHashMap<String, TaskObject>()));
requestStatusMap = Collections.unmodifiableMap(map);
}

View File

@ -50,25 +50,6 @@ public class MigrateRouteKeyTest extends BasicDistributedZkTest {
schemaString = "schema15.xml"; // we need a string id
}
@Override
public void distribSetUp() throws Exception {
super.distribSetUp();
System.setProperty("numShards", Integer.toString(sliceCount));
System.setProperty("solr.xml.persist", "true");
}
@Override
public void distribTearDown() throws Exception {
super.distribTearDown();
System.clearProperty("zkHost");
System.clearProperty("numShards");
System.clearProperty("solr.xml.persist");
// insurance
DirectUpdateHandler2.commitOnClose = true;
}
@Test
public void test() throws Exception {
waitForThingsToLevelOut(15);