SOLR-8586: add index fingerprinting and use it in peersync

This commit is contained in:
yonik 2016-02-04 14:54:08 -05:00
parent eee6fa0bcf
commit 629767be06
7 changed files with 334 additions and 18 deletions

View File

@ -344,6 +344,10 @@ New Features
* SOLR-8560: Added RequestStatusState enum which can be used when comparing states of
asynchronous requests. (Shai Erera)
* SOLR-8586: added index fingerprint, a hash over all versions currently in the index.
PeerSync now uses this to check if replicas are in sync. (yonik)
Bug Fixes
----------------------
@ -440,6 +444,7 @@ Bug Fixes
* SOLR-8607: The Schema API refuses to add new fields that match existing dynamic fields.
(Jan Høydahl, Steve Rowe)
Optimizations
----------------------

View File

@ -173,7 +173,7 @@ public class SyncStrategy {
// if we can't reach a replica for sync, we still consider the overall sync a success
// TODO: as an assurance, we should still try and tell the sync nodes that we couldn't reach
// to recover once more?
PeerSync peerSync = new PeerSync(core, syncWith, core.getUpdateHandler().getUpdateLog().getNumRecordsToKeep(), true, true, peerSyncOnlyWithActive);
PeerSync peerSync = new PeerSync(core, syncWith, core.getUpdateHandler().getUpdateLog().getNumRecordsToKeep(), true, true, peerSyncOnlyWithActive, false);
return peerSync.sync();
}

View File

@ -66,6 +66,7 @@ import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.search.SolrReturnFields;
import org.apache.solr.search.SyntaxError;
import org.apache.solr.update.DocumentBuilder;
import org.apache.solr.update.IndexFingerprint;
import org.apache.solr.update.PeerSync;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.util.RefCounted;
@ -596,6 +597,8 @@ public class RealTimeGetComponent extends SearchComponent
int nVersions = params.getInt("getVersions", -1);
if (nVersions == -1) return;
boolean doFingerprint = params.getBool("fingerprint", false);
String sync = params.get("sync");
if (sync != null) {
processSync(rb, nVersions, sync);
@ -608,6 +611,11 @@ public class RealTimeGetComponent extends SearchComponent
try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
rb.rsp.add("versions", recentUpdates.getVersions(nVersions));
}
if (doFingerprint) {
IndexFingerprint fingerprint = IndexFingerprint.getFingerprint(req.getCore(), Long.MAX_VALUE);
rb.rsp.add("fingerprint", fingerprint.toObject());
}
}

View File

