SOLR-4693: A "deleteshard" collections API that unloads all replicas of a given shard and then removes it from the cluster state. It will remove only those shards which are INACTIVE or have no range (created for custom sharding).

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1499655 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Shalin Shekhar Mangar 2013-07-04 05:04:21 +00:00
parent 969c50b403
commit dcbc79ce15
6 changed files with 371 additions and 67 deletions

View File

@ -151,6 +151,11 @@ New Features
* SOLR-4977: Add option to send IndexWriter's infostream to the logging system.
(Ryan Ernst via Robert Muir)
* SOLR-4693: A "deleteshard" collections API that unloads all replicas of a given
shard and then removes it from the cluster state. It will remove only those shards
which are INACTIVE or have no range (created for custom sharding).
(Anshum Gupta, shalin)
Bug Fixes
----------------------

View File

@ -52,6 +52,7 @@ public class Overseer {
public static final String QUEUE_OPERATION = "operation";
public static final String DELETECORE = "deletecore";
public static final String REMOVECOLLECTION = "removecollection";
public static final String REMOVESHARD = "removeshard";
private static final int STATE_UPDATE_DELAY = 1500; // delay between cloud state updates
@ -181,6 +182,8 @@ public class Overseer {
clusterState = removeCore(clusterState, message);
} else if (REMOVECOLLECTION.equals(operation)) {
clusterState = removeCollection(clusterState, message);
} else if (REMOVESHARD.equals(operation)) {
clusterState = removeShard(clusterState, message);
} else if (ZkStateReader.LEADER_PROP.equals(operation)) {
StringBuilder sb = new StringBuilder();
@ -548,8 +551,28 @@ public class Overseer {
ClusterState newState = new ClusterState(clusterState.getLiveNodes(), newCollections);
return newState;
}
/*
/*
* Remove collection slice from cloudstate
*/
private ClusterState removeShard(final ClusterState clusterState, ZkNodeProps message) {
final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
final String sliceId = message.getStr(ZkStateReader.SHARD_ID_PROP);
final Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>(clusterState.getCollectionStates()); // shallow copy
DocCollection coll = newCollections.get(collection);
Map<String, Slice> newSlices = new LinkedHashMap<String, Slice>(coll.getSlicesMap());
newSlices.remove(sliceId);
DocCollection newCollection = new DocCollection(coll.getName(), newSlices, coll.getProperties(), coll.getRouter());
newCollections.put(newCollection.getName(), newCollection);
return new ClusterState(clusterState.getLiveNodes(), newCollections);
}
/*
* Remove core from cloudstate
*/
private ClusterState removeCore(final ClusterState clusterState, ZkNodeProps message) {

View File

@ -75,6 +75,8 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
public static final String SPLITSHARD = "splitshard";
public static final String DELETESHARD = "deleteshard";
// TODO: use from Overseer?
private static final String QUEUE_OPERATION = "operation";
@ -164,31 +166,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
if (CREATECOLLECTION.equals(operation)) {
createCollection(zkStateReader.getClusterState(), message, results);
} else if (DELETECOLLECTION.equals(operation)) {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
params.set(CoreAdminParams.DELETE_INSTANCE_DIR, true);
params.set(CoreAdminParams.DELETE_DATA_DIR, true);
collectionCmd(zkStateReader.getClusterState(), message, params, results, null);
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
Overseer.REMOVECOLLECTION, "name", message.getStr("name"));
Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(m));
// wait for a while until we don't see the collection
long now = System.currentTimeMillis();
long timeout = now + 30000;
boolean removed = false;
while (System.currentTimeMillis() < timeout) {
Thread.sleep(100);
removed = !zkStateReader.getClusterState().getCollections().contains(message.getStr("name"));
if (removed) {
Thread.sleep(100); // just a bit of time so it's more likely other readers see on return
break;
}
}
if (!removed) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully remove collection: " + message.getStr("name"));
}
deleteCollection(message, results);
} else if (RELOADCOLLECTION.equals(operation)) {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminAction.RELOAD.toString());
@ -199,6 +177,8 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
deleteAlias(zkStateReader.getAliases(), message);
} else if (SPLITSHARD.equals(operation)) {
splitShard(zkStateReader.getClusterState(), message, results);
} else if (DELETESHARD.equals(operation)) {
deleteShard(zkStateReader.getClusterState(), message, results);
} else {
throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
+ operation);
@ -217,6 +197,34 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
return new OverseerSolrResponse(results);
}
private void deleteCollection(ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
params.set(CoreAdminParams.DELETE_INSTANCE_DIR, true);
params.set(CoreAdminParams.DELETE_DATA_DIR, true);
collectionCmd(zkStateReader.getClusterState(), message, params, results, null);
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
Overseer.REMOVECOLLECTION, "name", message.getStr("name"));
Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(m));
// wait for a while until we don't see the collection
long now = System.currentTimeMillis();
long timeout = now + 30000;
boolean removed = false;
while (System.currentTimeMillis() < timeout) {
Thread.sleep(100);
removed = !zkStateReader.getClusterState().getCollections().contains(message.getStr("name"));
if (removed) {
Thread.sleep(100); // just a bit of time so it's more likely other readers see on return
break;
}
}
if (!removed) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully remove collection: " + message.getStr("name"));
}
}
private void createAlias(Aliases aliases, ZkNodeProps message) {
String aliasName = message.getStr("name");
String collections = message.getStr("collections");
@ -604,6 +612,75 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
} while (srsp != null);
}
private void deleteShard(ClusterState clusterState, ZkNodeProps message, NamedList results) {
log.info("Delete shard invoked");
String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
String sliceId = message.getStr(ZkStateReader.SHARD_ID_PROP);
Slice slice = clusterState.getSlice(collection, sliceId);
if (slice == null) {
if(clusterState.getCollections().contains(collection)) {
throw new SolrException(ErrorCode.BAD_REQUEST,
"No shard with the specified name exists: " + slice);
} else {
throw new SolrException(ErrorCode.BAD_REQUEST,
"No collection with the specified name exists: " + collection);
}
}
// For now, only allow for deletions of Inactive slices or custom hashes (range==null).
// TODO: Add check for range gaps on Slice deletion
if (!(slice.getRange() == null || slice.getState().equals(Slice.INACTIVE))) {
throw new SolrException(ErrorCode.BAD_REQUEST,
"The slice: " + slice.getName() + " is currently "
+ slice.getState() + ". Only INACTIVE (or custom-hashed) slices can be deleted.");
}
try {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
params.set(CoreAdminParams.DELETE_INDEX, "true");
sliceCmd(clusterState, params, null, slice);
ShardResponse srsp;
do {
srsp = shardHandler.takeCompletedOrError();
if (srsp != null) {
processResponse(results, srsp);
}
} while (srsp != null);
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
Overseer.REMOVESHARD, ZkStateReader.COLLECTION_PROP, collection);
Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(m));
// wait for a while until we don't see the shard
long now = System.currentTimeMillis();
long timeout = now + 30000;
boolean removed = false;
while (System.currentTimeMillis() < timeout) {
Thread.sleep(100);
removed = zkStateReader.getClusterState().getSlice(collection, message.getStr("name")) == null;
if (removed) {
Thread.sleep(100); // just a bit of time so it's more likely other readers see on return
break;
}
}
if (!removed) {
throw new SolrException(ErrorCode.SERVER_ERROR,
"Could not fully remove collection: " + collection + " shard: " + message.getStr("name"));
}
log.info("Successfully deleted collection " + collection + ", shard: " + message.getStr("name"));
} catch (SolrException e) {
throw e;
} catch (Exception e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Error executing delete operation for collection: " + collection + " shard: " + message.getStr("name"), e);
}
}
private void sendShardRequest(String nodeName, ModifiableSolrParams params) {
ShardRequest sreq = new ShardRequest();
params.set("qt", adminPath);
@ -756,33 +833,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
for (Map.Entry<String,Slice> entry : coll.getSlicesMap().entrySet()) {
Slice slice = entry.getValue();
Map<String,Replica> shards = slice.getReplicasMap();
Set<Map.Entry<String,Replica>> shardEntries = shards.entrySet();
for (Map.Entry<String,Replica> shardEntry : shardEntries) {
final ZkNodeProps node = shardEntry.getValue();
if (clusterState.liveNodesContain(node.getStr(ZkStateReader.NODE_NAME_PROP)) && (stateMatcher != null ? node.getStr(ZkStateReader.STATE_PROP).equals(stateMatcher) : true)) {
// For thread safety, only simple clone the ModifiableSolrParams
ModifiableSolrParams cloneParams = new ModifiableSolrParams();
cloneParams.add(params);
cloneParams.set(CoreAdminParams.CORE,
node.getStr(ZkStateReader.CORE_NAME_PROP));
String replica = node.getStr(ZkStateReader.BASE_URL_PROP);
ShardRequest sreq = new ShardRequest();
sreq.nodeName = node.getStr(ZkStateReader.NODE_NAME_PROP);
// yes, they must use same admin handler path everywhere...
cloneParams.set("qt", adminPath);
sreq.purpose = 1;
// TODO: this sucks
if (replica.startsWith("http://")) replica = replica.substring(7);
sreq.shards = new String[] {replica};
sreq.actualShards = sreq.shards;
sreq.params = cloneParams;
log.info("Collection Admin sending CoreAdmin cmd to " + replica
+ " params:" + sreq.params);
shardHandler.submit(sreq, replica, sreq.params);
}
}
sliceCmd(clusterState, params, stateMatcher, slice);
}
ShardResponse srsp;
@ -795,6 +846,36 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
}
private void sliceCmd(ClusterState clusterState, ModifiableSolrParams params, String stateMatcher, Slice slice) {
Map<String,Replica> shards = slice.getReplicasMap();
Set<Map.Entry<String,Replica>> shardEntries = shards.entrySet();
for (Map.Entry<String,Replica> shardEntry : shardEntries) {
final ZkNodeProps node = shardEntry.getValue();
if (clusterState.liveNodesContain(node.getStr(ZkStateReader.NODE_NAME_PROP)) && (stateMatcher != null ? node.getStr(ZkStateReader.STATE_PROP).equals(stateMatcher) : true)) {
// For thread safety, only simple clone the ModifiableSolrParams
ModifiableSolrParams cloneParams = new ModifiableSolrParams();
cloneParams.add(params);
cloneParams.set(CoreAdminParams.CORE,
node.getStr(ZkStateReader.CORE_NAME_PROP));
String replica = node.getStr(ZkStateReader.BASE_URL_PROP);
ShardRequest sreq = new ShardRequest();
sreq.nodeName = node.getStr(ZkStateReader.NODE_NAME_PROP);
// yes, they must use same admin handler path everywhere...
cloneParams.set("qt", adminPath);
sreq.purpose = 1;
// TODO: this sucks
if (replica.startsWith("http://")) replica = replica.substring(7);
sreq.shards = new String[] {replica};
sreq.actualShards = sreq.shards;
sreq.params = cloneParams;
log.info("Collection Admin sending CoreAdmin cmd to " + replica
+ " params:" + sreq.params);
shardHandler.submit(sreq, replica, sreq.params);
}
}
}
private void processResponse(NamedList results, ShardResponse srsp) {
Throwable e = srsp.getException();
if (e != null) {

View File

@ -132,10 +132,14 @@ public class CollectionsHandler extends RequestHandlerBase {
this.handleDeleteAliasAction(req, rsp);
break;
}
case SPLITSHARD: {
this.handleSplitShardAction(req, rsp);
break;
}
case SPLITSHARD: {
this.handleSplitShardAction(req, rsp);
break;
}
case DELETESHARD: {
this.handleDeleteShardAction(req, rsp);
break;
}
default: {
throw new RuntimeException("Unknown action: " + action);
@ -146,13 +150,18 @@ public class CollectionsHandler extends RequestHandlerBase {
}
public static long DEFAULT_ZK_TIMEOUT = 60*1000;
private void handleResponse(String operation, ZkNodeProps m,
SolrQueryResponse rsp) throws KeeperException, InterruptedException {
handleResponse(operation, m, rsp, DEFAULT_ZK_TIMEOUT);
}
private void handleResponse(String operation, ZkNodeProps m,
SolrQueryResponse rsp) throws KeeperException, InterruptedException {
SolrQueryResponse rsp, long timeout) throws KeeperException, InterruptedException {
long time = System.currentTimeMillis();
QueueEvent event = coreContainer.getZkController()
.getOverseerCollectionQueue()
.offer(ZkStateReader.toJSON(m), DEFAULT_ZK_TIMEOUT);
.offer(ZkStateReader.toJSON(m), timeout);
if (event.getBytes() != null) {
SolrResponse response = SolrResponse.deserialize(event.getBytes());
rsp.getValues().addAll(response.getResponse());
@ -162,9 +171,9 @@ public class CollectionsHandler extends RequestHandlerBase {
rsp.setException(new SolrException(code != null && code != -1 ? ErrorCode.getErrorCode(code) : ErrorCode.SERVER_ERROR, (String)exp.get("msg")));
}
} else {
if (System.currentTimeMillis() - time >= DEFAULT_ZK_TIMEOUT) {
if (System.currentTimeMillis() - time >= timeout) {
throw new SolrException(ErrorCode.SERVER_ERROR, operation
+ " the collection time out:" + DEFAULT_ZK_TIMEOUT / 1000 + "s");
+ " the collection time out:" + timeout / 1000 + "s");
} else if (event.getWatchedEvent() != null) {
throw new SolrException(ErrorCode.SERVER_ERROR, operation
+ " the collection error [Watcher fired on path: "
@ -280,6 +289,21 @@ public class CollectionsHandler extends RequestHandlerBase {
handleResponse(OverseerCollectionProcessor.CREATECOLLECTION, m, rsp);
}
private void handleDeleteShardAction(SolrQueryRequest req,
SolrQueryResponse rsp) throws InterruptedException, KeeperException {
log.info("Deleting Shard : " + req.getParamString());
String name = req.getParams().required().get("collection");
String shard = req.getParams().required().get("shard");
Map<String,Object> props = new HashMap<String,Object>();
props.put("collection", name);
props.put(Overseer.QUEUE_OPERATION, OverseerCollectionProcessor.DELETESHARD);
props.put(ZkStateReader.SHARD_ID_PROP, shard);
ZkNodeProps m = new ZkNodeProps(props);
handleResponse(OverseerCollectionProcessor.DELETESHARD, m, rsp);
}
private void handleSplitShardAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
log.info("Splitting shard : " + req.getParamString());
@ -295,10 +319,7 @@ public class CollectionsHandler extends RequestHandlerBase {
ZkNodeProps m = new ZkNodeProps(props);
// todo remove this hack
DEFAULT_ZK_TIMEOUT *= 5;
handleResponse(OverseerCollectionProcessor.SPLITSHARD, m, rsp);
DEFAULT_ZK_TIMEOUT /= 5;
handleResponse(OverseerCollectionProcessor.SPLITSHARD, m, rsp, DEFAULT_ZK_TIMEOUT * 5);
}
public static ModifiableSolrParams params(String... params) {

View File

@ -0,0 +1,174 @@
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.handler.admin.CollectionsHandler;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class DeleteShardTest extends AbstractFullDistribZkTestBase {
public DeleteShardTest() {
super();
fixShardCount = true;
shardCount = 2;
sliceCount = 2;
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
System.setProperty("numShards", "2");
System.setProperty("solr.xml.persist", "true");
}
@Override
@After
public void tearDown() throws Exception {
super.tearDown();
if (VERBOSE || printLayoutOnTearDown) {
super.printLayout();
}
if (controlClient != null) {
controlClient.shutdown();
}
if (cloudClient != null) {
cloudClient.shutdown();
}
if (controlClientCloud != null) {
controlClientCloud.shutdown();
}
super.tearDown();
System.clearProperty("numShards");
System.clearProperty("solr.xml.persist");
}
// TODO: Custom hash slice deletion test
@Override
public void doTest() throws Exception {
ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
Slice slice1 = clusterState.getSlice(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1);
Slice slice2 = clusterState.getSlice(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD2);
assertNotNull("Shard1 not found", slice1);
assertNotNull("Shard2 not found", slice2);
assertEquals("Shard1 is not active", Slice.ACTIVE, slice1.getState());
assertEquals("Shard2 is not active", Slice.ACTIVE, slice2.getState());
setSliceAsInactive(SHARD1);
clusterState = cloudClient.getZkStateReader().getClusterState();
slice1 = clusterState.getSlice(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1);
assertEquals("Shard1 is not inactive yet.", Slice.INACTIVE, slice1.getState());
deleteShard(SHARD1);
confirmShardDeletion();
}
protected void confirmShardDeletion() throws SolrServerException, KeeperException,
InterruptedException {
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
ClusterState clusterState = null;
int counter = 10;
while (counter-- > 0) {
zkStateReader.updateClusterState(true);
clusterState = zkStateReader.getClusterState();
if (clusterState.getSlice("collection1", SHARD1) == null) {
break;
}
Thread.sleep(1000);
}
assertNull("Cluster still contains shard1 even after waiting for it to be deleted.",
clusterState.getSlice(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1));
}
protected void deleteShard(String shard) throws SolrServerException, IOException,
KeeperException, InterruptedException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("action", CollectionParams.CollectionAction.DELETESHARD.toString());
params.set("collection", AbstractFullDistribZkTestBase.DEFAULT_COLLECTION);
params.set("shard", shard);
SolrRequest request = new QueryRequest(params);
request.setPath("/admin/collections");
String baseUrl = ((HttpSolrServer) shardToJetty.get(SHARD1).get(0).client.solrClient)
.getBaseURL();
baseUrl = baseUrl.substring(0, baseUrl.length() - "collection1".length());
HttpSolrServer baseServer = new HttpSolrServer(baseUrl);
baseServer.setConnectionTimeout(15000);
baseServer.setSoTimeout((int) (CollectionsHandler.DEFAULT_ZK_TIMEOUT));
baseServer.request(request);
}
protected void setSliceAsInactive(String slice) throws SolrServerException, IOException,
KeeperException, InterruptedException {
DistributedQueue inQueue = Overseer.getInQueue(cloudClient.getZkStateReader().getZkClient());
Map<String, Object> propMap = new HashMap<String, Object>();
propMap.put(Overseer.QUEUE_OPERATION, "updateshardstate");
propMap.put(slice, Slice.INACTIVE);
propMap.put(ZkStateReader.COLLECTION_PROP, "collection1");
ZkNodeProps m = new ZkNodeProps(propMap);
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
inQueue.offer(ZkStateReader.toJSON(m));
boolean transition = false;
for (int counter = 10; counter > 0; counter--) {
zkStateReader.updateClusterState(true);
ClusterState clusterState = zkStateReader.getClusterState();
String sliceState = clusterState.getSlice("collection1", slice).getState();
if (sliceState.equals(Slice.INACTIVE)) {
transition = true;
break;
}
Thread.sleep(1000);
}
if (!transition) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not set shard [" + slice + "] as INACTIVE");
}
}
}

View File

@ -28,7 +28,7 @@ public interface CollectionParams
public enum CollectionAction {
CREATE, DELETE, RELOAD, SYNCSHARD, CREATEALIAS, DELETEALIAS, SPLITSHARD;
CREATE, DELETE, RELOAD, SYNCSHARD, CREATEALIAS, DELETEALIAS, SPLITSHARD, DELETESHARD;
public static CollectionAction get( String p )
{