diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexReader.java b/lucene/core/src/java/org/apache/lucene/index/IndexReader.java index 0c52ed37f09..c4e939b4ba7 100644 --- a/lucene/core/src/java/org/apache/lucene/index/IndexReader.java +++ b/lucene/core/src/java/org/apache/lucene/index/IndexReader.java @@ -19,7 +19,7 @@ package org.apache.lucene.index; import org.apache.lucene.document.DocumentStoredFieldVisitor; import org.apache.lucene.store.AlreadyClosedException; -import org.apache.lucene.util.Bits; +import org.apache.lucene.util.Bits; // javadocs import org.apache.lucene.util.IOUtils; import java.io.Closeable; @@ -30,7 +30,6 @@ import java.util.List; import java.util.Set; import java.util.WeakHashMap; import java.util.concurrent.atomic.AtomicInteger; -// javadocs /** IndexReader is an abstract class, providing an interface for accessing a @@ -99,7 +98,7 @@ public abstract class IndexReader implements Closeable { */ public static interface ReaderClosedListener { /** Invoked when the {@link IndexReader} is closed. */ - public void onClose(IndexReader reader); + public void onClose(IndexReader reader) throws IOException; } private final Set readerClosedListeners = @@ -191,7 +190,7 @@ public abstract class IndexReader implements Closeable { */ public final void incRef() { if (!tryIncRef()) { - ensureOpen(); + ensureOpen(); } } diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java index 0faadaa6367..595a85c5198 100644 --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java @@ -3831,6 +3831,14 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { merge.readers.set(i, null); } } + + try { + merge.mergeFinished(); + } catch (Throwable t) { + if (th == null) { + th = t; + } + } // If any error occured, throw it. if (!suppressExceptions) { diff --git a/lucene/core/src/java/org/apache/lucene/index/LeafReader.java b/lucene/core/src/java/org/apache/lucene/index/LeafReader.java index 26b24da4020..2f9c60422ee 100644 --- a/lucene/core/src/java/org/apache/lucene/index/LeafReader.java +++ b/lucene/core/src/java/org/apache/lucene/index/LeafReader.java @@ -80,7 +80,7 @@ public abstract class LeafReader extends IndexReader { public static interface CoreClosedListener { /** Invoked when the shared core of the original {@code * SegmentReader} has closed. 
*/ - public void onClose(Object ownerCoreCacheKey); + public void onClose(Object ownerCoreCacheKey) throws IOException; } private static class CoreClosedListenerWrapper implements ReaderClosedListener { @@ -92,7 +92,7 @@ public abstract class LeafReader extends IndexReader { } @Override - public void onClose(IndexReader reader) { + public void onClose(IndexReader reader) throws IOException { listener.onClose(reader.getCoreCacheKey()); } diff --git a/lucene/core/src/java/org/apache/lucene/index/LeafReaderContext.java b/lucene/core/src/java/org/apache/lucene/index/LeafReaderContext.java index 5571ea207f3..b6c32dfb5d2 100644 --- a/lucene/core/src/java/org/apache/lucene/index/LeafReaderContext.java +++ b/lucene/core/src/java/org/apache/lucene/index/LeafReaderContext.java @@ -66,4 +66,9 @@ public final class LeafReaderContext extends IndexReaderContext { public LeafReader reader() { return reader; } -} \ No newline at end of file + + @Override + public String toString() { + return "LeafReaderContext(" + reader + " docBase=" + docBase + " ord=" + ord + ")"; + } +} diff --git a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java index b344d41e5ba..8c1d8305876 100644 --- a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java +++ b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java @@ -129,6 +129,10 @@ public abstract class MergePolicy { totalDocCount = count; } + /** Called by {@link IndexWriter} after the merge is done and all readers have been closed. */ + public void mergeFinished() throws IOException { + } + /** Expert: Get the list of readers to merge. Note that this list does not * necessarily match the list of segments to merge and should only be used * to feed SegmentMerger to initialize a merge. When a {@link OneMerge} diff --git a/lucene/core/src/java/org/apache/lucene/index/ParallelLeafReader.java b/lucene/core/src/java/org/apache/lucene/index/ParallelLeafReader.java index 0e6093ebd03..9a78fe10b56 100644 --- a/lucene/core/src/java/org/apache/lucene/index/ParallelLeafReader.java +++ b/lucene/core/src/java/org/apache/lucene/index/ParallelLeafReader.java @@ -28,7 +28,6 @@ import java.util.TreeMap; import org.apache.lucene.util.Bits; - /** An {@link LeafReader} which reads multiple, parallel indexes. Each index * added must have the same number of documents, but typically each contains * different fields. Deletions are taken from the first reader. @@ -322,4 +321,10 @@ public class ParallelLeafReader extends LeafReader { reader.checkIntegrity(); } } + + /** Returns the {@link LeafReader}s that were passed on init. */ + public LeafReader[] getParallelReaders() { + ensureOpen(); + return parallelReaders; + } } diff --git a/lucene/core/src/java/org/apache/lucene/index/ReaderManager.java b/lucene/core/src/java/org/apache/lucene/index/ReaderManager.java index 89bb7f0f30c..ba920b843dc 100644 --- a/lucene/core/src/java/org/apache/lucene/index/ReaderManager.java +++ b/lucene/core/src/java/org/apache/lucene/index/ReaderManager.java @@ -67,6 +67,19 @@ public final class ReaderManager extends ReferenceManager { current = DirectoryReader.open(dir); } + /** + * Creates and returns a new ReaderManager from the given + * already-opened {@link DirectoryReader}, stealing + * the incoming reference. 
+ * + * @param reader the directoryReader to use for future reopens + * + * @throws IOException If there is a low-level I/O error + */ + public ReaderManager(DirectoryReader reader) throws IOException { + current = reader; + } + @Override protected void decRef(DirectoryReader reference) throws IOException { reference.decRef(); diff --git a/lucene/core/src/test/org/apache/lucene/index/TestDemoParallelLeafReader.java b/lucene/core/src/test/org/apache/lucene/index/TestDemoParallelLeafReader.java new file mode 100644 index 00000000000..859b46277b0 --- /dev/null +++ b/lucene/core/src/test/org/apache/lucene/index/TestDemoParallelLeafReader.java @@ -0,0 +1,1339 @@ +package org.apache.lucene.index; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.Closeable; +import java.io.IOException; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.LongField; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.NumericRangeQuery; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.MockDirectoryWrapper; +import org.apache.lucene.store.MockDirectoryWrapper.Throttling; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.LuceneTestCase; +import org.apache.lucene.util.StringHelper; +import org.apache.lucene.util.TestUtil; + +// TODO: +// - old parallel indices are only pruned on commit/close; can we do it on refresh? + +/** Simple example showing how to use ParallelLeafReader to index new + * stuff (postings, DVs, etc.) from previously stored fields, on the + * fly (during NRT reader reopen), after the initial indexing. The + * test indexes just a single stored field with text "content X" (X is + * a number embedded in the text). + * + * Then, on reopen, for any newly created segments (flush or merge), it + * builds a new parallel segment by loading all stored docs, parsing + * out that X, and adding it as DV and numeric indexed (trie) field. 
+ * + * Finally, for searching, it builds a top-level MultiReader, with + * ParallelLeafReader for each segment, and then tests that random + * numeric range queries, and sorting by the new DV field, work + * correctly. + * + * Each per-segment index lives in a private directory next to the main + * index, and they are deleted once their segments are removed from the + * index. They are "volatile", meaning if e.g. the index is replicated to + * another machine, it's OK to not copy parallel segments indices, + * since they will just be regenerated (at a cost though). */ + +// @SuppressSysoutChecks(bugUrl="we print stuff") + +public class TestDemoParallelLeafReader extends LuceneTestCase { + + static final boolean DEBUG = false; + + static abstract class ReindexingReader implements Closeable { + + /** Key used to store the current schema gen in the SegmentInfo diagnostics */ + public final static String SCHEMA_GEN_KEY = "schema_gen"; + + public final IndexWriter w; + public final ReaderManager mgr; + + private final Directory indexDir; + private final Path root; + private final Path segsPath; + + /** Which segments have been closed, but their parallel index is not yet removed. */ + private final Set closedSegments = Collections.newSetFromMap(new ConcurrentHashMap()); + + /** Holds currently open parallel readers for each segment. */ + private final Map parallelReaders = new ConcurrentHashMap<>(); + + void printRefCounts() { + System.out.println("All refCounts:"); + for(Map.Entry ent : parallelReaders.entrySet()) { + System.out.println(" " + ent.getKey() + " " + ent.getValue() + " refCount=" + ent.getValue().getRefCount()); + } + } + + public ReindexingReader(Path root) throws IOException { + this.root = root; + + // Normal index is stored under "index": + indexDir = openDirectory(root.resolve("index")); + + // Per-segment parallel indices are stored under subdirs "segs": + segsPath = root.resolve("segs"); + Files.createDirectories(segsPath); + + IndexWriterConfig iwc = getIndexWriterConfig(); + iwc.setMergePolicy(new ReindexingMergePolicy(iwc.getMergePolicy())); + w = new IndexWriter(indexDir, iwc); + + w.getConfig().setMergedSegmentWarmer(new IndexWriter.IndexReaderWarmer() { + @Override + public void warm(LeafReader reader) throws IOException { + // This will build the parallel index for the merged segment before the merge becomes visible, so reopen delay is only due to + // newly flushed segments: + if (DEBUG) System.out.println(Thread.currentThread().getName() +": TEST: now warm " + reader); + // TODO: it's not great that we pass false here; it means we close the reader & reopen again for NRT reader; still we did "warm" by + // building the parallel index, if necessary + getParallelLeafReader(reader, false, getCurrentSchemaGen()); + } + }); + + // start with empty commit: + w.commit(); + mgr = new ReaderManager(new ParallelLeafDirectoryReader(DirectoryReader.open(w, true))); + } + + protected abstract IndexWriterConfig getIndexWriterConfig() throws IOException; + + /** Optional method to validate that the provided parallel reader in fact reflects the changes in schemaGen. */ + protected void checkParallelReader(LeafReader reader, LeafReader parallelReader, long schemaGen) throws IOException { + } + + /** Override to customize Directory impl. 
*/ + protected Directory openDirectory(Path path) throws IOException { + return FSDirectory.open(path); + } + + public void commit() throws IOException { + w.commit(); + } + + LeafReader getCurrentReader(LeafReader reader, long schemaGen) throws IOException { + LeafReader parallelReader = getParallelLeafReader(reader, true, schemaGen); + if (parallelReader != null) { + + // We should not be embedding one ParallelLeafReader inside another: + assertFalse(parallelReader instanceof ParallelLeafReader); + assertFalse(reader instanceof ParallelLeafReader); + + // NOTE: important that parallelReader is first, so if there are field name overlaps, because changes to the schema + // overwrote existing field names, it wins: + LeafReader newReader = new ParallelLeafReader(false, parallelReader, reader) { + @Override + public Bits getLiveDocs() { + return getParallelReaders()[1].getLiveDocs(); + } + @Override + public int numDocs() { + return getParallelReaders()[1].numDocs(); + } + }; + + // Because ParallelLeafReader does its own (extra) incRef: + parallelReader.decRef(); + + return newReader; + + } else { + // This segment was already current as of currentSchemaGen: + return reader; + } + } + + private class ParallelLeafDirectoryReader extends FilterDirectoryReader { + public ParallelLeafDirectoryReader(DirectoryReader in) { + super(in, new FilterDirectoryReader.SubReaderWrapper() { + final long currentSchemaGen = getCurrentSchemaGen(); + @Override + public LeafReader wrap(LeafReader reader) { + try { + return getCurrentReader(reader, currentSchemaGen); + } catch (IOException ioe) { + // TODO: must close on exc here: + throw new RuntimeException(ioe); + } + } + }); + } + + @Override + protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) { + return new ParallelLeafDirectoryReader(in); + } + + @Override + protected void doClose() throws IOException { + Throwable firstExc = null; + for (final LeafReader r : getSequentialSubReaders()) { + if (r instanceof ParallelLeafReader) { + // try to close each reader, even if an exception is thrown + try { + r.decRef(); + } catch (Throwable t) { + if (firstExc == null) { + firstExc = t; + } + } + } + } + // Also close in, so it decRef's the SegmentInfos + try { + in.doClose(); + } catch (Throwable t) { + if (firstExc == null) { + firstExc = t; + } + } + // throw the first exception + IOUtils.reThrow(firstExc); + } + } + + @Override + public void close() throws IOException { + w.close(); + if (DEBUG) System.out.println("TEST: after close writer index=" + SegmentInfos.readLatestCommit(indexDir).toString(indexDir)); + + /* + DirectoryReader r = mgr.acquire(); + try { + TestUtil.checkReader(r); + } finally { + mgr.release(r); + } + */ + mgr.close(); + pruneOldSegments(true); + assertNoExtraSegments(); + indexDir.close(); + } + + // Make sure we deleted all parallel indices for segments that are no longer in the main index: + private void assertNoExtraSegments() throws IOException { + Set liveIDs = new HashSet(); + for(SegmentCommitInfo info : SegmentInfos.readLatestCommit(indexDir)) { + String idString = StringHelper.idToString(info.info.getId()); + liveIDs.add(idString); + } + + // At this point (closing) the only segments in closedSegments should be the still-live ones: + for(SegmentIDAndGen segIDGen : closedSegments) { + assertTrue(liveIDs.contains(segIDGen.segID)); + } + + boolean fail = false; + try (DirectoryStream stream = Files.newDirectoryStream(segsPath)) { + for (Path path : stream) { + SegmentIDAndGen segIDGen = new 
SegmentIDAndGen(path.getFileName().toString()); + if (liveIDs.contains(segIDGen.segID) == false) { + if (DEBUG) System.out.println("TEST: fail seg=" + path.getFileName() + " is not live but still has a parallel index"); + fail = true; + } + } + } + assertFalse(fail); + } + + private static class SegmentIDAndGen { + public final String segID; + public final long schemaGen; + + public SegmentIDAndGen(String segID, long schemaGen) { + this.segID = segID; + this.schemaGen = schemaGen; + } + + public SegmentIDAndGen(String s) { + String[] parts = s.split("_"); + if (parts.length != 2) { + throw new IllegalArgumentException("invalid SegmentIDAndGen \"" + s + "\""); + } + // TODO: better checking of segID? + segID = parts[0]; + schemaGen = Long.parseLong(parts[1]); + } + + @Override + public int hashCode() { + return (int) (segID.hashCode() * schemaGen); + } + + @Override + public boolean equals(Object _other) { + if (_other instanceof SegmentIDAndGen) { + SegmentIDAndGen other = (SegmentIDAndGen) _other; + return segID.equals(other.segID) && schemaGen == other.schemaGen; + } else { + return false; + } + } + + @Override + public String toString() { + return segID + "_" + schemaGen; + } + } + + private class ParallelReaderClosed implements LeafReader.ReaderClosedListener { + private final SegmentIDAndGen segIDGen; + private final Directory dir; + + public ParallelReaderClosed(SegmentIDAndGen segIDGen, Directory dir) { + this.segIDGen = segIDGen; + this.dir = dir; + } + + @Override + public void onClose(IndexReader ignored) { + try { + // TODO: make this sync finer, i.e. just the segment + schemaGen + synchronized(ReindexingReader.this) { + if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: now close parallel parLeafReader dir=" + dir + " segIDGen=" + segIDGen); + parallelReaders.remove(segIDGen); + dir.close(); + closedSegments.add(segIDGen); + } + } catch (IOException ioe) { + System.out.println("TEST: hit IOExc closing dir=" + dir); + ioe.printStackTrace(System.out); + throw new RuntimeException(ioe); + } + } + } + + // Returns a ref + LeafReader getParallelLeafReader(final LeafReader leaf, boolean doCache, long schemaGen) throws IOException { + assert leaf instanceof SegmentReader; + SegmentInfo info = ((SegmentReader) leaf).getSegmentInfo().info; + + long infoSchemaGen = getSchemaGen(info); + + if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: getParallelLeafReader: " + leaf + " infoSchemaGen=" + infoSchemaGen + " vs schemaGen=" + schemaGen + " doCache=" + doCache); + + if (infoSchemaGen == schemaGen) { + if (DEBUG) System.out.println(Thread.currentThread().getName()+ ": TEST: segment is already current schemaGen=" + schemaGen + "; skipping"); + return null; + } + + if (infoSchemaGen > schemaGen) { + throw new IllegalStateException("segment infoSchemaGen (" + infoSchemaGen + ") cannot be greater than requested schemaGen (" + schemaGen + ")"); + } + + final SegmentIDAndGen segIDGen = new SegmentIDAndGen(StringHelper.idToString(info.getId()), schemaGen); + + // While loop because the parallel reader may be closed out from under us, so we must retry: + while (true) { + + // TODO: make this sync finer, i.e. 
just the segment + schemaGen + synchronized (this) { + LeafReader parReader = parallelReaders.get(segIDGen); + + assert doCache || parReader == null; + + if (parReader == null) { + + Path leafIndex = segsPath.resolve(segIDGen.toString()); + + final Directory dir = openDirectory(leafIndex); + + if (Files.exists(leafIndex.resolve("done")) == false) { + if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: build segment index for " + leaf + " " + segIDGen + " (source: " + info.getDiagnostics().get("source") + ") dir=" + leafIndex); + + if (dir.listAll().length != 0) { + // It crashed before finishing last time: + if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: remove old incomplete index files: " + leafIndex); + IOUtils.rm(leafIndex); + } + + reindex(infoSchemaGen, schemaGen, leaf, dir); + + // Marker file, telling us this index is in fact done. This way if we crash while doing the reindexing for a given segment, we will + // later try again: + dir.createOutput("done", IOContext.DEFAULT).close(); + } else { + if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: segment index already exists for " + leaf + " " + segIDGen + " (source: " + info.getDiagnostics().get("source") + ") dir=" + leafIndex); + } + + if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: now check index " + dir); + //TestUtil.checkIndex(dir); + + SegmentInfos infos = SegmentInfos.readLatestCommit(dir); + final LeafReader parLeafReader; + if (infos.size() == 1) { + parLeafReader = new SegmentReader(infos.info(0), IOContext.DEFAULT); + } else { + // This just means we didn't forceMerge above: + parLeafReader = SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir)); + } + + //checkParallelReader(leaf, parLeafReader, schemaGen); + + if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: opened parallel reader: " + parLeafReader); + if (doCache) { + parallelReaders.put(segIDGen, parLeafReader); + + // Our id+gen could have been previously closed, e.g. if it was a merged segment that was warmed, so we must clear this else + // the pruning may remove our directory: + closedSegments.remove(segIDGen); + + parLeafReader.addReaderClosedListener(new ParallelReaderClosed(segIDGen, dir)); + + } else { + // Used only for merged segment warming: + // Messy: we close this reader now, instead of leaving open for reuse: + if (DEBUG) System.out.println("TEST: now decRef non cached refCount=" + parLeafReader.getRefCount()); + parLeafReader.decRef(); + dir.close(); + + // Must do this after dir is closed, else another thread could "rm -rf" while we are closing (which makes MDW.close's + // checkIndex angry): + closedSegments.add(segIDGen); + parReader = null; + } + parReader = parLeafReader; + + } else { + if (parReader.tryIncRef() == false) { + // We failed: this reader just got closed by another thread, e.g. refresh thread opening a new reader, so this reader is now + // closed and we must try again. + if (DEBUG) System.out.println(Thread.currentThread().getName()+ ": TEST: tryIncRef failed for " + parReader + "; retry"); + parReader = null; + continue; + } + if (DEBUG) System.out.println(Thread.currentThread().getName()+ ": TEST: use existing already opened parReader=" + parReader + " refCount=" + parReader.getRefCount()); + //checkParallelReader(leaf, parReader, schemaGen); + } + + // We return the new reference to caller + return parReader; + } + } + } + + // TODO: we could pass a writer already opened...? 
+ protected abstract void reindex(long oldSchemaGen, long newSchemaGen, LeafReader reader, Directory parallelDir) throws IOException; + + /** Returns the gen for the current schema. */ + protected abstract long getCurrentSchemaGen(); + + /** Returns the gen that should be merged, meaning those changes will be folded back into the main index. */ + protected long getMergingSchemaGen() { + return getCurrentSchemaGen(); + } + + /** Removes the parallel indices that are no longer in the last commit point. We can't + * remove these when the parallel reader is closed because they may still be referenced by + * the last commit. */ + private void pruneOldSegments(boolean removeOldGens) throws IOException { + SegmentInfos lastCommit = SegmentInfos.readLatestCommit(indexDir); + if (DEBUG) System.out.println("TEST: prune"); + + Set liveIDs = new HashSet(); + for(SegmentCommitInfo info : lastCommit) { + String idString = StringHelper.idToString(info.info.getId()); + liveIDs.add(idString); + } + + long currentSchemaGen = getCurrentSchemaGen(); + + if (Files.exists(segsPath)) { + try (DirectoryStream stream = Files.newDirectoryStream(segsPath)) { + for (Path path : stream) { + if (Files.isDirectory(path)) { + SegmentIDAndGen segIDGen = new SegmentIDAndGen(path.getFileName().toString()); + assert segIDGen.schemaGen <= currentSchemaGen; + if (liveIDs.contains(segIDGen.segID) == false && (closedSegments.contains(segIDGen) || (removeOldGens && segIDGen.schemaGen < currentSchemaGen))) { + if (DEBUG) System.out.println("TEST: remove " + segIDGen); + try { + IOUtils.rm(path); + closedSegments.remove(segIDGen); + } catch (IOException ioe) { + // OK, we'll retry later + if (DEBUG) System.out.println("TEST: ignore ioe during delete " + path + ":" + ioe); + } + } + } + } + } + } + + /** Just replaces the sub-readers with parallel readers, so reindexed fields are merged into new segments. */ + private class ReindexingMergePolicy extends MergePolicy { + + class ReindexingOneMerge extends OneMerge { + + List parallelReaders; + final long schemaGen; + + ReindexingOneMerge(List segments) { + super(segments); + // Commit up front to which schemaGen we will merge; we don't want a schema change sneaking in for some of our leaf readers but not others: + schemaGen = getMergingSchemaGen(); + long currentSchemaGen = getCurrentSchemaGen(); + + // Defensive sanity check: + if (schemaGen > currentSchemaGen) { + throw new IllegalStateException("currentSchemaGen (" + currentSchemaGen + ") must always be >= mergingSchemaGen (" + schemaGen + ")"); + } + } + + @Override + public List getMergeReaders() throws IOException { + if (parallelReaders == null) { + parallelReaders = new ArrayList<>(); + for (LeafReader reader : super.getMergeReaders()) { + parallelReaders.add(getCurrentReader(reader, schemaGen)); + } + } + + return parallelReaders; + } + + @Override + public void mergeFinished() throws IOException { + Throwable th = null; + for(LeafReader r : parallelReaders) { + if (r instanceof ParallelLeafReader) { + try { + r.decRef(); + } catch (Throwable t) { + if (th == null) { + th = t; + } + } + } + } + + // If any error occurred, throw it. 
+ IOUtils.reThrow(th); + } + + @Override + public void setInfo(SegmentCommitInfo info) { + // Record that this merged segment is current as of this schemaGen: + info.info.getDiagnostics().put(SCHEMA_GEN_KEY, Long.toString(schemaGen)); + super.setInfo(info); + } + + @Override + public MergePolicy.DocMap getDocMap(final MergeState mergeState) { + return super.getDocMap(mergeState); + } + } + + class ReindexingMergeSpecification extends MergeSpecification { + @Override + public void add(OneMerge merge) { + super.add(new ReindexingOneMerge(merge.segments)); + } + + @Override + public String segString(Directory dir) { + return "ReindexingMergeSpec(" + super.segString(dir) + ")"; + } + } + + MergeSpecification wrap(MergeSpecification spec) { + MergeSpecification wrapped = null; + if (spec != null) { + wrapped = new ReindexingMergeSpecification(); + for (OneMerge merge : spec.merges) { + wrapped.add(merge); + } + } + return wrapped; + } + + final MergePolicy in; + + /** Create a new {@code ReindexingMergePolicy} wrapping the given {@code MergePolicy}. */ + public ReindexingMergePolicy(MergePolicy in) { + this.in = in; + } + + @Override + public MergeSpecification findMerges(MergeTrigger mergeTrigger, + SegmentInfos segmentInfos, IndexWriter writer) throws IOException { + return wrap(in.findMerges(mergeTrigger, segmentInfos, writer)); + } + + @Override + public MergeSpecification findForcedMerges(SegmentInfos segmentInfos, + int maxSegmentCount, Map segmentsToMerge, IndexWriter writer) + throws IOException { + // TODO: do we need to force-force this? Ie, wrapped MP may think index is already optimized, yet maybe its schemaGen is old? need test! + return wrap(in.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, writer)); + } + + @Override + public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, IndexWriter writer) + throws IOException { + return wrap(in.findForcedDeletesMerges(segmentInfos, writer)); + } + + @Override + public boolean useCompoundFile(SegmentInfos segments, + SegmentCommitInfo newSegment, IndexWriter writer) throws IOException { + return in.useCompoundFile(segments, newSegment, writer); + } + + @Override + public String toString() { + return "ReindexingMergePolicy(" + in + ")"; + } + } + + static long getSchemaGen(SegmentInfo info) { + String s = info.getDiagnostics().get(SCHEMA_GEN_KEY); + if (s == null) { + return -1; + } else { + return Long.parseLong(s); + } + } + } + + private ReindexingReader getReindexer(Path root) throws IOException { + return new ReindexingReader(root) { + @Override + protected IndexWriterConfig getIndexWriterConfig() throws IOException { + return newIndexWriterConfig(); + } + + @Override + protected Directory openDirectory(Path path) throws IOException { + MockDirectoryWrapper dir = newMockFSDirectory(path); + dir.setUseSlowOpenClosers(false); + dir.setThrottling(Throttling.NEVER); + return dir; + } + + @Override + protected void reindex(long oldSchemaGen, long newSchemaGen, LeafReader reader, Directory parallelDir) throws IOException { + IndexWriterConfig iwc = newIndexWriterConfig(); + + // The order of our docIDs must precisely match the incoming reader: + iwc.setMergePolicy(new LogByteSizeMergePolicy()); + IndexWriter w = new IndexWriter(parallelDir, iwc); + int maxDoc = reader.maxDoc(); + + // Slowly parse the stored field into a new doc values field: + for(int i=0;i DV field each time. 
*/ + private ReindexingReader getReindexerNewDVFields(Path root, final AtomicLong currentSchemaGen) throws IOException { + return new ReindexingReader(root) { + @Override + protected IndexWriterConfig getIndexWriterConfig() throws IOException { + return newIndexWriterConfig(); + } + + @Override + protected Directory openDirectory(Path path) throws IOException { + MockDirectoryWrapper dir = newMockFSDirectory(path); + dir.setUseSlowOpenClosers(false); + dir.setThrottling(Throttling.NEVER); + return dir; + } + + @Override + protected void reindex(long oldSchemaGen, long newSchemaGen, LeafReader reader, Directory parallelDir) throws IOException { + IndexWriterConfig iwc = newIndexWriterConfig(); + + // The order of our docIDs must precisely matching incoming reader: + iwc.setMergePolicy(new LogByteSizeMergePolicy()); + IndexWriter w = new IndexWriter(parallelDir, iwc); + int maxDoc = reader.maxDoc(); + + if (oldSchemaGen <= 0) { + // Must slowly parse the stored field into a new doc values field: + for(int i=0;i 0 && random().nextInt(10) == 7) { + // Replace a doc + id = "" + random().nextInt(maxID); + updateID = id; + } else { + id = "" + (maxID++); + updateID = null; + } + + doc.add(newStringField("id", id, Field.Store.NO)); + doc.add(newTextField("text", "number " + random().nextLong(), Field.Store.YES)); + if (updateID == null) { + reindexer.w.addDocument(doc); + } else { + reindexer.w.updateDocument(new Term("id", updateID), doc); + } + if (random().nextInt(refreshEveryNumDocs) == 17) { + if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: refresh @ " + (i+1) + " docs"); + reindexer.mgr.maybeRefresh(); + + DirectoryReader r = reindexer.mgr.acquire(); + if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: got reader=" + r); + try { + checkAllNumberDVs(r, "number_" + currentSchemaGen.get(), true, 1); + } finally { + reindexer.mgr.release(r); + } + if (DEBUG) reindexer.printRefCounts(); + refreshEveryNumDocs = (int) (1.25 * refreshEveryNumDocs); + } + + if (random().nextInt(500) == 17) { + currentSchemaGen.incrementAndGet(); + if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: advance schemaGen to " + currentSchemaGen); + } + + if (i > 0 && random().nextInt(10) == 7) { + // Random delete: + reindexer.w.deleteDocuments(new Term("id", ""+random().nextInt(i))); + } + + if (random().nextInt(commitCloseNumDocs) == 17) { + if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: commit @ " + (i+1) + " docs"); + reindexer.commit(); + //reindexer.printRefCounts(); + commitCloseNumDocs = (int) (1.25 * commitCloseNumDocs); + } + + // Sometimes close & reopen writer/manager, to confirm the parallel segments persist: + if (random().nextInt(commitCloseNumDocs) == 17) { + if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: close writer @ " + (i+1) + " docs"); + reindexer.close(); + reindexer = null; + commitCloseNumDocs = (int) (1.25 * commitCloseNumDocs); + } + } + + if (reindexer != null) { + reindexer.close(); + } + } + + /** First schema change creates a new "number" DV field off the stored field; subsequent changes just change the value of that number + * field for all docs. 
*/ + public void testRandomMultipleSchemaGensSameField() throws Exception { + + AtomicLong currentSchemaGen = new AtomicLong(); + AtomicLong mergingSchemaGen = new AtomicLong(); + + ReindexingReader reindexer = null; + + // TODO: separate refresh thread, search threads, indexing threads + int numDocs = atLeast(TEST_NIGHTLY ? 20000 : 2000); + int maxID = 0; + Path root = createTempDir(); + int refreshEveryNumDocs = 100; + int commitCloseNumDocs = 1000; + + for(int i=0;i 0 && random().nextInt(10) == 7) { + // Replace a doc + id = "" + random().nextInt(maxID); + updateID = id; + } else { + id = "" + (maxID++); + updateID = null; + } + + doc.add(newStringField("id", id, Field.Store.NO)); + doc.add(newTextField("text", "number " + TestUtil.nextInt(random(), -10000, 10000), Field.Store.YES)); + if (updateID == null) { + reindexer.w.addDocument(doc); + } else { + reindexer.w.updateDocument(new Term("id", updateID), doc); + } + if (random().nextInt(refreshEveryNumDocs) == 17) { + if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: refresh @ " + (i+1) + " docs"); + reindexer.mgr.maybeRefresh(); + DirectoryReader r = reindexer.mgr.acquire(); + if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: got reader=" + r); + try { + checkAllNumberDVs(r, "number", true, (int) currentSchemaGen.get()); + } finally { + reindexer.mgr.release(r); + } + if (DEBUG) reindexer.printRefCounts(); + refreshEveryNumDocs = (int) (1.25 * refreshEveryNumDocs); + } + + if (random().nextInt(500) == 17) { + currentSchemaGen.incrementAndGet(); + if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: advance schemaGen to " + currentSchemaGen); + if (random().nextBoolean()) { + mergingSchemaGen.incrementAndGet(); + if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: advance mergingSchemaGen to " + mergingSchemaGen); + } + } + + if (i > 0 && random().nextInt(10) == 7) { + // Random delete: + reindexer.w.deleteDocuments(new Term("id", ""+random().nextInt(i))); + } + + if (random().nextInt(commitCloseNumDocs) == 17) { + if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: commit @ " + (i+1) + " docs"); + reindexer.commit(); + //reindexer.printRefCounts(); + commitCloseNumDocs = (int) (1.25 * commitCloseNumDocs); + } + + // Sometimes close & reopen writer/manager, to confirm the parallel segments persist: + if (random().nextInt(commitCloseNumDocs) == 17) { + if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: close writer @ " + (i+1) + " docs"); + reindexer.close(); + reindexer = null; + commitCloseNumDocs = (int) (1.25 * commitCloseNumDocs); + } + } + + if (reindexer != null) { + reindexer.close(); + } + + // Verify main index never reflects schema changes beyond mergingSchemaGen: + try (Directory dir = newFSDirectory(root.resolve("index")); + IndexReader r = DirectoryReader.open(dir)) { + for (LeafReaderContext ctx : r.leaves()) { + LeafReader leaf = ctx.reader(); + NumericDocValues numbers = leaf.getNumericDocValues("number"); + if (numbers != null) { + int maxDoc = leaf.maxDoc(); + for(int i=0;i 0 && random().nextInt(10) == 7) { + // Replace a doc + id = "" + random().nextInt(maxID); + updateID = id; + } else { + id = "" + (maxID++); + updateID = null; + } + + doc.add(newStringField("id", id, Field.Store.NO)); + doc.add(newTextField("text", "number " + random().nextLong(), Field.Store.YES)); + if (updateID == null) { + reindexer.w.addDocument(doc); + } else { + 
reindexer.w.updateDocument(new Term("id", updateID), doc); + } + + if (random().nextInt(refreshEveryNumDocs) == 17) { + if (DEBUG) System.out.println("TEST: refresh @ " + (i+1) + " docs"); + reindexer.mgr.maybeRefresh(); + DirectoryReader r = reindexer.mgr.acquire(); + if (DEBUG) System.out.println("TEST: got reader=" + r); + try { + checkAllNumberDVs(r); + IndexSearcher s = newSearcher(r); + testNumericDVSort(s); + testNumericRangeQuery(s); + } finally { + reindexer.mgr.release(r); + } + refreshEveryNumDocs = (int) (1.25 * refreshEveryNumDocs); + } + + if (i > 0 && random().nextInt(10) == 7) { + // Random delete: + reindexer.w.deleteDocuments(new Term("id", ""+random().nextInt(i))); + } + + if (random().nextInt(1000) == 17) { + if (DEBUG) System.out.println("TEST: commit @ " + (i+1) + " docs"); + reindexer.commit(); + } + + // Sometimes close & reopen writer/manager, to confirm the parallel segments persist: + if (random().nextInt(1000) == 17) { + if (DEBUG) System.out.println("TEST: close writer @ " + (i+1) + " docs"); + reindexer.close(); + reindexer = null; + } + } + if (reindexer != null) { + reindexer.close(); + } + } + + private static void checkAllNumberDVs(IndexReader r) throws IOException { + checkAllNumberDVs(r, "number", true, 1); + } + + private static void checkAllNumberDVs(IndexReader r, String fieldName, boolean doThrow, int multiplier) throws IOException { + NumericDocValues numbers = MultiDocValues.getNumericValues(r, fieldName); + int maxDoc = r.maxDoc(); + boolean failed = false; + long t0 = System.currentTimeMillis(); + for(int i=0;i= last); + assertEquals(value, numbers.get(scoreDoc.doc)); + last = value; + } + } + + private static void testNumericRangeQuery(IndexSearcher s) throws IOException { + NumericDocValues numbers = MultiDocValues.getNumericValues(s.getIndexReader(), "number"); + for(int i=0;i<100;i++) { + // Confirm we can range search by the new indexed (numeric) field: + long min = random().nextLong(); + long max = random().nextLong(); + if (min > max) { + long x = min; + min = max; + max = x; + } + + TopDocs hits = s.search(NumericRangeQuery.newLongRange("number", min, max, true, true), 100); + for(ScoreDoc scoreDoc : hits.scoreDocs) { + long value = Long.parseLong(s.doc(scoreDoc.doc).get("text").split(" ")[1]); + assertTrue(value >= min); + assertTrue(value <= max); + assertEquals(value, numbers.get(scoreDoc.doc)); + } + } + } + + // TODO: test exceptions +} diff --git a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java index 0f64def08f4..c794f9e91c3 100644 --- a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java +++ b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java @@ -56,8 +56,10 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Logger; import org.apache.lucene.analysis.Analyzer; -import org.apache.lucene.document.Field; +import org.apache.lucene.analysis.MockAnalyzer; +import org.apache.lucene.document.Document; import org.apache.lucene.document.Field.Store; +import org.apache.lucene.document.Field; import org.apache.lucene.document.FieldType; import org.apache.lucene.document.StringField; import org.apache.lucene.document.TextField; @@ -76,8 +78,8 @@ import org.apache.lucene.index.FieldInfo; import org.apache.lucene.index.FieldInfos; import org.apache.lucene.index.Fields; import org.apache.lucene.index.IndexOptions; -import org.apache.lucene.index.IndexReader; import 
org.apache.lucene.index.IndexReader.ReaderClosedListener; +import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.LeafReader; @@ -104,8 +106,8 @@ import org.apache.lucene.index.SortedSetDocValues; import org.apache.lucene.index.StorableField; import org.apache.lucene.index.StoredDocument; import org.apache.lucene.index.Terms; -import org.apache.lucene.index.TermsEnum; import org.apache.lucene.index.TermsEnum.SeekStatus; +import org.apache.lucene.index.TermsEnum; import org.apache.lucene.index.TieredMergePolicy; import org.apache.lucene.search.AssertingIndexSearcher; import org.apache.lucene.search.DocIdSetIterator; @@ -116,12 +118,12 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.FSLockFactory; import org.apache.lucene.store.FlushInfo; -import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IOContext.Context; +import org.apache.lucene.store.IOContext; import org.apache.lucene.store.LockFactory; import org.apache.lucene.store.MergeInfo; -import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.store.MockDirectoryWrapper.Throttling; +import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.store.NRTCachingDirectory; import org.apache.lucene.store.RateLimitedDirectoryWrapper; import org.apache.lucene.util.automaton.AutomatonTestUtil; @@ -138,7 +140,6 @@ import org.junit.Test; import org.junit.rules.RuleChain; import org.junit.rules.TestRule; import org.junit.runner.RunWith; - import com.carrotsearch.randomizedtesting.JUnit4MethodProvider; import com.carrotsearch.randomizedtesting.LifecycleScope; import com.carrotsearch.randomizedtesting.MixWithSuiteName; @@ -149,16 +150,16 @@ import com.carrotsearch.randomizedtesting.annotations.Listeners; import com.carrotsearch.randomizedtesting.annotations.SeedDecorators; import com.carrotsearch.randomizedtesting.annotations.TestGroup; import com.carrotsearch.randomizedtesting.annotations.TestMethodProviders; -import com.carrotsearch.randomizedtesting.annotations.ThreadLeakAction; import com.carrotsearch.randomizedtesting.annotations.ThreadLeakAction.Action; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakAction; import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; -import com.carrotsearch.randomizedtesting.annotations.ThreadLeakGroup; import com.carrotsearch.randomizedtesting.annotations.ThreadLeakGroup.Group; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakGroup; import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering; -import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope.Scope; -import com.carrotsearch.randomizedtesting.annotations.ThreadLeakZombies; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; import com.carrotsearch.randomizedtesting.annotations.ThreadLeakZombies.Consequence; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakZombies; import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite; import com.carrotsearch.randomizedtesting.generators.RandomPicks; import com.carrotsearch.randomizedtesting.rules.NoClassHooksShadowingRule; @@ -862,6 +863,11 @@ public abstract class LuceneTestCase extends Assert { dumpIterator(label, iter, stream); } + /** create a new index writer config with random defaults */ 
+ public static IndexWriterConfig newIndexWriterConfig() { + return newIndexWriterConfig(new MockAnalyzer(random())); + } + /** create a new index writer config with random defaults */ public static IndexWriterConfig newIndexWriterConfig(Analyzer a) { return newIndexWriterConfig(random(), a);
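
Usage sketch (illustrative only, not part of the patch): the two new public hooks added above -- the ReaderManager(DirectoryReader) constructor, which takes ownership of an already-opened reader, and OneMerge.mergeFinished(), which IndexWriter calls after a merge completes and its readers are closed -- might be exercised as follows. The class name NewHooksDemo, the index path, and the StandardAnalyzer choice are assumptions made for this example, not taken from the patch.

    import java.io.IOException;
    import java.nio.file.Paths;
    import java.util.List;
    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.index.DirectoryReader;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.index.MergePolicy;
    import org.apache.lucene.index.ReaderManager;
    import org.apache.lucene.index.SegmentCommitInfo;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.FSDirectory;

    public class NewHooksDemo {

      // The new ReaderManager(DirectoryReader) constructor "steals" the incoming
      // reference: the manager decRefs the reader itself on refresh and close.
      static ReaderManager openManager(IndexWriter writer) throws IOException {
        return new ReaderManager(DirectoryReader.open(writer, true));
      }

      // The new OneMerge.mergeFinished() hook runs after the merge is done and all
      // merge readers have been closed; ReindexingOneMerge above uses it to decRef
      // the parallel readers it handed out from getMergeReaders().
      static class MyMerge extends MergePolicy.OneMerge {
        MyMerge(List<SegmentCommitInfo> segments) {
          super(segments);
        }
        @Override
        public void mergeFinished() throws IOException {
          // release anything acquired for this merge
        }
      }

      public static void main(String[] args) throws IOException {
        Directory dir = FSDirectory.open(Paths.get("/tmp/demo-index")); // hypothetical path
        IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()));
        ReaderManager mgr = openManager(writer);
        mgr.close();
        writer.close();
        dir.close();
      }
    }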