LUCENE-2328: move tracking of which files are sync'd from IW/IR down to Directory; fixes slow memory leak

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@926653 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2010-03-23 16:40:01 +00:00
parent c285667cc1
commit 0b6a6bcbdf
11 changed files with 247 additions and 305 deletions

View File

@ -36,6 +36,10 @@ Changes in backwards compatibility policy
took IndexWriter as argument has been removed from all MergePolicy extensions.
(Shai Erera via Mike McCandless)
* LUCENE-2328: SimpleFSDirectory.SimpleFSIndexOutput is moved to
FSDirectory.FSIndexOutput. Anyone extending this class will have to
fix their code on upgrading. (Earwin Burrfoot via Mike McCandless)
Changes in runtime behavior
* LUCENE-1923: Made IndexReader.toString() produce something
@ -106,7 +110,13 @@ API Changes
* LUCENE-2320: IndexWriter's MergePolicy configuration was moved to
IndexWriterConfig and the respective methods on IndexWriter were deprecated.
(Shai Erera via Mike McCandless)
(Shai Erera via Mike McCandless)
* LUCENE-2328: Directory now itself keeps track of the files that are written
but not yet fsynced. The old Directory.sync(String file) method is deprecated
and replaced with Directory.sync(Collection<String> files). Take a look at
FSDirectory for an example of what such tracking might look like, if needed
in your custom Directories. (Earwin Burrfoot via Mike McCandless)
Bug fixes
@ -143,6 +153,10 @@ Bug fixes
has been obtained), and addIndexes* is run, do not pool the
readers from the external directory. This is harmless (NRT reader is
correct), but a waste of resources. (Mike McCandless)
* LUCENE-2328: Index files fsync tracking moved from
IndexWriter/IndexReader to Directory, and it no longer leaks memory.
(Earwin Burrfoot via Mike McCandless)
New features

View File

