From f357c06276139defa26d0569fe5903cfd3d66cdb Mon Sep 17 00:00:00 2001 From: Cao Manh Dat Date: Mon, 12 Nov 2018 10:10:22 +0000 Subject: [PATCH] SOLR-12969: Inconsistency with leader when PeerSync return ALREADY_IN_SYNC --- .../java/org/apache/solr/update/PeerSync.java | 158 +++++++++--------- .../solr/update/PeerSyncWithLeader.java | 59 ++++++- .../apache/solr/cloud/HttpPartitionTest.java | 2 +- .../apache/solr/cloud/TestCloudRecovery2.java | 143 ++++++++++++++++ .../org/apache/solr/update/PeerSyncTest.java | 50 +++--- .../solr/update/PeerSyncWithLeaderTest.java | 18 ++ 6 files changed, 325 insertions(+), 105 deletions(-) create mode 100644 solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery2.java diff --git a/solr/core/src/java/org/apache/solr/update/PeerSync.java b/solr/core/src/java/org/apache/solr/update/PeerSync.java index cb7b918371f..8fd3befd2df 100644 --- a/solr/core/src/java/org/apache/solr/update/PeerSync.java +++ b/solr/core/src/java/org/apache/solr/update/PeerSync.java @@ -692,33 +692,18 @@ public class PeerSync implements SolrMetricProducer { } } - /** - * Helper class for doing comparison ourUpdates and other replicas's updates to find the updates that we missed - */ - public static class MissedUpdatesFinder { - private List ourUpdates; + static abstract class MissedUpdatesFinderBase { private Set ourUpdateSet; - private Set requestedUpdateSet; + private Set requestedUpdateSet = new HashSet<>(); - private long ourLowThreshold; // 20th percentile - private long ourHighThreshold; // 80th percentile - private long ourHighest; // currently just used for logging/debugging purposes - private String logPrefix; - private long nUpdates; + long ourLowThreshold; // 20th percentile + List ourUpdates; - MissedUpdatesFinder(List ourUpdates, String logPrefix, long nUpdates, - long ourLowThreshold, long ourHighThreshold) { + MissedUpdatesFinderBase(List ourUpdates, long ourLowThreshold) { assert sorted(ourUpdates); - - this.logPrefix = logPrefix; this.ourUpdates = ourUpdates; - this.ourLowThreshold = ourLowThreshold; - this.ourHighThreshold = ourHighThreshold; - this.ourHighest = ourUpdates.get(0); - this.nUpdates = nUpdates; - this.ourUpdateSet = new HashSet<>(ourUpdates); - this.requestedUpdateSet = new HashSet<>(); + this.ourLowThreshold = ourLowThreshold; } private boolean sorted(List list) { @@ -730,61 +715,7 @@ public class PeerSync implements SolrMetricProducer { return true; } - public MissedUpdatesRequest find(List otherVersions, Object updateFrom, Supplier canHandleVersionRanges) { - otherVersions.sort(absComparator); - if (debug) { - log.debug("{} sorted versions from {} = {}", logPrefix, otherVersions, updateFrom); - } - - long otherHigh = percentile(otherVersions, .2f); - long otherLow = percentile(otherVersions, .8f); - long otherHighest = otherVersions.get(0); - - if (ourHighThreshold < otherLow) { - // Small overlap between version windows and ours is older - // This means that we might miss updates if we attempted to use this method. - // Since there exists just one replica that is so much newer, we must - // fail the sync. - log.info("{} Our versions are too old. ourHighThreshold={} otherLowThreshold={} ourHighest={} otherHighest={}", - logPrefix, ourHighThreshold, otherLow, ourHighest, otherHighest); - return MissedUpdatesRequest.UNABLE_TO_SYNC; - } - - if (ourLowThreshold > otherHigh) { - // Small overlap between windows and ours is newer. - // Using this list to sync would result in requesting/replaying results we don't need - // and possibly bringing deleted docs back to life. - log.info("{} Our versions are newer. ourHighThreshold={} otherLowThreshold={} ourHighest={} otherHighest={}", - logPrefix, ourHighThreshold, otherLow, ourHighest, otherHighest); - - // Because our versions are newer, IndexFingerprint with the remote would not match us. - // We return true on our side, but the remote peersync with us should fail. - return MissedUpdatesRequest.ALREADY_IN_SYNC; - } - - boolean completeList = otherVersions.size() < nUpdates; - - MissedUpdatesRequest updatesRequest; - if (canHandleVersionRanges.get()) { - updatesRequest = handleVersionsWithRanges(otherVersions, completeList); - } else { - updatesRequest = handleIndividualVersions(otherVersions, completeList); - } - - if (updatesRequest.totalRequestedUpdates > nUpdates) { - log.info("{} PeerSync will fail because number of missed updates is more than:{}", logPrefix, nUpdates); - return MissedUpdatesRequest.UNABLE_TO_SYNC; - } - - if (updatesRequest == MissedUpdatesRequest.EMPTY) { - log.info("{} No additional versions requested. ourHighThreshold={} otherLowThreshold={} ourHighest={} otherHighest={}", - logPrefix, ourHighThreshold, otherLow, ourHighest, otherHighest); - } - - return updatesRequest; - } - - private MissedUpdatesRequest handleVersionsWithRanges(List otherVersions, boolean completeList) { + MissedUpdatesRequest handleVersionsWithRanges(List otherVersions, boolean completeList) { // we may endup asking for updates for too many versions, causing 2MB post payload limit. Construct a range of // versions to request instead of asking individual versions List rangesToRequest = new ArrayList<>(); @@ -829,7 +760,7 @@ public class PeerSync implements SolrMetricProducer { return MissedUpdatesRequest.of(rangesToRequestStr, totalRequestedVersions); } - private MissedUpdatesRequest handleIndividualVersions(List otherVersions, boolean completeList) { + MissedUpdatesRequest handleIndividualVersions(List otherVersions, boolean completeList) { List toRequest = new ArrayList<>(); for (Long otherVersion : otherVersions) { // stop when the entries get old enough that reorders may lead us to see updates we don't need @@ -848,7 +779,80 @@ public class PeerSync implements SolrMetricProducer { return MissedUpdatesRequest.of(StrUtils.join(toRequest, ','), toRequest.size()); } + } + /** + * Helper class for doing comparison ourUpdates and other replicas's updates to find the updates that we missed + */ + public static class MissedUpdatesFinder extends MissedUpdatesFinderBase { + private long ourHighThreshold; // 80th percentile + private long ourHighest; // currently just used for logging/debugging purposes + private String logPrefix; + private long nUpdates; + + MissedUpdatesFinder(List ourUpdates, String logPrefix, long nUpdates, + long ourLowThreshold, long ourHighThreshold) { + super(ourUpdates, ourLowThreshold); + + this.logPrefix = logPrefix; + this.ourHighThreshold = ourHighThreshold; + this.ourHighest = ourUpdates.get(0); + this.nUpdates = nUpdates; + } + + public MissedUpdatesRequest find(List otherVersions, Object updateFrom, Supplier canHandleVersionRanges) { + otherVersions.sort(absComparator); + if (debug) { + log.debug("{} sorted versions from {} = {}", logPrefix, otherVersions, updateFrom); + } + + long otherHigh = percentile(otherVersions, .2f); + long otherLow = percentile(otherVersions, .8f); + long otherHighest = otherVersions.get(0); + + if (ourHighThreshold < otherLow) { + // Small overlap between version windows and ours is older + // This means that we might miss updates if we attempted to use this method. + // Since there exists just one replica that is so much newer, we must + // fail the sync. + log.info("{} Our versions are too old. ourHighThreshold={} otherLowThreshold={} ourHighest={} otherHighest={}", + logPrefix, ourHighThreshold, otherLow, ourHighest, otherHighest); + return MissedUpdatesRequest.UNABLE_TO_SYNC; + } + + if (ourLowThreshold > otherHigh && ourHighest >= otherHighest) { + // Small overlap between windows and ours is newer. + // Using this list to sync would result in requesting/replaying results we don't need + // and possibly bringing deleted docs back to life. + log.info("{} Our versions are newer. ourHighThreshold={} otherLowThreshold={} ourHighest={} otherHighest={}", + logPrefix, ourHighThreshold, otherLow, ourHighest, otherHighest); + + // Because our versions are newer, IndexFingerprint with the remote would not match us. + // We return true on our side, but the remote peersync with us should fail. + return MissedUpdatesRequest.ALREADY_IN_SYNC; + } + + boolean completeList = otherVersions.size() < nUpdates; + + MissedUpdatesRequest updatesRequest; + if (canHandleVersionRanges.get()) { + updatesRequest = handleVersionsWithRanges(otherVersions, completeList); + } else { + updatesRequest = handleIndividualVersions(otherVersions, completeList); + } + + if (updatesRequest.totalRequestedUpdates > nUpdates) { + log.info("{} PeerSync will fail because number of missed updates is more than:{}", logPrefix, nUpdates); + return MissedUpdatesRequest.UNABLE_TO_SYNC; + } + + if (updatesRequest == MissedUpdatesRequest.EMPTY) { + log.info("{} No additional versions requested. ourHighThreshold={} otherLowThreshold={} ourHighest={} otherHighest={}", + logPrefix, ourHighThreshold, otherLow, ourHighest, otherHighest); + } + + return updatesRequest; + } } /** diff --git a/solr/core/src/java/org/apache/solr/update/PeerSyncWithLeader.java b/solr/core/src/java/org/apache/solr/update/PeerSyncWithLeader.java index b485727c838..ae586627c8c 100644 --- a/solr/core/src/java/org/apache/solr/update/PeerSyncWithLeader.java +++ b/solr/core/src/java/org/apache/solr/update/PeerSyncWithLeader.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.List; import java.util.Set; +import java.util.function.Supplier; import com.codahale.metrics.Counter; import com.codahale.metrics.Timer; @@ -43,9 +44,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.apache.solr.common.params.CommonParams.DISTRIB; +import static org.apache.solr.update.PeerSync.MissedUpdatesRequest; import static org.apache.solr.update.PeerSync.absComparator; import static org.apache.solr.update.PeerSync.percentile; -import static org.apache.solr.update.PeerSync.MissedUpdatesRequest; public class PeerSyncWithLeader implements SolrMetricProducer { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -62,7 +63,7 @@ public class PeerSyncWithLeader implements SolrMetricProducer { private SolrCore core; private PeerSync.Updater updater; - private PeerSync.MissedUpdatesFinder missedUpdatesFinder; + private MissedUpdatesFinder missedUpdatesFinder; private Set bufferedUpdates; // metrics @@ -203,7 +204,7 @@ public class PeerSyncWithLeader implements SolrMetricProducer { log.info("Leader fingerprint {}", leaderFingerprint); } - missedUpdatesFinder = new PeerSync.MissedUpdatesFinder(ourUpdates, msg(), nUpdates, ourLowThreshold, ourHighThreshold); + missedUpdatesFinder = new MissedUpdatesFinder(ourUpdates, msg(), nUpdates, ourLowThreshold); MissedUpdatesRequest missedUpdates = buildMissedUpdatesRequest(leaderVersionsAndFingerprint); if (missedUpdates == MissedUpdatesRequest.ALREADY_IN_SYNC) return true; if (missedUpdates != MissedUpdatesRequest.UNABLE_TO_SYNC) { @@ -369,4 +370,56 @@ public class PeerSyncWithLeader implements SolrMetricProducer { } return false; } + + /** + * Helper class for doing comparison ourUpdates and other replicas's updates to find the updates that we missed + */ + public static class MissedUpdatesFinder extends PeerSync.MissedUpdatesFinderBase { + private long ourHighest; + private String logPrefix; + private long nUpdates; + + MissedUpdatesFinder(List ourUpdates, String logPrefix, long nUpdates, + long ourLowThreshold) { + super(ourUpdates, ourLowThreshold); + + this.logPrefix = logPrefix; + this.ourHighest = ourUpdates.get(0); + this.nUpdates = nUpdates; + } + + public MissedUpdatesRequest find(List leaderVersions, Object updateFrom, Supplier canHandleVersionRanges) { + leaderVersions.sort(absComparator); + log.debug("{} sorted versions from {} = {}", logPrefix, leaderVersions, updateFrom); + + long leaderLowest = leaderVersions.get(leaderVersions.size() - 1); + if (Math.abs(ourHighest) < Math.abs(leaderLowest)) { + log.info("{} Our versions are too old comparing to leader, ourHighest={} otherLowest={}", logPrefix, ourHighest, leaderLowest); + return MissedUpdatesRequest.UNABLE_TO_SYNC; + } + // we don't have to check the case we ahead of the leader. + // (maybe we are the old leader and we contain some updates that no one have) + // In that case, we will fail on compute fingerprint with the current leader and start segments replication + + boolean completeList = leaderVersions.size() < nUpdates; + MissedUpdatesRequest updatesRequest; + if (canHandleVersionRanges.get()) { + updatesRequest = handleVersionsWithRanges(leaderVersions, completeList); + } else { + updatesRequest = handleIndividualVersions(leaderVersions, completeList); + } + + if (updatesRequest.totalRequestedUpdates > nUpdates) { + log.info("{} PeerSync will fail because number of missed updates is more than:{}", logPrefix, nUpdates); + return MissedUpdatesRequest.UNABLE_TO_SYNC; + } + + if (updatesRequest == MissedUpdatesRequest.EMPTY) { + log.info("{} No additional versions requested", logPrefix); + } + + return updatesRequest; + } + } + } diff --git a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java index 7f77d5741df..b0ce886a910 100644 --- a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java @@ -252,7 +252,7 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase { log.info("Looked up max version bucket seed "+maxVersionBefore+" for core "+coreName); // now up the stakes and do more docs - int numDocs = TEST_NIGHTLY ? 1000 : 100; + int numDocs = TEST_NIGHTLY ? 1000 : 105; boolean hasPartition = false; for (int d = 0; d < numDocs; d++) { // create / restore partition every 100 docs diff --git a/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery2.java b/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery2.java new file mode 100644 index 00000000000..ae5e769d47d --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery2.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cloud; + +import java.lang.invoke.MethodHandles; + +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.embedded.JettySolrRunner; +import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.common.cloud.Replica; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestCloudRecovery2 extends SolrCloudTestCase { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final String COLLECTION = "collection1"; + + @BeforeClass + public static void setupCluster() throws Exception { + System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory"); + System.setProperty("solr.ulog.numRecordsToKeep", "1000"); + + configureCluster(2) + .addConfig("config", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf")) + .configure(); + + CollectionAdminRequest + .createCollection(COLLECTION, "config", 1,2) + .setMaxShardsPerNode(2) + .process(cluster.getSolrClient()); + AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(), + false, true, 30); + } + + @Test + public void test() throws Exception { + JettySolrRunner node1 = cluster.getJettySolrRunner(0); + JettySolrRunner node2 = cluster.getJettySolrRunner(1); + try (HttpSolrClient client1 = getHttpSolrClient(node1.getBaseUrl().toString())) { + + node2.stop(); + waitForState("", COLLECTION, (liveNodes, collectionState) -> liveNodes.size() == 1); + + UpdateRequest req = new UpdateRequest(); + for (int i = 0; i < 100; i++) { + req = req.add("id", i+"", "num", i+""); + } + req.commit(client1, COLLECTION); + + node2.start(); + waitForState("", COLLECTION, clusterShape(1, 2)); + + try (HttpSolrClient client = getHttpSolrClient(node2.getBaseUrl().toString())) { + long numFound = client.query(COLLECTION, new SolrQuery("q","*:*", "distrib", "false")).getResults().getNumFound(); + assertEquals(100, numFound); + } + long numFound = client1.query(COLLECTION, new SolrQuery("q","*:*", "distrib", "false")).getResults().getNumFound(); + assertEquals(100, numFound); + + new UpdateRequest().add("id", "1", "num", "10") + .commit(client1, COLLECTION); + + try (HttpSolrClient client = getHttpSolrClient(node2.getBaseUrl().toString())) { + Object v = client.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "false")).getResults().get(0).get("num"); + assertEquals("10", v.toString()); + } + Object v = client1.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "false")).getResults().get(0).get("num"); + assertEquals("10", v.toString()); + + // + node2.stop(); + waitForState("", COLLECTION, (liveNodes, collectionState) -> liveNodes.size() == 1); + + new UpdateRequest().add("id", "1", "num", "20") + .commit(client1, COLLECTION); + v = client1.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "false")).getResults().get(0).get("num"); + assertEquals("20", v.toString()); + + node2.start(); + waitForState("", COLLECTION, clusterShape(1, 2)); + try (HttpSolrClient client = getHttpSolrClient(node2.getBaseUrl().toString())) { + v = client.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "false")).getResults().get(0).get("num"); + assertEquals("20", v.toString()); + } + + node2.stop(); + waitForState("", COLLECTION, (liveNodes, collectionState) -> liveNodes.size() == 1); + + new UpdateRequest().add("id", "1", "num", "30") + .commit(client1, COLLECTION); + v = client1.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "false")).getResults().get(0).get("num"); + assertEquals("30", v.toString()); + + node2.start(); + waitForState("", COLLECTION, clusterShape(1, 2)); + + try (HttpSolrClient client = getHttpSolrClient(node2.getBaseUrl().toString())) { + v = client.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "false")).getResults().get(0).get("num"); + assertEquals("30", v.toString()); + } + v = client1.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "false")).getResults().get(0).get("num"); + assertEquals("30", v.toString()); + } + + node1.stop(); + waitForState("", COLLECTION, (liveNodes, collectionState) -> { + Replica leader = collectionState.getLeader("shard1"); + return leader != null && leader.getNodeName().equals(node2.getNodeName()); + }); + + node1.start(); + waitForState("", COLLECTION, clusterShape(1, 2)); + try (HttpSolrClient client = getHttpSolrClient(node1.getBaseUrl().toString())) { + Object v = client.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "false")).getResults().get(0).get("num"); + assertEquals("30", v.toString()); + } + try (HttpSolrClient client = getHttpSolrClient(node2.getBaseUrl().toString())) { + Object v = client.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "false")).getResults().get(0).get("num"); + assertEquals("30", v.toString()); + } + + } + +} diff --git a/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java b/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java index 848d1bc975f..001c7279fcb 100644 --- a/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java +++ b/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java @@ -42,10 +42,9 @@ import static org.junit.internal.matchers.StringContains.containsString; @SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776") public class PeerSyncTest extends BaseDistributedSearchTestCase { - private static int numVersions = 100; // number of versions to use when syncing - private final String FROM_LEADER = DistribPhase.FROMLEADER.toString(); - - private ModifiableSolrParams seenLeader = + protected static int numVersions = 100; // number of versions to use when syncing + protected static final String FROM_LEADER = DistribPhase.FROMLEADER.toString(); + protected static final ModifiableSolrParams seenLeader = params(DISTRIB_UPDATE_PARAM, FROM_LEADER); public PeerSyncTest() { @@ -117,24 +116,7 @@ public class PeerSyncTest extends BaseDistributedSearchTestCase { validateDocs(docsAdded, client0, client1); - int toAdd = (int)(numVersions *.95); - for (int i=0; i docsAdded, SolrClient client0, SolrClient client1) throws SolrServerException, IOException { + protected void testOverlap(Set docsAdded, SolrClient client0, SolrClient client1, long v) throws IOException, SolrServerException { + int toAdd = (int)(numVersions *.95); + for (int i=0; i docsAdded, SolrClient client0, SolrClient client1) throws SolrServerException, IOException { client0.commit(); client1.commit(); QueryResponse qacResponse; diff --git a/solr/core/src/test/org/apache/solr/update/PeerSyncWithLeaderTest.java b/solr/core/src/test/org/apache/solr/update/PeerSyncWithLeaderTest.java index 4ca343a1d2b..f1c7f696ad9 100644 --- a/solr/core/src/test/org/apache/solr/update/PeerSyncWithLeaderTest.java +++ b/solr/core/src/test/org/apache/solr/update/PeerSyncWithLeaderTest.java @@ -19,6 +19,7 @@ package org.apache.solr.update; import java.io.IOException; import java.util.Arrays; +import java.util.Set; import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.client.solrj.SolrClient; @@ -30,6 +31,23 @@ import org.apache.solr.common.util.StrUtils; @SolrTestCaseJ4.SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776") public class PeerSyncWithLeaderTest extends PeerSyncTest { + @Override + protected void testOverlap(Set docsAdded, SolrClient client0, SolrClient client1, long v) throws IOException, SolrServerException { + for (int i=0; i