SOLR-12969: Inconsistency with leader when PeerSync return ALREADY_IN_SYNC

This commit is contained in:
Cao Manh Dat 2018-11-12 10:10:22 +00:00
parent cd1e829732
commit f357c06276
6 changed files with 325 additions and 105 deletions

View File

@ -692,33 +692,18 @@ public class PeerSync implements SolrMetricProducer {
}
}
/**
* Helper class for doing comparison ourUpdates and other replicas's updates to find the updates that we missed
*/
public static class MissedUpdatesFinder {
private List<Long> ourUpdates;
static abstract class MissedUpdatesFinderBase {
private Set<Long> ourUpdateSet;
private Set<Long> requestedUpdateSet;
private Set<Long> requestedUpdateSet = new HashSet<>();
private long ourLowThreshold; // 20th percentile
private long ourHighThreshold; // 80th percentile
private long ourHighest; // currently just used for logging/debugging purposes
private String logPrefix;
private long nUpdates;
long ourLowThreshold; // 20th percentile
List<Long> ourUpdates;
MissedUpdatesFinder(List<Long> ourUpdates, String logPrefix, long nUpdates,
long ourLowThreshold, long ourHighThreshold) {
MissedUpdatesFinderBase(List<Long> ourUpdates, long ourLowThreshold) {
assert sorted(ourUpdates);
this.logPrefix = logPrefix;
this.ourUpdates = ourUpdates;
this.ourLowThreshold = ourLowThreshold;
this.ourHighThreshold = ourHighThreshold;
this.ourHighest = ourUpdates.get(0);
this.nUpdates = nUpdates;
this.ourUpdateSet = new HashSet<>(ourUpdates);
this.requestedUpdateSet = new HashSet<>();
this.ourLowThreshold = ourLowThreshold;
}
private boolean sorted(List<Long> list) {
@ -730,61 +715,7 @@ public class PeerSync implements SolrMetricProducer {
return true;
}
public MissedUpdatesRequest find(List<Long> otherVersions, Object updateFrom, Supplier<Boolean> canHandleVersionRanges) {
otherVersions.sort(absComparator);
if (debug) {
log.debug("{} sorted versions from {} = {}", logPrefix, otherVersions, updateFrom);
}
long otherHigh = percentile(otherVersions, .2f);
long otherLow = percentile(otherVersions, .8f);
long otherHighest = otherVersions.get(0);
if (ourHighThreshold < otherLow) {
// Small overlap between version windows and ours is older
// This means that we might miss updates if we attempted to use this method.
// Since there exists just one replica that is so much newer, we must
// fail the sync.
log.info("{} Our versions are too old. ourHighThreshold={} otherLowThreshold={} ourHighest={} otherHighest={}",
logPrefix, ourHighThreshold, otherLow, ourHighest, otherHighest);
return MissedUpdatesRequest.UNABLE_TO_SYNC;
}
if (ourLowThreshold > otherHigh) {
// Small overlap between windows and ours is newer.
// Using this list to sync would result in requesting/replaying results we don't need
// and possibly bringing deleted docs back to life.
log.info("{} Our versions are newer. ourHighThreshold={} otherLowThreshold={} ourHighest={} otherHighest={}",
logPrefix, ourHighThreshold, otherLow, ourHighest, otherHighest);
// Because our versions are newer, IndexFingerprint with the remote would not match us.
// We return true on our side, but the remote peersync with us should fail.
return MissedUpdatesRequest.ALREADY_IN_SYNC;
}
boolean completeList = otherVersions.size() < nUpdates;
MissedUpdatesRequest updatesRequest;
if (canHandleVersionRanges.get()) {
updatesRequest = handleVersionsWithRanges(otherVersions, completeList);
} else {
updatesRequest = handleIndividualVersions(otherVersions, completeList);
}
if (updatesRequest.totalRequestedUpdates > nUpdates) {
log.info("{} PeerSync will fail because number of missed updates is more than:{}", logPrefix, nUpdates);
return MissedUpdatesRequest.UNABLE_TO_SYNC;
}
if (updatesRequest == MissedUpdatesRequest.EMPTY) {
log.info("{} No additional versions requested. ourHighThreshold={} otherLowThreshold={} ourHighest={} otherHighest={}",
logPrefix, ourHighThreshold, otherLow, ourHighest, otherHighest);
}
return updatesRequest;
}
private MissedUpdatesRequest handleVersionsWithRanges(List<Long> otherVersions, boolean completeList) {
MissedUpdatesRequest handleVersionsWithRanges(List<Long> otherVersions, boolean completeList) {
// we may endup asking for updates for too many versions, causing 2MB post payload limit. Construct a range of
// versions to request instead of asking individual versions
List<String> rangesToRequest = new ArrayList<>();
@ -829,7 +760,7 @@ public class PeerSync implements SolrMetricProducer {
return MissedUpdatesRequest.of(rangesToRequestStr, totalRequestedVersions);
}
private MissedUpdatesRequest handleIndividualVersions(List<Long> otherVersions, boolean completeList) {
MissedUpdatesRequest handleIndividualVersions(List<Long> otherVersions, boolean completeList) {
List<Long> toRequest = new ArrayList<>();
for (Long otherVersion : otherVersions) {
// stop when the entries get old enough that reorders may lead us to see updates we don't need
@ -848,7 +779,80 @@ public class PeerSync implements SolrMetricProducer {
return MissedUpdatesRequest.of(StrUtils.join(toRequest, ','), toRequest.size());
}
}
/**
* Helper class for doing comparison ourUpdates and other replicas's updates to find the updates that we missed
*/
public static class MissedUpdatesFinder extends MissedUpdatesFinderBase {
private long ourHighThreshold; // 80th percentile
private long ourHighest; // currently just used for logging/debugging purposes
private String logPrefix;
private long nUpdates;
MissedUpdatesFinder(List<Long> ourUpdates, String logPrefix, long nUpdates,
long ourLowThreshold, long ourHighThreshold) {
super(ourUpdates, ourLowThreshold);
this.logPrefix = logPrefix;
this.ourHighThreshold = ourHighThreshold;
this.ourHighest = ourUpdates.get(0);
this.nUpdates = nUpdates;
}
public MissedUpdatesRequest find(List<Long> otherVersions, Object updateFrom, Supplier<Boolean> canHandleVersionRanges) {
otherVersions.sort(absComparator);
if (debug) {
log.debug("{} sorted versions from {} = {}", logPrefix, otherVersions, updateFrom);
}
long otherHigh = percentile(otherVersions, .2f);
long otherLow = percentile(otherVersions, .8f);
long otherHighest = otherVersions.get(0);
if (ourHighThreshold < otherLow) {
// Small overlap between version windows and ours is older
// This means that we might miss updates if we attempted to use this method.
// Since there exists just one replica that is so much newer, we must
// fail the sync.
log.info("{} Our versions are too old. ourHighThreshold={} otherLowThreshold={} ourHighest={} otherHighest={}",
logPrefix, ourHighThreshold, otherLow, ourHighest, otherHighest);
return MissedUpdatesRequest.UNABLE_TO_SYNC;
}
if (ourLowThreshold > otherHigh && ourHighest >= otherHighest) {
// Small overlap between windows and ours is newer.
// Using this list to sync would result in requesting/replaying results we don't need
// and possibly bringing deleted docs back to life.
log.info("{} Our versions are newer. ourHighThreshold={} otherLowThreshold={} ourHighest={} otherHighest={}",
logPrefix, ourHighThreshold, otherLow, ourHighest, otherHighest);
// Because our versions are newer, IndexFingerprint with the remote would not match us.
// We return true on our side, but the remote peersync with us should fail.
return MissedUpdatesRequest.ALREADY_IN_SYNC;
}
boolean completeList = otherVersions.size() < nUpdates;
MissedUpdatesRequest updatesRequest;
if (canHandleVersionRanges.get()) {
updatesRequest = handleVersionsWithRanges(otherVersions, completeList);
} else {
updatesRequest = handleIndividualVersions(otherVersions, completeList);
}
if (updatesRequest.totalRequestedUpdates > nUpdates) {
log.info("{} PeerSync will fail because number of missed updates is more than:{}", logPrefix, nUpdates);
return MissedUpdatesRequest.UNABLE_TO_SYNC;
}
if (updatesRequest == MissedUpdatesRequest.EMPTY) {
log.info("{} No additional versions requested. ourHighThreshold={} otherLowThreshold={} ourHighest={} otherHighest={}",
logPrefix, ourHighThreshold, otherLow, ourHighest, otherHighest);
}
return updatesRequest;
}
}
/**

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Timer;
@ -43,9 +44,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.params.CommonParams.DISTRIB;
import static org.apache.solr.update.PeerSync.MissedUpdatesRequest;
import static org.apache.solr.update.PeerSync.absComparator;
import static org.apache.solr.update.PeerSync.percentile;
import static org.apache.solr.update.PeerSync.MissedUpdatesRequest;
public class PeerSyncWithLeader implements SolrMetricProducer {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@ -62,7 +63,7 @@ public class PeerSyncWithLeader implements SolrMetricProducer {
private SolrCore core;
private PeerSync.Updater updater;
private PeerSync.MissedUpdatesFinder missedUpdatesFinder;
private MissedUpdatesFinder missedUpdatesFinder;
private Set<Long> bufferedUpdates;
// metrics
@ -203,7 +204,7 @@ public class PeerSyncWithLeader implements SolrMetricProducer {
log.info("Leader fingerprint {}", leaderFingerprint);
}
missedUpdatesFinder = new PeerSync.MissedUpdatesFinder(ourUpdates, msg(), nUpdates, ourLowThreshold, ourHighThreshold);
missedUpdatesFinder = new MissedUpdatesFinder(ourUpdates, msg(), nUpdates, ourLowThreshold);
MissedUpdatesRequest missedUpdates = buildMissedUpdatesRequest(leaderVersionsAndFingerprint);
if (missedUpdates == MissedUpdatesRequest.ALREADY_IN_SYNC) return true;
if (missedUpdates != MissedUpdatesRequest.UNABLE_TO_SYNC) {
@ -369,4 +370,56 @@ public class PeerSyncWithLeader implements SolrMetricProducer {
}
return false;
}
/**
* Helper class for doing comparison ourUpdates and other replicas's updates to find the updates that we missed
*/
public static class MissedUpdatesFinder extends PeerSync.MissedUpdatesFinderBase {
private long ourHighest;
private String logPrefix;
private long nUpdates;
MissedUpdatesFinder(List<Long> ourUpdates, String logPrefix, long nUpdates,
long ourLowThreshold) {
super(ourUpdates, ourLowThreshold);
this.logPrefix = logPrefix;
this.ourHighest = ourUpdates.get(0);
this.nUpdates = nUpdates;
}
public MissedUpdatesRequest find(List<Long> leaderVersions, Object updateFrom, Supplier<Boolean> canHandleVersionRanges) {
leaderVersions.sort(absComparator);
log.debug("{} sorted versions from {} = {}", logPrefix, leaderVersions, updateFrom);
long leaderLowest = leaderVersions.get(leaderVersions.size() - 1);
if (Math.abs(ourHighest) < Math.abs(leaderLowest)) {
log.info("{} Our versions are too old comparing to leader, ourHighest={} otherLowest={}", logPrefix, ourHighest, leaderLowest);
return MissedUpdatesRequest.UNABLE_TO_SYNC;
}
// we don't have to check the case we ahead of the leader.
// (maybe we are the old leader and we contain some updates that no one have)
// In that case, we will fail on compute fingerprint with the current leader and start segments replication
boolean completeList = leaderVersions.size() < nUpdates;
MissedUpdatesRequest updatesRequest;
if (canHandleVersionRanges.get()) {
updatesRequest = handleVersionsWithRanges(leaderVersions, completeList);
} else {
updatesRequest = handleIndividualVersions(leaderVersions, completeList);
}
if (updatesRequest.totalRequestedUpdates > nUpdates) {
log.info("{} PeerSync will fail because number of missed updates is more than:{}", logPrefix, nUpdates);
return MissedUpdatesRequest.UNABLE_TO_SYNC;
}
if (updatesRequest == MissedUpdatesRequest.EMPTY) {
log.info("{} No additional versions requested", logPrefix);
}
return updatesRequest;
}
}
}

View File

@ -252,7 +252,7 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
log.info("Looked up max version bucket seed "+maxVersionBefore+" for core "+coreName);
// now up the stakes and do more docs
int numDocs = TEST_NIGHTLY ? 1000 : 100;
int numDocs = TEST_NIGHTLY ? 1000 : 105;
boolean hasPartition = false;
for (int d = 0; d < numDocs; d++) {
// create / restore partition every 100 docs

View File

@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.cloud.Replica;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestCloudRecovery2 extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String COLLECTION = "collection1";
@BeforeClass
public static void setupCluster() throws Exception {
System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
System.setProperty("solr.ulog.numRecordsToKeep", "1000");
configureCluster(2)
.addConfig("config", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
.configure();
CollectionAdminRequest
.createCollection(COLLECTION, "config", 1,2)
.setMaxShardsPerNode(2)
.process(cluster.getSolrClient());
AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
false, true, 30);
}
@Test
public void test() throws Exception {
JettySolrRunner node1 = cluster.getJettySolrRunner(0);
JettySolrRunner node2 = cluster.getJettySolrRunner(1);
try (HttpSolrClient client1 = getHttpSolrClient(node1.getBaseUrl().toString())) {
node2.stop();
waitForState("", COLLECTION, (liveNodes, collectionState) -> liveNodes.size() == 1);
UpdateRequest req = new UpdateRequest();
for (int i = 0; i < 100; i++) {
req = req.add("id", i+"", "num", i+"");
}
req.commit(client1, COLLECTION);
node2.start();
waitForState("", COLLECTION, clusterShape(1, 2));
try (HttpSolrClient client = getHttpSolrClient(node2.getBaseUrl().toString())) {
long numFound = client.query(COLLECTION, new SolrQuery("q","*:*", "distrib", "false")).getResults().getNumFound();
assertEquals(100, numFound);
}
long numFound = client1.query(COLLECTION, new SolrQuery("q","*:*", "distrib", "false")).getResults().getNumFound();
assertEquals(100, numFound);
new UpdateRequest().add("id", "1", "num", "10")
.commit(client1, COLLECTION);
try (HttpSolrClient client = getHttpSolrClient(node2.getBaseUrl().toString())) {
Object v = client.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "false")).getResults().get(0).get("num");
assertEquals("10", v.toString());
}
Object v = client1.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "false")).getResults().get(0).get("num");
assertEquals("10", v.toString());
//
node2.stop();
waitForState("", COLLECTION, (liveNodes, collectionState) -> liveNodes.size() == 1);
new UpdateRequest().add("id", "1", "num", "20")
.commit(client1, COLLECTION);
v = client1.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "false")).getResults().get(0).get("num");
assertEquals("20", v.toString());
node2.start();
waitForState("", COLLECTION, clusterShape(1, 2));
try (HttpSolrClient client = getHttpSolrClient(node2.getBaseUrl().toString())) {
v = client.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "false")).getResults().get(0).get("num");
assertEquals("20", v.toString());
}
node2.stop();
waitForState("", COLLECTION, (liveNodes, collectionState) -> liveNodes.size() == 1);
new UpdateRequest().add("id", "1", "num", "30")
.commit(client1, COLLECTION);
v = client1.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "false")).getResults().get(0).get("num");
assertEquals("30", v.toString());
node2.start();
waitForState("", COLLECTION, clusterShape(1, 2));
try (HttpSolrClient client = getHttpSolrClient(node2.getBaseUrl().toString())) {
v = client.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "false")).getResults().get(0).get("num");
assertEquals("30", v.toString());
}
v = client1.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "false")).getResults().get(0).get("num");
assertEquals("30", v.toString());
}
node1.stop();
waitForState("", COLLECTION, (liveNodes, collectionState) -> {
Replica leader = collectionState.getLeader("shard1");
return leader != null && leader.getNodeName().equals(node2.getNodeName());
});
node1.start();
waitForState("", COLLECTION, clusterShape(1, 2));
try (HttpSolrClient client = getHttpSolrClient(node1.getBaseUrl().toString())) {
Object v = client.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "false")).getResults().get(0).get("num");
assertEquals("30", v.toString());
}
try (HttpSolrClient client = getHttpSolrClient(node2.getBaseUrl().toString())) {
Object v = client.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "false")).getResults().get(0).get("num");
assertEquals("30", v.toString());
}
}
}