@ -47,7 +47,6 @@ class DirectoryReader extends IndexReader implements Cloneable {
IndexWriter writer;
private IndexDeletionPolicy deletionPolicy;
private final HashSet<String> synced = new HashSet<String>();
private Lock writeLock;
private SegmentInfos segmentInfos;
private SegmentInfos segmentInfosStart;
@ -87,12 +86,6 @@ class DirectoryReader extends IndexReader implements Cloneable {
this.deletionPolicy = deletionPolicy;
this.termInfosIndexDivisor = termInfosIndexDivisor;
if (!readOnly) {
// We assume that this segments_N was previously
// properly sync'd:
synced.addAll(sis.files(directory, true));
}
// To reduce the chance of hitting FileNotFound
// (and having to retry), we open segments in
// reverse because IndexWriter merges & deletes
@ -128,11 +121,6 @@ class DirectoryReader extends IndexReader implements Cloneable {
segmentInfos = infos;
segmentInfosStart = (SegmentInfos) infos.clone();
this.termInfosIndexDivisor = termInfosIndexDivisor;
if (!readOnly) {
// We assume that this segments_N was previously
// properly sync'd:
synced.addAll(infos.files(directory, true));
}
// IndexWriter synchronizes externally before calling
// us, which ensures infos will not change; so there's
@ -183,11 +171,6 @@ class DirectoryReader extends IndexReader implements Cloneable {
this.readOnly = readOnly;
this.segmentInfos = infos;
this.termInfosIndexDivisor = termInfosIndexDivisor;
if (!readOnly) {
// We assume that this segments_N was previously
// properly sync'd:
synced.addAll(infos.files(directory, true));
}
// we put the old SegmentReaders in a map, that allows us
// to lookup a reader using its segment name
@ -786,14 +769,7 @@ class DirectoryReader extends IndexReader implements Cloneable {
subReaders[i].commit();
// Sync all files we just wrote
final Collection<String> files = segmentInfos.files(directory, false);
for (final String fileName : files) {
if (!synced.contains(fileName)) {
assert directory.fileExists(fileName);
directory.sync(fileName);
synced.add(fileName);
}
}
directory.sync(segmentInfos.files(directory, false));
segmentInfos.commit(directory);
success = true;

View File

@ -1123,7 +1123,6 @@ public class IndexWriter implements Closeable {
// Only commit if there is no segments file in
// this dir already.
segmentInfos.commit(directory);
synced.addAll(segmentInfos.files(directory, true));
} else {
// Record that we have a change (zero out all
// segments) pending:
@ -1148,10 +1147,6 @@ public class IndexWriter implements Closeable {
if (infoStream != null)
message("init: loaded commit \"" + commit.getSegmentsFileName() + "\"");
}
// We assume that this segments_N was previously
// properly sync'd:
synced.addAll(segmentInfos.files(directory, true));
}
setRollbackSegmentInfos(segmentInfos);
@ -4614,60 +4609,6 @@ public class IndexWriter implements Closeable {
return buffer.toString();
}
// Files that have been sync'd already
private HashSet<String> synced = new HashSet<String>();
// Files that are now being sync'd
private HashSet<String> syncing = new HashSet<String>();
private boolean startSync(String fileName, Collection<String> pending) {
synchronized(synced) {
if (!synced.contains(fileName)) {
if (!syncing.contains(fileName)) {
syncing.add(fileName);
return true;
} else {
pending.add(fileName);
return false;
}
} else
return false;
}
}
private void finishSync(String fileName, boolean success) {
synchronized(synced) {
assert syncing.contains(fileName);
syncing.remove(fileName);
if (success)
synced.add(fileName);
synced.notifyAll();
}
}
/** Blocks until all files in syncing are sync'd */
private boolean waitForAllSynced(Collection<String> syncing) throws IOException {
synchronized(synced) {
Iterator<String> it = syncing.iterator();
while(it.hasNext()) {
final String fileName = it.next();
while(!synced.contains(fileName)) {
if (!syncing.contains(fileName))
// There was an error because a file that was
// previously syncing failed to appear in synced
return false;
else
try {
synced.wait();
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
}
}
return true;
}
}
private synchronized void doWait() {
// NOTE: the callers of this method should in theory
// be able to do simply wait(), but, as a defense
@ -4761,40 +4702,7 @@ public class IndexWriter implements Closeable {
boolean setPending = false;
try {
// Loop until all files toSync references are sync'd:
while(true) {
final Collection<String> pending = new ArrayList<String>();
Iterator<String> it = toSync.files(directory, false).iterator();
while(it.hasNext()) {
final String fileName = it.next();
if (startSync(fileName, pending)) {
boolean success = false;
try {
// Because we incRef'd this commit point, above,
// the file had better exist:
assert directory.fileExists(fileName): "file '" + fileName + "' does not exist dir=" + directory;
if (infoStream != null)
message("now sync " + fileName);
directory.sync(fileName);
success = true;
} finally {
finishSync(fileName, success);
}
}
}
// All files that I require are either synced or being
// synced by other threads. If they are being synced,
// we must at this point block until they are done.
// If this returns false, that means an error in
// another thread resulted in failing to actually
// sync one of our files, so we repeat:
if (waitForAllSynced(pending))
break;
}
directory.sync(toSync.files(directory, false));
assert testPoint("midStartCommit2");

View File

@ -852,7 +852,7 @@ public final class SegmentInfos extends Vector<SegmentInfo> {
generation);
success = false;
try {
dir.sync(fileName);
dir.sync(Collections.singleton(fileName));
success = true;
} finally {
if (!success) {

View File

@ -19,6 +19,8 @@ package org.apache.lucene.store;
import java.io.IOException;
import java.io.Closeable;
import java.util.Collection;
import java.util.Collections;
import org.apache.lucene.index.IndexFileNameFilter;
@ -78,11 +80,31 @@ public abstract class Directory implements Closeable {
public abstract IndexOutput createOutput(String name)
throws IOException;
/** Ensure that any writes to this file are moved to
* stable storage. Lucene uses this to properly commit
* changes to the index, to prevent a machine/OS crash
* from corrupting the index. */
public void sync(String name) throws IOException {}
/**
* Ensure that any writes to this file are moved to
* stable storage. Lucene uses this to properly commit
* changes to the index, to prevent a machine/OS crash
* from corrupting the index.
*/
@Deprecated
public void sync(String name) throws IOException { // TODO 4.0 kill me
}
/**
* Ensure that any writes to these files are moved to
* stable storage. Lucene uses this to properly commit
* changes to the index, to prevent a machine/OS crash
* from corrupting the index.<br/>
* <br/>
* NOTE: Clients may call this method for same files over
* and over again, so some impls might optimize for that.
* For other impls the operation can be a noop, for various
* reasons.
*/
public void sync(Collection<String> names) throws IOException { // TODO 4.0 make me abstract
for (String name : names)
sync(name);
}
/** Returns a stream reading an existing file. */
public abstract IndexInput openInput(String name)

View File

@ -24,6 +24,11 @@ import java.io.RandomAccessFile;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Collection;
import java.util.Collections;
import static java.util.Collections.synchronizedSet;
import java.util.HashSet;
import java.util.Set;
import org.apache.lucene.util.ThreadInterruptedException;
import org.apache.lucene.util.Constants;
@ -94,8 +99,7 @@ import org.apache.lucene.util.Constants;
* @see Directory
*/
public abstract class FSDirectory extends Directory {
private static MessageDigest DIGESTER;
private final static MessageDigest DIGESTER;
static {
try {
@ -105,36 +109,23 @@ public abstract class FSDirectory extends Directory {
}
}
/**
* Default read chunk size. This is a conditional default: on 32bit JVMs, it defaults to 100 MB. On 64bit JVMs, it's
* <code>Integer.MAX_VALUE</code>.
*
* @see #setReadChunkSize
*/
public static final int DEFAULT_READ_CHUNK_SIZE = Constants.JRE_IS_64BIT ? Integer.MAX_VALUE : 100 * 1024 * 1024;
protected final File directory; // The underlying filesystem directory
protected final Set<String> staleFiles = synchronizedSet(new HashSet<String>()); // Files written, but not yet sync'ed
private int chunkSize = DEFAULT_READ_CHUNK_SIZE; // LUCENE-1566
// returns the canonical version of the directory, creating it if it doesn't exist.
private static File getCanonicalPath(File file) throws IOException {
return new File(file.getCanonicalPath());
}
private boolean checked;
final void createDir() throws IOException {
if (!checked) {
if (!directory.exists())
if (!directory.mkdirs())
throw new IOException("Cannot create directory: " + directory);
checked = true;
}
}
/** Initializes the directory to create a new file with the given name.
* This method should be used in {@link #createOutput}. */
protected final void initOutput(String name) throws IOException {
ensureOpen();
createDir();
File file = new File(directory, name);
if (file.exists() && !file.delete()) // delete existing, if any
throw new IOException("Cannot overwrite: " + file);
}
/** The underlying filesystem directory */
protected File directory = null;
/** Create a new FSDirectory for the named location (ctor for subclasses).
* @param path the path of the directory
* @param lockFactory the lock factory to use, or null for the default
@ -142,12 +133,11 @@ public abstract class FSDirectory extends Directory {
* @throws IOException
*/
protected FSDirectory(File path, LockFactory lockFactory) throws IOException {
path = getCanonicalPath(path);
// new ctors use always NativeFSLockFactory as default:
if (lockFactory == null) {
lockFactory = new NativeFSLockFactory();
}
directory = path;
directory = getCanonicalPath(path);
if (directory.exists() && !directory.isDirectory())
throw new NoSuchDirectoryException("file '" + directory + "' exists but is not a directory");
@ -161,9 +151,9 @@ public abstract class FSDirectory extends Directory {
final File dir = lf.getLockDir();
// if the lock factory has no lockDir set, use the this directory as lockDir
if (dir == null) {
lf.setLockDir(this.directory);
lf.setLockDir(directory);
lf.setLockPrefix(null);
} else if (dir.getCanonicalPath().equals(this.directory.getCanonicalPath())) {
} else if (dir.getCanonicalPath().equals(directory.getCanonicalPath())) {
lf.setLockPrefix(null);
}
}
@ -196,12 +186,6 @@ public abstract class FSDirectory extends Directory {
/** Just like {@link #open(File)}, but allows you to
* also specify a custom {@link LockFactory}. */
public static FSDirectory open(File path, LockFactory lockFactory) throws IOException {
/* For testing:
MMapDirectory dir=new MMapDirectory(path, lockFactory);
dir.setUseUnmap(true);
return dir;
*/
if (Constants.WINDOWS) {
return new SimpleFSDirectory(path, lockFactory);
} else {
@ -290,41 +274,48 @@ public abstract class FSDirectory extends Directory {
File file = new File(directory, name);
if (!file.delete())
throw new IOException("Cannot delete " + file);
staleFiles.remove(name);
}
/** Creates an IndexOutput for the file with the given name. */
@Override
public IndexOutput createOutput(String name) throws IOException {
ensureOpen();
ensureCanWrite(name);
return new FSIndexOutput(this, name);
}
protected void ensureCanWrite(String name) throws IOException {
if (!directory.exists())
if (!directory.mkdirs())
throw new IOException("Cannot create directory: " + directory);
File file = new File(directory, name);
if (file.exists() && !file.delete()) // delete existing, if any
throw new IOException("Cannot overwrite: " + file);
}
protected void onIndexOutputClosed(FSIndexOutput io) {
staleFiles.add(io.name);
}
@Deprecated
@Override
public void sync(String name) throws IOException {
sync(Collections.singleton(name));
}
@Override
public void sync(String name) throws IOException {
public void sync(Collection<String> names) throws IOException {
ensureOpen();
File fullFile = new File(directory, name);
boolean success = false;
int retryCount = 0;
IOException exc = null;
while(!success && retryCount < 5) {
retryCount++;
RandomAccessFile file = null;
try {
try {
file = new RandomAccessFile(fullFile, "rw");
file.getFD().sync();
success = true;
} finally {
if (file != null)
file.close();
}
} catch (IOException ioe) {
if (exc == null)
exc = ioe;
try {
// Pause 5 msec
Thread.sleep(5);
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
}
}
if (!success)
// Throw original exception
throw exc;
Set<String> toSync = new HashSet<String>(names);
toSync.retainAll(staleFiles);
for (String name : toSync)
fsync(name);
staleFiles.removeAll(toSync);
}
// Inherit javadoc
@ -339,7 +330,6 @@ public abstract class FSDirectory extends Directory {
*/
private static final char[] HEX_DIGITS =
{'0','1','2','3','4','5','6','7','8','9','a','b','c','d','e','f'};
@Override
public String getLockID() {
@ -390,17 +380,6 @@ public abstract class FSDirectory extends Directory {
return this.getClass().getName() + "@" + directory;
}
/**
* Default read chunk size. This is a conditional
* default: on 32bit JVMs, it defaults to 100 MB. On
* 64bit JVMs, it's <code>Integer.MAX_VALUE</code>.
* @see #setReadChunkSize
*/
public static final int DEFAULT_READ_CHUNK_SIZE = Constants.JRE_IS_64BIT ? Integer.MAX_VALUE: 100 * 1024 * 1024;
// LUCENE-1566
private int chunkSize = DEFAULT_READ_CHUNK_SIZE;
/**
* Sets the maximum number of bytes read at once from the
* underlying file during {@link IndexInput#readBytes}.
@ -443,4 +422,96 @@ public abstract class FSDirectory extends Directory {
return chunkSize;
}
protected static class FSIndexOutput extends BufferedIndexOutput {
private final FSDirectory parent;
private final String name;
private final RandomAccessFile file;
private volatile boolean isOpen; // remember if the file is open, so that we don't try to close it more than once
public FSIndexOutput(FSDirectory parent, String name) throws IOException {
this.parent = parent;
this.name = name;
file = new RandomAccessFile(new File(parent.directory, name), "rw");
isOpen = true;
}
/** output methods: */
@Override
public void flushBuffer(byte[] b, int offset, int size) throws IOException {
file.write(b, offset, size);
}
@Override
public void close() throws IOException {
// only close the file if it has not been closed yet
if (isOpen) {
boolean success = false;
try {
super.close();
success = true;
} finally {
isOpen = false;
if (!success) {
try {
file.close();
parent.onIndexOutputClosed(this);
} catch (Throwable t) {
// Suppress so we don't mask original exception
}
} else
file.close();
}
}
}
/** Random-access methods */
@Override
public void seek(long pos) throws IOException {
super.seek(pos);
file.seek(pos);
}
@Override
public long length() throws IOException {
return file.length();
}
@Override
public void setLength(long length) throws IOException {
file.setLength(length);
}
}
protected void fsync(String name) throws IOException {
File fullFile = new File(directory, name);
boolean success = false;
int retryCount = 0;
IOException exc = null;
while (!success && retryCount < 5) {
retryCount++;
RandomAccessFile file = null;
try {
try {
file = new RandomAccessFile(fullFile, "rw");
file.getFD().sync();
success = true;
} finally {
if (file != null)
file.close();
}
} catch (IOException ioe) {
if (exc == null)
exc = ioe;
try {
// Pause 5 msec
Thread.sleep(5);
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
}
}
if (!success)
// Throw original exception
throw exc;
}
}

View File

@ -19,6 +19,10 @@ package org.apache.lucene.store;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
/**
@ -128,9 +132,25 @@ public class FileSwitchDirectory extends Directory {
return getDirectory(name).createOutput(name);
}
@Deprecated
@Override
public void sync(String name) throws IOException {
getDirectory(name).sync(name);
sync(Collections.singleton(name));
}
@Override
public void sync(Collection<String> names) throws IOException {
List<String> primaryNames = new ArrayList<String>();
List<String> secondaryNames = new ArrayList<String>();
for (String name : names)
if (primaryExtensions.contains(getExtension(name)))
primaryNames.add(name);
else
secondaryNames.add(name);
primaryDir.sync(primaryNames);
secondaryDir.sync(secondaryNames);
}
@Override

View File

@ -70,6 +70,8 @@ import org.apache.lucene.util.Constants;
* can be enabled (with no guarantees).
*/
public class MMapDirectory extends FSDirectory {
private boolean useUnmapHack = false;
private int maxBBuf = Constants.JRE_IS_64BIT ? Integer.MAX_VALUE : (256 * 1024 * 1024);
/** Create a new MMapDirectory for the named location.
*
@ -91,9 +93,6 @@ public class MMapDirectory extends FSDirectory {
super(path, null);
}
private boolean useUnmapHack = false;
private int maxBBuf = Constants.JRE_IS_64BIT ? Integer.MAX_VALUE : (256*1024*1024);
/**
* <code>true</code>, if this platform supports unmapping mmapped files.
*/
@ -189,7 +188,22 @@ public class MMapDirectory extends FSDirectory {
*/
public int getMaxChunkSize() {
return maxBBuf;
}
}
/** Creates an IndexInput for the file with the given name. */
@Override
public IndexInput openInput(String name, int bufferSize) throws IOException {
ensureOpen();
File f = new File(getDirectory(), name);
RandomAccessFile raf = new RandomAccessFile(f, "r");
try {
return (raf.length() <= maxBBuf)
? (IndexInput) new MMapIndexInput(raf)
: (IndexInput) new MultiMMapIndexInput(raf, maxBBuf);
} finally {
raf.close();
}
}
private class MMapIndexInput extends IndexInput {
@ -396,26 +410,4 @@ public class MMapDirectory extends FSDirectory {
}
}
}
/** Creates an IndexInput for the file with the given name. */
@Override
public IndexInput openInput(String name, int bufferSize) throws IOException {
ensureOpen();
File f = new File(getDirectory(), name);
RandomAccessFile raf = new RandomAccessFile(f, "r");
try {
return (raf.length() <= maxBBuf)
? (IndexInput) new MMapIndexInput(raf)
: (IndexInput) new MultiMMapIndexInput(raf, maxBBuf);
} finally {
raf.close();
}
}
/** Creates an IndexOutput for the file with the given name. */
@Override
public IndexOutput createOutput(String name) throws IOException {
initOutput(name);
return new SimpleFSDirectory.SimpleFSIndexOutput(new File(directory, name));
}
}

View File

@ -67,13 +67,6 @@ public class NIOFSDirectory extends FSDirectory {
return new NIOFSIndexInput(new File(getDirectory(), name), bufferSize, getReadChunkSize());
}
/** Creates an IndexOutput for the file with the given name. */
@Override
public IndexOutput createOutput(String name) throws IOException {
initOutput(name);
return new SimpleFSDirectory.SimpleFSIndexOutput(new File(directory, name));
}
protected static class NIOFSIndexInput extends SimpleFSDirectory.SimpleFSIndexInput {
private ByteBuffer byteBuf; // wraps the buffer for NIO

View File

@ -49,13 +49,6 @@ public class SimpleFSDirectory extends FSDirectory {
super(path, null);
}
/** Creates an IndexOutput for the file with the given name. */
@Override
public IndexOutput createOutput(String name) throws IOException {
initOutput(name);
return new SimpleFSIndexOutput(new File(directory, name));
}
/** Creates an IndexInput for the file with the given name. */
@Override
public IndexInput openInput(String name, int bufferSize) throws IOException {
@ -168,59 +161,4 @@ public class SimpleFSDirectory extends FSDirectory {
return file.getFD().valid();
}
}
protected static class SimpleFSIndexOutput extends BufferedIndexOutput {
RandomAccessFile file = null;
// remember if the file is open, so that we don't try to close it
// more than once
private volatile boolean isOpen;
public SimpleFSIndexOutput(File path) throws IOException {
file = new RandomAccessFile(path, "rw");
isOpen = true;
}
/** output methods: */
@Override
public void flushBuffer(byte[] b, int offset, int size) throws IOException {
file.write(b, offset, size);
}
@Override
public void close() throws IOException {
// only close the file if it has not been closed yet
if (isOpen) {
boolean success = false;
try {
super.close();
success = true;
} finally {
isOpen = false;
if (!success) {
try {
file.close();
} catch (Throwable t) {
// Suppress so we don't mask original exception
}
} else
file.close();
}
}
}
/** Random-access methods */
@Override
public void seek(long pos) throws IOException {
super.seek(pos);
file.seek(pos);
}
@Override
public long length() throws IOException {
return file.length();
}
@Override
public void setLength(long length) throws IOException {
file.setLength(length);
}
}
}

View File

@ -19,6 +19,8 @@ package org.apache.lucene.store;
import java.io.IOException;
import java.io.FileNotFoundException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Random;
import java.util.Map;
@ -76,13 +78,19 @@ public class MockRAMDirectory extends RAMDirectory {
preventDoubleWrite = value;
}
@Deprecated
@Override
public synchronized void sync(String name) throws IOException {
maybeThrowDeterministicException();
public void sync(String name) throws IOException {
sync(Collections.singleton(name));
}
@Override
public synchronized void sync(Collection<String> names) throws IOException {
for (String name : names)
maybeThrowDeterministicException();
if (crashed)
throw new IOException("cannot sync after crash");
if (unSyncedFiles.contains(name))
unSyncedFiles.remove(name);
unSyncedFiles.removeAll(names);
}
/** Simulates a crash of OS or machine by overwriting