SOLR-10239: MOVEREPLICA API

Cao Manh Dat 2017-04-06 15:48:38 +07:00
parent 37b6c60548
commit 9c2ef561e5
8 changed files with 497 additions and 3 deletions

View File

@@ -0,0 +1,193 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.*;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
public class MoveReplicaCmd implements Cmd {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final OverseerCollectionMessageHandler ocmh;
public MoveReplicaCmd(OverseerCollectionMessageHandler ocmh) {
this.ocmh = ocmh;
}
@Override
public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
moveReplica(ocmh.zkStateReader.getClusterState(), message, results);
}
private void moveReplica(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
log.info("moveReplica() : {}", Utils.toJSONString(message));
ocmh.checkRequired(message, COLLECTION_PROP, "targetNode");
String collection = message.getStr(COLLECTION_PROP);
String targetNode = message.getStr("targetNode");
String async = message.getStr(ASYNC);
DocCollection coll = clusterState.getCollection(collection);
if (coll == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection: " + collection + " does not exist");
}
Replica replica = null;
if (message.containsKey(REPLICA_PROP)) {
String replicaName = message.getStr(REPLICA_PROP);
replica = coll.getReplica(replicaName);
if (replica == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Collection: " + collection + " replica: " + replicaName + " does not exist");
}
} else {
ocmh.checkRequired(message, SHARD_ID_PROP, "fromNode");
String fromNode = message.getStr("fromNode");
String shardId = message.getStr(SHARD_ID_PROP);
Slice slice = clusterState.getCollection(collection).getSlice(shardId);
List<Replica> sliceReplicas = new ArrayList<>(slice.getReplicas());
Collections.shuffle(sliceReplicas, RANDOM);
// Iterate the shuffled copy so a random replica on fromNode is chosen.
for (Replica r : sliceReplicas) {
if (r.getNodeName().equals(fromNode)) {
replica = r;
break;
}
}
if (replica == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Collection: " + collection + " node: " + fromNode + " do not have any replica belong to shard: " + shardId);
}
}
log.info("Replica will be moved {}", replica);
Slice slice = null;
for (Slice s : coll.getSlices()) {
if (s.getReplicas().contains(replica)) {
slice = s;
}
}
assert slice != null;
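// Replicas whose dataDir lives on HDFS can hand their index over to the new core; all others need a full copy on the target node.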
Object dataDir = replica.get("dataDir");
if (dataDir != null && dataDir.toString().startsWith("hdfs:/")) {
moveHdfsReplica(clusterState, results, dataDir.toString(), targetNode, async, coll, replica, slice);
} else {
moveNormalReplica(clusterState, results, targetNode, async, coll, replica, slice);
}
}
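// HDFS variant: delete the old core but keep its shared data dir and index, then add a new core on the target node pointing at the same dir, so no index copy is needed.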
private void moveHdfsReplica(ClusterState clusterState, NamedList results, String dataDir, String targetNode, String async,
DocCollection coll, Replica replica, Slice slice) throws Exception {
String newCoreName = Assign.buildCoreName(coll, slice.getName());
ZkNodeProps removeReplicasProps = new ZkNodeProps(
COLLECTION_PROP, coll.getName(),
SHARD_ID_PROP, slice.getName(),
REPLICA_PROP, replica.getName()
);
removeReplicasProps.getProperties().put(CoreAdminParams.DELETE_DATA_DIR, false);
removeReplicasProps.getProperties().put(CoreAdminParams.DELETE_INDEX, false);
if (async != null) removeReplicasProps.getProperties().put(ASYNC, async);
NamedList deleteResult = new NamedList();
ocmh.deleteReplica(clusterState, removeReplicasProps, deleteResult, ()->{});
if (deleteResult.get("failure") != null) {
String errorString = String.format(Locale.ROOT, "Failed to cleanup replica collection=%s shard=%s name=%s",
coll.getName(), slice.getName(), replica.getName());
log.warn(errorString);
results.add("failure", errorString + ", because of : " + deleteResult.get("failure"));
return;
}
ZkNodeProps addReplicasProps = new ZkNodeProps(
COLLECTION_PROP, coll.getName(),
SHARD_ID_PROP, slice.getName(),
CoreAdminParams.NODE, targetNode,
CoreAdminParams.NAME, newCoreName,
CoreAdminParams.DATA_DIR, dataDir);
if (async != null) addReplicasProps.getProperties().put(ASYNC, async);
NamedList addResult = new NamedList();
ocmh.addReplica(clusterState, addReplicasProps, addResult, ()->{});
if (addResult.get("failure") != null) {
String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" +
" on node=%s", coll.getName(), slice.getName(), targetNode);
log.warn(errorString);
results.add("failure", errorString);
return;
} else {
String successString = String.format(Locale.ROOT, "MOVEREPLICA action completed successfully, moved replica=%s at node=%s " +
"to replica=%s at node=%s", replica.getCoreName(), replica.getNodeName(), newCoreName, targetNode);
results.add("success", successString);
}
}
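// Local-storage variant: add the new replica first, so the shard never drops below its current replica count, then delete the old one.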
private void moveNormalReplica(ClusterState clusterState, NamedList results, String targetNode, String async,
DocCollection coll, Replica replica, Slice slice) throws Exception {
String newCoreName = Assign.buildCoreName(coll, slice.getName());
ZkNodeProps addReplicasProps = new ZkNodeProps(
COLLECTION_PROP, coll.getName(),
SHARD_ID_PROP, slice.getName(),
CoreAdminParams.NODE, targetNode,
CoreAdminParams.NAME, newCoreName);
if (async != null) addReplicasProps.getProperties().put(ASYNC, async);
NamedList addResult = new NamedList();
ocmh.addReplica(clusterState, addReplicasProps, addResult, ()->{});
if (addResult.get("failure") != null) {
String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" +
" on node=%s", coll.getName(), slice.getName(), targetNode);
log.warn(errorString);
results.add("failure", errorString);
return;
}
ZkNodeProps removeReplicasProps = new ZkNodeProps(
COLLECTION_PROP, coll.getName(),
SHARD_ID_PROP, slice.getName(),
REPLICA_PROP, replica.getName());
if (async != null) removeReplicasProps.getProperties().put(ASYNC, async);
NamedList deleteResult = new NamedList();
ocmh.deleteReplica(clusterState, removeReplicasProps, deleteResult, ()->{});
if (deleteResult.get("failure") != null) {
String errorString = String.format(Locale.ROOT, "Failed to cleanup replica collection=%s shard=%s name=%s",
coll.getName(), slice.getName(), replica.getName());
log.warn(errorString);
results.add("failure", errorString + ", because of : " + deleteResult.get("failure"));
} else {
String successString = String.format(Locale.ROOT, "MOVEREPLICA action completed successfully, moved replica=%s at node=%s " +
"to replica=%s at node=%s", replica.getCoreName(), replica.getNodeName(), newCoreName, targetNode);
results.add("success", successString);
}
}
}
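
For context, a minimal SolrJ sketch of driving the new command through the CollectionAdminRequest.MoveReplica class added later in this commit; the ZooKeeper address, collection, replica, and node names are illustrative:

import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;

public class MoveReplicaExample {
  public static void main(String[] args) throws Exception {
    // Illustrative ZooKeeper address; point this at a real SolrCloud cluster.
    try (CloudSolrClient client = new CloudSolrClient.Builder()
        .withZkHost("localhost:9983").build()) {
      // Form 1: move a specific replica, identified by name, to targetNode.
      new CollectionAdminRequest.MoveReplica("test", "core_node2", "127.0.0.1:8984_solr")
          .process(client);
      // Form 2: identify the replica by shard and current node; the overseer
      // picks a random replica of shard1 that lives on fromNode.
      new CollectionAdminRequest.MoveReplica("test", "shard1", "127.0.0.1:8983_solr", "127.0.0.1:8984_solr")
          .process(client);
    }
  }
}

Both forms end up in MoveReplicaCmd above; the second exercises the fromNode/shard branch of moveReplica().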

View File

@@ -208,6 +208,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
.put(DELETESHARD, new DeleteShardCmd(this))
.put(DELETEREPLICA, new DeleteReplicaCmd(this))
.put(ADDREPLICA, new AddReplicaCmd(this))
.put(MOVEREPLICA, new MoveReplicaCmd(this))
.build()
;
}

View File

@@ -859,6 +859,16 @@ public class CollectionsHandler extends RequestHandlerBase implements PermissionNameProvider
return null;
}),
REPLACENODE_OP(REPLACENODE, (req, rsp, h) -> req.getParams().required().getAll(req.getParams().getAll(null, "parallel"), "source", "target")),
MOVEREPLICA_OP(MOVEREPLICA, (req, rsp, h) -> {
Map<String, Object> map = req.getParams().required().getAll(null,
COLLECTION_PROP);
return req.getParams().getAll(map,
"fromNode",
"targetNode",
"replica",
"shard");
}),
DELETENODE_OP(DELETENODE, (req, rsp, h) -> req.getParams().required().getAll(null, "node"));
public final CollectionOp fun;
CollectionAction action;
@@ -881,7 +891,7 @@ public class CollectionsHandler extends RequestHandlerBase implements PermissionNameProvider
for (CollectionOperation op : values()) {
if (op.action == action) return op;
}
- throw new SolrException(ErrorCode.SERVER_ERROR, "No such action" + action);
+ throw new SolrException(ErrorCode.SERVER_ERROR, "No such action " + action);
}
@Override

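For illustration, MOVEREPLICA_OP above accepts requests of roughly this shape (host, port, and names are hypothetical); replica and shard+fromNode are the two alternative ways to select the replica to move:

/admin/collections?action=MOVEREPLICA&collection=test&replica=core_node2&targetNode=127.0.0.1:8984_solr
/admin/collections?action=MOVEREPLICA&collection=test&shard=shard1&fromNode=127.0.0.1:8983_solr&targetNode=127.0.0.1:8984_solr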
View File

@@ -29,6 +29,7 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest.SplitShard;
import org.apache.solr.client.solrj.response.RequestStatusState;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -178,11 +179,22 @@ public class CollectionsAPIAsyncDistributedZkTest extends SolrCloudTestCase {
//expected
}
- String replica = shard1.getReplicas().iterator().next().getName();
+ Replica replica = shard1.getReplicas().iterator().next();
for (String liveNode : client.getZkStateReader().getClusterState().getLiveNodes()) {
if (!replica.getNodeName().equals(liveNode)) {
state = new CollectionAdminRequest.MoveReplica(collection, replica.getName(), liveNode)
.processAndWait(client, MAX_TIMEOUT_SECONDS);
assertSame("MoveReplica did not complete", RequestStatusState.COMPLETED, state);
break;
}
}
shard1 = client.getZkStateReader().getClusterState().getSlice(collection, "shard1");
String replicaName = shard1.getReplicas().iterator().next().getName();
state = new CollectionAdminRequest.DeleteReplica()
.setCollectionName(collection)
.setShardName("shard1")
- .setReplica(replica)
+ .setReplica(replicaName)
.processAndWait(client, MAX_TIMEOUT_SECONDS);
assertSame("DeleteReplica did not complete", RequestStatusState.COMPLETED, state);

View File

@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.client.solrj.response.CoreAdminResponse;
import org.apache.solr.client.solrj.response.RequestStatusState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MoveReplicaTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(4)
.addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-dynamic").resolve("conf"))
.configure();
}
protected String getSolrXml() {
return "solr.xml";
}
@Test
public void test() throws Exception {
cluster.waitForAllNodes(5000);
String coll = "movereplicatest_coll";
log.info("total_jettys: " + cluster.getJettySolrRunners().size());
CloudSolrClient cloudClient = cluster.getSolrClient();
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(coll, "conf1", 2, 2);
create.setMaxShardsPerNode(2);
cloudClient.request(create);
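// Pick a random replica to move and, below, any live node other than its current host as the target.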
Replica replica = getRandomReplica(coll, cloudClient);
Set<String> liveNodes = cloudClient.getZkStateReader().getClusterState().getLiveNodes();
ArrayList<String> l = new ArrayList<>(liveNodes);
Collections.shuffle(l, random());
String targetNode = null;
for (String node : l) {
if (!replica.getNodeName().equals(node)) {
targetNode = node;
break;
}
}
assertNotNull(targetNode);
String shardId = null;
for (Slice slice : cloudClient.getZkStateReader().getClusterState().getCollection(coll).getSlices()) {
if (slice.getReplicas().contains(replica)) {
shardId = slice.getName();
}
}
CollectionAdminRequest.MoveReplica moveReplica = new CollectionAdminRequest.MoveReplica(coll, replica.getName(), targetNode);
moveReplica.processAsync("000", cloudClient);
CollectionAdminRequest.RequestStatus requestStatus = CollectionAdminRequest.requestStatus("000");
// wait for async request success
boolean success = false;
for (int i = 0; i < 200; i++) {
CollectionAdminRequest.RequestStatusResponse rsp = requestStatus.process(cloudClient);
if (rsp.getRequestStatus() == RequestStatusState.COMPLETED) {
success = true;
break;
}
assertFalse(rsp.getRequestStatus() == RequestStatusState.FAILED);
Thread.sleep(50);
}
assertTrue(success);
checkNumOfCores(cloudClient, replica.getNodeName(), 0);
checkNumOfCores(cloudClient, targetNode, 2);
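// Move a replica of the same shard back off targetNode to the original node, this time using the shard/fromNode form.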
moveReplica = new CollectionAdminRequest.MoveReplica(coll, shardId, targetNode, replica.getNodeName());
moveReplica.process(cloudClient);
checkNumOfCores(cloudClient, replica.getNodeName(), 1);
checkNumOfCores(cloudClient, targetNode, 1);
}
private Replica getRandomReplica(String coll, CloudSolrClient cloudClient) {
List<Replica> replicas = cloudClient.getZkStateReader().getClusterState().getCollection(coll).getReplicas();
Collections.shuffle(replicas, random());
return replicas.get(0);
}
private void checkNumOfCores(CloudSolrClient cloudClient, String nodeName, int expectedCores) throws IOException, SolrServerException {
assertEquals(nodeName + " does not have expected number of cores", expectedCores, getNumOfCores(cloudClient, nodeName));
}
private int getNumOfCores(CloudSolrClient cloudClient, String nodeName) throws IOException, SolrServerException {
try (HttpSolrClient coreclient = getHttpSolrClient(cloudClient.getZkStateReader().getBaseUrlForNodeName(nodeName))) {
CoreAdminResponse status = CoreAdminRequest.getStatus(null, coreclient);
return status.getCoreStatus().size();
}
}
}

View File

@@ -16,15 +16,37 @@
*/
package org.apache.solr.cloud.hdfs;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import com.carrotsearch.randomizedtesting.annotations.Nightly;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Metric;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.client.solrj.request.CoreStatus;
import org.apache.solr.client.solrj.response.CoreAdminResponse;
import org.apache.solr.cloud.CollectionsAPIDistributedZkTest;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkConfigManager;
import org.apache.solr.metrics.SolrMetricManager;
import org.apache.solr.util.BadHdfsThreadsFilter;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@Slow
@Nightly
@@ -59,4 +81,96 @@ public class HdfsCollectionsAPIDistributedZkTest extends CollectionsAPIDistributedZkTest
System.clearProperty("solr.hdfs.home");
}
@Test
public void moveReplicaTest() throws Exception {
cluster.waitForAllNodes(5000);
String coll = "movereplicatest_coll";
CloudSolrClient cloudClient = cluster.getSolrClient();
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(coll, "conf", 2, 2);
create.setMaxShardsPerNode(2);
cloudClient.request(create);
for (int i = 0; i < 10; i++) {
cloudClient.add(coll, sdoc("id", String.valueOf(i)));
cloudClient.commit(coll);
}
List<Slice> slices = new ArrayList<>(cloudClient.getZkStateReader().getClusterState().getCollection(coll).getSlices());
Collections.shuffle(slices, random());
Slice slice = null;
Replica replica = null;
for (Slice s : slices) {
slice = s;
for (Replica r : s.getReplicas()) {
if (s.getLeader() != r) {
replica = r;
}
}
}
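// replica is now a non-leader from one of the slices; record its HDFS data dir so we can verify the move reuses it.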
String dataDir = getDataDir(replica);
Set<String> liveNodes = cloudClient.getZkStateReader().getClusterState().getLiveNodes();
ArrayList<String> l = new ArrayList<>(liveNodes);
Collections.shuffle(l, random());
String targetNode = null;
for (String node : l) {
if (!replica.getNodeName().equals(node)) {
targetNode = node;
break;
}
}
assertNotNull(targetNode);
CollectionAdminRequest.MoveReplica moveReplica = new CollectionAdminRequest.MoveReplica(coll, replica.getName(), targetNode);
moveReplica.process(cloudClient);
checkNumOfCores(cloudClient, replica.getNodeName(), 0);
checkNumOfCores(cloudClient, targetNode, 2);
waitForState("Wait for recovery finish failed",coll, clusterShape(2,2));
slice = cloudClient.getZkStateReader().getClusterState().getCollection(coll).getSlice(slice.getName());
boolean found = false;
for (Replica newReplica : slice.getReplicas()) {
if (getDataDir(newReplica).equals(dataDir)) {
found = true;
}
}
assertTrue(found);
// data dir is reused so replication will be skipped
for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
SolrMetricManager manager = jetty.getCoreContainer().getMetricManager();
List<String> registryNames = manager.registryNames().stream()
.filter(s -> s.startsWith("solr.core.")).collect(Collectors.toList());
for (String registry : registryNames) {
Map<String, Metric> metrics = manager.registry(registry).getMetrics();
Counter counter = (Counter) metrics.get("REPLICATION./replication.requests");
if (counter != null) {
assertEquals(0, counter.getCount());
}
}
}
}
private void checkNumOfCores(CloudSolrClient cloudClient, String nodeName, int expectedCores) throws IOException, SolrServerException {
assertEquals(nodeName + " does not have expected number of cores", expectedCores, getNumOfCores(cloudClient, nodeName));
}
private int getNumOfCores(CloudSolrClient cloudClient, String nodeName) throws IOException, SolrServerException {
try (HttpSolrClient coreclient = getHttpSolrClient(cloudClient.getZkStateReader().getBaseUrlForNodeName(nodeName))) {
CoreAdminResponse status = CoreAdminRequest.getStatus(null, coreclient);
return status.getCoreStatus().size();
}
}
private String getDataDir(Replica replica) throws IOException, SolrServerException {
try (HttpSolrClient coreclient = getHttpSolrClient(replica.getBaseUrl())) {
CoreStatus status = CoreAdminRequest.getCoreStatus(replica.getCoreName(), coreclient);
return status.getDataDirectory();
}
}
}

View File

@@ -613,6 +613,44 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
}
public static class MoveReplica extends AsyncCollectionAdminRequest {
String collection, replica, targetNode;
String shard, fromNode;
boolean randomlyMoveReplica;
public MoveReplica(String collection, String replica, String targetNode) {
super(CollectionAction.MOVEREPLICA);
this.collection = collection;
this.replica = replica;
this.targetNode = targetNode;
this.randomlyMoveReplica = false;
}
public MoveReplica(String collection, String shard, String fromNode, String targetNode) {
super(CollectionAction.MOVEREPLICA);
this.collection = collection;
this.shard = shard;
this.fromNode = fromNode;
this.targetNode = targetNode;
this.randomlyMoveReplica = true;
}
@Override
public SolrParams getParams() {
ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
params.set("collection", collection);
params.set("targetNode", targetNode);
if (randomlyMoveReplica) {
params.set("shard", shard);
params.set("fromNode", fromNode);
} else {
params.set("replica", replica);
}
return params;
}
}
/*
* Returns a RebalanceLeaders object to rebalance leaders for a collection
*/

View File

@@ -80,6 +80,7 @@ public interface CollectionParams {
REQUESTSTATUS(false, LockLevel.NONE),
DELETESTATUS(false, LockLevel.NONE),
ADDREPLICA(true, LockLevel.SHARD),
MOVEREPLICA(true, LockLevel.SHARD),
OVERSEERSTATUS(false, LockLevel.NONE),
LIST(false, LockLevel.NONE),
CLUSTERSTATUS(false, LockLevel.NONE),