diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index e01036684fa..1107c566592 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -164,6 +164,8 @@ Bug Fixes * SOLR-10169: PeerSync will hit an NPE on no response errors when looking for fingerprint. (Erick Erickson) +* SOLR-12187: Replica should watch clusterstate and unload itself if its entry is removed (Cao Manh Dat) + Optimizations ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index 872a8b9d7e1..8cd02b6b01a 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -38,6 +38,7 @@ import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; @@ -65,6 +66,7 @@ import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.cloud.BeforeReconnect; import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.CollectionStateWatcher; import org.apache.solr.common.cloud.DefaultConnectionStrategy; import org.apache.solr.common.cloud.DefaultZkACLProvider; import org.apache.solr.common.cloud.DefaultZkCredentialsProvider; @@ -1033,42 +1035,39 @@ public class ZkController { try { // pre register has published our down state final String baseUrl = getBaseUrl(); - final CloudDescriptor cloudDesc = desc.getCloudDescriptor(); final String collection = cloudDesc.getCollectionName(); - - final String coreZkNodeName = desc.getCloudDescriptor().getCoreNodeName(); + final String shardId = cloudDesc.getShardId(); + final String coreZkNodeName = cloudDesc.getCoreNodeName(); assert coreZkNodeName != null : "we should have a coreNodeName by now"; + // check replica's existence in clusterstate first + try { + zkStateReader.waitForState(collection, Overseer.isLegacy(zkStateReader) ? 60000 : 100, + TimeUnit.MILLISECONDS, (liveNodes, collectionState) -> getReplicaOrNull(collectionState, shardId, coreZkNodeName) != null); + } catch (TimeoutException e) { + throw new SolrException(ErrorCode.SERVER_ERROR, "Error registering SolrCore, timeout waiting for replica present in clusterstate"); + } + Replica replica = getReplicaOrNull(zkStateReader.getClusterState().getCollectionOrNull(collection), shardId, coreZkNodeName); + if (replica == null) { + throw new SolrException(ErrorCode.SERVER_ERROR, "Error registering SolrCore, replica is removed from clusterstate"); + } + ZkShardTerms shardTerms = getShardTerms(collection, cloudDesc.getShardId()); // This flag is used for testing rolling updates and should be removed in SOLR-11812 boolean isRunningInNewLIR = "new".equals(desc.getCoreProperty("lirVersion", "new")); - if (isRunningInNewLIR && cloudDesc.getReplicaType() != Type.PULL) { + if (isRunningInNewLIR && replica.getType() != Type.PULL) { shardTerms.registerTerm(coreZkNodeName); } - String shardId = cloudDesc.getShardId(); - Map props = new HashMap<>(); - // we only put a subset of props into the leader node - props.put(ZkStateReader.BASE_URL_PROP, baseUrl); - props.put(ZkStateReader.CORE_NAME_PROP, coreName); - props.put(ZkStateReader.NODE_NAME_PROP, getNodeName()); - + log.debug("Register replica - core:{} address:{} collection:{} shard:{}", - coreName, baseUrl, cloudDesc.getCollectionName(), shardId); - - ZkNodeProps leaderProps = new ZkNodeProps(props); + coreName, baseUrl, collection, shardId); try { // If we're a preferred leader, insert ourselves at the head of the queue - boolean joinAtHead = false; - final DocCollection docCollection = zkStateReader.getClusterState().getCollectionOrNull(collection); - Replica replica = (docCollection == null) ? null : docCollection.getReplica(coreZkNodeName); - if (replica != null) { - joinAtHead = replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false); - } - //TODO WHy would replica be null? - if (replica == null || replica.getType() != Type.PULL) { + boolean joinAtHead = replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false); + if (replica.getType() != Type.PULL) { joinElection(desc, afterExpiration, joinAtHead); } else if (replica.getType() == Type.PULL) { if (joinAtHead) { @@ -1093,9 +1092,8 @@ public class ZkController { String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName); log.debug("We are " + ourUrl + " and leader is " + leaderUrl); boolean isLeader = leaderUrl.equals(ourUrl); - Replica.Type replicaType = zkStateReader.getClusterState().getCollection(collection).getReplica(coreZkNodeName).getType(); - assert !(isLeader && replicaType == Type.PULL): "Pull replica became leader!"; - + assert !(isLeader && replica.getType() == Type.PULL) : "Pull replica became leader!"; + try (SolrCore core = cc.getCore(desc.getName())) { // recover from local transaction log and wait for it to complete before @@ -1105,7 +1103,7 @@ public class ZkController { // leader election perhaps? UpdateLog ulog = core.getUpdateHandler().getUpdateLog(); - boolean isTlogReplicaAndNotLeader = replicaType == Replica.Type.TLOG && !isLeader; + boolean isTlogReplicaAndNotLeader = replica.getType() == Replica.Type.TLOG && !isLeader; if (isTlogReplicaAndNotLeader) { String commitVersion = ReplicateFromLeader.getCommitVersion(core); if (commitVersion != null) { @@ -1138,23 +1136,40 @@ public class ZkController { publish(desc, Replica.State.ACTIVE); } - if (isRunningInNewLIR && replicaType != Type.PULL) { + if (isRunningInNewLIR && replica.getType() != Type.PULL) { + // the watcher is added to a set so multiple calls of this method will left only one watcher shardTerms.addListener(new RecoveringCoreTermWatcher(core.getCoreDescriptor(), getCoreContainer())); } core.getCoreDescriptor().getCloudDescriptor().setHasRegistered(true); + } catch (Exception e) { + unregister(coreName, desc, false); + throw e; } // make sure we have an update cluster state right away zkStateReader.forceUpdateCollection(collection); + // the watcher is added to a set so multiple calls of this method will left only one watcher + zkStateReader.registerCollectionStateWatcher(cloudDesc.getCollectionName(), + new UnloadCoreOnDeletedWatcher(coreZkNodeName, shardId, desc.getName())); return shardId; - } catch (Exception e) { - unregister(coreName, desc, false); - throw e; } finally { MDCLoggingContext.clear(); } } + private Replica getReplicaOrNull(DocCollection docCollection, String shard, String coreNodeName) { + if (docCollection == null) return null; + + Slice slice = docCollection.getSlice(shard); + if (slice == null) return null; + + Replica replica = slice.getReplica(coreNodeName); + if (replica == null) return null; + if (!getNodeName().equals(replica.getNodeName())) return null; + + return replica; + } + public void startReplicationFromLeader(String coreName, boolean switchTransactionLog) throws InterruptedException { log.info("{} starting background replication from leader", coreName); ReplicateFromLeader replicateFromLeader = new ReplicateFromLeader(cc, coreName); @@ -1359,11 +1374,7 @@ public class ZkController { } public void publish(final CoreDescriptor cd, final Replica.State state) throws Exception { - publish(cd, state, true); - } - - public void publish(final CoreDescriptor cd, final Replica.State state, boolean updateLastState) throws Exception { - publish(cd, state, updateLastState, false); + publish(cd, state, true, false); } /** @@ -1430,6 +1441,9 @@ public class ZkController { props.put(ZkStateReader.SHARD_ID_PROP, cd.getCloudDescriptor().getShardId()); props.put(ZkStateReader.COLLECTION_PROP, collection); props.put(ZkStateReader.REPLICA_TYPE, cd.getCloudDescriptor().getReplicaType().toString()); + if (!Overseer.isLegacy(zkStateReader)) { + props.put(ZkStateReader.FORCE_SET_STATE_PROP, "false"); + } if (numShards != null) { props.put(ZkStateReader.NUM_SHARDS_PROP, numShards.toString()); } @@ -1521,7 +1535,6 @@ public class ZkController { } } CloudDescriptor cloudDescriptor = cd.getCloudDescriptor(); - zkStateReader.unregisterCore(cloudDescriptor.getCollectionName()); if (removeCoreFromZk) { ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(), ZkStateReader.CORE_NAME_PROP, coreName, @@ -1653,7 +1666,6 @@ public class ZkController { "Collection {} not visible yet, but flagging it so a watch is registered when it becomes visible" : "Registering watch for collection {}", collectionName); - zkStateReader.registerCore(collectionName); } catch (KeeperException e) { log.error("", e); throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); @@ -2707,6 +2719,56 @@ public class ZkController { }; } + private class UnloadCoreOnDeletedWatcher implements CollectionStateWatcher { + String coreNodeName; + String shard; + String coreName; + + public UnloadCoreOnDeletedWatcher(String coreNodeName, String shard, String coreName) { + this.coreNodeName = coreNodeName; + this.shard = shard; + this.coreName = coreName; + } + + @Override + // synchronized due to SOLR-11535 + public synchronized boolean onStateChanged(Set liveNodes, DocCollection collectionState) { + if (getCoreContainer().getCoreDescriptor(coreName) == null) return true; + + boolean replicaRemoved = getReplicaOrNull(collectionState, shard, coreNodeName) == null; + if (replicaRemoved) { + try { + log.info("Replica {} removed from clusterstate, remove it.", coreName); + getCoreContainer().unload(coreName, true, true, true); + } catch (SolrException e) { + if (!e.getMessage().contains("Cannot unload non-existent core")) { + // no need to log if the core was already unloaded + log.warn("Failed to unregister core:{}", coreName, e); + } + } catch (Exception e) { + log.warn("Failed to unregister core:{}", coreName, e); + } + } + return replicaRemoved; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + UnloadCoreOnDeletedWatcher that = (UnloadCoreOnDeletedWatcher) o; + return Objects.equals(coreNodeName, that.coreNodeName) && + Objects.equals(shard, that.shard) && + Objects.equals(coreName, that.coreName); + } + + @Override + public int hashCode() { + + return Objects.hash(coreNodeName, shard, coreName); + } + } + /** * Thrown during leader initiated recovery process if current node is not leader */ diff --git a/solr/core/src/java/org/apache/solr/core/ZkContainer.java b/solr/core/src/java/org/apache/solr/core/ZkContainer.java index f89367fdbc1..34e57643eca 100644 --- a/solr/core/src/java/org/apache/solr/core/ZkContainer.java +++ b/solr/core/src/java/org/apache/solr/core/ZkContainer.java @@ -222,22 +222,6 @@ public class ZkContainer { public ZkController getZkController() { return zkController; } - - public void publishCoresAsDown(List cores) { - - for (SolrCore core : cores) { - try { - zkController.publish(core.getCoreDescriptor(), Replica.State.DOWN); - } catch (KeeperException e) { - ZkContainer.log.error("", e); - } catch (InterruptedException e) { - Thread.interrupted(); - ZkContainer.log.error("", e); - } catch (Exception e) { - ZkContainer.log.error("", e); - } - } - } public void close() { diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java index 5f4bc018789..c02271e8403 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java @@ -40,7 +40,6 @@ import org.apache.solr.api.Api; import org.apache.solr.client.solrj.SolrResponse; import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.impl.HttpSolrClient.Builder; -import org.apache.solr.client.solrj.request.CoreAdminRequest; import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestSyncShard; import org.apache.solr.client.solrj.response.RequestStatusState; import org.apache.solr.client.solrj.util.SolrIdentifierValidator; @@ -282,7 +281,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission * In SOLR-11739 we change the way the async IDs are checked to decide if one has * already been used or not. For backward compatibility, we continue to check in the * old way (meaning, in all the queues) for now. This extra check should be removed - * in Solr 9 + * in Solr 9 */ private static final boolean CHECK_ASYNC_ID_BACK_COMPAT_LOCATIONS = true; @@ -306,7 +305,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission } NamedList r = new NamedList<>(); - + if (CHECK_ASYNC_ID_BACK_COMPAT_LOCATIONS && ( coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId) || coreContainer.getZkController().getOverseerFailureMap().contains(asyncId) || @@ -1162,26 +1161,15 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission // Wait till we have an active leader boolean success = false; - for (int i = 0; i < 10; i++) { - ZkCoreNodeProps zombieLeaderProps = getZombieLeader(zkController, collectionName, sliceId); - if (zombieLeaderProps != null) { - log.warn("A replica {} on node {} won the leader election, but not exist in clusterstate, " + - "remove it and waiting for another round of election", - zombieLeaderProps.getCoreName(), zombieLeaderProps.getNodeName()); - try (HttpSolrClient solrClient = new HttpSolrClient.Builder(zombieLeaderProps.getBaseUrl()).build()) { - CoreAdminRequest.unloadCore(zombieLeaderProps.getCoreName(), solrClient); - } - // waiting for another election round - i = 0; - } - clusterState = zkController.getClusterState(); + for (int i = 0; i < 9; i++) { + Thread.sleep(5000); + clusterState = handler.coreContainer.getZkController().getClusterState(); collection = clusterState.getCollection(collectionName); slice = collection.getSlice(sliceId); if (slice.getLeader() != null && slice.getLeader().getState() == State.ACTIVE) { success = true; break; } - Thread.sleep(5000); log.warn("Force leader attempt {}. Waiting 5 secs for an active leader. State of the slice: {}", (i + 1), slice); } @@ -1198,25 +1186,6 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission } } - /** - * Zombie leader is a replica won the election but does not exist in clusterstate - * @return null if the zombie leader does not exist - */ - private static ZkCoreNodeProps getZombieLeader(ZkController zkController, String collection, String shardId) { - try { - ZkCoreNodeProps leaderProps = zkController.getLeaderProps(collection, shardId, 1000); - DocCollection docCollection = zkController.getClusterState().getCollection(collection); - Replica replica = docCollection.getReplica(leaderProps.getNodeProps().getStr(ZkStateReader.CORE_NODE_NAME_PROP)); - if (replica == null) return leaderProps; - if (!replica.getNodeName().equals(leaderProps.getNodeName())) { - return leaderProps; - } - return null; - } catch (Exception e) { - return null; - } - } - public static void waitForActiveCollection(String collectionName, CoreContainer cc, SolrResponse createCollResponse) throws KeeperException, InterruptedException { diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java index d9dbba04a9b..8c11713c37e 100644 --- a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java @@ -22,6 +22,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.EnumSet; import java.util.List; +import java.util.Set; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -34,11 +35,13 @@ import org.apache.solr.client.solrj.request.CoreStatus; import org.apache.solr.cloud.overseer.OverseerAction; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.cloud.CollectionStateWatcher; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.cloud.ZkStateReaderAccessor; import org.apache.solr.common.util.TimeSource; import org.apache.solr.common.util.Utils; import org.apache.solr.core.ZkContainer; @@ -86,12 +89,17 @@ public class DeleteReplicaTest extends SolrCloudTestCase { assertTrue("Unexpected error message: " + e.getMessage(), e.getMessage().contains("state is 'active'")); assertTrue("Data directory for " + replica.getName() + " should not have been deleted", Files.exists(dataDir)); + JettySolrRunner replicaJetty = cluster.getReplicaJetty(replica); + ZkStateReaderAccessor accessor = new ZkStateReaderAccessor(replicaJetty.getCoreContainer().getZkController().getZkStateReader()); + Set watchers = accessor.getStateWatchers(collectionName); CollectionAdminRequest.deleteReplica(collectionName, shard.getName(), replica.getName()) .process(cluster.getSolrClient()); waitForState("Expected replica " + replica.getName() + " to have been removed", collectionName, (n, c) -> { Slice testShard = c.getSlice(shard.getName()); return testShard.getReplica(replica.getName()) == null; }); + // the core no longer watch collection state since it was removed + assertEquals(watchers.size() - 1, accessor.getStateWatchers(collectionName).size()); assertFalse("Data directory for " + replica.getName() + " should have been removed", Files.exists(dataDir)); @@ -165,8 +173,63 @@ public class DeleteReplicaTest extends SolrCloudTestCase { } @Test + public void deleteReplicaFromClusterState() throws Exception { + deleteReplicaFromClusterState("true"); + deleteReplicaFromClusterState("false"); + CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, null).process(cluster.getSolrClient()); + } + + public void deleteReplicaFromClusterState(String legacyCloud) throws Exception { + CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, legacyCloud).process(cluster.getSolrClient()); + final String collectionName = "deleteFromClusterState_"+legacyCloud; + CollectionAdminRequest.createCollection(collectionName, "conf", 1, 3) + .process(cluster.getSolrClient()); + cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "1")); + cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2")); + cluster.getSolrClient().commit(collectionName); + + Slice shard = getCollectionState(collectionName).getSlice("shard1"); + Replica replica = getRandomReplica(shard); + JettySolrRunner replicaJetty = cluster.getReplicaJetty(replica); + ZkStateReaderAccessor accessor = new ZkStateReaderAccessor(replicaJetty.getCoreContainer().getZkController().getZkStateReader()); + Set watchers = accessor.getStateWatchers(collectionName); + + ZkNodeProps m = new ZkNodeProps( + Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(), + ZkStateReader.CORE_NAME_PROP, replica.getCoreName(), + ZkStateReader.NODE_NAME_PROP, replica.getNodeName(), + ZkStateReader.COLLECTION_PROP, collectionName, + ZkStateReader.CORE_NODE_NAME_PROP, replica.getName(), + ZkStateReader.BASE_URL_PROP, replica.getBaseUrl()); + Overseer.getStateUpdateQueue(cluster.getZkClient()).offer(Utils.toJSON(m)); + + waitForState("Timeout waiting for replica get deleted", collectionName, + (liveNodes, collectionState) -> collectionState.getSlice("shard1").getReplicas().size() == 2); + + TimeOut timeOut = new TimeOut(60, TimeUnit.SECONDS, TimeSource.NANO_TIME); + timeOut.waitFor("Waiting for replica get unloaded", () -> + replicaJetty.getCoreContainer().getCoreDescriptor(replica.getCoreName()) == null + ); + // the core no longer watch collection state since it was removed + timeOut = new TimeOut(60, TimeUnit.SECONDS, TimeSource.NANO_TIME); + timeOut.waitFor("Waiting for watcher get removed", () -> + watchers.size() - 1 == accessor.getStateWatchers(collectionName).size() + ); + + CollectionAdminRequest.deleteCollection(collectionName).process(cluster.getSolrClient()); + } + + @Test + @Slow public void raceConditionOnDeleteAndRegisterReplica() throws Exception { - final String collectionName = "raceDeleteReplica"; + raceConditionOnDeleteAndRegisterReplica("true"); + raceConditionOnDeleteAndRegisterReplica("false"); + CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, null).process(cluster.getSolrClient()); + } + + public void raceConditionOnDeleteAndRegisterReplica(String legacyCloud) throws Exception { + CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, legacyCloud).process(cluster.getSolrClient()); + final String collectionName = "raceDeleteReplica_"+legacyCloud; CollectionAdminRequest.createCollection(collectionName, "conf", 1, 2) .process(cluster.getSolrClient()); waitForState("Expected 1x2 collections", collectionName, clusterShape(1, 2)); @@ -246,15 +309,16 @@ public class DeleteReplicaTest extends SolrCloudTestCase { ZkContainer.testing_beforeRegisterInZk = null; } - - waitForState("Timeout for replica:"+replica1.getName()+" register itself as DOWN after failed to register", collectionName, (liveNodes, collectionState) -> { - Slice shard = collectionState.getSlice("shard1"); - Replica replica = shard.getReplica(replica1.getName()); - return replica != null && replica.getState() == DOWN; - }); - - CollectionAdminRequest.addReplicaToShard(collectionName, "shard1") - .process(cluster.getSolrClient()); + while (true) { + try { + CollectionAdminRequest.addReplicaToShard(collectionName, "shard1") + .process(cluster.getSolrClient()); + break; + } catch (Exception e) { + // expected, when the node is not fully started + Thread.sleep(500); + } + } waitForState("Expected 1x2 collections", collectionName, clusterShape(1, 2)); String leaderJettyNodeName = leaderJetty.getNodeName(); diff --git a/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java b/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java index beaeb2465e0..013434c2184 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java @@ -62,81 +62,6 @@ public class ForceLeaderTest extends HttpPartitionTest { } - /** - * Tests that FORCELEADER can get an active leader even in the case there are a replica won the election but not present in clusterstate - */ - @Test - @Slow - public void testZombieLeader() throws Exception { - String testCollectionName = "forceleader_zombie_leader_collection"; - createCollection(testCollectionName, "conf1", 1, 3, 1); - cloudClient.setDefaultCollection(testCollectionName); - try { - List notLeaders = ensureAllReplicasAreActive(testCollectionName, SHARD1, 1, 3, maxWaitSecsToSeeAllActive); - assertEquals("Expected 2 replicas for collection " + testCollectionName - + " but found " + notLeaders.size() + "; clusterState: " - + printClusterStateInfo(testCollectionName), 2, notLeaders.size()); - List notLeaderJetties = notLeaders.stream().map(rep -> getJettyOnPort(getReplicaPort(rep))) - .collect(Collectors.toList()); - - Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, SHARD1); - JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader)); - - // remove leader from clusterstate - ZkNodeProps m = new ZkNodeProps( - Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(), - ZkStateReader.CORE_NAME_PROP, leader.getCoreName(), - ZkStateReader.NODE_NAME_PROP, leader.getNodeName(), - ZkStateReader.COLLECTION_PROP, testCollectionName, - ZkStateReader.CORE_NODE_NAME_PROP, leader.getName(), - ZkStateReader.BASE_URL_PROP, leader.getBaseUrl()); - Overseer.getStateUpdateQueue(cloudClient.getZkStateReader().getZkClient()).offer(Utils.toJSON(m)); - - boolean restartOtherReplicas = random().nextBoolean(); - log.info("Starting test with restartOtherReplicas:{}", restartOtherReplicas); - if (restartOtherReplicas) { - for (JettySolrRunner notLeaderJetty : notLeaderJetties) { - notLeaderJetty.stop(); - } - } - cloudClient.waitForState(testCollectionName, 30, TimeUnit.SECONDS, - (liveNodes, collectionState) -> collectionState.getReplicas().size() == 2); - - if (restartOtherReplicas) { - for (JettySolrRunner notLeaderJetty : notLeaderJetties) { - notLeaderJetty.start(); - } - } - - log.info("Before forcing leader: " + cloudClient.getZkStateReader().getClusterState() - .getCollection(testCollectionName).getSlice(SHARD1)); - doForceLeader(cloudClient, testCollectionName, SHARD1); - - // By now we have an active leader. Wait for recoveries to begin - waitForRecoveriesToFinish(testCollectionName, cloudClient.getZkStateReader(), true); - ClusterState clusterState = cloudClient.getZkStateReader().getClusterState(); - log.info("After forcing leader: " + clusterState.getCollection(testCollectionName).getSlice(SHARD1)); - - assertNull("Expected zombie leader get deleted", leaderJetty.getCoreContainer().getCore(leader.getCoreName())); - Replica newLeader = clusterState.getCollectionOrNull(testCollectionName).getSlice(SHARD1).getLeader(); - assertNotNull(newLeader); - assertEquals(State.ACTIVE, newLeader.getState()); - - int numActiveReplicas = getNumberOfActiveReplicas(clusterState, testCollectionName, SHARD1); - assertEquals(2, numActiveReplicas); - - // Assert that indexing works again - sendDoc(1); - cloudClient.commit(); - - assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 1); - } finally { - log.info("Cleaning up after the test."); - // try to clean up - attemptCollectionDelete(cloudClient, testCollectionName); - } - } - /** * Tests that FORCELEADER can get an active leader even only replicas with term lower than leader's term are live */ diff --git a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java index 0879063aac7..652a2e2b595 100644 --- a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java @@ -60,9 +60,6 @@ import org.slf4j.LoggerFactory; public class MoveReplicaTest extends SolrCloudTestCase { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private static ZkStateReaderAccessor accessor; - private static int overseerLeaderIndex; - // used by MoveReplicaHDFSTest protected boolean inPlaceMove = true; @@ -78,14 +75,12 @@ public class MoveReplicaTest extends SolrCloudTestCase { JettySolrRunner jetty = cluster.getJettySolrRunner(i); if (jetty.getNodeName().equals(overseerLeader)) { overseerJetty = jetty; - overseerLeaderIndex = i; break; } } if (overseerJetty == null) { fail("no overseer leader!"); } - accessor = new ZkStateReaderAccessor(overseerJetty.getCoreContainer().getZkController().getZkStateReader()); } protected String getSolrXml() { @@ -137,8 +132,6 @@ public class MoveReplicaTest extends SolrCloudTestCase { } } - Set watchers = new HashSet<>(accessor.getStateWatchers(coll)); - int sourceNumCores = getNumOfCores(cloudClient, replica.getNodeName(), coll); int targetNumCores = getNumOfCores(cloudClient, targetNode, coll); @@ -201,9 +194,6 @@ public class MoveReplicaTest extends SolrCloudTestCase { assertEquals(100, cluster.getSolrClient().query(coll, new SolrQuery("*:*")).getResults().getNumFound()); - Set newWatchers = new HashSet<>(accessor.getStateWatchers(coll)); - assertEquals(watchers, newWatchers); - moveReplica = createMoveReplicaRequest(coll, replica, targetNode, shardId); moveReplica.setInPlaceMove(inPlaceMove); moveReplica.process(cloudClient); @@ -243,8 +233,6 @@ public class MoveReplicaTest extends SolrCloudTestCase { } } assertTrue("replica never fully recovered", recovered); - newWatchers = new HashSet<>(accessor.getStateWatchers(coll)); - assertEquals(watchers, newWatchers); assertEquals(100, cluster.getSolrClient().query(coll, new SolrQuery("*:*")).getResults().getNumFound()); } @@ -258,8 +246,6 @@ public class MoveReplicaTest extends SolrCloudTestCase { CloudSolrClient cloudClient = cluster.getSolrClient(); - Set watchers = new HashSet<>(accessor.getStateWatchers(coll)); - CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(coll, "conf1", 2, REPLICATION); create.setAutoAddReplicas(false); cloudClient.request(create); @@ -303,9 +289,6 @@ public class MoveReplicaTest extends SolrCloudTestCase { } assertFalse(success); - Set newWatchers = new HashSet<>(accessor.getStateWatchers(coll)); - assertEquals(watchers, newWatchers); - log.info("--- current collection state: " + cloudClient.getZkStateReader().getClusterState().getCollection(coll)); assertEquals(100, cluster.getSolrClient().query(coll, new SolrQuery("*:*")).getResults().getNumFound()); } diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java index b0b591a4721..7d5401dad5b 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -1572,8 +1572,12 @@ public class ZkStateReader implements Closeable { return v; }); for (CollectionStateWatcher watcher : watchers) { - if (watcher.onStateChanged(liveNodes, collectionState)) { - removeCollectionStateWatcher(collection, watcher); + try { + if (watcher.onStateChanged(liveNodes, collectionState)) { + removeCollectionStateWatcher(collection, watcher); + } + } catch (Throwable throwable) { + LOG.warn("Error on calling watcher", throwable); } } }