SOLR-10878: MOVEREPLICA command may lose data when replicationFactor==1.

This commit is contained in:
Andrzej Bialecki 2017-07-04 11:33:41 +02:00
parent e9d33ee5ac
commit bc37e8b4cc
8 changed files with 220 additions and 32 deletions

View File

@ -299,6 +299,8 @@ Bug Fixes
* SOLR-6807: CloudSolrClient's ZK state version check with the server was ignored when handleSelect=false * SOLR-6807: CloudSolrClient's ZK state version check with the server was ignored when handleSelect=false
(David Smiley) (David Smiley)
* SOLR-10878: MOVEREPLICA command may lose data when replicationFactor is 1. (ab, shalin)
Optimizations Optimizations
---------------------- ----------------------

View File

@ -68,7 +68,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
ZkNodeProps addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete) ZkNodeProps addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
throws KeeperException, InterruptedException { throws KeeperException, InterruptedException {
log.info("addReplica() : {}", Utils.toJSONString(message)); log.debug("addReplica() : {}", Utils.toJSONString(message));
String collection = message.getStr(COLLECTION_PROP); String collection = message.getStr(COLLECTION_PROP);
String node = message.getStr(CoreAdminParams.NODE); String node = message.getStr(CoreAdminParams.NODE);
String shard = message.getStr(SHARD_ID_PROP); String shard = message.getStr(SHARD_ID_PROP);

View File

@ -22,6 +22,8 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.ClusterState;
@ -29,6 +31,7 @@ import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils; import org.apache.solr.common.util.Utils;
@ -56,10 +59,11 @@ public class MoveReplicaCmd implements Cmd{
} }
private void moveReplica(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception { private void moveReplica(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
log.info("moveReplica() : {}", Utils.toJSONString(message)); log.debug("moveReplica() : {}", Utils.toJSONString(message));
ocmh.checkRequired(message, COLLECTION_PROP, "targetNode"); ocmh.checkRequired(message, COLLECTION_PROP, "targetNode");
String collection = message.getStr(COLLECTION_PROP); String collection = message.getStr(COLLECTION_PROP);
String targetNode = message.getStr("targetNode"); String targetNode = message.getStr("targetNode");
int timeout = message.getInt("timeout", 10 * 60); // 10 minutes
String async = message.getStr(ASYNC); String async = message.getStr(ASYNC);
@ -103,14 +107,14 @@ public class MoveReplicaCmd implements Cmd{
assert slice != null; assert slice != null;
Object dataDir = replica.get("dataDir"); Object dataDir = replica.get("dataDir");
if (dataDir != null && dataDir.toString().startsWith("hdfs:/")) { if (dataDir != null && dataDir.toString().startsWith("hdfs:/")) {
moveHdfsReplica(clusterState, results, dataDir.toString(), targetNode, async, coll, replica, slice); moveHdfsReplica(clusterState, results, dataDir.toString(), targetNode, async, coll, replica, slice, timeout);
} else { } else {
moveNormalReplica(clusterState, results, targetNode, async, coll, replica, slice); moveNormalReplica(clusterState, results, targetNode, async, coll, replica, slice, timeout);
} }
} }
private void moveHdfsReplica(ClusterState clusterState, NamedList results, String dataDir, String targetNode, String async, private void moveHdfsReplica(ClusterState clusterState, NamedList results, String dataDir, String targetNode, String async,
DocCollection coll, Replica replica, Slice slice) throws Exception { DocCollection coll, Replica replica, Slice slice, int timeout) throws Exception {
String newCoreName = Assign.buildCoreName(coll, slice.getName(), replica.getType()); String newCoreName = Assign.buildCoreName(coll, slice.getName(), replica.getType());
ZkNodeProps removeReplicasProps = new ZkNodeProps( ZkNodeProps removeReplicasProps = new ZkNodeProps(
@ -154,7 +158,7 @@ public class MoveReplicaCmd implements Cmd{
} }
private void moveNormalReplica(ClusterState clusterState, NamedList results, String targetNode, String async, private void moveNormalReplica(ClusterState clusterState, NamedList results, String targetNode, String async,
DocCollection coll, Replica replica, Slice slice) throws Exception { DocCollection coll, Replica replica, Slice slice, int timeout) throws Exception {
String newCoreName = Assign.buildCoreName(coll, slice.getName(), replica.getType()); String newCoreName = Assign.buildCoreName(coll, slice.getName(), replica.getType());
ZkNodeProps addReplicasProps = new ZkNodeProps( ZkNodeProps addReplicasProps = new ZkNodeProps(
COLLECTION_PROP, coll.getName(), COLLECTION_PROP, coll.getName(),
@ -163,14 +167,41 @@ public class MoveReplicaCmd implements Cmd{
CoreAdminParams.NAME, newCoreName); CoreAdminParams.NAME, newCoreName);
if(async!=null) addReplicasProps.getProperties().put(ASYNC, async); if(async!=null) addReplicasProps.getProperties().put(ASYNC, async);
NamedList addResult = new NamedList(); NamedList addResult = new NamedList();
CountDownLatch countDownLatch = new CountDownLatch(1);
ReplaceNodeCmd.RecoveryWatcher watcher = null;
if (replica.equals(slice.getLeader())) {
watcher = new ReplaceNodeCmd.RecoveryWatcher(coll.getName(), slice.getName(),
replica.getName(), null, countDownLatch);
ocmh.zkStateReader.registerCollectionStateWatcher(coll.getName(), watcher);
}
ocmh.addReplica(clusterState, addReplicasProps, addResult, null); ocmh.addReplica(clusterState, addReplicasProps, addResult, null);
if (addResult.get("failure") != null) { if (addResult.get("failure") != null) {
String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" + String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" +
" on node=%s", coll.getName(), slice.getName(), targetNode); " on node=%s", coll.getName(), slice.getName(), targetNode);
log.warn(errorString); log.warn(errorString);
results.add("failure", errorString); results.add("failure", errorString);
if (watcher != null) { // unregister
ocmh.zkStateReader.registerCollectionStateWatcher(coll.getName(), watcher);
}
return; return;
} }
// wait for the other replica to be active if the source replica was a leader
if (watcher != null) {
try {
log.debug("Waiting for leader's replica to recover.");
if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) {
String errorString = String.format(Locale.ROOT, "Timed out waiting for leader's replica to recover, collection=%s shard=%s" +
" on node=%s", coll.getName(), slice.getName(), targetNode);
log.warn(errorString);
results.add("failure", errorString);
return;
} else {
log.debug("Replica " + watcher.getRecoveredReplica() + " is active - deleting the source...");
}
} finally {
ocmh.zkStateReader.removeCollectionStateWatcher(coll.getName(), watcher);
}
}
ZkNodeProps removeReplicasProps = new ZkNodeProps( ZkNodeProps removeReplicasProps = new ZkNodeProps(
COLLECTION_PROP, coll.getName(), COLLECTION_PROP, coll.getName(),

View File

@ -419,20 +419,25 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
boolean waitForCoreNodeGone(String collectionName, String shard, String replicaName, int timeoutms) throws InterruptedException { boolean waitForCoreNodeGone(String collectionName, String shard, String replicaName, int timeoutms) throws InterruptedException {
TimeOut timeout = new TimeOut(timeoutms, TimeUnit.MILLISECONDS); TimeOut timeout = new TimeOut(timeoutms, TimeUnit.MILLISECONDS);
boolean deleted = false; // TODO: remove this workaround for SOLR-9440
zkStateReader.registerCore(collectionName);
try {
while (! timeout.hasTimedOut()) { while (! timeout.hasTimedOut()) {
Thread.sleep(100); Thread.sleep(100);
DocCollection docCollection = zkStateReader.getClusterState().getCollection(collectionName); DocCollection docCollection = zkStateReader.getClusterState().getCollection(collectionName);
if(docCollection != null) { if (docCollection == null) { // someone already deleted the collection
return true;
}
Slice slice = docCollection.getSlice(shard); Slice slice = docCollection.getSlice(shard);
if(slice == null || slice.getReplica(replicaName) == null) { if(slice == null || slice.getReplica(replicaName) == null) {
deleted = true; return true;
} }
} }
// Return true if either someone already deleted the collection/slice/replica. // replica still exists after the timeout
if (docCollection == null || deleted) break; return false;
} finally {
zkStateReader.unregisterCore(collectionName);
} }
return deleted;
} }
void deleteCoreNode(String collectionName, String replicaName, Replica replica, String core) throws KeeperException, InterruptedException { void deleteCoreNode(String collectionName, String replicaName, Replica replica, String core) throws KeeperException, InterruptedException {

View File

@ -93,15 +93,6 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
CountDownLatch replicasToRecover = new CountDownLatch(numLeaders); CountDownLatch replicasToRecover = new CountDownLatch(numLeaders);
for (ZkNodeProps sourceReplica : sourceReplicas) { for (ZkNodeProps sourceReplica : sourceReplicas) {
if (sourceReplica.getBool(ZkStateReader.LEADER_PROP, false)) {
String shardName = sourceReplica.getStr(SHARD_ID_PROP);
String replicaName = sourceReplica.getStr(ZkStateReader.REPLICA_PROP);
String collectionName = sourceReplica.getStr(COLLECTION_PROP);
String key = collectionName + "_" + replicaName;
RecoveryWatcher watcher = new RecoveryWatcher(collectionName, shardName, replicaName, replicasToRecover);
watchers.put(key, watcher);
zkStateReader.registerCollectionStateWatcher(collectionName, watcher);
}
NamedList nl = new NamedList(); NamedList nl = new NamedList();
log.info("Going to create replica for collection={} shard={} on node={}", sourceReplica.getStr(COLLECTION_PROP), sourceReplica.getStr(SHARD_ID_PROP), target); log.info("Going to create replica for collection={} shard={} on node={}", sourceReplica.getStr(COLLECTION_PROP), sourceReplica.getStr(SHARD_ID_PROP), target);
ZkNodeProps msg = sourceReplica.plus("parallel", String.valueOf(parallel)).plus(CoreAdminParams.NODE, target); ZkNodeProps msg = sourceReplica.plus("parallel", String.valueOf(parallel)).plus(CoreAdminParams.NODE, target);
@ -128,6 +119,16 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
if (addedReplica != null) { if (addedReplica != null) {
createdReplicas.add(addedReplica); createdReplicas.add(addedReplica);
if (sourceReplica.getBool(ZkStateReader.LEADER_PROP, false)) {
String shardName = sourceReplica.getStr(SHARD_ID_PROP);
String replicaName = sourceReplica.getStr(ZkStateReader.REPLICA_PROP);
String collectionName = sourceReplica.getStr(COLLECTION_PROP);
String key = collectionName + "_" + replicaName;
RecoveryWatcher watcher = new RecoveryWatcher(collectionName, shardName, replicaName,
addedReplica.getStr(ZkStateReader.CORE_NAME_PROP), replicasToRecover);
watchers.put(key, watcher);
zkStateReader.registerCollectionStateWatcher(collectionName, watcher);
}
} }
} }
@ -208,16 +209,27 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
} }
// we use this watcher to wait for replicas to recover // we use this watcher to wait for replicas to recover
private static class RecoveryWatcher implements CollectionStateWatcher { static class RecoveryWatcher implements CollectionStateWatcher {
String collectionId; String collectionId;
String shardId; String shardId;
String replicaId; String replicaId;
String targetCore;
CountDownLatch countDownLatch; CountDownLatch countDownLatch;
Replica recovered;
RecoveryWatcher(String collectionId, String shardId, String replicaId, CountDownLatch countDownLatch) { /**
* Watch for recovery of a replica
* @param collectionId collection name
* @param shardId shard id
* @param replicaId source replica name (coreNodeName)
* @param targetCore specific target core name - if null then any active replica will do
* @param countDownLatch countdown when recovered
*/
RecoveryWatcher(String collectionId, String shardId, String replicaId, String targetCore, CountDownLatch countDownLatch) {
this.collectionId = collectionId; this.collectionId = collectionId;
this.shardId = shardId; this.shardId = shardId;
this.replicaId = replicaId; this.replicaId = replicaId;
this.targetCore = targetCore;
this.countDownLatch = countDownLatch; this.countDownLatch = countDownLatch;
} }
@ -241,7 +253,12 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
continue; continue;
} }
// check its state // check its state
String coreName = replica.getStr(ZkStateReader.CORE_NAME_PROP);
if (targetCore != null && !targetCore.equals(coreName)) {
continue;
}
if (replica.isActive(liveNodes)) { // recovered - stop waiting if (replica.isActive(liveNodes)) { // recovered - stop waiting
recovered = replica;
countDownLatch.countDown(); countDownLatch.countDown();
return true; return true;
} }
@ -250,5 +267,9 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
// set the watch again to wait for the new replica to recover // set the watch again to wait for the new replica to recover
return false; return false;
} }
public Replica getRecoveredReplica() {
return recovered;
}
} }
} }

View File

@ -0,0 +1,53 @@
package org.apache.solr.cloud;
import com.carrotsearch.randomizedtesting.ThreadFilter;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.solr.cloud.hdfs.HdfsTestUtil;
import org.apache.solr.common.cloud.ZkConfigManager;
import org.apache.solr.util.BadHdfsThreadsFilter;
import org.junit.AfterClass;
import org.junit.BeforeClass;
/**
*
*/
@ThreadLeakFilters(defaultFilters = true, filters = {
BadHdfsThreadsFilter.class, // hdfs currently leaks thread(s)
MoveReplicaHDFSTest.ForkJoinThreadsFilter.class
})
public class MoveReplicaHDFSTest extends MoveReplicaTest {
private static MiniDFSCluster dfsCluster;
@BeforeClass
public static void setupClass() throws Exception {
dfsCluster = HdfsTestUtil.setupClass(createTempDir().toFile().getAbsolutePath());
ZkConfigManager configManager = new ZkConfigManager(zkClient());
configManager.uploadConfigDir(configset("cloud-hdfs"), "conf1");
System.setProperty("solr.hdfs.home", HdfsTestUtil.getDataDir(dfsCluster, "data"));
}
@AfterClass
public static void teardownClass() throws Exception {
cluster.shutdown(); // need to close before the MiniDFSCluster
HdfsTestUtil.teardownClass(dfsCluster);
dfsCluster = null;
}
public static class ForkJoinThreadsFilter implements ThreadFilter {
@Override
public boolean reject(Thread t) {
String name = t.getName();
if (name.startsWith("ForkJoinPool.commonPool")) {
return true;
}
return false;
}
}
}

View File

@ -20,6 +20,7 @@ package org.apache.solr.cloud;
import java.io.IOException; import java.io.IOException;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
@ -31,8 +32,10 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest; import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.client.solrj.response.CoreAdminResponse; import org.apache.solr.client.solrj.response.CoreAdminResponse;
import org.apache.solr.client.solrj.response.RequestStatusState; import org.apache.solr.client.solrj.response.RequestStatusState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -40,6 +43,7 @@ import org.slf4j.LoggerFactory;
public class MoveReplicaTest extends SolrCloudTestCase { public class MoveReplicaTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@BeforeClass @BeforeClass
public static void setupCluster() throws Exception { public static void setupCluster() throws Exception {
configureCluster(4) configureCluster(4)
@ -56,10 +60,11 @@ public class MoveReplicaTest extends SolrCloudTestCase {
cluster.waitForAllNodes(5000); cluster.waitForAllNodes(5000);
String coll = "movereplicatest_coll"; String coll = "movereplicatest_coll";
log.info("total_jettys: " + cluster.getJettySolrRunners().size()); log.info("total_jettys: " + cluster.getJettySolrRunners().size());
int REPLICATION = 2;
CloudSolrClient cloudClient = cluster.getSolrClient(); CloudSolrClient cloudClient = cluster.getSolrClient();
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(coll, "conf1", 2, 2); CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(coll, "conf1", 2, REPLICATION);
create.setMaxShardsPerNode(2); create.setMaxShardsPerNode(2);
cloudClient.request(create); cloudClient.request(create);
@ -94,16 +99,87 @@ public class MoveReplicaTest extends SolrCloudTestCase {
break; break;
} }
assertFalse(rsp.getRequestStatus() == RequestStatusState.FAILED); assertFalse(rsp.getRequestStatus() == RequestStatusState.FAILED);
Thread.sleep(50); Thread.sleep(500);
} }
assertTrue(success); assertTrue(success);
checkNumOfCores(cloudClient, replica.getNodeName(), 0); checkNumOfCores(cloudClient, replica.getNodeName(), 0);
checkNumOfCores(cloudClient, targetNode, 2); assertTrue("should be at least one core on target node!", getNumOfCores(cloudClient, targetNode) > 0);
// wait for recovery
boolean recovered = false;
for (int i = 0; i < 300; i++) {
DocCollection collState = getCollectionState(coll);
log.debug("###### " + collState);
Collection<Replica> replicas = collState.getSlice(shardId).getReplicas();
boolean allActive = true;
boolean hasLeaders = true;
if (replicas != null && !replicas.isEmpty()) {
for (Replica r : replicas) {
if (!r.getNodeName().equals(targetNode)) {
continue;
}
if (!r.isActive(Collections.singleton(targetNode))) {
log.info("Not active: " + r);
allActive = false;
}
}
} else {
allActive = false;
}
for (Slice slice : collState.getSlices()) {
if (slice.getLeader() == null) {
hasLeaders = false;
}
}
if (allActive && hasLeaders) {
// check the number of active replicas
assertEquals("total number of replicas", REPLICATION, replicas.size());
recovered = true;
break;
} else {
log.info("--- waiting, allActive=" + allActive + ", hasLeaders=" + hasLeaders);
Thread.sleep(1000);
}
}
assertTrue("replica never fully recovered", recovered);
moveReplica = new CollectionAdminRequest.MoveReplica(coll, shardId, targetNode, replica.getNodeName()); moveReplica = new CollectionAdminRequest.MoveReplica(coll, shardId, targetNode, replica.getNodeName());
moveReplica.process(cloudClient); moveReplica.process(cloudClient);
checkNumOfCores(cloudClient, replica.getNodeName(), 1); checkNumOfCores(cloudClient, replica.getNodeName(), 1);
checkNumOfCores(cloudClient, targetNode, 1); // wait for recovery
recovered = false;
for (int i = 0; i < 300; i++) {
DocCollection collState = getCollectionState(coll);
log.debug("###### " + collState);
Collection<Replica> replicas = collState.getSlice(shardId).getReplicas();
boolean allActive = true;
boolean hasLeaders = true;
if (replicas != null && !replicas.isEmpty()) {
for (Replica r : replicas) {
if (!r.getNodeName().equals(replica.getNodeName())) {
continue;
}
if (!r.isActive(Collections.singleton(replica.getNodeName()))) {
log.info("Not active yet: " + r);
allActive = false;
}
}
} else {
allActive = false;
}
for (Slice slice : collState.getSlices()) {
if (slice.getLeader() == null) {
hasLeaders = false;
}
}
if (allActive && hasLeaders) {
assertEquals("total number of replicas", REPLICATION, replicas.size());
recovered = true;
break;
} else {
Thread.sleep(1000);
}
}
assertTrue("replica never fully recovered", recovered);
} }
private Replica getRandomReplica(String coll, CloudSolrClient cloudClient) { private Replica getRandomReplica(String coll, CloudSolrClient cloudClient) {