diff --git a/src/java/org/apache/lucene/index/IndexReader.java b/src/java/org/apache/lucene/index/IndexReader.java index 6aeca66ad9c..0abdb2c1a36 100644 --- a/src/java/org/apache/lucene/index/IndexReader.java +++ b/src/java/org/apache/lucene/index/IndexReader.java @@ -58,6 +58,7 @@ import java.io.IOException; import java.io.File; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.Lock; import org.apache.lucene.document.Document; /** IndexReader is an abstract class, providing an interface for accessing an @@ -89,17 +90,21 @@ abstract public class IndexReader { } /** Returns an IndexReader reading the index in the given Directory. */ - public static IndexReader open(Directory directory) throws IOException { - synchronized (directory) { - SegmentInfos infos = new SegmentInfos(); - infos.read(directory); - if (infos.size() == 1) // index is optimized - return new SegmentReader(infos.info(0), true); - - SegmentReader[] readers = new SegmentReader[infos.size()]; - for (int i = 0; i < infos.size(); i++) - readers[i] = new SegmentReader(infos.info(i), i == infos.size() - 1); - return new SegmentsReader(readers); + public static IndexReader open(final Directory directory) throws IOException{ + synchronized (directory) { // in- & inter-process sync + return (IndexReader)new Lock.With(directory.makeLock("commit.lock")) { + public Object doBody() throws IOException { + SegmentInfos infos = new SegmentInfos(); + infos.read(directory); + if (infos.size() == 1) // index is optimized + return new SegmentReader(infos.info(0), true); + + SegmentReader[] readers = new SegmentReader[infos.size()]; + for (int i = 0; i < infos.size(); i++) + readers[i] = new SegmentReader(infos.info(i), i==infos.size()-1); + return new SegmentsReader(readers); + } + }.run(); } } diff --git a/src/java/org/apache/lucene/index/IndexWriter.java b/src/java/org/apache/lucene/index/IndexWriter.java index 5136d64abf8..fa1335c12cf 100644 --- a/src/java/org/apache/lucene/index/IndexWriter.java +++ b/src/java/org/apache/lucene/index/IndexWriter.java @@ -62,6 +62,7 @@ import java.util.Vector; import org.apache.lucene.store.Directory; import org.apache.lucene.store.RAMDirectory; import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.Lock; import org.apache.lucene.store.InputStream; import org.apache.lucene.store.OutputStream; import org.apache.lucene.document.Document; @@ -112,16 +113,25 @@ public final class IndexWriter { analyzed with a. If create is true, then a new, empty index will be created in d, replacing the index already there, if any. */ - public IndexWriter(Directory d, Analyzer a, boolean create) + public IndexWriter(Directory d, Analyzer a, final boolean create) throws IOException { directory = d; analyzer = a; - synchronized (directory) { - if (create) - segmentInfos.write(directory); - else - segmentInfos.read(directory); + Lock writeLock = directory.makeLock("write.lock"); + if (!writeLock.obtain()) // obtain write lock + throw new IOException("Index locked for write: " + writeLock); + + synchronized (directory) { // in- & inter-process sync + new Lock.With(directory.makeLock("commit.lock")) { + public Object doBody() throws IOException { + if (create) + segmentInfos.write(directory); + else + segmentInfos.read(directory); + return null; + } + }.run(); } } @@ -130,6 +140,7 @@ public final class IndexWriter { public final synchronized void close() throws IOException { flushRamSegments(); ramDirectory.close(); + directory.makeLock("write.lock").release(); // release write lock directory.close(); } @@ -286,7 +297,7 @@ public final class IndexWriter { int mergedDocCount = 0; if (infoStream != null) infoStream.print("merging segments"); SegmentMerger merger = new SegmentMerger(directory, mergedName); - Vector segmentsToDelete = new Vector(); + final Vector segmentsToDelete = new Vector(); for (int i = minSegment; i < segmentInfos.size(); i++) { SegmentInfo si = segmentInfos.info(i); if (infoStream != null) @@ -307,9 +318,14 @@ public final class IndexWriter { segmentInfos.addElement(new SegmentInfo(mergedName, mergedDocCount, directory)); - synchronized (directory) { - segmentInfos.write(directory); // commit before deleting - deleteSegments(segmentsToDelete); // delete now-unused segments + synchronized (directory) { // in- & inter-process sync + new Lock.With(directory.makeLock("commit.lock")) { + public Object doBody() throws IOException { + segmentInfos.write(directory); // commit before deleting + deleteSegments(segmentsToDelete); // delete now-unused segments + return null; + } + }.run(); } } diff --git a/src/java/org/apache/lucene/index/SegmentReader.java b/src/java/org/apache/lucene/index/SegmentReader.java index 67292205c7a..ac054f7d8f4 100644 --- a/src/java/org/apache/lucene/index/SegmentReader.java +++ b/src/java/org/apache/lucene/index/SegmentReader.java @@ -61,6 +61,7 @@ import java.util.Vector; import org.apache.lucene.util.BitVector; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.Lock; import org.apache.lucene.store.InputStream; import org.apache.lucene.document.Document; @@ -116,9 +117,14 @@ final class SegmentReader extends IndexReader { public final synchronized void close() throws IOException { if (deletedDocsDirty) { - synchronized (directory) { - deletedDocs.write(directory, segment + ".tmp"); - directory.renameFile(segment + ".tmp", segment + ".del"); + synchronized (directory) { // in- & inter-process sync + new Lock.With(directory.makeLock("commit.lock")) { + public Object doBody() throws IOException { + deletedDocs.write(directory, segment + ".tmp"); + directory.renameFile(segment + ".tmp", segment + ".del"); + return null; + } + }.run(); } deletedDocsDirty = false; } diff --git a/src/java/org/apache/lucene/store/Directory.java b/src/java/org/apache/lucene/store/Directory.java index 2660c03b5e4..2aaacbfa83d 100644 --- a/src/java/org/apache/lucene/store/Directory.java +++ b/src/java/org/apache/lucene/store/Directory.java @@ -108,6 +108,11 @@ abstract public class Directory { abstract public InputStream openFile(String name) throws IOException, SecurityException; + /** Construct a {@link Lock}. + * @param name the name of the lock file + */ + abstract public Lock makeLock(String name); + /** Closes the store. */ abstract public void close() throws IOException, SecurityException; diff --git a/src/java/org/apache/lucene/store/FSDirectory.java b/src/java/org/apache/lucene/store/FSDirectory.java index e158501c465..6587e4f6019 100644 --- a/src/java/org/apache/lucene/store/FSDirectory.java +++ b/src/java/org/apache/lucene/store/FSDirectory.java @@ -206,6 +206,24 @@ final public class FSDirectory extends Directory { return new FSInputStream(new File(directory, name)); } + /** Construct a {@link Lock}. + * @param name the name of the lock file + */ + public final Lock makeLock(String name) { + final File lockFile = new File(directory, name); + return new Lock() { + public boolean obtain() throws IOException { + return lockFile.createNewFile(); + } + public void release() { + lockFile.delete(); + } + public String toString() { + return "Lock@" + lockFile; + } + }; + } + /** Closes the store to future operations. */ public final synchronized void close() throws IOException { if (--refCount <= 0) { @@ -214,6 +232,11 @@ final public class FSDirectory extends Directory { } } } + + /** For debug output. */ + public String toString() { + return "FSDirectory@" + directory; + } } @@ -278,8 +301,6 @@ final class FSOutputStream extends OutputStream { RandomAccessFile file = null; public FSOutputStream(File path) throws IOException { - if (path.isFile()) - throw new IOException(path + " already exists"); file = new RandomAccessFile(path, "rw"); } diff --git a/src/java/org/apache/lucene/store/Lock.java b/src/java/org/apache/lucene/store/Lock.java new file mode 100644 index 00000000000..46d8a03c0c6 --- /dev/null +++ b/src/java/org/apache/lucene/store/Lock.java @@ -0,0 +1,125 @@ +package org.apache.lucene.store; + +/* ==================================================================== + * The Apache Software License, Version 1.1 + * + * Copyright (c) 2001 The Apache Software Foundation. All rights + * reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * + * 3. The end-user documentation included with the redistribution, + * if any, must include the following acknowledgment: + * "This product includes software developed by the + * Apache Software Foundation (http://www.apache.org/)." + * Alternately, this acknowledgment may appear in the software itself, + * if and wherever such third-party acknowledgments normally appear. + * + * 4. The names "Apache" and "Apache Software Foundation" and + * "Apache Lucene" must not be used to endorse or promote products + * derived from this software without prior written permission. For + * written permission, please contact apache@apache.org. + * + * 5. Products derived from this software may not be called "Apache", + * "Apache Lucene", nor may "Apache" appear in their name, without + * prior written permission of the Apache Software Foundation. + * + * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR + * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT + * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + */ + +import java.io.IOException; + +/** An interprocess mutex lock. + *

Typical use might look like:

+ * new Lock.With(directory.makeLock("my.lock")) {
+ *     public Object doBody() {
+ *       ... code to execute while locked ...
+ *     }
+ *   }.run();
+ * 
+ * + * @author Doug Cutting + * @see Directory#makeLock(String) +*/ + +public abstract class Lock { + /** Attempt to obtain exclusive access. + * + * @return true iff exclusive access is obtained + */ + public abstract boolean obtain() throws IOException; + + /** Release exclusive access. */ + public abstract void release(); + + /** Utility class for executing code with exclusive access. */ + public abstract static class With { + private Lock lock; + private int sleepInterval = 1000; + private int maxSleeps = 10; + + /** Constructs an executor that will grab the named lock. */ + public With(Lock lock) { + this.lock = lock; + } + + /** Code to execute with exclusive access. */ + protected abstract Object doBody() throws IOException; + + /** Calls {@link #doBody} while lock is obtained. Blocks if lock + * cannot be obtained immediately. Retries to obtain lock once per second + * until it is obtained, or until it has tried ten times. */ + public Object run() throws IOException { + boolean locked = false; + try { + locked = lock.obtain(); + int sleepCount = 0; + while (!locked) { + if (++sleepCount == maxSleeps) { + throw new IOException("Timed out waiting for: " + lock); + } + try { + Thread.sleep(sleepInterval); + } catch (InterruptedException e) { + throw new IOException(e.toString()); + } + locked = lock.obtain(); + } + + return doBody(); + + } finally { + if (locked) + lock.release(); + } + } + } + +} diff --git a/src/java/org/apache/lucene/store/RAMDirectory.java b/src/java/org/apache/lucene/store/RAMDirectory.java index c67e5d5b5ef..93dbc931012 100644 --- a/src/java/org/apache/lucene/store/RAMDirectory.java +++ b/src/java/org/apache/lucene/store/RAMDirectory.java @@ -123,6 +123,26 @@ final public class RAMDirectory extends Directory { return new RAMInputStream(file); } + /** Construct a {@link Lock}. + * @param name the name of the lock file + */ + public final Lock makeLock(final String name) { + return new Lock() { + public boolean obtain() throws IOException { + synchronized (files) { + if (!fileExists(name)) { + createFile(name).close(); + return true; + } + return false; + } + } + public void release() { + deleteFile(name); + } + }; + } + /** Closes the store to future operations. */ public final void close() { } diff --git a/src/test/org/apache/lucene/ThreadSafetyTest.java b/src/test/org/apache/lucene/ThreadSafetyTest.java index 7b01ad54b21..3770829804c 100644 --- a/src/test/org/apache/lucene/ThreadSafetyTest.java +++ b/src/test/org/apache/lucene/ThreadSafetyTest.java @@ -69,6 +69,8 @@ class ThreadSafetyTest { private static final Random RANDOM = new Random(); private static Searcher SEARCHER; + private static int ITERATIONS = 1; + private static int random(int i) { // for JDK 1.1 compatibility int r = RANDOM.nextInt(); if (r < 0) r = -r; @@ -85,7 +87,7 @@ class ThreadSafetyTest { public void run() { try { - for (int i = 0; i < 1024*16; i++) { + for (int i = 0; i < 1024*ITERATIONS; i++) { Document d = new Document(); int n = RANDOM.nextInt(); d.add(Field.Keyword("id", Integer.toString(n))); @@ -98,6 +100,9 @@ class ThreadSafetyTest { writer = new IndexWriter("index", ANALYZER, false); } } + + writer.close(); + } catch (Exception e) { System.out.println(e.toString()); e.printStackTrace(); @@ -117,7 +122,7 @@ class ThreadSafetyTest { public void run() { try { - for (int i = 0; i < 1024*8; i++) { + for (int i = 0; i < 512*ITERATIONS; i++) { searchFor(RANDOM.nextInt(), (searcher==null)?SEARCHER:searcher); if (i%reopenInterval == 0) { if (searcher == null) { @@ -150,12 +155,24 @@ class ThreadSafetyTest { public static void main(String[] args) throws Exception { - IndexWriter writer = new IndexWriter("index", ANALYZER, true); + boolean readOnly = false; + boolean add = false; - Thread indexerThread = new IndexerThread(writer); - indexerThread.start(); + for (int i = 0; i < args.length; i++) { + if ("-ro".equals(args[i])) + readOnly = true; + if ("-add".equals(args[i])) + add = true; + } - Thread.sleep(1000); + if (!readOnly) { + IndexWriter writer = new IndexWriter("index", ANALYZER, !add); + + Thread indexerThread = new IndexerThread(writer); + indexerThread.start(); + + Thread.sleep(1000); + } SearcherThread searcherThread1 = new SearcherThread(false); searcherThread1.start();