SOLR-9207: PeerSync recovery failes if number of updates requested is high. A new useRangeVersions config option is introduced (defaults to true) to send version ranges instead of individual versions for peer sync.

This commit is contained in:
Shalin Shekhar Mangar 2016-07-07 02:32:45 +05:30
parent c473441958
commit 380c5a6b97
7 changed files with 200 additions and 16 deletions

View File

@ -119,6 +119,10 @@ Bug Fixes
* SOLR-9088: Fixed TestManagedSchemaAPI failures which exposed race conditions in the schema API ( Varun Thacker, noble)
* SOLR-9207: PeerSync recovery failes if number of updates requested is high. A new useRangeVersions config option
is introduced (defaults to true) to send version ranges instead of individual versions for peer sync.
(Pushkar Raste, shalin)
Optimizations
----------------------

View File

@ -234,7 +234,8 @@ public class SolrConfig extends Config implements MapSerializable {
queryResultWindowSize = Math.max(1, getInt("query/queryResultWindowSize", 1));
queryResultMaxDocsCached = getInt("query/queryResultMaxDocsCached", Integer.MAX_VALUE);
enableLazyFieldLoading = getBool("query/enableLazyFieldLoading", false);
useRangeVersionsForPeerSync = getBool("peerSync/useRangeVersions", true);
filterCacheConfig = CacheConfig.getConfig(this, "query/filterCache");
queryResultCacheConfig = CacheConfig.getConfig(this, "query/queryResultCache");
@ -462,6 +463,9 @@ public class SolrConfig extends Config implements MapSerializable {
public final int queryResultWindowSize;
public final int queryResultMaxDocsCached;
public final boolean enableLazyFieldLoading;
public final boolean useRangeVersionsForPeerSync;
// DocSet
public final float hashSetInverseLoadFactor;
public final int hashDocSetMaxSize;
@ -864,6 +868,10 @@ public class SolrConfig extends Config implements MapSerializable {
"addHttpRequestToContext", addHttpRequestToContext));
if (indexConfig != null) result.put("indexConfig", indexConfig.toMap());
m = new LinkedHashMap();
result.put("peerSync", m);
m.put("useRangeVersions", useRangeVersionsForPeerSync);
//TODO there is more to add
return result;

View File

@ -20,9 +20,13 @@ import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DocValuesType;
@ -41,6 +45,7 @@ import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.StringUtils;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
@ -97,8 +102,16 @@ public class RealTimeGetComponent extends SearchComponent
if (!params.getBool(COMPONENT_NAME, true)) {
return;
}
String val = params.get("getVersions");
// This seems rather kludgey, may there is better way to indicate
// that replica can support handling version ranges
String val = params.get("checkCanHandleVersionRanges");
if(val != null) {
rb.rsp.add("canHandleVersionRanges", true);
return;
}
val = params.get("getVersions");
if (val != null) {
processGetVersions(rb);
return;
@ -667,7 +680,14 @@ public class RealTimeGetComponent extends SearchComponent
UpdateLog ulog = req.getCore().getUpdateHandler().getUpdateLog();
if (ulog == null) return;
List<String> versions = StrUtils.splitSmart(versionsStr, ",", true);
// handle version ranges
List<Long> versions = null;
if (versionsStr.indexOf("...") != -1) {
versions = resolveVersionRanges(versionsStr, ulog);
} else {
versions = StrUtils.splitSmart(versionsStr, ",", true).stream().map(Long::parseLong)
.collect(Collectors.toList());
}
List<Object> updates = new ArrayList<>(versions.size());
@ -676,8 +696,7 @@ public class RealTimeGetComponent extends SearchComponent
// TODO: get this from cache instead of rebuilding?
try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
for (String versionStr : versions) {
long version = Long.parseLong(versionStr);
for (Long version : versions) {
try {
Object o = recentUpdates.lookup(version);
if (o == null) continue;
@ -702,5 +721,37 @@ public class RealTimeGetComponent extends SearchComponent
}
}
private List<Long> resolveVersionRanges(String versionsStr, UpdateLog ulog) {
if (StringUtils.isEmpty(versionsStr)) {
return Collections.emptyList();
}
List<String> ranges = StrUtils.splitSmart(versionsStr, ",", true);
// TODO merge ranges.
// get all the versions from updatelog and sort them
List<Long> versionAvailable = null;
try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
versionAvailable = recentUpdates.getVersions(ulog.getNumRecordsToKeep());
}
// sort versions
Collections.sort(versionAvailable, PeerSync.absComparator);
// This can be done with single pass over both ranges and versionsAvailable, that would require
// merging ranges. We currently use Set to ensure there are no duplicates.
Set<Long> versionsToRet = new HashSet<>(ulog.getNumRecordsToKeep());
for (String range : ranges) {
String[] rangeBounds = range.split("\\.{3}");
int indexStart = Collections.binarySearch(versionAvailable, Long.valueOf(rangeBounds[1]), PeerSync.absComparator);
int indexEnd = Collections.binarySearch(versionAvailable, Long.valueOf(rangeBounds[0]), PeerSync.absComparator);
if(indexStart >=0 && indexEnd >= 0) {
versionsToRet.addAll(versionAvailable.subList(indexStart, indexEnd + 1)); // indexEnd is exclusive
}
}
// TODO do we need to sort versions using PeerSync.absComparator?
return new ArrayList<>(versionsToRet);
}
}

View File

@ -26,6 +26,7 @@ import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.http.NoHttpResponseException;
import org.apache.http.client.HttpClient;
@ -86,7 +87,7 @@ public class PeerSync {
private SolrCore core;
// comparator that sorts by absolute value, putting highest first
private static Comparator<Long> absComparator = (o1, o2) -> {
public static Comparator<Long> absComparator = (o1, o2) -> {
long l1 = Math.abs(o1);
long l2 = Math.abs(o2);
if (l1 > l2) return -1;
@ -117,6 +118,8 @@ public class PeerSync {
boolean doFingerprintComparison;
List<Long> requestedUpdates;
Exception updateException;
List<String> requestedRanges;
long totalRequestedUpdates;
}
public PeerSync(SolrCore core, List<String> replicas, int nUpdates) {
@ -359,6 +362,103 @@ public class PeerSync {
}
}
private boolean canHandleVersionRanges(String replica) {
SyncShardRequest sreq = new SyncShardRequest();
requests.add(sreq);
// determine if leader can handle version ranges
sreq.shards = new String[] {replica};
sreq.actualShards = sreq.shards;
sreq.params = new ModifiableSolrParams();
sreq.params.set("qt", "/get");
sreq.params.set("distrib", false);
sreq.params.set("checkCanHandleVersionRanges", false);
ShardHandler sh = shardHandlerFactory.getShardHandler(client);
sh.submit(sreq, replica, sreq.params);
ShardResponse srsp = sh.takeCompletedIncludingErrors();
Boolean canHandleVersionRanges = srsp.getSolrResponse().getResponse().getBooleanArg("canHandleVersionRanges");
if (canHandleVersionRanges == null || canHandleVersionRanges.booleanValue() == false) {
return false;
}
return true;
}
private boolean handleVersionsWithRanges(ShardResponse srsp, List<Long> otherVersions, SyncShardRequest sreq,
boolean completeList, long otherHigh, long otherHighest) {
// we may endup asking for updates for too many versions, causing 2MB post payload limit. Construct a range of
// versions to request instead of asking individual versions
List<String> rangesToRequest = new ArrayList<>();
// construct ranges to request
// both ourUpdates and otherVersions are sorted with highest range first
// may be we can create another reverse the lists and avoid confusion
int ourUpdatesIndex = ourUpdates.size() - 1;
int otherUpdatesIndex = otherVersions.size() - 1;
long totalRequestedVersions = 0;
while (otherUpdatesIndex >= 0) {
// we have run out of ourUpdates, pick up all the remaining versions from the other versions
if (ourUpdatesIndex < 0) {
String range = otherVersions.get(otherUpdatesIndex) + "..." + otherVersions.get(0);
rangesToRequest.add(range);
totalRequestedVersions += otherUpdatesIndex + 1;
break;
}
// stop when the entries get old enough that reorders may lead us to see updates we don't need
if (!completeList && Math.abs(otherVersions.get(otherUpdatesIndex)) < ourLowThreshold) break;
if (ourUpdates.get(ourUpdatesIndex).longValue() == otherVersions.get(otherUpdatesIndex).longValue()) {
ourUpdatesIndex--;
otherUpdatesIndex--;
} else if (Math.abs(ourUpdates.get(ourUpdatesIndex)) < Math.abs(otherVersions.get(otherUpdatesIndex))) {
ourUpdatesIndex--;
} else {
long rangeStart = otherVersions.get(otherUpdatesIndex);
while ((otherUpdatesIndex < otherVersions.size())
&& (Math.abs(otherVersions.get(otherUpdatesIndex)) < Math.abs(ourUpdates.get(ourUpdatesIndex)))) {
otherUpdatesIndex--;
totalRequestedVersions++;
}
// construct range here
rangesToRequest.add(rangeStart + "..." + otherVersions.get(otherUpdatesIndex + 1));
}
}
// TODO, do we really need to hold on to all the ranges we requested
// keeping track of totalRequestedUpdates should suffice for verification
sreq.requestedRanges = rangesToRequest;
sreq.totalRequestedUpdates = totalRequestedVersions;
if (rangesToRequest.isEmpty()) {
log.info(msg() + " No additional versions requested. ourLowThreshold=" + ourLowThreshold + " otherHigh="
+ otherHigh + " ourHighest=" + ourHighest + " otherHighest=" + otherHighest);
// we had (or already requested) all the updates referenced by the replica
// If we requested updates from another replica, we can't compare fingerprints yet with this replica, we need to
// defer
if (doFingerprint) {
sreq.doFingerprintComparison = true;
}
return true;
}
if (totalRequestedVersions > maxUpdates) {
log.info(msg() + " Failing due to needing too many updates:" + maxUpdates);
return false;
}
String rangesToRequestStr = rangesToRequest.stream().collect(Collectors.joining(","));
return requestUpdates(srsp, rangesToRequestStr, totalRequestedVersions);
}
private boolean handleVersions(ShardResponse srsp) {
// we retrieved the last N updates from the replica
List<Long> otherVersions = (List<Long>)srsp.getSolrResponse().getResponse().get("versions");
@ -410,6 +510,15 @@ public class PeerSync {
return true;
}
if(core.getSolrConfig().useRangeVersionsForPeerSync && canHandleVersionRanges(sreq.shards[0])) {
return handleVersionsWithRanges(srsp, otherVersions, sreq, completeList, otherHigh, otherHighest);
} else {
return handleIndividualVersions(srsp, otherVersions, sreq, completeList, otherHigh, otherHighest);
}
}
private boolean handleIndividualVersions(ShardResponse srsp, List<Long> otherVersions, SyncShardRequest sreq,
boolean completeList, long otherHigh, long otherHighest) {
List<Long> toRequest = new ArrayList<>();
for (Long otherVersion : otherVersions) {
// stop when the entries get old enough that reorders may lead us to see updates we don't need
@ -426,7 +535,10 @@ public class PeerSync {
requestedUpdateSet.add(otherVersion);
}
// TODO, do we really need to hold on to all the version numbers we requested.
// keeping track of totalRequestedUpdates should suffice for verification
sreq.requestedUpdates = toRequest;
sreq.totalRequestedUpdates = toRequest.size();
if (toRequest.isEmpty()) {
log.info(msg() + " No additional versions requested. ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh+ " ourHighest=" + ourHighest + " otherHighest=" + otherHighest);
@ -446,7 +558,7 @@ public class PeerSync {
return false;
}
return requestUpdates(srsp, toRequest);
return requestUpdates(srsp, StrUtils.join(toRequest, ','), toRequest.size());
}
private boolean compareFingerprint(SyncShardRequest sreq) {
@ -462,10 +574,10 @@ public class PeerSync {
}
}
private boolean requestUpdates(ShardResponse srsp, List<Long> toRequest) {
private boolean requestUpdates(ShardResponse srsp, String versionsAndRanges, long totalUpdates) {
String replica = srsp.getShardRequest().shards[0];
log.info(msg() + "Requesting updates from " + replica + "n=" + toRequest.size() + " versions=" + toRequest);
log.info(msg() + "Requesting updates from " + replica + "n=" + totalUpdates + " versions=" + versionsAndRanges);
// reuse our original request object
ShardRequest sreq = srsp.getShardRequest();
@ -474,7 +586,7 @@ public class PeerSync {
sreq.params = new ModifiableSolrParams();
sreq.params.set("qt", "/get");
sreq.params.set("distrib", false);
sreq.params.set("getUpdates", StrUtils.join(toRequest, ','));
sreq.params.set("getUpdates", versionsAndRanges);
sreq.params.set("onlyIfActive", onlyIfActive);
sreq.responses.clear(); // needs to be zeroed for correct correlation to occur
@ -489,7 +601,7 @@ public class PeerSync {
List<Object> updates = (List<Object>)srsp.getSolrResponse().getResponse().get("updates");
SyncShardRequest sreq = (SyncShardRequest) srsp.getShardRequest();
if (updates.size() < sreq.requestedUpdates.size()) {
if (updates.size() < sreq.totalRequestedUpdates) {
log.error(msg() + " Requested " + sreq.requestedUpdates.size() + " updates from " + sreq.shards[0] + " but retrieved " + updates.size());
return false;
}

View File

@ -52,5 +52,8 @@
"multipartUploadLimitInKB":0,
"formdataUploadLimitInKB":0,
"enableRemoteStreaming":0,
"addHttpRequestToContext":0}}
"addHttpRequestToContext":0}},
"peerSync":{
"useRangeVersions":11
}
}

View File

@ -46,6 +46,10 @@
<requestHandler name="standard" class="solr.StandardRequestHandler">
</requestHandler>
<peerSync>
<useRangeVersions>${solr.peerSync.useRangeVersions:false}</useRangeVersions>
</peerSync>
<updateHandler class="solr.DirectUpdateHandler2">
<updateLog>
@ -57,12 +61,12 @@
<autoCommit>
<maxTime>${solr.autoCommit.maxTime:-1}</maxTime>
<maxTime>${solr.autoCommit.maxTime:-1}</maxTime>
<openSearcher>false</openSearcher>
</autoCommit>
<autoSoftCommit>
<maxTime>${solr.autoSoftCommit.maxTime:-1}</maxTime>
<maxTime>${solr.autoSoftCommit.maxTime:-1}</maxTime>
</autoSoftCommit>
</updateHandler>

View File

@ -229,6 +229,7 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
System.setProperty("enable.update.log", usually() ? "true" : "false");
System.setProperty("tests.shardhandler.randomSeed", Long.toString(random().nextLong()));
System.setProperty("solr.clustering.enabled", "false");
System.setProperty("solr.peerSync.useRangeVersions", String.valueOf(random().nextBoolean()));
startTrackingSearchers();
ignoreException("ignore_exception");
newRandomConfig();
@ -276,6 +277,7 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
System.clearProperty("enable.update.log");
System.clearProperty("useCompoundFile");
System.clearProperty("urlScheme");
System.clearProperty("solr.peerSync.useRangeVersions");
HttpClientUtil.resetHttpClientBuilder();