mirror of https://github.com/apache/lucene.git
SOLR-8534: Add generic support for collection APIs to be async
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1725474 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f0362f6248
commit
a2e5c37c85
|
@ -318,6 +318,10 @@ New Features
|
|||
* SOLR-8312: Add domain size and numBuckets to facet telemetry info (facet debug info
|
||||
for the new Facet Module). (Michael Sun, yonik)
|
||||
|
||||
* SOLR-8534: Add generic support for collection APIs to be async. Thus more actions benefit from having async
|
||||
support. The commands that additionally get async support are: delete/reload collection, create/delete alias,
|
||||
create/delete shard, delete replica, add/delete replica property, add/remove role,
|
||||
overseer status, balance shard unique, rebalance leaders, modify collection, migrate state format (Varun Thacker)
|
||||
|
||||
Bug Fixes
|
||||
----------------------
|
||||
|
|
|
@ -224,9 +224,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
deleteCollection(message, results);
|
||||
break;
|
||||
case RELOAD:
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set(CoreAdminParams.ACTION, CoreAdminAction.RELOAD.toString());
|
||||
collectionCmd(zkStateReader.getClusterState(), message, params, results, Replica.State.ACTIVE);
|
||||
reloadCollection(message, results);
|
||||
break;
|
||||
case CREATEALIAS:
|
||||
createAlias(zkStateReader.getAliases(), message);
|
||||
|
@ -303,6 +301,18 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
return new OverseerSolrResponse(results);
|
||||
}
|
||||
|
||||
private void reloadCollection(ZkNodeProps message, NamedList results) {
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set(CoreAdminParams.ACTION, CoreAdminAction.RELOAD.toString());
|
||||
|
||||
String asyncId = message.getStr(ASYNC);
|
||||
Map<String, String> requestMap = null;
|
||||
if (asyncId != null) {
|
||||
requestMap = new HashMap<>();
|
||||
}
|
||||
collectionCmd(message, params, results, Replica.State.ACTIVE, asyncId, requestMap);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void processRebalanceLeaders(ZkNodeProps message) throws KeeperException, InterruptedException {
|
||||
checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, CORE_NAME_PROP, ELECTION_NODE_PROP,
|
||||
|
@ -670,7 +680,6 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
|
||||
DocCollection coll = clusterState.getCollection(collectionName);
|
||||
Slice slice = coll.getSlice(shard);
|
||||
ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
|
||||
if (slice == null) {
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST,
|
||||
"Invalid shard name : " + shard + " in collection : " + collectionName);
|
||||
|
@ -691,36 +700,34 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
"Attempted to remove replica : " + collectionName + "/" + shard + "/" + replicaName
|
||||
+ " with onlyIfDown='true', but state is '" + replica.getStr(ZkStateReader.STATE_PROP) + "'");
|
||||
}
|
||||
|
||||
String baseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
|
||||
|
||||
ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
|
||||
String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
|
||||
|
||||
// assume the core exists and try to unload it
|
||||
Map m = makeMap("qt", adminPath, CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString(), CoreAdminParams.CORE,
|
||||
core, CoreAdminParams.DELETE_INSTANCE_DIR, "true", CoreAdminParams.DELETE_DATA_DIR, "true");
|
||||
|
||||
ShardRequest sreq = new ShardRequest();
|
||||
sreq.purpose = 1;
|
||||
sreq.shards = new String[] {baseUrl};
|
||||
sreq.actualShards = sreq.shards;
|
||||
sreq.params = new ModifiableSolrParams(new MapSolrParams(m));
|
||||
try {
|
||||
shardHandler.submit(sreq, baseUrl, sreq.params);
|
||||
} catch (Exception e) {
|
||||
log.warn("Exception trying to unload core " + sreq, e);
|
||||
String asyncId = message.getStr(ASYNC);
|
||||
Map<String, String> requestMap = null;
|
||||
if (asyncId != null) {
|
||||
requestMap = new HashMap<>(1, 1.0f);
|
||||
}
|
||||
|
||||
collectShardResponses(replica.getState() != Replica.State.ACTIVE ? new NamedList() : results, false, null,
|
||||
shardHandler);
|
||||
|
||||
if (waitForCoreNodeGone(collectionName, shard, replicaName, 5000)) return;// check if the core unload removed the
|
||||
// corenode zk enry
|
||||
deleteCoreNode(collectionName, replicaName, replica, core); // try and ensure core info is removed from clusterstate
|
||||
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.add(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
|
||||
params.add(CoreAdminParams.CORE, core);
|
||||
params.add(CoreAdminParams.DELETE_INSTANCE_DIR, "true");
|
||||
params.add(CoreAdminParams.DELETE_DATA_DIR, "true");
|
||||
|
||||
sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap);
|
||||
|
||||
processResponses(results, shardHandler, false, null, asyncId, requestMap);
|
||||
|
||||
//check if the core unload removed the corenode zk entry
|
||||
if (waitForCoreNodeGone(collectionName, shard, replicaName, 5000)) return;
|
||||
|
||||
// try and ensure core info is removed from cluster state
|
||||
deleteCoreNode(collectionName, replicaName, replica, core);
|
||||
if (waitForCoreNodeGone(collectionName, shard, replicaName, 30000)) return;
|
||||
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR,
|
||||
"Could not remove replica : " + collectionName + "/" + shard + "/" + replicaName);
|
||||
|
||||
}
|
||||
|
||||
private boolean waitForCoreNodeGone(String collectionName, String shard, String replicaName, int timeoutms) throws InterruptedException {
|
||||
|
@ -760,21 +767,23 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
|
||||
}
|
||||
|
||||
private void deleteCollection(ZkNodeProps message, NamedList results)
|
||||
throws KeeperException, InterruptedException {
|
||||
private void deleteCollection(ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
|
||||
final String collection = message.getStr(NAME);
|
||||
try {
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
|
||||
params.set(CoreAdminParams.DELETE_INSTANCE_DIR, true);
|
||||
params.set(CoreAdminParams.DELETE_DATA_DIR, true);
|
||||
collectionCmd(zkStateReader.getClusterState(), message, params, results,
|
||||
null);
|
||||
|
||||
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
|
||||
DELETE.toLower(), NAME, collection);
|
||||
Overseer.getInQueue(zkStateReader.getZkClient()).offer(
|
||||
Utils.toJSON(m));
|
||||
String asyncId = message.getStr(ASYNC);
|
||||
Map<String, String> requestMap = null;
|
||||
if (asyncId != null) {
|
||||
requestMap = new HashMap<>();
|
||||
}
|
||||
collectionCmd(message, params, results, null, asyncId, requestMap);
|
||||
|
||||
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETE.toLower(), NAME, collection);
|
||||
Overseer.getInQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
|
||||
|
||||
// wait for a while until we don't see the collection
|
||||
TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
|
||||
|
@ -877,7 +886,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
|
||||
TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
|
||||
boolean success = false;
|
||||
Aliases aliases = null;
|
||||
Aliases aliases;
|
||||
while (! timeout.hasTimedOut()) {
|
||||
aliases = zkStateReader.getAliases();
|
||||
String collections = aliases.getCollectionAlias(name);
|
||||
|
@ -942,7 +951,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
throws KeeperException, InterruptedException {
|
||||
String collectionName = message.getStr(COLLECTION_PROP);
|
||||
String sliceName = message.getStr(SHARD_ID_PROP);
|
||||
|
||||
|
||||
log.info("Create shard invoked: {}", message);
|
||||
if (collectionName == null || sliceName == null)
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, "'collection' and 'shard' are required parameters");
|
||||
|
@ -968,6 +977,13 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully create shard: " + message.getStr(NAME));
|
||||
|
||||
String configName = message.getStr(COLL_CONF);
|
||||
|
||||
String async = message.getStr(ASYNC);
|
||||
Map<String, String> requestMap = null;
|
||||
if (async != null) {
|
||||
requestMap = new HashMap<>(repFactor, 1.0f);
|
||||
}
|
||||
|
||||
for (int j = 1; j <= repFactor; j++) {
|
||||
String nodeName = sortedNodeList.get(((j - 1)) % sortedNodeList.size()).nodeName;
|
||||
String shardName = collectionName + "_" + sliceName + "_replica" + j;
|
||||
|
@ -977,27 +993,17 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
// Need to create new params for each request
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
|
||||
|
||||
params.set(CoreAdminParams.NAME, shardName);
|
||||
params.set(COLL_CONF, configName);
|
||||
params.set(CoreAdminParams.COLLECTION, collectionName);
|
||||
params.set(CoreAdminParams.SHARD, sliceName);
|
||||
params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
|
||||
addPropertyParams(message, params);
|
||||
|
||||
ShardRequest sreq = new ShardRequest();
|
||||
params.set("qt", adminPath);
|
||||
sreq.purpose = 1;
|
||||
String replica = zkStateReader.getBaseUrlForNodeName(nodeName);
|
||||
sreq.shards = new String[] {replica};
|
||||
sreq.actualShards = sreq.shards;
|
||||
sreq.params = params;
|
||||
|
||||
shardHandler.submit(sreq, replica, sreq.params);
|
||||
|
||||
|
||||
sendShardRequest(nodeName, params, shardHandler, async, requestMap);
|
||||
}
|
||||
|
||||
processResponses(results, shardHandler);
|
||||
processResponses(results, shardHandler, true, "Failed to create shard", async, requestMap);
|
||||
|
||||
log.info("Finished create command on all shards for collection: " + collectionName);
|
||||
|
||||
|
@ -1016,7 +1022,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
DocCollection collection = clusterState.getCollection(collectionName);
|
||||
DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
|
||||
|
||||
Slice parentSlice = null;
|
||||
Slice parentSlice;
|
||||
|
||||
if (slice == null) {
|
||||
if (router instanceof CompositeIdRouter) {
|
||||
|
@ -1157,13 +1163,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
}
|
||||
}
|
||||
|
||||
// do not abort splitshard if the unloading fails
|
||||
// this can happen because the replicas created previously may be down
|
||||
// the only side effect of this is that the sub shard may end up having more replicas than we want
|
||||
collectShardResponses(results, false, null, shardHandler);
|
||||
|
||||
final String asyncId = message.getStr(ASYNC);
|
||||
HashMap<String,String> requestMap = new HashMap<>();
|
||||
Map<String,String> requestMap = new HashMap<>();
|
||||
|
||||
for (int i = 0; i < subRanges.size(); i++) {
|
||||
String subSlice = subSlices.get(i);
|
||||
|
@ -1208,10 +1209,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
}
|
||||
addReplica(clusterState, new ZkNodeProps(propMap), results);
|
||||
}
|
||||
|
||||
collectShardResponses(results, true, "SPLITSHARD failed to create subshard leaders", shardHandler);
|
||||
|
||||
completeAsyncRequest(asyncId, requestMap, results);
|
||||
|
||||
processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard leaders", asyncId, requestMap);
|
||||
|
||||
for (String subShardName : subShardNames) {
|
||||
// wait for parent leader to acknowledge the sub-shard core
|
||||
|
@ -1228,11 +1227,9 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams());
|
||||
sendShardRequest(nodeName, p, shardHandler, asyncId, requestMap);
|
||||
}
|
||||
|
||||
collectShardResponses(results, true, "SPLITSHARD timed out waiting for subshard leaders to come up",
|
||||
shardHandler);
|
||||
|
||||
completeAsyncRequest(asyncId, requestMap, results);
|
||||
|
||||
processResponses(results, shardHandler, true, "SPLITSHARD timed out waiting for subshard leaders to come up",
|
||||
asyncId, requestMap);
|
||||
|
||||
log.info("Successfully created all sub-shards for collection " + collectionName + " parent shard: " + slice
|
||||
+ " on: " + parentShardLeader);
|
||||
|
@ -1250,9 +1247,9 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
params.set(CoreAdminParams.RANGES, rangesStr);
|
||||
|
||||
sendShardRequest(parentShardLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
|
||||
|
||||
collectShardResponses(results, true, "SPLITSHARD failed to invoke SPLIT core admin command", shardHandler);
|
||||
completeAsyncRequest(asyncId, requestMap, results);
|
||||
|
||||
processResponses(results, shardHandler, true, "SPLITSHARD failed to invoke SPLIT core admin command", asyncId,
|
||||
requestMap);
|
||||
|
||||
log.info("Index on shard: " + nodeName + " split into two successfully");
|
||||
|
||||
|
@ -1268,11 +1265,9 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
|
||||
sendShardRequest(nodeName, params, shardHandler, asyncId, requestMap);
|
||||
}
|
||||
|
||||
collectShardResponses(results, true, "SPLITSHARD failed while asking sub shard leaders to apply buffered updates",
|
||||
shardHandler);
|
||||
|
||||
completeAsyncRequest(asyncId, requestMap, results);
|
||||
|
||||
processResponses(results, shardHandler, true, "SPLITSHARD failed while asking sub shard leaders" +
|
||||
" to apply buffered updates", asyncId, requestMap);
|
||||
|
||||
log.info("Successfully applied buffered updates on : " + subShardNames);
|
||||
|
||||
|
@ -1380,11 +1375,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
for (Map<String, Object> replica : replicas) {
|
||||
addReplica(clusterState, new ZkNodeProps(replica), results);
|
||||
}
|
||||
|
||||
collectShardResponses(results, true,
|
||||
"SPLITSHARD failed to create subshard replicas", shardHandler);
|
||||
|
||||
completeAsyncRequest(asyncId, requestMap, results);
|
||||
|
||||
processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard replicas", asyncId, requestMap);
|
||||
|
||||
log.info("Successfully created all replica shards for all sub-slices " + subSlices);
|
||||
|
||||
|
@ -1481,26 +1473,6 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
);
|
||||
}
|
||||
|
||||
private void collectShardResponses(NamedList results, boolean abortOnError,
|
||||
String msgOnError,
|
||||
ShardHandler shardHandler) {
|
||||
ShardResponse srsp;
|
||||
do {
|
||||
srsp = shardHandler.takeCompletedOrError();
|
||||
if (srsp != null) {
|
||||
processResponse(results, srsp);
|
||||
Throwable exception = srsp.getException();
|
||||
if (abortOnError && exception != null) {
|
||||
// drain pending requests
|
||||
while (srsp != null) {
|
||||
srsp = shardHandler.takeCompletedOrError();
|
||||
}
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR, msgOnError, exception);
|
||||
}
|
||||
}
|
||||
} while (srsp != null);
|
||||
}
|
||||
|
||||
private void deleteShard(ClusterState clusterState, ZkNodeProps message, NamedList results) {
|
||||
String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
|
||||
String sliceId = message.getStr(ZkStateReader.SHARD_ID_PROP);
|
||||
|
@ -1525,15 +1497,21 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
+ ". Only non-active (or custom-hashed) slices can be deleted.");
|
||||
}
|
||||
ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
|
||||
|
||||
String asyncId = message.getStr(ASYNC);
|
||||
Map<String, String> requestMap = null;
|
||||
if (asyncId != null) {
|
||||
requestMap = new HashMap<>(slice.getReplicas().size(), 1.0f);
|
||||
}
|
||||
|
||||
try {
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
|
||||
params.set(CoreAdminParams.DELETE_INDEX, "true");
|
||||
sliceCmd(clusterState, params, null, slice, shardHandler);
|
||||
|
||||
processResponses(results, shardHandler);
|
||||
|
||||
sliceCmd(clusterState, params, null, slice, shardHandler, asyncId, requestMap);
|
||||
|
||||
processResponses(results, shardHandler, true, "Failed to delete shard", asyncId, requestMap);
|
||||
|
||||
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETESHARD.toLower(), ZkStateReader.COLLECTION_PROP,
|
||||
collection, ZkStateReader.SHARD_ID_PROP, sliceId);
|
||||
Overseer.getInQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
|
||||
|
@ -1646,21 +1624,17 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
|
||||
Replica targetLeader = zkStateReader.getLeaderRetry(targetCollection.getName(), targetSlice.getName(), 10000);
|
||||
// For tracking async calls.
|
||||
HashMap<String, String> requestMap = new HashMap<String, String>();
|
||||
Map<String, String> requestMap = new HashMap<>();
|
||||
|
||||
log.info("Asking target leader node: " + targetLeader.getNodeName() + " core: "
|
||||
+ targetLeader.getStr("core") + " to buffer updates");
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTBUFFERUPDATES.toString());
|
||||
params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
|
||||
String nodeName = targetLeader.getNodeName();
|
||||
|
||||
sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
|
||||
|
||||
collectShardResponses(results, true, "MIGRATE failed to request node to buffer updates",
|
||||
shardHandler);
|
||||
|
||||
completeAsyncRequest(asyncId, requestMap, results);
|
||||
processResponses(results, shardHandler, true, "MIGRATE failed to request node to buffer updates", asyncId, requestMap);
|
||||
|
||||
ZkNodeProps m = new ZkNodeProps(
|
||||
Overseer.QUEUE_OPERATION, OverseerAction.ADDROUTINGRULE.toLower(),
|
||||
|
@ -1671,8 +1645,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
"targetCollection", targetCollection.getName(),
|
||||
"expireAt", RoutingRule.makeExpiryAt(timeout));
|
||||
log.info("Adding routing rule: " + m);
|
||||
Overseer.getInQueue(zkStateReader.getZkClient()).offer(
|
||||
Utils.toJSON(m));
|
||||
Overseer.getInQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
|
||||
|
||||
// wait for a while until we see the new rule
|
||||
log.info("Waiting to see routing rule updated in clusterstate");
|
||||
|
@ -1707,7 +1680,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
NUM_SLICES, 1,
|
||||
COLL_CONF, configName,
|
||||
CREATE_NODE_SET, sourceLeader.getNodeName());
|
||||
if(asyncId != null) {
|
||||
if (asyncId != null) {
|
||||
String internalAsyncId = asyncId + Math.abs(System.nanoTime());
|
||||
props.put(ASYNC, internalAsyncId);
|
||||
}
|
||||
|
@ -1734,9 +1707,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
// we don't want this to happen asynchronously
|
||||
sendShardRequest(tempSourceLeader.getNodeName(), new ModifiableSolrParams(cmd.getParams()), shardHandler, null, null);
|
||||
|
||||
collectShardResponses(results, true,
|
||||
"MIGRATE failed to create temp collection leader or timed out waiting for it to come up",
|
||||
shardHandler);
|
||||
processResponses(results, shardHandler, true, "MIGRATE failed to create temp collection leader" +
|
||||
" or timed out waiting for it to come up", asyncId, requestMap);
|
||||
|
||||
log.info("Asking source leader to split index");
|
||||
params = new ModifiableSolrParams();
|
||||
|
@ -1749,8 +1721,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
String tempNodeName = sourceLeader.getNodeName();
|
||||
|
||||
sendShardRequest(tempNodeName, params, shardHandler, asyncId, requestMap);
|
||||
collectShardResponses(results, true, "MIGRATE failed to invoke SPLIT core admin command", shardHandler);
|
||||
completeAsyncRequest(asyncId, requestMap, results);
|
||||
processResponses(results, shardHandler, true, "MIGRATE failed to invoke SPLIT core admin command", asyncId, requestMap);
|
||||
|
||||
log.info("Creating a replica of temporary collection: {} on the target leader node: {}",
|
||||
tempSourceCollectionName, targetLeader.getNodeName());
|
||||
|
@ -1773,11 +1744,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
}
|
||||
addReplica(clusterState, new ZkNodeProps(props), results);
|
||||
|
||||
collectShardResponses(results, true,
|
||||
"MIGRATE failed to create replica of temporary collection in target leader node.",
|
||||
shardHandler);
|
||||
|
||||
completeAsyncRequest(asyncId, requestMap, results);
|
||||
processResponses(results, shardHandler, true, "MIGRATE failed to create replica of " +
|
||||
"temporary collection in target leader node.", asyncId, requestMap);
|
||||
|
||||
coreNodeName = waitForCoreNodeName(tempSourceCollectionName,
|
||||
targetLeader.getNodeName(), tempCollectionReplica2);
|
||||
|
@ -1794,11 +1762,9 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
|
||||
sendShardRequest(tempSourceLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
|
||||
|
||||
collectShardResponses(results, true,
|
||||
"MIGRATE failed to create temp collection replica or timed out waiting for them to come up",
|
||||
shardHandler);
|
||||
processResponses(results, shardHandler, true, "MIGRATE failed to create temp collection" +
|
||||
" replica or timed out waiting for them to come up", asyncId, requestMap);
|
||||
|
||||
completeAsyncRequest(asyncId, requestMap, results);
|
||||
log.info("Successfully created replica of temp source collection on target leader node");
|
||||
|
||||
log.info("Requesting merge of temp source collection replica to target leader");
|
||||
|
@ -1808,12 +1774,9 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
params.set(CoreAdminParams.SRC_CORE, tempCollectionReplica2);
|
||||
|
||||
sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
|
||||
collectShardResponses(results, true,
|
||||
"MIGRATE failed to merge " + tempCollectionReplica2 +
|
||||
" to " + targetLeader.getStr("core") + " on node: " + targetLeader.getNodeName(),
|
||||
shardHandler);
|
||||
|
||||
completeAsyncRequest(asyncId, requestMap, results);
|
||||
String msg = "MIGRATE failed to merge " + tempCollectionReplica2 + " to "
|
||||
+ targetLeader.getStr("core") + " on node: " + targetLeader.getNodeName();
|
||||
processResponses(results, shardHandler, true, msg, asyncId, requestMap);
|
||||
|
||||
log.info("Asking target leader to apply buffered updates");
|
||||
params = new ModifiableSolrParams();
|
||||
|
@ -1821,11 +1784,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
|
||||
|
||||
sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
|
||||
collectShardResponses(results, true,
|
||||
"MIGRATE failed to request node to apply buffered updates",
|
||||
shardHandler);
|
||||
|
||||
completeAsyncRequest(asyncId, requestMap, results);
|
||||
processResponses(results, shardHandler, true, "MIGRATE failed to request node to apply buffered updates",
|
||||
asyncId, requestMap);
|
||||
|
||||
try {
|
||||
log.info("Deleting temporary collection: " + tempSourceCollectionName);
|
||||
|
@ -1839,13 +1799,6 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
}
|
||||
}
|
||||
|
||||
private void completeAsyncRequest(String asyncId, HashMap<String, String> requestMap, NamedList results) {
|
||||
if(asyncId != null) {
|
||||
waitForAsyncCallsToComplete(requestMap, results);
|
||||
requestMap.clear();
|
||||
}
|
||||
}
|
||||
|
||||
private DocRouter.Range intersect(DocRouter.Range a, DocRouter.Range b) {
|
||||
if (a == null || b == null || !a.overlaps(b)) {
|
||||
return null;
|
||||
|
@ -1867,11 +1820,9 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
|
||||
}
|
||||
|
||||
public static void sendShardRequest(String nodeName, ModifiableSolrParams params,
|
||||
ShardHandler shardHandler, String asyncId,
|
||||
Map<String, String> requestMap,
|
||||
String adminPath, ZkStateReader zkStateReader
|
||||
) {
|
||||
public static void sendShardRequest(String nodeName, ModifiableSolrParams params, ShardHandler shardHandler,
|
||||
String asyncId, Map<String, String> requestMap, String adminPath,
|
||||
ZkStateReader zkStateReader) {
|
||||
if (asyncId != null) {
|
||||
String coreAdminAsyncId = asyncId + Math.abs(System.nanoTime());
|
||||
params.set(ASYNC, coreAdminAsyncId);
|
||||
|
@ -2029,7 +1980,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
}
|
||||
|
||||
// For tracking async calls.
|
||||
HashMap<String, String> requestMap = new HashMap<String, String>();
|
||||
Map<String, String> requestMap = new HashMap<>();
|
||||
|
||||
|
||||
log.info(formatString("Creating SolrCores for new collection {0}, shardNames {1} , replicationFactor : {2}",
|
||||
|
@ -2098,11 +2049,9 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
}
|
||||
}
|
||||
|
||||
processResponses(results, shardHandler);
|
||||
processResponses(results, shardHandler, false, null, async, requestMap);
|
||||
|
||||
completeAsyncRequest(async, requestMap, results);
|
||||
|
||||
log.info("Finished create command on all shards for collection: "
|
||||
log.debug("Finished create command on all shards for collection: "
|
||||
+ collectionName);
|
||||
|
||||
} catch (SolrException ex) {
|
||||
|
@ -2263,22 +2212,37 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
addPropertyParams(message, params);
|
||||
|
||||
// For tracking async calls.
|
||||
HashMap<String,String> requestMap = new HashMap<>();
|
||||
Map<String,String> requestMap = new HashMap<>();
|
||||
sendShardRequest(node, params, shardHandler, asyncId, requestMap);
|
||||
|
||||
collectShardResponses(results, true, "ADDREPLICA failed to create replica", shardHandler);
|
||||
|
||||
completeAsyncRequest(asyncId, requestMap, results);
|
||||
|
||||
processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica", asyncId, requestMap);
|
||||
}
|
||||
|
||||
private void processResponses(NamedList results, ShardHandler shardHandler) {
|
||||
|
||||
private void processResponses(NamedList results, ShardHandler shardHandler, boolean abortOnError, String msgOnError,
|
||||
String asyncId, Map<String, String> requestMap) {
|
||||
//Processes all shard responses
|
||||
ShardResponse srsp;
|
||||
do {
|
||||
srsp = shardHandler.takeCompletedOrError();
|
||||
if (srsp != null) {
|
||||
processResponse(results, srsp);
|
||||
Throwable exception = srsp.getException();
|
||||
if (abortOnError && exception != null) {
|
||||
// drain pending requests
|
||||
while (srsp != null) {
|
||||
srsp = shardHandler.takeCompletedOrError();
|
||||
}
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR, msgOnError, exception);
|
||||
}
|
||||
}
|
||||
} while (srsp != null);
|
||||
|
||||
//If request is async wait for the core admin to complete before returning
|
||||
if (asyncId != null) {
|
||||
waitForAsyncCallsToComplete(requestMap, results);
|
||||
requestMap.clear();
|
||||
}
|
||||
}
|
||||
|
||||
private String getConfigName(String coll, ZkNodeProps message) throws KeeperException, InterruptedException {
|
||||
|
@ -2332,47 +2296,36 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
|
||||
}
|
||||
|
||||
private void collectionCmd(ClusterState clusterState, ZkNodeProps message, ModifiableSolrParams params, NamedList results, Replica.State stateMatcher) {
|
||||
private void collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
|
||||
NamedList results, Replica.State stateMatcher, String asyncId, Map<String, String> requestMap) {
|
||||
log.info("Executing Collection Cmd : " + params);
|
||||
String collectionName = message.getStr(NAME);
|
||||
ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
|
||||
|
||||
|
||||
ClusterState clusterState = zkStateReader.getClusterState();
|
||||
DocCollection coll = clusterState.getCollection(collectionName);
|
||||
|
||||
for (Map.Entry<String,Slice> entry : coll.getSlicesMap().entrySet()) {
|
||||
Slice slice = entry.getValue();
|
||||
sliceCmd(clusterState, params, stateMatcher, slice, shardHandler);
|
||||
for (Slice slice : coll.getSlices()) {
|
||||
sliceCmd(clusterState, params, stateMatcher, slice, shardHandler, asyncId, requestMap);
|
||||
}
|
||||
|
||||
processResponses(results, shardHandler);
|
||||
processResponses(results, shardHandler, false, null, asyncId, requestMap);
|
||||
|
||||
}
|
||||
|
||||
private void sliceCmd(ClusterState clusterState, ModifiableSolrParams params, Replica.State stateMatcher,
|
||||
Slice slice, ShardHandler shardHandler) {
|
||||
Map<String,Replica> shards = slice.getReplicasMap();
|
||||
Set<Map.Entry<String,Replica>> shardEntries = shards.entrySet();
|
||||
for (Map.Entry<String,Replica> shardEntry : shardEntries) {
|
||||
final ZkNodeProps node = shardEntry.getValue();
|
||||
if (clusterState.liveNodesContain(node.getStr(ZkStateReader.NODE_NAME_PROP))
|
||||
&& (stateMatcher == null || Replica.State.getState(node.getStr(ZkStateReader.STATE_PROP)) == stateMatcher)) {
|
||||
Slice slice, ShardHandler shardHandler, String asyncId, Map<String, String> requestMap) {
|
||||
|
||||
for (Replica replica : slice.getReplicas()) {
|
||||
if (clusterState.liveNodesContain(replica.getStr(ZkStateReader.NODE_NAME_PROP))
|
||||
&& (stateMatcher == null || Replica.State.getState(replica.getStr(ZkStateReader.STATE_PROP)) == stateMatcher)) {
|
||||
|
||||
// For thread safety, only simple clone the ModifiableSolrParams
|
||||
ModifiableSolrParams cloneParams = new ModifiableSolrParams();
|
||||
cloneParams.add(params);
|
||||
cloneParams.set(CoreAdminParams.CORE, node.getStr(ZkStateReader.CORE_NAME_PROP));
|
||||
cloneParams.set(CoreAdminParams.CORE, replica.getStr(ZkStateReader.CORE_NAME_PROP));
|
||||
|
||||
String replica = node.getStr(ZkStateReader.BASE_URL_PROP);
|
||||
ShardRequest sreq = new ShardRequest();
|
||||
sreq.nodeName = node.getStr(ZkStateReader.NODE_NAME_PROP);
|
||||
// yes, they must use same admin handler path everywhere...
|
||||
cloneParams.set("qt", adminPath);
|
||||
sreq.purpose = 1;
|
||||
sreq.shards = new String[] {replica};
|
||||
sreq.actualShards = sreq.shards;
|
||||
sreq.params = cloneParams;
|
||||
log.info("Collection Admin sending CoreAdmin cmd to " + replica
|
||||
+ " params:" + sreq.params);
|
||||
shardHandler.submit(sreq, replica, sreq.params);
|
||||
sendShardRequest(replica.getStr(ZkStateReader.NODE_NAME_PROP), cloneParams, shardHandler, asyncId, requestMap);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2413,7 +2366,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void waitForAsyncCallsToComplete(Map<String, String> requestMap, NamedList results) {
|
||||
for(String k:requestMap.keySet()) {
|
||||
for (String k:requestMap.keySet()) {
|
||||
log.debug("I am Waiting for :{}/{}", k, requestMap.get(k));
|
||||
results.add(requestMap.get(k), waitForCoreAdminAsyncCallToComplete(k, requestMap.get(k)));
|
||||
}
|
||||
|
|
|
@ -72,7 +72,6 @@ import org.apache.solr.core.CoreContainer;
|
|||
import org.apache.solr.handler.BlobHandler;
|
||||
import org.apache.solr.handler.RequestHandlerBase;
|
||||
import org.apache.solr.handler.component.ShardHandler;
|
||||
import org.apache.solr.handler.component.ShardRequest;
|
||||
import org.apache.solr.request.SolrQueryRequest;
|
||||
import org.apache.solr.response.SolrQueryResponse;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
|
@ -169,23 +168,26 @@ public class CollectionsHandler extends RequestHandlerBase {
|
|||
String a = params.get(CoreAdminParams.ACTION);
|
||||
if (a != null) {
|
||||
CollectionAction action = CollectionAction.get(a);
|
||||
if (action == null)
|
||||
if (action == null) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown action: " + a);
|
||||
}
|
||||
CollectionOperation operation = CollectionOperation.get(action);
|
||||
log.info("Invoked Collection Action :{} with params {} ", action.toLower(), req.getParamString());
|
||||
Map<String, Object> result = operation.call(req, rsp, this);
|
||||
if (result != null) {
|
||||
result.put(QUEUE_OPERATION, operation.action.toLower());
|
||||
ZkNodeProps props = new ZkNodeProps(result);
|
||||
if (operation.sendToOCPQueue) handleResponse(operation.action.toLower(), props, rsp, operation.timeOut);
|
||||
else Overseer.getInQueue(coreContainer.getZkController().getZkClient()).offer(Utils.toJSON(props));
|
||||
|
||||
Map<String, Object> props = operation.call(req, rsp, this);
|
||||
String asyncId = req.getParams().get(ASYNC);
|
||||
if (props != null) {
|
||||
if (asyncId != null) {
|
||||
props.put(ASYNC, asyncId);
|
||||
}
|
||||
props.put(QUEUE_OPERATION, operation.action.toLower());
|
||||
ZkNodeProps zkProps = new ZkNodeProps(props);
|
||||
if (operation.sendToOCPQueue) handleResponse(operation.action.toLower(), zkProps, rsp, operation.timeOut);
|
||||
else Overseer.getInQueue(coreContainer.getZkController().getZkClient()).offer(Utils.toJSON(props));
|
||||
}
|
||||
} else {
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, "action is a required param");
|
||||
|
||||
}
|
||||
|
||||
rsp.setHttpCaching(false);
|
||||
}
|
||||
|
||||
|
@ -315,7 +317,6 @@ public class CollectionsHandler extends RequestHandlerBase {
|
|||
MAX_SHARDS_PER_NODE,
|
||||
CREATE_NODE_SET, CREATE_NODE_SET_SHUFFLE,
|
||||
SHARDS_PROP,
|
||||
ASYNC,
|
||||
STATE_FORMAT,
|
||||
AUTO_ADD_REPLICAS,
|
||||
RULE,
|
||||
|
@ -366,7 +367,6 @@ public class CollectionsHandler extends RequestHandlerBase {
|
|||
Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler)
|
||||
throws Exception {
|
||||
return req.getParams().required().getAll(null, NAME);
|
||||
|
||||
}
|
||||
},
|
||||
SYNCSHARD_OP(SYNCSHARD) {
|
||||
|
@ -381,7 +381,6 @@ public class CollectionsHandler extends RequestHandlerBase {
|
|||
ZkNodeProps leaderProps = clusterState.getLeader(collection, shard);
|
||||
ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(leaderProps);
|
||||
|
||||
;
|
||||
try (HttpSolrClient client = new HttpSolrClient(nodeProps.getBaseUrl())) {
|
||||
client.setConnectionTimeout(15000);
|
||||
client.setSoTimeout(60000);
|
||||
|
@ -436,8 +435,7 @@ public class CollectionsHandler extends RequestHandlerBase {
|
|||
COLLECTION_PROP,
|
||||
SHARD_ID_PROP,
|
||||
"split.key",
|
||||
CoreAdminParams.RANGES,
|
||||
ASYNC);
|
||||
CoreAdminParams.RANGES);
|
||||
return copyPropertiesWithPrefix(req.getParams(), map, COLL_PROP_PREFIX);
|
||||
}
|
||||
},
|
||||
|
@ -453,7 +451,6 @@ public class CollectionsHandler extends RequestHandlerBase {
|
|||
@Override
|
||||
Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler) throws Exception {
|
||||
forceLeaderElection(req, handler);
|
||||
|
||||
return null;
|
||||
}
|
||||
},
|
||||
|
@ -468,7 +465,7 @@ public class CollectionsHandler extends RequestHandlerBase {
|
|||
throw new SolrException(ErrorCode.BAD_REQUEST, "shards can be added only to 'implicit' collections");
|
||||
req.getParams().getAll(map,
|
||||
REPLICATION_FACTOR,
|
||||
CREATE_NODE_SET, ASYNC);
|
||||
CREATE_NODE_SET);
|
||||
return copyPropertiesWithPrefix(req.getParams(), map, COLL_PROP_PREFIX);
|
||||
}
|
||||
},
|
||||
|
@ -479,14 +476,14 @@ public class CollectionsHandler extends RequestHandlerBase {
|
|||
COLLECTION_PROP,
|
||||
SHARD_ID_PROP,
|
||||
REPLICA_PROP);
|
||||
return req.getParams().getAll(map, ASYNC, ONLY_IF_DOWN);
|
||||
return req.getParams().getAll(map, ONLY_IF_DOWN);
|
||||
}
|
||||
},
|
||||
MIGRATE_OP(MIGRATE) {
|
||||
@Override
|
||||
Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception {
|
||||
Map<String, Object> map = req.getParams().required().getAll(null, COLLECTION_PROP, "split.key", "target.collection");
|
||||
return req.getParams().getAll(map, "forward.timeout", ASYNC);
|
||||
return req.getParams().getAll(map, "forward.timeout");
|
||||
}
|
||||
},
|
||||
ADDROLE_OP(ADDROLE) {
|
||||
|
@ -586,8 +583,7 @@ public class CollectionsHandler extends RequestHandlerBase {
|
|||
_ROUTE_,
|
||||
CoreAdminParams.NAME,
|
||||
INSTANCE_DIR,
|
||||
DATA_DIR,
|
||||
ASYNC);
|
||||
DATA_DIR);
|
||||
return copyPropertiesWithPrefix(req.getParams(), props, COLL_PROP_PREFIX);
|
||||
}
|
||||
},
|
||||
|
@ -687,8 +683,7 @@ public class CollectionsHandler extends RequestHandlerBase {
|
|||
prop = COLL_PROP_PREFIX + prop;
|
||||
}
|
||||
|
||||
if (!shardUnique &&
|
||||
!SliceMutator.SLICE_UNIQUE_BOOLEAN_PROPERTIES.contains(prop)) {
|
||||
if (!shardUnique && !SliceMutator.SLICE_UNIQUE_BOOLEAN_PROPERTIES.contains(prop)) {
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, "Balancing properties amongst replicas in a slice requires that"
|
||||
+ " the property be pre-defined as a unique property (e.g. 'preferredLeader') or that 'shardUnique' be set to 'true'. " +
|
||||
" Property: " + prop + " shardUnique: " + Boolean.toString(shardUnique));
|
||||
|
|
|
@ -17,12 +17,22 @@ package org.apache.solr.cloud;
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||
import org.apache.lucene.util.TestUtil;
|
||||
import org.apache.solr.client.solrj.SolrClient;
|
||||
import org.apache.solr.client.solrj.SolrQuery;
|
||||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest.Create;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest.SplitShard;
|
||||
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
|
@ -83,4 +93,138 @@ public class CollectionsAPIAsyncDistributedZkTest extends AbstractFullDistribZkT
|
|||
assertEquals("Shard split did not complete. Last recorded state: " + state, "completed", state);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAsyncRequests() throws Exception {
|
||||
String collection = "testAsyncOperations";
|
||||
|
||||
Create createCollectionRequest = new Create()
|
||||
.setCollectionName(collection)
|
||||
.setNumShards(1)
|
||||
.setRouterName("implicit")
|
||||
.setShards("shard1")
|
||||
.setConfigName("conf1")
|
||||
.setAsyncId("42");
|
||||
CollectionAdminResponse response = createCollectionRequest.process(cloudClient);
|
||||
assertEquals("42", response.getResponse().get("requestid"));
|
||||
String state = getRequestStateAfterCompletion("42", MAX_TIMEOUT_SECONDS, cloudClient);
|
||||
assertEquals("CreateCollection task did not complete!", "completed", state);
|
||||
|
||||
//Add a few documents to shard1
|
||||
int numDocs = TestUtil.nextInt(random(), 10, 100);
|
||||
List<SolrInputDocument> docs = new ArrayList<>(numDocs);
|
||||
for (int i=0; i<numDocs; i++) {
|
||||
SolrInputDocument doc = new SolrInputDocument();
|
||||
doc.addField("id", i);
|
||||
doc.addField("_route_", "shard1");
|
||||
docs.add(doc);
|
||||
}
|
||||
cloudClient.add(collection, docs);
|
||||
cloudClient.commit(collection);
|
||||
|
||||
SolrQuery query = new SolrQuery("*:*");
|
||||
query.set("shards", "shard1");
|
||||
assertEquals(numDocs, cloudClient.query(collection, query).getResults().getNumFound());
|
||||
|
||||
CollectionAdminRequest.Reload reloadCollection = new CollectionAdminRequest.Reload();
|
||||
reloadCollection.setCollectionName(collection).setAsyncId("43");
|
||||
response = reloadCollection.process(cloudClient);
|
||||
assertEquals("43", response.getResponse().get("requestid"));
|
||||
state = getRequestStateAfterCompletion("43", MAX_TIMEOUT_SECONDS, cloudClient);
|
||||
assertEquals("ReloadCollection did not complete", "completed", state);
|
||||
|
||||
CollectionAdminRequest.CreateShard createShard = new CollectionAdminRequest.CreateShard()
|
||||
.setCollectionName(collection)
|
||||
.setShardName("shard2")
|
||||
.setAsyncId("44");
|
||||
response = createShard.process(cloudClient);
|
||||
assertEquals("44", response.getResponse().get("requestid"));
|
||||
state = getRequestStateAfterCompletion("44", MAX_TIMEOUT_SECONDS, cloudClient);
|
||||
assertEquals("CreateShard did not complete", "completed", state);
|
||||
|
||||
//Add a doc to shard2 to make sure shard2 was created properly
|
||||
SolrInputDocument doc = new SolrInputDocument();
|
||||
doc.addField("id", numDocs + 1);
|
||||
doc.addField("_route_", "shard2");
|
||||
cloudClient.add(collection, doc);
|
||||
cloudClient.commit(collection);
|
||||
query = new SolrQuery("*:*");
|
||||
query.set("shards", "shard2");
|
||||
assertEquals(1, cloudClient.query(collection, query).getResults().getNumFound());
|
||||
|
||||
CollectionAdminRequest.DeleteShard deleteShard = new CollectionAdminRequest.DeleteShard()
|
||||
.setCollectionName(collection)
|
||||
.setShardName("shard2")
|
||||
.setAsyncId("45");
|
||||
response = deleteShard.process(cloudClient);
|
||||
assertEquals("45", response.getResponse().get("requestid"));
|
||||
state = getRequestStateAfterCompletion("45", MAX_TIMEOUT_SECONDS, cloudClient);
|
||||
assertEquals("DeleteShard did not complete", "completed", state);
|
||||
|
||||
CollectionAdminRequest.AddReplica addReplica = new CollectionAdminRequest.AddReplica()
|
||||
.setCollectionName(collection)
|
||||
.setShardName("shard1")
|
||||
.setAsyncId("46");
|
||||
response = addReplica.process(cloudClient);
|
||||
assertEquals("46", response.getResponse().get("requestid"));
|
||||
state = getRequestStateAfterCompletion("46", MAX_TIMEOUT_SECONDS, cloudClient);
|
||||
assertEquals("AddReplica did not complete", "completed", state);
|
||||
|
||||
//cloudClient watch might take a couple of seconds to reflect it
|
||||
Slice shard1 = cloudClient.getZkStateReader().getClusterState().getSlice(collection, "shard1");
|
||||
int count = 0;
|
||||
while (shard1.getReplicas().size() != 2) {
|
||||
if (count++ > 1000) {
|
||||
fail("2nd Replica not reflecting in the cluster state");
|
||||
}
|
||||
Thread.sleep(100);
|
||||
}
|
||||
|
||||
CollectionAdminRequest.CreateAlias createAlias = new CollectionAdminRequest.CreateAlias()
|
||||
.setAliasName("myalias")
|
||||
.setAliasedCollections(collection)
|
||||
.setAsyncId("47");
|
||||
response = createAlias.process(cloudClient);
|
||||
assertEquals("47", response.getResponse().get("requestid"));
|
||||
state = getRequestStateAfterCompletion("47", MAX_TIMEOUT_SECONDS, cloudClient);
|
||||
assertEquals("CreateAlias did not complete", "completed", state);
|
||||
|
||||
query = new SolrQuery("*:*");
|
||||
query.set("shards", "shard1");
|
||||
assertEquals(numDocs, cloudClient.query("myalias", query).getResults().getNumFound());
|
||||
|
||||
CollectionAdminRequest.DeleteAlias deleteAlias = new CollectionAdminRequest.DeleteAlias()
|
||||
.setAliasName("myalias")
|
||||
.setAsyncId("48");
|
||||
response = deleteAlias.process(cloudClient);
|
||||
assertEquals("48", response.getResponse().get("requestid"));
|
||||
state = getRequestStateAfterCompletion("48", MAX_TIMEOUT_SECONDS, cloudClient);
|
||||
assertEquals("DeleteAlias did not complete", "completed", state);
|
||||
|
||||
try {
|
||||
cloudClient.query("myalias", query);
|
||||
fail("Alias should not exist");
|
||||
} catch (SolrException e) {
|
||||
//expected
|
||||
}
|
||||
|
||||
String replica = shard1.getReplicas().iterator().next().getName();
|
||||
CollectionAdminRequest.DeleteReplica deleteReplica = new CollectionAdminRequest.DeleteReplica()
|
||||
.setCollectionName(collection)
|
||||
.setShardName("shard1")
|
||||
.setReplica(replica)
|
||||
.setAsyncId("47");
|
||||
response = deleteReplica.process(cloudClient);
|
||||
assertEquals("47", response.getResponse().get("requestid"));
|
||||
state = getRequestStateAfterCompletion("47", MAX_TIMEOUT_SECONDS, cloudClient);
|
||||
assertEquals("DeleteReplica did not complete", "completed", state);
|
||||
|
||||
CollectionAdminRequest.Delete deleteCollection = new CollectionAdminRequest.Delete()
|
||||
.setCollectionName(collection)
|
||||
.setAsyncId("48");
|
||||
response = deleteCollection.process(cloudClient);
|
||||
assertEquals("48", response.getResponse().get("requestid"));
|
||||
state = getRequestStateAfterCompletion("48", MAX_TIMEOUT_SECONDS, cloudClient);
|
||||
assertEquals("DeleteCollection did not complete", "completed", state);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -45,6 +45,7 @@ import java.util.Properties;
|
|||
public abstract class CollectionAdminRequest <Q extends CollectionAdminRequest<Q>> extends SolrRequest<CollectionAdminResponse> {
|
||||
|
||||
protected CollectionAction action = null;
|
||||
protected String asyncId;
|
||||
|
||||
private static String PROPERTY_PREFIX = "property.";
|
||||
|
||||
|
@ -63,12 +64,24 @@ public abstract class CollectionAdminRequest <Q extends CollectionAdminRequest<Q
|
|||
|
||||
protected abstract Q getThis();
|
||||
|
||||
public Q setAsyncId(String asyncId) {
|
||||
this.asyncId = asyncId;
|
||||
return getThis();
|
||||
}
|
||||
|
||||
public String getAsyncId() {
|
||||
return asyncId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SolrParams getParams() {
|
||||
if (action == null) {
|
||||
throw new RuntimeException( "no action specified!" );
|
||||
}
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
if (asyncId != null) {
|
||||
params.set(CommonAdminParams.ASYNC, asyncId);
|
||||
}
|
||||
params.set(CoreAdminParams.ACTION, action.toString());
|
||||
return params;
|
||||
}
|
||||
|
@ -112,11 +125,9 @@ public abstract class CollectionAdminRequest <Q extends CollectionAdminRequest<Q
|
|||
@Override
|
||||
public SolrParams getParams() {
|
||||
ModifiableSolrParams params = new ModifiableSolrParams(super.getParams());
|
||||
params.set( CoreAdminParams.NAME, collection );
|
||||
params.set(CoreAdminParams.NAME, collection);
|
||||
return params;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
protected abstract static class CollectionShardAdminRequest <T extends CollectionAdminRequest<T>> extends CollectionAdminRequest<T> {
|
||||
|
@ -141,16 +152,23 @@ public abstract class CollectionAdminRequest <Q extends CollectionAdminRequest<Q
|
|||
return this.shardName;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public ModifiableSolrParams getCommonParams() {
|
||||
ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
|
||||
params.set(CoreAdminParams.COLLECTION, collection);
|
||||
params.set(CoreAdminParams.SHARD, shardName);
|
||||
if (asyncId != null) {
|
||||
params.set(CommonAdminParams.ASYNC, asyncId);
|
||||
}
|
||||
return params;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SolrParams getParams() {
|
||||
return getCommonParams();
|
||||
ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
|
||||
params.set(CoreAdminParams.COLLECTION, collection);
|
||||
params.set(CoreAdminParams.SHARD, shardName);
|
||||
return params;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -202,7 +220,6 @@ public abstract class CollectionAdminRequest <Q extends CollectionAdminRequest<Q
|
|||
private Properties properties;
|
||||
protected Boolean autoAddReplicas;
|
||||
protected Integer stateFormat;
|
||||
protected String asyncId;
|
||||
private String[] rule , snitch;
|
||||
public Create() {
|
||||
action = CollectionAction.CREATE;
|
||||
|
@ -218,10 +235,6 @@ public abstract class CollectionAdminRequest <Q extends CollectionAdminRequest<Q
|
|||
public Create setAutoAddReplicas(boolean autoAddReplicas) { this.autoAddReplicas = autoAddReplicas; return this; }
|
||||
public Create setReplicationFactor(Integer repl) { this.replicationFactor = repl; return this; }
|
||||
public Create setStateFormat(Integer stateFormat) { this.stateFormat = stateFormat; return this; }
|
||||
public Create setAsyncId(String asyncId) {
|
||||
this.asyncId = asyncId;
|
||||
return this;
|
||||
}
|
||||
public Create setRule(String... s){ this.rule = s; return this; }
|
||||
public Create setSnitch(String... s){ this.snitch = s; return this; }
|
||||
|
||||
|
@ -234,9 +247,6 @@ public abstract class CollectionAdminRequest <Q extends CollectionAdminRequest<Q
|
|||
public Integer getReplicationFactor() { return replicationFactor; }
|
||||
public Boolean getAutoAddReplicas() { return autoAddReplicas; }
|
||||
public Integer getStateFormat() { return stateFormat; }
|
||||
public String getAsyncId() {
|
||||
return asyncId;
|
||||
}
|
||||
|
||||
public Properties getProperties() {
|
||||
return properties;
|
||||
|
@ -267,7 +277,6 @@ public abstract class CollectionAdminRequest <Q extends CollectionAdminRequest<Q
|
|||
if (replicationFactor != null) {
|
||||
params.set( "replicationFactor", replicationFactor);
|
||||
}
|
||||
params.set(CommonAdminParams.ASYNC, asyncId);
|
||||
if (autoAddReplicas != null) {
|
||||
params.set(ZkStateReader.AUTO_ADD_REPLICAS, autoAddReplicas);
|
||||
}
|
||||
|
@ -294,6 +303,12 @@ public abstract class CollectionAdminRequest <Q extends CollectionAdminRequest<Q
|
|||
action = CollectionAction.RELOAD;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SolrParams getParams() {
|
||||
ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
|
||||
return params;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Reload getThis() {
|
||||
return this;
|
||||
|
@ -302,12 +317,17 @@ public abstract class CollectionAdminRequest <Q extends CollectionAdminRequest<Q
|
|||
|
||||
// DELETE request
|
||||
public static class Delete extends CollectionSpecificAdminRequest<Delete> {
|
||||
protected String collection = null;
|
||||
|
||||
public Delete() {
|
||||
action = CollectionAction.DELETE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SolrParams getParams() {
|
||||
ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
|
||||
return params;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Delete getThis() {
|
||||
return this;
|
||||
|
@ -317,7 +337,7 @@ public abstract class CollectionAdminRequest <Q extends CollectionAdminRequest<Q
|
|||
// CREATESHARD request
|
||||
public static class CreateShard extends CollectionShardAdminRequest<CreateShard> {
|
||||
protected String nodeSet;
|
||||
private Properties properties;
|
||||
protected Properties properties;
|
||||
|
||||
public CreateShard setNodeSet(String nodeSet) {
|
||||
this.nodeSet = nodeSet;
|
||||
|
@ -343,7 +363,7 @@ public abstract class CollectionAdminRequest <Q extends CollectionAdminRequest<Q
|
|||
|
||||
@Override
|
||||
public SolrParams getParams() {
|
||||
ModifiableSolrParams params = getCommonParams();
|
||||
ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
|
||||
if (nodeSet != null) {
|
||||
params.set("createNodeSet", nodeSet);
|
||||
}
|
||||
|
@ -363,7 +383,6 @@ public abstract class CollectionAdminRequest <Q extends CollectionAdminRequest<Q
|
|||
public static class SplitShard extends CollectionShardAdminRequest<SplitShard> {
|
||||
protected String ranges;
|
||||
protected String splitKey;
|
||||
protected String asyncId;
|
||||
|
||||
private Properties properties;
|
||||
|
||||
|
@ -391,19 +410,10 @@ public abstract class CollectionAdminRequest <Q extends CollectionAdminRequest<Q
|
|||
this.properties = properties;
|
||||
return this;
|
||||
}
|
||||
|
||||
public SplitShard setAsyncId(String asyncId) {
|
||||
this.asyncId = asyncId;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getAsyncId() {
|
||||
return asyncId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SolrParams getParams() {
|
||||
ModifiableSolrParams params = getCommonParams();
|
||||
ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
|
||||
params.set( "ranges", ranges);
|
||||
|
||||
if(splitKey != null)
|
||||
|
@ -412,8 +422,6 @@ public abstract class CollectionAdminRequest <Q extends CollectionAdminRequest<Q
|
|||
if(properties != null) {
|
||||
addProperties(params, properties);
|
||||
}
|
||||
|
||||
params.set(CommonAdminParams.ASYNC, asyncId);
|
||||
return params;
|
||||
}
|
||||
|
||||
|
@ -437,7 +445,6 @@ public abstract class CollectionAdminRequest <Q extends CollectionAdminRequest<Q
|
|||
|
||||
// FORCELEADER request
|
||||
public static class ForceLeader extends CollectionShardAdminRequest<ForceLeader> {
|
||||
protected String asyncId;
|
||||
|
||||
public ForceLeader() {
|
||||
action = CollectionAction.FORCELEADER;
|
||||
|
@ -447,15 +454,6 @@ public abstract class CollectionAdminRequest <Q extends CollectionAdminRequest<Q
|
|||
protected ForceLeader getThis() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SolrParams getParams() {
|
||||
ModifiableSolrParams params = getCommonParams();
|
||||
if (asyncId != null) {
|
||||
params.set(CommonAdminParams.ASYNC, asyncId);
|
||||
}
|
||||
return params;
|
||||
}
|
||||
}
|
||||
|
||||
// REQUESTSTATUS request
|
||||
|
@ -566,12 +564,11 @@ public abstract class CollectionAdminRequest <Q extends CollectionAdminRequest<Q
|
|||
|
||||
// ADDREPLICA request
|
||||
public static class AddReplica extends CollectionShardAdminRequest<AddReplica> {
|
||||
private String node;
|
||||
private String routeKey;
|
||||
private String instanceDir;
|
||||
private String dataDir;
|
||||
private Properties properties;
|
||||
private String asyncId;
|
||||
protected String node;
|
||||
protected String routeKey;
|
||||
protected String instanceDir;
|
||||
protected String dataDir;
|
||||
protected Properties properties;
|
||||
|
||||
public AddReplica() {
|
||||
action = CollectionAction.ADDREPLICA;
|
||||
|
@ -632,9 +629,6 @@ public abstract class CollectionAdminRequest <Q extends CollectionAdminRequest<Q
|
|||
}
|
||||
params.add(ShardParams._ROUTE_, routeKey);
|
||||
}
|
||||
if (asyncId != null) {
|
||||
params.set(CommonAdminParams.ASYNC, asyncId);
|
||||
}
|
||||
if (node != null) {
|
||||
params.add("node", node);
|
||||
}
|
||||
|
@ -650,15 +644,6 @@ public abstract class CollectionAdminRequest <Q extends CollectionAdminRequest<Q
|
|||
return params;
|
||||
}
|
||||
|
||||
public AddReplica setAsyncId(String asyncId) {
|
||||
this.asyncId = asyncId;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getAsyncId() {
|
||||
return asyncId;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AddReplica getThis() {
|
||||
return this;
|
||||
|
@ -667,8 +652,8 @@ public abstract class CollectionAdminRequest <Q extends CollectionAdminRequest<Q
|
|||
|
||||
// DELETEREPLICA request
|
||||
public static class DeleteReplica extends CollectionShardAdminRequest<DeleteReplica> {
|
||||
private String replica;
|
||||
private Boolean onlyIfDown;
|
||||
protected String replica;
|
||||
protected Boolean onlyIfDown;
|
||||
|
||||
public DeleteReplica() {
|
||||
action = CollectionAction.DELETEREPLICA;
|
||||
|
@ -697,8 +682,8 @@ public abstract class CollectionAdminRequest <Q extends CollectionAdminRequest<Q
|
|||
ModifiableSolrParams params = new ModifiableSolrParams(super.getParams());
|
||||
params.set(ZkStateReader.REPLICA_PROP, this.replica);
|
||||
|
||||
if(onlyIfDown != null) {
|
||||
params.set("onlyIfDown", this.onlyIfDown);
|
||||
if (onlyIfDown != null) {
|
||||
params.set("onlyIfDown", onlyIfDown);
|
||||
}
|
||||
return params;
|
||||
}
|
||||
|
@ -758,7 +743,6 @@ public abstract class CollectionAdminRequest <Q extends CollectionAdminRequest<Q
|
|||
private String splitKey;
|
||||
private Integer forwardTimeout;
|
||||
private Properties properties;
|
||||
private String asyncId;
|
||||
|
||||
public Migrate() {
|
||||
action = CollectionAction.MIGRATE;
|
||||
|
@ -818,8 +802,6 @@ public abstract class CollectionAdminRequest <Q extends CollectionAdminRequest<Q
|
|||
if (forwardTimeout != null) {
|
||||
params.set("forward.timeout", forwardTimeout);
|
||||
}
|
||||
params.set(CommonAdminParams.ASYNC, asyncId);
|
||||
|
||||
if (properties != null) {
|
||||
addProperties(params, properties);
|
||||
}
|
||||
|
@ -827,15 +809,6 @@ public abstract class CollectionAdminRequest <Q extends CollectionAdminRequest<Q
|
|||
return params;
|
||||
}
|
||||
|
||||
public Migrate setAsyncId(String asyncId) {
|
||||
this.asyncId = asyncId;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getAsyncId() {
|
||||
return asyncId;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Migrate getThis() {
|
||||
return this;
|
||||
|
|
Loading…
Reference in New Issue