SOLR-9319: DELETEREPLICA can accept a 'count' and remove appropriate replicas

This commit is contained in:
Noble Paul 2016-09-02 09:27:43 +05:30
parent e13f7aeafa
commit e203c9af95
8 changed files with 292 additions and 32 deletions

View File

@ -103,6 +103,8 @@ Bug Fixes
* SOLR-9461: DELETENODE, REPLACENODE should pass down the 'async' param to subcommands (shalin, noble)
* SOLR-9319: DELETEREPLICA can accept a 'count' and remove appropriate replicas (Nitin Sharma, noble )
Optimizations
----------------------

View File

@ -27,7 +27,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.params.CommonAdminParams;
import org.apache.solr.common.util.NamedList;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;

View File

@ -18,8 +18,11 @@ package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
@ -35,6 +38,7 @@ import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.Utils;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@ -44,6 +48,7 @@ import static org.apache.solr.cloud.OverseerCollectionMessageHandler.ONLY_IF_DOW
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
@ -62,36 +67,156 @@ public class DeleteReplicaCmd implements Cmd {
deleteReplica(clusterState, message, results,null);
}
@SuppressWarnings("unchecked")
void deleteReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
throws KeeperException, InterruptedException {
throws KeeperException, InterruptedException {
log.info("deleteReplica() : {}", Utils.toJSONString(message));
boolean parallel = message.getBool("parallel", false);
//If a count is specified the strategy needs be different
if (message.getStr(COUNT_PROP) != null) {
deleteReplicaBasedOnCount(clusterState, message, results, onComplete, parallel);
return;
}
ocmh.checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP);
String collectionName = message.getStr(COLLECTION_PROP);
String shard = message.getStr(SHARD_ID_PROP);
String replicaName = message.getStr(REPLICA_PROP);
boolean parallel = message.getBool("parallel", false);
DocCollection coll = clusterState.getCollection(collectionName);
Slice slice = coll.getSlice(shard);
if (slice == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Invalid shard name : " + shard + " in collection : " + collectionName);
"Invalid shard name : " + shard + " in collection : " + collectionName);
}
deleteCore(slice, collectionName, replicaName, message, shard, results, onComplete, parallel);
}
/**
* Delete replicas based on count for a given collection. If a shard is passed, uses that
* else deletes given num replicas across all shards for the given collection.
*/
void deleteReplicaBasedOnCount(ClusterState clusterState,
ZkNodeProps message,
NamedList results,
Runnable onComplete,
boolean parallel)
throws KeeperException, InterruptedException {
ocmh.checkRequired(message, COLLECTION_PROP, COUNT_PROP);
int count = Integer.parseInt(message.getStr(COUNT_PROP));
String collectionName = message.getStr(COLLECTION_PROP);
String shard = message.getStr(SHARD_ID_PROP);
DocCollection coll = clusterState.getCollection(collectionName);
Slice slice = null;
//Validate if shard is passed.
if (shard != null) {
slice = coll.getSlice(shard);
if (slice == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Invalid shard name : " + shard + " in collection : " + collectionName);
}
}
Map<Slice, Set<String>> shardToReplicasMapping = new HashMap<Slice, Set<String>>();
if (slice != null) {
Set<String> replicasToBeDeleted = pickReplicasTobeDeleted(slice, shard, collectionName, count);
shardToReplicasMapping.put(slice,replicasToBeDeleted);
} else {
//If there are many replicas left, remove the rest based on count.
Collection<Slice> allSlices = coll.getSlices();
for (Slice individualSlice : allSlices) {
Set<String> replicasToBeDeleted = pickReplicasTobeDeleted(individualSlice, individualSlice.getName(), collectionName, count);
shardToReplicasMapping.put(individualSlice, replicasToBeDeleted);
}
}
for (Slice shardSlice: shardToReplicasMapping.keySet()) {
String shardId = shardSlice.getName();
Set<String> replicas = shardToReplicasMapping.get(shardSlice);
//callDeleteReplica on all replicas
for (String replica: replicas) {
log.info("Deleting replica {} for shard {} based on count {}", replica, shardId, count);
deleteCore(shardSlice, collectionName, replica, message, shard, results, onComplete, parallel);
}
results.add("shard_id", shardId);
results.add("replicas_deleted", replicas);
}
}
/**
* Pick replicas to be deleted. Avoid picking the leader.
*/
private Set<String> pickReplicasTobeDeleted(Slice slice, String shard, String collectionName, int count) {
validateReplicaAvailability(slice, shard, collectionName, count);
Collection<Replica> allReplicas = slice.getReplicas();
Set<String> replicasToBeRemoved = new HashSet<String>();
Replica leader = slice.getLeader();
for (Replica replica: allReplicas) {
if (count == 0) {
break;
}
//Try avoiding to pick up the leader to minimize activity on the cluster.
if (leader.getCoreName().equals(replica.getCoreName())) {
continue;
}
replicasToBeRemoved.add(replica.getName());
count --;
}
return replicasToBeRemoved;
}
/**
* Validate if there is less replicas than requested to remove. Also error out if there is
* only one replica available
*/
private void validateReplicaAvailability(Slice slice, String shard, String collectionName, int count) {
//If there is a specific shard passed, validate if there any or just 1 replica left
if (slice != null) {
Collection<Replica> allReplicasForShard = slice.getReplicas();
if (allReplicasForShard == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No replicas found in shard/collection: " +
shard + "/" + collectionName);
}
if (allReplicasForShard.size() == 1) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "There is only one replica available in shard/collection: " +
shard + "/" + collectionName + ". Cannot delete that.");
}
if (allReplicasForShard.size() <= count) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "There are lesser num replicas requested to be deleted than are available in shard/collection : " +
shard + "/" + collectionName + " Requested: " + count + " Available: " + allReplicasForShard.size() + ".");
}
}
}
void deleteCore(Slice slice, String collectionName, String replicaName,ZkNodeProps message, String shard, NamedList results, Runnable onComplete, boolean parallel) throws KeeperException, InterruptedException {
Replica replica = slice.getReplica(replicaName);
if (replica == null) {
ArrayList<String> l = new ArrayList<>();
for (Replica r : slice.getReplicas())
l.add(r.getName());
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Invalid replica : " + replicaName + " in shard/collection : "
+ shard + "/" + collectionName + " available replicas are " + StrUtils.join(l, ','));
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Invalid replica : " + replicaName + " in shard/collection : " +
shard + "/" + collectionName + " available replicas are " + StrUtils.join(l, ','));
}
// If users are being safe and only want to remove a shard if it is down, they can specify onlyIfDown=true
// on the command.
if (Boolean.parseBoolean(message.getStr(ONLY_IF_DOWN)) && replica.getState() != Replica.State.DOWN) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Attempted to remove replica : " + collectionName + "/" + shard + "/" + replicaName
+ " with onlyIfDown='true', but state is '" + replica.getStr(ZkStateReader.STATE_PROP) + "'");
"Attempted to remove replica : " + collectionName + "/" + shard + "/" + replicaName +
" with onlyIfDown='true', but state is '" + replica.getStr(ZkStateReader.STATE_PROP) + "'");
}
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
@ -140,7 +265,7 @@ public class DeleteReplicaCmd implements Cmd {
try {
if (!callable.call())
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Could not remove replica : " + collectionName + "/" + shard + "/" + replicaName);
"Could not remove replica : " + collectionName + "/" + shard + "/" + replicaName);
} catch (InterruptedException | KeeperException e) {
throw e;
} catch (Exception ex) {
@ -150,6 +275,7 @@ public class DeleteReplicaCmd implements Cmd {
} else {
ocmh.tpe.submit(callable);
}
}
}
}

