mirror of https://github.com/apache/lucene.git
SOLR-9207: PeerSync recovery failes if number of updates requested is high. A new useRangeVersions config option is introduced (defaults to true) to send version ranges instead of individual versions for peer sync.
This commit is contained in:
parent
c473441958
commit
380c5a6b97
|
@ -119,6 +119,10 @@ Bug Fixes
|
|||
|
||||
* SOLR-9088: Fixed TestManagedSchemaAPI failures which exposed race conditions in the schema API ( Varun Thacker, noble)
|
||||
|
||||
* SOLR-9207: PeerSync recovery failes if number of updates requested is high. A new useRangeVersions config option
|
||||
is introduced (defaults to true) to send version ranges instead of individual versions for peer sync.
|
||||
(Pushkar Raste, shalin)
|
||||
|
||||
Optimizations
|
||||
----------------------
|
||||
|
||||
|
|
|
@ -234,7 +234,8 @@ public class SolrConfig extends Config implements MapSerializable {
|
|||
queryResultWindowSize = Math.max(1, getInt("query/queryResultWindowSize", 1));
|
||||
queryResultMaxDocsCached = getInt("query/queryResultMaxDocsCached", Integer.MAX_VALUE);
|
||||
enableLazyFieldLoading = getBool("query/enableLazyFieldLoading", false);
|
||||
|
||||
|
||||
useRangeVersionsForPeerSync = getBool("peerSync/useRangeVersions", true);
|
||||
|
||||
filterCacheConfig = CacheConfig.getConfig(this, "query/filterCache");
|
||||
queryResultCacheConfig = CacheConfig.getConfig(this, "query/queryResultCache");
|
||||
|
@ -462,6 +463,9 @@ public class SolrConfig extends Config implements MapSerializable {
|
|||
public final int queryResultWindowSize;
|
||||
public final int queryResultMaxDocsCached;
|
||||
public final boolean enableLazyFieldLoading;
|
||||
|
||||
public final boolean useRangeVersionsForPeerSync;
|
||||
|
||||
// DocSet
|
||||
public final float hashSetInverseLoadFactor;
|
||||
public final int hashDocSetMaxSize;
|
||||
|
@ -864,6 +868,10 @@ public class SolrConfig extends Config implements MapSerializable {
|
|||
"addHttpRequestToContext", addHttpRequestToContext));
|
||||
if (indexConfig != null) result.put("indexConfig", indexConfig.toMap());
|
||||
|
||||
m = new LinkedHashMap();
|
||||
result.put("peerSync", m);
|
||||
m.put("useRangeVersions", useRangeVersionsForPeerSync);
|
||||
|
||||
//TODO there is more to add
|
||||
|
||||
return result;
|
||||
|
|
|
@ -20,9 +20,13 @@ import java.io.IOException;
|
|||
import java.lang.invoke.MethodHandles;
|
||||
import java.net.URL;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.index.DocValuesType;
|
||||
|
@ -41,6 +45,7 @@ import org.apache.solr.common.SolrDocument;
|
|||
import org.apache.solr.common.SolrDocumentList;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.common.StringUtils;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
|
@ -97,8 +102,16 @@ public class RealTimeGetComponent extends SearchComponent
|
|||
if (!params.getBool(COMPONENT_NAME, true)) {
|
||||
return;
|
||||
}
|
||||
|
||||
String val = params.get("getVersions");
|
||||
|
||||
// This seems rather kludgey, may there is better way to indicate
|
||||
// that replica can support handling version ranges
|
||||
String val = params.get("checkCanHandleVersionRanges");
|
||||
if(val != null) {
|
||||
rb.rsp.add("canHandleVersionRanges", true);
|
||||
return;
|
||||
}
|
||||
|
||||
val = params.get("getVersions");
|
||||
if (val != null) {
|
||||
processGetVersions(rb);
|
||||
return;
|
||||
|
@ -667,7 +680,14 @@ public class RealTimeGetComponent extends SearchComponent
|
|||
UpdateLog ulog = req.getCore().getUpdateHandler().getUpdateLog();
|
||||
if (ulog == null) return;
|
||||
|
||||
List<String> versions = StrUtils.splitSmart(versionsStr, ",", true);
|
||||
// handle version ranges
|
||||
List<Long> versions = null;
|
||||
if (versionsStr.indexOf("...") != -1) {
|
||||
versions = resolveVersionRanges(versionsStr, ulog);
|
||||
} else {
|
||||
versions = StrUtils.splitSmart(versionsStr, ",", true).stream().map(Long::parseLong)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
|
||||
List<Object> updates = new ArrayList<>(versions.size());
|
||||
|
@ -676,8 +696,7 @@ public class RealTimeGetComponent extends SearchComponent
|
|||
|
||||
// TODO: get this from cache instead of rebuilding?
|
||||
try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
|
||||
for (String versionStr : versions) {
|
||||
long version = Long.parseLong(versionStr);
|
||||
for (Long version : versions) {
|
||||
try {
|
||||
Object o = recentUpdates.lookup(version);
|
||||
if (o == null) continue;
|
||||
|
@ -702,5 +721,37 @@ public class RealTimeGetComponent extends SearchComponent
|
|||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
private List<Long> resolveVersionRanges(String versionsStr, UpdateLog ulog) {
|
||||
if (StringUtils.isEmpty(versionsStr)) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
List<String> ranges = StrUtils.splitSmart(versionsStr, ",", true);
|
||||
|
||||
// TODO merge ranges.
|
||||
|
||||
// get all the versions from updatelog and sort them
|
||||
List<Long> versionAvailable = null;
|
||||
try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
|
||||
versionAvailable = recentUpdates.getVersions(ulog.getNumRecordsToKeep());
|
||||
}
|
||||
// sort versions
|
||||
Collections.sort(versionAvailable, PeerSync.absComparator);
|
||||
|
||||
// This can be done with single pass over both ranges and versionsAvailable, that would require
|
||||
// merging ranges. We currently use Set to ensure there are no duplicates.
|
||||
Set<Long> versionsToRet = new HashSet<>(ulog.getNumRecordsToKeep());
|
||||
for (String range : ranges) {
|
||||
String[] rangeBounds = range.split("\\.{3}");
|
||||
int indexStart = Collections.binarySearch(versionAvailable, Long.valueOf(rangeBounds[1]), PeerSync.absComparator);
|
||||
int indexEnd = Collections.binarySearch(versionAvailable, Long.valueOf(rangeBounds[0]), PeerSync.absComparator);
|
||||
if(indexStart >=0 && indexEnd >= 0) {
|
||||
versionsToRet.addAll(versionAvailable.subList(indexStart, indexEnd + 1)); // indexEnd is exclusive
|
||||
}
|
||||
}
|
||||
// TODO do we need to sort versions using PeerSync.absComparator?
|
||||
return new ArrayList<>(versionsToRet);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import java.util.Comparator;
|
|||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.http.NoHttpResponseException;
|
||||
import org.apache.http.client.HttpClient;
|
||||
|
@ -86,7 +87,7 @@ public class PeerSync {
|
|||
private SolrCore core;
|
||||
|
||||
// comparator that sorts by absolute value, putting highest first
|
||||
private static Comparator<Long> absComparator = (o1, o2) -> {
|
||||
public static Comparator<Long> absComparator = (o1, o2) -> {
|
||||
long l1 = Math.abs(o1);
|
||||
long l2 = Math.abs(o2);
|
||||
if (l1 > l2) return -1;
|
||||
|
@ -117,6 +118,8 @@ public class PeerSync {
|
|||
boolean doFingerprintComparison;
|
||||
List<Long> requestedUpdates;
|
||||
Exception updateException;
|
||||
List<String> requestedRanges;
|
||||
long totalRequestedUpdates;
|
||||
}
|
||||
|
||||
public PeerSync(SolrCore core, List<String> replicas, int nUpdates) {
|
||||
|
@ -359,6 +362,103 @@ public class PeerSync {
|
|||
}
|
||||
}
|
||||
|
||||
private boolean canHandleVersionRanges(String replica) {
|
||||
SyncShardRequest sreq = new SyncShardRequest();
|
||||
requests.add(sreq);
|
||||
|
||||
// determine if leader can handle version ranges
|
||||
sreq.shards = new String[] {replica};
|
||||
sreq.actualShards = sreq.shards;
|
||||
sreq.params = new ModifiableSolrParams();
|
||||
sreq.params.set("qt", "/get");
|
||||
sreq.params.set("distrib", false);
|
||||
sreq.params.set("checkCanHandleVersionRanges", false);
|
||||
|
||||
ShardHandler sh = shardHandlerFactory.getShardHandler(client);
|
||||
sh.submit(sreq, replica, sreq.params);
|
||||
|
||||
ShardResponse srsp = sh.takeCompletedIncludingErrors();
|
||||
Boolean canHandleVersionRanges = srsp.getSolrResponse().getResponse().getBooleanArg("canHandleVersionRanges");
|
||||
|
||||
if (canHandleVersionRanges == null || canHandleVersionRanges.booleanValue() == false) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private boolean handleVersionsWithRanges(ShardResponse srsp, List<Long> otherVersions, SyncShardRequest sreq,
|
||||
boolean completeList, long otherHigh, long otherHighest) {
|
||||
// we may endup asking for updates for too many versions, causing 2MB post payload limit. Construct a range of
|
||||
// versions to request instead of asking individual versions
|
||||
List<String> rangesToRequest = new ArrayList<>();
|
||||
|
||||
// construct ranges to request
|
||||
// both ourUpdates and otherVersions are sorted with highest range first
|
||||
// may be we can create another reverse the lists and avoid confusion
|
||||
int ourUpdatesIndex = ourUpdates.size() - 1;
|
||||
int otherUpdatesIndex = otherVersions.size() - 1;
|
||||
long totalRequestedVersions = 0;
|
||||
|
||||
while (otherUpdatesIndex >= 0) {
|
||||
// we have run out of ourUpdates, pick up all the remaining versions from the other versions
|
||||
if (ourUpdatesIndex < 0) {
|
||||
String range = otherVersions.get(otherUpdatesIndex) + "..." + otherVersions.get(0);
|
||||
rangesToRequest.add(range);
|
||||
totalRequestedVersions += otherUpdatesIndex + 1;
|
||||
break;
|
||||
}
|
||||
|
||||
// stop when the entries get old enough that reorders may lead us to see updates we don't need
|
||||
if (!completeList && Math.abs(otherVersions.get(otherUpdatesIndex)) < ourLowThreshold) break;
|
||||
|
||||
if (ourUpdates.get(ourUpdatesIndex).longValue() == otherVersions.get(otherUpdatesIndex).longValue()) {
|
||||
ourUpdatesIndex--;
|
||||
otherUpdatesIndex--;
|
||||
} else if (Math.abs(ourUpdates.get(ourUpdatesIndex)) < Math.abs(otherVersions.get(otherUpdatesIndex))) {
|
||||
ourUpdatesIndex--;
|
||||
} else {
|
||||
long rangeStart = otherVersions.get(otherUpdatesIndex);
|
||||
while ((otherUpdatesIndex < otherVersions.size())
|
||||
&& (Math.abs(otherVersions.get(otherUpdatesIndex)) < Math.abs(ourUpdates.get(ourUpdatesIndex)))) {
|
||||
otherUpdatesIndex--;
|
||||
totalRequestedVersions++;
|
||||
}
|
||||
// construct range here
|
||||
rangesToRequest.add(rangeStart + "..." + otherVersions.get(otherUpdatesIndex + 1));
|
||||
}
|
||||
}
|
||||
|
||||
// TODO, do we really need to hold on to all the ranges we requested
|
||||
// keeping track of totalRequestedUpdates should suffice for verification
|
||||
sreq.requestedRanges = rangesToRequest;
|
||||
sreq.totalRequestedUpdates = totalRequestedVersions;
|
||||
|
||||
if (rangesToRequest.isEmpty()) {
|
||||
log.info(msg() + " No additional versions requested. ourLowThreshold=" + ourLowThreshold + " otherHigh="
|
||||
+ otherHigh + " ourHighest=" + ourHighest + " otherHighest=" + otherHighest);
|
||||
|
||||
// we had (or already requested) all the updates referenced by the replica
|
||||
|
||||
// If we requested updates from another replica, we can't compare fingerprints yet with this replica, we need to
|
||||
// defer
|
||||
if (doFingerprint) {
|
||||
sreq.doFingerprintComparison = true;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
if (totalRequestedVersions > maxUpdates) {
|
||||
log.info(msg() + " Failing due to needing too many updates:" + maxUpdates);
|
||||
return false;
|
||||
}
|
||||
|
||||
String rangesToRequestStr = rangesToRequest.stream().collect(Collectors.joining(","));
|
||||
return requestUpdates(srsp, rangesToRequestStr, totalRequestedVersions);
|
||||
}
|
||||
|
||||
|
||||
private boolean handleVersions(ShardResponse srsp) {
|
||||
// we retrieved the last N updates from the replica
|
||||
List<Long> otherVersions = (List<Long>)srsp.getSolrResponse().getResponse().get("versions");
|
||||
|
@ -410,6 +510,15 @@ public class PeerSync {
|
|||
return true;
|
||||
}
|
||||
|
||||
if(core.getSolrConfig().useRangeVersionsForPeerSync && canHandleVersionRanges(sreq.shards[0])) {
|
||||
return handleVersionsWithRanges(srsp, otherVersions, sreq, completeList, otherHigh, otherHighest);
|
||||
} else {
|
||||
return handleIndividualVersions(srsp, otherVersions, sreq, completeList, otherHigh, otherHighest);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean handleIndividualVersions(ShardResponse srsp, List<Long> otherVersions, SyncShardRequest sreq,
|
||||
boolean completeList, long otherHigh, long otherHighest) {
|
||||
List<Long> toRequest = new ArrayList<>();
|
||||
for (Long otherVersion : otherVersions) {
|
||||
// stop when the entries get old enough that reorders may lead us to see updates we don't need
|
||||
|
@ -426,7 +535,10 @@ public class PeerSync {
|
|||
requestedUpdateSet.add(otherVersion);
|
||||
}
|
||||
|
||||
// TODO, do we really need to hold on to all the version numbers we requested.
|
||||
// keeping track of totalRequestedUpdates should suffice for verification
|
||||
sreq.requestedUpdates = toRequest;
|
||||
sreq.totalRequestedUpdates = toRequest.size();
|
||||
|
||||
if (toRequest.isEmpty()) {
|
||||
log.info(msg() + " No additional versions requested. ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh+ " ourHighest=" + ourHighest + " otherHighest=" + otherHighest);
|
||||
|
@ -446,7 +558,7 @@ public class PeerSync {
|
|||
return false;
|
||||
}
|
||||
|
||||
return requestUpdates(srsp, toRequest);
|
||||
return requestUpdates(srsp, StrUtils.join(toRequest, ','), toRequest.size());
|
||||
}
|
||||
|
||||
private boolean compareFingerprint(SyncShardRequest sreq) {
|
||||
|
@ -462,10 +574,10 @@ public class PeerSync {
|
|||
}
|
||||
}
|
||||
|
||||
private boolean requestUpdates(ShardResponse srsp, List<Long> toRequest) {
|
||||
private boolean requestUpdates(ShardResponse srsp, String versionsAndRanges, long totalUpdates) {
|
||||
String replica = srsp.getShardRequest().shards[0];
|
||||
|
||||
log.info(msg() + "Requesting updates from " + replica + "n=" + toRequest.size() + " versions=" + toRequest);
|
||||
log.info(msg() + "Requesting updates from " + replica + "n=" + totalUpdates + " versions=" + versionsAndRanges);
|
||||
|
||||
// reuse our original request object
|
||||
ShardRequest sreq = srsp.getShardRequest();
|
||||
|
@ -474,7 +586,7 @@ public class PeerSync {
|
|||
sreq.params = new ModifiableSolrParams();
|
||||
sreq.params.set("qt", "/get");
|
||||
sreq.params.set("distrib", false);
|
||||
sreq.params.set("getUpdates", StrUtils.join(toRequest, ','));
|
||||
sreq.params.set("getUpdates", versionsAndRanges);
|
||||
sreq.params.set("onlyIfActive", onlyIfActive);
|
||||
sreq.responses.clear(); // needs to be zeroed for correct correlation to occur
|
||||
|
||||
|
@ -489,7 +601,7 @@ public class PeerSync {
|
|||
List<Object> updates = (List<Object>)srsp.getSolrResponse().getResponse().get("updates");
|
||||
|
||||
SyncShardRequest sreq = (SyncShardRequest) srsp.getShardRequest();
|
||||
if (updates.size() < sreq.requestedUpdates.size()) {
|
||||
if (updates.size() < sreq.totalRequestedUpdates) {
|
||||
log.error(msg() + " Requested " + sreq.requestedUpdates.size() + " updates from " + sreq.shards[0] + " but retrieved " + updates.size());
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -52,5 +52,8 @@
|
|||
"multipartUploadLimitInKB":0,
|
||||
"formdataUploadLimitInKB":0,
|
||||
"enableRemoteStreaming":0,
|
||||
"addHttpRequestToContext":0}}
|
||||
"addHttpRequestToContext":0}},
|
||||
"peerSync":{
|
||||
"useRangeVersions":11
|
||||
}
|
||||
}
|
|
@ -46,6 +46,10 @@
|
|||
|
||||
<requestHandler name="standard" class="solr.StandardRequestHandler">
|
||||
</requestHandler>
|
||||
|
||||
<peerSync>
|
||||
<useRangeVersions>${solr.peerSync.useRangeVersions:false}</useRangeVersions>
|
||||
</peerSync>
|
||||
|
||||
<updateHandler class="solr.DirectUpdateHandler2">
|
||||
<updateLog>
|
||||
|
@ -57,12 +61,12 @@
|
|||
|
||||
|
||||
<autoCommit>
|
||||
<maxTime>${solr.autoCommit.maxTime:-1}</maxTime>
|
||||
<maxTime>${solr.autoCommit.maxTime:-1}</maxTime>
|
||||
<openSearcher>false</openSearcher>
|
||||
</autoCommit>
|
||||
|
||||
<autoSoftCommit>
|
||||
<maxTime>${solr.autoSoftCommit.maxTime:-1}</maxTime>
|
||||
<maxTime>${solr.autoSoftCommit.maxTime:-1}</maxTime>
|
||||
</autoSoftCommit>
|
||||
</updateHandler>
|
||||
|
||||
|
|
|
@ -229,6 +229,7 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
|
|||
System.setProperty("enable.update.log", usually() ? "true" : "false");
|
||||
System.setProperty("tests.shardhandler.randomSeed", Long.toString(random().nextLong()));
|
||||
System.setProperty("solr.clustering.enabled", "false");
|
||||
System.setProperty("solr.peerSync.useRangeVersions", String.valueOf(random().nextBoolean()));
|
||||
startTrackingSearchers();
|
||||
ignoreException("ignore_exception");
|
||||
newRandomConfig();
|
||||
|
@ -276,6 +277,7 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
|
|||
System.clearProperty("enable.update.log");
|
||||
System.clearProperty("useCompoundFile");
|
||||
System.clearProperty("urlScheme");
|
||||
System.clearProperty("solr.peerSync.useRangeVersions");
|
||||
|
||||
HttpClientUtil.resetHttpClientBuilder();
|
||||
|
||||
|
|
Loading…
Reference in New Issue