SOLR-9461: DELETENODE, REPLACENODE should pass down the 'async' param to subcommands

This commit is contained in:
Noble Paul 2016-09-01 18:03:59 +05:30
parent 6a4184c674
commit e13f7aeafa
3 changed files with 21 additions and 5 deletions

View File

@ -101,6 +101,8 @@ Bug Fixes
* SOLR-9455: Deleting a sub-shard in recovery state can mark parent shard as inactive. (shalin) * SOLR-9455: Deleting a sub-shard in recovery state can mark parent shard as inactive. (shalin)
* SOLR-9461: DELETENODE, REPLACENODE should pass down the 'async' param to subcommands (shalin, noble)
Optimizations Optimizations
---------------------- ----------------------

View File

@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.params.CommonAdminParams;
import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.NamedList;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -35,6 +36,7 @@ import org.slf4j.LoggerFactory;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP; import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA; import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
public class DeleteNodeCmd implements OverseerCollectionMessageHandler.Cmd { public class DeleteNodeCmd implements OverseerCollectionMessageHandler.Cmd {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@ -53,24 +55,30 @@ public class DeleteNodeCmd implements OverseerCollectionMessageHandler.Cmd {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Source Node: " + node + " is not live"); throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Source Node: " + node + " is not live");
} }
List<ZkNodeProps> sourceReplicas = ReplaceNodeCmd.getReplicasOfNode(node, state); List<ZkNodeProps> sourceReplicas = ReplaceNodeCmd.getReplicasOfNode(node, state);
cleanupReplicas(results, state, sourceReplicas, ocmh, node); cleanupReplicas(results, state, sourceReplicas, ocmh, node, message.getStr(ASYNC));
} }
static void cleanupReplicas(NamedList results, static void cleanupReplicas(NamedList results,
ClusterState clusterState, ClusterState clusterState,
List<ZkNodeProps> sourceReplicas, List<ZkNodeProps> sourceReplicas,
OverseerCollectionMessageHandler ocmh, String node) throws InterruptedException { OverseerCollectionMessageHandler ocmh,
String node,
String async) throws InterruptedException {
CountDownLatch cleanupLatch = new CountDownLatch(sourceReplicas.size()); CountDownLatch cleanupLatch = new CountDownLatch(sourceReplicas.size());
for (ZkNodeProps sourceReplica : sourceReplicas) { for (ZkNodeProps sourceReplica : sourceReplicas) {
log.info("Deleting replica for collection={} shard={} on node={}", sourceReplica.getStr(COLLECTION_PROP), sourceReplica.getStr(SHARD_ID_PROP), node); String coll = sourceReplica.getStr(COLLECTION_PROP);
String shard = sourceReplica.getStr(SHARD_ID_PROP);
log.info("Deleting replica for collection={} shard={} on node={}", coll, shard, node);
NamedList deleteResult = new NamedList(); NamedList deleteResult = new NamedList();
try { try {
if (async != null) sourceReplica = sourceReplica.plus(ASYNC, async);
((DeleteReplicaCmd)ocmh.commandMap.get(DELETEREPLICA)).deleteReplica(clusterState, sourceReplica.plus("parallel", "true"), deleteResult, () -> { ((DeleteReplicaCmd)ocmh.commandMap.get(DELETEREPLICA)).deleteReplica(clusterState, sourceReplica.plus("parallel", "true"), deleteResult, () -> {
cleanupLatch.countDown(); cleanupLatch.countDown();
if (deleteResult.get("failure") != null) { if (deleteResult.get("failure") != null) {
synchronized (results) { synchronized (results) {
results.add("failure", String.format(Locale.ROOT, "Failed to delete replica for collection=%s shard=%s" + results.add("failure", String.format(Locale.ROOT, "Failed to delete replica for collection=%s shard=%s" +
" on node=%s", sourceReplica.getStr(COLLECTION_PROP), sourceReplica.getStr(SHARD_ID_PROP), node)); " on node=%s", coll, shard, node));
} }
} }
}); });

View File

@ -34,14 +34,18 @@ import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonAdminParams;
import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP; import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.util.StrUtils.formatString;
public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd { public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@ -58,6 +62,7 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
ocmh.checkRequired(message, "source", "target"); ocmh.checkRequired(message, "source", "target");
String source = message.getStr("source"); String source = message.getStr("source");
String target = message.getStr("target"); String target = message.getStr("target");
String async = message.getStr("async");
boolean parallel = message.getBool("parallel", false); boolean parallel = message.getBool("parallel", false);
ClusterState clusterState = zkStateReader.getClusterState(); ClusterState clusterState = zkStateReader.getClusterState();
@ -78,6 +83,7 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
NamedList nl = new NamedList(); NamedList nl = new NamedList();
log.info("Going to create replica for collection={} shard={} on node={}", sourceReplica.getStr(COLLECTION_PROP), sourceReplica.getStr(SHARD_ID_PROP), target); log.info("Going to create replica for collection={} shard={} on node={}", sourceReplica.getStr(COLLECTION_PROP), sourceReplica.getStr(SHARD_ID_PROP), target);
ZkNodeProps msg = sourceReplica.plus("parallel", String.valueOf(parallel)).plus(CoreAdminParams.NODE, target); ZkNodeProps msg = sourceReplica.plus("parallel", String.valueOf(parallel)).plus(CoreAdminParams.NODE, target);
if(async!=null) msg.getProperties().put(ASYNC, async);
final ZkNodeProps addedReplica = ocmh.addReplica(clusterState, final ZkNodeProps addedReplica = ocmh.addReplica(clusterState,
msg, nl, () -> { msg, nl, () -> {
countDownLatch.countDown(); countDownLatch.countDown();
@ -136,7 +142,7 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
// we have reached this far means all replicas could be recreated // we have reached this far means all replicas could be recreated
//now cleanup the replicas in the source node //now cleanup the replicas in the source node
DeleteNodeCmd.cleanupReplicas(results, state, sourceReplicas, ocmh, source); DeleteNodeCmd.cleanupReplicas(results, state, sourceReplicas, ocmh, source, async);
results.add("success", "REPLACENODE action completed successfully from : " + source + " to : " + target); results.add("success", "REPLACENODE action completed successfully from : " + source + " to : " + target);
} }