* SOLR-9506: cache IndexFingerprint for each segment

This commit is contained in:
Noble Paul 2016-10-18 19:43:32 +05:30
parent b2188f4958
commit bb907a2983
8 changed files with 193 additions and 73 deletions

View File

@ -206,6 +206,8 @@ Optimizations
* SOLR-9566: Don't put replicas into recovery when first creating a Collection
(Alan Woodward)
* SOLR-9506: cache IndexFingerprint for each segment (Pushkar Raste, noble)
Other Changes
----------------------

View File

@ -58,6 +58,7 @@ import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
@ -127,6 +128,7 @@ import org.apache.solr.search.stats.LocalStatsCache;
import org.apache.solr.search.stats.StatsCache;
import org.apache.solr.update.DefaultSolrCoreState;
import org.apache.solr.update.DirectUpdateHandler2;
import org.apache.solr.update.IndexFingerprint;
import org.apache.solr.update.SolrCoreState;
import org.apache.solr.update.SolrCoreState.IndexWriterCloser;
import org.apache.solr.update.SolrIndexWriter;
@ -149,6 +151,8 @@ import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.MapMaker;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.solr.common.params.CommonParams.NAME;
import static org.apache.solr.common.params.CommonParams.PATH;
@ -200,6 +204,8 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
private final ReentrantLock snapshotDelLock; // A lock instance to guard against concurrent deletions.
public Date getStartTimeStamp() { return startTime; }
private final Map<Object, IndexFingerprint> perSegmentFingerprintCache = new MapMaker().weakKeys().makeMap();
public long getStartNanoTime() {
return startNanoTime;
@ -1588,6 +1594,38 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
return getSearcher(false,true,null);
}
/**
* Computes fingerprint of a segment and caches it only if all the version in segment are included in the fingerprint.
* We can't use computeIfAbsent as caching is conditional (as described above)
* There is chance that two threads may compute fingerprint on the same segment. It might be OK to do so rather than locking entire map.
* @param searcher searcher that includes specified LeaderReaderContext
* @param ctx LeafReaderContext of a segment to compute fingerprint of
* @param maxVersion maximum version number to consider for fingerprint computation
* @return IndexFingerprint of the segment
* @throws IOException Can throw IOException
*/
public IndexFingerprint getIndexFingerprint(SolrIndexSearcher searcher, LeafReaderContext ctx, long maxVersion)
throws IOException {
IndexFingerprint f = null;
f = perSegmentFingerprintCache.get(ctx.reader().getCoreCacheKey()) ;
// fingerprint is either not cached or
// we want fingerprint only up to a version less than maxVersionEncountered in the segment
if(f == null || (f.getMaxInHash() > maxVersion)) {
log.debug("IndexFingerprint cache miss for searcher:{} reader:{} readerHash:{} maxVersion:{}", searcher, ctx.reader() , ctx.reader().hashCode(), maxVersion);
f = IndexFingerprint.getFingerprint(searcher, ctx, maxVersion);
// cache fingerprint for the segment only if all the versions in the segment are included in the fingerprint
if(f.getMaxVersionEncountered() == f.getMaxInHash()) {
log.info("Caching fingerprint for searcher:{} leafReaderContext:{} mavVersion:{}", searcher, ctx, maxVersion);
perSegmentFingerprintCache.put(ctx.reader().getCoreCacheKey(), f);
}
} else {
log.debug("IndexFingerprint cache hit for searcher:{} reader:{} readerHash:{} maxVersion:{}", searcher, ctx.reader(), ctx.reader().hashCode(), maxVersion);
}
log.debug("Cache Size: {}, Segments Size:{}", perSegmentFingerprintCache.size(), searcher.getTopReaderContext().leaves().size());
return f;
}
/**
* Returns the current registered searcher with its reference count incremented, or null if none are registered.
*/

View File

