mirror of https://github.com/apache/lucene.git
SOLR-9506: cache IndexFingerprint for each segment
parent 9b49c72dbc
commit 184b0f2215
solr/CHANGES.txt
@@ -229,6 +229,9 @@ Optimizations

 * SOLR-9546: Eliminate unnecessary boxing/unboxing going on in SolrParams (Pushkar Raste, noble)

+* SOLR-9506: cache IndexFingerprint for each segment (Pushkar Raste, yonik, noble)
+
 Other Changes
 ----------------------

solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -52,12 +52,14 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;

+import com.google.common.collect.MapMaker;
 import org.apache.commons.io.FileUtils;
 import org.apache.lucene.analysis.util.ResourceLoader;
 import org.apache.lucene.codecs.Codec;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexDeletionPolicy;
 import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.search.BooleanQuery;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
@@ -127,6 +129,7 @@ import org.apache.solr.search.stats.LocalStatsCache;
 import org.apache.solr.search.stats.StatsCache;
 import org.apache.solr.update.DefaultSolrCoreState;
 import org.apache.solr.update.DirectUpdateHandler2;
+import org.apache.solr.update.IndexFingerprint;
 import org.apache.solr.update.SolrCoreState;
 import org.apache.solr.update.SolrCoreState.IndexWriterCloser;
 import org.apache.solr.update.SolrIndexWriter;
@@ -201,6 +204,8 @@ public final class SolrCore implements SolrInfoMBean, Closeable {

   public Date getStartTimeStamp() { return startTime; }

+  private final Map<Object, IndexFingerprint> perSegmentFingerprintCache = new MapMaker().weakKeys().makeMap();
+
   public long getStartNanoTime() {
     return startNanoTime;
   }
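Editor's note on the cache added above: MapMaker().weakKeys().makeMap() builds a ConcurrentMap whose keys are held weakly and compared by identity, and the key used later is ctx.reader().getCoreCacheKey(), an opaque object that lives exactly as long as the underlying segment. Entries for merged-away segments are therefore reclaimed without explicit eviction. A minimal sketch of that lifecycle, using a hypothetical SegmentKey as a stand-in for the core cache key:

import java.util.Map;

import com.google.common.collect.MapMaker;

public class WeakKeyCacheSketch {
  // Stand-in for a segment's core cache key: an opaque object compared by identity.
  static final class SegmentKey {}

  public static void main(String[] args) throws InterruptedException {
    Map<Object, String> cache = new MapMaker().weakKeys().makeMap();

    SegmentKey key = new SegmentKey();
    cache.put(key, "fingerprint-for-segment");
    System.out.println(cache.containsKey(key)); // true while the key is strongly reachable

    key = null;  // drop the last strong reference, as happens when a segment is merged away
    System.gc(); // only a hint; weak-key entries become eligible for reclamation after GC
    Thread.sleep(100);
    // MapMaker purges dead entries lazily during later map operations,
    // so the stale entry disappears without any eviction code of our own.
    System.out.println(cache.size());
  }
}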
@@ -1588,6 +1593,41 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
     return getSearcher(false,true,null);
   }

+  /**
+   * Computes the fingerprint of a segment and caches it only if all the versions in the segment are included in the fingerprint.
+   * We can't use computeIfAbsent here because caching is conditional (as described above).
+   * Two threads may occasionally compute the fingerprint of the same segment; that is acceptable, and cheaper than locking the entire map.
+   *
+   * @param searcher searcher that includes the specified LeafReaderContext
+   * @param ctx LeafReaderContext of the segment to compute the fingerprint of
+   * @param maxVersion maximum version number to consider for the fingerprint computation
+   * @return IndexFingerprint of the segment
+   * @throws IOException Can throw IOException
+   */
+  public IndexFingerprint getIndexFingerprint(SolrIndexSearcher searcher, LeafReaderContext ctx, long maxVersion)
+      throws IOException {
+    IndexFingerprint f = perSegmentFingerprintCache.get(ctx.reader().getCoreCacheKey());
+    // Recompute the fingerprint if it is not cached, or
+    // if we want a fingerprint only up to a version lower than the highest version already hashed, or
+    // if documents were deleted from the segment after the fingerprint was cached.
+    if (f == null || (f.getMaxInHash() > maxVersion) || (f.getNumDocs() != ctx.reader().numDocs())) {
+      log.debug("IndexFingerprint cache miss for searcher:{} reader:{} readerHash:{} maxVersion:{}", searcher, ctx.reader(), ctx.reader().hashCode(), maxVersion);
+      f = IndexFingerprint.getFingerprint(searcher, ctx, maxVersion);
+      // Cache the fingerprint for the segment only if all the versions in the segment are included in it.
+      if (f.getMaxVersionEncountered() == f.getMaxInHash()) {
+        log.info("Caching fingerprint for searcher:{} leafReaderContext:{} maxVersion:{}", searcher, ctx, maxVersion);
+        perSegmentFingerprintCache.put(ctx.reader().getCoreCacheKey(), f);
+      }
+    } else {
+      log.debug("IndexFingerprint cache hit for searcher:{} reader:{} readerHash:{} maxVersion:{}", searcher, ctx.reader(), ctx.reader().hashCode(), maxVersion);
+    }
+    log.debug("Cache Size: {}, Segments Size:{}", perSegmentFingerprintCache.size(), searcher.getTopReaderContext().leaves().size());
+    return f;
+  }
+
   /**
    * Returns the current registered searcher with its reference count incremented, or null if none are registered.
    */
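Why the getIndexFingerprint method above avoids Map.computeIfAbsent: computeIfAbsent stores whatever the mapping function returns unconditionally, while here a fingerprint may only be cached when it is complete, meaning its hash covers every version in the segment. A minimal sketch of the resulting get/compute/put-if-complete pattern, with hypothetical compute and isComplete helpers standing in for the fingerprint logic:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ConditionalCacheSketch {
  private final Map<Object, long[]> cache = new ConcurrentHashMap<>();

  // Hypothetical stand-ins: compute() plays the role of IndexFingerprint.getFingerprint,
  // isComplete() the role of the maxVersionEncountered == maxInHash check above.
  private long[] compute(Object key) { return new long[] { System.nanoTime() }; }
  private boolean isComplete(long[] value) { return value[0] % 2 == 0; }

  long[] getOrCompute(Object key) {
    long[] cached = cache.get(key);
    if (cached != null) {
      return cached; // hit: a previously cached, complete value
    }
    long[] fresh = compute(key); // two threads may race here; recomputing is harmless
    if (isComplete(fresh)) {
      cache.put(key, fresh); // cache only values that are safe to reuse later
    }
    return fresh;
  }
}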
solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
@@ -33,7 +33,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -42,22 +42,50 @@ import org.apache.lucene.document.Document;
 import org.apache.lucene.document.DocumentStoredFieldVisitor;
 import org.apache.lucene.document.LazyDocument;
 import org.apache.lucene.index.*;
-import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.StoredFieldVisitor.Status;
-import org.apache.lucene.search.*;
+import org.apache.lucene.search.BooleanClause;
 import org.apache.lucene.search.BooleanClause.Occur;
+import org.apache.lucene.search.BooleanQuery;
+import org.apache.lucene.search.CollectionStatistics;
+import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.ConstantScoreQuery;
+import org.apache.lucene.search.DocIdSet;
+import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.search.EarlyTerminatingSortingCollector;
+import org.apache.lucene.search.Explanation;
+import org.apache.lucene.search.FieldDoc;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.LeafCollector;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.MultiCollector;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.SimpleCollector;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.SortField;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TermStatistics;
+import org.apache.lucene.search.TimeLimitingCollector;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.search.TopDocsCollector;
+import org.apache.lucene.search.TopFieldCollector;
+import org.apache.lucene.search.TopFieldDocs;
+import org.apache.lucene.search.TopScoreDocCollector;
+import org.apache.lucene.search.TotalHitCountCollector;
+import org.apache.lucene.search.Weight;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.FixedBitSet;
 import org.apache.solr.common.SolrDocumentBase;
-import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.SimpleOrderedMap;
-import org.apache.solr.core.DirectoryFactory.DirContext;
 import org.apache.solr.core.DirectoryFactory;
+import org.apache.solr.core.DirectoryFactory.DirContext;
 import org.apache.solr.core.SolrConfig;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.core.SolrInfoMBean;
@@ -152,8 +180,6 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable, SolrI
   private final String path;
   private boolean releaseDirectory;

-  private final Map<Long, IndexFingerprint> maxVersionFingerprintCache = new ConcurrentHashMap<>();
-
   private final NamedList<Object> readerStats;

   private static DirectoryReader getReader(SolrCore core, SolrIndexConfig config, DirectoryFactory directoryFactory,
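The field removed above was the old cache: keyed by the requested maxVersion and owned by a single SolrIndexSearcher, so every reopened searcher started cold. Its replacement, the perSegmentFingerprintCache added to SolrCore earlier in this commit, is keyed by segment identity and owned by the core, so fingerprints of unchanged segments survive searcher reopens. A sketch of the two cache shapes side by side, under those assumptions:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.google.common.collect.MapMaker;

class FingerprintCacheShapes {
  // Before: one entry per requested maxVersion, discarded along with its searcher.
  final Map<Long, Object> perSearcherCache = new ConcurrentHashMap<>();

  // After: one entry per live segment, shared by all searchers of the core;
  // weak identity keys let entries vanish when a segment is merged away.
  final Map<Object, Object> perSegmentCache = new MapMaker().weakKeys().makeMap();
}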
@@ -2416,19 +2442,24 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable, SolrI
     final SolrIndexSearcher searcher = this;
     final AtomicReference<IOException> exception = new AtomicReference<>();
     try {
-      return maxVersionFingerprintCache.computeIfAbsent(maxVersion, key -> {
-        try {
-          return IndexFingerprint.getFingerprint(searcher, key);
-        } catch (IOException e) {
-          exception.set(e);
-          return null;
-        }
-      });
+      return searcher.getTopReaderContext().leaves().stream()
+          .map(ctx -> {
+            try {
+              return searcher.getCore().getIndexFingerprint(searcher, ctx, maxVersion);
+            } catch (IOException e) {
+              exception.set(e);
+              return null;
+            }
+          })
+          .filter(java.util.Objects::nonNull)
+          .reduce(new IndexFingerprint(maxVersion), IndexFingerprint::reduce);
     } finally {
       if (exception.get() != null) throw exception.get();
     }
   }

   /////////////////////////////////////////////////////////////////////
   // SolrInfoMBean stuff: Statistics and Module Info
   /////////////////////////////////////////////////////////////////////
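One subtlety in the rewritten method above: lambdas passed to Stream.map cannot throw checked exceptions, so the code parks any IOException in an AtomicReference, yields null into the pipeline, and rethrows from the finally block. A self-contained sketch of that idiom, with a hypothetical readValue helper standing in for the per-segment fingerprint call:

import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

class StreamCheckedExceptionSketch {
  // Hypothetical computation that throws a checked exception.
  static Integer readValue(String s) throws IOException {
    if (s.isEmpty()) throw new IOException("empty input");
    return s.length();
  }

  static int sumLengths(List<String> inputs) throws IOException {
    AtomicReference<IOException> exception = new AtomicReference<>();
    try {
      return inputs.stream()
          .map(s -> {
            try {
              return readValue(s);
            } catch (IOException e) {
              exception.set(e); // park the checked exception
              return null;      // yield a sentinel the pipeline can filter out
            }
          })
          .filter(Objects::nonNull)
          .reduce(0, Integer::sum);
    } finally {
      if (exception.get() != null) throw exception.get(); // rethrow once the stream is done
    }
  }
}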
solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
@@ -52,6 +52,14 @@ public class IndexFingerprint implements MapSerializable {
   private long numDocs;
   private long maxDoc;

+  public IndexFingerprint() {
+    // default constructor
+  }
+
+  public IndexFingerprint(long maxVersionSpecified) {
+    this.maxVersionSpecified = maxVersionSpecified;
+  }
+
   public long getMaxVersionSpecified() {
     return maxVersionSpecified;
   }
@@ -82,53 +90,62 @@ public class IndexFingerprint implements MapSerializable {

   /** Opens a new realtime searcher and returns its (possibly cached) fingerprint */
   public static IndexFingerprint getFingerprint(SolrCore core, long maxVersion) throws IOException {
+    RTimer timer = new RTimer();
     core.getUpdateHandler().getUpdateLog().openRealtimeSearcher();
     RefCounted<SolrIndexSearcher> newestSearcher = core.getUpdateHandler().getUpdateLog().uhandler.core.getRealtimeSearcher();
     try {
-      return newestSearcher.get().getIndexFingerprint(maxVersion);
+      IndexFingerprint f = newestSearcher.get().getIndexFingerprint(maxVersion);
+      final double duration = timer.stop();
+      log.info("IndexFingerprint millis:{} result:{}", duration, f);
+      return f;
     } finally {
       if (newestSearcher != null) {
        newestSearcher.decref();
       }
     }
   }

-  /** Calculates an index fingerprint */
-  public static IndexFingerprint getFingerprint(SolrIndexSearcher searcher, long maxVersion) throws IOException {
-    RTimer timer = new RTimer();
+  public static IndexFingerprint getFingerprint(SolrIndexSearcher searcher, LeafReaderContext ctx, Long maxVersion)
+      throws IOException {
     SchemaField versionField = VersionInfo.getAndCheckVersionField(searcher.getSchema());
-
-    IndexFingerprint f = new IndexFingerprint();
-    f.maxVersionSpecified = maxVersion;
-    f.maxDoc = searcher.maxDoc();
-
-    // TODO: this could be parallelized, or even cached per-segment if performance becomes an issue
     ValueSource vs = versionField.getType().getValueSource(versionField, null);
     Map funcContext = ValueSource.newContext(searcher);
     vs.createWeight(funcContext, searcher);
-    for (LeafReaderContext ctx : searcher.getTopReaderContext().leaves()) {
-      int maxDoc = ctx.reader().maxDoc();
-      f.numDocs += ctx.reader().numDocs();
-      Bits liveDocs = ctx.reader().getLiveDocs();
-      FunctionValues fv = vs.getValues(funcContext, ctx);
-      for (int doc = 0; doc < maxDoc; doc++) {
-        if (liveDocs != null && !liveDocs.get(doc)) continue;
-        long v = fv.longVal(doc);
-        f.maxVersionEncountered = Math.max(v, f.maxVersionEncountered);
-        if (v <= f.maxVersionSpecified) {
-          f.maxInHash = Math.max(v, f.maxInHash);
-          f.versionsHash += Hash.fmix64(v);
-          f.numVersions++;
-        }
+
+    IndexFingerprint f = new IndexFingerprint();
+    f.maxVersionSpecified = maxVersion;
+    f.maxDoc = ctx.reader().maxDoc();
+    f.numDocs = ctx.reader().numDocs();
+
+    int maxDoc = ctx.reader().maxDoc();
+    Bits liveDocs = ctx.reader().getLiveDocs();
+    FunctionValues fv = vs.getValues(funcContext, ctx);
+    for (int doc = 0; doc < maxDoc; doc++) {
+      if (liveDocs != null && !liveDocs.get(doc)) continue;
+      long v = fv.longVal(doc);
+      f.maxVersionEncountered = Math.max(v, f.maxVersionEncountered);
+      if (v <= f.maxVersionSpecified) {
+        f.maxInHash = Math.max(v, f.maxInHash);
+        f.versionsHash += Hash.fmix64(v);
+        f.numVersions++;
       }
     }
-
-    final double duration = timer.stop();
-    log.info("IndexFingerprint millis:" + duration + " result:" + f);
-
     return f;
   }

+  public static IndexFingerprint reduce(IndexFingerprint acc, IndexFingerprint f2) {
+    // acc should already have maxVersionSpecified set via the IndexFingerprint(long maxVersionSpecified) constructor
+    acc.maxDoc = Math.max(acc.maxDoc, f2.maxDoc);
+    acc.numDocs += f2.numDocs;
+    acc.maxVersionEncountered = Math.max(acc.maxVersionEncountered, f2.maxVersionEncountered);
+    acc.maxInHash = Math.max(acc.maxInHash, f2.maxInHash);
+    acc.versionsHash += f2.versionsHash;
+    acc.numVersions += f2.numVersions;
+    return acc;
+  }
+
   /** returns 0 for equal, negative if f1 is less recent than f2, positive if more recent */
   public static int compare(IndexFingerprint f1, IndexFingerprint f2) {
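The per-segment computation and reduce above work because every component of a fingerprint combines commutatively: versionsHash is a plain sum of Hash.fmix64 values, so the order in which segments or documents are visited does not matter, and the max/sum fields merge the same way reduce does. A small illustration of that order-independence, assuming a MurmurHash3-style 64-bit finalizer in place of Solr's Hash class:

public class FingerprintOrderSketch {
  // MurmurHash3's 64-bit finalizer; assumed here to match the shape of Solr's Hash.fmix64.
  static long fmix64(long k) {
    k ^= k >>> 33;
    k *= 0xff51afd7ed558ccdL;
    k ^= k >>> 33;
    k *= 0xc4ceb9fe1a85ec53L;
    k ^= k >>> 33;
    return k;
  }

  public static void main(String[] args) {
    long[] segment1 = {101, 102, 103}; // versions in one segment
    long[] segment2 = {104, 105};      // versions in another

    // Hash each segment independently, then combine the partial sums.
    long h1 = 0, h2 = 0;
    for (long v : segment1) h1 += fmix64(v);
    for (long v : segment2) h2 += fmix64(v);

    // Hash everything in one pass, in a different order.
    long hAll = 0;
    long[] shuffled = {105, 101, 104, 103, 102};
    for (long v : shuffled) hAll += fmix64(v);

    // Addition is commutative and associative, so per-segment sums reduce to the
    // same total; this is what lets fingerprints be cached per segment and merged.
    System.out.println(h1 + h2 == hAll); // true
  }
}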
@@ -200,4 +217,5 @@ public class IndexFingerprint implements MapSerializable {
   public String toString() {
     return toMap(new LinkedHashMap<>()).toString();
   }
+
 }
solr/core/src/test/org/apache/solr/cloud/LeaderFailureAfterFreshStartTest.java
@@ -29,7 +29,6 @@ import java.util.stream.Collectors;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.cloud.ZkTestServer.LimitViolationAction;
 import org.apache.solr.common.SolrInputDocument;
@@ -37,10 +36,8 @@ import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.Slice.State;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.core.Diagnostics;
 import org.apache.solr.handler.ReplicationHandler;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -197,35 +194,6 @@ public class LeaderFailureAfterFreshStartTest extends AbstractFullDistribZkTestB
   }

-  static void waitForNewLeader(CloudSolrClient cloudClient, String shardName, Replica oldLeader, int maxWaitInSecs)
-      throws Exception {
-    log.info("Will wait for a node to become leader for {} secs", maxWaitInSecs);
-    boolean waitForLeader = true;
-    int i = 0;
-    ZkStateReader zkStateReader = cloudClient.getZkStateReader();
-    zkStateReader.forceUpdateCollection(DEFAULT_COLLECTION);
-
-    while(waitForLeader) {
-      ClusterState clusterState = zkStateReader.getClusterState();
-      DocCollection coll = clusterState.getCollection("collection1");
-      Slice slice = coll.getSlice(shardName);
-      if(slice.getLeader() != oldLeader && slice.getState() == State.ACTIVE) {
-        log.info("New leader got elected in {} secs", i);
-        break;
-      }
-
-      if(i == maxWaitInSecs) {
-        Diagnostics.logThreadDumps("Could not find new leader in specified timeout");
-        zkStateReader.getZkClient().printLayoutToStdOut();
-        fail("Could not find new leader even after waiting for " + maxWaitInSecs + "secs");
-      }
-
-      i++;
-      Thread.sleep(1000);
-    }
-  }
-
   private void waitTillNodesActive() throws Exception {
     for (int i = 0; i < 60; i++) {
solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java
@@ -149,7 +149,7 @@ public class PeerSyncReplicationTest extends AbstractFullDistribZkTestBase {
     log.info("Now shutting down initial leader");
     forceNodeFailures(singletonList(initialLeaderJetty));
     log.info("Updating mappings from zk");
-    LeaderFailureAfterFreshStartTest.waitForNewLeader(cloudClient, "shard1", (Replica) initialLeaderJetty.client.info, 15);
+    waitForNewLeader(cloudClient, "shard1", (Replica) initialLeaderJetty.client.info, 15);
     updateMappingsFromZk(jettys, clients, true);
     assertEquals("PeerSynced node did not become leader", nodePeerSynced, shardToLeaderJetty.get("shard1"));
solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
@@ -122,7 +122,8 @@ public class PeerSyncTest extends BaseDistributedSearchTestCase {
     del(client0, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_",Long.toString(-++v)), "1000");

     assertSync(client1, numVersions, true, shardsArr[0]);
-    client0.commit(); client1.commit(); queryAndCompare(params("q", "*:*", "sort","_version_ desc"), client0, client1);
+    client0.commit(); client1.commit();
+    queryAndCompare(params("q", "*:*", "sort","_version_ desc"), client0, client1);

     // test that delete by query is returned even if not requested, and that it doesn't delete newer stuff than it should
     v=2000;
@@ -145,7 +146,6 @@ public class PeerSyncTest extends BaseDistributedSearchTestCase {
     assertSync(client1, numVersions, true, shardsArr[0]);
     client0.commit(); client1.commit(); queryAndCompare(params("q", "*:*", "sort","_version_ desc"), client0, client1);
-
     //
     // Test that handling reorders work when applying docs retrieved from peer
     //
solr/core/src/test/org/apache/solr/update/PeerSyncWithIndexFingerprintCachingTest.java (new file)
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.update;
+
+import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.solr.BaseDistributedSearchTestCase;
+import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * This test is deliberately kept in a separate class because we don't want segment merging to kick in after deleting documents.
+ * This ensures that we first check the cached IndexFingerprint and
+ * recompute it only if any documents in the segment were deleted since the fingerprint was first cached.
+ */
+@SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
+public class PeerSyncWithIndexFingerprintCachingTest extends BaseDistributedSearchTestCase {
+  private static int numVersions = 100; // number of versions to use when syncing
+  private final String FROM_LEADER = DistribPhase.FROMLEADER.toString();
+
+  private ModifiableSolrParams seenLeader =
+      params(DISTRIB_UPDATE_PARAM, FROM_LEADER);
+
+  public PeerSyncWithIndexFingerprintCachingTest() {
+    stress = 0;
+
+    // TODO: a better way to do this?
+    configString = "solrconfig-tlog.xml";
+    schemaString = "schema.xml";
+  }
+
+  @Test
+  @ShardsFixed(num = 3)
+  public void test() throws Exception {
+    handle.clear();
+    handle.put("timestamp", SKIPVAL);
+    handle.put("score", SKIPVAL);
+    handle.put("maxScore", SKIPVAL);
+
+    SolrClient client0 = clients.get(0);
+    SolrClient client1 = clients.get(1);
+
+    long v = 1;
+    for (; v < 8; ++v) {
+      add(client0, seenLeader, sdoc("id", ""+v, "_version_", v));
+      add(client1, seenLeader, sdoc("id", ""+v, "_version_", v));
+    }
+    client0.commit(); client1.commit();
+
+    IndexFingerprint before = getFingerprint(client0, Long.MAX_VALUE);
+
+    del(client0, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_",Long.toString(-++v)), "2");
+    client0.commit();
+
+    IndexFingerprint after = getFingerprint(client0, Long.MAX_VALUE);
+
+    // make sure the fingerprints before and after the delete are not the same
+    Assert.assertTrue(IndexFingerprint.compare(before, after) != 0);
+
+    // the replica that missed the delete should still be able to sync
+    assertSync(client1, numVersions, true, shardsArr[0]);
+    client0.commit(); client1.commit();
+
+    queryAndCompare(params("q", "*:*", "sort","_version_ desc"), client0, client1);
+  }
+
+  IndexFingerprint getFingerprint(SolrClient client, long maxVersion) throws IOException, SolrServerException {
+    QueryRequest qr = new QueryRequest(params("qt","/get", "getFingerprint", Long.toString(maxVersion)));
+    NamedList rsp = client.request(qr);
+    return IndexFingerprint.fromObject(rsp.get("fingerprint"));
+  }
+
+  void assertSync(SolrClient client, int numVersions, boolean expectedResult, String... syncWith) throws IOException, SolrServerException {
+    QueryRequest qr = new QueryRequest(params("qt","/get", "getVersions", Integer.toString(numVersions), "sync", StrUtils.join(Arrays.asList(syncWith), ',')));
+    NamedList rsp = client.request(qr);
+    assertEquals(expectedResult, (Boolean) rsp.get("sync"));
+  }
+}
solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java
@@ -24,11 +24,14 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.io.FileUtils;
 import org.apache.solr.BaseDistributedSearchTestCase;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.Slice.State;
 import org.apache.solr.core.Diagnostics;
 import org.apache.solr.core.MockDirectoryFactory;
 import org.apache.zookeeper.KeeperException;
@@ -222,6 +225,34 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
       log.info("Collection has disappeared - collection: " + collection);
   }

+  static void waitForNewLeader(CloudSolrClient cloudClient, String shardName, Replica oldLeader, int maxWaitInSecs)
+      throws Exception {
+    log.info("Will wait for a node to become leader for {} secs", maxWaitInSecs);
+    boolean waitForLeader = true;
+    int i = 0;
+    ZkStateReader zkStateReader = cloudClient.getZkStateReader();
+    zkStateReader.forceUpdateCollection(DEFAULT_COLLECTION);
+
+    while(waitForLeader) {
+      ClusterState clusterState = zkStateReader.getClusterState();
+      DocCollection coll = clusterState.getCollection("collection1");
+      Slice slice = coll.getSlice(shardName);
+      if(slice.getLeader() != oldLeader && slice.getState() == State.ACTIVE) {
+        log.info("New leader got elected in {} secs", i);
+        break;
+      }
+
+      if(i == maxWaitInSecs) {
+        Diagnostics.logThreadDumps("Could not find new leader in specified timeout");
+        zkStateReader.getZkClient().printLayoutToStdOut();
+        fail("Could not find new leader even after waiting for " + maxWaitInSecs + "secs");
+      }
+
+      i++;
+      Thread.sleep(1000);
+    }
+  }
+
   public static void verifyReplicaStatus(ZkStateReader reader, String collection, String shard, String coreNodeName, Replica.State expectedState) throws InterruptedException {
     int maxIterations = 100;