SOLR-5477: Async execution of OverseerCollectionProcessor tasks

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1577444 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Anshum Gupta 2014-03-14 08:01:18 +00:00
parent 125ef65996
commit 0371c620d3
17 changed files with 1637 additions and 96 deletions

View File

@ -119,6 +119,8 @@ New Features
* SOLR-5653: Create a RestManager to provide REST API endpoints for
reconfigurable plugins. (Tim Potter, Steve Rowe)
* SOLR-5477: Async execution of OverseerCollectionProcessor(CollectionsAPI)
tasks. (Anshum Gupta)
Bug Fixes
----------------------

View File

@ -0,0 +1,232 @@
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCmdExecutor;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* A distributed map.
* This supports basic map functions e.g. get, put, contains for interaction with zk which
* don't have to be ordered i.e. DistributedQueue.
*/
public class DistributedMap {
private static final Logger LOG = LoggerFactory
.getLogger(DistributedMap.class);
private static long DEFAULT_TIMEOUT = 5*60*1000;
private final String dir;
private SolrZkClient zookeeper;
private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
private final String prefix = "mn-";
private final String response_prefix = "mnr-" ;
public DistributedMap(SolrZkClient zookeeper, String dir, List<ACL> acl) {
this.dir = dir;
ZkCmdExecutor cmdExecutor = new ZkCmdExecutor(zookeeper.getZkClientTimeout());
try {
cmdExecutor.ensureExists(dir, zookeeper);
} catch (KeeperException e) {
throw new SolrException(ErrorCode.SERVER_ERROR, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SolrException(ErrorCode.SERVER_ERROR, e);
}
if (acl != null) {
this.acl = acl;
}
this.zookeeper = zookeeper;
}
private class LatchChildWatcher implements Watcher {
Object lock = new Object();
private WatchedEvent event = null;
public LatchChildWatcher() {}
public LatchChildWatcher(Object lock) {
this.lock = lock;
}
@Override
public void process(WatchedEvent event) {
LOG.info("LatchChildWatcher fired on path: " + event.getPath() + " state: "
+ event.getState() + " type " + event.getType());
synchronized (lock) {
this.event = event;
lock.notifyAll();
}
}
public void await(long timeout) throws InterruptedException {
synchronized (lock) {
lock.wait(timeout);
}
}
public WatchedEvent getWatchedEvent() {
return event;
}
}
/**
* Inserts data into zookeeper.
*
* @return true if data was successfully added
*/
private String createData(String path, byte[] data, CreateMode mode)
throws KeeperException, InterruptedException {
for (;;) {
try {
return zookeeper.create(path, data, acl, mode, true);
} catch (KeeperException.NoNodeException e) {
try {
zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
} catch (KeeperException.NodeExistsException ne) {
// someone created it
}
}
}
}
public boolean put(String trackingId, byte[] data) throws KeeperException, InterruptedException {
return createData(dir + "/" + prefix + trackingId, data,
CreateMode.PERSISTENT) != null;
}
/**
* Offer the data and wait for the response
*
*/
public MapEvent put(String trackingId, byte[] data, long timeout) throws KeeperException,
InterruptedException {
String path = createData(dir + "/" + prefix + trackingId, data,
CreateMode.PERSISTENT);
String watchID = createData(
dir + "/" + response_prefix + path.substring(path.lastIndexOf("-") + 1),
null, CreateMode.EPHEMERAL);
Object lock = new Object();
LatchChildWatcher watcher = new LatchChildWatcher(lock);
synchronized (lock) {
if (zookeeper.exists(watchID, watcher, true) != null) {
watcher.await(timeout);
}
}
byte[] bytes = zookeeper.getData(watchID, null, null, true);
zookeeper.delete(watchID, -1, true);
return new MapEvent(watchID, bytes, watcher.getWatchedEvent());
}
public MapEvent get(String trackingId) throws KeeperException, InterruptedException {
return new MapEvent(trackingId, zookeeper.getData(dir + "/" + prefix + trackingId, null, null, true), null);
}
public boolean contains(String trackingId) throws KeeperException, InterruptedException {
return zookeeper.exists(dir + "/" + prefix + trackingId, true);
}
public void remove(String trackingId) throws KeeperException, InterruptedException {
zookeeper.delete(dir + "/" + prefix + trackingId, -1, true);
}
/**
* Helper method to clear all child nodes for a parent node.
*/
public void clear() throws KeeperException, InterruptedException {
List<String> childNames = zookeeper.getChildren(dir, null, true);
for(String childName: childNames) {
zookeeper.delete(dir + "/" + childName, -1, true);
}
}
public static class MapEvent {
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((id == null) ? 0 : id.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null) return false;
if (getClass() != obj.getClass()) return false;
MapEvent other = (MapEvent) obj;
if (id == null) {
if (other.id != null) return false;
} else if (!id.equals(other.id)) return false;
return true;
}
private WatchedEvent event = null;
private String id;
private byte[] bytes;
MapEvent(String id, byte[] bytes, WatchedEvent event) {
this.id = id;
this.bytes = bytes;
this.event = event;
}
public void setId(String id) {
this.id = id;
}
public String getId() {
return id;
}
public void setBytes(byte[] bytes) {
this.bytes = bytes;
}
public byte[] getBytes() {
return bytes;
}
public WatchedEvent getWatchedEvent() {
return event;
}
}
}

View File