@ -42,22 +42,50 @@ import org.apache.lucene.document.Document;
import org.apache.lucene.document.DocumentStoredFieldVisitor;
import org.apache.lucene.document.LazyDocument;
import org.apache.lucene.index.*;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.StoredFieldVisitor.Status;
import org.apache.lucene.search.*;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.CollectionStatistics;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.ConstantScoreQuery;
import org.apache.lucene.search.DocIdSet;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.EarlyTerminatingSortingCollector;
import org.apache.lucene.search.Explanation;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.MultiCollector;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.SimpleCollector;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TermStatistics;
import org.apache.lucene.search.TimeLimitingCollector;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopDocsCollector;
import org.apache.lucene.search.TopFieldCollector;
import org.apache.lucene.search.TopFieldDocs;
import org.apache.lucene.search.TopScoreDocCollector;
import org.apache.lucene.search.TotalHitCountCollector;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.FixedBitSet;
import org.apache.solr.common.SolrDocumentBase;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.core.DirectoryFactory.DirContext;
import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.DirectoryFactory.DirContext;
import org.apache.solr.core.SolrConfig;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrInfoMBean;
@ -249,7 +277,7 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable, SolrI
this.path = path;
this.directoryFactory = directoryFactory;
this.reader = (DirectoryReader) super.readerContext.reader();
this.reader = (DirectoryReader) super.getTopReaderContext().reader();
this.rawReader = r;
this.leafReader = SlowCompositeReaderWrapper.wrap(this.reader);
this.core = core;
@ -2413,12 +2441,11 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable, SolrI
* gets a cached version of the IndexFingerprint for this searcher
**/
public IndexFingerprint getIndexFingerprint(long maxVersion) throws IOException {
final SolrIndexSearcher searcher = this;
final AtomicReference<IOException> exception = new AtomicReference<>();
try {
return maxVersionFingerprintCache.computeIfAbsent(maxVersion, key -> {
try {
return IndexFingerprint.getFingerprint(searcher, key);
return computeFromPerSegmentIndexFingerprint(maxVersion);
} catch (IOException e) {
exception.set(e);
return null;
@ -2429,6 +2456,27 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable, SolrI
}
}
private IndexFingerprint computeFromPerSegmentIndexFingerprint(long maxVersion) throws IOException {
final SolrIndexSearcher searcher = this;
final AtomicReference<IOException> exception = new AtomicReference<>();
try {
return searcher.getTopReaderContext().leaves().stream()
.map(ctx -> {
try {
return searcher.getCore().getIndexFingerprint(searcher, ctx, maxVersion);
} catch (IOException e) {
exception.set(e);
return null;
}
})
.filter(java.util.Objects::nonNull)
.reduce(new IndexFingerprint(maxVersion), IndexFingerprint::reduce);
} finally {
if (exception.get() != null) throw exception.get();
}
}
/////////////////////////////////////////////////////////////////////
// SolrInfoMBean stuff: Statistics and Module Info
/////////////////////////////////////////////////////////////////////

View File

@ -52,6 +52,14 @@ public class IndexFingerprint implements MapSerializable {
private long numDocs;
private long maxDoc;
public IndexFingerprint() {
// default constructor
}
public IndexFingerprint (long maxVersionSpecified) {
this.maxVersionSpecified = maxVersionSpecified;
}
public long getMaxVersionSpecified() {
return maxVersionSpecified;
}
@ -82,53 +90,63 @@ public class IndexFingerprint implements MapSerializable {
/** Opens a new realtime searcher and returns it's (possibly cached) fingerprint */
public static IndexFingerprint getFingerprint(SolrCore core, long maxVersion) throws IOException {
RTimer timer = new RTimer();
core.getUpdateHandler().getUpdateLog().openRealtimeSearcher();
RefCounted<SolrIndexSearcher> newestSearcher = core.getUpdateHandler().getUpdateLog().uhandler.core.getRealtimeSearcher();
try {
return newestSearcher.get().getIndexFingerprint(maxVersion);
IndexFingerprint f = newestSearcher.get().getIndexFingerprint(maxVersion);
final double duration = timer.stop();
log.debug("IndexFingerprint time : {} result:{}" ,duration, f);
return f;
} finally {
if (newestSearcher != null) {
newestSearcher.decref();
}
}
}
/** Calculates an index fingerprint */
public static IndexFingerprint getFingerprint(SolrIndexSearcher searcher, long maxVersion) throws IOException {
RTimer timer = new RTimer();
public static IndexFingerprint getFingerprint(SolrIndexSearcher searcher, LeafReaderContext ctx, Long maxVersion)
throws IOException {
SchemaField versionField = VersionInfo.getAndCheckVersionField(searcher.getSchema());
IndexFingerprint f = new IndexFingerprint();
f.maxVersionSpecified = maxVersion;
f.maxDoc = searcher.maxDoc();
// TODO: this could be parallelized, or even cached per-segment if performance becomes an issue
ValueSource vs = versionField.getType().getValueSource(versionField, null);
Map funcContext = ValueSource.newContext(searcher);
vs.createWeight(funcContext, searcher);
for (LeafReaderContext ctx : searcher.getTopReaderContext().leaves()) {
int maxDoc = ctx.reader().maxDoc();
f.numDocs += ctx.reader().numDocs();
Bits liveDocs = ctx.reader().getLiveDocs();
FunctionValues fv = vs.getValues(funcContext, ctx);
for (int doc = 0; doc < maxDoc; doc++) {
if (liveDocs != null && !liveDocs.get(doc)) continue;
long v = fv.longVal(doc);
f.maxVersionEncountered = Math.max(v, f.maxVersionEncountered);
if (v <= f.maxVersionSpecified) {
f.maxInHash = Math.max(v, f.maxInHash);
f.versionsHash += Hash.fmix64(v);
f.numVersions++;
}
IndexFingerprint f = new IndexFingerprint();
f.maxVersionSpecified = maxVersion;
f.maxDoc = ctx.reader().maxDoc();
f.numDocs = ctx.reader().numDocs();
int maxDoc = ctx.reader().maxDoc();
Bits liveDocs = ctx.reader().getLiveDocs();
FunctionValues fv = vs.getValues(funcContext, ctx);
for (int doc = 0; doc < maxDoc; doc++) {
if (liveDocs != null && !liveDocs.get(doc)) continue;
long v = fv.longVal(doc);
f.maxVersionEncountered = Math.max(v, f.maxVersionEncountered);
if (v <= f.maxVersionSpecified) {
f.maxInHash = Math.max(v, f.maxInHash);
f.versionsHash += Hash.fmix64(v);
f.numVersions++;
}
}
final double duration = timer.stop();
log.info("IndexFingerprint millis:" + duration + " result:" + f);
return f;
}
public static IndexFingerprint reduce(IndexFingerprint acc, IndexFingerprint f2) {
// acc should have maxVersionSpecified already set in it using IndexFingerprint(long maxVersionSpecified) constructor
acc.maxDoc = Math.max(acc.maxDoc, f2.maxDoc);
acc.numDocs += f2.numDocs;
acc.maxVersionEncountered = Math.max(acc.maxVersionEncountered, f2.maxVersionEncountered);
acc.maxInHash = Math.max(acc.maxInHash, f2.maxInHash);
acc.versionsHash += f2.versionsHash;
acc.numVersions += f2.numVersions;
return acc;
}
/** returns 0 for equal, negative if f1 is less recent than f2, positive if more recent */
public static int compare(IndexFingerprint f1, IndexFingerprint f2) {
@ -200,4 +218,5 @@ public class IndexFingerprint implements MapSerializable {
public String toString() {
return toMap(new LinkedHashMap<>()).toString();
}
}

View File

@ -29,7 +29,6 @@ import java.util.stream.Collectors;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.ZkTestServer.LimitViolationAction;
import org.apache.solr.common.SolrInputDocument;
@ -37,10 +36,8 @@ import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.Slice.State;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.core.Diagnostics;
import org.apache.solr.handler.ReplicationHandler;
import org.junit.Test;
import org.slf4j.Logger;
@ -197,35 +194,6 @@ public class LeaderFailureAfterFreshStartTest extends AbstractFullDistribZkTestB
}
static void waitForNewLeader(CloudSolrClient cloudClient, String shardName, Replica oldLeader, int maxWaitInSecs)
throws Exception {
log.info("Will wait for a node to become leader for {} secs", maxWaitInSecs);
boolean waitForLeader = true;
int i = 0;
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
zkStateReader.forceUpdateCollection(DEFAULT_COLLECTION);
while(waitForLeader) {
ClusterState clusterState = zkStateReader.getClusterState();
DocCollection coll = clusterState.getCollection("collection1");
Slice slice = coll.getSlice(shardName);
if(slice.getLeader() != oldLeader && slice.getState() == State.ACTIVE) {
log.info("New leader got elected in {} secs", i);
break;
}
if(i == maxWaitInSecs) {
Diagnostics.logThreadDumps("Could not find new leader in specified timeout");
zkStateReader.getZkClient().printLayoutToStdOut();
fail("Could not find new leader even after waiting for " + maxWaitInSecs + "secs");
}
i++;
Thread.sleep(1000);
}
}
private void waitTillNodesActive() throws Exception {
for (int i = 0; i < 60; i++) {

View File

@ -149,7 +149,7 @@ public class PeerSyncReplicationTest extends AbstractFullDistribZkTestBase {
log.info("Now shutting down initial leader");
forceNodeFailures(singletonList(initialLeaderJetty));
log.info("Updating mappings from zk");
LeaderFailureAfterFreshStartTest.waitForNewLeader(cloudClient, "shard1", (Replica) initialLeaderJetty.client.info, 15);
waitForNewLeader(cloudClient, "shard1", (Replica) initialLeaderJetty.client.info, 15);
updateMappingsFromZk(jettys, clients, true);
assertEquals("PeerSynced node did not become leader", nodePeerSynced, shardToLeaderJetty.get("shard1"));

View File

@ -16,6 +16,9 @@
*/
package org.apache.solr.update;
import java.io.IOException;
import java.util.Arrays;
import org.apache.solr.BaseDistributedSearchTestCase;
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
import org.apache.solr.client.solrj.SolrClient;
@ -26,9 +29,6 @@ import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import static org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
@ -145,6 +145,20 @@ public class PeerSyncTest extends BaseDistributedSearchTestCase {
assertSync(client1, numVersions, true, shardsArr[0]);
client0.commit(); client1.commit(); queryAndCompare(params("q", "*:*", "sort","_version_ desc"), client0, client1);
// Test PeerSync after replica misses delete
v = 2500;
add(client0, seenLeader, sdoc("id", "2500", "_version_", ++v));
add(client1, seenLeader, sdoc("id", "2500", "_version_", v));
client0.commit();
client1.commit();
del(client0, params(DISTRIB_UPDATE_PARAM, FROM_LEADER, "_version_", Long.toString(-++v)), "2500");
add(client0, seenLeader, sdoc("id", "2501", "_version_", ++v));
add(client1, seenLeader, sdoc("id", "2501", "_version_", v));
// Sync should be able to delete the document
assertSync(client1, numVersions, true, shardsArr[0]);
client0.commit();
client1.commit();
queryAndCompare(params("q", "*:*", "sort", "_version_ desc"), client0, client1);
//
// Test that handling reorders work when applying docs retrieved from peer

View File

@ -24,11 +24,14 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;
import org.apache.solr.BaseDistributedSearchTestCase;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.Slice.State;
import org.apache.solr.core.Diagnostics;
import org.apache.solr.core.MockDirectoryFactory;
import org.apache.zookeeper.KeeperException;
@ -222,6 +225,34 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
log.info("Collection has disappeared - collection: " + collection);
}
static void waitForNewLeader(CloudSolrClient cloudClient, String shardName, Replica oldLeader, int maxWaitInSecs)
throws Exception {
log.info("Will wait for a node to become leader for {} secs", maxWaitInSecs);
boolean waitForLeader = true;
int i = 0;
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
zkStateReader.forceUpdateCollection(DEFAULT_COLLECTION);
while(waitForLeader) {
ClusterState clusterState = zkStateReader.getClusterState();
DocCollection coll = clusterState.getCollection("collection1");
Slice slice = coll.getSlice(shardName);
if(slice.getLeader() != oldLeader && slice.getState() == State.ACTIVE) {
log.info("New leader got elected in {} secs", i);
break;
}
if(i == maxWaitInSecs) {
Diagnostics.logThreadDumps("Could not find new leader in specified timeout");
zkStateReader.getZkClient().printLayoutToStdOut();
fail("Could not find new leader even after waiting for " + maxWaitInSecs + "secs");
}
i++;
Thread.sleep(1000);
}
}
public static void verifyReplicaStatus(ZkStateReader reader, String collection, String shard, String coreNodeName, Replica.State expectedState) throws InterruptedException {
int maxIterations = 100;