diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 360916be91e..403b6cd27e8 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -299,6 +299,8 @@ Bug Fixes * SOLR-6807: CloudSolrClient's ZK state version check with the server was ignored when handleSelect=false (David Smiley) +* SOLR-10878: MOVEREPLICA command may lose data when replicationFactor is 1. (ab, shalin) + Optimizations ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java index 63acdd12b9a..c42d073b0fb 100644 --- a/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java @@ -68,7 +68,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd { ZkNodeProps addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete) throws KeeperException, InterruptedException { - log.info("addReplica() : {}", Utils.toJSONString(message)); + log.debug("addReplica() : {}", Utils.toJSONString(message)); String collection = message.getStr(COLLECTION_PROP); String node = message.getStr(CoreAdminParams.NODE); String shard = message.getStr(SHARD_ID_PROP); diff --git a/solr/core/src/java/org/apache/solr/cloud/DeleteReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/DeleteReplicaCmd.java index b79fa46e6a6..e71d7e89c3d 100644 --- a/solr/core/src/java/org/apache/solr/cloud/DeleteReplicaCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/DeleteReplicaCmd.java @@ -265,7 +265,7 @@ public class DeleteReplicaCmd implements Cmd { try { if (!callable.call()) throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, - "Could not remove replica : " + collectionName + "/" + shard + "/" + replicaName); + "Could not remove replica : " + collectionName + "/" + shard + "/" + replicaName); } catch (InterruptedException | KeeperException e) { throw e; } catch (Exception ex) { diff --git 
a/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java index fed1398e858..53d05e135b1 100644 --- a/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java @@ -22,6 +22,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Locale; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ClusterState; @@ -29,6 +31,7 @@ import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.Utils; @@ -56,10 +59,11 @@ public class MoveReplicaCmd implements Cmd{ } private void moveReplica(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception { - log.info("moveReplica() : {}", Utils.toJSONString(message)); + log.debug("moveReplica() : {}", Utils.toJSONString(message)); ocmh.checkRequired(message, COLLECTION_PROP, "targetNode"); String collection = message.getStr(COLLECTION_PROP); String targetNode = message.getStr("targetNode"); + int timeout = message.getInt("timeout", 10 * 60); // 10 minutes String async = message.getStr(ASYNC); @@ -103,14 +107,14 @@ public class MoveReplicaCmd implements Cmd{ assert slice != null; Object dataDir = replica.get("dataDir"); if (dataDir != null && dataDir.toString().startsWith("hdfs:/")) { - moveHdfsReplica(clusterState, results, dataDir.toString(), targetNode, async, coll, replica, slice); + moveHdfsReplica(clusterState, results, dataDir.toString(), targetNode, async, coll, replica, slice, timeout); } else { - 
moveNormalReplica(clusterState, results, targetNode, async, coll, replica, slice); + moveNormalReplica(clusterState, results, targetNode, async, coll, replica, slice, timeout); } } private void moveHdfsReplica(ClusterState clusterState, NamedList results, String dataDir, String targetNode, String async, - DocCollection coll, Replica replica, Slice slice) throws Exception { + DocCollection coll, Replica replica, Slice slice, int timeout) throws Exception { String newCoreName = Assign.buildCoreName(coll, slice.getName(), replica.getType()); ZkNodeProps removeReplicasProps = new ZkNodeProps( @@ -154,7 +158,7 @@ public class MoveReplicaCmd implements Cmd{ } private void moveNormalReplica(ClusterState clusterState, NamedList results, String targetNode, String async, - DocCollection coll, Replica replica, Slice slice) throws Exception { + DocCollection coll, Replica replica, Slice slice, int timeout) throws Exception { String newCoreName = Assign.buildCoreName(coll, slice.getName(), replica.getType()); ZkNodeProps addReplicasProps = new ZkNodeProps( COLLECTION_PROP, coll.getName(), @@ -163,20 +167,47 @@ public class MoveReplicaCmd implements Cmd{ CoreAdminParams.NAME, newCoreName); if(async!=null) addReplicasProps.getProperties().put(ASYNC, async); NamedList addResult = new NamedList(); + CountDownLatch countDownLatch = new CountDownLatch(1); + ReplaceNodeCmd.RecoveryWatcher watcher = null; + if (replica.equals(slice.getLeader())) { + watcher = new ReplaceNodeCmd.RecoveryWatcher(coll.getName(), slice.getName(), + replica.getName(), null, countDownLatch); + ocmh.zkStateReader.registerCollectionStateWatcher(coll.getName(), watcher); + } ocmh.addReplica(clusterState, addReplicasProps, addResult, null); if (addResult.get("failure") != null) { String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" + " on node=%s", coll.getName(), slice.getName(), targetNode); log.warn(errorString); results.add("failure", errorString); + if 
(watcher != null) { // unregister + ocmh.zkStateReader.removeCollectionStateWatcher(coll.getName(), watcher); + } return; } + // wait for the other replica to be active if the source replica was a leader + if (watcher != null) { + try { + log.debug("Waiting for leader's replica to recover."); + if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) { + String errorString = String.format(Locale.ROOT, "Timed out waiting for leader's replica to recover, collection=%s shard=%s" + + " on node=%s", coll.getName(), slice.getName(), targetNode); + log.warn(errorString); + results.add("failure", errorString); + return; + } else { + log.debug("Replica {} is active - deleting the source...", watcher.getRecoveredReplica()); + } + } finally { + ocmh.zkStateReader.removeCollectionStateWatcher(coll.getName(), watcher); + } + } ZkNodeProps removeReplicasProps = new ZkNodeProps( COLLECTION_PROP, coll.getName(), SHARD_ID_PROP, slice.getName(), REPLICA_PROP, replica.getName()); - if(async!=null) removeReplicasProps.getProperties().put(ASYNC, async); + if (async != null) removeReplicasProps.getProperties().put(ASYNC, async); NamedList deleteResult = new NamedList(); ocmh.deleteReplica(clusterState, removeReplicasProps, deleteResult, null); if (deleteResult.get("failure") != null) { diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java index a8d74e87938..2c55f3cdec4 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java @@ -419,20 +419,25 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler boolean waitForCoreNodeGone(String collectionName, String shard, String replicaName, int timeoutms) throws InterruptedException { TimeOut timeout = new TimeOut(timeoutms, TimeUnit.MILLISECONDS); - boolean deleted = false; -
while (! timeout.hasTimedOut()) { - Thread.sleep(100); - DocCollection docCollection = zkStateReader.getClusterState().getCollection(collectionName); - if(docCollection != null) { + // TODO: remove this workaround for SOLR-9440 + zkStateReader.registerCore(collectionName); + try { + while (! timeout.hasTimedOut()) { + Thread.sleep(100); + DocCollection docCollection = zkStateReader.getClusterState().getCollection(collectionName); + if (docCollection == null) { // someone already deleted the collection + return true; + } Slice slice = docCollection.getSlice(shard); if(slice == null || slice.getReplica(replicaName) == null) { - deleted = true; + return true; } } - // Return true if either someone already deleted the collection/slice/replica. - if (docCollection == null || deleted) break; + // replica still exists after the timeout + return false; + } finally { + zkStateReader.unregisterCore(collectionName); } - return deleted; } void deleteCoreNode(String collectionName, String replicaName, Replica replica, String core) throws KeeperException, InterruptedException { diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java index 5adbe8c9577..ba60908a8d8 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java @@ -93,15 +93,6 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd { CountDownLatch replicasToRecover = new CountDownLatch(numLeaders); for (ZkNodeProps sourceReplica : sourceReplicas) { - if (sourceReplica.getBool(ZkStateReader.LEADER_PROP, false)) { - String shardName = sourceReplica.getStr(SHARD_ID_PROP); - String replicaName = sourceReplica.getStr(ZkStateReader.REPLICA_PROP); - String collectionName = sourceReplica.getStr(COLLECTION_PROP); - String key = collectionName + "_" + replicaName; - RecoveryWatcher watcher = new RecoveryWatcher(collectionName, shardName, 
replicaName, replicasToRecover); - watchers.put(key, watcher); - zkStateReader.registerCollectionStateWatcher(collectionName, watcher); - } NamedList nl = new NamedList(); log.info("Going to create replica for collection={} shard={} on node={}", sourceReplica.getStr(COLLECTION_PROP), sourceReplica.getStr(SHARD_ID_PROP), target); ZkNodeProps msg = sourceReplica.plus("parallel", String.valueOf(parallel)).plus(CoreAdminParams.NODE, target); @@ -128,6 +119,16 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd { if (addedReplica != null) { createdReplicas.add(addedReplica); + if (sourceReplica.getBool(ZkStateReader.LEADER_PROP, false)) { + String shardName = sourceReplica.getStr(SHARD_ID_PROP); + String replicaName = sourceReplica.getStr(ZkStateReader.REPLICA_PROP); + String collectionName = sourceReplica.getStr(COLLECTION_PROP); + String key = collectionName + "_" + replicaName; + RecoveryWatcher watcher = new RecoveryWatcher(collectionName, shardName, replicaName, + addedReplica.getStr(ZkStateReader.CORE_NAME_PROP), replicasToRecover); + watchers.put(key, watcher); + zkStateReader.registerCollectionStateWatcher(collectionName, watcher); + } } } @@ -208,16 +209,27 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd { } // we use this watcher to wait for replicas to recover - private static class RecoveryWatcher implements CollectionStateWatcher { + static class RecoveryWatcher implements CollectionStateWatcher { String collectionId; String shardId; String replicaId; + String targetCore; CountDownLatch countDownLatch; + Replica recovered; - RecoveryWatcher(String collectionId, String shardId, String replicaId, CountDownLatch countDownLatch) { + /** + * Watch for recovery of a replica + * @param collectionId collection name + * @param shardId shard id + * @param replicaId source replica name (coreNodeName) + * @param targetCore specific target core name - if null then any active replica will do + * @param 
countDownLatch countdown when recovered + */ + RecoveryWatcher(String collectionId, String shardId, String replicaId, String targetCore, CountDownLatch countDownLatch) { this.collectionId = collectionId; this.shardId = shardId; this.replicaId = replicaId; + this.targetCore = targetCore; this.countDownLatch = countDownLatch; } @@ -241,7 +253,12 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd { continue; } // check its state + String coreName = replica.getStr(ZkStateReader.CORE_NAME_PROP); + if (targetCore != null && !targetCore.equals(coreName)) { + continue; + } if (replica.isActive(liveNodes)) { // recovered - stop waiting + recovered = replica; countDownLatch.countDown(); return true; } @@ -250,5 +267,9 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd { // set the watch again to wait for the new replica to recover return false; } + + public Replica getRecoveredReplica() { + return recovered; + } } } diff --git a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaHDFSTest.java b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaHDFSTest.java new file mode 100644 index 00000000000..884d49e9b0e --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaHDFSTest.java @@ -0,0 +1,53 @@ +package org.apache.solr.cloud; + +import com.carrotsearch.randomizedtesting.ThreadFilter; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.solr.cloud.hdfs.HdfsTestUtil; +import org.apache.solr.common.cloud.ZkConfigManager; +import org.apache.solr.util.BadHdfsThreadsFilter; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +/** + * + */ +@ThreadLeakFilters(defaultFilters = true, filters = { + BadHdfsThreadsFilter.class, // hdfs currently leaks thread(s) + MoveReplicaHDFSTest.ForkJoinThreadsFilter.class +}) +public class MoveReplicaHDFSTest extends MoveReplicaTest { + + private static MiniDFSCluster dfsCluster; 
+ + @BeforeClass + public static void setupClass() throws Exception { + dfsCluster = HdfsTestUtil.setupClass(createTempDir().toFile().getAbsolutePath()); + + ZkConfigManager configManager = new ZkConfigManager(zkClient()); + configManager.uploadConfigDir(configset("cloud-hdfs"), "conf1"); + + System.setProperty("solr.hdfs.home", HdfsTestUtil.getDataDir(dfsCluster, "data")); + } + + @AfterClass + public static void teardownClass() throws Exception { + cluster.shutdown(); // need to close before the MiniDFSCluster + HdfsTestUtil.teardownClass(dfsCluster); + dfsCluster = null; + } + + + public static class ForkJoinThreadsFilter implements ThreadFilter { + + @Override + public boolean reject(Thread t) { + String name = t.getName(); + if (name.startsWith("ForkJoinPool.commonPool")) { + return true; + } + return false; + } + } + +} diff --git a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java index 4368fea1726..8f00431f20f 100644 --- a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java @@ -20,6 +20,7 @@ package org.apache.solr.cloud; import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Set; @@ -31,8 +32,10 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.CoreAdminRequest; import org.apache.solr.client.solrj.response.CoreAdminResponse; import org.apache.solr.client.solrj.response.RequestStatusState; +import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.ZkStateReader; import org.junit.BeforeClass; import org.junit.Test; import org.slf4j.Logger; @@ -40,6 +43,7 @@ import org.slf4j.LoggerFactory; 
public class MoveReplicaTest extends SolrCloudTestCase { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + @BeforeClass public static void setupCluster() throws Exception { configureCluster(4) @@ -56,10 +60,11 @@ public class MoveReplicaTest extends SolrCloudTestCase { cluster.waitForAllNodes(5000); String coll = "movereplicatest_coll"; log.info("total_jettys: " + cluster.getJettySolrRunners().size()); + int REPLICATION = 2; CloudSolrClient cloudClient = cluster.getSolrClient(); - CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(coll, "conf1", 2, 2); + CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(coll, "conf1", 2, REPLICATION); create.setMaxShardsPerNode(2); cloudClient.request(create); @@ -94,16 +99,87 @@ public class MoveReplicaTest extends SolrCloudTestCase { break; } assertFalse(rsp.getRequestStatus() == RequestStatusState.FAILED); - Thread.sleep(50); + Thread.sleep(500); } assertTrue(success); checkNumOfCores(cloudClient, replica.getNodeName(), 0); - checkNumOfCores(cloudClient, targetNode, 2); + assertTrue("should be at least one core on target node!", getNumOfCores(cloudClient, targetNode) > 0); + // wait for recovery + boolean recovered = false; + for (int i = 0; i < 300; i++) { + DocCollection collState = getCollectionState(coll); + log.debug("###### " + collState); + Collection<Replica> replicas = collState.getSlice(shardId).getReplicas(); + boolean allActive = true; + boolean hasLeaders = true; + if (replicas != null && !replicas.isEmpty()) { + for (Replica r : replicas) { + if (!r.getNodeName().equals(targetNode)) { + continue; + } + if (!r.isActive(Collections.singleton(targetNode))) { + log.info("Not active: " + r); + allActive = false; + } + } + } else { + allActive = false; + } + for (Slice slice : collState.getSlices()) { + if (slice.getLeader() == null) { + hasLeaders = false; + } + } + if (allActive && hasLeaders) { + // check the number of
active replicas + assertEquals("total number of replicas", REPLICATION, replicas.size()); + recovered = true; + break; + } else { + log.info("--- waiting, allActive=" + allActive + ", hasLeaders=" + hasLeaders); + Thread.sleep(1000); + } + } + assertTrue("replica never fully recovered", recovered); moveReplica = new CollectionAdminRequest.MoveReplica(coll, shardId, targetNode, replica.getNodeName()); moveReplica.process(cloudClient); checkNumOfCores(cloudClient, replica.getNodeName(), 1); - checkNumOfCores(cloudClient, targetNode, 1); + // wait for recovery + recovered = false; + for (int i = 0; i < 300; i++) { + DocCollection collState = getCollectionState(coll); + log.debug("###### " + collState); + Collection<Replica> replicas = collState.getSlice(shardId).getReplicas(); + boolean allActive = true; + boolean hasLeaders = true; + if (replicas != null && !replicas.isEmpty()) { + for (Replica r : replicas) { + if (!r.getNodeName().equals(replica.getNodeName())) { + continue; + } + if (!r.isActive(Collections.singleton(replica.getNodeName()))) { + log.info("Not active yet: " + r); + allActive = false; + } + } + } else { + allActive = false; + } + for (Slice slice : collState.getSlices()) { + if (slice.getLeader() == null) { + hasLeaders = false; + } + } + if (allActive && hasLeaders) { + assertEquals("total number of replicas", REPLICATION, replicas.size()); + recovered = true; + break; + } else { + Thread.sleep(1000); + } + } + assertTrue("replica never fully recovered", recovered); } private Replica getRandomReplica(String coll, CloudSolrClient cloudClient) {