Added index lock files. Indexing and search are now not just thread

safe, but also "process safe": multiple processes may may now search
an index while it is being updated from another process.

Two lock files are used in an index.  One is "commit.lock".  This is
used to synchronize commits [IndexWriter.close()] with opens
[IndexReader.open()].  Since these actions are short-lived, attempts
to obtain this lock will block for up to ten seconds, which should be
plenty of time, before an exception is thrown.

The second lock file is "write.lock".  This is used to enforce the
restriction that only one process should be adding documents to an
index at a time.  This is created when an IndexWriter is constructed
and removed when it is closed.  If index writing is aborted then this
file must be manually removed.  Attempts to index from another process
will immediately throw an exception.

It should be impossible to corrupt an index through the Lucene API.
However if a Lucene process exits unexpectedly it can leave the index
locked.  The remedy is simply to, at a time when it is certain that no
processes are accessing the index, remove all lock files.


git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@149595 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Doug Cutting 2001-09-27 16:27:02 +00:00
parent 9bd3152b06
commit 58bcd780a9
8 changed files with 247 additions and 32 deletions

View File

@ -58,6 +58,7 @@ import java.io.IOException;
import java.io.File; import java.io.File;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.Lock;
import org.apache.lucene.document.Document; import org.apache.lucene.document.Document;
/** IndexReader is an abstract class, providing an interface for accessing an /** IndexReader is an abstract class, providing an interface for accessing an
@ -89,8 +90,10 @@ abstract public class IndexReader {
} }
/** Returns an IndexReader reading the index in the given Directory. */ /** Returns an IndexReader reading the index in the given Directory. */
public static IndexReader open(Directory directory) throws IOException { public static IndexReader open(final Directory directory) throws IOException{
synchronized (directory) { synchronized (directory) { // in- & inter-process sync
return (IndexReader)new Lock.With(directory.makeLock("commit.lock")) {
public Object doBody() throws IOException {
SegmentInfos infos = new SegmentInfos(); SegmentInfos infos = new SegmentInfos();
infos.read(directory); infos.read(directory);
if (infos.size() == 1) // index is optimized if (infos.size() == 1) // index is optimized
@ -98,9 +101,11 @@ abstract public class IndexReader {
SegmentReader[] readers = new SegmentReader[infos.size()]; SegmentReader[] readers = new SegmentReader[infos.size()];
for (int i = 0; i < infos.size(); i++) for (int i = 0; i < infos.size(); i++)
readers[i] = new SegmentReader(infos.info(i), i == infos.size() - 1); readers[i] = new SegmentReader(infos.info(i), i==infos.size()-1);
return new SegmentsReader(readers); return new SegmentsReader(readers);
} }
}.run();
}
} }
/** Returns the time the index in the named directory was last modified. */ /** Returns the time the index in the named directory was last modified. */

View File

@ -62,6 +62,7 @@ import java.util.Vector;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory; import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.InputStream; import org.apache.lucene.store.InputStream;
import org.apache.lucene.store.OutputStream; import org.apache.lucene.store.OutputStream;
import org.apache.lucene.document.Document; import org.apache.lucene.document.Document;
@ -112,16 +113,25 @@ public final class IndexWriter {
analyzed with <code>a</code>. If <code>create</code> is true, then a new, analyzed with <code>a</code>. If <code>create</code> is true, then a new,
empty index will be created in <code>d</code>, replacing the index already empty index will be created in <code>d</code>, replacing the index already
there, if any. */ there, if any. */
public IndexWriter(Directory d, Analyzer a, boolean create) public IndexWriter(Directory d, Analyzer a, final boolean create)
throws IOException { throws IOException {
directory = d; directory = d;
analyzer = a; analyzer = a;
synchronized (directory) { Lock writeLock = directory.makeLock("write.lock");
if (!writeLock.obtain()) // obtain write lock
throw new IOException("Index locked for write: " + writeLock);
synchronized (directory) { // in- & inter-process sync
new Lock.With(directory.makeLock("commit.lock")) {
public Object doBody() throws IOException {
if (create) if (create)
segmentInfos.write(directory); segmentInfos.write(directory);
else else
segmentInfos.read(directory); segmentInfos.read(directory);
return null;
}
}.run();
} }
} }
@ -130,6 +140,7 @@ public final class IndexWriter {
public final synchronized void close() throws IOException { public final synchronized void close() throws IOException {
flushRamSegments(); flushRamSegments();
ramDirectory.close(); ramDirectory.close();
directory.makeLock("write.lock").release(); // release write lock
directory.close(); directory.close();
} }
@ -286,7 +297,7 @@ public final class IndexWriter {
int mergedDocCount = 0; int mergedDocCount = 0;
if (infoStream != null) infoStream.print("merging segments"); if (infoStream != null) infoStream.print("merging segments");
SegmentMerger merger = new SegmentMerger(directory, mergedName); SegmentMerger merger = new SegmentMerger(directory, mergedName);
Vector segmentsToDelete = new Vector(); final Vector segmentsToDelete = new Vector();
for (int i = minSegment; i < segmentInfos.size(); i++) { for (int i = minSegment; i < segmentInfos.size(); i++) {
SegmentInfo si = segmentInfos.info(i); SegmentInfo si = segmentInfos.info(i);
if (infoStream != null) if (infoStream != null)
@ -307,9 +318,14 @@ public final class IndexWriter {
segmentInfos.addElement(new SegmentInfo(mergedName, mergedDocCount, segmentInfos.addElement(new SegmentInfo(mergedName, mergedDocCount,
directory)); directory));
synchronized (directory) { synchronized (directory) { // in- & inter-process sync
new Lock.With(directory.makeLock("commit.lock")) {
public Object doBody() throws IOException {
segmentInfos.write(directory); // commit before deleting segmentInfos.write(directory); // commit before deleting
deleteSegments(segmentsToDelete); // delete now-unused segments deleteSegments(segmentsToDelete); // delete now-unused segments
return null;
}
}.run();
} }
} }

View File

@ -61,6 +61,7 @@ import java.util.Vector;
import org.apache.lucene.util.BitVector; import org.apache.lucene.util.BitVector;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.InputStream; import org.apache.lucene.store.InputStream;
import org.apache.lucene.document.Document; import org.apache.lucene.document.Document;
@ -116,9 +117,14 @@ final class SegmentReader extends IndexReader {
public final synchronized void close() throws IOException { public final synchronized void close() throws IOException {
if (deletedDocsDirty) { if (deletedDocsDirty) {
synchronized (directory) { synchronized (directory) { // in- & inter-process sync
new Lock.With(directory.makeLock("commit.lock")) {
public Object doBody() throws IOException {
deletedDocs.write(directory, segment + ".tmp"); deletedDocs.write(directory, segment + ".tmp");
directory.renameFile(segment + ".tmp", segment + ".del"); directory.renameFile(segment + ".tmp", segment + ".del");
return null;
}
}.run();
} }
deletedDocsDirty = false; deletedDocsDirty = false;
} }

View File

@ -108,6 +108,11 @@ abstract public class Directory {
abstract public InputStream openFile(String name) abstract public InputStream openFile(String name)
throws IOException, SecurityException; throws IOException, SecurityException;
/** Construct a {@link Lock}.
* @param name the name of the lock file
*/
abstract public Lock makeLock(String name);
/** Closes the store. */ /** Closes the store. */
abstract public void close() abstract public void close()
throws IOException, SecurityException; throws IOException, SecurityException;

View File

@ -206,6 +206,24 @@ final public class FSDirectory extends Directory {
return new FSInputStream(new File(directory, name)); return new FSInputStream(new File(directory, name));
} }
/** Construct a {@link Lock}.
* @param name the name of the lock file
*/
public final Lock makeLock(String name) {
final File lockFile = new File(directory, name);
return new Lock() {
public boolean obtain() throws IOException {
return lockFile.createNewFile();
}
public void release() {
lockFile.delete();
}
public String toString() {
return "Lock@" + lockFile;
}
};
}
/** Closes the store to future operations. */ /** Closes the store to future operations. */
public final synchronized void close() throws IOException { public final synchronized void close() throws IOException {
if (--refCount <= 0) { if (--refCount <= 0) {
@ -214,6 +232,11 @@ final public class FSDirectory extends Directory {
} }
} }
} }
/** For debug output. */
public String toString() {
return "FSDirectory@" + directory;
}
} }
@ -278,8 +301,6 @@ final class FSOutputStream extends OutputStream {
RandomAccessFile file = null; RandomAccessFile file = null;
public FSOutputStream(File path) throws IOException { public FSOutputStream(File path) throws IOException {
if (path.isFile())
throw new IOException(path + " already exists");
file = new RandomAccessFile(path, "rw"); file = new RandomAccessFile(path, "rw");
} }

View File

@ -0,0 +1,125 @@
package org.apache.lucene.store;
/* ====================================================================
* The Apache Software License, Version 1.1
*
* Copyright (c) 2001 The Apache Software Foundation. All rights
* reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution,
* if any, must include the following acknowledgment:
* "This product includes software developed by the
* Apache Software Foundation (http://www.apache.org/)."
* Alternately, this acknowledgment may appear in the software itself,
* if and wherever such third-party acknowledgments normally appear.
*
* 4. The names "Apache" and "Apache Software Foundation" and
* "Apache Lucene" must not be used to endorse or promote products
* derived from this software without prior written permission. For
* written permission, please contact apache@apache.org.
*
* 5. Products derived from this software may not be called "Apache",
* "Apache Lucene", nor may "Apache" appear in their name, without
* prior written permission of the Apache Software Foundation.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*/
import java.io.IOException;
/** An interprocess mutex lock.
* <p>Typical use might look like:<pre>
* new Lock.With(directory.makeLock("my.lock")) {
* public Object doBody() {
* <it>... code to execute while locked ...</it>
* }
* }.run();
* </pre>
*
* @author Doug Cutting
* @see Directory#makeLock(String)
*/
public abstract class Lock {
/** Attempt to obtain exclusive access.
*
* @return true iff exclusive access is obtained
*/
public abstract boolean obtain() throws IOException;
/** Release exclusive access. */
public abstract void release();
/** Utility class for executing code with exclusive access. */
public abstract static class With {
private Lock lock;
private int sleepInterval = 1000;
private int maxSleeps = 10;
/** Constructs an executor that will grab the named lock. */
public With(Lock lock) {
this.lock = lock;
}
/** Code to execute with exclusive access. */
protected abstract Object doBody() throws IOException;
/** Calls {@link #doBody} while <it>lock</it> is obtained. Blocks if lock
* cannot be obtained immediately. Retries to obtain lock once per second
* until it is obtained, or until it has tried ten times. */
public Object run() throws IOException {
boolean locked = false;
try {
locked = lock.obtain();
int sleepCount = 0;
while (!locked) {
if (++sleepCount == maxSleeps) {
throw new IOException("Timed out waiting for: " + lock);
}
try {
Thread.sleep(sleepInterval);
} catch (InterruptedException e) {
throw new IOException(e.toString());
}
locked = lock.obtain();
}
return doBody();
} finally {
if (locked)
lock.release();
}
}
}
}

View File

@ -123,6 +123,26 @@ final public class RAMDirectory extends Directory {
return new RAMInputStream(file); return new RAMInputStream(file);
} }
/** Construct a {@link Lock}.
* @param name the name of the lock file
*/
public final Lock makeLock(final String name) {
return new Lock() {
public boolean obtain() throws IOException {
synchronized (files) {
if (!fileExists(name)) {
createFile(name).close();
return true;
}
return false;
}
}
public void release() {
deleteFile(name);
}
};
}
/** Closes the store to future operations. */ /** Closes the store to future operations. */
public final void close() { public final void close() {
} }

View File

@ -69,6 +69,8 @@ class ThreadSafetyTest {
private static final Random RANDOM = new Random(); private static final Random RANDOM = new Random();
private static Searcher SEARCHER; private static Searcher SEARCHER;
private static int ITERATIONS = 1;
private static int random(int i) { // for JDK 1.1 compatibility private static int random(int i) { // for JDK 1.1 compatibility
int r = RANDOM.nextInt(); int r = RANDOM.nextInt();
if (r < 0) r = -r; if (r < 0) r = -r;
@ -85,7 +87,7 @@ class ThreadSafetyTest {
public void run() { public void run() {
try { try {
for (int i = 0; i < 1024*16; i++) { for (int i = 0; i < 1024*ITERATIONS; i++) {
Document d = new Document(); Document d = new Document();
int n = RANDOM.nextInt(); int n = RANDOM.nextInt();
d.add(Field.Keyword("id", Integer.toString(n))); d.add(Field.Keyword("id", Integer.toString(n)));
@ -98,6 +100,9 @@ class ThreadSafetyTest {
writer = new IndexWriter("index", ANALYZER, false); writer = new IndexWriter("index", ANALYZER, false);
} }
} }
writer.close();
} catch (Exception e) { } catch (Exception e) {
System.out.println(e.toString()); System.out.println(e.toString());
e.printStackTrace(); e.printStackTrace();
@ -117,7 +122,7 @@ class ThreadSafetyTest {
public void run() { public void run() {
try { try {
for (int i = 0; i < 1024*8; i++) { for (int i = 0; i < 512*ITERATIONS; i++) {
searchFor(RANDOM.nextInt(), (searcher==null)?SEARCHER:searcher); searchFor(RANDOM.nextInt(), (searcher==null)?SEARCHER:searcher);
if (i%reopenInterval == 0) { if (i%reopenInterval == 0) {
if (searcher == null) { if (searcher == null) {
@ -150,12 +155,24 @@ class ThreadSafetyTest {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
IndexWriter writer = new IndexWriter("index", ANALYZER, true); boolean readOnly = false;
boolean add = false;
for (int i = 0; i < args.length; i++) {
if ("-ro".equals(args[i]))
readOnly = true;
if ("-add".equals(args[i]))
add = true;
}
if (!readOnly) {
IndexWriter writer = new IndexWriter("index", ANALYZER, !add);
Thread indexerThread = new IndexerThread(writer); Thread indexerThread = new IndexerThread(writer);
indexerThread.start(); indexerThread.start();
Thread.sleep(1000); Thread.sleep(1000);
}
SearcherThread searcherThread1 = new SearcherThread(false); SearcherThread searcherThread1 = new SearcherThread(false);
searcherThread1.start(); searcherThread1.start();