@ -81,6 +81,13 @@ public class Overseer {
//Internal queue where overseer stores events that have not yet been published into cloudstate
//If Overseer dies while extracting the main queue a new overseer will start from this queue
private final DistributedQueue workQueue;
// Internal map which holds the information about running tasks.
private final DistributedMap runningMap;
// Internal map which holds the information about successfully completed tasks.
private final DistributedMap completedMap;
// Internal map which holds the information about failed tasks.
private final DistributedMap failureMap;
private Map clusterProps;
private boolean isClosed = false;
@ -88,6 +95,9 @@ public class Overseer {
this.zkClient = reader.getZkClient();
this.stateUpdateQueue = getInQueue(zkClient);
this.workQueue = getInternalQueue(zkClient);
this.failureMap = getFailureMap(zkClient);
this.runningMap = getRunningMap(zkClient);
this.completedMap = getCompletedMap(zkClient);
this.myId = myId;
this.reader = reader;
clusterProps = reader.getClusterProps();
@ -135,7 +145,7 @@ public class Overseer {
}
zkClient.setData(ZkStateReader.CLUSTER_STATE,
ZkStateReader.toJSON(clusterState), true);
workQueue.poll(); // poll-ing removes the element we got by peek-ing
}
else {
@ -1135,6 +1145,24 @@ public class Overseer {
createOverseerNode(zkClient);
return new DistributedQueue(zkClient, "/overseer/queue-work", null);
}
/* Internal map for failed tasks, not to be used outside of the Overseer */
static DistributedMap getRunningMap(final SolrZkClient zkClient) {
createOverseerNode(zkClient);
return new DistributedMap(zkClient, "/overseer/collection-map-running", null);
}
/* Internal map for successfully completed tasks, not to be used outside of the Overseer */
static DistributedMap getCompletedMap(final SolrZkClient zkClient) {
createOverseerNode(zkClient);
return new DistributedMap(zkClient, "/overseer/collection-map-completed", null);
}
/* Internal map for failed tasks, not to be used outside of the Overseer */
static DistributedMap getFailureMap(final SolrZkClient zkClient) {
createOverseerNode(zkClient);
return new DistributedMap(zkClient, "/overseer/collection-map-failure", null);
}
/* Collection creation queue */
static DistributedQueue getCollectionQueue(final SolrZkClient zkClient) {

View File

@ -61,10 +61,8 @@ import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.handler.component.ShardResponse;
import org.apache.solr.update.SolrIndexSplitter;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -82,6 +80,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static org.apache.solr.cloud.Assign.Node;
@ -118,16 +117,22 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
public static final String DELETESHARD = "deleteshard";
public static final String REQUESTSTATUS = "status";
public static final String ROUTER = "router";
public static final String SHARDS_PROP = "shards";
public static final String ASYNC = "async";
public static final String CREATESHARD = "createshard";
public static final String DELETEREPLICA = "deletereplica";
public static final String MIGRATE = "migrate";
public static final String REQUESTID = "requestid";
public static final String COLL_CONF = "collection.configName";
public static final String COLL_PROP_PREFIX = "property.";
@ -149,6 +154,9 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
.getLogger(OverseerCollectionProcessor.class);
private DistributedQueue workQueue;
private DistributedMap runningMap;
private DistributedMap completedMap;
private DistributedMap failureMap;
private String myId;
@ -161,15 +169,25 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
private boolean isClosed;
public OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler, String adminPath) {
this(zkStateReader, myId, shardHandler, adminPath, Overseer.getCollectionQueue(zkStateReader.getZkClient()));
this(zkStateReader, myId, shardHandler, adminPath, Overseer.getCollectionQueue(zkStateReader.getZkClient()),
Overseer.getRunningMap(zkStateReader.getZkClient()),
Overseer.getCompletedMap(zkStateReader.getZkClient()), Overseer.getFailureMap(zkStateReader.getZkClient()));
}
protected OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler, String adminPath, DistributedQueue workQueue) {
protected OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler,
String adminPath,
DistributedQueue workQueue,
DistributedMap runningMap,
DistributedMap completedMap,
DistributedMap failureMap) {
this.zkStateReader = zkStateReader;
this.myId = myId;
this.shardHandler = shardHandler;
this.adminPath = adminPath;
this.workQueue = workQueue;
this.runningMap = runningMap;
this.completedMap = completedMap;
this.failureMap = failureMap;
}
@Override
@ -200,11 +218,35 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
QueueEvent head = workQueue.peek(true);
final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
final String asyncId = (message.containsKey(ASYNC) && message.get(ASYNC) != null) ? (String) message.get(ASYNC) : null;
try {
if(message.containsKey(ASYNC) && message.get(ASYNC) != null && !runningMap.contains(message.getStr(ASYNC)))
runningMap.put(asyncId, null);
} catch (KeeperException.NodeExistsException e) {
// Just catch and do nothing. The runningMap.contains(..) check ensures that this is the only
// entry point into the runningMap.
// NOTE: Make sure to handle it as soon as OCP gets distributed/multi-threaded.
}
log.info("Overseer Collection Processor: Get the message id:" + head.getId() + " message:" + message.toString());
final String operation = message.getStr(QUEUE_OPERATION);
SolrResponse response = processMessage(message, operation);
head.setBytes(SolrResponse.serializable(response));
if (!operation.equals(REQUESTSTATUS) && asyncId != null) {
if(response.getResponse().get("failure") != null || response.getResponse().get("exception") != null) {
failureMap.put(asyncId, null);
} else {
completedMap.put(asyncId, null);
}
}
if(asyncId != null)
runningMap.remove(asyncId);
workQueue.remove(head);
log.info("Overseer Collection Processor: Message id:" + head.getId() + " complete, response:"+ response.getResponse().toString());
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED
@ -412,9 +454,9 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
processRoleCommand(message, operation);
} else if (ADDREPLICA.isEqual(operation)) {
addReplica(zkStateReader.getClusterState(), message, results);
}
else {
} else if (REQUESTSTATUS.equals(operation)) {
requestStatus(message, results);
} else {
throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
+ operation);
}
@ -764,13 +806,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
}
ShardResponse srsp;
do {
srsp = shardHandler.takeCompletedOrError();
if (srsp != null) {
processResponse(results, srsp);
}
} while (srsp != null);
processResponses(results);
log.info("Finished create command on all shards for collection: "
+ collectionName);
@ -931,6 +967,9 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
// the only side effect of this is that the sub shard may end up having more replicas than we want
collectShardResponses(results, false, null);
String asyncId = message.getStr(ASYNC);
HashMap<String, String> requestMap = new HashMap<String, String>();
for (int i=0; i<subRanges.size(); i++) {
String subSlice = subSlices.get(i);
String subShardName = subShardNames.get(i);
@ -957,12 +996,15 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
params.set(CoreAdminParams.NAME, subShardName);
params.set(CoreAdminParams.COLLECTION, collectionName);
params.set(CoreAdminParams.SHARD, subSlice);
setupAsyncRequest(asyncId, requestMap, params, nodeName);
addPropertyParams(message, params);
sendShardRequest(nodeName, params);
}
collectShardResponses(results, true,
"SPLTSHARD failed to create subshard leaders");
"SPLITSHARD failed to create subshard leaders");
completeAsyncRequest(asyncId, requestMap, results);
for (String subShardName : subShardNames) {
// wait for parent leader to acknowledge the sub-shard core
@ -975,12 +1017,18 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
cmd.setState(ZkStateReader.ACTIVE);
cmd.setCheckLive(true);
cmd.setOnlyIfLeader(true);
sendShardRequest(nodeName, new ModifiableSolrParams(cmd.getParams()));
ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams());
setupAsyncRequest(asyncId, requestMap, p, nodeName);
sendShardRequest(nodeName, p);
}
collectShardResponses(results, true,
"SPLTSHARD timed out waiting for subshard leaders to come up");
"SPLITSHARD timed out waiting for subshard leaders to come up");
completeAsyncRequest(asyncId, requestMap, results);
log.info("Successfully created all sub-shards for collection "
+ collectionName + " parent shard: " + slice + " on: " + parentShardLeader);
@ -996,9 +1044,12 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
params.add(CoreAdminParams.TARGET_CORE, subShardName);
}
params.set(CoreAdminParams.RANGES, rangesStr);
setupAsyncRequest(asyncId, requestMap, params, parentShardLeader.getNodeName());
sendShardRequest(parentShardLeader.getNodeName(), params);
collectShardResponses(results, true, "SPLITSHARD failed to invoke SPLIT core admin command");
completeAsyncRequest(asyncId, requestMap, results);
log.info("Index on shard: " + nodeName + " split into two successfully");
@ -1012,12 +1063,16 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTAPPLYUPDATES.toString());
params.set(CoreAdminParams.NAME, subShardName);
setupAsyncRequest(asyncId, requestMap, params, nodeName);
sendShardRequest(nodeName, params);
}
collectShardResponses(results, true,
"SPLITSHARD failed while asking sub shard leaders to apply buffered updates");
completeAsyncRequest(asyncId, requestMap, results);
log.info("Successfully applied buffered updates on : " + subShardNames);
// Replica creation for the new Slices
@ -1067,6 +1122,12 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
params.set(CoreAdminParams.NAME, shardName);
params.set(CoreAdminParams.COLLECTION, collectionName);
params.set(CoreAdminParams.SHARD, sliceName);
if(asyncId != null) {
String requestId = asyncId + Math.abs(System.nanoTime());
params.set(ASYNC, requestId);
requestMap.put(subShardNodeName, requestId);
}
addPropertyParams(message, params);
// TODO: Figure the config used by the parent shard and use it.
//params.set("collection.configName", configName);
@ -1086,12 +1147,19 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
cmd.setState(ZkStateReader.RECOVERING);
cmd.setCheckLive(true);
cmd.setOnlyIfLeader(true);
sendShardRequest(nodeName, new ModifiableSolrParams(cmd.getParams()));
ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams());
setupAsyncRequest(asyncId, requestMap, p, nodeName);
sendShardRequest(nodeName, p);
}
}
collectShardResponses(results, true,
"SPLTSHARD failed to create subshard replicas or timed out waiting for them to come up");
"SPLITSHARD failed to create subshard replicas or timed out waiting for them to come up");
completeAsyncRequest(asyncId, requestMap, results);
log.info("Successfully created all replica shards for all sub-slices " + subSlices);
@ -1211,7 +1279,40 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
} while (srsp != null);
}
private void requestStatus(ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
log.info("Request status invoked");
String requestId = message.getStr(REQUESTID);
// Special taskId (-1), clears up the request state maps.
if(requestId.equals("-1")) {
completedMap.clear();
failureMap.clear();
return;
}
if(completedMap.contains(requestId)) {
SimpleOrderedMap success = new SimpleOrderedMap();
success.add("state", "completed");
success.add("msg", "found " + requestId + " in completed tasks");
results.add("status", success);
} else if (runningMap.contains(requestId)) {
SimpleOrderedMap success = new SimpleOrderedMap();
success.add("state", "running");
success.add("msg", "found " + requestId + " in submitted tasks");
results.add("status", success);
} else if (failureMap.contains(requestId)) {
SimpleOrderedMap success = new SimpleOrderedMap();
success.add("state", "failed");
success.add("msg", "found " + requestId + " in failed tasks");
results.add("status", success);
} else {
SimpleOrderedMap failure = new SimpleOrderedMap();
failure.add("state", "notfound");
failure.add("msg", "Did not find taskid [" + requestId + "] in any tasks queue");
results.add("status", failure);
}
}
private void deleteShard(ClusterState clusterState, ZkNodeProps message, NamedList results) {
log.info("Delete shard invoked");
String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
@ -1222,7 +1323,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
if (slice == null) {
if(clusterState.hasCollection(collection)) {
throw new SolrException(ErrorCode.BAD_REQUEST,
"No shard with the specified name exists: " + slice);
"No shard with the specified name exists: " + slice.getName());
} else {
throw new SolrException(ErrorCode.BAD_REQUEST,
"No collection with the specified name exists: " + collection);
@ -1242,13 +1343,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
params.set(CoreAdminParams.DELETE_INDEX, "true");
sliceCmd(clusterState, params, null, slice);
ShardResponse srsp;
do {
srsp = shardHandler.takeCompletedOrError();
if (srsp != null) {
processResponse(results, srsp);
}
} while (srsp != null);
processResponses(results);
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
Overseer.REMOVESHARD, ZkStateReader.COLLECTION_PROP, collection,
@ -1314,21 +1409,29 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
"No active slices available in target collection: " + targetCollection + "for given split.key: " + splitKey);
}
String asyncId = null;
if(message.containsKey(ASYNC) && message.get(ASYNC) != null)
asyncId = message.getStr(ASYNC);
for (Slice sourceSlice : sourceSlices) {
for (Slice targetSlice : targetSlices) {
log.info("Migrating source shard: {} to target shard: {} for split.key = " + splitKey, sourceSlice, targetSlice);
migrateKey(clusterState, sourceCollection, sourceSlice, targetCollection, targetSlice, splitKey, timeout, results);
migrateKey(clusterState, sourceCollection, sourceSlice, targetCollection, targetSlice, splitKey, timeout, results, asyncId);
}
}
}
private void migrateKey(ClusterState clusterState, DocCollection sourceCollection, Slice sourceSlice, DocCollection targetCollection, Slice targetSlice, String splitKey, int timeout, NamedList results) throws KeeperException, InterruptedException {
private void migrateKey(ClusterState clusterState, DocCollection sourceCollection, Slice sourceSlice,
DocCollection targetCollection, Slice targetSlice,
String splitKey, int timeout,
NamedList results, String asyncId) throws KeeperException, InterruptedException {
String tempSourceCollectionName = "split_" + sourceSlice.getName() + "_temp_" + targetSlice.getName();
if (clusterState.hasCollection(tempSourceCollectionName)) {
log.info("Deleting temporary collection: " + tempSourceCollectionName);
Map<String, Object> props = ZkNodeProps.makeMap(
QUEUE_OPERATION, DELETECOLLECTION,
"name", tempSourceCollectionName);
try {
deleteCollection(new ZkNodeProps(props), results);
} catch (Exception e) {
@ -1350,15 +1453,23 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
log.info("Common hash range between source shard: {} and target shard: {} = " + splitRange, sourceSlice.getName(), targetSlice.getName());
Replica targetLeader = zkStateReader.getLeaderRetry(targetCollection.getName(), targetSlice.getName(), 10000);
// For tracking async calls.
HashMap<String, String> requestMap = new HashMap<String, String>();
log.info("Asking target leader node: " + targetLeader.getNodeName() + " core: "
+ targetLeader.getStr("core") + " to buffer updates");
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTBUFFERUPDATES.toString());
params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
String nodeName = targetLeader.getNodeName();
setupAsyncRequest(asyncId, requestMap, params, nodeName);
sendShardRequest(targetLeader.getNodeName(), params);
collectShardResponses(results, true, "MIGRATE failed to request node to buffer updates");
completeAsyncRequest(asyncId, requestMap, results);
ZkNodeProps m = new ZkNodeProps(
Overseer.QUEUE_OPERATION, Overseer.ADD_ROUTING_RULE,
COLLECTION_PROP, sourceCollection.getName(),
@ -1405,6 +1516,11 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
NUM_SLICES, 1,
COLL_CONF, configName,
CREATE_NODE_SET, sourceLeader.getNodeName());
if(asyncId != null) {
String internalAsyncId = asyncId + Math.abs(System.nanoTime());
props.put(ASYNC, internalAsyncId);
}
log.info("Creating temporary collection: " + props);
createCollection(clusterState, new ZkNodeProps(props), results);
// refresh cluster state
@ -1437,8 +1553,13 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
params.set(CoreAdminParams.RANGES, splitRange.toString());
params.set("split.key", splitKey);
sendShardRequest(sourceLeader.getNodeName(), params);
String tempNodeName = sourceLeader.getNodeName();
setupAsyncRequest(asyncId, requestMap, params, tempNodeName);
sendShardRequest(tempNodeName, params);
collectShardResponses(results, true, "MIGRATE failed to invoke SPLIT core admin command");
completeAsyncRequest(asyncId, requestMap, results);
log.info("Creating a replica of temporary collection: {} on the target leader node: {}",
tempSourceCollectionName, targetLeader.getNodeName());
@ -1448,7 +1569,13 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
params.set(CoreAdminParams.NAME, tempCollectionReplica2);
params.set(CoreAdminParams.COLLECTION, tempSourceCollectionName);
params.set(CoreAdminParams.SHARD, tempSourceSlice.getName());
setupAsyncRequest(asyncId, requestMap, params, targetLeader.getNodeName());
sendShardRequest(targetLeader.getNodeName(), params);
collectShardResponses(results, true,
"MIGRATE failed to create replica of temporary collection in target leader node.");
completeAsyncRequest(asyncId, requestMap, results);
coreNodeName = waitForCoreNodeName(clusterState.getCollection(tempSourceCollectionName),
zkStateReader.getBaseUrlForNodeName(targetLeader.getNodeName()), tempCollectionReplica2);
@ -1458,14 +1585,19 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
cmd.setCoreName(tempSourceLeader.getStr("core"));
cmd.setNodeName(targetLeader.getNodeName());
cmd.setCoreNodeName(coreNodeName);
cmd.setState(ZkStateReader.ACTIVE); // todo introduce asynchronous actions
cmd.setState(ZkStateReader.ACTIVE);
cmd.setCheckLive(true);
cmd.setOnlyIfLeader(true);
sendShardRequest(tempSourceLeader.getNodeName(), new ModifiableSolrParams(cmd.getParams()));
params = new ModifiableSolrParams(cmd.getParams());
setupAsyncRequest(asyncId, requestMap, params, tempSourceLeader.getNodeName());
sendShardRequest(tempSourceLeader.getNodeName(), params);
collectShardResponses(results, true,
"MIGRATE failed to create temp collection replica or timed out waiting for them to come up");
completeAsyncRequest(asyncId, requestMap, results);
log.info("Successfully created replica of temp source collection on target leader node");
log.info("Requesting merge of temp source collection replica to target leader");
@ -1473,18 +1605,27 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
params.set(CoreAdminParams.ACTION, CoreAdminAction.MERGEINDEXES.toString());
params.set(CoreAdminParams.CORE, targetLeader.getStr("core"));
params.set(CoreAdminParams.SRC_CORE, tempCollectionReplica2);
setupAsyncRequest(asyncId, requestMap, params, sourceLeader.getNodeName());
sendShardRequest(targetLeader.getNodeName(), params);
collectShardResponses(results, true,
"MIGRATE failed to merge " + tempCollectionReplica2 + " to " + targetLeader.getStr("core") + " on node: " + targetLeader.getNodeName());
completeAsyncRequest(asyncId, requestMap, results);
log.info("Asking target leader to apply buffered updates");
params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTAPPLYUPDATES.toString());
params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
setupAsyncRequest(asyncId, requestMap, params, targetLeader.getNodeName());
sendShardRequest(targetLeader.getNodeName(), params);
collectShardResponses(results, true,
"MIGRATE failed to request node to apply buffered updates");
completeAsyncRequest(asyncId, requestMap, results);
try {
log.info("Deleting temporary collection: " + tempSourceCollectionName);
props = ZkNodeProps.makeMap(
@ -1497,6 +1638,21 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
}
}
private void completeAsyncRequest(String asyncId, HashMap<String, String> requestMap, NamedList results) {
if(asyncId != null) {
waitForAsyncCallsToComplete(requestMap, results);
requestMap.clear();
}
}
private void setupAsyncRequest(String asyncId, HashMap<String, String> requestMap, ModifiableSolrParams params, String nodeName) {
if(asyncId != null) {
String coreAdminAsyncId = asyncId + Math.abs(System.nanoTime());
params.set(ASYNC, coreAdminAsyncId);
requestMap.put(nodeName, coreAdminAsyncId);
}
}
private DocRouter.Range intersect(DocRouter.Range a, DocRouter.Range b) {
if (a == null || b == null || !a.overlaps(b)) {
return null;
@ -1542,6 +1698,11 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
// if it does not, find best nodes to create more cores
int repFactor = message.getInt( REPLICATION_FACTOR, 1);
String async = null;
if (message.containsKey("async"))
async = message.getStr("async");
Integer numSlices = message.getInt(NUM_SLICES, null);
String router = message.getStr("router.name", DocRouter.DEFAULT_NAME);
List<String> shardNames = new ArrayList<>();
@ -1627,6 +1788,9 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
if (!created)
throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully createcollection: " + message.getStr("name"));
// For tracking async calls.
HashMap<String, String> requestMap = new HashMap<String, String>();
log.info("Creating SolrCores for new collection, shardNames {} , replicationFactor : {}", shardNames, repFactor);
Map<String ,ShardRequest> coresToCreate = new LinkedHashMap<>();
for (int i = 1; i <= shardNames.size(); i++) {
@ -1662,6 +1826,11 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
params.set(CoreAdminParams.COLLECTION, collectionName);
params.set(CoreAdminParams.SHARD, sliceName);
params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
String replica = zkStateReader.getBaseUrlForNodeName(nodeName);
if (replica.startsWith("http://")) replica = replica.substring(7);
setupAsyncRequest(async, requestMap, params, nodeName);
addPropertyParams(message, params);
ShardRequest sreq = new ShardRequest();
@ -1689,13 +1858,9 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
}
}
ShardResponse srsp;
do {
srsp = shardHandler.takeCompletedOrError();
if (srsp != null) {
processResponse(results, srsp);
}
} while (srsp != null);
processResponses(results);
completeAsyncRequest(async, requestMap, results);
log.info("Finished create command on all shards for collection: "
+ collectionName);
@ -1829,6 +1994,18 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
"ADDREPLICA failed to create replica");
}
private void processResponses(NamedList results) {
ShardResponse srsp;
do {
srsp = shardHandler.takeCompletedOrError();
if (srsp != null) {
processResponse(results, srsp);
}
} while (srsp != null);
}
private String createConfNode(String coll, ZkNodeProps message, boolean isLegacyCloud) throws KeeperException, InterruptedException {
String configName = message.getStr(OverseerCollectionProcessor.COLL_CONF);
if(configName == null){
@ -1873,14 +2050,8 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
Slice slice = entry.getValue();
sliceCmd(clusterState, params, stateMatcher, slice);
}
ShardResponse srsp;
do {
srsp = shardHandler.takeCompletedOrError();
if (srsp != null) {
processResponse(results, srsp);
}
} while (srsp != null);
processResponses(results);
}
@ -1950,4 +2121,71 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
return isClosed;
}
private void waitForAsyncCallsToComplete(Map<String, String> requestMap, NamedList results) {
for(String k:requestMap.keySet()) {
log.debug("I am Waiting for : " + k + "/" + requestMap.get(k));
results.add(requestMap.get(k), waitForCoreAdminAsyncCallToComplete(k, requestMap.get(k)));
}
}
private NamedList waitForCoreAdminAsyncCallToComplete(String nodeName, String requestId) {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTSTATUS.toString());
params.set(CoreAdminParams.REQUESTID, requestId);
int counter = 0;
ShardRequest sreq;
do {
sreq = new ShardRequest();
params.set("qt", adminPath);
sreq.purpose = 1;
String replica = zkStateReader.getBaseUrlForNodeName(nodeName);
if (replica.startsWith("http://")) replica = replica.substring(7);
sreq.shards = new String[] {replica};
sreq.actualShards = sreq.shards;
sreq.params = params;
shardHandler.submit(sreq, replica, sreq.params);
ShardResponse srsp;
do {
srsp = shardHandler.takeCompletedOrError();
if (srsp != null) {
NamedList results = new NamedList();
processResponse(results, srsp);
String r = (String) srsp.getSolrResponse().getResponse().get("STATUS");
if(r.equals("running")) {
log.debug("The task is still RUNNING, continuing to wait.");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
} else if(r.equals("completed")) {
log.debug("The task is COMPLETED, returning");
return srsp.getSolrResponse().getResponse();
} else if (r.equals("failed")) {
// TODO: Improve this. Get more information.
log.debug("The task is FAILED, returning");
return srsp.getSolrResponse().getResponse();
} else if (r.equals("notfound")) {
log.debug("The task is notfound, retry");
if(counter++ < 5) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
break;
}
throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid status request: " + srsp.getSolrResponse().getResponse().get("STATUS") +
"retried " + counter + "times");
} else {
throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid status request " + srsp.getSolrResponse().getResponse().get("STATUS"));
}
}
} while (srsp != null);
} while(true);
}
}