@ -0,0 +1,232 @@
package org.apache.solr.update;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.ConnectException;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.http.NoHttpResponseException;
import org.apache.http.client.HttpClient;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.queries.function.FunctionValues;
import org.apache.lucene.queries.function.ValueSource;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.Hash;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.component.HttpShardHandlerFactory;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardHandlerFactory;
import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.handler.component.ShardResponse;
import org.apache.solr.logging.MDCLoggingContext;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessorChain;
import org.apache.solr.util.RefCounted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase.FROMLEADER;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
/** @lucene.internal */
public class IndexFingerprint {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private long maxVersionSpecified;
private long maxVersionEncountered;
private long maxInHash;
private long versionsHash;
private long numVersions;
private long numDocs;
private long maxDoc;
public long getMaxVersionSpecified() {
return maxVersionSpecified;
}
public long getMaxVersionEncountered() {
return maxVersionEncountered;
}
public long getMaxInHash() {
return maxInHash;
}
public long getVersionsHash() {
return versionsHash;
}
public long getNumVersions() {
return numVersions;
}
public long getNumDocs() {
return numDocs;
}
public long getMaxDoc() {
return maxDoc;
}
/** Opens a new realtime searcher and returns it's fingerprint */
public static IndexFingerprint getFingerprint(SolrCore core, long maxVersion) throws IOException {
core.getUpdateHandler().getUpdateLog().openRealtimeSearcher();
RefCounted<SolrIndexSearcher> newestSearcher = core.getUpdateHandler().getUpdateLog().uhandler.core.getRealtimeSearcher();
try {
return getFingerprint(newestSearcher.get(), maxVersion);
} finally {
if (newestSearcher != null) {
newestSearcher.decref();
}
}
}
public static IndexFingerprint getFingerprint(SolrIndexSearcher searcher, long maxVersion) throws IOException {
long start = System.currentTimeMillis();
SchemaField versionField = VersionInfo.getAndCheckVersionField(searcher.getSchema());
IndexFingerprint f = new IndexFingerprint();
f.maxVersionSpecified = maxVersion;
f.maxDoc = searcher.maxDoc();
// TODO: this could be parallelized, or even cached per-segment if performance becomes an issue
ValueSource vs = versionField.getType().getValueSource(versionField, null);
Map funcContext = ValueSource.newContext(searcher);
vs.createWeight(funcContext, searcher);
for (LeafReaderContext ctx : searcher.getTopReaderContext().leaves()) {
int maxDoc = ctx.reader().maxDoc();
f.numDocs += ctx.reader().numDocs();
Bits liveDocs = ctx.reader().getLiveDocs();
FunctionValues fv = vs.getValues(funcContext, ctx);
for (int doc = 0; doc < maxDoc; doc++) {
if (liveDocs != null && !liveDocs.get(doc)) continue;
long v = fv.longVal(doc);
f.maxVersionEncountered = Math.max(v, f.maxVersionEncountered);
if (v <= f.maxVersionSpecified) {
f.maxInHash = Math.max(v, f.maxInHash);
f.versionsHash += Hash.fmix64(v);
f.numVersions++;
}
}
}
long end = System.currentTimeMillis();
log.info("IndexFingerprint millis:" + (end-start) + " result:" + f);
return f;
}
/** returns 0 for equal, negative if f1 is less recent than f2, positive if more recent */
public static int compare(IndexFingerprint f1, IndexFingerprint f2) {
int cmp;
// NOTE: some way want number of docs in index to take precedence over highest version (add-only systems for sure)
// if we're comparing all of the versions in the index, then go by the highest encountered.
if (f1.maxVersionSpecified == Long.MAX_VALUE) {
cmp = Long.compare(f1.maxVersionEncountered, f2.maxVersionEncountered);
if (cmp != 0) return cmp;
}
// Go by the highest version under the requested max.
cmp = Long.compare(f1.maxInHash, f2.maxInHash);
if (cmp != 0) return cmp;
// go by who has the most documents in the index
cmp = Long.compare(f1.numVersions, f2.numVersions);
if (cmp != 0) return cmp;
// both have same number of documents, so go by hash
cmp = Long.compare(f1.versionsHash, f2.versionsHash);
return cmp;
}
/**
* Create a generic object suitable for serializing with ResponseWriters
*/
public Object toObject() {
Map<String,Object> map = new LinkedHashMap<>();
map.put("maxVersionSpecified", maxVersionSpecified);
map.put("maxVersionEncountered", maxVersionEncountered);
map.put("maxInHash", maxInHash);
map.put("versionsHash", versionsHash);
map.put("numVersions", numVersions);
map.put("numDocs", numDocs);
map.put("maxDoc", maxDoc);
return map;
}
private static long getLong(Object o, String key, long def) {
long v = def;
Object oval = null;
if (o instanceof Map) {
oval = ((Map)o).get(key);
} else if (o instanceof NamedList) {
oval = ((NamedList)o).get(key);
}
return oval != null ? ((Number)oval).longValue() : def;
}
/**
* Create an IndexFingerprint object from a deserialized generic object (Map or NamedList)
*/
public static IndexFingerprint fromObject(Object o) {
IndexFingerprint f = new IndexFingerprint();
f.maxVersionSpecified = getLong(o, "maxVersionSpecified", Long.MAX_VALUE);
f.maxVersionEncountered = getLong(o, "maxVersionEncountered", -1);
f.maxInHash = getLong(o, "maxInHash", -1);
f.versionsHash = getLong(o, "versionsHash", -1);
f.numVersions = getLong(o, "numVersions", -1);
f.numDocs = getLong(o, "numDocs", -1);
f.maxDoc = getLong(o, "maxDoc", -1);
return f;
}
@Override
public String toString() {
return toObject().toString();
}
}

View File

