diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 7f620420ec5..5e4bf518066 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -78,8 +78,11 @@ Upgrade Notes * LUCENE-8161: If you are using the spatial JTS library with Solr, you must upgrade to 1.15.0. This new version of JTS is now dual-licensed to include a BSD style license. -* SOLR-12011: Replicas which are not up-to-date are not allowed to become leader. Use FORCELEADER API to - allow these replicas become leader. +* SOLR-12051: By using term value introduced in SOLR-11702, a replica will know that it is in-sync with the leader or not. + If all the replicas who participate in the election are out-of-sync with previous leader, the election will hang for + leaderVoteWait before allowing out-of-sync with previous leader replicas become leader. Note that the new leader + still needs to contains more updates than any other active replicas in the same shard. Therefore by increasing + leaderVoteWait will increase the consistency (over availability) of the system. * SOLR-11957: The default Solr log file size and number of backups is raised to 32MB and 10 respectively @@ -373,6 +376,8 @@ Other Changes * SOLR-12047: Increase checkStateInZk timeout (Cao Manh Dat, Varun Thacker) +* SOLR-12051: Election timeout when no replicas are qualified to become leader (Cao Manh Dat) + ================== 7.2.1 ================== Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release. diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java index 3ba4dfce939..a7c21b4f305 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java +++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java @@ -346,7 +346,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase { } Replica.Type replicaType; - + String coreNodeName; try (SolrCore core = cc.getCore(coreName)) { if (core == null) { @@ -360,13 +360,15 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase { } replicaType = core.getCoreDescriptor().getCloudDescriptor().getReplicaType(); - String coreNodeName = core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName(); + coreNodeName = core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName(); // should I be leader? - if (zkController.getShardTerms(collection, shardId).registered(coreNodeName) - && !zkController.getShardTerms(collection, shardId).canBecomeLeader(coreNodeName)) { - log.info("Can't become leader, term of replica {} less than leader", coreNodeName); - rejoinLeaderElection(core); - return; + ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId); + if (zkShardTerms.registered(coreNodeName) && !zkShardTerms.canBecomeLeader(coreNodeName)) { + if (!waitForEligibleBecomeLeaderAfterTimeout(zkShardTerms, coreNodeName, leaderVoteWait)) { + log.info("Can't become leader, term of replica {} less than leader", coreNodeName); + rejoinLeaderElection(core); + return; + } } if (isClosed) { @@ -467,7 +469,10 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase { } } } - + // in case of leaderVoteWait timeout, a replica with lower term can win the election + if (zkController.getShardTerms(collection, shardId).registered(coreNodeName)) { + zkController.getShardTerms(collection, shardId).setTermEqualsToLeader(coreNodeName); + } super.runLeaderProcess(weAreReplacement, 0); try (SolrCore core = cc.getCore(coreName)) { if (core != null) { @@ -517,6 +522,53 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase { } } + /** + * Wait for other replicas with higher terms participate in the electioon + * @return true if after {@code timeout} there are no other replicas with higher term participate in the election, + * false if otherwise + */ + private boolean waitForEligibleBecomeLeaderAfterTimeout(ZkShardTerms zkShardTerms, String coreNodeName, int timeout) throws InterruptedException { + long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS); + while (!isClosed && !cc.isShutDown()) { + if (System.nanoTime() > timeoutAt) { + return true; + } + if (replicasWithHigherTermParticipated(zkShardTerms, coreNodeName)) { + log.info("Can't become leader, other replicas with higher term participated in leader election"); + return false; + } + Thread.sleep(500L); + } + return false; + } + + /** + * Do other replicas with higher term participated in the election + * @return true if other replicas with higher term participated in the election, false if otherwise + */ + private boolean replicasWithHigherTermParticipated(ZkShardTerms zkShardTerms, String coreNodeName) { + ClusterState clusterState = zkController.getClusterState(); + DocCollection docCollection = clusterState.getCollectionOrNull(collection); + Slice slices = (docCollection == null) ? null : docCollection.getSlice(shardId); + if (slices == null) return false; + + long replicaTerm = zkShardTerms.getTerm(coreNodeName); + boolean isRecovering = zkShardTerms.isRecovering(coreNodeName); + + for (Replica replica : slices.getReplicas()) { + if (replica.getName().equals(coreNodeName)) continue; + + if (clusterState.getLiveNodes().contains(replica.getNodeName())) { + long otherTerm = zkShardTerms.getTerm(replica.getName()); + boolean isOtherReplicaRecovering = zkShardTerms.isRecovering(replica.getName()); + + if (isRecovering && !isOtherReplicaRecovering) return true; + if (otherTerm > replicaTerm) return true; + } + } + return false; + } + public void publishActiveIfRegisteredAndNotActive(SolrCore core) throws Exception { if (core.getCoreDescriptor().getCloudDescriptor().hasRegistered()) { ZkStateReader zkStateReader = zkController.getZkStateReader(); diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java index 63dfe19b1b8..912f6ec52f0 100644 --- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java +++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java @@ -755,13 +755,20 @@ public class RecoveryStrategy implements Runnable, Closeable { zkController.publish(coreDesc, Replica.State.DOWN); } numTried++; - final Replica leaderReplica = zkStateReader.getLeaderRetry( - cloudDesc.getCollectionName(), cloudDesc.getShardId()); + Replica leaderReplica = null; if (isClosed()) { return leaderReplica; } + try { + leaderReplica = zkStateReader.getLeaderRetry( + cloudDesc.getCollectionName(), cloudDesc.getShardId()); + } catch (SolrException e) { + Thread.sleep(500); + continue; + } + if (leaderReplica.getCoreUrl().equals(ourUrl)) { return leaderReplica; } diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java index ca79982b1ef..e6db14ff9a2 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java @@ -170,10 +170,16 @@ public class ZkShardTerms implements AutoCloseable{ listeners.removeIf(coreTermWatcher -> !coreTermWatcher.onTermChanged(terms)); numListeners = listeners.size(); } + return removeTerm(cd.getCloudDescriptor().getCoreNodeName()) || numListeners == 0; + } + + // package private for testing, only used by tests + // return true if this object should not be reused + boolean removeTerm(String coreNodeName) { Terms newTerms; - while ( (newTerms = terms.removeTerm(cd.getCloudDescriptor().getCoreNodeName())) != null) { + while ( (newTerms = terms.removeTerm(coreNodeName)) != null) { try { - if (saveTerms(newTerms)) return numListeners == 0; + if (saveTerms(newTerms)) return false; } catch (KeeperException.NoNodeException e) { return true; } @@ -232,6 +238,11 @@ public class ZkShardTerms implements AutoCloseable{ } } + public boolean isRecovering(String name) { + return terms.values.containsKey(name + "_recovering"); + } + + /** * When first updates come in, all replicas have some data now, * so we must switch from term 0 (registered) to 1 (have some data) diff --git a/solr/core/src/test/org/apache/solr/cloud/LIROnShardRestartTest.java b/solr/core/src/test/org/apache/solr/cloud/LIROnShardRestartTest.java new file mode 100644 index 00000000000..c83739efde3 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/LIROnShardRestartTest.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cloud; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.apache.lucene.util.LuceneTestCase; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.embedded.JettySolrRunner; +import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.cloud.ClusterStateUtil; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.SolrZkClient; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.util.TimeSource; +import org.apache.solr.common.util.Utils; +import org.apache.solr.core.CoreContainer; +import org.apache.solr.update.processor.DistributedUpdateProcessor; +import org.apache.solr.update.processor.DistributingUpdateProcessorFactory; +import org.apache.solr.util.TimeOut; +import org.apache.zookeeper.KeeperException; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +@LuceneTestCase.Nightly +@LuceneTestCase.Slow +@Deprecated +public class LIROnShardRestartTest extends SolrCloudTestCase { + + @BeforeClass + public static void setupCluster() throws Exception { + System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory"); + System.setProperty("solr.ulog.numRecordsToKeep", "1000"); + + configureCluster(3) + .addConfig("conf", configset("cloud-minimal")) + .configure(); + } + + @AfterClass + public static void tearDownCluster() throws Exception { + System.clearProperty("solr.directoryFactory"); + System.clearProperty("solr.ulog.numRecordsToKeep"); + } + + public void testAllReplicasInLIR() throws Exception { + String collection = "allReplicasInLIR"; + CollectionAdminRequest.createCollection(collection, 1, 3) + .process(cluster.getSolrClient()); + cluster.getSolrClient().add(collection, new SolrInputDocument("id", "1")); + cluster.getSolrClient().add(collection, new SolrInputDocument("id", "2")); + cluster.getSolrClient().commit(collection); + + DocCollection docCollection = getCollectionState(collection); + Slice shard1 = docCollection.getSlice("shard1"); + Replica newLeader = shard1.getReplicas(rep -> !rep.getName().equals(shard1.getLeader().getName())).get(random().nextInt(2)); + JettySolrRunner jettyOfNewLeader = cluster.getJettySolrRunners().stream() + .filter(jetty -> jetty.getNodeName().equals(newLeader.getNodeName())) + .findAny().get(); + assertNotNull(jettyOfNewLeader); + + // randomly add too many docs to peer sync to one replica so that only one random replica is the valid leader + // the versions don't matter, they just have to be higher than what the last 2 docs got + try (HttpSolrClient client = getHttpSolrClient(jettyOfNewLeader.getBaseUrl().toString())) { + ModifiableSolrParams params = new ModifiableSolrParams(); + params.set(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM, DistributedUpdateProcessor.DistribPhase.FROMLEADER.toString()); + + for (int i = 0; i < 101; i++) { + UpdateRequest ureq = new UpdateRequest(); + ureq.setParams(new ModifiableSolrParams(params)); + ureq.add(sdoc("id", 3 + i, "_version_", Long.MAX_VALUE - 1 - i)); + ureq.process(client, collection); + } + client.commit(collection); + } + + ChaosMonkey.stop(cluster.getJettySolrRunners()); + assertTrue("Timeout waiting for all not live", + ClusterStateUtil.waitForAllReplicasNotLive(cluster.getSolrClient().getZkStateReader(), 45000)); + + try (ZkShardTerms zkShardTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient())) { + for (Replica replica : docCollection.getReplicas()) { + zkShardTerms.removeTerm(replica.getName()); + } + } + + Map stateObj = Utils.makeMap(); + stateObj.put(ZkStateReader.STATE_PROP, "down"); + stateObj.put("createdByNodeName", "test"); + stateObj.put("createdByCoreNodeName", "test"); + byte[] znodeData = Utils.toJSON(stateObj); + + for (Replica replica : docCollection.getReplicas()) { + try { + cluster.getZkClient().makePath("/collections/" + collection + "/leader_initiated_recovery/shard1/" + replica.getName(), + znodeData, true); + } catch (KeeperException.NodeExistsException e) { + + } + } + + ChaosMonkey.start(cluster.getJettySolrRunners()); + waitForState("Timeout waiting for active replicas", collection, clusterShape(1, 3)); + + assertEquals(103, cluster.getSolrClient().query(collection, new SolrQuery("*:*")).getResults().getNumFound()); + + + // now expire each node + for (Replica replica : docCollection.getReplicas()) { + try { + cluster.getZkClient().makePath("/collections/" + collection + "/leader_initiated_recovery/shard1/" + replica.getName(), + znodeData, true); + } catch (KeeperException.NodeExistsException e) { + + } + } + + // only 2 replicas join the election and all of them are in LIR state, no one should win the election + List oldElectionNodes = getElectionNodes(collection, "shard1", cluster.getZkClient()); + + for (JettySolrRunner jetty : cluster.getJettySolrRunners()) { + expire(jetty); + } + + TimeOut timeOut = new TimeOut(60, TimeUnit.SECONDS, TimeSource.CURRENT_TIME); + while (!timeOut.hasTimedOut()) { + List electionNodes = getElectionNodes(collection, "shard1", cluster.getZkClient()); + electionNodes.retainAll(oldElectionNodes); + if (electionNodes.isEmpty()) break; + } + assertFalse("Timeout waiting for replicas rejoin election", timeOut.hasTimedOut()); + waitForState("Timeout waiting for active replicas", collection, clusterShape(1, 3)); + + assertEquals(103, cluster.getSolrClient().query(collection, new SolrQuery("*:*")).getResults().getNumFound()); + + CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient()); + } + + public void expire(JettySolrRunner jetty) { + CoreContainer cores = jetty.getCoreContainer(); + ChaosMonkey.causeConnectionLoss(jetty); + long sessionId = cores.getZkController().getZkClient() + .getSolrZooKeeper().getSessionId(); + cluster.getZkServer().expire(sessionId); + } + + + public void testSeveralReplicasInLIR() throws Exception { + String collection = "severalReplicasInLIR"; + CollectionAdminRequest.createCollection(collection, 1, 3) + .process(cluster.getSolrClient()); + cluster.getSolrClient().add(collection, new SolrInputDocument("id", "1")); + cluster.getSolrClient().add(collection, new SolrInputDocument("id", "2")); + cluster.getSolrClient().commit(collection); + + DocCollection docCollection = getCollectionState(collection); + Map nodeNameToJetty = cluster.getJettySolrRunners().stream() + .collect(Collectors.toMap(jetty -> jetty, JettySolrRunner::getNodeName)); + ChaosMonkey.stop(cluster.getJettySolrRunners()); + assertTrue("Timeout waiting for all not live", + ClusterStateUtil.waitForAllReplicasNotLive(cluster.getSolrClient().getZkStateReader(), 45000)); + + try (ZkShardTerms zkShardTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient())) { + for (Replica replica : docCollection.getReplicas()) { + zkShardTerms.removeTerm(replica.getName()); + } + } + + Map stateObj = Utils.makeMap(); + stateObj.put(ZkStateReader.STATE_PROP, "down"); + stateObj.put("createdByNodeName", "test"); + stateObj.put("createdByCoreNodeName", "test"); + byte[] znodeData = Utils.toJSON(stateObj); + + Replica replicaNotInLIR = docCollection.getReplicas().get(random().nextInt(3)); + for (Replica replica : docCollection.getReplicas()) { + if (replica.getName().equals(replicaNotInLIR.getName())) continue; + try { + cluster.getZkClient().makePath("/collections/" + collection + "/leader_initiated_recovery/shard1/" + replica.getName(), + znodeData, true); + } catch (KeeperException.NodeExistsException e) { + + } + } + + for (JettySolrRunner jetty : cluster.getJettySolrRunners()) { + if (nodeNameToJetty.get(jetty).equals(replicaNotInLIR.getNodeName())) continue; + jetty.start(); + } + waitForState("Timeout waiting for no leader", collection, (liveNodes, collectionState) -> { + Replica leader = collectionState.getSlice("shard1").getLeader(); + return leader == null; + }); + + // only 2 replicas join the election and all of them are in LIR state, no one should win the election + List oldElectionNodes = getElectionNodes(collection, "shard1", cluster.getZkClient()); + TimeOut timeOut = new TimeOut(60, TimeUnit.SECONDS, TimeSource.CURRENT_TIME); + while (!timeOut.hasTimedOut()) { + List electionNodes = getElectionNodes(collection, "shard1", cluster.getZkClient()); + electionNodes.retainAll(oldElectionNodes); + if (electionNodes.isEmpty()) break; + } + assertFalse("Timeout waiting for replicas rejoin election", timeOut.hasTimedOut()); + + for (JettySolrRunner jetty : cluster.getJettySolrRunners()) { + if (nodeNameToJetty.get(jetty).equals(replicaNotInLIR.getNodeName())) { + jetty.start(); + } + } + waitForState("Timeout waiting for new leader", collection, (liveNodes, collectionState) -> { + Replica leader = collectionState.getSlice("shard1").getLeader(); + return leader != null; + }); + waitForState("Timeout waiting for new leader", collection, clusterShape(1, 3)); + + assertEquals(2L, cluster.getSolrClient().query(collection, new SolrQuery("*:*")).getResults().getNumFound()); + CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient()); + } + + private List getElectionNodes(String collection, String shard, SolrZkClient client) throws KeeperException, InterruptedException { + return client.getChildren("/collections/"+collection+"/leader_elect/"+shard+LeaderElector.ELECTION_NODE, null, true); + } +} diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnShardRestartTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnShardRestartTest.java deleted file mode 100644 index 9d2af90058c..00000000000 --- a/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnShardRestartTest.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.solr.cloud; - -import java.lang.invoke.MethodHandles; -import java.util.Map; -import java.util.Properties; - -import org.apache.lucene.util.LuceneTestCase; -import org.apache.lucene.util.LuceneTestCase.Nightly; -import org.apache.lucene.util.LuceneTestCase.Slow; -import org.apache.solr.client.solrj.SolrQuery; -import org.apache.solr.client.solrj.embedded.JettySolrRunner; -import org.apache.solr.client.solrj.impl.HttpSolrClient; -import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException; -import org.apache.solr.client.solrj.request.CollectionAdminRequest; -import org.apache.solr.client.solrj.request.QueryRequest; -import org.apache.solr.common.SolrInputDocument; -import org.apache.solr.common.cloud.SolrZkClient; -import org.apache.solr.common.cloud.ZkStateReader; -import org.apache.solr.common.params.CollectionParams.CollectionAction; -import org.apache.solr.common.params.ModifiableSolrParams; -import org.apache.solr.common.util.Utils; -import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase; -import org.apache.solr.update.processor.DistributingUpdateProcessorFactory; -import org.apache.zookeeper.KeeperException.NodeExistsException; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Slow -@Nightly -@LuceneTestCase.BadApple(bugUrl = "https://issues.apache.org/jira/browse/SOLR-10071") -@Deprecated -public class LeaderInitiatedRecoveryOnShardRestartTest extends AbstractFullDistribZkTestBase { - private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - - public LeaderInitiatedRecoveryOnShardRestartTest() throws Exception { - super(); - sliceCount = 1; - // we want 3 jetties, but we are using the control jetty as one - fixShardCount(2); - useFactory("solr.StandardDirectoryFactory"); - } - - @BeforeClass - public static void before() { - // we want more realistic leaderVoteWait so raise from - // test default of 10s to 30s. - System.setProperty("leaderVoteWait", "300000"); - } - - @AfterClass - public static void after() { - System.clearProperty("leaderVoteWait"); - } - - @Test - public void testRestartWithAllInLIR() throws Exception { - - // still waiting to be able to properly start with no default collection1, - // delete to remove confusion - waitForRecoveriesToFinish(false); - ModifiableSolrParams params = new ModifiableSolrParams(); - params.set("action", CollectionAction.DELETE.toString()); - params.set("name", DEFAULT_COLLECTION); - QueryRequest request = new QueryRequest(params); - request.setPath("/admin/collections"); - String baseUrl = ((HttpSolrClient) clients.get(0)).getBaseURL(); - HttpSolrClient delClient = getHttpSolrClient(baseUrl.substring(0, baseUrl.lastIndexOf("/"))); - delClient.request(request); - delClient.close(); - - String testCollectionName = "all_in_lir"; - String shardId = "shard1"; - CollectionAdminRequest.createCollection(testCollectionName, "conf1", 1, 3) - .setCreateNodeSet("") - .process(cloudClient); - Properties oldLir = new Properties(); - oldLir.setProperty("lirVersion", "old"); - for (int i = 0; i < 3; i++) { - CollectionAdminRequest.addReplicaToShard(testCollectionName, "shard1").setProperties(oldLir).process(cloudClient); - } - - waitForRecoveriesToFinish(testCollectionName, false); - - cloudClient.setDefaultCollection(testCollectionName); - - Map stateObj = Utils.makeMap(); - stateObj.put(ZkStateReader.STATE_PROP, "down"); - stateObj.put("createdByNodeName", "test"); - stateObj.put("createdByCoreNodeName", "test"); - - byte[] znodeData = Utils.toJSON(stateObj); - - SolrZkClient zkClient = cloudClient.getZkStateReader().getZkClient(); - zkClient.makePath("/collections/" + testCollectionName + "/leader_initiated_recovery/" + shardId + "/core_node1", znodeData, true); - zkClient.makePath("/collections/" + testCollectionName + "/leader_initiated_recovery/" + shardId + "/core_node2", znodeData, true); - zkClient.makePath("/collections/" + testCollectionName + "/leader_initiated_recovery/" + shardId + "/core_node3", znodeData, true); - - // everyone gets a couple docs so that everyone has tlog entries - // and won't become leader simply because they have no tlog versions - SolrInputDocument doc = new SolrInputDocument(); - addFields(doc, "id", "1"); - SolrInputDocument doc2 = new SolrInputDocument(); - addFields(doc2, "id", "2"); - cloudClient.add(doc); - cloudClient.add(doc2); - - cloudClient.commit(); - - assertEquals("We just added 2 docs, we should be able to find them", 2, cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound()); - - // randomly add too many docs to peer sync to one replica so that only one random replica is the valid leader - // the versions don't matter, they just have to be higher than what the last 2 docs got - HttpSolrClient client = (HttpSolrClient) clients.get(random().nextInt(clients.size())); - client.setBaseURL(client.getBaseURL().substring(0, client.getBaseURL().lastIndexOf("/")) + "/" + testCollectionName); - params = new ModifiableSolrParams(); - params.set(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); - - try { - for (int i = 0; i < 101; i++) { - add(client, params, sdoc("id", 3 + i, "_version_", Long.MAX_VALUE - 1 - i)); - } - } catch (RemoteSolrException e) { - // if we got a conflict it's because we tried to send a versioned doc to the leader, - // resend without version - if (e.getMessage().contains("conflict")) { - for (int i = 0; i < 101; i++) { - add(client, params, sdoc("id", 3 + i)); - } - } - } - - client.commit(); - - for (JettySolrRunner jetty : jettys) { - ChaosMonkey.stop(jetty); - } - ChaosMonkey.stop(controlJetty); - - Thread.sleep(10000); - - log.info("Start back up"); - - for (JettySolrRunner jetty : jettys) { - ChaosMonkey.start(jetty); - } - ChaosMonkey.start(controlJetty); - - // recoveries will not finish without SOLR-8075 and SOLR-8367 - waitForRecoveriesToFinish(testCollectionName, true); - - // now expire each node - try { - zkClient.makePath("/collections/" + testCollectionName + "/leader_initiated_recovery/" + shardId + "/core_node1", znodeData, true); - } catch (NodeExistsException e) { - - } - try { - zkClient.makePath("/collections/" + testCollectionName + "/leader_initiated_recovery/" + shardId + "/core_node2", znodeData, true); - } catch (NodeExistsException e) { - - } - try { - zkClient.makePath("/collections/" + testCollectionName + "/leader_initiated_recovery/" + shardId + "/core_node3", znodeData, true); - } catch (NodeExistsException e) { - - } - - for (JettySolrRunner jetty : jettys) { - chaosMonkey.expireSession(jetty); - } - - Thread.sleep(2000); - - // recoveries will not finish without SOLR-8075 and SOLR-8367 - waitForRecoveriesToFinish(testCollectionName, true); - } -} diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderVoteWaitTimeoutTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderVoteWaitTimeoutTest.java new file mode 100644 index 00000000000..e487fcbfffd --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/LeaderVoteWaitTimeoutTest.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cloud; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.solr.JSONTestUtil; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.embedded.JettySolrRunner; +import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.cloud.CollectionStatePredicate; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.ZkCoreNodeProps; +import org.apache.solr.common.util.NamedList; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LeaderVoteWaitTimeoutTest extends SolrCloudTestCase { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static Map proxies; + private static Map jettys; + + @BeforeClass + public static void setupCluster() throws Exception { + System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory"); + System.setProperty("solr.ulog.numRecordsToKeep", "1000"); + System.setProperty("leaderVoteWait", "2000"); + + configureCluster(4) + .addConfig("conf", configset("cloud-minimal")) + .configure(); + + // Add proxies + proxies = new HashMap<>(cluster.getJettySolrRunners().size()); + jettys = new HashMap<>(); + for (JettySolrRunner jetty:cluster.getJettySolrRunners()) { + SocketProxy proxy = new SocketProxy(); + jetty.setProxyPort(proxy.getListenPort()); + cluster.stopJettySolrRunner(jetty);//TODO: Can we avoid this restart + cluster.startJettySolrRunner(jetty); + proxy.open(jetty.getBaseUrl().toURI()); + LOG.info("Adding proxy for URL: " + jetty.getBaseUrl() + ". Proxy: " + proxy.getUrl()); + proxies.put(jetty, proxy); + jettys.put(proxy.getUrl(), jetty); + } + } + + @AfterClass + public static void tearDownCluster() throws Exception { + for (SocketProxy proxy:proxies.values()) { + proxy.close(); + } + proxies = null; + jettys = null; + System.clearProperty("solr.directoryFactory"); + System.clearProperty("solr.ulog.numRecordsToKeep"); + System.clearProperty("leaderVoteWait"); + } + + @Test + public void basicTest() throws Exception { + final String collectionName = "basicTest"; + CollectionAdminRequest.createCollection(collectionName, 1, 1) + .setCreateNodeSet(cluster.getJettySolrRunner(0).getNodeName()) + .process(cluster.getSolrClient()); + cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "1")); + cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2")); + cluster.getSolrClient().commit(collectionName); + + try (ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, "shard1", cluster.getZkClient())) { + assertEquals(1, zkShardTerms.getTerms().size()); + assertEquals(1L, zkShardTerms.getHighestTerm()); + } + + cluster.getJettySolrRunner(0).stop(); + + CollectionAdminRequest.addReplicaToShard(collectionName, "shard1") + .setNode(cluster.getJettySolrRunner(1).getNodeName()) + .process(cluster.getSolrClient()); + + waitForState("Timeout waiting for replica win the election", collectionName, (liveNodes, collectionState) -> { + Replica newLeader = collectionState.getSlice("shard1").getLeader(); + return newLeader.getNodeName().equals(cluster.getJettySolrRunner(1).getNodeName()); + }); + + try (ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, "shard1", cluster.getZkClient())) { + Replica newLeader = getCollectionState(collectionName).getSlice("shard1").getLeader(); + assertEquals(2, zkShardTerms.getTerms().size()); + assertEquals(1L, zkShardTerms.getTerm(newLeader.getName())); + } + + cluster.getJettySolrRunner(0).start(); + CollectionAdminRequest.deleteCollection(collectionName).process(cluster.getSolrClient()); + } + + @Test + public void testMostInSyncReplicasCanWinElection() throws Exception { + final String collectionName = "collection1"; + CollectionAdminRequest.createCollection(collectionName, 1, 3) + .setCreateNodeSet("") + .process(cluster.getSolrClient()); + CollectionAdminRequest.addReplicaToShard(collectionName, "shard1") + .setNode(cluster.getJettySolrRunner(0).getNodeName()) + .process(cluster.getSolrClient()); + waitForState("Timeout waiting for shard leader", collectionName, clusterShape(1, 1)); + Replica leader = getCollectionState(collectionName).getSlice("shard1").getLeader(); + + // this replica will ahead of election queue + CollectionAdminRequest.addReplicaToShard(collectionName, "shard1") + .setNode(cluster.getJettySolrRunner(1).getNodeName()) + .process(cluster.getSolrClient()); + waitForState("Timeout waiting for 1x2 collection", collectionName, clusterShape(1, 2)); + Replica replica1 = getCollectionState(collectionName).getSlice("shard1") + .getReplicas(replica -> replica.getNodeName().equals(cluster.getJettySolrRunner(1).getNodeName())).get(0); + + CollectionAdminRequest.addReplicaToShard(collectionName, "shard1") + .setNode(cluster.getJettySolrRunner(2).getNodeName()) + .process(cluster.getSolrClient()); + waitForState("Timeout waiting for 1x3 collection", collectionName, clusterShape(1, 3)); + Replica replica2 = getCollectionState(collectionName).getSlice("shard1") + .getReplicas(replica -> replica.getNodeName().equals(cluster.getJettySolrRunner(2).getNodeName())).get(0); + + cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "1")); + cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2")); + cluster.getSolrClient().commit(collectionName); + + // replica in node1 won't be able to do recovery + proxies.get(cluster.getJettySolrRunner(0)).close(); + // leader won't be able to send request to replica in node1 + proxies.get(cluster.getJettySolrRunner(1)).close(); + + addDoc(collectionName, 3, cluster.getJettySolrRunner(0)); + + try (ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, "shard1", cluster.getZkClient())) { + assertEquals(3, zkShardTerms.getTerms().size()); + assertEquals(zkShardTerms.getHighestTerm(), zkShardTerms.getTerm(leader.getName())); + assertEquals(zkShardTerms.getHighestTerm(), zkShardTerms.getTerm(replica2.getName())); + assertTrue(zkShardTerms.getHighestTerm() > zkShardTerms.getTerm(replica1.getName())); + } + + proxies.get(cluster.getJettySolrRunner(2)).close(); + addDoc(collectionName, 4, cluster.getJettySolrRunner(0)); + + try (ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, "shard1", cluster.getZkClient())) { + assertEquals(3, zkShardTerms.getTerms().size()); + assertEquals(zkShardTerms.getHighestTerm(), zkShardTerms.getTerm(leader.getName())); + assertTrue(zkShardTerms.getHighestTerm() > zkShardTerms.getTerm(replica2.getName())); + assertTrue(zkShardTerms.getHighestTerm() > zkShardTerms.getTerm(replica1.getName())); + assertTrue(zkShardTerms.getTerm(replica2.getName()) > zkShardTerms.getTerm(replica1.getName())); + } + + proxies.get(cluster.getJettySolrRunner(1)).reopen(); + proxies.get(cluster.getJettySolrRunner(2)).reopen(); + cluster.getJettySolrRunner(0).stop(); + + // even replica2 joined election at the end of the queue, but it is the one with highest term + waitForState("Timeout waiting for new leader", collectionName, new CollectionStatePredicate() { + @Override + public boolean matches(Set liveNodes, DocCollection collectionState) { + Replica newLeader = collectionState.getSlice("shard1").getLeader(); + return newLeader.getName().equals(replica2.getName()); + } + }); + + cluster.getJettySolrRunner(0).start(); + proxies.get(cluster.getJettySolrRunner(0)).reopen(); + + waitForState("Timeout waiting for 1x3 collection", collectionName, clusterShape(1, 3)); + assertDocsExistInAllReplicas(Arrays.asList(leader, replica1), collectionName, 1, 3); + CollectionAdminRequest.deleteCollection(collectionName).process(cluster.getSolrClient()); + } + + + private void addDoc(String collection, int docId, JettySolrRunner solrRunner) throws IOException, SolrServerException { + try (HttpSolrClient solrClient = new HttpSolrClient.Builder(solrRunner.getBaseUrl().toString()).build()) { + solrClient.add(collection, new SolrInputDocument("id", String.valueOf(docId))); + solrClient.commit(collection); + } + } + + private void assertDocsExistInAllReplicas(List notLeaders, + String testCollectionName, int firstDocId, int lastDocId) throws Exception { + Replica leader = + cluster.getSolrClient().getZkStateReader().getLeaderRetry(testCollectionName, "shard1", 10000); + HttpSolrClient leaderSolr = getHttpSolrClient(leader, testCollectionName); + List replicas = + new ArrayList(notLeaders.size()); + + for (Replica r : notLeaders) { + replicas.add(getHttpSolrClient(r, testCollectionName)); + } + try { + for (int d = firstDocId; d <= lastDocId; d++) { + String docId = String.valueOf(d); + assertDocExists(leaderSolr, testCollectionName, docId); + for (HttpSolrClient replicaSolr : replicas) { + assertDocExists(replicaSolr, testCollectionName, docId); + } + } + } finally { + if (leaderSolr != null) { + leaderSolr.close(); + } + for (HttpSolrClient replicaSolr : replicas) { + replicaSolr.close(); + } + } + } + + private void assertDocExists(HttpSolrClient solr, String coll, String docId) throws Exception { + NamedList rsp = realTimeGetDocId(solr, docId); + String match = JSONTestUtil.matchObj("/id", rsp.get("doc"), docId); + assertTrue("Doc with id=" + docId + " not found in " + solr.getBaseURL() + + " due to: " + match + "; rsp="+rsp, match == null); + } + + private NamedList realTimeGetDocId(HttpSolrClient solr, String docId) throws SolrServerException, IOException { + QueryRequest qr = new QueryRequest(params("qt", "/get", "id", docId, "distrib", "false")); + return solr.request(qr); + } + + protected HttpSolrClient getHttpSolrClient(Replica replica, String coll) throws Exception { + ZkCoreNodeProps zkProps = new ZkCoreNodeProps(replica); + String url = zkProps.getBaseUrl() + "/" + coll; + return getHttpSolrClient(url); + } + + +} diff --git a/solr/core/src/test/org/apache/solr/cloud/TestCloudConsistency.java b/solr/core/src/test/org/apache/solr/cloud/TestCloudConsistency.java index ee8bf51ac5e..38b154d23fb 100644 --- a/solr/core/src/test/org/apache/solr/cloud/TestCloudConsistency.java +++ b/solr/core/src/test/org/apache/solr/cloud/TestCloudConsistency.java @@ -57,6 +57,7 @@ public class TestCloudConsistency extends SolrCloudTestCase { public static void setupCluster() throws Exception { System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory"); System.setProperty("solr.ulog.numRecordsToKeep", "1000"); + System.setProperty("leaderVoteWait", "60000"); configureCluster(4) .addConfig("conf", configset("cloud-minimal")) @@ -83,6 +84,9 @@ public class TestCloudConsistency extends SolrCloudTestCase { } proxies = null; jettys = null; + System.clearProperty("solr.directoryFactory"); + System.clearProperty("solr.ulog.numRecordsToKeep"); + System.clearProperty("leaderVoteWait"); } @Test @@ -262,15 +266,4 @@ public class TestCloudConsistency extends SolrCloudTestCase { return getHttpSolrClient(url); } - - protected JettySolrRunner getJettyForReplica(Replica replica) throws Exception { - String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP); - assertNotNull(replicaBaseUrl); - URL baseUrl = new URL(replicaBaseUrl); - - JettySolrRunner proxy = jettys.get(baseUrl.toURI()); - assertNotNull("No proxy found for " + baseUrl + "!", proxy); - return proxy; - } - } diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java index 07e440a1bfc..9ffea0f009e 100644 --- a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java +++ b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java @@ -83,7 +83,7 @@ public class MiniSolrCloudCluster { " ${hostContext:solr}\n" + " ${solr.zkclienttimeout:30000}\n" + " ${genericCoreNodeNames:true}\n" + - " 10000\n" + + " ${leaderVoteWait:10000}\n" + " ${distribUpdateConnTimeout:45000}\n" + " ${distribUpdateSoTimeout:340000}\n" + " ${zkCredentialsProvider:org.apache.solr.common.cloud.DefaultZkCredentialsProvider} \n" +