diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index ab6edf0b66c..0c069d91c51 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -122,6 +122,9 @@ Optimizations
 * SOLR-9142: JSON Facet API: new method=dvhash can be chosen for fields with high cardinality. (David Smiley)
 
+* SOLR-9446: Leader failure after creating a freshly replicated index can send nodes into recovery even if
+  the index was not changed (Pushkar Raste, noble)
+
 Other Changes
 ----------------------
 
diff --git a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
index 66f81c391b9..2c690a6eb49 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
@@ -112,6 +112,12 @@ public class RealTimeGetComponent extends SearchComponent
       return;
     }
 
+    val = params.get("getFingerprint");
+    if (val != null) {
+      processGetFingerprint(rb);
+      return;
+    }
+
     val = params.get("getVersions");
     if (val != null) {
       processGetVersions(rb);
@@ -597,6 +603,17 @@ public class RealTimeGetComponent extends SearchComponent
     return null;
   }
 
+
+  public void processGetFingerprint(ResponseBuilder rb) throws IOException {
+    SolrQueryRequest req = rb.req;
+    SolrParams params = req.getParams();
+
+    long maxVersion = params.getLong("getFingerprint", Long.MAX_VALUE);
+    IndexFingerprint fingerprint = IndexFingerprint.getFingerprint(req.getCore(), Math.abs(maxVersion));
+    rb.rsp.add("fingerprint", fingerprint);
+  }
+
   ///////////////////////////////////////////////////////////////////////////////////
   // Returns last versions added to index
diff --git a/solr/core/src/java/org/apache/solr/update/PeerSync.java b/solr/core/src/java/org/apache/solr/update/PeerSync.java
index 1eb69a5b16d..dfacd4b89fb 100644
--- a/solr/core/src/java/org/apache/solr/update/PeerSync.java
+++ b/solr/core/src/java/org/apache/solr/update/PeerSync.java
@@ -188,6 +188,11 @@ public class PeerSync {
         log.debug(msg() + "startingVersions=" + startingVersions.size() + " " + startingVersions);
       }
     }
+    // check if we are already in sync to begin with
+    if (doFingerprint && alreadyInSync()) {
+      return true;
+    }
+
 
     // Fire off the requests before getting our own recent updates (for better concurrency)
     // This also allows us to avoid getting updates we don't need... if we got our updates and then got their updates,
@@ -277,6 +282,50 @@ public class PeerSync {
       MDCLoggingContext.clear();
     }
   }
+
+  /**
+   * Check if we are already in sync. A simple fingerprint comparison should do.
+   */
+  private boolean alreadyInSync() {
+    for (String replica : replicas) {
+      requestFingerprint(replica);
+    }
+
+    for (;;) {
+      ShardResponse srsp = shardHandler.takeCompletedOrError();
+      if (srsp == null) break;
+
+      try {
+        IndexFingerprint otherFingerprint = IndexFingerprint.fromObject(srsp.getSolrResponse().getResponse().get("fingerprint"));
+        IndexFingerprint ourFingerprint = IndexFingerprint.getFingerprint(core, Long.MAX_VALUE);
+        if (IndexFingerprint.compare(otherFingerprint, ourFingerprint) == 0) {
+          log.info("We are already in sync. No need to do a PeerSync");
+          return true;
+        }
+      } catch (IOException e) {
+        log.warn("Could not confirm if we are already in sync. Continuing with PeerSync");
+      }
+    }
+
+    return false;
+  }
+
+
+  private void requestFingerprint(String replica) {
+    SyncShardRequest sreq = new SyncShardRequest();
+    requests.add(sreq);
+
+    sreq.shards = new String[]{replica};
+    sreq.actualShards = sreq.shards;
+    sreq.params = new ModifiableSolrParams();
+    sreq.params.set("qt", "/get");
+    sreq.params.set("distrib", false);
+    sreq.params.set("getFingerprint", String.valueOf(Long.MAX_VALUE));
+
+    shardHandler.submit(sreq, replica, sreq.params);
+  }
+
 
   private void requestVersions(String replica) {
     SyncShardRequest sreq = new SyncShardRequest();
diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderFailureAfterFreshStartTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderFailureAfterFreshStartTest.java
new file mode 100644
index 00000000000..348532c308a
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/LeaderFailureAfterFreshStartTest.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.cloud.ZkTestServer.LimitViolationAction;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.Slice.State;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.core.Diagnostics;
+import org.apache.solr.handler.ReplicationHandler;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Collections.singletonList;
+
+/**
+ * Test for SOLR-9446.
+ *
+ * This test is modeled after SyncSliceTest.
+ */
+@Slow
+public class LeaderFailureAfterFreshStartTest extends AbstractFullDistribZkTestBase {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private boolean success = false;
+  int docId = 0;
+
+  List<CloudJettyRunner> nodesDown = new ArrayList<>();
+
+  @Override
+  public void distribTearDown() throws Exception {
+    if (!success) {
+      printLayoutOnTearDown = true;
+    }
+    System.clearProperty("solr.directoryFactory");
+    System.clearProperty("solr.ulog.numRecordsToKeep");
+    System.clearProperty("tests.zk.violationReportAction");
+    super.distribTearDown();
+  }
+
+  public LeaderFailureAfterFreshStartTest() {
+    super();
+    sliceCount = 1;
+    fixShardCount(3);
+  }
+
+  protected String getCloudSolrConfig() {
+    return "solrconfig-tlog.xml";
+  }
+
+  @Override
+  public void distribSetUp() throws Exception {
+    // tlog gets deleted after node restarts if we use CachingDirectoryFactory.
+    // make sure that tlog stays intact after we restart a node
+    System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
+    System.setProperty("solr.ulog.numRecordsToKeep", "1000");
+    System.setProperty("tests.zk.violationReportAction", LimitViolationAction.IGNORE.toString());
+    super.distribSetUp();
+  }
+
+  @Test
+  public void test() throws Exception {
+    handle.clear();
+    handle.put("timestamp", SKIPVAL);
+
+    try {
+      CloudJettyRunner initialLeaderJetty = shardToLeaderJetty.get("shard1");
+      List<CloudJettyRunner> otherJetties = getOtherAvailableJetties(initialLeaderJetty);
+
+      log.info("Leader node_name: {}, url: {}", initialLeaderJetty.coreNodeName, initialLeaderJetty.url);
+      for (CloudJettyRunner cloudJettyRunner : otherJetties) {
+        log.info("Non-leader node_name: {}, url: {}", cloudJettyRunner.coreNodeName, cloudJettyRunner.url);
+      }
+
+      CloudJettyRunner secondNode = otherJetties.get(0);
+      CloudJettyRunner freshNode = otherJetties.get(1);
+
+      // shut down a node to simulate a fresh start
+      otherJetties.remove(freshNode);
+      forceNodeFailures(singletonList(freshNode));
+
+      del("*:*");
+      waitForThingsToLevelOut(30);
+
+      checkShardConsistency(false, true);
+
+      // index a few docs and commit
+      for (int i = 0; i < 100; i++) {
+        indexDoc(id, docId, i1, 50, tlong, 50, t1, "document number " + docId++);
+      }
+      commit();
+      waitForThingsToLevelOut(30);
+
+      checkShardConsistency(false, true);
+
+      // start the freshNode
+      ChaosMonkey.start(freshNode.jetty);
+      nodesDown.remove(freshNode);
+
+      waitTillNodesActive();
+      waitForThingsToLevelOut(30);
+
+      // TODO check how to see if the fresh node went into recovery (maybe check the request count on the new leader's replication handler)
+
+      long numRequestsBefore = (Long) secondNode.jetty
+          .getCoreContainer()
+          .getCores()
+          .iterator()
+          .next()
+          .getRequestHandler(ReplicationHandler.PATH)
+          .getStatistics().get("requests");
+
+      // shut down the original leader
+      log.info("Now shutting down initial leader");
+      forceNodeFailures(singletonList(initialLeaderJetty));
+      waitForNewLeader(cloudClient, "shard1", (Replica) initialLeaderJetty.client.info, 15);
+      log.info("Updating mappings from zk");
+      updateMappingsFromZk(jettys, clients, true);
+
+      long numRequestsAfter = (Long) secondNode.jetty
+          .getCoreContainer()
+          .getCores()
+          .iterator()
+          .next()
+          .getRequestHandler(ReplicationHandler.PATH)
+          .getStatistics().get("requests");
+
+      assertEquals("Node went into replication", numRequestsBefore, numRequestsAfter);
+
+      success = true;
+    } finally {
+      System.clearProperty("solr.disableFingerprint");
+    }
+  }
+
+
+  private void forceNodeFailures(List<CloudJettyRunner> replicasToShutDown) throws Exception {
+    for (CloudJettyRunner replicaToShutDown : replicasToShutDown) {
+      chaosMonkey.killJetty(replicaToShutDown);
+      waitForNoShardInconsistency();
+    }
+
+    int totalDown = 0;
+
+    Set<CloudJettyRunner> jetties = new HashSet<>();
+    jetties.addAll(shardToJetty.get("shard1"));
+
+    if (replicasToShutDown != null) {
+      jetties.removeAll(replicasToShutDown);
+      totalDown += replicasToShutDown.size();
+    }
+
+    jetties.removeAll(nodesDown);
+    totalDown += nodesDown.size();
+
+    assertEquals(getShardCount() - totalDown, jetties.size());
+
+    nodesDown.addAll(replicasToShutDown);
+  }
+
+
+  static void waitForNewLeader(CloudSolrClient cloudClient, String shardName, Replica oldLeader, int maxWaitInSecs)
+      throws Exception {
+    log.info("Will wait for a node to become leader for {} secs", maxWaitInSecs);
+    boolean waitForLeader = true;
+    int i = 0;
+    ZkStateReader zkStateReader = cloudClient.getZkStateReader();
+    zkStateReader.forceUpdateCollection(DEFAULT_COLLECTION);
+
+    while (waitForLeader) {
+      ClusterState clusterState = zkStateReader.getClusterState();
+      DocCollection coll = clusterState.getCollection("collection1");
+      Slice slice = coll.getSlice(shardName);
+      if (slice.getLeader() != oldLeader && slice.getState() == State.ACTIVE) {
+        log.info("New leader got elected in {} secs", i);
+        break;
+      }
+
+      if (i == maxWaitInSecs) {
+        Diagnostics.logThreadDumps("Could not find new leader in specified timeout");
+        zkStateReader.getZkClient().printLayoutToStdOut();
+        fail("Could not find new leader even after waiting for " + maxWaitInSecs + " secs");
+      }
+
+      i++;
+      Thread.sleep(1000);
+    }
+  }
+
+
+  private void waitTillNodesActive() throws Exception {
+    for (int i = 0; i < 60; i++) {
+      Thread.sleep(3000);
+      ZkStateReader zkStateReader = cloudClient.getZkStateReader();
+      ClusterState clusterState = zkStateReader.getClusterState();
+      DocCollection collection1 = clusterState.getCollection("collection1");
+      Slice slice = collection1.getSlice("shard1");
+      Collection<Replica> replicas = slice.getReplicas();
+      boolean allActive = true;
+
+      Collection<Replica> replicasToCheck = null;
+      replicasToCheck = replicas.stream().filter(r -> nodesDown.contains(r.getName()))
+          .collect(Collectors.toList());
+
+      for (Replica replica : replicasToCheck) {
+        if (!clusterState.liveNodesContain(replica.getNodeName()) || replica.getState() != Replica.State.ACTIVE) {
+          allActive = false;
+          break;
+        }
+      }
+      if (allActive) {
+        return;
+      }
+    }
+    printLayout();
+    fail("timeout waiting to see all nodes active");
+  }
+
+
+  private List<CloudJettyRunner> getOtherAvailableJetties(CloudJettyRunner leader) {
+    List<CloudJettyRunner> candidates = new ArrayList<>();
+    candidates.addAll(shardToJetty.get("shard1"));
+
+    if (leader != null) {
+      candidates.remove(leader);
+    }
+
+    candidates.removeAll(nodesDown);
+
+    return candidates;
+  }
+
+  protected void indexDoc(Object... fields) throws IOException, SolrServerException {
+    SolrInputDocument doc = new SolrInputDocument();
+
+    addFields(doc, fields);
+    addFields(doc, "rnd_s", RandomStringUtils.random(random().nextInt(100) + 100));
+
+    UpdateRequest ureq = new UpdateRequest();
+    ureq.add(doc);
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    ureq.setParams(params);
+    ureq.process(cloudClient);
+  }
+
+  // skip the randoms - they can deadlock...
+  @Override
+  protected void indexr(Object... fields) throws Exception {
+    SolrInputDocument doc = new SolrInputDocument();
+    addFields(doc, fields);
+    addFields(doc, "rnd_b", true);
+    indexDoc(doc);
+  }
+
+}
diff --git a/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java b/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java
index f85b7f1b7ef..3ded7d2f754 100644
--- a/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java
@@ -45,6 +45,9 @@ import org.apache.solr.handler.ReplicationHandler;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import static java.util.Collections.singletonList;
+
 /**
  * Test peer sync when a node restarts and documents are indexed while the node was down.
  *
@@ -120,7 +123,7 @@
       otherJetties.remove(neverLeader) ;
 
       // first shutdown a node that will never be a leader
-      forceNodeFailures(Arrays.asList(neverLeader));
+      forceNodeFailures(singletonList(neverLeader));
 
       // node failure and recovery via PeerSync
       log.info("Forcing PeerSync");
@@ -144,9 +147,9 @@
 
       // now shutdown the original leader
       log.info("Now shutting down initial leader");
-      forceNodeFailures(Arrays.asList(initialLeaderJetty));
+      forceNodeFailures(singletonList(initialLeaderJetty));
       log.info("Updating mappings from zk");
-      Thread.sleep(15000); // sleep for a while for leader to change ...
+      LeaderFailureAfterFreshStartTest.waitForNewLeader(cloudClient, "shard1", (Replica) initialLeaderJetty.client.info, 15);
       updateMappingsFromZk(jettys, clients, true);
 
       assertEquals("PeerSynced node did not become leader", nodePeerSynced, shardToLeaderJetty.get("shard1"));
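
For reference, below is a minimal SolrJ sketch of the request that the new requestFingerprint() method issues: a non-distributed call to the /get handler with the getFingerprint parameter set to Long.MAX_VALUE. The response carries the same "fingerprint" entry that alreadyInSync() feeds into IndexFingerprint.compare(). This probe is not part of the patch; the base URL and core name are illustrative assumptions.

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;

public class FingerprintProbe {
  public static void main(String[] args) throws Exception {
    // Hypothetical replica core URL; substitute any replica of the shard being checked.
    String coreUrl = "http://localhost:8983/solr/collection1";
    try (SolrClient client = new HttpSolrClient.Builder(coreUrl).build()) {
      ModifiableSolrParams params = new ModifiableSolrParams();
      params.set("qt", "/get");      // route the request to the real-time get handler
      params.set("distrib", false);  // ask this core only, exactly as PeerSync does
      params.set("getFingerprint", String.valueOf(Long.MAX_VALUE)); // fingerprint over all versions
      NamedList<Object> rsp = client.request(new QueryRequest(params));
      // Replicas whose fingerprints compare equal are already in sync,
      // so PeerSync can succeed without replaying any updates.
      System.out.println(rsp.get("fingerprint"));
    }
  }
}

Running this against each replica and comparing the printed fingerprints mirrors the IndexFingerprint.compare(otherFingerprint, ourFingerprint) == 0 check that lets alreadyInSync() short-circuit PeerSync, which is what keeps an unchanged, freshly replicated index out of recovery when the leader fails.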