@ -68,6 +68,7 @@ public class PeerSync {
private UpdateLog ulog;
private HttpShardHandlerFactory shardHandlerFactory;
private ShardHandler shardHandler;
private List<SyncShardRequest> requests = new ArrayList<>();
private List<Long> startingVersions;
@ -76,8 +77,10 @@ public class PeerSync {
private Set<Long> requestedUpdateSet;
private long ourLowThreshold; // 20th percentile
private long ourHighThreshold; // 80th percentile
private long ourHighest; // currently just used for logging/debugging purposes
private final boolean cantReachIsSuccess;
private final boolean getNoVersionsIsSuccess;
private final boolean doFingerprint;
private final HttpClient client;
private final boolean onlyIfActive;
private SolrCore core;
@ -116,6 +119,8 @@ public class PeerSync {
private static class SyncShardRequest extends ShardRequest {
List<Long> reportedVersions;
IndexFingerprint fingerprint;
boolean doFingerprintComparison;
List<Long> requestedUpdates;
Exception updateException;
}
@ -125,16 +130,17 @@ public class PeerSync {
}
public PeerSync(SolrCore core, List<String> replicas, int nUpdates, boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess) {
this(core, replicas, nUpdates, cantReachIsSuccess, getNoVersionsIsSuccess, false);
this(core, replicas, nUpdates, cantReachIsSuccess, getNoVersionsIsSuccess, false, true);
}
public PeerSync(SolrCore core, List<String> replicas, int nUpdates, boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess, boolean onlyIfActive) {
public PeerSync(SolrCore core, List<String> replicas, int nUpdates, boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess, boolean onlyIfActive, boolean doFingerprint) {
this.core = core;
this.replicas = replicas;
this.nUpdates = nUpdates;
this.maxUpdates = nUpdates;
this.cantReachIsSuccess = cantReachIsSuccess;
this.getNoVersionsIsSuccess = getNoVersionsIsSuccess;
this.doFingerprint = doFingerprint;
this.client = core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getHttpClient();
this.onlyIfActive = onlyIfActive;
@ -169,9 +175,8 @@ public class PeerSync {
return "PeerSync: core="+uhandler.core.getName()+ " url="+myURL +" ";
}
/** Returns true if peer sync was successful, meaning that this core may not be considered to have the latest updates
* when considering the last N updates between it and its peers.
* A commit is not performed.
/** Returns true if peer sync was successful, meaning that this core may be considered to have the latest updates.
* It does not mean that the remote replica is in sync with us.
*/
public boolean sync() {
if (ulog == null) {
@ -210,7 +215,7 @@ public class PeerSync {
ourLowThreshold = percentile(startingVersions, 0.8f);
ourHighThreshold = percentile(startingVersions, 0.2f);
// now make sure that the starting updates overlap our updates
// there shouldn't be reorders, so any overlap will do.
@ -231,6 +236,7 @@ public class PeerSync {
}
ourUpdates = newList;
Collections.sort(ourUpdates, absComparator);
} else {
if (ourUpdates.size() > 0) {
@ -243,9 +249,10 @@ public class PeerSync {
return false;
}
}
ourHighest = ourUpdates.get(0);
ourUpdateSet = new HashSet<>(ourUpdates);
requestedUpdateSet = new HashSet<>(ourUpdates);
requestedUpdateSet = new HashSet<>();
for (;;) {
ShardResponse srsp = shardHandler.takeCompletedOrError();
@ -257,9 +264,18 @@ public class PeerSync {
return false;
}
}
log.info(msg() + "DONE. sync succeeded");
return true;
// finish up any comparisons with other shards that we deferred
boolean success = true;
for (SyncShardRequest sreq : requests) {
if (sreq.doFingerprintComparison) {
success = compareFingerprint(sreq);
if (!success) break;
}
}
log.info(msg() + "DONE. sync " + (success ? "succeeded" : "failed"));
return success;
} finally {
MDCLoggingContext.clear();
}
@ -267,6 +283,7 @@ public class PeerSync {
private void requestVersions(String replica) {
SyncShardRequest sreq = new SyncShardRequest();
requests.add(sreq);
sreq.purpose = 1;
sreq.shards = new String[]{replica};
sreq.actualShards = sreq.shards;
@ -274,6 +291,7 @@ public class PeerSync {
sreq.params.set("qt","/get");
sreq.params.set("distrib",false);
sreq.params.set("getVersions",nUpdates);
sreq.params.set("fingerprint",doFingerprint);
shardHandler.submit(sreq, replica, sreq.params);
}
@ -355,7 +373,12 @@ public class PeerSync {
SyncShardRequest sreq = (SyncShardRequest) srsp.getShardRequest();
sreq.reportedVersions = otherVersions;
log.info(msg() + " Received " + otherVersions.size() + " versions from " + sreq.shards[0] );
Object fingerprint = srsp.getSolrResponse().getResponse().get("fingerprint");
log.info(msg() + " Received " + otherVersions.size() + " versions from " + sreq.shards[0] + " fingerprint:" + fingerprint );
if (fingerprint != null) {
sreq.fingerprint = IndexFingerprint.fromObject(fingerprint);
}
if (otherVersions.size() == 0) {
return getNoVersionsIsSuccess;
@ -371,13 +394,14 @@ public class PeerSync {
long otherHigh = percentile(otherVersions, .2f);
long otherLow = percentile(otherVersions, .8f);
long otherHighest = otherVersions.get(0);
if (ourHighThreshold < otherLow) {
// Small overlap between version windows and ours is older
// This means that we might miss updates if we attempted to use this method.
// Since there exists just one replica that is so much newer, we must
// fail the sync.
log.info(msg() + " Our versions are too old. ourHighThreshold="+ourHighThreshold + " otherLowThreshold="+otherLow);
log.info(msg() + " Our versions are too old. ourHighThreshold="+ourHighThreshold + " otherLowThreshold="+otherLow + " ourHighest=" + ourHighest + " otherHighest=" + otherHighest);
return false;
}
@ -385,7 +409,10 @@ public class PeerSync {
// Small overlap between windows and ours is newer.
// Using this list to sync would result in requesting/replaying results we don't need
// and possibly bringing deleted docs back to life.
log.info(msg() + " Our versions are newer. ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh);
log.info(msg() + " Our versions are newer. ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh+ " ourHighest=" + ourHighest + " otherHighest=" + otherHighest);
// Because our versions are newer, IndexFingerprint with the remote would not match us.
// We return true on our side, but the remote peersync with us should fail.
return true;
}
@ -408,9 +435,15 @@ public class PeerSync {
sreq.requestedUpdates = toRequest;
if (toRequest.isEmpty()) {
log.info(msg() + " Our versions are newer. ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh);
log.info(msg() + " No additional versions requested. ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh+ " ourHighest=" + ourHighest + " otherHighest=" + otherHighest);
// we had (or already requested) all the updates referenced by the replica
// If we requested updates from another replica, we can't compare fingerprints yet with this replica, we need to defer
if (doFingerprint) {
sreq.doFingerprintComparison = true;
}
return true;
}
@ -422,6 +455,19 @@ public class PeerSync {
return requestUpdates(srsp, toRequest);
}
private boolean compareFingerprint(SyncShardRequest sreq) {
if (sreq.fingerprint == null) return true;
try {
IndexFingerprint ourFingerprint = IndexFingerprint.getFingerprint(core, Long.MAX_VALUE);
int cmp = IndexFingerprint.compare(ourFingerprint, sreq.fingerprint);
log.info("Fingerprint comparison: " + cmp);
return cmp == 0; // currently, we only check for equality...
} catch(IOException e){
log.error(msg() + "Error getting index fingerprint", e);
return false;
}
}
private boolean requestUpdates(ShardResponse srsp, List<Long> toRequest) {
String replica = srsp.getShardRequest().shards[0];
@ -556,7 +602,7 @@ public class PeerSync {
}
}
return true;
return compareFingerprint(sreq);
}

View File

@ -505,7 +505,9 @@ public class UpdateLog implements PluginInfoInitialized {
}
}
/** Opens a new realtime searcher and clears the id caches */
/** Opens a new realtime searcher and clears the id caches.
* This may also be called when we updates are being buffered (from PeerSync/IndexFingerprint)
*/
public void openRealtimeSearcher() {
synchronized (this) {
// We must cause a new IndexReader to be opened before anything looks at these caches again

View File

@ -168,6 +168,29 @@ public class PeerSyncTest extends BaseDistributedSearchTestCase {
assertSync(client1, numVersions, true, shardsArr[0]);
client0.commit(); client1.commit(); queryAndCompare(params("q", "*:*", "sort","_version_ desc"), client0, client1);
// now lets check fingerprinting causes appropriate fails
v = 4000;
add(client0, seenLeader, sdoc("id",Integer.toString((int)v),"_version_",v));
toAdd = numVersions+10;
for (int i=0; i<toAdd; i++) {
add(client0, seenLeader, sdoc("id",Integer.toString((int)v+i+1),"_version_",v+i+1));
add(client1, seenLeader, sdoc("id",Integer.toString((int)v+i+1),"_version_",v+i+1));
}
// client0 now has an additional add beyond our window and the fingerprint should cause this to fail
assertSync(client1, numVersions, false, shardsArr[0]);
// lets add the missing document and verify that order doesn't matter
add(client1, seenLeader, sdoc("id",Integer.toString((int)v),"_version_",v));
assertSync(client1, numVersions, true, shardsArr[0]);
// lets do some overwrites to ensure that repeated updates and maxDoc don't matter
for (int i=0; i<10; i++) {
add(client0, seenLeader, sdoc("id", Integer.toString((int) v + i + 1), "_version_", v + i + 1));
}
assertSync(client1, numVersions, true, shardsArr[0]);
}