mirror of https://github.com/apache/lucene.git
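SOLR-12509: Fix a bug when using round-robin doc assignment: the current-partition counter is now a shared AtomicInteger carried across segments (and passed into SplittingQuery) instead of a local variable that restarted at 0 on every split() call.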
This commit is contained in:
parent
d1173b8adc
commit
b5ed6350a0
|
@ -26,6 +26,7 @@ import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.lucene.index.CodecReader;
|
import org.apache.lucene.index.CodecReader;
|
||||||
import org.apache.lucene.index.FilterCodecReader;
|
import org.apache.lucene.index.FilterCodecReader;
|
||||||
|
@ -212,11 +213,14 @@ public class SolrIndexSplitter {
|
||||||
log.info("SolrIndexSplitter: partitions=" + numPieces + " segments=" + leaves.size());
|
log.info("SolrIndexSplitter: partitions=" + numPieces + " segments=" + leaves.size());
|
||||||
RTimerTree t;
|
RTimerTree t;
|
||||||
|
|
||||||
|
// this tracks round-robin assignment of docs to partitions
|
||||||
|
AtomicInteger currentPartition = new AtomicInteger();
|
||||||
|
|
||||||
if (splitMethod != SplitMethod.LINK) {
|
if (splitMethod != SplitMethod.LINK) {
|
||||||
t = timings.sub("findDocSetsPerLeaf");
|
t = timings.sub("findDocSetsPerLeaf");
|
||||||
for (LeafReaderContext readerContext : leaves) {
|
for (LeafReaderContext readerContext : leaves) {
|
||||||
assert readerContext.ordInParent == segmentDocSets.size(); // make sure we're going in order
|
assert readerContext.ordInParent == segmentDocSets.size(); // make sure we're going in order
|
||||||
FixedBitSet[] docSets = split(readerContext, numPieces, field, rangesArr, splitKey, hashRouter, false);
|
FixedBitSet[] docSets = split(readerContext, numPieces, field, rangesArr, splitKey, hashRouter, currentPartition, false);
|
||||||
segmentDocSets.add(docSets);
|
segmentDocSets.add(docSets);
|
||||||
}
|
}
|
||||||
t.stop();
|
t.stop();
|
||||||
|
@ -295,7 +299,7 @@ public class SolrIndexSplitter {
|
||||||
t.resume();
|
t.resume();
|
||||||
// apply deletions specific to this partition. As a side-effect on the first call this also populates
|
// apply deletions specific to this partition. As a side-effect on the first call this also populates
|
||||||
// a cache of docsets to delete per leaf reader per partition, which is reused for subsequent partitions.
|
// a cache of docsets to delete per leaf reader per partition, which is reused for subsequent partitions.
|
||||||
iw.deleteDocuments(new SplittingQuery(partitionNumber, field, rangesArr, hashRouter, splitKey, docsToDeleteCache));
|
iw.deleteDocuments(new SplittingQuery(partitionNumber, field, rangesArr, hashRouter, splitKey, docsToDeleteCache, currentPartition));
|
||||||
t.pause();
|
t.pause();
|
||||||
} else {
|
} else {
|
||||||
// This removes deletions but optimize might still be needed because sub-shards will have the same number of segments as the parent shard.
|
// This removes deletions but optimize might still be needed because sub-shards will have the same number of segments as the parent shard.
|
||||||
|
@ -433,15 +437,17 @@ public class SolrIndexSplitter {
|
||||||
private final HashBasedRouter hashRouter;
|
private final HashBasedRouter hashRouter;
|
||||||
private final String splitKey;
|
private final String splitKey;
|
||||||
private final Map<IndexReader.CacheKey, FixedBitSet[]> docsToDelete;
|
private final Map<IndexReader.CacheKey, FixedBitSet[]> docsToDelete;
|
||||||
|
private final AtomicInteger currentPartition;
|
||||||
|
|
||||||
SplittingQuery(int partition, SchemaField field, DocRouter.Range[] rangesArr, HashBasedRouter hashRouter, String splitKey,
|
SplittingQuery(int partition, SchemaField field, DocRouter.Range[] rangesArr, HashBasedRouter hashRouter, String splitKey,
|
||||||
Map<IndexReader.CacheKey, FixedBitSet[]> docsToDelete) {
|
Map<IndexReader.CacheKey, FixedBitSet[]> docsToDelete, AtomicInteger currentPartition) {
|
||||||
this.partition = partition;
|
this.partition = partition;
|
||||||
this.field = field;
|
this.field = field;
|
||||||
this.rangesArr = rangesArr;
|
this.rangesArr = rangesArr;
|
||||||
this.hashRouter = hashRouter;
|
this.hashRouter = hashRouter;
|
||||||
this.splitKey = splitKey;
|
this.splitKey = splitKey;
|
||||||
this.docsToDelete = docsToDelete;
|
this.docsToDelete = docsToDelete;
|
||||||
|
this.currentPartition = currentPartition;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -493,7 +499,7 @@ public class SolrIndexSplitter {
|
||||||
return perPartition[partition];
|
return perPartition[partition];
|
||||||
}
|
}
|
||||||
|
|
||||||
perPartition = split(readerContext, numPieces, field, rangesArr, splitKey, hashRouter, true);
|
perPartition = split(readerContext, numPieces, field, rangesArr, splitKey, hashRouter, currentPartition, true);
|
||||||
docsToDelete.put(readerContext.reader().getCoreCacheHelper().getKey(), perPartition);
|
docsToDelete.put(readerContext.reader().getCoreCacheHelper().getKey(), perPartition);
|
||||||
return perPartition[partition];
|
return perPartition[partition];
|
||||||
}
|
}
|
||||||
|
@ -526,7 +532,7 @@ public class SolrIndexSplitter {
|
||||||
}
|
}
|
||||||
|
|
||||||
static FixedBitSet[] split(LeafReaderContext readerContext, int numPieces, SchemaField field, DocRouter.Range[] rangesArr,
|
static FixedBitSet[] split(LeafReaderContext readerContext, int numPieces, SchemaField field, DocRouter.Range[] rangesArr,
|
||||||
String splitKey, HashBasedRouter hashRouter, boolean delete) throws IOException {
|
String splitKey, HashBasedRouter hashRouter, AtomicInteger currentPartition, boolean delete) throws IOException {
|
||||||
LeafReader reader = readerContext.reader();
|
LeafReader reader = readerContext.reader();
|
||||||
FixedBitSet[] docSets = new FixedBitSet[numPieces];
|
FixedBitSet[] docSets = new FixedBitSet[numPieces];
|
||||||
for (int i=0; i<docSets.length; i++) {
|
for (int i=0; i<docSets.length; i++) {
|
||||||
|
@ -556,7 +562,6 @@ public class SolrIndexSplitter {
|
||||||
docsMatchingRanges = new int[rangesArr.length+1];
|
docsMatchingRanges = new int[rangesArr.length+1];
|
||||||
}
|
}
|
||||||
|
|
||||||
int partition = 0;
|
|
||||||
CharsRefBuilder idRef = new CharsRefBuilder();
|
CharsRefBuilder idRef = new CharsRefBuilder();
|
||||||
for (;;) {
|
for (;;) {
|
||||||
term = termsEnum.next();
|
term = termsEnum.next();
|
||||||
|
@ -580,7 +585,7 @@ public class SolrIndexSplitter {
|
||||||
}
|
}
|
||||||
|
|
||||||
int hash = 0;
|
int hash = 0;
|
||||||
if (hashRouter != null) {
|
if (hashRouter != null && rangesArr != null) {
|
||||||
hash = hashRouter.sliceHash(idString, null, null, null);
|
hash = hashRouter.sliceHash(idString, null, null, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -591,14 +596,14 @@ public class SolrIndexSplitter {
|
||||||
if (doc == DocIdSetIterator.NO_MORE_DOCS) break;
|
if (doc == DocIdSetIterator.NO_MORE_DOCS) break;
|
||||||
if (rangesArr == null) {
|
if (rangesArr == null) {
|
||||||
if (delete) {
|
if (delete) {
|
||||||
docSets[partition].clear(doc);
|
docSets[currentPartition.get()].clear(doc);
|
||||||
} else {
|
} else {
|
||||||
docSets[partition].set(doc);
|
docSets[currentPartition.get()].set(doc);
|
||||||
}
|
}
|
||||||
partition = (partition + 1) % numPieces;
|
currentPartition.set((currentPartition.get() + 1) % numPieces);
|
||||||
} else {
|
} else {
|
||||||
int matchingRangesCount = 0;
|
int matchingRangesCount = 0;
|
||||||
for (int i=0; i<rangesArr.length; i++) { // inner-loop: use array here for extra speed.
|
for (int i=0; i < rangesArr.length; i++) { // inner-loop: use array here for extra speed.
|
||||||
if (rangesArr[i].includes(hash)) {
|
if (rangesArr[i].includes(hash)) {
|
||||||
if (delete) {
|
if (delete) {
|
||||||
docSets[i].clear(doc);
|
docSets[i].clear(doc);
|
||||||
|
|
|
@ -269,22 +269,26 @@ public class SolrIndexSplitterTest extends SolrTestCaseJ4 {
|
||||||
directory = h.getCore().getDirectoryFactory().get(indexDir1.getAbsolutePath(),
|
directory = h.getCore().getDirectoryFactory().get(indexDir1.getAbsolutePath(),
|
||||||
DirectoryFactory.DirContext.DEFAULT, h.getCore().getSolrConfig().indexConfig.lockType);
|
DirectoryFactory.DirContext.DEFAULT, h.getCore().getSolrConfig().indexConfig.lockType);
|
||||||
DirectoryReader reader = DirectoryReader.open(directory);
|
DirectoryReader reader = DirectoryReader.open(directory);
|
||||||
assertEquals("split index1 has wrong number of documents", max / 3, reader.numDocs());
|
int numDocs1 = reader.numDocs();
|
||||||
reader.close();
|
reader.close();
|
||||||
h.getCore().getDirectoryFactory().release(directory);
|
h.getCore().getDirectoryFactory().release(directory);
|
||||||
directory = h.getCore().getDirectoryFactory().get(indexDir2.getAbsolutePath(),
|
directory = h.getCore().getDirectoryFactory().get(indexDir2.getAbsolutePath(),
|
||||||
DirectoryFactory.DirContext.DEFAULT, h.getCore().getSolrConfig().indexConfig.lockType);
|
DirectoryFactory.DirContext.DEFAULT, h.getCore().getSolrConfig().indexConfig.lockType);
|
||||||
reader = DirectoryReader.open(directory);
|
reader = DirectoryReader.open(directory);
|
||||||
assertEquals("split index2 has wrong number of documents", max / 3, reader.numDocs());
|
int numDocs2 = reader.numDocs();
|
||||||
reader.close();
|
reader.close();
|
||||||
h.getCore().getDirectoryFactory().release(directory);
|
h.getCore().getDirectoryFactory().release(directory);
|
||||||
directory = h.getCore().getDirectoryFactory().get(indexDir3.getAbsolutePath(),
|
directory = h.getCore().getDirectoryFactory().get(indexDir3.getAbsolutePath(),
|
||||||
DirectoryFactory.DirContext.DEFAULT, h.getCore().getSolrConfig().indexConfig.lockType);
|
DirectoryFactory.DirContext.DEFAULT, h.getCore().getSolrConfig().indexConfig.lockType);
|
||||||
reader = DirectoryReader.open(directory);
|
reader = DirectoryReader.open(directory);
|
||||||
assertEquals("split index3 has wrong number of documents", max / 3, reader.numDocs());
|
int numDocs3 = reader.numDocs();
|
||||||
reader.close();
|
reader.close();
|
||||||
h.getCore().getDirectoryFactory().release(directory);
|
h.getCore().getDirectoryFactory().release(directory);
|
||||||
directory = null;
|
directory = null;
|
||||||
|
assertEquals("split indexes lost some documents!", max, numDocs1 + numDocs2 + numDocs3);
|
||||||
|
assertEquals("split index1 has wrong number of documents", max / 3, numDocs1);
|
||||||
|
assertEquals("split index2 has wrong number of documents", max / 3, numDocs2);
|
||||||
|
assertEquals("split index3 has wrong number of documents", max / 3, numDocs3);
|
||||||
} finally {
|
} finally {
|
||||||
if (request != null) request.close(); // decrefs the searcher
|
if (request != null) request.close(); // decrefs the searcher
|
||||||
if (directory != null) {
|
if (directory != null) {
|
||||||
|
|
Loading…
Reference in New Issue