View File

@ -34,10 +34,8 @@ import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonAdminParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -45,7 +43,6 @@ import org.slf4j.LoggerFactory;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.util.StrUtils.formatString;
public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

View File

@ -116,6 +116,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonParams.NAME;
@ -491,16 +492,15 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
}),
DELETEREPLICA_OP(DELETEREPLICA, (req, rsp, h) -> {
Map<String, Object> map = req.getParams().required().getAll(null,
COLLECTION_PROP,
SHARD_ID_PROP,
REPLICA_PROP);
COLLECTION_PROP);
req.getParams().getAll(map,
return req.getParams().getAll(map,
DELETE_INDEX,
DELETE_DATA_DIR,
DELETE_INSTANCE_DIR);
return req.getParams().getAll(map, ONLY_IF_DOWN);
DELETE_INSTANCE_DIR,
COUNT_PROP, REPLICA_PROP,
SHARD_ID_PROP,
ONLY_IF_DOWN);
}),
MIGRATE_OP(MIGRATE, (req, rsp, h) -> {
Map<String, Object> map = req.getParams().required().getAll(null, COLLECTION_PROP, "split.key", "target.collection");

View File

@ -19,6 +19,7 @@ package org.apache.solr.cloud;
import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -51,6 +52,9 @@ import static org.apache.solr.cloud.OverseerCollectionMessageHandler.ONLY_IF_DOW
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
import static org.apache.solr.common.util.Utils.makeMap;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.REQUESTSTATUS;
import org.apache.solr.client.solrj.response.RequestStatusState;
public class DeleteReplicaTest extends AbstractFullDistribZkTestBase {
@ -120,6 +124,7 @@ public class DeleteReplicaTest extends AbstractFullDistribZkTestBase {
}
}
protected void tryToRemoveOnlyIfDown(String collectionName, CloudSolrClient client, Replica replica, String shard) throws IOException, SolrServerException {
Map m = makeMap("collection", collectionName,
"action", DELETEREPLICA.toLower(),
@ -133,10 +138,10 @@ public class DeleteReplicaTest extends AbstractFullDistribZkTestBase {
}
static void removeAndWaitForReplicaGone(String COLL_NAME,
CloudSolrClient client, Replica replica, String shard)
throws SolrServerException, IOException, InterruptedException {
CloudSolrClient client, Replica replica, String shard)
throws SolrServerException, IOException, InterruptedException {
Map m = makeMap("collection", COLL_NAME, "action", DELETEREPLICA.toLower(), "shard",
shard, "replica", replica.getName());
shard, "replica", replica.getName());
SolrParams params = new MapSolrParams(m);
SolrRequest request = new QueryRequest(params);
request.setPath("/admin/collections");
@ -146,11 +151,11 @@ public class DeleteReplicaTest extends AbstractFullDistribZkTestBase {
DocCollection testcoll = null;
while (! timeout.hasTimedOut()) {
testcoll = client.getZkStateReader()
.getClusterState().getCollection(COLL_NAME);
.getClusterState().getCollection(COLL_NAME);
success = testcoll.getSlice(shard).getReplica(replica.getName()) == null;
if (success) {
log.info("replica cleaned up {}/{} core {}",
shard + "/" + replica.getName(), replica.getStr("core"));
shard + "/" + replica.getName(), replica.getStr("core"));
log.info("current state {}", testcoll);
break;
}
@ -159,6 +164,44 @@ public class DeleteReplicaTest extends AbstractFullDistribZkTestBase {
assertTrue("Replica not cleaned up", success);
}
protected void tryRemoveReplicaByCountAndShard(String collectionName, CloudSolrClient client, int count, String shard) throws IOException, SolrServerException {
Map m = makeMap("collection", collectionName,
"action", DELETEREPLICA.toLower(),
"shard", shard,
"count", count);
SolrParams params = new MapSolrParams(m);
SolrRequest request = new QueryRequest(params);
request.setPath("/admin/collections");
client.request(request);
}
protected void tryRemoveReplicaByCountAsync(String collectionName, CloudSolrClient client, int count, String requestid) throws IOException, SolrServerException {
Map m = makeMap("collection", collectionName,
"action", DELETEREPLICA.toLower(),
"count", count,
"async", requestid);
SolrParams params = new MapSolrParams(m);
SolrRequest request = new QueryRequest(params);
request.setPath("/admin/collections");
client.request(request);
}
protected String trackRequestStatus(CloudSolrClient client, String requestId) throws IOException, SolrServerException {
Map m = makeMap("action", REQUESTSTATUS.toLower(),
"requestid", requestId);
SolrParams params = new MapSolrParams(m);
SolrRequest request = new QueryRequest(params);
request.setPath("/admin/collections");
NamedList<Object> resultsList = client.request(request);
NamedList innerResponse = (NamedList) resultsList.get("status");
return (String) innerResponse.get("state");
}
protected void createCollection(String COLL_NAME, CloudSolrClient client) throws Exception {
int replicationFactor = 2;
int numShards = 2;
@ -212,4 +255,90 @@ public class DeleteReplicaTest extends AbstractFullDistribZkTestBase {
assertFalse("Instance directory still exists", FileUtils.fileExists(instanceDir));
assertFalse("DataDirectory still exists", FileUtils.fileExists(dataDir));
}
@Test
@ShardsFixed(num = 4)
public void deleteReplicaByCount() throws Exception {
String collectionName = "deleteByCount";
try (CloudSolrClient client = createCloudClient(null)) {
createCollection(collectionName, 1, 3, 5);
waitForRecoveriesToFinish(collectionName, false);
DocCollection testcoll = getCommonCloudSolrClient().getZkStateReader()
.getClusterState().getCollection(collectionName);
Collection<Slice> slices = testcoll.getActiveSlices();
assertEquals(slices.size(), 1);
for (Slice individualShard: slices) {
assertEquals(individualShard.getReplicas().size(),3);
}
try {
// Should not be able to delete 2 replicas (non leader ones for a given shard
tryRemoveReplicaByCountAndShard(collectionName, client, 2, "shard1");
testcoll = getCommonCloudSolrClient().getZkStateReader()
.getClusterState().getCollection(collectionName);
slices = testcoll.getActiveSlices();
assertEquals(slices.size(), 1);
for (Slice individualShard: slices) {
assertEquals(individualShard.getReplicas().size(),1);
}
} catch (SolrException se) {
fail("Should have been able to remove the replica successfully");
}
}
}
@Test
@ShardsFixed(num = 4)
public void deleteReplicaByCountForAllShards() throws Exception {
String collectionName = "deleteByCountNew";
try (CloudSolrClient client = createCloudClient(null)) {
createCollection(collectionName, 2, 2, 5);
waitForRecoveriesToFinish(collectionName, false);
DocCollection testcoll = getCommonCloudSolrClient().getZkStateReader()
.getClusterState().getCollection(collectionName);
Collection<Slice> slices = testcoll.getActiveSlices();
assertEquals(slices.size(), 2);
for (Slice individualShard: slices) {
assertEquals(individualShard.getReplicas().size(),2);
}
String requestIdAsync = "1000";
try {
// Should not be able to delete 2 replicas from all shards (non leader ones)
tryRemoveReplicaByCountAsync(collectionName, client, 1, requestIdAsync);
//Make sure request completes
String requestStatus = trackRequestStatus(client, requestIdAsync);
while ((!requestStatus.equals(RequestStatusState.COMPLETED.getKey())) && (!requestStatus.equals(RequestStatusState.FAILED.getKey()))) {
requestStatus = trackRequestStatus(client, requestIdAsync);
}
testcoll = getCommonCloudSolrClient().getZkStateReader()
.getClusterState().getCollection(collectionName);
slices = testcoll.getActiveSlices();
assertEquals(slices.size(), 2);
for (Slice individualShard: slices) {
assertEquals(individualShard.getReplicas().size(),1);
}
} catch (SolrException se) {
fail("Should have been able to remove the replica successfully");
}
}
}
}

View File

@ -43,6 +43,8 @@ import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.NamedList;
import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP;
/**
* This class is experimental and subject to change.
*
@ -1531,6 +1533,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
protected Boolean onlyIfDown;
private Boolean deleteDataDir;
private Boolean deleteInstanceDir;
private Integer count;
private Boolean deleteIndexDir;
/**
@ -1579,10 +1582,8 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
return this;
}
@Override
@Deprecated
public DeleteReplica setAsyncId(String id) {
this.asyncId = id;
public DeleteReplica setCount(Integer count) {
this.count = count;
return this;
}
@ -1603,6 +1604,9 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
if (deleteIndexDir != null) {
params.set(CoreAdminParams.DELETE_INDEX, deleteIndexDir);
}
if (count != null) {
params.set(COUNT_PROP, deleteIndexDir);
}
return params;
}

View File

@ -16,11 +16,14 @@
*/
package org.apache.solr.common.params;
public abstract class CollectionAdminParams {
public interface CollectionAdminParams {
/* Param used by DELETESTATUS call to clear all stored responses */
public static final String FLUSH = "flush";
String FLUSH = "flush";
String COLLECTION = "collection";
String COUNT_PROP = "count";
public static final String COLLECTION = "collection";
}