From 380c5a6b9727beabb8ccce04add7e8e7089aa798 Mon Sep 17 00:00:00 2001 From: Shalin Shekhar Mangar Date: Thu, 7 Jul 2016 02:32:45 +0530 Subject: [PATCH] SOLR-9207: PeerSync recovery failes if number of updates requested is high. A new useRangeVersions config option is introduced (defaults to true) to send version ranges instead of individual versions for peer sync. --- solr/CHANGES.txt | 4 + .../java/org/apache/solr/core/SolrConfig.java | 10 +- .../component/RealTimeGetComponent.java | 63 ++++++++- .../java/org/apache/solr/update/PeerSync.java | 124 +++++++++++++++++- .../EditableSolrConfigAttributes.json | 5 +- .../solr/collection1/conf/solrconfig-tlog.xml | 8 +- .../java/org/apache/solr/SolrTestCaseJ4.java | 2 + 7 files changed, 200 insertions(+), 16 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 860230093c4..cf5cf05ffad 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -119,6 +119,10 @@ Bug Fixes * SOLR-9088: Fixed TestManagedSchemaAPI failures which exposed race conditions in the schema API ( Varun Thacker, noble) +* SOLR-9207: PeerSync recovery failes if number of updates requested is high. A new useRangeVersions config option + is introduced (defaults to true) to send version ranges instead of individual versions for peer sync. + (Pushkar Raste, shalin) + Optimizations ---------------------- diff --git a/solr/core/src/java/org/apache/solr/core/SolrConfig.java b/solr/core/src/java/org/apache/solr/core/SolrConfig.java index a5f54580e06..eb3aa5fc7f1 100644 --- a/solr/core/src/java/org/apache/solr/core/SolrConfig.java +++ b/solr/core/src/java/org/apache/solr/core/SolrConfig.java @@ -234,7 +234,8 @@ public class SolrConfig extends Config implements MapSerializable { queryResultWindowSize = Math.max(1, getInt("query/queryResultWindowSize", 1)); queryResultMaxDocsCached = getInt("query/queryResultMaxDocsCached", Integer.MAX_VALUE); enableLazyFieldLoading = getBool("query/enableLazyFieldLoading", false); - + + useRangeVersionsForPeerSync = getBool("peerSync/useRangeVersions", true); filterCacheConfig = CacheConfig.getConfig(this, "query/filterCache"); queryResultCacheConfig = CacheConfig.getConfig(this, "query/queryResultCache"); @@ -462,6 +463,9 @@ public class SolrConfig extends Config implements MapSerializable { public final int queryResultWindowSize; public final int queryResultMaxDocsCached; public final boolean enableLazyFieldLoading; + + public final boolean useRangeVersionsForPeerSync; + // DocSet public final float hashSetInverseLoadFactor; public final int hashDocSetMaxSize; @@ -864,6 +868,10 @@ public class SolrConfig extends Config implements MapSerializable { "addHttpRequestToContext", addHttpRequestToContext)); if (indexConfig != null) result.put("indexConfig", indexConfig.toMap()); + m = new LinkedHashMap(); + result.put("peerSync", m); + m.put("useRangeVersions", useRangeVersionsForPeerSync); + //TODO there is more to add return result; diff --git a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java index 1c42b034321..1942232115a 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java +++ b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java @@ -20,9 +20,13 @@ import java.io.IOException; import java.lang.invoke.MethodHandles; import java.net.URL; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.lucene.document.Document; import org.apache.lucene.index.DocValuesType; @@ -41,6 +45,7 @@ import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.StringUtils; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.Replica; @@ -97,8 +102,16 @@ public class RealTimeGetComponent extends SearchComponent if (!params.getBool(COMPONENT_NAME, true)) { return; } - - String val = params.get("getVersions"); + + // This seems rather kludgey, may there is better way to indicate + // that replica can support handling version ranges + String val = params.get("checkCanHandleVersionRanges"); + if(val != null) { + rb.rsp.add("canHandleVersionRanges", true); + return; + } + + val = params.get("getVersions"); if (val != null) { processGetVersions(rb); return; @@ -667,7 +680,14 @@ public class RealTimeGetComponent extends SearchComponent UpdateLog ulog = req.getCore().getUpdateHandler().getUpdateLog(); if (ulog == null) return; - List versions = StrUtils.splitSmart(versionsStr, ",", true); + // handle version ranges + List versions = null; + if (versionsStr.indexOf("...") != -1) { + versions = resolveVersionRanges(versionsStr, ulog); + } else { + versions = StrUtils.splitSmart(versionsStr, ",", true).stream().map(Long::parseLong) + .collect(Collectors.toList()); + } List updates = new ArrayList<>(versions.size()); @@ -676,8 +696,7 @@ public class RealTimeGetComponent extends SearchComponent // TODO: get this from cache instead of rebuilding? try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) { - for (String versionStr : versions) { - long version = Long.parseLong(versionStr); + for (Long version : versions) { try { Object o = recentUpdates.lookup(version); if (o == null) continue; @@ -702,5 +721,37 @@ public class RealTimeGetComponent extends SearchComponent } } - + + + private List resolveVersionRanges(String versionsStr, UpdateLog ulog) { + if (StringUtils.isEmpty(versionsStr)) { + return Collections.emptyList(); + } + + List ranges = StrUtils.splitSmart(versionsStr, ",", true); + + // TODO merge ranges. + + // get all the versions from updatelog and sort them + List versionAvailable = null; + try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) { + versionAvailable = recentUpdates.getVersions(ulog.getNumRecordsToKeep()); + } + // sort versions + Collections.sort(versionAvailable, PeerSync.absComparator); + + // This can be done with single pass over both ranges and versionsAvailable, that would require + // merging ranges. We currently use Set to ensure there are no duplicates. + Set versionsToRet = new HashSet<>(ulog.getNumRecordsToKeep()); + for (String range : ranges) { + String[] rangeBounds = range.split("\\.{3}"); + int indexStart = Collections.binarySearch(versionAvailable, Long.valueOf(rangeBounds[1]), PeerSync.absComparator); + int indexEnd = Collections.binarySearch(versionAvailable, Long.valueOf(rangeBounds[0]), PeerSync.absComparator); + if(indexStart >=0 && indexEnd >= 0) { + versionsToRet.addAll(versionAvailable.subList(indexStart, indexEnd + 1)); // indexEnd is exclusive + } + } + // TODO do we need to sort versions using PeerSync.absComparator? + return new ArrayList<>(versionsToRet); + } } diff --git a/solr/core/src/java/org/apache/solr/update/PeerSync.java b/solr/core/src/java/org/apache/solr/update/PeerSync.java index 9af5b1e7a6e..79f5ac9a2d7 100644 --- a/solr/core/src/java/org/apache/solr/update/PeerSync.java +++ b/solr/core/src/java/org/apache/solr/update/PeerSync.java @@ -26,6 +26,7 @@ import java.util.Comparator; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; import org.apache.http.NoHttpResponseException; import org.apache.http.client.HttpClient; @@ -86,7 +87,7 @@ public class PeerSync { private SolrCore core; // comparator that sorts by absolute value, putting highest first - private static Comparator absComparator = (o1, o2) -> { + public static Comparator absComparator = (o1, o2) -> { long l1 = Math.abs(o1); long l2 = Math.abs(o2); if (l1 > l2) return -1; @@ -117,6 +118,8 @@ public class PeerSync { boolean doFingerprintComparison; List requestedUpdates; Exception updateException; + List requestedRanges; + long totalRequestedUpdates; } public PeerSync(SolrCore core, List replicas, int nUpdates) { @@ -359,6 +362,103 @@ public class PeerSync { } } + private boolean canHandleVersionRanges(String replica) { + SyncShardRequest sreq = new SyncShardRequest(); + requests.add(sreq); + + // determine if leader can handle version ranges + sreq.shards = new String[] {replica}; + sreq.actualShards = sreq.shards; + sreq.params = new ModifiableSolrParams(); + sreq.params.set("qt", "/get"); + sreq.params.set("distrib", false); + sreq.params.set("checkCanHandleVersionRanges", false); + + ShardHandler sh = shardHandlerFactory.getShardHandler(client); + sh.submit(sreq, replica, sreq.params); + + ShardResponse srsp = sh.takeCompletedIncludingErrors(); + Boolean canHandleVersionRanges = srsp.getSolrResponse().getResponse().getBooleanArg("canHandleVersionRanges"); + + if (canHandleVersionRanges == null || canHandleVersionRanges.booleanValue() == false) { + return false; + } + + return true; + } + + private boolean handleVersionsWithRanges(ShardResponse srsp, List otherVersions, SyncShardRequest sreq, + boolean completeList, long otherHigh, long otherHighest) { + // we may endup asking for updates for too many versions, causing 2MB post payload limit. Construct a range of + // versions to request instead of asking individual versions + List rangesToRequest = new ArrayList<>(); + + // construct ranges to request + // both ourUpdates and otherVersions are sorted with highest range first + // may be we can create another reverse the lists and avoid confusion + int ourUpdatesIndex = ourUpdates.size() - 1; + int otherUpdatesIndex = otherVersions.size() - 1; + long totalRequestedVersions = 0; + + while (otherUpdatesIndex >= 0) { + // we have run out of ourUpdates, pick up all the remaining versions from the other versions + if (ourUpdatesIndex < 0) { + String range = otherVersions.get(otherUpdatesIndex) + "..." + otherVersions.get(0); + rangesToRequest.add(range); + totalRequestedVersions += otherUpdatesIndex + 1; + break; + } + + // stop when the entries get old enough that reorders may lead us to see updates we don't need + if (!completeList && Math.abs(otherVersions.get(otherUpdatesIndex)) < ourLowThreshold) break; + + if (ourUpdates.get(ourUpdatesIndex).longValue() == otherVersions.get(otherUpdatesIndex).longValue()) { + ourUpdatesIndex--; + otherUpdatesIndex--; + } else if (Math.abs(ourUpdates.get(ourUpdatesIndex)) < Math.abs(otherVersions.get(otherUpdatesIndex))) { + ourUpdatesIndex--; + } else { + long rangeStart = otherVersions.get(otherUpdatesIndex); + while ((otherUpdatesIndex < otherVersions.size()) + && (Math.abs(otherVersions.get(otherUpdatesIndex)) < Math.abs(ourUpdates.get(ourUpdatesIndex)))) { + otherUpdatesIndex--; + totalRequestedVersions++; + } + // construct range here + rangesToRequest.add(rangeStart + "..." + otherVersions.get(otherUpdatesIndex + 1)); + } + } + + // TODO, do we really need to hold on to all the ranges we requested + // keeping track of totalRequestedUpdates should suffice for verification + sreq.requestedRanges = rangesToRequest; + sreq.totalRequestedUpdates = totalRequestedVersions; + + if (rangesToRequest.isEmpty()) { + log.info(msg() + " No additional versions requested. ourLowThreshold=" + ourLowThreshold + " otherHigh=" + + otherHigh + " ourHighest=" + ourHighest + " otherHighest=" + otherHighest); + + // we had (or already requested) all the updates referenced by the replica + + // If we requested updates from another replica, we can't compare fingerprints yet with this replica, we need to + // defer + if (doFingerprint) { + sreq.doFingerprintComparison = true; + } + + return true; + } + + if (totalRequestedVersions > maxUpdates) { + log.info(msg() + " Failing due to needing too many updates:" + maxUpdates); + return false; + } + + String rangesToRequestStr = rangesToRequest.stream().collect(Collectors.joining(",")); + return requestUpdates(srsp, rangesToRequestStr, totalRequestedVersions); + } + + private boolean handleVersions(ShardResponse srsp) { // we retrieved the last N updates from the replica List otherVersions = (List)srsp.getSolrResponse().getResponse().get("versions"); @@ -410,6 +510,15 @@ public class PeerSync { return true; } + if(core.getSolrConfig().useRangeVersionsForPeerSync && canHandleVersionRanges(sreq.shards[0])) { + return handleVersionsWithRanges(srsp, otherVersions, sreq, completeList, otherHigh, otherHighest); + } else { + return handleIndividualVersions(srsp, otherVersions, sreq, completeList, otherHigh, otherHighest); + } + } + + private boolean handleIndividualVersions(ShardResponse srsp, List otherVersions, SyncShardRequest sreq, + boolean completeList, long otherHigh, long otherHighest) { List toRequest = new ArrayList<>(); for (Long otherVersion : otherVersions) { // stop when the entries get old enough that reorders may lead us to see updates we don't need @@ -426,7 +535,10 @@ public class PeerSync { requestedUpdateSet.add(otherVersion); } + // TODO, do we really need to hold on to all the version numbers we requested. + // keeping track of totalRequestedUpdates should suffice for verification sreq.requestedUpdates = toRequest; + sreq.totalRequestedUpdates = toRequest.size(); if (toRequest.isEmpty()) { log.info(msg() + " No additional versions requested. ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh+ " ourHighest=" + ourHighest + " otherHighest=" + otherHighest); @@ -446,7 +558,7 @@ public class PeerSync { return false; } - return requestUpdates(srsp, toRequest); + return requestUpdates(srsp, StrUtils.join(toRequest, ','), toRequest.size()); } private boolean compareFingerprint(SyncShardRequest sreq) { @@ -462,10 +574,10 @@ public class PeerSync { } } - private boolean requestUpdates(ShardResponse srsp, List toRequest) { + private boolean requestUpdates(ShardResponse srsp, String versionsAndRanges, long totalUpdates) { String replica = srsp.getShardRequest().shards[0]; - log.info(msg() + "Requesting updates from " + replica + "n=" + toRequest.size() + " versions=" + toRequest); + log.info(msg() + "Requesting updates from " + replica + "n=" + totalUpdates + " versions=" + versionsAndRanges); // reuse our original request object ShardRequest sreq = srsp.getShardRequest(); @@ -474,7 +586,7 @@ public class PeerSync { sreq.params = new ModifiableSolrParams(); sreq.params.set("qt", "/get"); sreq.params.set("distrib", false); - sreq.params.set("getUpdates", StrUtils.join(toRequest, ',')); + sreq.params.set("getUpdates", versionsAndRanges); sreq.params.set("onlyIfActive", onlyIfActive); sreq.responses.clear(); // needs to be zeroed for correct correlation to occur @@ -489,7 +601,7 @@ public class PeerSync { List updates = (List)srsp.getSolrResponse().getResponse().get("updates"); SyncShardRequest sreq = (SyncShardRequest) srsp.getShardRequest(); - if (updates.size() < sreq.requestedUpdates.size()) { + if (updates.size() < sreq.totalRequestedUpdates) { log.error(msg() + " Requested " + sreq.requestedUpdates.size() + " updates from " + sreq.shards[0] + " but retrieved " + updates.size()); return false; } diff --git a/solr/core/src/resources/EditableSolrConfigAttributes.json b/solr/core/src/resources/EditableSolrConfigAttributes.json index 394d63457a8..b0d6c2fee01 100644 --- a/solr/core/src/resources/EditableSolrConfigAttributes.json +++ b/solr/core/src/resources/EditableSolrConfigAttributes.json @@ -52,5 +52,8 @@ "multipartUploadLimitInKB":0, "formdataUploadLimitInKB":0, "enableRemoteStreaming":0, - "addHttpRequestToContext":0}} + "addHttpRequestToContext":0}}, + "peerSync":{ + "useRangeVersions":11 + } } \ No newline at end of file diff --git a/solr/core/src/test-files/solr/collection1/conf/solrconfig-tlog.xml b/solr/core/src/test-files/solr/collection1/conf/solrconfig-tlog.xml index 680f5723098..c6e2f958347 100644 --- a/solr/core/src/test-files/solr/collection1/conf/solrconfig-tlog.xml +++ b/solr/core/src/test-files/solr/collection1/conf/solrconfig-tlog.xml @@ -46,6 +46,10 @@ + + + ${solr.peerSync.useRangeVersions:false} + @@ -57,12 +61,12 @@ - ${solr.autoCommit.maxTime:-1} + ${solr.autoCommit.maxTime:-1} false - ${solr.autoSoftCommit.maxTime:-1} + ${solr.autoSoftCommit.maxTime:-1} diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java index db127846247..d57029c58ed 100644 --- a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java +++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java @@ -229,6 +229,7 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase { System.setProperty("enable.update.log", usually() ? "true" : "false"); System.setProperty("tests.shardhandler.randomSeed", Long.toString(random().nextLong())); System.setProperty("solr.clustering.enabled", "false"); + System.setProperty("solr.peerSync.useRangeVersions", String.valueOf(random().nextBoolean())); startTrackingSearchers(); ignoreException("ignore_exception"); newRandomConfig(); @@ -276,6 +277,7 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase { System.clearProperty("enable.update.log"); System.clearProperty("useCompoundFile"); System.clearProperty("urlScheme"); + System.clearProperty("solr.peerSync.useRangeVersions"); HttpClientUtil.resetHttpClientBuilder();