LUCENE-5871: one one thread can IW.close at once

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1617961 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2014-08-14 14:51:05 +00:00
parent 23fdcfd45f
commit ba04a792ce
4 changed files with 159 additions and 27 deletions

View File

@ -889,29 +889,36 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
/**
* Implementation for {@link #close()} when {@link IndexWriterConfig#commitOnClose} is true.
* Gracefully closes (commits, waits for merges), but calls rollback
* if there's an exc so the IndexWriter is always closed. This is called
* from {@link #close} when {@link IndexWriterConfig#commitOnClose} is
* {@code true}.
*/
private void shutdown() throws IOException {
if (pendingCommit != null) {
throw new IllegalStateException("cannot close: prepareCommit was already called with no corresponding call to commit");
}
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "now flush at close");
}
boolean success = false;
try {
flush(true, true);
finishMerges(true);
commit();
rollback(); // ie close, since we just committed
success = true;
} finally {
if (success == false) {
// Be certain to close the index on any exception
try {
rollback();
} catch (Throwable t) {
// Suppress so we keep throwing original exception
// Ensure that only one thread actually gets to do the
// closing
if (shouldClose()) {
boolean success = false;
try {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "now flush at close");
}
flush(true, true);
finishMerges(true);
commitInternal(config.getMergePolicy());
rollbackInternal(); // ie close, since we just committed
success = true;
} finally {
if (success == false) {
// Be certain to close the index on any exception
try {
rollbackInternal();
} catch (Throwable t) {
// Suppress so we keep throwing original exception
}
}
}
}
@ -932,15 +939,19 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* will be closed, but changes may have been lost.</li>
* </ul>
*
* <p>
* Note that this may be a costly
* operation, so, try to re-use a single writer instead of
* closing and opening a new one. See {@link #commit()} for
* caveats about write caching done by some IO devices.
*
* <p><b>NOTE</b>: You must ensure no other threads are still making
* changes at the same time that this method is invoked.</p>
*/
@Override
public void close() throws IOException {
if (config.getCommitOnClose()) {
if (closed == false) {
shutdown();
}
shutdown();
} else {
rollback();
}

View File

@ -2610,7 +2610,7 @@ public class TestIndexWriter extends LuceneTestCase {
Analyzer analyzer = new MockAnalyzer(random());
Directory directory = newDirectory();
// we don't use RandomIndexWriter because it might add more docvalues than we expect !!!!1
// we don't use RandomIndexWriter because it might add more docvalues than we expect !!!!
IndexWriterConfig iwc = newIndexWriterConfig(analyzer);
iwc.setMergePolicy(newLogMergePolicy());
IndexWriter iwriter = new IndexWriter(directory, iwc);
@ -2763,4 +2763,61 @@ public class TestIndexWriter extends LuceneTestCase {
r.close();
dir.close();
}
/** Make sure that close waits for any still-running commits. */
public void testCloseDuringCommit() throws Exception {
final CountDownLatch startCommit = new CountDownLatch(1);
final CountDownLatch finishCommit = new CountDownLatch(1);
// infostream that "takes a long time" to commit
InfoStream slowCommittingInfoStream = new InfoStream() {
@Override
public void message(String component, String message) {
if (message.equals("finishStartCommit")) {
startCommit.countDown();
try {
Thread.sleep(10);
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
}
}
@Override
public boolean isEnabled(String component) {
return true;
}
@Override
public void close() throws IOException {}
};
Directory dir = newDirectory();
IndexWriterConfig iwc = new IndexWriterConfig(null);
iwc.setInfoStream(slowCommittingInfoStream);
final IndexWriter iw = new IndexWriter(dir, iwc);
Document doc = new Document();
new Thread() {
@Override
public void run() {
try {
iw.commit();
finishCommit.countDown();
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}
}.start();
startCommit.await();
try {
iw.close();
fail("didn't hit exception");
} catch (IllegalStateException ise) {
// expected
}
finishCommit.await();
iw.close();
dir.close();
}
}

View File

@ -658,4 +658,23 @@ public class TestIndexWriterCommit extends LuceneTestCase {
dir.close();
}
public void testPrepareCommitThenClose() throws Exception {
Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())));
w.addDocument(new Document());
w.prepareCommit();
try {
w.close();
fail("didn't hit exception");
} catch (IllegalStateException ise) {
// expected
}
w.commit();
w.close();
DirectoryReader r = DirectoryReader.open(dir);
assertEquals(1, r.maxDoc());
r.close();
dir.close();
}
}

View File

@ -1030,6 +1030,55 @@ public class TestIndexWriterExceptions extends LuceneTestCase {
dir.close();
}
/** If IW hits OOME during indexing, it should refuse to commit any further changes. */
public void testOutOfMemoryErrorRollback() throws Exception {
final AtomicBoolean thrown = new AtomicBoolean(false);
final Directory dir = newDirectory();
final IndexWriter writer = new IndexWriter(dir,
newIndexWriterConfig(new MockAnalyzer(random()))
.setInfoStream(new InfoStream() {
@Override
public void message(String component, final String message) {
if (message.contains("startFullFlush") && thrown.compareAndSet(false, true)) {
throw new OutOfMemoryError("fake OOME at " + message);
}
}
@Override
public boolean isEnabled(String component) {
return true;
}
@Override
public void close() {}
}));
writer.addDocument(new Document());
try {
writer.commit();
fail("OutOfMemoryError expected");
}
catch (final OutOfMemoryError expected) {}
try {
writer.close();
} catch (IllegalStateException ise) {
// expected
}
try {
writer.addDocument(new Document());
} catch (AlreadyClosedException ace) {
// expected
}
// IW should have done rollback() during close, since it hit OOME, and so no index should exist:
assertFalse(DirectoryReader.indexExists(dir));
dir.close();
}
// LUCENE-1347
private static final class TestPoint4 implements RandomIndexWriter.TestPoint {
@ -2072,11 +2121,7 @@ public class TestIndexWriterExceptions extends LuceneTestCase {
if (VERBOSE) {
System.out.println(" now 2nd close writer");
}
try {
w.close();
} catch (AlreadyClosedException ace) {
// OK
}
w.close();
w = null;
}