fix some more nocommits; reduce retry-pending-deletes frequency so it's not O(N^2)

This commit is contained in:
Mike McCandless 2016-02-03 16:56:10 -05:00
parent f0b9186f55
commit 8cd731be50
4 changed files with 80 additions and 35 deletions

View File

@ -4617,8 +4617,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* commits are no longer needed. Otherwise, those commits will * commits are no longer needed. Otherwise, those commits will
* be deleted the next time commit() is called. * be deleted the next time commit() is called.
*/ */
// nocommit remove this
public synchronized void deleteUnusedFiles() throws IOException { public synchronized void deleteUnusedFiles() throws IOException {
// TODO: should we remove this method now that it's the Directory's job to retry deletions? Except, for the super expert IDP use case
// it's still needed?
ensureOpen(false); ensureOpen(false);
deleter.revisitPolicy(); deleter.revisitPolicy();
} }

View File

@ -38,6 +38,7 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.IndexFileNames;
@ -129,7 +130,9 @@ public abstract class FSDirectory extends BaseDirectory {
/** Maps files that we are trying to delete (or we tried already but failed) /** Maps files that we are trying to delete (or we tried already but failed)
* before attempting to delete that key. */ * before attempting to delete that key. */
protected final Set<String> pendingDeletes = Collections.newSetFromMap(new ConcurrentHashMap<String,Boolean>()); private final Set<String> pendingDeletes = Collections.newSetFromMap(new ConcurrentHashMap<String,Boolean>());
private final AtomicInteger opsSinceLastDelete = new AtomicInteger();
/** Used to generate temp file names in {@link #createTempOutput}. */ /** Used to generate temp file names in {@link #createTempOutput}. */
private final AtomicLong nextTempFileCounter = new AtomicLong(); private final AtomicLong nextTempFileCounter = new AtomicLong();
@ -241,17 +244,23 @@ public abstract class FSDirectory extends BaseDirectory {
@Override @Override
public IndexOutput createOutput(String name, IOContext context) throws IOException { public IndexOutput createOutput(String name, IOContext context) throws IOException {
ensureOpen(); ensureOpen();
// nocommit do we need to check pending deletes?
deletePendingFiles(); // If this file was pending delete, we are now bringing it back to life:
pendingDeletes.remove(name);
maybeDeletePendingFiles();
return new FSIndexOutput(name); return new FSIndexOutput(name);
} }
@Override @Override
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException { public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException {
ensureOpen(); ensureOpen();
maybeDeletePendingFiles();
while (true) { while (true) {
try { try {
String name = IndexFileNames.segmentFileName(prefix, suffix + "_" + Long.toString(nextTempFileCounter.getAndIncrement(), Character.MAX_RADIX), "tmp"); String name = IndexFileNames.segmentFileName(prefix, suffix + "_" + Long.toString(nextTempFileCounter.getAndIncrement(), Character.MAX_RADIX), "tmp");
if (pendingDeletes.contains(name)) {
continue;
}
return new FSIndexOutput(name, return new FSIndexOutput(name,
StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW); StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
} catch (FileAlreadyExistsException faee) { } catch (FileAlreadyExistsException faee) {
@ -261,7 +270,7 @@ public abstract class FSDirectory extends BaseDirectory {
} }
protected void ensureCanRead(String name) throws IOException { protected void ensureCanRead(String name) throws IOException {
deletePendingFiles(); maybeDeletePendingFiles();
if (pendingDeletes.contains(name)) { if (pendingDeletes.contains(name)) {
throw new NoSuchFileException("file \"" + name + "\" is pending delete and cannot be opened for read"); throw new NoSuchFileException("file \"" + name + "\" is pending delete and cannot be opened for read");
} }
@ -270,6 +279,7 @@ public abstract class FSDirectory extends BaseDirectory {
@Override @Override
public void sync(Collection<String> names) throws IOException { public void sync(Collection<String> names) throws IOException {
ensureOpen(); ensureOpen();
maybeDeletePendingFiles();
for (String name : names) { for (String name : names) {
fsync(name); fsync(name);
@ -279,6 +289,7 @@ public abstract class FSDirectory extends BaseDirectory {
@Override @Override
public void renameFile(String source, String dest) throws IOException { public void renameFile(String source, String dest) throws IOException {
ensureOpen(); ensureOpen();
maybeDeletePendingFiles();
Files.move(directory.resolve(source), directory.resolve(dest), StandardCopyOption.ATOMIC_MOVE); Files.move(directory.resolve(source), directory.resolve(dest), StandardCopyOption.ATOMIC_MOVE);
// TODO: should we move directory fsync to a separate 'syncMetadata' method? // TODO: should we move directory fsync to a separate 'syncMetadata' method?
// for example, to improve listCommits(), IndexFileDeleter could also call that after deleting segments_Ns // for example, to improve listCommits(), IndexFileDeleter could also call that after deleting segments_Ns
@ -288,7 +299,7 @@ public abstract class FSDirectory extends BaseDirectory {
@Override @Override
public synchronized void close() throws IOException { public synchronized void close() throws IOException {
isOpen = false; isOpen = false;
deletePendingFiles(); maybeDeletePendingFiles();
} }
/** @return the underlying filesystem directory */ /** @return the underlying filesystem directory */
@ -303,12 +314,48 @@ public abstract class FSDirectory extends BaseDirectory {
} }
protected void fsync(String name) throws IOException { protected void fsync(String name) throws IOException {
deletePendingFiles();
IOUtils.fsync(directory.resolve(name), false); IOUtils.fsync(directory.resolve(name), false);
} }
@Override @Override
public void deleteFile(String name) throws IOException { public void deleteFile(String name) throws IOException {
if (pendingDeletes.contains(name)) {
throw new NoSuchFileException("file \"" + name + "\" is already pending delete");
}
privateDeleteFile(name);
}
/** Tries to delete any pending deleted files, and returns true if
* there are still files that could not be deleted. */
public boolean checkPendingDeletions() throws IOException {
deletePendingFiles();
return pendingDeletes.isEmpty() == false;
}
/** Try to delete any pending files that we had previously tried to delete but failed
* because we are on Windows and the files were still held open. */
public void deletePendingFiles() throws IOException {
// TODO: we could fix IndexInputs from FSDirectory subclasses to call this when they are closed?
// Clone the set since we mutate it in privateDeleteFile:
for(String name : new HashSet<>(pendingDeletes)) {
privateDeleteFile(name);
}
}
private void maybeDeletePendingFiles() throws IOException {
if (pendingDeletes.isEmpty() == false) {
// This is a silly heuristic to try to avoid O(N^2), where N = number of files pending deletion, behavior:
int count = opsSinceLastDelete.incrementAndGet();
if (count >= pendingDeletes.size()) {
opsSinceLastDelete.addAndGet(-count);
deletePendingFiles();
}
}
}
private void privateDeleteFile(String name) throws IOException {
try { try {
Files.delete(directory.resolve(name)); Files.delete(directory.resolve(name));
pendingDeletes.remove(name); pendingDeletes.remove(name);
@ -331,28 +378,6 @@ public abstract class FSDirectory extends BaseDirectory {
} }
} }
/** Tries to delete any pending deleted files, and returns true if
* there are still files that could not be deleted. */
public boolean checkPendingDeletions() throws IOException {
deletePendingFiles();
return pendingDeletes.isEmpty() == false;
}
/** Try to delete any pending files that we had previously tried to delete but failed
* because we are on Windows and the files were still held open. */
public void deletePendingFiles() throws IOException {
// nocommit do we need exponential backoff here for windows?
// TODO: we could fix IndexInputs from FSDirectory subclasses to call this when they are closed?
Set<String> toDelete = new HashSet<>(pendingDeletes);
// nocommit heroic exceptions here or not?
for(String name : toDelete) {
deleteFile(name);
}
}
final class FSIndexOutput extends OutputStreamIndexOutput { final class FSIndexOutput extends OutputStreamIndexOutput {
/** /**
* The maximum chunk size is 8192 bytes, because file channel mallocs * The maximum chunk size is 8192 bytes, because file channel mallocs

View File

@ -75,15 +75,18 @@ import org.apache.lucene.store.BaseDirectoryWrapper;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.store.NIOFSDirectory; import org.apache.lucene.store.NIOFSDirectory;
import org.apache.lucene.store.NoLockFactory; import org.apache.lucene.store.NoLockFactory;
import org.apache.lucene.store.RAMDirectory; import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.store.SimpleFSDirectory;
import org.apache.lucene.store.SimpleFSLockFactory; import org.apache.lucene.store.SimpleFSLockFactory;
import org.apache.lucene.util.Bits; import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.InfoStream; import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase;
@ -2703,26 +2706,43 @@ public class TestIndexWriter extends LuceneTestCase {
IOUtils.close(r, r2, w, dir); IOUtils.close(r, r2, w, dir);
} }
// nocommit turn test on once we have VirusCheckingFS
/*
public void testWithPendingDeletions() throws Exception { public void testWithPendingDeletions() throws Exception {
try (FSDirectory dir = FSDirectory.open(createTempDir())) { // irony: currently we don't emulate windows well enough to work on windows!
assumeFalse("windows is not supported", Constants.WINDOWS);
Path path = createTempDir();
// Use WindowsFS to prevent open files from being deleted:
FileSystem fs = new WindowsFS(path.getFileSystem()).getFileSystem(URI.create("file:///"));
Path root = new FilterPath(path, fs);
// MMapDirectory doesn't work because it closes its file handles after mapping!
try (FSDirectory dir = new SimpleFSDirectory(root)) {
IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random())); IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()));
IndexWriter w = new IndexWriter(dir, iwc); IndexWriter w = new IndexWriter(dir, iwc);
w.commit(); w.commit();
IndexInput in = dir.openInput("segments_0", IOContext.DEFAULT); IndexInput in = dir.openInput("segments_1", IOContext.DEFAULT);
w.addDocument(new Document()); w.addDocument(new Document());
w.close(); w.close();
assertTrue(dir.checkPendingDeletions()); assertTrue(dir.checkPendingDeletions());
// make sure we get NFSF if we try to delete and already-pending-delete file:
try {
dir.deleteFile("segments_1");
fail("didn't hit exception");
} catch (NoSuchFileException nfse) {
// expected
}
iwc = new IndexWriterConfig(new MockAnalyzer(random())); iwc = new IndexWriterConfig(new MockAnalyzer(random()));
try { try {
w = new IndexWriter(dir, iwc); w = new IndexWriter(dir, iwc);
} catch (IllegalArgumentException iae) { } catch (IllegalArgumentException iae) {
assertEquals("Directory still has pending deleted files", iae.getMessage()); assertEquals("Directory still has pending deleted files", iae.getMessage());
} }
in.close();
} }
} }
*/
public void testLeftoverTempFiles() throws Exception { public void testLeftoverTempFiles() throws Exception {
Directory dir = newDirectory(); Directory dir = newDirectory();

View File

@ -742,7 +742,6 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
System.out.println("\nNOTE: MockDirectoryWrapper: now run CheckIndex"); System.out.println("\nNOTE: MockDirectoryWrapper: now run CheckIndex");
} }
// nocommit: we should also confirm all prior segments_N are not corrupt?
TestUtil.checkIndex(this, getCrossCheckTermVectorsOnClose(), true); TestUtil.checkIndex(this, getCrossCheckTermVectorsOnClose(), true);
// TODO: factor this out / share w/ TestIW.assertNoUnreferencedFiles // TODO: factor this out / share w/ TestIW.assertNoUnreferencedFiles