mirror of https://github.com/apache/lucene.git
cutover to 'Directory owns retrying deletes on buggy filesystems', cleaning up a lot of crazy retry logic
This commit is contained in:
parent 9ba62e5e38
commit f0f42780a9
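
In broad strokes: instead of every replication component carrying its own delete-retry loop, the Directory layer now remembers deletes that fail (e.g. because a virus checker on Windows still holds the file open) and retries them itself; callers just ask whether deletes are still pending. A minimal sketch of the idea, assuming a hypothetical FilterDirectory wrapper (RetryingDeleteDirectory is invented for illustration, not the actual FSDirectory machinery):

    import java.io.IOException;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.FilterDirectory;

    // Hypothetical sketch: a Directory that owns retrying deletes, so callers
    // like ReplicaFileDeleter can treat deleteFile as fire-and-forget.
    class RetryingDeleteDirectory extends FilterDirectory {
      private final Set<String> pendingDeletes = Collections.synchronizedSet(new HashSet<>());

      RetryingDeleteDirectory(Directory in) {
        super(in);
      }

      @Override
      public void deleteFile(String name) throws IOException {
        try {
          in.deleteFile(name);
        } catch (IOException ioe) {
          // Remember the file and retry later, instead of forcing every
          // caller to carry its own retry logic:
          pendingDeletes.add(name);
        }
      }

      /** Retries all pending deletes once; returns true if any are still pending. */
      public boolean checkPendingDeletions() {
        synchronized (pendingDeletes) {
          for (String name : new HashSet<>(pendingDeletes)) {
            try {
              in.deleteFile(name);
              pendingDeletes.remove(name);
            } catch (IOException ioe) {
              // Still blocked (e.g. still held open); keep it pending.
            }
          }
          return pendingDeletes.isEmpty() == false;
        }
      }
    }
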
@@ -186,7 +186,7 @@ public abstract class CopyJob implements Comparable<CopyJob> {
   /** Use current thread (blocking) to do all copying and then return once done, or throw exception on failure */
   public abstract void runBlocking() throws Exception;

-  public void cancel(String reason, Throwable exc) {
+  public void cancel(String reason, Throwable exc) throws IOException {
     if (this.exc != null) {
       // Already cancelled
       return;

@@ -49,8 +49,7 @@ import org.apache.lucene.util.StringHelper;
 abstract class Node implements Closeable {

   static boolean VERBOSE_FILES = true;
-  // nocommit
-  static boolean VERBOSE_CONNECTIONS = true;
+  static boolean VERBOSE_CONNECTIONS = false;

   // Keys we store into IndexWriter's commit user data:

@@ -38,7 +38,6 @@ import org.apache.lucene.store.IOContext;

 class ReplicaFileDeleter {
   private final Map<String,Integer> refCounts = new HashMap<String,Integer>();
-  private final Set<String> pending = new HashSet<String>();
   private final Directory dir;
   private final Node node;

@@ -63,10 +62,6 @@ class ReplicaFileDeleter {
   public synchronized void incRef(Collection<String> fileNames) throws IOException {
     for(String fileName : fileNames) {

-      if (pending.contains(fileName)) {
-        throw new IllegalStateException("cannot incRef file \"" + fileName + "\": it is pending delete");
-      }
-
       assert slowFileExists(dir, fileName): "file " + fileName + " does not exist!";

       Integer curCount = refCounts.get(fileName);

@@ -78,24 +73,23 @@ class ReplicaFileDeleter {
     }
   }

-  public synchronized void decRef(Collection<String> fileNames) {
+  // We don't delete the files immediately when their RC drops to 0; instead, we collect them into the toDelete set and make a
+  // single delete call at the end:
+  public synchronized void decRef(Collection<String> fileNames) throws IOException {
+    Set<String> toDelete = new HashSet<>();
     for(String fileName : fileNames) {
       Integer curCount = refCounts.get(fileName);
       assert curCount != null: "fileName=" + fileName;
       assert curCount.intValue() > 0;
       if (curCount.intValue() == 1) {
         refCounts.remove(fileName);
-        pending.add(fileName);
+        toDelete.add(fileName);
       } else {
         refCounts.put(fileName, curCount.intValue() - 1);
       }
     }

-    deletePending();
+    delete(toDelete);

-    // TODO: this local IR could incRef files here, like we do now with IW ... then we can assert this again:
+    // TODO: this local IR could incRef files here, like we do now with IW's NRT readers ... then we can assert this again:

     // we can't assert this, e.g. a search can be running when we switch to a new NRT point, holding a previous IndexReader still open for
     // a bit:

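
The decRef contract above: all reference counts drop first, files that reach zero are collected, and a single delete call runs at the end, so a batch that releases several files deletes them in one sweep with the commit-point ordering applied once. A standalone illustration of that count-then-sweep pattern (plain Java; RefCounts is invented for the sketch, not the Lucene class):

    import java.util.Collection;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // Count-then-sweep: decrement all counts first, collect the names that hit
    // zero, and let the caller delete them in one pass at the end.
    class RefCounts {
      private final Map<String,Integer> refCounts = new HashMap<>();

      synchronized void incRef(Collection<String> names) {
        for (String name : names) {
          refCounts.merge(name, 1, Integer::sum);
        }
      }

      synchronized Set<String> decRef(Collection<String> names) {
        Set<String> toDelete = new HashSet<>();
        for (String name : names) {
          Integer cur = refCounts.get(name);
          assert cur != null && cur > 0 : "name=" + name;
          if (cur == 1) {
            refCounts.remove(name);
            toDelete.add(name);   // reached zero: eligible for deletion
          } else {
            refCounts.put(name, cur - 1);
          }
        }
        return toDelete;          // caller deletes these in one sweep
      }
    }
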
@@ -109,111 +103,69 @@ class ReplicaFileDeleter {
     */
   }

-  private synchronized boolean delete(String fileName) {
-    try {
-      if (Node.VERBOSE_FILES) {
-        node.message("file " + fileName + ": now delete");
-      }
-      dir.deleteFile(fileName);
-      pending.remove(fileName);
-      return true;
-    } catch (FileNotFoundException|NoSuchFileException missing) {
-      // This should never happen: we should only be asked to track files that do exist
-      node.message("file " + fileName + ": delete failed: " + missing);
-      throw new IllegalStateException("file " + fileName + ": we attempted delete but the file does not exist?", missing);
-    } catch (IOException ioe) {
-      // nocommit remove this retry logic! it's Directory's job now...
-      if (Node.VERBOSE_FILES) {
-        node.message("file " + fileName + ": delete failed: " + ioe + "; will retry later");
-      }
-      pending.add(fileName);
-      return false;
-    }
-  }
-
-  public synchronized Integer getRefCount(String fileName) {
-    return refCounts.get(fileName);
-  }
-
-  public synchronized boolean isPending(String fileName) {
-    return pending.contains(fileName);
-  }
-
-  public synchronized void deletePending() {
+  private synchronized void delete(Collection<String> toDelete) throws IOException {
     if (Node.VERBOSE_FILES) {
-      node.message("now deletePending: " + pending.size() + " files to try: " + pending);
+      node.message("now delete " + toDelete.size() + " files: " + toDelete);
     }

-    // Clone the set because it will change as we iterate:
-    List<String> toDelete = new ArrayList<>(pending);
-
     // First pass: delete any segments_N files.  We do these first to be certain stale commit points are removed
-    // before we remove any files they reference.  If any delete of segments_N fails, we leave all other files
-    // undeleted so index is never in a corrupt state:
+    // before we remove any files they reference, in case we crash right now:
     for (String fileName : toDelete) {
-      Integer rc = refCounts.get(fileName);
-      if (rc != null && rc > 0) {
-        // Should never happen! This means we are about to pending-delete a referenced index file
-        throw new IllegalStateException("file \"" + fileName + "\" is in pending delete set but has non-zero refCount=" + rc);
-      } else if (fileName.startsWith(IndexFileNames.SEGMENTS)) {
-        if (delete(fileName) == false) {
-          if (Node.VERBOSE_FILES) {
-            node.message("failed to remove commit point \"" + fileName + "\"; skipping deletion of all other pending files");
-          }
-          return;
-        }
+      assert refCounts.containsKey(fileName) == false;
+      if (fileName.startsWith(IndexFileNames.SEGMENTS)) {
+        delete(fileName);
       }
     }

     // Only delete other files if we were able to remove the segments_N files; this way we never
     // leave a corrupt commit in the index even in the presence of virus checkers:
     for(String fileName : toDelete) {
+      assert refCounts.containsKey(fileName) == false;
       if (fileName.startsWith(IndexFileNames.SEGMENTS) == false) {
         delete(fileName);
       }
     }
-
-    Set<String> copy = new HashSet<String>(pending);
-    pending.clear();
-    for(String fileName : copy) {
-      delete(fileName);
-    }
   }

-  /** Necessary in case we had tried to delete this fileName before, it failed, but then it was later overwritten (because primary changed
-   * and new primary didn't know this segment name had been previously attempted) and now has > 0 refCount */
-  public synchronized void clearPending(Collection<String> fileNames) {
-    for(String fileName : fileNames) {
-      if (pending.remove(fileName)) {
-        node.message("file " + fileName + ": deleter.clearPending now clear from pending");
-      }
-    }
+  private synchronized void delete(String fileName) throws IOException {
+    if (Node.VERBOSE_FILES) {
+      node.message("file " + fileName + ": now delete");
+    }
+    dir.deleteFile(fileName);
   }

-  public synchronized void deleteIfNoRef(String fileName) {
+  public synchronized Integer getRefCount(String fileName) {
+    return refCounts.get(fileName);
+  }
+
+  public synchronized void deleteIfNoRef(String fileName) throws IOException {
     if (refCounts.containsKey(fileName) == false) {
       deleteNewFile(fileName);
     }
   }

-  public synchronized void deleteNewFile(String fileName) {
+  public synchronized void deleteNewFile(String fileName) throws IOException {
     delete(fileName);
   }

-  /*
-  public synchronized Set<String> getPending() {
-    return new HashSet<String>(pending);
-  }
-  */
-
   public synchronized void deleteUnknownFiles(String segmentsFileName) throws IOException {
+    Set<String> toDelete = new HashSet<>();
     for(String fileName : dir.listAll()) {
       if (refCounts.containsKey(fileName) == false &&
           fileName.equals("write.lock") == false &&
           fileName.equals(segmentsFileName) == false) {
         node.message("will delete unknown file \"" + fileName + "\"");
-        pending.add(fileName);
+        toDelete.add(fileName);
       }
     }

-    deletePending();
+    delete(toDelete);
   }
 }

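
The ordering inside the new delete(Collection) is the load-bearing part: segments_N files go first so that, if the process crashes between the two passes, no surviving commit point can reference a file that the second pass already removed. A standalone illustration with hypothetical file names ("segments" is the prefix IndexFileNames.SEGMENTS matches):

    import java.util.Arrays;
    import java.util.List;

    public class TwoPassDelete {
      public static void main(String[] args) {
        // Hypothetical pending files for one stale commit:
        List<String> toDelete = Arrays.asList("segments_3", "_0.cfs", "_0.cfe", "_0.si");

        // Pass 1: commit points first; a crash after this pass leaves only
        // unreferenced files behind, never a segments_N pointing at nothing:
        for (String name : toDelete) {
          if (name.startsWith("segments")) {
            System.out.println("delete " + name);
          }
        }
        // Pass 2: everything else:
        for (String name : toDelete) {
          if (name.startsWith("segments") == false) {
            System.out.println("delete " + name);
          }
        }
      }
    }
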
@@ -195,10 +195,13 @@ abstract class ReplicaNode extends Node {
       // If this throws exc (e.g. due to virus checker), we cannot start this replica:
       assert deleter.getRefCount(segmentsFileName) == 1;
       deleter.decRef(Collections.singleton(segmentsFileName));
-      if (deleter.isPending(segmentsFileName)) {
-        // If e.g. virus checker blocks us from deleting, we absolutely cannot start this node else we can cause corruption:
+
+      if (dir instanceof FSDirectory && ((FSDirectory) dir).checkPendingDeletions()) {
+        // If e.g. virus checker blocks us from deleting, we absolutely cannot start this node else there is a definite window during
+        // which if we crash, we cause corruption:
         throw new RuntimeException("replica cannot start: existing segments file=" + segmentsFileName + " must be removed in order to start, but the file delete failed");
       }

       // So we don't later try to decRef it (illegally) again:
       boolean didRemove = lastCommitFiles.remove(segmentsFileName);
       assert didRemove;

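
The startup gate condensed: only FSDirectory knows about pending (retried) deletes, so the check is guarded by an instanceof test and non-FS directories skip it. A hedged sketch of the same gate as a free-standing helper (StartupChecks and failIfDeletesPending are invented names):

    import java.io.IOException;

    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.FSDirectory;

    class StartupChecks {
      // Refuse to start a replica while a stale segments file may still be on
      // disk: a crash in that window would expose a stale commit.
      static void failIfDeletesPending(Directory dir, String segmentsFileName) throws IOException {
        if (dir instanceof FSDirectory && ((FSDirectory) dir).checkPendingDeletions()) {
          throw new RuntimeException("replica cannot start: existing segments file=" + segmentsFileName
              + " must be removed in order to start, but the file delete failed");
        }
      }
    }
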
@@ -427,9 +430,6 @@ abstract class ReplicaNode extends Node {
     }

     lastFileMetaData = copyState.files;
-
-    // It's a good time to delete pending files, since we just refreshed and some previously open files are now closed:
-    deleter.deletePending();
   }

   int markerCount;

@@ -720,17 +720,6 @@ abstract class ReplicaNode extends Node {
    * (includes the segment id), length, footer (including checksum) differ, then this returns false, else true. */
   private boolean fileIsIdentical(String fileName, FileMetaData srcMetaData) throws IOException {

-    if (deleter.isPending(fileName)) {
-      // This was a file we had wanted to delete yet a virus checker prevented us, and now we need to overwrite it.
-      // Such files are in an unknown state, and even if their header and footer and length all
-      // match, since they may not have been fsync'd by the previous node instance on this directory,
-      // they could in theory have corruption internally.  So we always force ourselves to copy them here:
-      if (Node.VERBOSE_FILES) {
-        message("file " + fileName + ": will copy [we had wanted to delete this file on init, but failed]");
-      }
-      return false;
-    }
-
     FileMetaData destMetaData = readLocalFileMetaData(fileName);
     if (destMetaData == null) {
       // Something went wrong in reading the file (it's corrupt, truncated, does not exist, etc.):

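
With the pending-delete branch gone, file identity rests purely on the metadata comparison described in the javadoc: header (which includes the segment id), length, and footer (which includes the checksum). A self-contained sketch of that comparison; FileMetaDataView stands in for the real FileMetaData, whose exact field names are an assumption here:

    import java.util.Arrays;

    final class FileIdentity {
      static final class FileMetaDataView {
        final byte[] header;   // index header, includes the segment id
        final byte[] footer;   // codec footer, includes the checksum
        final long length;
        FileMetaDataView(byte[] header, byte[] footer, long length) {
          this.header = header;
          this.footer = footer;
          this.length = length;
        }
      }

      /** Returns true only when header, length and footer all match; a missing
       *  local file (null dest) means the file must be copied. */
      static boolean fileIsIdentical(FileMetaDataView src, FileMetaDataView dest) {
        if (dest == null) {
          return false; // corrupt, truncated or missing locally: must copy
        }
        return dest.length == src.length
            && Arrays.equals(dest.header, src.header)
            && Arrays.equals(dest.footer, src.footer);
      }
    }
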
@@ -18,6 +18,7 @@ package org.apache.lucene.replicator.nrt;
  */

+import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.PriorityQueue;

@@ -131,7 +132,7 @@ class Jobs extends Thread implements Closeable {
   }

   /** Cancels any existing jobs that are copying the same file names as this one */
-  public synchronized void cancelConflictingJobs(CopyJob newJob) {
+  public synchronized void cancelConflictingJobs(CopyJob newJob) throws IOException {
     for (CopyJob job : queue) {
       if (job.conflicts(newJob)) {
         node.message("top: now cancel existing conflicting job=" + job + " due to newJob=" + newJob);

@@ -173,13 +173,6 @@ class SimpleCopyJob extends CopyJob {
       String tmpFileName = ent.getValue();
       String fileName = ent.getKey();

-      // Tricky: if primary crashes while warming (pre-copying) a merged segment _X, the new primary can easily flush or merge to _X (since we don't
-      // have a distributed inflateGens for the new primary) and _X file names will be reused.  In this case, our local deleter will be
-      // thinking it must remove _X's files (from the warmed merge that never went live), but this is dangerous when virus checker is active
-      // since deleter may finally succeed in deleting the file after we have copied the new _X flushed files.  So at this point we ask the
-      // deleter to NOT delete the file anymore:
-      dest.deleter.clearPending(Collections.singleton(fileName));
-
       if (Node.VERBOSE_FILES) {
         dest.message("rename file " + tmpFileName + " to " + fileName);
       }

@@ -241,7 +234,7 @@ class SimpleCopyJob extends CopyJob {
     }
   }

-  public synchronized void cancel(String reason, Throwable exc) {
+  public synchronized void cancel(String reason, Throwable exc) throws IOException {
     try {
       super.cancel(reason, exc);
     } finally {

@@ -135,9 +135,7 @@ class SimpleReplicaNode extends ReplicaNode {
     MockDirectoryWrapper dir = LuceneTestCase.newMockFSDirectory(path);

     dir.setAssertNoUnrefencedFilesOnClose(true);
-    // nocommit
-    //dir.setCheckIndexOnClose(doCheckIndexOnClose);
-    dir.setCheckIndexOnClose(true);
+    dir.setCheckIndexOnClose(doCheckIndexOnClose);

     // Corrupt any index files not referenced by current commit point; this is important (increases test evilness) because we may have done
     // a hard crash of the previous JVM writing to this directory and so MDW's corrupt-unknown-files-on-close never ran:

@@ -1 +1,3 @@
-python -u /l/util/src/python/repeatLuceneTest.py -tmpDir /b/tmp -logDir /l/logs TestStressNRTReplication -jvms 1 -mult 4 -nightly
+python -u /l/util/src/python/repeatLuceneTest.py -tmpDir /b/tmp -logDir /l/logs TestStressNRTReplication -jvms 3
+
+# -mult 4 -nightly