LUCENE-6524: init IndexWriter from already opened reader

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1687992 13f79535-47bb-0310-9956-ffa450edef68
Michael McCandless 2015-06-28 08:07:50 +00:00
parent e85c95c75c
commit 2d0ac9bb7f
15 changed files with 699 additions and 33 deletions


@ -105,6 +105,10 @@ New Features
were sorted, when SortingMergePolicy was used (Christine Poerschke
via Mike McCandless)
* LUCENE-6524: IndexWriter can now be initialized from an already open
near-real-time or non-NRT reader. (Boaz Leskes, Robert Muir, Mike
McCandless)
API Changes
* LUCENE-6508: Simplify Lock api, there is now just
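
For context, a minimal sketch of the usage the LUCENE-6524 entry above describes, mirroring the TestIndexWriterFromReader cases added in this commit. The class and method names here are made up for illustration, and the Directory and Analyzer are assumed to be supplied by the caller:

import java.io.IOException;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;

public class OpenWriterFromReaderExample {
  static void openFromNRTReader(Directory dir, Analyzer analyzer) throws IOException {
    IndexWriter w = new IndexWriter(dir, new IndexWriterConfig(analyzer));
    w.addDocument(new Document());
    w.commit();                                         // the reader's commit point must exist on disk

    DirectoryReader r = DirectoryReader.open(w, true);  // near-real-time reader
    w.close();

    IndexWriterConfig iwc = new IndexWriterConfig(analyzer);
    iwc.setIndexCommit(r.getIndexCommit());             // init the new writer from the reader's view
    IndexWriter w2 = new IndexWriter(dir, iwc);         // w2 sees exactly what r sees
    r.close();
    w2.close();
  }
}

Note that the index must already contain at least one commit; opening a writer from a reader over a never-committed index is rejected with an IllegalArgumentException (see testWithNoFirstCommit below).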


@ -228,7 +228,7 @@ public abstract class DirectoryReader extends BaseCompositeReader<LeafReader> {
SegmentInfos latest = SegmentInfos.readLatestCommit(dir);
final long currentGen = latest.getGeneration();
commits.add(new StandardDirectoryReader.ReaderCommit(latest, dir));
commits.add(new StandardDirectoryReader.ReaderCommit(null, latest, dir));
for(int i=0;i<files.length;i++) {
@ -253,8 +253,9 @@ public abstract class DirectoryReader extends BaseCompositeReader<LeafReader> {
// as if the file does not exist
}
if (sis != null)
commits.add(new StandardDirectoryReader.ReaderCommit(sis, dir));
if (sis != null) {
commits.add(new StandardDirectoryReader.ReaderCommit(null, sis, dir));
}
}
}


@ -41,6 +41,8 @@ import org.apache.lucene.store.Directory;
* @lucene.experimental
*/
// TODO: this is now a poor name, because this class also represents a
// point-in-time view from an NRT reader
public abstract class IndexCommit implements Comparable<IndexCommit> {
/**
@ -121,4 +123,9 @@ public abstract class IndexCommit implements Comparable<IndexCommit> {
long comgen = commit.getGeneration();
return Long.compare(gen, comgen);
}
/** Package-private API for IndexWriter to init from a commit-point pulled from an NRT or non-NRT reader. */
StandardDirectoryReader getReader() {
return null;
}
}


@ -128,7 +128,8 @@ final class IndexFileDeleter implements Closeable {
* @throws IOException if there is a low-level IO error
*/
public IndexFileDeleter(Directory directoryOrig, Directory directory, IndexDeletionPolicy policy, SegmentInfos segmentInfos,
InfoStream infoStream, IndexWriter writer, boolean initialIndexExists) throws IOException {
InfoStream infoStream, IndexWriter writer, boolean initialIndexExists,
boolean isReaderInit) throws IOException {
Objects.requireNonNull(writer);
this.infoStream = infoStream;
this.writer = writer;
@ -219,6 +220,11 @@ final class IndexFileDeleter implements Closeable {
incRef(sis, true);
}
if (isReaderInit) {
// Incoming SegmentInfos may have NRT changes not yet visible in the latest commit, so we have to protect its files from deletion too:
checkpoint(segmentInfos, false);
}
// We keep commits list in sorted order (oldest to newest):
CollectionUtil.timSort(commits);
@ -247,7 +253,11 @@ final class IndexFileDeleter implements Closeable {
// sometimes it may not be the most recent commit
checkpoint(segmentInfos, false);
startingCommitDeleted = currentCommitPoint == null ? false : currentCommitPoint.isDeleted();
if (currentCommitPoint == null) {
startingCommitDeleted = false;
} else {
startingCommitDeleted = currentCommitPoint.isDeleted();
}
deleteCommits();
}
@ -302,7 +312,7 @@ final class IndexFileDeleter implements Closeable {
}
// Generation is advanced before write:
infos.setGeneration(Math.max(infos.getGeneration(), maxSegmentGen));
infos.setNextWriteGeneration(Math.max(infos.getGeneration(), maxSegmentGen));
if (infos.counter < 1+maxSegmentName) {
if (infoStream.isEnabled("IFD")) {
infoStream.message("IFD", "init: inflate infos.counter to " + (1+maxSegmentName) + " vs current=" + infos.counter);


@ -755,7 +755,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
conf.setIndexWriter(this); // prevent reuse by other instances
config = conf;
infoStream = config.getInfoStream();
// obtain the write.lock. If the user configured a timeout,
// we wrap with a sleeper and this might take some time.
writeLock = d.obtainLock(WRITE_LOCK_NAME);
@ -792,8 +792,30 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// IndexFormatTooOldException.
boolean initialIndexExists = true;
boolean fromReader = false;
// Set up our initial SegmentInfos:
IndexCommit commit = config.getIndexCommit();
// Set up our initial SegmentInfos:
StandardDirectoryReader reader;
if (commit == null) {
reader = null;
} else {
reader = commit.getReader();
}
if (create) {
if (config.getIndexCommit() != null) {
// We cannot both open from a commit point and create:
if (mode == OpenMode.CREATE) {
throw new IllegalArgumentException("cannot use IndexWriterConfig.setIndexCommit() with OpenMode.CREATE");
} else {
throw new IllegalArgumentException("cannot use IndexWriterConfig.setIndexCommit() when index has no commit");
}
}
// Try to read first. This is to allow create
// against an index that's currently open for
// searching. In this case we write the next
@ -810,10 +832,58 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
segmentInfos = sis;
rollbackSegments = segmentInfos.createBackupSegmentInfos();
// Record that we have a change (zero out all
// segments) pending:
changed();
} else if (reader != null) {
// Init from an existing already opened NRT or non-NRT reader:
if (reader.directory() != commit.getDirectory()) {
throw new IllegalArgumentException("IndexCommit's reader must have the same directory as the IndexCommit");
}
if (reader.directory() != directoryOrig) {
throw new IllegalArgumentException("IndexCommit's reader must have the same directory passed to IndexWriter");
}
if (reader.segmentInfos.getLastGeneration() == 0) {
// TODO: maybe we could allow this? It's tricky...
throw new IllegalArgumentException("index must already have an initial commit to open from reader");
}
// Must clone because we don't want the incoming NRT reader to "see" any changes this writer now makes:
segmentInfos = reader.segmentInfos.clone();
SegmentInfos lastCommit;
try {
lastCommit = SegmentInfos.readCommit(directoryOrig, segmentInfos.getSegmentsFileName());
} catch (IOException ioe) {
throw new IllegalArgumentException("the provided reader is stale: its prior commit file \"" + segmentInfos.getSegmentsFileName() + "\" is missing from index");
}
if (reader.writer != null) {
// The old writer better be closed (we have the write lock now!):
assert reader.writer.closed;
// In case the old writer wrote further segments (which we are now dropping),
// update SIS metadata so we remain write-once:
segmentInfos.updateGenerationVersionAndCounter(reader.writer.segmentInfos);
lastCommit.updateGenerationVersionAndCounter(reader.writer.segmentInfos);
}
rollbackSegments = lastCommit.createBackupSegmentInfos();
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "init from reader " + reader);
messageState();
}
} else {
// Init from either the latest commit point, or an explicit prior commit point:
String[] files = directory.listAll();
String lastSegmentsFile = SegmentInfos.getLastCommitSegmentsFileName(files);
if (lastSegmentsFile == null) {
@ -824,29 +894,34 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// retrying it does is not necessary here (we hold the write lock):
segmentInfos = SegmentInfos.readCommit(directoryOrig, lastSegmentsFile);
IndexCommit commit = config.getIndexCommit();
if (commit != null) {
// Swap out all segments, but, keep metadata in
// SegmentInfos, like version & generation, to
// preserve write-once. This is important if
// readers are open against the future commit
// points.
if (commit.getDirectory() != directoryOrig)
if (commit.getDirectory() != directoryOrig) {
throw new IllegalArgumentException("IndexCommit's directory doesn't match my directory, expected=" + directoryOrig + ", got=" + commit.getDirectory());
}
SegmentInfos oldInfos = SegmentInfos.readCommit(directoryOrig, commit.getSegmentsFileName());
segmentInfos.replace(oldInfos);
changed();
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "init: loaded commit \"" + commit.getSegmentsFileName() + "\"");
}
}
rollbackSegments = segmentInfos.createBackupSegmentInfos();
}
rollbackSegments = segmentInfos.createBackupSegmentInfos();
pendingNumDocs.set(segmentInfos.totalMaxDoc());
// start with previous field numbers, but new FieldInfos
// NOTE: this is correct even for an NRT reader because we'll pull FieldInfos even for the un-committed segments:
globalFieldNumberMap = getFieldNumberMap();
config.getFlushPolicy().init(config);
docWriter = new DocumentsWriter(this, config, directoryOrig, directory);
eventQueue = docWriter.eventQueue();
@ -857,7 +932,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
deleter = new IndexFileDeleter(directoryOrig, directory,
config.getIndexDeletionPolicy(),
segmentInfos, infoStream, this,
initialIndexExists);
initialIndexExists, reader != null);
// We incRef all files when we return an NRT reader from IW, so all files must exist even in the NRT case:
assert create || filesExist(segmentInfos);
}
if (deleter.startingCommitDeleted) {
@ -868,6 +946,25 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
changed();
}
if (reader != null) {
// Pre-enroll all segment readers into the reader pool; this is necessary so
// any in-memory NRT live docs are correctly carried over, and so NRT readers
// pulled from this IW share the same segment reader:
List<LeafReaderContext> leaves = reader.leaves();
assert segmentInfos.size() == leaves.size();
for (int i=0;i<leaves.size();i++) {
LeafReaderContext leaf = leaves.get(i);
SegmentReader segReader = (SegmentReader) leaf.reader();
SegmentReader newReader = new SegmentReader(segmentInfos.info(i), segReader, segReader.getLiveDocs(), segReader.numDocs());
readerPool.readerMap.put(newReader.getSegmentInfo(), new ReadersAndUpdates(this, newReader));
}
// We always assume we are carrying over incoming changes when opening from reader:
segmentInfos.changed();
changed();
}
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "init: create=" + create);
messageState();
@ -885,7 +982,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
}
}
// reads latest field infos for the commit
// this is used on IW init and addIndexes(Dir) to create/update the global field map.
// TODO: fix tests abusing this method!
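
To make the staleness check above concrete (the SegmentInfos.readCommit of the reader's prior segments file), here is a hedged fragment of the failure mode that TestIndexWriterFromReader#testAfterCommitThenIndex exercises. It assumes the same imports and an existing Directory dir / Analyzer analyzer as the sketch after the CHANGES entry, and the exception is only hit once the superseded segments file has actually been deleted:

IndexWriter w = new IndexWriter(dir, new IndexWriterConfig(analyzer));
w.addDocument(new Document());
w.commit();                                         // commit #1
w.addDocument(new Document());                      // NRT-only change, not committed yet
DirectoryReader r = DirectoryReader.open(w, true);  // r's IndexCommit still points at commit #1
w.close();                                          // close() commits again; the default policy drops commit #1

IndexWriterConfig iwc = new IndexWriterConfig(analyzer);
iwc.setIndexCommit(r.getIndexCommit());
try {
  new IndexWriter(dir, iwc);                        // commit #1's segments file is gone
} catch (IllegalArgumentException expected) {
  // "the provided reader is stale: its prior commit file ... is missing from index"
}
r.close();

Keeping older commits alive (for example with a keep-all-commits IndexDeletionPolicy, as in testAfterCommitThenIndexKeepCommits) avoids this, because the reader's commit point then remains on disk.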


@ -185,7 +185,9 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
/**
* Expert: allows to open a certain commit point. The default is null which
* opens the latest commit point.
* opens the latest commit point. This can also be used to open {@link IndexWriter}
* from a near-real-time reader, if you pass the reader's
* {@link DirectoryReader#getIndexCommit}.
*
* <p>Only takes effect when IndexWriter is first created. */
public IndexWriterConfig setIndexCommit(IndexCommit commit) {
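
One pattern the javadoc above enables, mirrored by testAfterRollback in the new TestIndexWriterFromReader, is rolling back to the point-in-time view of the last NRT reader rather than to the last commit. A sketch under the same assumptions as the earlier fragments (existing Directory dir, Analyzer analyzer, same imports):

IndexWriter w = new IndexWriter(dir, new IndexWriterConfig(analyzer));
w.addDocument(new Document());
w.commit();                                         // at least one durable commit is required

w.addDocument(new Document());                      // visible only to NRT readers
DirectoryReader r = DirectoryReader.open(w, true);  // NRT view: 2 docs
w.rollback();                                       // abandon the writer without committing

IndexWriterConfig iwc = new IndexWriterConfig(analyzer);
iwc.setIndexCommit(r.getIndexCommit());             // resume from r's point-in-time view
IndexWriter w2 = new IndexWriter(dir, iwc);         // w2.numDocs() == 2, not 1
r.close();
w2.close();                                         // close() commits, making the 2-doc state durable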


@ -82,11 +82,24 @@ class ReadersAndUpdates {
private final Map<String,DocValuesFieldUpdates> mergingDVUpdates = new HashMap<>();
public ReadersAndUpdates(IndexWriter writer, SegmentCommitInfo info) {
this.info = info;
this.writer = writer;
this.info = info;
liveDocsShared = true;
}
/** Init from a previously opened SegmentReader.
*
* <p>NOTE: steals incoming ref from reader. */
public ReadersAndUpdates(IndexWriter writer, SegmentReader reader) {
this.writer = writer;
this.reader = reader;
info = reader.getSegmentInfo();
liveDocs = reader.getLiveDocs();
liveDocsShared = true;
pendingDeleteCount = reader.numDeletedDocs() - info.getDelCount();
assert pendingDeleteCount >= 0: "got " + pendingDeleteCount + " reader.numDeletedDocs()=" + reader.numDeletedDocs() + " info.getDelCount()=" + info.getDelCount() + " maxDoc=" + reader.maxDoc() + " numDocs=" + reader.numDocs();
}
public void incRef() {
final int rc = refCount.incrementAndGet();
assert rc > 1;


@ -704,9 +704,16 @@ public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo
generation = other.generation;
}
void setGeneration(long generation) {
// Carry over generation numbers, and version/counter, from another SegmentInfos
void updateGenerationVersionAndCounter(SegmentInfos other) {
updateGeneration(other);
this.version = other.version;
this.counter = other.counter;
}
void setNextWriteGeneration(long generation) {
assert generation >= this.generation;
this.generation = generation;
this.lastGeneration = generation;
}
final void rollbackCommit(Directory dir) {


@ -107,6 +107,12 @@ public final class SegmentReader extends CodecReader {
* liveDocs. Used by IndexWriter to provide a new NRT
* reader */
SegmentReader(SegmentCommitInfo si, SegmentReader sr, Bits liveDocs, int numDocs) throws IOException {
if (numDocs > si.info.maxDoc()) {
throw new IllegalArgumentException("numDocs=" + numDocs + " but maxDoc=" + si.info.maxDoc());
}
if (liveDocs != null && liveDocs.length() != si.info.maxDoc()) {
throw new IllegalArgumentException("maxDoc=" + si.info.maxDoc() + " but liveDocs.size()=" + liveDocs.length());
}
this.si = si;
this.liveDocs = liveDocs;
this.numDocs = numDocs;


@ -32,8 +32,8 @@ import org.apache.lucene.util.IOUtils;
final class StandardDirectoryReader extends DirectoryReader {
private final IndexWriter writer;
private final SegmentInfos segmentInfos;
final IndexWriter writer;
final SegmentInfos segmentInfos;
private final boolean applyAllDeletes;
/** called only from static open() methods */
@ -383,7 +383,7 @@ final class StandardDirectoryReader extends DirectoryReader {
@Override
public IndexCommit getIndexCommit() throws IOException {
ensureOpen();
return new ReaderCommit(segmentInfos, directory);
return new ReaderCommit(this, segmentInfos, directory);
}
static final class ReaderCommit extends IndexCommit {
@ -393,14 +393,18 @@ final class StandardDirectoryReader extends DirectoryReader {
long generation;
final Map<String,String> userData;
private final int segmentCount;
private final StandardDirectoryReader reader;
ReaderCommit(SegmentInfos infos, Directory dir) throws IOException {
ReaderCommit(StandardDirectoryReader reader, SegmentInfos infos, Directory dir) throws IOException {
segmentsFileName = infos.getSegmentsFileName();
this.dir = dir;
userData = infos.getUserData();
files = Collections.unmodifiableCollection(infos.files(true));
generation = infos.getGeneration();
segmentCount = infos.size();
// NOTE: we intentionally do not incRef this! Else we'd need to make IndexCommit Closeable...
this.reader = reader;
}
@Override
@ -447,5 +451,10 @@ final class StandardDirectoryReader extends DirectoryReader {
public void delete() {
throw new UnsupportedOperationException("This IndexCommit does not support deletions");
}
@Override
StandardDirectoryReader getReader() {
return reader;
}
}
}


@ -2424,7 +2424,7 @@ public class TestIndexWriter extends LuceneTestCase {
Directory dir = newDirectory();
IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random()));
final SetOnce<IndexWriter> iwRef = new SetOnce<>();
IndexWriter evilWriter = RandomIndexWriter.mockIndexWriter(dir, iwc, new RandomIndexWriter.TestPoint() {
IndexWriter evilWriter = RandomIndexWriter.mockIndexWriter(random(), dir, iwc, new RandomIndexWriter.TestPoint() {
@Override
public void apply(String message) {
if ("startCommitMerge".equals(message)) {
@ -2594,7 +2594,7 @@ public class TestIndexWriter extends LuceneTestCase {
Directory dir = newDirectory();
IndexWriterConfig iwc = new IndexWriterConfig(null);
// use an infostream that "takes a long time" to commit
final IndexWriter iw = RandomIndexWriter.mockIndexWriter(dir, iwc, new RandomIndexWriter.TestPoint() {
final IndexWriter iw = RandomIndexWriter.mockIndexWriter(random(), dir, iwc, new RandomIndexWriter.TestPoint() {
@Override
public void apply(String message) {
if (message.equals("finishStartCommit")) {


@ -246,7 +246,7 @@ public class TestIndexWriterExceptions extends LuceneTestCase {
MockAnalyzer analyzer = new MockAnalyzer(random());
analyzer.setEnableChecks(false); // disable workflow checking as we forcefully close() in exceptional cases.
IndexWriter writer = RandomIndexWriter.mockIndexWriter(dir, newIndexWriterConfig(analyzer)
IndexWriter writer = RandomIndexWriter.mockIndexWriter(random(), dir, newIndexWriterConfig(analyzer)
.setRAMBufferSizeMB(0.1)
.setMergeScheduler(new ConcurrentMergeScheduler()), new TestPoint1());
((ConcurrentMergeScheduler) writer.getConfig().getMergeScheduler()).setSuppressExceptions();
@ -290,7 +290,7 @@ public class TestIndexWriterExceptions extends LuceneTestCase {
Directory dir = newDirectory();
MockAnalyzer analyzer = new MockAnalyzer(random());
analyzer.setEnableChecks(false); // disable workflow checking as we forcefully close() in exceptional cases.
IndexWriter writer = RandomIndexWriter.mockIndexWriter(dir, newIndexWriterConfig(analyzer)
IndexWriter writer = RandomIndexWriter.mockIndexWriter(random(), dir, newIndexWriterConfig(analyzer)
.setRAMBufferSizeMB(0.2)
.setMergeScheduler(new ConcurrentMergeScheduler()), new TestPoint1());
((ConcurrentMergeScheduler) writer.getConfig().getMergeScheduler()).setSuppressExceptions();
@ -372,7 +372,7 @@ public class TestIndexWriterExceptions extends LuceneTestCase {
public void testExceptionDocumentsWriterInit() throws IOException {
Directory dir = newDirectory();
TestPoint2 testPoint = new TestPoint2();
IndexWriter w = RandomIndexWriter.mockIndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())), testPoint);
IndexWriter w = RandomIndexWriter.mockIndexWriter(random(), dir, newIndexWriterConfig(new MockAnalyzer(random())), testPoint);
Document doc = new Document();
doc.add(newTextField("field", "a field", Field.Store.YES));
w.addDocument(doc);
@ -406,7 +406,7 @@ public class TestIndexWriterExceptions extends LuceneTestCase {
}
};
IndexWriter w = RandomIndexWriter.mockIndexWriter(dir,
IndexWriter w = RandomIndexWriter.mockIndexWriter(random(), dir,
newIndexWriterConfig(analyzer)
.setMaxBufferedDocs(2),
new TestPoint1());
@ -452,7 +452,7 @@ public class TestIndexWriterExceptions extends LuceneTestCase {
conf.setMergeScheduler(cms);
((LogMergePolicy) conf.getMergePolicy()).setMergeFactor(2);
TestPoint3 testPoint = new TestPoint3();
IndexWriter w = RandomIndexWriter.mockIndexWriter(dir, conf, testPoint);
IndexWriter w = RandomIndexWriter.mockIndexWriter(random(), dir, conf, testPoint);
testPoint.doFail = true;
Document doc = new Document();
doc.add(newTextField("field", "a field", Field.Store.YES));
@ -1102,7 +1102,7 @@ public class TestIndexWriterExceptions extends LuceneTestCase {
public void testRollbackExceptionHang() throws Throwable {
Directory dir = newDirectory();
TestPoint4 testPoint = new TestPoint4();
IndexWriter w = RandomIndexWriter.mockIndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())), testPoint);
IndexWriter w = RandomIndexWriter.mockIndexWriter(random(), dir, newIndexWriterConfig(new MockAnalyzer(random())), testPoint);
addDoc(w);


@ -0,0 +1,488 @@
package org.apache.lucene.index;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LuceneTestCase;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class TestIndexWriterFromReader extends LuceneTestCase {
// Pull NRT reader immediately after writer has committed
public void testRightAfterCommit() throws Exception {
Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
w.addDocument(new Document());
w.commit();
DirectoryReader r = DirectoryReader.open(w, true);
assertEquals(1, r.maxDoc());
w.close();
IndexWriterConfig iwc = newIndexWriterConfig();
iwc.setIndexCommit(r.getIndexCommit());
IndexWriter w2 = new IndexWriter(dir, iwc);
r.close();
assertEquals(1, w2.maxDoc());
w2.addDocument(new Document());
assertEquals(2, w2.maxDoc());
w2.close();
IndexReader r2 = DirectoryReader.open(dir);
assertEquals(2, r2.maxDoc());
r2.close();
dir.close();
}
// Open from non-NRT reader
public void testFromNonNRTReader() throws Exception {
Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
w.addDocument(new Document());
w.close();
DirectoryReader r = DirectoryReader.open(dir);
assertEquals(1, r.maxDoc());
IndexWriterConfig iwc = newIndexWriterConfig();
iwc.setIndexCommit(r.getIndexCommit());
IndexWriter w2 = new IndexWriter(dir, iwc);
assertEquals(1, r.maxDoc());
r.close();
assertEquals(1, w2.maxDoc());
w2.addDocument(new Document());
assertEquals(2, w2.maxDoc());
w2.close();
IndexReader r2 = DirectoryReader.open(dir);
assertEquals(2, r2.maxDoc());
r2.close();
dir.close();
}
// Pull NRT reader from a writer on a new index with no commit:
public void testWithNoFirstCommit() throws Exception {
Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
w.addDocument(new Document());
DirectoryReader r = DirectoryReader.open(w, true);
w.rollback();
IndexWriterConfig iwc = newIndexWriterConfig();
iwc.setIndexCommit(r.getIndexCommit());
try {
new IndexWriter(dir, iwc);
fail("did not hit expected exception");
} catch (IllegalArgumentException iae) {
// expected
assertEquals("cannot use IndexWriterConfig.setIndexCommit() when index has no commit", iae.getMessage());
}
r.close();
dir.close();
}
// Pull NRT reader after writer has committed and then indexed another doc:
public void testAfterCommitThenIndex() throws Exception {
Directory dir = newDirectory();
if (dir instanceof MockDirectoryWrapper) {
// We only hit exc if stale segments file was deleted:
((MockDirectoryWrapper) dir).setEnableVirusScanner(false);
}
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
w.addDocument(new Document());
w.commit();
w.addDocument(new Document());
DirectoryReader r = DirectoryReader.open(w, true);
assertEquals(2, r.maxDoc());
w.close();
IndexWriterConfig iwc = newIndexWriterConfig();
iwc.setIndexCommit(r.getIndexCommit());
try {
new IndexWriter(dir, iwc);
fail("did not hit expected exception");
} catch (IllegalArgumentException iae) {
// expected
assertTrue(iae.getMessage().contains("the provided reader is stale: its prior commit file"));
}
r.close();
dir.close();
}
// NRT rollback: pull NRT reader after writer has committed and then before indexing another doc
public void testNRTRollback() throws Exception {
Directory dir = newDirectory();
if (dir instanceof MockDirectoryWrapper) {
// We only hit exc if stale segments file was deleted:
((MockDirectoryWrapper) dir).setEnableVirusScanner(false);
}
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
w.addDocument(new Document());
w.commit();
DirectoryReader r = DirectoryReader.open(w, true);
assertEquals(1, r.maxDoc());
// Add another doc
w.addDocument(new Document());
assertEquals(2, w.maxDoc());
w.close();
IndexWriterConfig iwc = newIndexWriterConfig();
iwc.setIndexCommit(r.getIndexCommit());
try {
new IndexWriter(dir, iwc);
fail("did not hit expected exception");
} catch (IllegalArgumentException iae) {
// expected
assertTrue(iae.getMessage().contains("the provided reader is stale: its prior commit file"));
}
r.close();
dir.close();
}
public void testRandom() throws Exception {
Directory dir = newDirectory();
if (dir instanceof MockDirectoryWrapper) {
// Since we rollback writer we can easily try to write to the same filenames:
((MockDirectoryWrapper) dir).setPreventDoubleWrite(false);
}
int numOps = atLeast(100);
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
// We must have a starting commit for this test because whenever we rollback with
// an NRT reader, the commit before that NRT reader must exist
w.commit();
DirectoryReader r = DirectoryReader.open(w, true);
int nrtReaderNumDocs = 0;
int writerNumDocs = 0;
boolean commitAfterNRT = false;
Set<Integer> liveIDs = new HashSet<>();
Set<Integer> nrtLiveIDs = new HashSet<>();
for(int op=0;op<numOps;op++) {
if (VERBOSE) {
System.out.println("\nITER op=" + op + " nrtReaderNumDocs=" + nrtReaderNumDocs + " writerNumDocs=" + writerNumDocs + " r=" + r + " r.numDocs()=" + r.numDocs());
}
assertEquals(nrtReaderNumDocs, r.numDocs());
int x = random().nextInt(5);
switch(x) {
case 0:
if (VERBOSE) {
System.out.println(" add doc id=" + op);
}
// add doc
Document doc = new Document();
doc.add(newStringField("id", ""+op, Field.Store.NO));
w.addDocument(doc);
liveIDs.add(op);
writerNumDocs++;
break;
case 1:
if (VERBOSE) {
System.out.println(" delete doc");
}
// delete docs
if (liveIDs.size() > 0) {
int id = random().nextInt(op);
if (VERBOSE) {
System.out.println(" id=" + id);
}
w.deleteDocuments(new Term("id", ""+id));
if (liveIDs.remove(id)) {
if (VERBOSE) {
System.out.println(" really deleted");
}
writerNumDocs--;
}
} else {
if (VERBOSE) {
System.out.println(" nothing to delete yet");
}
}
break;
case 2:
// reopen NRT reader
if (VERBOSE) {
System.out.println(" reopen NRT reader");
}
DirectoryReader r2 = DirectoryReader.openIfChanged(r);
if (r2 != null) {
r.close();
r = r2;
if (VERBOSE) {
System.out.println(" got new reader oldNumDocs=" + nrtReaderNumDocs + " newNumDocs=" + writerNumDocs);
}
nrtReaderNumDocs = writerNumDocs;
nrtLiveIDs = new HashSet<>(liveIDs);
} else {
if (VERBOSE) {
System.out.println(" reader is unchanged");
}
assertEquals(nrtReaderNumDocs, r.numDocs());
}
commitAfterNRT = false;
break;
case 3:
if (commitAfterNRT == false) {
// rollback writer to last nrt reader
if (random().nextBoolean()) {
if (VERBOSE) {
System.out.println(" close writer and open new writer from non-NRT reader numDocs=" + w.numDocs());
}
w.close();
r.close();
r = DirectoryReader.open(dir);
assertEquals(writerNumDocs, r.numDocs());
nrtReaderNumDocs = writerNumDocs;
nrtLiveIDs = new HashSet<>(liveIDs);
} else {
if (VERBOSE) {
System.out.println(" rollback writer and open new writer from NRT reader numDocs=" + w.numDocs());
}
w.rollback();
}
IndexWriterConfig iwc = newIndexWriterConfig();
iwc.setIndexCommit(r.getIndexCommit());
w = new IndexWriter(dir, iwc);
writerNumDocs = nrtReaderNumDocs;
liveIDs = new HashSet<>(nrtLiveIDs);
r.close();
r = DirectoryReader.open(w, true);
}
break;
case 4:
if (VERBOSE) {
System.out.println(" commit");
}
w.commit();
commitAfterNRT = true;
break;
}
}
IOUtils.close(w, r, dir);
}
public void testConsistentFieldNumbers() throws Exception {
Directory dir = newDirectory();
if (dir instanceof MockDirectoryWrapper) {
// Since we use IW.rollback and then open another, the 2nd IW can easily write to the same segment name:
((MockDirectoryWrapper) dir).setPreventDoubleWrite(false);
}
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
// Empty first commit:
w.commit();
Document doc = new Document();
doc.add(newStringField("f0", "foo", Field.Store.NO));
w.addDocument(doc);
DirectoryReader r = DirectoryReader.open(w, true);
assertEquals(1, r.maxDoc());
doc = new Document();
doc.add(newStringField("f1", "foo", Field.Store.NO));
w.addDocument(doc);
DirectoryReader r2 = DirectoryReader.openIfChanged(r);
assertNotNull(r2);
r.close();
assertEquals(2, r2.maxDoc());
w.rollback();
IndexWriterConfig iwc = newIndexWriterConfig();
iwc.setIndexCommit(r2.getIndexCommit());
IndexWriter w2 = new IndexWriter(dir, iwc);
r2.close();
doc = new Document();
doc.add(newStringField("f1", "foo", Field.Store.NO));
doc.add(newStringField("f0", "foo", Field.Store.NO));
w2.addDocument(doc);
w2.close();
dir.close();
}
public void testInvalidOpenMode() throws Exception {
Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
w.addDocument(new Document());
w.commit();
DirectoryReader r = DirectoryReader.open(w, true);
assertEquals(1, r.maxDoc());
w.close();
IndexWriterConfig iwc = newIndexWriterConfig();
iwc.setOpenMode(IndexWriterConfig.OpenMode.CREATE);
iwc.setIndexCommit(r.getIndexCommit());
try {
new IndexWriter(dir, iwc);
fail("did not hit exception");
} catch (IllegalArgumentException iae) {
// expected
assertEquals("cannot use IndexWriterConfig.setIndexCommit() with OpenMode.CREATE", iae.getMessage());
}
IOUtils.close(r, dir);
}
public void testOnClosedReader() throws Exception {
Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
w.addDocument(new Document());
w.commit();
DirectoryReader r = DirectoryReader.open(w, true);
assertEquals(1, r.maxDoc());
IndexCommit commit = r.getIndexCommit();
r.close();
w.close();
IndexWriterConfig iwc = newIndexWriterConfig();
iwc.setIndexCommit(commit);
try {
new IndexWriter(dir, iwc);
fail("did not hit exception");
} catch (AlreadyClosedException ace) {
// expected
}
IOUtils.close(r, dir);
}
public void testStaleNRTReader() throws Exception {
Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
w.addDocument(new Document());
w.commit();
DirectoryReader r = DirectoryReader.open(w, true);
assertEquals(1, r.maxDoc());
w.addDocument(new Document());
DirectoryReader r2 = DirectoryReader.openIfChanged(r);
assertNotNull(r2);
r2.close();
w.rollback();
IndexWriterConfig iwc = newIndexWriterConfig();
iwc.setIndexCommit(r.getIndexCommit());
w = new IndexWriter(dir, iwc);
assertEquals(1, w.numDocs());
r.close();
DirectoryReader r3 = DirectoryReader.open(w, true);
assertEquals(1, r3.numDocs());
w.addDocument(new Document());
DirectoryReader r4 = DirectoryReader.openIfChanged(r3);
r3.close();
assertEquals(2, r4.numDocs());
r4.close();
w.close();
IOUtils.close(r, dir);
}
public void testAfterRollback() throws Exception {
Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
w.addDocument(new Document());
w.commit();
w.addDocument(new Document());
DirectoryReader r = DirectoryReader.open(w, true);
assertEquals(2, r.maxDoc());
w.rollback();
IndexWriterConfig iwc = newIndexWriterConfig();
iwc.setIndexCommit(r.getIndexCommit());
w = new IndexWriter(dir, iwc);
assertEquals(2, w.numDocs());
r.close();
w.close();
DirectoryReader r2 = DirectoryReader.open(dir);
assertEquals(2, r2.numDocs());
IOUtils.close(r2, dir);
}
// Pull NRT reader after writer has committed and then indexed another doc:
public void testAfterCommitThenIndexKeepCommits() throws Exception {
Directory dir = newDirectory();
IndexWriterConfig iwc = newIndexWriterConfig();
// Keep all commits:
iwc.setIndexDeletionPolicy(new IndexDeletionPolicy() {
@Override
public void onInit(List<? extends IndexCommit> commits) {
}
@Override
public void onCommit(List<? extends IndexCommit> commits) {
}
});
IndexWriter w = new IndexWriter(dir, iwc);
w.addDocument(new Document());
w.commit();
w.addDocument(new Document());
DirectoryReader r = DirectoryReader.open(w, true);
assertEquals(2, r.maxDoc());
w.addDocument(new Document());
DirectoryReader r2 = DirectoryReader.open(w, true);
assertEquals(3, r2.maxDoc());
IOUtils.close(r2, w);
// r is not stale because, even though we've committed the original writer since it was open, we are keeping all commit points:
iwc = newIndexWriterConfig();
iwc.setIndexCommit(r.getIndexCommit());
IndexWriter w2 = new IndexWriter(dir, iwc);
assertEquals(2, w2.maxDoc());
IOUtils.close(r, w2, dir);
}
}


@ -114,7 +114,7 @@ public class TestIndexWriterOutOfFileDescriptors extends LuceneTestCase {
if (VERBOSE) {
System.out.println("TEST: iter=" + iter + ": success");
}
} catch (IOException ioe) {
} catch (AssertionError | IOException ioe) {
if (VERBOSE) {
System.out.println("TEST: iter=" + iter + ": exception");
ioe.printStackTrace();


@ -56,7 +56,7 @@ public class RandomIndexWriter implements Closeable {
public static IndexWriter mockIndexWriter(Directory dir, IndexWriterConfig conf, Random r) throws IOException {
// Randomly calls Thread.yield so we mixup thread scheduling
final Random random = new Random(r.nextLong());
return mockIndexWriter(dir, conf, new TestPoint() {
return mockIndexWriter(r, dir, conf, new TestPoint() {
@Override
public void apply(String message) {
if (random.nextInt(4) == 2)
@ -66,9 +66,31 @@ public class RandomIndexWriter implements Closeable {
}
/** Returns an indexwriter that enables the specified test point */
public static IndexWriter mockIndexWriter(Directory dir, IndexWriterConfig conf, TestPoint testPoint) throws IOException {
public static IndexWriter mockIndexWriter(Random r, Directory dir, IndexWriterConfig conf, TestPoint testPoint) throws IOException {
conf.setInfoStream(new TestPointInfoStream(conf.getInfoStream(), testPoint));
IndexWriter iw = new IndexWriter(dir, conf);
DirectoryReader reader = null;
if (r.nextBoolean() && DirectoryReader.indexExists(dir) && conf.getOpenMode() != IndexWriterConfig.OpenMode.CREATE) {
if (LuceneTestCase.VERBOSE) {
System.out.println("RIW: open writer from reader");
}
reader = DirectoryReader.open(dir);
conf.setIndexCommit(reader.getIndexCommit());
}
IndexWriter iw;
boolean success = false;
try {
iw = new IndexWriter(dir, conf);
success = true;
} finally {
if (reader != null) {
if (success) {
IOUtils.close(reader);
} else {
IOUtils.closeWhileHandlingException(reader);
}
}
}
iw.enableTestPoints = true;
return iw;
}
@ -418,7 +440,7 @@ public class RandomIndexWriter implements Closeable {
/**
* Simple interface that is executed for each <tt>TP</tt> {@link InfoStream} component
* message. See also {@link RandomIndexWriter#mockIndexWriter(Directory, IndexWriterConfig, TestPoint)}
* message. See also {@link RandomIndexWriter#mockIndexWriter(Random, Directory, IndexWriterConfig, TestPoint)}
*/
public static interface TestPoint {
public abstract void apply(String message);