diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index ef22a2ff4c0..c396d393873 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -344,6 +344,10 @@ New Features * SOLR-8560: Added RequestStatusState enum which can be used when comparing states of asynchronous requests. (Shai Erera) +* SOLR-8586: added index fingerprint, a hash over all versions currently in the index. + PeerSync now uses this to check if replicas are in sync. (yonik) + + Bug Fixes ---------------------- @@ -440,6 +444,7 @@ Bug Fixes * SOLR-8607: The Schema API refuses to add new fields that match existing dynamic fields. (Jan Høydahl, Steve Rowe) + Optimizations ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java index 7a16598bbcd..d811f5c1d93 100644 --- a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java +++ b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java @@ -173,7 +173,7 @@ public class SyncStrategy { // if we can't reach a replica for sync, we still consider the overall sync a success // TODO: as an assurance, we should still try and tell the sync nodes that we couldn't reach // to recover once more? - PeerSync peerSync = new PeerSync(core, syncWith, core.getUpdateHandler().getUpdateLog().getNumRecordsToKeep(), true, true, peerSyncOnlyWithActive); + PeerSync peerSync = new PeerSync(core, syncWith, core.getUpdateHandler().getUpdateLog().getNumRecordsToKeep(), true, true, peerSyncOnlyWithActive, false); return peerSync.sync(); } diff --git a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java index 2e1f729a43c..2f71b3e8a37 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java +++ b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java @@ -66,6 +66,7 @@ import org.apache.solr.search.SolrIndexSearcher; import org.apache.solr.search.SolrReturnFields; import org.apache.solr.search.SyntaxError; import org.apache.solr.update.DocumentBuilder; +import org.apache.solr.update.IndexFingerprint; import org.apache.solr.update.PeerSync; import org.apache.solr.update.UpdateLog; import org.apache.solr.util.RefCounted; @@ -596,6 +597,8 @@ public class RealTimeGetComponent extends SearchComponent int nVersions = params.getInt("getVersions", -1); if (nVersions == -1) return; + boolean doFingerprint = params.getBool("fingerprint", false); + String sync = params.get("sync"); if (sync != null) { processSync(rb, nVersions, sync); @@ -608,6 +611,11 @@ public class RealTimeGetComponent extends SearchComponent try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) { rb.rsp.add("versions", recentUpdates.getVersions(nVersions)); } + + if (doFingerprint) { + IndexFingerprint fingerprint = IndexFingerprint.getFingerprint(req.getCore(), Long.MAX_VALUE); + rb.rsp.add("fingerprint", fingerprint.toObject()); + } } diff --git a/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java b/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java new file mode 100644 index 00000000000..c73b57b7a07 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java @@ -0,0 +1,232 @@ +package org.apache.solr.update; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.net.ConnectException; +import java.net.SocketException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.http.NoHttpResponseException; +import org.apache.http.client.HttpClient; +import org.apache.http.conn.ConnectTimeoutException; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.queries.function.FunctionValues; +import org.apache.lucene.queries.function.ValueSource; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.BytesRef; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.cloud.ZkController; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.util.Hash; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.StrUtils; +import org.apache.solr.core.SolrCore; +import org.apache.solr.handler.component.HttpShardHandlerFactory; +import org.apache.solr.handler.component.ShardHandler; +import org.apache.solr.handler.component.ShardHandlerFactory; +import org.apache.solr.handler.component.ShardRequest; +import org.apache.solr.handler.component.ShardResponse; +import org.apache.solr.logging.MDCLoggingContext; +import org.apache.solr.request.LocalSolrQueryRequest; +import org.apache.solr.request.SolrQueryRequest; +import org.apache.solr.response.SolrQueryResponse; +import org.apache.solr.schema.SchemaField; +import org.apache.solr.search.SolrIndexSearcher; +import org.apache.solr.update.processor.UpdateRequestProcessor; +import org.apache.solr.update.processor.UpdateRequestProcessorChain; +import org.apache.solr.util.RefCounted; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase.FROMLEADER; +import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM; + +/** @lucene.internal */ +public class IndexFingerprint { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private long maxVersionSpecified; + private long maxVersionEncountered; + private long maxInHash; + private long versionsHash; + private long numVersions; + private long numDocs; + private long maxDoc; + + public long getMaxVersionSpecified() { + return maxVersionSpecified; + } + + public long getMaxVersionEncountered() { + return maxVersionEncountered; + } + + public long getMaxInHash() { + return maxInHash; + } + + public long getVersionsHash() { + return versionsHash; + } + + public long getNumVersions() { + return numVersions; + } + + public long getNumDocs() { + return numDocs; + } + + public long getMaxDoc() { + return maxDoc; + } + + /** Opens a new realtime searcher and returns it's fingerprint */ + public static IndexFingerprint getFingerprint(SolrCore core, long maxVersion) throws IOException { + core.getUpdateHandler().getUpdateLog().openRealtimeSearcher(); + RefCounted newestSearcher = core.getUpdateHandler().getUpdateLog().uhandler.core.getRealtimeSearcher(); + try { + return getFingerprint(newestSearcher.get(), maxVersion); + } finally { + if (newestSearcher != null) { + newestSearcher.decref(); + } + } + } + + public static IndexFingerprint getFingerprint(SolrIndexSearcher searcher, long maxVersion) throws IOException { + long start = System.currentTimeMillis(); + + SchemaField versionField = VersionInfo.getAndCheckVersionField(searcher.getSchema()); + + IndexFingerprint f = new IndexFingerprint(); + f.maxVersionSpecified = maxVersion; + f.maxDoc = searcher.maxDoc(); + + // TODO: this could be parallelized, or even cached per-segment if performance becomes an issue + ValueSource vs = versionField.getType().getValueSource(versionField, null); + Map funcContext = ValueSource.newContext(searcher); + vs.createWeight(funcContext, searcher); + for (LeafReaderContext ctx : searcher.getTopReaderContext().leaves()) { + int maxDoc = ctx.reader().maxDoc(); + f.numDocs += ctx.reader().numDocs(); + Bits liveDocs = ctx.reader().getLiveDocs(); + FunctionValues fv = vs.getValues(funcContext, ctx); + for (int doc = 0; doc < maxDoc; doc++) { + if (liveDocs != null && !liveDocs.get(doc)) continue; + long v = fv.longVal(doc); + f.maxVersionEncountered = Math.max(v, f.maxVersionEncountered); + if (v <= f.maxVersionSpecified) { + f.maxInHash = Math.max(v, f.maxInHash); + f.versionsHash += Hash.fmix64(v); + f.numVersions++; + } + } + } + + long end = System.currentTimeMillis(); + log.info("IndexFingerprint millis:" + (end-start) + " result:" + f); + + return f; + } + + /** returns 0 for equal, negative if f1 is less recent than f2, positive if more recent */ + public static int compare(IndexFingerprint f1, IndexFingerprint f2) { + int cmp; + + // NOTE: some way want number of docs in index to take precedence over highest version (add-only systems for sure) + + // if we're comparing all of the versions in the index, then go by the highest encountered. + if (f1.maxVersionSpecified == Long.MAX_VALUE) { + cmp = Long.compare(f1.maxVersionEncountered, f2.maxVersionEncountered); + if (cmp != 0) return cmp; + } + + // Go by the highest version under the requested max. + cmp = Long.compare(f1.maxInHash, f2.maxInHash); + if (cmp != 0) return cmp; + + // go by who has the most documents in the index + cmp = Long.compare(f1.numVersions, f2.numVersions); + if (cmp != 0) return cmp; + + // both have same number of documents, so go by hash + cmp = Long.compare(f1.versionsHash, f2.versionsHash); + return cmp; + } + + /** + * Create a generic object suitable for serializing with ResponseWriters + */ + public Object toObject() { + Map map = new LinkedHashMap<>(); + map.put("maxVersionSpecified", maxVersionSpecified); + map.put("maxVersionEncountered", maxVersionEncountered); + map.put("maxInHash", maxInHash); + map.put("versionsHash", versionsHash); + map.put("numVersions", numVersions); + map.put("numDocs", numDocs); + map.put("maxDoc", maxDoc); + return map; + } + + private static long getLong(Object o, String key, long def) { + long v = def; + + Object oval = null; + if (o instanceof Map) { + oval = ((Map)o).get(key); + } else if (o instanceof NamedList) { + oval = ((NamedList)o).get(key); + } + + return oval != null ? ((Number)oval).longValue() : def; + } + + /** + * Create an IndexFingerprint object from a deserialized generic object (Map or NamedList) + */ + public static IndexFingerprint fromObject(Object o) { + IndexFingerprint f = new IndexFingerprint(); + f.maxVersionSpecified = getLong(o, "maxVersionSpecified", Long.MAX_VALUE); + f.maxVersionEncountered = getLong(o, "maxVersionEncountered", -1); + f.maxInHash = getLong(o, "maxInHash", -1); + f.versionsHash = getLong(o, "versionsHash", -1); + f.numVersions = getLong(o, "numVersions", -1); + f.numDocs = getLong(o, "numDocs", -1); + f.maxDoc = getLong(o, "maxDoc", -1); + return f; + } + + @Override + public String toString() { + return toObject().toString(); + } +} diff --git a/solr/core/src/java/org/apache/solr/update/PeerSync.java b/solr/core/src/java/org/apache/solr/update/PeerSync.java index dacb470351a..ea717830c07 100644 --- a/solr/core/src/java/org/apache/solr/update/PeerSync.java +++ b/solr/core/src/java/org/apache/solr/update/PeerSync.java @@ -68,6 +68,7 @@ public class PeerSync { private UpdateLog ulog; private HttpShardHandlerFactory shardHandlerFactory; private ShardHandler shardHandler; + private List requests = new ArrayList<>(); private List startingVersions; @@ -76,8 +77,10 @@ public class PeerSync { private Set requestedUpdateSet; private long ourLowThreshold; // 20th percentile private long ourHighThreshold; // 80th percentile + private long ourHighest; // currently just used for logging/debugging purposes private final boolean cantReachIsSuccess; private final boolean getNoVersionsIsSuccess; + private final boolean doFingerprint; private final HttpClient client; private final boolean onlyIfActive; private SolrCore core; @@ -116,6 +119,8 @@ public class PeerSync { private static class SyncShardRequest extends ShardRequest { List reportedVersions; + IndexFingerprint fingerprint; + boolean doFingerprintComparison; List requestedUpdates; Exception updateException; } @@ -125,16 +130,17 @@ public class PeerSync { } public PeerSync(SolrCore core, List replicas, int nUpdates, boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess) { - this(core, replicas, nUpdates, cantReachIsSuccess, getNoVersionsIsSuccess, false); + this(core, replicas, nUpdates, cantReachIsSuccess, getNoVersionsIsSuccess, false, true); } - public PeerSync(SolrCore core, List replicas, int nUpdates, boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess, boolean onlyIfActive) { + public PeerSync(SolrCore core, List replicas, int nUpdates, boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess, boolean onlyIfActive, boolean doFingerprint) { this.core = core; this.replicas = replicas; this.nUpdates = nUpdates; this.maxUpdates = nUpdates; this.cantReachIsSuccess = cantReachIsSuccess; this.getNoVersionsIsSuccess = getNoVersionsIsSuccess; + this.doFingerprint = doFingerprint; this.client = core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getHttpClient(); this.onlyIfActive = onlyIfActive; @@ -169,9 +175,8 @@ public class PeerSync { return "PeerSync: core="+uhandler.core.getName()+ " url="+myURL +" "; } - /** Returns true if peer sync was successful, meaning that this core may not be considered to have the latest updates - * when considering the last N updates between it and its peers. - * A commit is not performed. + /** Returns true if peer sync was successful, meaning that this core may be considered to have the latest updates. + * It does not mean that the remote replica is in sync with us. */ public boolean sync() { if (ulog == null) { @@ -210,7 +215,7 @@ public class PeerSync { ourLowThreshold = percentile(startingVersions, 0.8f); ourHighThreshold = percentile(startingVersions, 0.2f); - + // now make sure that the starting updates overlap our updates // there shouldn't be reorders, so any overlap will do. @@ -231,6 +236,7 @@ public class PeerSync { } ourUpdates = newList; + Collections.sort(ourUpdates, absComparator); } else { if (ourUpdates.size() > 0) { @@ -243,9 +249,10 @@ public class PeerSync { return false; } } - + + ourHighest = ourUpdates.get(0); ourUpdateSet = new HashSet<>(ourUpdates); - requestedUpdateSet = new HashSet<>(ourUpdates); + requestedUpdateSet = new HashSet<>(); for (;;) { ShardResponse srsp = shardHandler.takeCompletedOrError(); @@ -257,9 +264,18 @@ public class PeerSync { return false; } } - - log.info(msg() + "DONE. sync succeeded"); - return true; + + // finish up any comparisons with other shards that we deferred + boolean success = true; + for (SyncShardRequest sreq : requests) { + if (sreq.doFingerprintComparison) { + success = compareFingerprint(sreq); + if (!success) break; + } + } + + log.info(msg() + "DONE. sync " + (success ? "succeeded" : "failed")); + return success; } finally { MDCLoggingContext.clear(); } @@ -267,6 +283,7 @@ public class PeerSync { private void requestVersions(String replica) { SyncShardRequest sreq = new SyncShardRequest(); + requests.add(sreq); sreq.purpose = 1; sreq.shards = new String[]{replica}; sreq.actualShards = sreq.shards; @@ -274,6 +291,7 @@ public class PeerSync { sreq.params.set("qt","/get"); sreq.params.set("distrib",false); sreq.params.set("getVersions",nUpdates); + sreq.params.set("fingerprint",doFingerprint); shardHandler.submit(sreq, replica, sreq.params); } @@ -355,7 +373,12 @@ public class PeerSync { SyncShardRequest sreq = (SyncShardRequest) srsp.getShardRequest(); sreq.reportedVersions = otherVersions; - log.info(msg() + " Received " + otherVersions.size() + " versions from " + sreq.shards[0] ); + Object fingerprint = srsp.getSolrResponse().getResponse().get("fingerprint"); + + log.info(msg() + " Received " + otherVersions.size() + " versions from " + sreq.shards[0] + " fingerprint:" + fingerprint ); + if (fingerprint != null) { + sreq.fingerprint = IndexFingerprint.fromObject(fingerprint); + } if (otherVersions.size() == 0) { return getNoVersionsIsSuccess; @@ -371,13 +394,14 @@ public class PeerSync { long otherHigh = percentile(otherVersions, .2f); long otherLow = percentile(otherVersions, .8f); + long otherHighest = otherVersions.get(0); if (ourHighThreshold < otherLow) { // Small overlap between version windows and ours is older // This means that we might miss updates if we attempted to use this method. // Since there exists just one replica that is so much newer, we must // fail the sync. - log.info(msg() + " Our versions are too old. ourHighThreshold="+ourHighThreshold + " otherLowThreshold="+otherLow); + log.info(msg() + " Our versions are too old. ourHighThreshold="+ourHighThreshold + " otherLowThreshold="+otherLow + " ourHighest=" + ourHighest + " otherHighest=" + otherHighest); return false; } @@ -385,7 +409,10 @@ public class PeerSync { // Small overlap between windows and ours is newer. // Using this list to sync would result in requesting/replaying results we don't need // and possibly bringing deleted docs back to life. - log.info(msg() + " Our versions are newer. ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh); + log.info(msg() + " Our versions are newer. ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh+ " ourHighest=" + ourHighest + " otherHighest=" + otherHighest); + + // Because our versions are newer, IndexFingerprint with the remote would not match us. + // We return true on our side, but the remote peersync with us should fail. return true; } @@ -408,9 +435,15 @@ public class PeerSync { sreq.requestedUpdates = toRequest; if (toRequest.isEmpty()) { - log.info(msg() + " Our versions are newer. ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh); + log.info(msg() + " No additional versions requested. ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh+ " ourHighest=" + ourHighest + " otherHighest=" + otherHighest); // we had (or already requested) all the updates referenced by the replica + + // If we requested updates from another replica, we can't compare fingerprints yet with this replica, we need to defer + if (doFingerprint) { + sreq.doFingerprintComparison = true; + } + return true; } @@ -422,6 +455,19 @@ public class PeerSync { return requestUpdates(srsp, toRequest); } + private boolean compareFingerprint(SyncShardRequest sreq) { + if (sreq.fingerprint == null) return true; + try { + IndexFingerprint ourFingerprint = IndexFingerprint.getFingerprint(core, Long.MAX_VALUE); + int cmp = IndexFingerprint.compare(ourFingerprint, sreq.fingerprint); + log.info("Fingerprint comparison: " + cmp); + return cmp == 0; // currently, we only check for equality... + } catch(IOException e){ + log.error(msg() + "Error getting index fingerprint", e); + return false; + } + } + private boolean requestUpdates(ShardResponse srsp, List toRequest) { String replica = srsp.getShardRequest().shards[0]; @@ -556,7 +602,7 @@ public class PeerSync { } } - return true; + return compareFingerprint(sreq); } diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java index c63d807d297..78c30b95d29 100644 --- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java +++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java @@ -505,7 +505,9 @@ public class UpdateLog implements PluginInfoInitialized { } } - /** Opens a new realtime searcher and clears the id caches */ + /** Opens a new realtime searcher and clears the id caches. + * This may also be called when we updates are being buffered (from PeerSync/IndexFingerprint) + */ public void openRealtimeSearcher() { synchronized (this) { // We must cause a new IndexReader to be opened before anything looks at these caches again diff --git a/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java b/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java index 8083ad066e1..bcaf84662f2 100644 --- a/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java +++ b/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java @@ -168,6 +168,29 @@ public class PeerSyncTest extends BaseDistributedSearchTestCase { assertSync(client1, numVersions, true, shardsArr[0]); client0.commit(); client1.commit(); queryAndCompare(params("q", "*:*", "sort","_version_ desc"), client0, client1); + + // now lets check fingerprinting causes appropriate fails + v = 4000; + add(client0, seenLeader, sdoc("id",Integer.toString((int)v),"_version_",v)); + toAdd = numVersions+10; + for (int i=0; i