View File

@ -92,6 +92,10 @@ public final class ZkController {
private final DistributedQueue overseerJobQueue;
private final DistributedQueue overseerCollectionQueue;
private final DistributedMap overseerRunningMap;
private final DistributedMap overseerCompletedMap;
private final DistributedMap overseerFailureMap;
public static final String CONFIGS_ZKNODE = "/configs";
@ -279,6 +283,9 @@ public final class ZkController {
this.overseerJobQueue = Overseer.getInQueue(zkClient);
this.overseerCollectionQueue = Overseer.getCollectionQueue(zkClient);
this.overseerRunningMap = Overseer.getRunningMap(zkClient);
this.overseerCompletedMap = Overseer.getCompletedMap(zkClient);
this.overseerFailureMap = Overseer.getFailureMap(zkClient);
cmdExecutor = new ZkCmdExecutor(zkClientTimeout);
leaderElector = new LeaderElector(zkClient);
zkStateReader = new ZkStateReader(zkClient);
@ -1583,6 +1590,18 @@ public final class ZkController {
public DistributedQueue getOverseerCollectionQueue() {
return overseerCollectionQueue;
}
public DistributedMap getOverseerRunningMap() {
return overseerRunningMap;
}
public DistributedMap getOverseerCompletedMap() {
return overseerCompletedMap;
}
public DistributedMap getOverseerFailureMap() {
return overseerFailureMap;
}
public int getClientTimeout() {
return clientTimeout;

View File

@ -367,6 +367,13 @@ public class CoreContainer {
zkSys.publishCoresAsDown(solrCores.getCores());
}
try {
coreAdminHandler.shutdown();
} catch (Exception e) {
log.warn("Error shutting down CoreAdminHandler. Continuing to shutdown CoreContainer.");
e.printStackTrace();
}
try {
// First wake up the closer thread, it'll terminate almost immediately since it checks isShutDown.
synchronized (solrCores.getModifyLock()) {

View File

@ -18,6 +18,7 @@ package org.apache.solr.handler.admin;
*/
import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
import static org.apache.solr.cloud.OverseerCollectionProcessor.ASYNC;
import static org.apache.solr.cloud.OverseerCollectionProcessor.COLL_CONF;
import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATESHARD;
import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET;
@ -25,6 +26,7 @@ import static org.apache.solr.cloud.OverseerCollectionProcessor.DELETEREPLICA;
import static org.apache.solr.cloud.OverseerCollectionProcessor.MAX_SHARDS_PER_NODE;
import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;
import static org.apache.solr.cloud.OverseerCollectionProcessor.REQUESTID;
import static org.apache.solr.cloud.OverseerCollectionProcessor.ROUTER;
import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
@ -51,6 +53,7 @@ import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestSyncShard;
import org.apache.solr.cloud.DistributedQueue.QueueEvent;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.OverseerCollectionProcessor;
import org.apache.solr.cloud.OverseerSolrResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
@ -199,6 +202,10 @@ public class CollectionsHandler extends RequestHandlerBase {
this.handleAddReplica(req, rsp);
break;
}
case REQUESTSTATUS: {
this.handleRequestStatus(req, rsp);
break;
}
default: {
throw new RuntimeException("Unknown action: " + action);
}
@ -236,6 +243,16 @@ public class CollectionsHandler extends RequestHandlerBase {
public static long DEFAULT_ZK_TIMEOUT = 180*1000;
private void handleRequestStatus(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
log.debug("REQUESTSTATUS action invoked: " + req.getParamString());
req.getParams().required().check(REQUESTID);
Map<String, Object> props = new HashMap<String, Object>();
props.put(Overseer.QUEUE_OPERATION, OverseerCollectionProcessor.REQUESTSTATUS);
props.put(REQUESTID, req.getParams().get(REQUESTID));
ZkNodeProps m = new ZkNodeProps(props);
handleResponse(OverseerCollectionProcessor.REQUESTSTATUS, m, rsp);
}
private void handleResponse(String operation, ZkNodeProps m,
SolrQueryResponse rsp) throws KeeperException, InterruptedException {
handleResponse(operation, m, rsp, DEFAULT_ZK_TIMEOUT);
@ -244,6 +261,35 @@ public class CollectionsHandler extends RequestHandlerBase {
private void handleResponse(String operation, ZkNodeProps m,
SolrQueryResponse rsp, long timeout) throws KeeperException, InterruptedException {
long time = System.nanoTime();
if(m.containsKey(ASYNC) && m.get(ASYNC) != null) {
String asyncId = m.getStr(ASYNC);
if(asyncId.equals("-1")) {
throw new SolrException(ErrorCode.BAD_REQUEST, "requestid can not be -1. It is reserved for cleanup purposes.");
}
NamedList<String> r = new NamedList<>();
if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId) ||
coreContainer.getZkController().getOverseerFailureMap().contains(asyncId) ||
coreContainer.getZkController().getOverseerRunningMap().contains(asyncId)) {
r.add("error", "Task with the same requestid already exists.");
} else {
coreContainer.getZkController().getOverseerCollectionQueue()
.offer(ZkStateReader.toJSON(m));
}
r.add(CoreAdminParams.REQUESTID, (String) m.get(ASYNC));
SolrResponse response = new OverseerSolrResponse(r);
rsp.getValues().addAll(response.getResponse());
return;
}
QueueEvent event = coreContainer.getZkController()
.getOverseerCollectionQueue()
.offer(ZkStateReader.toJSON(m), timeout);
@ -368,6 +414,7 @@ public class CollectionsHandler extends RequestHandlerBase {
MAX_SHARDS_PER_NODE,
CREATE_NODE_SET ,
SHARDS_PROP,
ASYNC,
"router.");
copyPropertiesIfNotNull(req.getParams(), props);
@ -380,7 +427,7 @@ public class CollectionsHandler extends RequestHandlerBase {
log.info("Remove replica: " + req.getParamString());
req.getParams().required().check(COLLECTION_PROP, SHARD_ID_PROP, "replica");
Map<String, Object> map = makeMap(QUEUE_OPERATION, DELETEREPLICA);
copyIfNotNull(req.getParams(),map,COLLECTION_PROP,SHARD_ID_PROP,"replica");
copyIfNotNull(req.getParams(),map,COLLECTION_PROP,SHARD_ID_PROP,"replica", ASYNC);
ZkNodeProps m = new ZkNodeProps(map);
handleResponse(DELETEREPLICA, m, rsp);
}
@ -394,7 +441,7 @@ public class CollectionsHandler extends RequestHandlerBase {
throw new SolrException(ErrorCode.BAD_REQUEST, "shards can be added only to 'implicit' collections" );
Map<String, Object> map = makeMap(QUEUE_OPERATION, CREATESHARD);
copyIfNotNull(req.getParams(),map,COLLECTION_PROP, SHARD_ID_PROP, REPLICATION_FACTOR,CREATE_NODE_SET);
copyIfNotNull(req.getParams(),map,COLLECTION_PROP, SHARD_ID_PROP, REPLICATION_FACTOR,CREATE_NODE_SET, ASYNC);
copyPropertiesIfNotNull(req.getParams(), map);
ZkNodeProps m = new ZkNodeProps(map);
handleResponse(CREATESHARD, m, rsp);
@ -485,6 +532,10 @@ public class CollectionsHandler extends RequestHandlerBase {
if (rangesStr != null) {
props.put(CoreAdminParams.RANGES, rangesStr);
}
if (req.getParams().get(ASYNC) != null)
props.put(ASYNC, req.getParams().get(ASYNC));
copyPropertiesIfNotNull(req.getParams(), props);
ZkNodeProps m = new ZkNodeProps(props);
@ -497,7 +548,7 @@ public class CollectionsHandler extends RequestHandlerBase {
req.getParams().required().check("collection", "split.key", "target.collection");
Map<String,Object> props = new HashMap<>();
props.put(Overseer.QUEUE_OPERATION, OverseerCollectionProcessor.MIGRATE);
copyIfNotNull(req.getParams(), props, "collection", "split.key", "target.collection", "forward.timeout");
copyIfNotNull(req.getParams(), props, "collection", "split.key", "target.collection", "forward.timeout", ASYNC);
ZkNodeProps m = new ZkNodeProps(props);
handleResponse(OverseerCollectionProcessor.MIGRATE, m, rsp, DEFAULT_ZK_TIMEOUT * 20);
}

View File

@ -17,21 +17,8 @@
package org.apache.solr.handler.admin;
import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.lucene.index.DirectoryReader;
@ -55,6 +42,7 @@ import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.core.CloseHook;
@ -75,14 +63,31 @@ import org.apache.solr.update.SplitIndexCommand;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessorChain;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.NumberUtils;
import org.apache.solr.util.RefCounted;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
/**
*
@ -91,6 +96,24 @@ import com.google.common.collect.Lists;
public class CoreAdminHandler extends RequestHandlerBase {
protected static Logger log = LoggerFactory.getLogger(CoreAdminHandler.class);
protected final CoreContainer coreContainer;
protected static HashMap<String, Map<String, TaskObject>> requestStatusMap =
new HashMap<String,Map<String, TaskObject>>();
protected ExecutorService parallelExecutor = null;
protected static int MAX_TRACKED_REQUESTS = 100;
public static String RUNNING = "running";
public static String COMPLETED = "completed";
public static String FAILED = "failed";
public static String RESPONSE = "Response";
public static String RESPONSE_STATUS = "STATUS";
public static String RESPONSE_MESSAGE = "msg";
static {
requestStatusMap.put(RUNNING, Collections.synchronizedMap(new LinkedHashMap<String, TaskObject>()));
requestStatusMap.put(COMPLETED, Collections.synchronizedMap(new LinkedHashMap<String, TaskObject>()));
requestStatusMap.put(FAILED, Collections.synchronizedMap(new LinkedHashMap<String, TaskObject>()));
}
public CoreAdminHandler() {
super();
@ -136,6 +159,18 @@ public class CoreAdminHandler extends RequestHandlerBase {
"Core container instance missing");
}
//boolean doPersist = false;
String taskId = req.getParams().get("async");
TaskObject taskObject = new TaskObject(taskId);
if(taskId != null) {
// Put the tasks into the maps for tracking
if (getMap(RUNNING).containsKey(taskId) || getMap(COMPLETED).containsKey(taskId) || getMap(FAILED).containsKey(taskId)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Duplicate request with the same requestid found.");
}
addTask(RUNNING, taskObject);
}
// Pick the action
SolrParams params = req.getParams();
@ -147,6 +182,19 @@ public class CoreAdminHandler extends RequestHandlerBase {
this.handleCustomAction(req, rsp);
}
}
if (taskId == null) {
handleRequestInternal(req, rsp, action);
} else {
ParallelCoreAdminHandlerThread parallelHandlerThread = new ParallelCoreAdminHandlerThread(req, rsp, action, taskObject);
if(parallelExecutor == null || parallelExecutor.isShutdown())
parallelExecutor = Executors.newFixedThreadPool(50,
new DefaultSolrThreadFactory("parallelCoreAdminExecutor"));
parallelExecutor.execute(parallelHandlerThread);
}
}
protected void handleRequestInternal(SolrQueryRequest req, SolrQueryResponse rsp, CoreAdminAction action) throws Exception {
if (action != null) {
switch (action) {
case CREATE: {
@ -199,17 +247,17 @@ public class CoreAdminHandler extends RequestHandlerBase {
this.handleWaitForStateAction(req, rsp);
break;
}
case REQUESTRECOVERY: {
this.handleRequestRecoveryAction(req, rsp);
break;
}
case REQUESTSYNCSHARD: {
this.handleRequestSyncAction(req, rsp);
break;
}
// todo : Can this be done by the regular RecoveryStrategy route?
case REQUESTAPPLYUPDATES: {
this.handleRequestApplyUpdatesAction(req, rsp);
@ -219,6 +267,10 @@ public class CoreAdminHandler extends RequestHandlerBase {
this.handleRequestBufferUpdatesAction(req, rsp);
break;
}
case REQUESTSTATUS: {
this.handleRequestActionStatus(req, rsp);
break;
}
case OVERSEEROP:{
ZkController zkController = coreContainer.getZkController();
if(zkController != null){
@ -240,7 +292,7 @@ public class CoreAdminHandler extends RequestHandlerBase {
rsp.setHttpCaching(false);
}
/**
* Handle the core admin SPLIT action.
*/
@ -754,6 +806,28 @@ public class CoreAdminHandler extends RequestHandlerBase {
}
}
/**
* Handle "REQUESTSTATUS" action
*/
protected void handleRequestActionStatus(SolrQueryRequest req, SolrQueryResponse rsp) {
SolrParams params = req.getParams();
String requestId = params.get(CoreAdminParams.REQUESTID);
log.info("Checking request status for : " + requestId);
if (mapContainsTask(RUNNING, requestId)) {
rsp.add(RESPONSE_STATUS, RUNNING);
} else if(mapContainsTask(COMPLETED, requestId)) {
rsp.add(RESPONSE_STATUS, COMPLETED);
rsp.add(RESPONSE, getMap(COMPLETED).get(requestId).getRspObject());
} else if(mapContainsTask(FAILED, requestId)) {
rsp.add(RESPONSE_STATUS, FAILED);
rsp.add(RESPONSE, getMap(FAILED).get(requestId).getRspObject());
} else {
rsp.add(RESPONSE_STATUS, "notfound");
rsp.add(RESPONSE_MESSAGE, "No task found in running, completed or failed tasks");
}
}
/**
* Handle "SWAP" action
*/
@ -1172,4 +1246,123 @@ public class CoreAdminHandler extends RequestHandlerBase {
public String getSource() {
return "$URL$";
}
/**
* Class to implement multi-threaded CoreAdminHandler behaviour.
* This accepts all of the context from handleRequestBody.
*/
protected class ParallelCoreAdminHandlerThread implements Runnable {
SolrQueryRequest req;
SolrQueryResponse rsp;
CoreAdminAction action;
TaskObject taskObject;
public ParallelCoreAdminHandlerThread (SolrQueryRequest req, SolrQueryResponse rsp,
CoreAdminAction action, TaskObject taskObject){
this.req = req;
this.rsp = rsp;
this.action = action;
this.taskObject = taskObject;
}
public void run() {
boolean exceptionCaught = false;
try {
handleRequestInternal(req, rsp, action);
taskObject.setRspObject(rsp);
} catch (Exception e) {
exceptionCaught = true;
taskObject.setRspObjectFromException(e);
} finally {
removeTask("running", taskObject.taskId);
if(exceptionCaught) {
addTask("failed", taskObject, true);
} else
addTask("completed", taskObject, true);
}
}
}
/**
* Helper class to manage the tasks to be tracked.
* This contains the taskId, request and the response (if available).
*/
private class TaskObject {
String taskId;
String rspInfo;
public TaskObject(String taskId) {
this.taskId = taskId;
}
public String getRspObject() {
return rspInfo;
}
public void setRspObject(SolrQueryResponse rspObject) {
this.rspInfo = rspObject.getToLogAsString("TaskId: " + this.taskId + " ");
}
public void setRspObjectFromException(Exception e) {
this.rspInfo = e.getMessage();
}
}
/**
* Helper method to add a task to a tracking map.
*/
protected void addTask(String map, TaskObject o, boolean limit) {
if(limit && getMap(map).size() == MAX_TRACKED_REQUESTS) {
String key = getMap(map).entrySet().iterator().next().getKey();
getMap(map).remove(key);
}
addTask(map, o);
}
protected void addTask(String map, TaskObject o) {
synchronized (getMap(map)) {
getMap(map).put(o.taskId, o);
}
}
/**
* Helper method to remove a task from a tracking map.
*/
protected void removeTask(String map, String taskId) {
synchronized (getMap(map)) {
getMap(map).remove(taskId);
}
}
/**
* Helper method to check if a map contains a taskObject with the given taskId.
*/
protected boolean mapContainsTask(String map, String taskId) {
return getMap(map).containsKey(taskId);
}
/**
* Helper method to get a TaskObject given a map and a taskId.
*/
protected TaskObject getTask(String map, String taskId) {
return getMap(map).get(taskId);
}
/**
* Helper method to get a request status map given the name.
*/
private Map<String, TaskObject> getMap(String map) {
return requestStatusMap.get(map);
}
/**
* Method to ensure shutting down of the ThreadPool Executor.
*/
public void shutdown() {
if (parallelExecutor != null && !parallelExecutor.isShutdown())
ExecutorUtil.shutdownAndAwaitTermination(parallelExecutor);
}
}

View File

@ -0,0 +1,125 @@
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.junit.Before;
import java.io.IOException;
public class AsyncMigrateRouteKeyTest extends MigrateRouteKeyTest {
public AsyncMigrateRouteKeyTest() {
schemaString = "schema15.xml"; // we need a string id
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
}
public void doTest() throws Exception {
waitForThingsToLevelOut(15);
multipleShardMigrateTest();
printLayout();
}
protected void checkAsyncRequestForCompletion(String asyncId) throws SolrServerException, IOException {
ModifiableSolrParams params;
String message;
params = new ModifiableSolrParams();
params.set("action", CollectionParams.CollectionAction.REQUESTSTATUS.toString());
params.set(OverseerCollectionProcessor.REQUESTID, asyncId);
message = sendStatusRequestWithRetry(params, 10);
assertEquals("Task " + asyncId + " not found in completed tasks.",
"found " + asyncId + " in completed tasks", message);
}
@Override
protected void invokeMigrateApi(String sourceCollection, String splitKey, String targetCollection) throws SolrServerException, IOException {
ModifiableSolrParams params = new ModifiableSolrParams();
String asyncId = "20140128";
params.set(CollectionParams.ACTION, CollectionParams.CollectionAction.MIGRATE.toString());
params.set("collection", sourceCollection);
params.set("target.collection", targetCollection);
params.set("split.key", splitKey);
params.set("forward.timeout", 45);
params.set("async", asyncId);
invoke(params);
checkAsyncRequestForCompletion(asyncId);
}
/**
* Helper method to send a status request with specific retry limit and return
* the message/null from the success response.
*/
private String sendStatusRequestWithRetry(ModifiableSolrParams params, int maxCounter)
throws SolrServerException, IOException {
NamedList status = null;
String state = null;
String message = null;
NamedList r;
while (maxCounter-- > 0) {
r = sendRequest(params);
status = (NamedList) r.get("status");
state = (String) status.get("state");
message = (String) status.get("msg");
if (state.equals("completed") || state.equals("failed"))
return (String) status.get("msg");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
// Return last state?
return message;
}
protected NamedList sendRequest(ModifiableSolrParams params) throws SolrServerException, IOException {
SolrRequest request = new QueryRequest(params);
request.setPath("/admin/collections");
String baseUrl = ((HttpSolrServer) shardToJetty.get(SHARD1).get(0).client.solrClient)
.getBaseURL();
baseUrl = baseUrl.substring(0, baseUrl.length() - "collection1".length());
HttpSolrServer baseServer = null;
try {
baseServer = new HttpSolrServer(baseUrl);
baseServer.setConnectionTimeout(15000);
return baseServer.request(request);
} finally {
baseServer.shutdown();
}
}
}

View File

@ -0,0 +1,185 @@
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.lucene.util.TestUtil;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.impl.HttpSolrServer.RemoteSolrException;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest.Create;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.client.solrj.response.CoreAdminResponse;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrInfoMBean.Category;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.servlet.SolrDispatchFilter;
import org.apache.solr.update.DirectUpdateHandler2;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.junit.Before;
import org.junit.BeforeClass;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.ObjectName;
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static org.apache.solr.cloud.OverseerCollectionProcessor.MAX_SHARDS_PER_NODE;
import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;
import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
/**
* Tests the Cloud Collections API.
*/
@Slow
public class CollectionsAPIAsyncDistributedZkTest extends AbstractFullDistribZkTestBase {
private static final boolean DEBUG = false;
@Before
@Override
public void setUp() throws Exception {
super.setUp();
useJettyDataDir = false;
System.setProperty("numShards", Integer.toString(sliceCount));
System.setProperty("solr.xml.persist", "true");
}
public CollectionsAPIAsyncDistributedZkTest() {
fixShardCount = true;
sliceCount = 2;
shardCount = 4;
}
@Override
public void doTest() throws Exception {
testSolrJAPICalls();
if (DEBUG) {
super.printLayout();
}
}
private void testSolrJAPICalls() throws Exception {
SolrServer server = createNewSolrServer("", getBaseUrl((HttpSolrServer) clients.get(0)));
CollectionAdminRequest.createCollection("testasynccollectioncreation", 2, "conf1", server, "1001");
String state = null;
state = getRequestStateAfterCompletion("1001", 10, server);
assertEquals("CreateCollection task did not complete!", "completed", state);
CollectionAdminRequest.createCollection("testasynccollectioncreation", 2, "conf1", server, "1002");
state = getRequestStateAfterCompletion("1002", 3, server);
assertEquals("Recreating a collection with the same name didn't fail, should have.", "failed", state);
CollectionAdminRequest.splitShard("testasynccollectioncreation", "shard1", server, "1003");
state = getRequestStateAfterCompletion("1003", 60, server);
assertEquals("Shard split did not complete. Last recorded state: " + state, "completed", state);
}
private String getRequestStateAfterCompletion(String requestId, int waitForSeconds, SolrServer server)
throws IOException, SolrServerException {
String state = null;
while(waitForSeconds-- > 0) {
state = getRequestState(requestId, server);
if(state.equals("completed") || state.equals("failed"))
return state;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
return state;
}
private String getRequestState(String requestId, SolrServer server) throws IOException, SolrServerException {
CollectionAdminResponse response = CollectionAdminRequest.requestStatus(requestId, server);
NamedList innerResponse = (NamedList) response.getResponse().get("status");
return (String) innerResponse.get("state");
}
@Override
public void tearDown() throws Exception {
super.tearDown();
System.clearProperty("numShards");
System.clearProperty("zkHost");
System.clearProperty("solr.xml.persist");
// insurance
DirectUpdateHandler2.commitOnClose = true;
}
}

View File

@ -114,7 +114,7 @@ public class MigrateRouteKeyTest extends BasicDistributedZkTest {
return ruleRemoved;
}
private void invokeMigrateApi(String sourceCollection, String splitKey, String targetCollection) throws SolrServerException, IOException {
protected void invokeMigrateApi(String sourceCollection, String splitKey, String targetCollection) throws SolrServerException, IOException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("action", CollectionParams.CollectionAction.MIGRATE.toString());
params.set("collection", sourceCollection);
@ -125,7 +125,7 @@ public class MigrateRouteKeyTest extends BasicDistributedZkTest {
invoke(params);
}
private void invoke(ModifiableSolrParams params) throws SolrServerException, IOException {
protected void invoke(ModifiableSolrParams params) throws SolrServerException, IOException {
SolrRequest request = new QueryRequest(params);
request.setPath("/admin/collections");
@ -161,7 +161,7 @@ public class MigrateRouteKeyTest extends BasicDistributedZkTest {
waitForRecoveriesToFinish(targetCollection, false);
}
private void multipleShardMigrateTest() throws Exception {
protected void multipleShardMigrateTest() throws Exception {
del("*:*");
commit();
assertTrue(cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound() == 0);

View File

@ -71,6 +71,9 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
private static final String CONFIG_NAME = "myconfig";
private static DistributedQueue workQueueMock;
private static DistributedMap runningMapMock;
private static DistributedMap completedMapMock;
private static DistributedMap failureMapMock;
private static ShardHandler shardHandlerMock;
private static ZkStateReader zkStateReaderMock;
private static ClusterState clusterStateMock;
@ -90,8 +93,10 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
public OverseerCollectionProcessorToBeTested(ZkStateReader zkStateReader,
String myId, ShardHandler shardHandler, String adminPath,
DistributedQueue workQueue) {
super(zkStateReader, myId, shardHandler, adminPath, workQueue);
DistributedQueue workQueue, DistributedMap runningMap,
DistributedMap completedMap,
DistributedMap failureMap) {
super(zkStateReader, myId, shardHandler, adminPath, workQueue, runningMap, completedMap, failureMap);
}
@Override
@ -111,6 +116,9 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
@BeforeClass
public static void setUpOnce() throws Exception {
workQueueMock = createMock(DistributedQueue.class);
runningMapMock = createMock(DistributedMap.class);
completedMapMock = createMock(DistributedMap.class);
failureMapMock = createMock(DistributedMap.class);
shardHandlerMock = createMock(ShardHandler.class);
zkStateReaderMock = createMock(ZkStateReader.class);
clusterStateMock = createMock(ClusterState.class);
@ -120,6 +128,9 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
@AfterClass
public static void tearDownOnce() {
workQueueMock = null;
runningMapMock = null;
completedMapMock = null;
failureMapMock = null;
shardHandlerMock = null;
zkStateReaderMock = null;
clusterStateMock = null;
@ -131,13 +142,16 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
super.setUp();
queue.clear();
reset(workQueueMock);
reset(workQueueMock);
reset(runningMapMock);
reset(completedMapMock);
reset(failureMapMock);
reset(shardHandlerMock);
reset(zkStateReaderMock);
reset(clusterStateMock);
reset(solrZkClientMock);
underTest = new OverseerCollectionProcessorToBeTested(zkStateReaderMock,
"1234", shardHandlerMock, ADMIN_PATH, workQueueMock);
"1234", shardHandlerMock, ADMIN_PATH, workQueueMock, runningMapMock,
completedMapMock, failureMapMock);
zkMap.clear();
collectionsSet.clear();
}

View File

@ -0,0 +1,224 @@
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.junit.Before;
import java.io.IOException;
public class TestRequestStatusCollectionAPI extends BasicDistributedZkTest {
public TestRequestStatusCollectionAPI() {
schemaString = "schema15.xml"; // we need a string id
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
}
public void doTest() {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CollectionParams.ACTION, CollectionParams.CollectionAction.CREATE.toString());
params.set("name", "collection2");
params.set("numShards", 2);
params.set("replicationFactor", 1);
params.set("maxShardsPerNode", 100);
params.set("collection.configName", "conf1");
params.set("async", "1000");
try {
sendRequest(params);
} catch (SolrServerException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
// Check for the request to be completed.
int maxCounter = 10;
NamedList r = null;
NamedList status = null;
String message = null;
params = new ModifiableSolrParams();
params.set("action", CollectionParams.CollectionAction.REQUESTSTATUS.toString());
params.set(OverseerCollectionProcessor.REQUESTID, "1000");
try {
message = sendStatusRequestWithRetry(params, 10);
} catch (SolrServerException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
assertEquals("Task 1000 not found in completed tasks.", "found 1000 in completed tasks", message);
// Check for a random (hopefully non-existent request id
params = new ModifiableSolrParams();
params.set(CollectionParams.ACTION, CollectionParams.CollectionAction.REQUESTSTATUS.toString());
params.set(OverseerCollectionProcessor.REQUESTID, "9999999");
try {
r = sendRequest(params);
status = (NamedList) r.get("status");
message = (String) status.get("msg");
} catch (SolrServerException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
assertEquals("Task 9999999 found in tasks queue.", "Did not find taskid [9999999] in any tasks queue", message);
params = new ModifiableSolrParams();
params.set(CollectionParams.ACTION, CollectionParams.CollectionAction.SPLITSHARD.toString());
params.set("collection", "collection2");
params.set("shard", "shard1");
params.set("async", "1001");
try {
sendRequest(params);
} catch (SolrServerException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
// Check for the request to be completed.
params = new ModifiableSolrParams();
params.set("action", CollectionParams.CollectionAction.REQUESTSTATUS.toString());
params.set(OverseerCollectionProcessor.REQUESTID, "1001");
try {
message = sendStatusRequestWithRetry(params, maxCounter);
} catch (SolrServerException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
assertEquals("Task 1001 not found in completed tasks.", "found 1001 in completed tasks", message);
params = new ModifiableSolrParams();
params.set(CollectionParams.ACTION, CollectionParams.CollectionAction.CREATE.toString());
params.set("name", "collection2");
params.set("numShards", 2);
params.set("replicationFactor", 1);
params.set("maxShardsPerNode", 100);
params.set("collection.configName", "conf1");
params.set("async", "1002");
try {
sendRequest(params);
} catch (SolrServerException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
params = new ModifiableSolrParams();
params.set("action", CollectionParams.CollectionAction.REQUESTSTATUS.toString());
params.set(OverseerCollectionProcessor.REQUESTID, "1002");
try {
message = sendStatusRequestWithRetry(params, 10);
} catch (SolrServerException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
assertEquals("Task 1002 not found in completed tasks.", "found 1002 in failed tasks", message);
params = new ModifiableSolrParams();
params.set(CollectionParams.ACTION, CollectionParams.CollectionAction.CREATE.toString());
params.set("name", "collection3");
params.set("numShards", 1);
params.set("replicationFactor", 1);
params.set("maxShardsPerNode", 100);
params.set("collection.configName", "conf1");
params.set("async", "1002");
try {
r = sendRequest(params);
} catch (SolrServerException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
assertEquals("Did not error out on duplicate requests (same request id)",
"Task with the same requestid already exists.", r.get("error"));
}
/**
* Helper method to send a status request with specific retry limit and return
* the message/null from the success response.
*/
private String sendStatusRequestWithRetry(ModifiableSolrParams params, int maxCounter)
throws SolrServerException, IOException{
NamedList status = null;
String state = null;
String message = null;
NamedList r;
while(maxCounter-- > 0) {
r = sendRequest(params);
status = (NamedList) r.get("status");
state = (String) status.get("state");
message = (String) status.get("msg");
if(state.equals("completed") || state.equals("failed"))
return (String) status.get("msg");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
// Return last state?
return message;
}
protected NamedList sendRequest(ModifiableSolrParams params) throws SolrServerException, IOException {
SolrRequest request = new QueryRequest(params);
request.setPath("/admin/collections");
String baseUrl = ((HttpSolrServer) shardToJetty.get(SHARD1).get(0).client.solrClient)
.getBaseURL();
baseUrl = baseUrl.substring(0, baseUrl.length() - "collection1".length());
HttpSolrServer baseServer = new HttpSolrServer(baseUrl);
baseServer.setConnectionTimeout(15000);
return baseServer.request(request);
}
}

View File

@ -0,0 +1,111 @@
package org.apache.solr.handler.admin;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.commons.io.FileUtils;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.response.SolrQueryResponse;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
public class CoreAdminRequestStatusTest extends SolrTestCaseJ4{
@BeforeClass
public static void beforeClass() throws Exception {
initCore("solrconfig.xml", "schema.xml");
}
@Test
public void testCoreAdminRequestStatus() throws Exception {
final File workDir = new File(TEMP_DIR, this.getClass().getName());
if (workDir.exists()) {
FileUtils.deleteDirectory(workDir);
}
assertTrue("Failed to mkdirs workDir", workDir.mkdirs());
final CoreContainer cores = h.getCoreContainer();
final CoreAdminHandler admin = new CoreAdminHandler(cores);
String instDir = null;
{
SolrCore template = null;
try {
template = cores.getCore("collection1");
instDir = template.getCoreDescriptor().getInstanceDir();
} finally {
if (null != template) template.close();
}
}
final File instDirFile = new File(instDir);
assertTrue("instDir doesn't exist: " + instDir, instDirFile.exists());
final File instPropFile = new File(workDir, "instProp");
FileUtils.copyDirectory(instDirFile, instPropFile);
// create a new core (using CoreAdminHandler) w/ properties
SolrQueryResponse resp = new SolrQueryResponse();
admin.handleRequestBody
(req(CoreAdminParams.ACTION,
CoreAdminParams.CoreAdminAction.CREATE.toString(),
CoreAdminParams.INSTANCE_DIR, instPropFile.getAbsolutePath(),
CoreAdminParams.NAME, "dummycore",
"async", "42"),
resp);
assertNull("Exception on create", resp.getException());
int maxRetries = 10;
while(maxRetries-- > 0) {
resp = new SolrQueryResponse();
admin.handleRequestBody
(req(CoreAdminParams.ACTION,
CoreAdminParams.CoreAdminAction.REQUESTSTATUS.toString(),
CoreAdminParams.REQUESTID, "42"),
resp
);
if(resp.getValues().get("STATUS") != null && resp.getValues().get("STATUS").equals("completed"))
break;
Thread.sleep(1000);
}
assertEquals("The status of request was expected to be completed",
"completed", resp.getValues().get("STATUS"));
resp = new SolrQueryResponse();
admin.handleRequestBody
(req(CoreAdminParams.ACTION,
CoreAdminParams.CoreAdminAction.REQUESTSTATUS.toString(),
CoreAdminParams.REQUESTID, "9999999"),
resp
);
assertEquals("Was expecting it to be invalid but found a task with the id.",
"notfound", resp.getValues().get("STATUS"));
admin.shutdown();
}
}

View File

@ -17,21 +17,21 @@
package org.apache.solr.client.solrj.request;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ContentStream;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
/**
* This class is experimental and subject to change.
*
@ -41,6 +41,7 @@ public class CollectionAdminRequest extends SolrRequest
{
protected String collection = null;
protected CollectionAction action = null;
protected String asyncId = null;
protected static class CollectionShardAdminRequest extends CollectionAdminRequest {
protected String shardName = null;
@ -53,6 +54,7 @@ public class CollectionAdminRequest extends SolrRequest
params.remove( "name" );
params.set( "collection", collection );
params.set( "shard", shardName);
params.set( "async", asyncId);
return params;
}
@ -124,6 +126,9 @@ public class CollectionAdminRequest extends SolrRequest
// OverseerCollectionProcessor.REPLICATION_FACTOR
params.set( "replicationFactor", replicationFactor);
}
if (asyncId != null) {
params.set("async", asyncId);
}
return params;
}
@ -188,6 +193,25 @@ public class CollectionAdminRequest extends SolrRequest
}
}
//a request status collection request
public static class RequestStatus extends CollectionAdminRequest {
protected String requestId = null;
public RequestStatus() {
action = CollectionAction.REQUESTSTATUS;
}
public void setRequestId(String requestId) {this.requestId = requestId; }
public String getRequestId() { return this.requestId; }
@Override
public SolrParams getParams() {
ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
params.set("requestid", requestId);
return params;
}
}
//a collection alias create request
public static class CreateAlias extends CollectionAdminRequest {
protected String aliasedCollections = null;
@ -239,6 +263,10 @@ public class CollectionAdminRequest extends SolrRequest
this.action = action;
}
public void setAsyncId(String asyncId) {
this.asyncId = asyncId;
}
//---------------------------------------------------------------------------------------
//
//---------------------------------------------------------------------------------------
@ -285,7 +313,19 @@ public class CollectionAdminRequest extends SolrRequest
String nodeSet,
String conf,
String routerField,
SolrServer server ) throws SolrServerException, IOException
SolrServer server) throws SolrServerException, IOException
{
return createCollection(name, shards, repl, maxShards, nodeSet, conf, routerField, server, null);
}
// creates collection using a compositeId router
public static CollectionAdminResponse createCollection( String name,
Integer shards, Integer repl, Integer maxShards,
String nodeSet,
String conf,
String routerField,
SolrServer server,
String asyncId) throws SolrServerException, IOException
{
Create req = new Create();
req.setCollectionName(name);
@ -296,27 +336,47 @@ public class CollectionAdminRequest extends SolrRequest
req.setCreateNodeSet(nodeSet);
req.setConfigName(conf);
req.setRouterField(routerField);
req.setAsyncId(asyncId);
return req.process( server );
}
public static CollectionAdminResponse createCollection(String name, Integer shards,
String conf,
SolrServer server) throws SolrServerException, IOException {
return createCollection(name, shards, conf, server, null);
}
public static CollectionAdminResponse createCollection( String name,
Integer shards, String conf,
SolrServer server ) throws SolrServerException, IOException
SolrServer server,
String asyncId) throws SolrServerException, IOException
{
Create req = new Create();
req.setCollectionName(name);
req.setRouterName("compositeId");
req.setNumShards(shards);
req.setConfigName(conf);
req.setAsyncId(asyncId);
return req.process( server );
}
public static CollectionAdminResponse createCollection(String name,
String shards,
Integer repl, Integer maxShards,
String conf,
String routerField,
SolrServer server) throws SolrServerException, IOException {
return createCollection(name, shards, repl, maxShards, conf, routerField, null);
}
// creates a collection using an implicit router
public static CollectionAdminResponse createCollection( String name,
String shards, Integer repl, Integer maxShards,
String nodeSet,
String conf,
String routerField,
SolrServer server ) throws SolrServerException, IOException
SolrServer server,
String asyncId) throws SolrServerException, IOException
{
Create req = new Create();
req.setCollectionName(name);
@ -327,34 +387,68 @@ public class CollectionAdminRequest extends SolrRequest
req.setCreateNodeSet(nodeSet);
req.setConfigName(conf);
req.setRouterField(routerField);
req.setAsyncId(asyncId);
return req.process( server );
}
public static CollectionAdminResponse createCollection( String name,
String shards, String conf,
SolrServer server ) throws SolrServerException, IOException
SolrServer server) throws SolrServerException, IOException
{
return createCollection(name, shards, conf, server, null);
}
public static CollectionAdminResponse createCollection( String name,
String shards, String conf,
SolrServer server, String asyncId ) throws SolrServerException, IOException
{
Create req = new Create();
req.setCollectionName(name);
req.setRouterName("implicit");
req.setShards(shards);
req.setConfigName(conf);
req.setAsyncId(asyncId);
return req.process( server );
}
public static CollectionAdminResponse reloadCollection( String name, SolrServer server ) throws SolrServerException, IOException
public static CollectionAdminResponse reloadCollection( String name, SolrServer server)
throws SolrServerException, IOException {
return reloadCollection(name, server, null);
}
public static CollectionAdminResponse reloadCollection( String name, SolrServer server, String asyncId )
throws SolrServerException, IOException
{
CollectionAdminRequest req = new Reload();
req.setCollectionName(name);
req.setAsyncId(asyncId);
return req.process( server );
}
public static CollectionAdminResponse deleteCollection( String name, SolrServer server ) throws SolrServerException, IOException
public static CollectionAdminResponse deleteCollection( String name, SolrServer server)
throws SolrServerException, IOException
{
return deleteCollection(name, server, null);
}
public static CollectionAdminResponse deleteCollection( String name, SolrServer server,
String asyncId)
throws SolrServerException, IOException
{
CollectionAdminRequest req = new Delete();
req.setCollectionName(name);
req.setAsyncId(asyncId);
return req.process( server );
}
public static CollectionAdminResponse requestStatus(String requestId, SolrServer server)
throws SolrServerException, IOException {
RequestStatus req = new RequestStatus();
req.setRequestId(requestId);
return req.process(server);
}
public static CollectionAdminResponse createShard( String name, String shard, String nodeSet, SolrServer server ) throws SolrServerException, IOException
{
CreateShard req = new CreateShard();
@ -369,16 +463,30 @@ public class CollectionAdminRequest extends SolrRequest
}
public static CollectionAdminResponse splitShard( String name, String shard, String ranges, SolrServer server ) throws SolrServerException, IOException
{
return splitShard(name, shard, ranges, server, null);
}
public static CollectionAdminResponse splitShard( String name, String shard, String ranges, SolrServer server,
String asyncId) throws SolrServerException, IOException
{
SplitShard req = new SplitShard();
req.setCollectionName(name);
req.setShardName(shard);
req.setRanges(ranges);
req.setAsyncId(asyncId);
return req.process( server );
}
public static CollectionAdminResponse splitShard( String name, String shard, SolrServer server ) throws SolrServerException, IOException
public static CollectionAdminResponse splitShard(String name, String shard, SolrServer server)
throws SolrServerException, IOException {
return splitShard(name, shard, null, server, null);
}
public static CollectionAdminResponse splitShard( String name, String shard, SolrServer server,
String asyncId ) throws SolrServerException, IOException
{
return splitShard(name, shard, null, server);
return splitShard(name, shard, null, server, asyncId);
}
public static CollectionAdminResponse deleteShard( String name, String shard, SolrServer server ) throws SolrServerException, IOException

View File

@ -42,6 +42,7 @@ public interface CollectionParams
ADDROLE,
REMOVEROLE,
CLUSTERPROP,
REQUESTSTATUS,
ADDREPLICA;
public static CollectionAction get( String p )

View File

@ -91,7 +91,9 @@ public abstract class CoreAdminParams
public final static String RANGES = "ranges";
public static final String ROLES = "roles";
public static final String REQUESTID = "requestid";
public static final String CORE_NODE_NAME = "coreNodeName";
/** Prefix for core property name=value pair **/
@ -128,7 +130,8 @@ public abstract class CoreAdminParams
REQUESTAPPLYUPDATES,
LOAD_ON_STARTUP,
TRANSIENT,
OVERSEEROP;
OVERSEEROP,
REQUESTSTATUS;
public static CoreAdminAction get( String p )
{