From 184b0f221559eaed5f273b1907e8af07bc95fec9 Mon Sep 17 00:00:00 2001 From: Noble Paul Date: Mon, 24 Oct 2016 16:45:42 +0530 Subject: [PATCH] SOLR-9506: cache IndexFingerprint for each segment --- solr/CHANGES.txt | 3 + .../java/org/apache/solr/core/SolrCore.java | 40 +++++++ .../apache/solr/search/SolrIndexSearcher.java | 61 +++++++--- .../apache/solr/update/IndexFingerprint.java | 78 ++++++++----- .../LeaderFailureAfterFreshStartTest.java | 32 ------ .../solr/cloud/PeerSyncReplicationTest.java | 2 +- .../org/apache/solr/update/PeerSyncTest.java | 4 +- ...erSyncWithIndexFingerprintCachingTest.java | 108 ++++++++++++++++++ .../solr/cloud/AbstractDistribZkTestBase.java | 31 +++++ 9 files changed, 279 insertions(+), 80 deletions(-) create mode 100644 solr/core/src/test/org/apache/solr/update/PeerSyncWithIndexFingerprintCachingTest.java diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 92a994fbae3..f455002cf6e 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -229,6 +229,9 @@ Optimizations * SOLR-9546: Eliminate unnecessary boxing/unboxing going on in SolrParams (Pushkar Raste, noble) +* SOLR-9506: cache IndexFingerprint for each segment (Pushkar Raste, yonik, noble) + + Other Changes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java index 7ba15afe81c..a2dc1c418e2 100644 --- a/solr/core/src/java/org/apache/solr/core/SolrCore.java +++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java @@ -52,12 +52,14 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; +import com.google.common.collect.MapMaker; import org.apache.commons.io.FileUtils; import org.apache.lucene.analysis.util.ResourceLoader; import org.apache.lucene.codecs.Codec; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexDeletionPolicy; import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; @@ -127,6 +129,7 @@ import org.apache.solr.search.stats.LocalStatsCache; import org.apache.solr.search.stats.StatsCache; import org.apache.solr.update.DefaultSolrCoreState; import org.apache.solr.update.DirectUpdateHandler2; +import org.apache.solr.update.IndexFingerprint; import org.apache.solr.update.SolrCoreState; import org.apache.solr.update.SolrCoreState.IndexWriterCloser; import org.apache.solr.update.SolrIndexWriter; @@ -201,6 +204,8 @@ public final class SolrCore implements SolrInfoMBean, Closeable { public Date getStartTimeStamp() { return startTime; } + private final Map perSegmentFingerprintCache = new MapMaker().weakKeys().makeMap(); + public long getStartNanoTime() { return startNanoTime; } @@ -1588,6 +1593,41 @@ public final class SolrCore implements SolrInfoMBean, Closeable { return getSearcher(false,true,null); } + /** + * Computes fingerprint of a segment and caches it only if all the version in segment are included in the fingerprint. + * We can't use computeIfAbsent as caching is conditional (as described above) + * There is chance that two threads may compute fingerprint on the same segment. It might be OK to do so rather than locking entire map. + * + * @param searcher searcher that includes specified LeaderReaderContext + * @param ctx LeafReaderContext of a segment to compute fingerprint of + * @param maxVersion maximum version number to consider for fingerprint computation + * @return IndexFingerprint of the segment + * @throws IOException Can throw IOException + */ + public IndexFingerprint getIndexFingerprint(SolrIndexSearcher searcher, LeafReaderContext ctx, long maxVersion) + throws IOException { + IndexFingerprint f = null; + f = perSegmentFingerprintCache.get(ctx.reader().getCoreCacheKey()); + // fingerprint is either not cached or + // if we want fingerprint only up to a version less than maxVersionEncountered in the segment, or + // documents were deleted from segment for which fingerprint was cached + // + if (f == null || (f.getMaxInHash() > maxVersion) || (f.getNumDocs() != ctx.reader().numDocs())) { + log.debug("IndexFingerprint cache miss for searcher:{} reader:{} readerHash:{} maxVersion:{}", searcher, ctx.reader(), ctx.reader().hashCode(), maxVersion); + f = IndexFingerprint.getFingerprint(searcher, ctx, maxVersion); + // cache fingerprint for the segment only if all the versions in the segment are included in the fingerprint + if (f.getMaxVersionEncountered() == f.getMaxInHash()) { + log.info("Caching fingerprint for searcher:{} leafReaderContext:{} mavVersion:{}", searcher, ctx, maxVersion); + perSegmentFingerprintCache.put(ctx.reader().getCoreCacheKey(), f); + } + + } else { + log.debug("IndexFingerprint cache hit for searcher:{} reader:{} readerHash:{} maxVersion:{}", searcher, ctx.reader(), ctx.reader().hashCode(), maxVersion); + } + log.debug("Cache Size: {}, Segments Size:{}", perSegmentFingerprintCache.size(), searcher.getTopReaderContext().leaves().size()); + return f; + } + /** * Returns the current registered searcher with its reference count incremented, or null if none are registered. */ diff --git a/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java b/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java index 933477b1ac1..d9364ca3a57 100644 --- a/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java +++ b/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java @@ -33,7 +33,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; + import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -42,22 +42,50 @@ import org.apache.lucene.document.Document; import org.apache.lucene.document.DocumentStoredFieldVisitor; import org.apache.lucene.document.LazyDocument; import org.apache.lucene.index.*; -import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.StoredFieldVisitor.Status; -import org.apache.lucene.search.*; +import org.apache.lucene.search.BooleanClause; import org.apache.lucene.search.BooleanClause.Occur; +import org.apache.lucene.search.BooleanQuery; +import org.apache.lucene.search.CollectionStatistics; +import org.apache.lucene.search.Collector; +import org.apache.lucene.search.ConstantScoreQuery; +import org.apache.lucene.search.DocIdSet; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.EarlyTerminatingSortingCollector; +import org.apache.lucene.search.Explanation; +import org.apache.lucene.search.FieldDoc; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.LeafCollector; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.MultiCollector; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.Scorer; +import org.apache.lucene.search.SimpleCollector; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.TermStatistics; +import org.apache.lucene.search.TimeLimitingCollector; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.TopDocsCollector; +import org.apache.lucene.search.TopFieldCollector; +import org.apache.lucene.search.TopFieldDocs; +import org.apache.lucene.search.TopScoreDocCollector; +import org.apache.lucene.search.TotalHitCountCollector; +import org.apache.lucene.search.Weight; import org.apache.lucene.store.Directory; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.FixedBitSet; import org.apache.solr.common.SolrDocumentBase; -import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.SimpleOrderedMap; -import org.apache.solr.core.DirectoryFactory.DirContext; import org.apache.solr.core.DirectoryFactory; +import org.apache.solr.core.DirectoryFactory.DirContext; import org.apache.solr.core.SolrConfig; import org.apache.solr.core.SolrCore; import org.apache.solr.core.SolrInfoMBean; @@ -152,8 +180,6 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable, SolrI private final String path; private boolean releaseDirectory; - private final Map maxVersionFingerprintCache = new ConcurrentHashMap<>(); - private final NamedList readerStats; private static DirectoryReader getReader(SolrCore core, SolrIndexConfig config, DirectoryFactory directoryFactory, @@ -2416,19 +2442,24 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable, SolrI final SolrIndexSearcher searcher = this; final AtomicReference exception = new AtomicReference<>(); try { - return maxVersionFingerprintCache.computeIfAbsent(maxVersion, key -> { - try { - return IndexFingerprint.getFingerprint(searcher, key); - } catch (IOException e) { - exception.set(e); - return null; - } - }); + return searcher.getTopReaderContext().leaves().stream() + .map(ctx -> { + try { + return searcher.getCore().getIndexFingerprint(searcher, ctx, maxVersion); + } catch (IOException e) { + exception.set(e); + return null; + } + }) + .filter(java.util.Objects::nonNull) + .reduce(new IndexFingerprint(maxVersion), IndexFingerprint::reduce); + } finally { if (exception.get() != null) throw exception.get(); } } + ///////////////////////////////////////////////////////////////////// // SolrInfoMBean stuff: Statistics and Module Info ///////////////////////////////////////////////////////////////////// diff --git a/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java b/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java index 877ef034090..0b7e655608f 100644 --- a/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java +++ b/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java @@ -52,6 +52,14 @@ public class IndexFingerprint implements MapSerializable { private long numDocs; private long maxDoc; + public IndexFingerprint() { + // default constructor + } + + public IndexFingerprint (long maxVersionSpecified) { + this.maxVersionSpecified = maxVersionSpecified; + } + public long getMaxVersionSpecified() { return maxVersionSpecified; } @@ -82,53 +90,62 @@ public class IndexFingerprint implements MapSerializable { /** Opens a new realtime searcher and returns it's (possibly cached) fingerprint */ public static IndexFingerprint getFingerprint(SolrCore core, long maxVersion) throws IOException { + RTimer timer = new RTimer(); core.getUpdateHandler().getUpdateLog().openRealtimeSearcher(); RefCounted newestSearcher = core.getUpdateHandler().getUpdateLog().uhandler.core.getRealtimeSearcher(); try { - return newestSearcher.get().getIndexFingerprint(maxVersion); + IndexFingerprint f = newestSearcher.get().getIndexFingerprint(maxVersion); + final double duration = timer.stop(); + log.info("IndexFingerprint millis:{} result:{}",duration, f); + return f; } finally { if (newestSearcher != null) { newestSearcher.decref(); } } } - - /** Calculates an index fingerprint */ - public static IndexFingerprint getFingerprint(SolrIndexSearcher searcher, long maxVersion) throws IOException { - RTimer timer = new RTimer(); - + + public static IndexFingerprint getFingerprint(SolrIndexSearcher searcher, LeafReaderContext ctx, Long maxVersion) + throws IOException { SchemaField versionField = VersionInfo.getAndCheckVersionField(searcher.getSchema()); - - IndexFingerprint f = new IndexFingerprint(); - f.maxVersionSpecified = maxVersion; - f.maxDoc = searcher.maxDoc(); - - // TODO: this could be parallelized, or even cached per-segment if performance becomes an issue ValueSource vs = versionField.getType().getValueSource(versionField, null); Map funcContext = ValueSource.newContext(searcher); vs.createWeight(funcContext, searcher); - for (LeafReaderContext ctx : searcher.getTopReaderContext().leaves()) { - int maxDoc = ctx.reader().maxDoc(); - f.numDocs += ctx.reader().numDocs(); - Bits liveDocs = ctx.reader().getLiveDocs(); - FunctionValues fv = vs.getValues(funcContext, ctx); - for (int doc = 0; doc < maxDoc; doc++) { - if (liveDocs != null && !liveDocs.get(doc)) continue; - long v = fv.longVal(doc); - f.maxVersionEncountered = Math.max(v, f.maxVersionEncountered); - if (v <= f.maxVersionSpecified) { - f.maxInHash = Math.max(v, f.maxInHash); - f.versionsHash += Hash.fmix64(v); - f.numVersions++; - } + + IndexFingerprint f = new IndexFingerprint(); + f.maxVersionSpecified = maxVersion; + f.maxDoc = ctx.reader().maxDoc(); + f.numDocs = ctx.reader().numDocs(); + + int maxDoc = ctx.reader().maxDoc(); + Bits liveDocs = ctx.reader().getLiveDocs(); + FunctionValues fv = vs.getValues(funcContext, ctx); + for (int doc = 0; doc < maxDoc; doc++) { + if (liveDocs != null && !liveDocs.get(doc)) continue; + long v = fv.longVal(doc); + f.maxVersionEncountered = Math.max(v, f.maxVersionEncountered); + if (v <= f.maxVersionSpecified) { + f.maxInHash = Math.max(v, f.maxInHash); + f.versionsHash += Hash.fmix64(v); + f.numVersions++; } } - - final double duration = timer.stop(); - log.info("IndexFingerprint millis:" + duration + " result:" + f); - + return f; } + + + public static IndexFingerprint reduce(IndexFingerprint acc, IndexFingerprint f2) { + // acc should have maxVersionSpecified already set in it using IndexFingerprint(long maxVersionSpecified) constructor + acc.maxDoc = Math.max(acc.maxDoc, f2.maxDoc); + acc.numDocs += f2.numDocs; + acc.maxVersionEncountered = Math.max(acc.maxVersionEncountered, f2.maxVersionEncountered); + acc.maxInHash = Math.max(acc.maxInHash, f2.maxInHash); + acc.versionsHash += f2.versionsHash; + acc.numVersions += f2.numVersions; + + return acc; + } /** returns 0 for equal, negative if f1 is less recent than f2, positive if more recent */ public static int compare(IndexFingerprint f1, IndexFingerprint f2) { @@ -200,4 +217,5 @@ public class IndexFingerprint implements MapSerializable { public String toString() { return toMap(new LinkedHashMap<>()).toString(); } + } diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderFailureAfterFreshStartTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderFailureAfterFreshStartTest.java index 348532c308a..ef213860c63 100644 --- a/solr/core/src/test/org/apache/solr/cloud/LeaderFailureAfterFreshStartTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/LeaderFailureAfterFreshStartTest.java @@ -29,7 +29,6 @@ import java.util.stream.Collectors; import org.apache.commons.lang.RandomStringUtils; import org.apache.lucene.util.LuceneTestCase.Slow; import org.apache.solr.client.solrj.SolrServerException; -import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.cloud.ZkTestServer.LimitViolationAction; import org.apache.solr.common.SolrInputDocument; @@ -37,10 +36,8 @@ import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; -import org.apache.solr.common.cloud.Slice.State; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.ModifiableSolrParams; -import org.apache.solr.core.Diagnostics; import org.apache.solr.handler.ReplicationHandler; import org.junit.Test; import org.slf4j.Logger; @@ -197,35 +194,6 @@ public class LeaderFailureAfterFreshStartTest extends AbstractFullDistribZkTestB } - static void waitForNewLeader(CloudSolrClient cloudClient, String shardName, Replica oldLeader, int maxWaitInSecs) - throws Exception { - log.info("Will wait for a node to become leader for {} secs", maxWaitInSecs); - boolean waitForLeader = true; - int i = 0; - ZkStateReader zkStateReader = cloudClient.getZkStateReader(); - zkStateReader.forceUpdateCollection(DEFAULT_COLLECTION); - - while(waitForLeader) { - ClusterState clusterState = zkStateReader.getClusterState(); - DocCollection coll = clusterState.getCollection("collection1"); - Slice slice = coll.getSlice(shardName); - if(slice.getLeader() != oldLeader && slice.getState() == State.ACTIVE) { - log.info("New leader got elected in {} secs", i); - break; - } - - if(i == maxWaitInSecs) { - Diagnostics.logThreadDumps("Could not find new leader in specified timeout"); - zkStateReader.getZkClient().printLayoutToStdOut(); - fail("Could not find new leader even after waiting for " + maxWaitInSecs + "secs"); - } - - i++; - Thread.sleep(1000); - } - } - - private void waitTillNodesActive() throws Exception { for (int i = 0; i < 60; i++) { diff --git a/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java b/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java index 3ded7d2f754..e00ea3c4811 100644 --- a/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java @@ -149,7 +149,7 @@ public class PeerSyncReplicationTest extends AbstractFullDistribZkTestBase { log.info("Now shutting down initial leader"); forceNodeFailures(singletonList(initialLeaderJetty)); log.info("Updating mappings from zk"); - LeaderFailureAfterFreshStartTest.waitForNewLeader(cloudClient, "shard1", (Replica) initialLeaderJetty.client.info, 15); + waitForNewLeader(cloudClient, "shard1", (Replica) initialLeaderJetty.client.info, 15); updateMappingsFromZk(jettys, clients, true); assertEquals("PeerSynced node did not become leader", nodePeerSynced, shardToLeaderJetty.get("shard1")); diff --git a/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java b/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java index 64edd2120c7..8f3a89a599c 100644 --- a/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java +++ b/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java @@ -122,7 +122,8 @@ public class PeerSyncTest extends BaseDistributedSearchTestCase { del(client0, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_",Long.toString(-++v)), "1000"); assertSync(client1, numVersions, true, shardsArr[0]); - client0.commit(); client1.commit(); queryAndCompare(params("q", "*:*", "sort","_version_ desc"), client0, client1); + client0.commit(); client1.commit(); + queryAndCompare(params("q", "*:*", "sort","_version_ desc"), client0, client1); // test that delete by query is returned even if not requested, and that it doesn't delete newer stuff than it should v=2000; @@ -145,7 +146,6 @@ public class PeerSyncTest extends BaseDistributedSearchTestCase { assertSync(client1, numVersions, true, shardsArr[0]); client0.commit(); client1.commit(); queryAndCompare(params("q", "*:*", "sort","_version_ desc"), client0, client1); - // // Test that handling reorders work when applying docs retrieved from peer // diff --git a/solr/core/src/test/org/apache/solr/update/PeerSyncWithIndexFingerprintCachingTest.java b/solr/core/src/test/org/apache/solr/update/PeerSyncWithIndexFingerprintCachingTest.java new file mode 100644 index 00000000000..9617ff2a356 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/update/PeerSyncWithIndexFingerprintCachingTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.update; + +import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.solr.BaseDistributedSearchTestCase; +import org.apache.solr.SolrTestCaseJ4.SuppressSSL; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.StrUtils; +import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase; +import org.junit.Assert; +import org.junit.Test; + + +/** + * This test is deliberately kept in different class as we don't want segment merging to kick in after deleting documents. + * This ensures that first check the cached IndexFingerprint and + * recompute it only if any documents in the segment were deleted since caching the fingerprint first time around + * + * + */ +@SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776") +public class PeerSyncWithIndexFingerprintCachingTest extends BaseDistributedSearchTestCase { + private static int numVersions = 100; // number of versions to use when syncing + private final String FROM_LEADER = DistribPhase.FROMLEADER.toString(); + + private ModifiableSolrParams seenLeader = + params(DISTRIB_UPDATE_PARAM, FROM_LEADER); + + public PeerSyncWithIndexFingerprintCachingTest() { + stress = 0; + + // TODO: a better way to do this? + configString = "solrconfig-tlog.xml"; + schemaString = "schema.xml"; + } + + @Test + @ShardsFixed(num = 3) + public void test() throws Exception { + handle.clear(); + handle.put("timestamp", SKIPVAL); + handle.put("score", SKIPVAL); + handle.put("maxScore", SKIPVAL); + + SolrClient client0 = clients.get(0); + SolrClient client1 = clients.get(1); + + long v =1; + for(; v < 8; ++v) { + add(client0, seenLeader, sdoc("id", ""+v,"_version_",v)); + add(client1, seenLeader, sdoc("id",""+v,"_version_",v)); + + } + client0.commit(); client1.commit(); + + IndexFingerprint before = getFingerprint(client0, Long.MAX_VALUE); + + del(client0, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_",Long.toString(-++v)), "2"); + client0.commit(); + + IndexFingerprint after = getFingerprint(client0, Long.MAX_VALUE); + + // make sure fingerprint before and after deleting are not the same + Assert.assertTrue(IndexFingerprint.compare(before, after) != 0); + + // replica which missed the delete should be able to sync + assertSync(client1, numVersions, true, shardsArr[0]); + client0.commit(); client1.commit(); + + queryAndCompare(params("q", "*:*", "sort","_version_ desc"), client0, client1); + } + + IndexFingerprint getFingerprint(SolrClient client, long maxVersion) throws IOException, SolrServerException { + QueryRequest qr = new QueryRequest(params("qt","/get", "getFingerprint",Long.toString(maxVersion))); + NamedList rsp = client.request(qr); + return IndexFingerprint.fromObject(rsp.get("fingerprint")); + } + + void assertSync(SolrClient client, int numVersions, boolean expectedResult, String... syncWith) throws IOException, SolrServerException { + QueryRequest qr = new QueryRequest(params("qt","/get", "getVersions",Integer.toString(numVersions), "sync", StrUtils.join(Arrays.asList(syncWith), ','))); + NamedList rsp = client.request(qr); + assertEquals(expectedResult, (Boolean) rsp.get("sync")); + } + +} diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java index 03db71c44d0..d04d9968b6e 100644 --- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java +++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java @@ -24,11 +24,14 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.io.FileUtils; import org.apache.solr.BaseDistributedSearchTestCase; import org.apache.solr.client.solrj.embedded.JettySolrRunner; +import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.cloud.Slice.State; import org.apache.solr.core.Diagnostics; import org.apache.solr.core.MockDirectoryFactory; import org.apache.zookeeper.KeeperException; @@ -222,6 +225,34 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes log.info("Collection has disappeared - collection: " + collection); } + + static void waitForNewLeader(CloudSolrClient cloudClient, String shardName, Replica oldLeader, int maxWaitInSecs) + throws Exception { + log.info("Will wait for a node to become leader for {} secs", maxWaitInSecs); + boolean waitForLeader = true; + int i = 0; + ZkStateReader zkStateReader = cloudClient.getZkStateReader(); + zkStateReader.forceUpdateCollection(DEFAULT_COLLECTION); + + while(waitForLeader) { + ClusterState clusterState = zkStateReader.getClusterState(); + DocCollection coll = clusterState.getCollection("collection1"); + Slice slice = coll.getSlice(shardName); + if(slice.getLeader() != oldLeader && slice.getState() == State.ACTIVE) { + log.info("New leader got elected in {} secs", i); + break; + } + + if(i == maxWaitInSecs) { + Diagnostics.logThreadDumps("Could not find new leader in specified timeout"); + zkStateReader.getZkClient().printLayoutToStdOut(); + fail("Could not find new leader even after waiting for " + maxWaitInSecs + "secs"); + } + + i++; + Thread.sleep(1000); + } + } public static void verifyReplicaStatus(ZkStateReader reader, String collection, String shard, String coreNodeName, Replica.State expectedState) throws InterruptedException { int maxIterations = 100;