View File

@ -42,10 +42,9 @@ import static org.junit.internal.matchers.StringContains.containsString;
@SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
public class PeerSyncTest extends BaseDistributedSearchTestCase {
private static int numVersions = 100; // number of versions to use when syncing
private final String FROM_LEADER = DistribPhase.FROMLEADER.toString();
private ModifiableSolrParams seenLeader =
protected static int numVersions = 100; // number of versions to use when syncing
protected static final String FROM_LEADER = DistribPhase.FROMLEADER.toString();
protected static final ModifiableSolrParams seenLeader =
params(DISTRIB_UPDATE_PARAM, FROM_LEADER);
public PeerSyncTest() {
@ -117,24 +116,7 @@ public class PeerSyncTest extends BaseDistributedSearchTestCase {
validateDocs(docsAdded, client0, client1);
int toAdd = (int)(numVersions *.95);
for (int i=0; i<toAdd; i++) {
add(client0, seenLeader, sdoc("id",Integer.toString(i+11),"_version_",v+i+1));
docsAdded.add(i+11);
}
// sync should fail since there's not enough overlap to give us confidence
assertSync(client1, numVersions, false, shardsArr[0]);
// add some of the docs that were missing... just enough to give enough overlap
int toAdd2 = (int)(numVersions * .25);
for (int i=0; i<toAdd2; i++) {
add(client1, seenLeader, sdoc("id",Integer.toString(i+11),"_version_",v+i+1));
}
assertSync(client1, numVersions, true, shardsArr[0]);
validateDocs(docsAdded, client0, client1);
testOverlap(docsAdded, client0, client1, v);
// test delete and deleteByQuery
v=1000;
SolrInputDocument doc = sdoc("id","1000","_version_",++v);
@ -201,7 +183,7 @@ public class PeerSyncTest extends BaseDistributedSearchTestCase {
v = 4000;
add(client0, seenLeader, sdoc("id",Integer.toString((int)v),"_version_",v));
docsAdded.add(4000);
toAdd = numVersions+10;
int toAdd = numVersions+10;
for (int i=0; i<toAdd; i++) {
add(client0, seenLeader, sdoc("id",Integer.toString((int)v+i+1),"_version_",v+i+1));
add(client1, seenLeader, sdoc("id",Integer.toString((int)v+i+1),"_version_",v+i+1));
@ -326,7 +308,27 @@ public class PeerSyncTest extends BaseDistributedSearchTestCase {
}
private void validateDocs(Set<Integer> docsAdded, SolrClient client0, SolrClient client1) throws SolrServerException, IOException {
protected void testOverlap(Set<Integer> docsAdded, SolrClient client0, SolrClient client1, long v) throws IOException, SolrServerException {
int toAdd = (int)(numVersions *.95);
for (int i=0; i<toAdd; i++) {
add(client0, seenLeader, sdoc("id",Integer.toString(i+11),"_version_",v+i+1));
docsAdded.add(i+11);
}
// sync should fail since there's not enough overlap to give us confidence
assertSync(client1, numVersions, false, shardsArr[0]);
// add some of the docs that were missing... just enough to give enough overlap
int toAdd2 = (int)(numVersions * .25);
for (int i=0; i<toAdd2; i++) {
add(client1, seenLeader, sdoc("id",Integer.toString(i+11),"_version_",v+i+1));
}
assertSync(client1, numVersions, true, shardsArr[0]);
validateDocs(docsAdded, client0, client1);
}
protected void validateDocs(Set<Integer> docsAdded, SolrClient client0, SolrClient client1) throws SolrServerException, IOException {
client0.commit();
client1.commit();
QueryResponse qacResponse;

View File

@ -19,6 +19,7 @@ package org.apache.solr.update;
import java.io.IOException;
import java.util.Arrays;
import java.util.Set;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrClient;
@ -30,6 +31,23 @@ import org.apache.solr.common.util.StrUtils;
@SolrTestCaseJ4.SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
public class PeerSyncWithLeaderTest extends PeerSyncTest {
@Override
protected void testOverlap(Set<Integer> docsAdded, SolrClient client0, SolrClient client1, long v) throws IOException, SolrServerException {
for (int i=0; i<numVersions; i++) {
add(client0, seenLeader, sdoc("id",Integer.toString(i+11),"_version_",v+i+1));
docsAdded.add(i+11);
}
// sync should fail since we are too far with the leader
assertSync(client1, numVersions, false, shardsArr[0]);
// add a doc that was missing... just enough to give enough overlap
add(client1, seenLeader, sdoc("id",Integer.toString(11),"_version_",v+1));
assertSync(client1, numVersions, true, shardsArr[0]);
validateDocs(docsAdded, client0, client1);
}
@Override
void assertSync(SolrClient client, int numVersions, boolean expectedResult, String... syncWith) throws IOException, SolrServerException {
QueryRequest qr = new QueryRequest(params("qt","/get", "getVersions",Integer.toString(numVersions), "syncWithLeader", StrUtils.join(Arrays.asList(syncWith), ',')));