mirror of https://github.com/apache/lucene.git
LUCENE-5925: use rename instead of segments_N fallback/segments.gen
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1624194 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e631c5ed41
commit
bb0256f260
|
@ -114,6 +114,10 @@ New Features
|
|||
when things go wrong, you get a real exception message why.
|
||||
(Uwe Schindler, Robert Muir)
|
||||
|
||||
* LUCENE-5925: Remove fallback logic from opening commits, instead use
|
||||
Directory.renameFile so that in-progress commits are never visible.
|
||||
(Robert Muir)
|
||||
|
||||
API Changes:
|
||||
|
||||
* LUCENE-5900: Deprecated more constructors taking Version in *InfixSuggester and
|
||||
|
|
|
@ -208,8 +208,8 @@ for the Segment info file, the Lock file, and Deleted documents file) are collap
|
|||
into a single .cfs file (see below for details)</p>
|
||||
<p>Typically, all segments in an index are stored in a single directory,
|
||||
although this is not required.</p>
|
||||
<p>As of version 2.1 (lock-less commits), file names are never re-used (there
|
||||
is one exception, "segments.gen", see below). That is, when any file is saved
|
||||
<p>As of version 2.1 (lock-less commits), file names are never re-used.
|
||||
That is, when any file is saved
|
||||
to the Directory it is given a never before used filename. This is achieved
|
||||
using a simple generations approach. For example, the first segments file is
|
||||
segments_1, then segments_2, etc. The generation is a sequential long integer
|
||||
|
@ -228,7 +228,7 @@ Lucene:</p>
|
|||
</tr>
|
||||
<tr>
|
||||
<td>{@link org.apache.lucene.index.SegmentInfos Segments File}</td>
|
||||
<td>segments.gen, segments_N</td>
|
||||
<td>segments_N</td>
|
||||
<td>Stores information about a commit point</td>
|
||||
</tr>
|
||||
<tr>
|
||||
|
|
|
@ -237,7 +237,7 @@ public abstract class DirectoryReader extends BaseCompositeReader<AtomicReader>
|
|||
final String fileName = files[i];
|
||||
|
||||
if (fileName.startsWith(IndexFileNames.SEGMENTS) &&
|
||||
!fileName.equals(IndexFileNames.SEGMENTS_GEN) &&
|
||||
!fileName.equals(IndexFileNames.OLD_SEGMENTS_GEN) &&
|
||||
SegmentInfos.generationFromSegmentsFileName(fileName) < currentGen) {
|
||||
|
||||
SegmentInfos sis = new SegmentInfos();
|
||||
|
@ -301,7 +301,7 @@ public abstract class DirectoryReader extends BaseCompositeReader<AtomicReader>
|
|||
if (files != null) {
|
||||
String prefix = IndexFileNames.SEGMENTS + "_";
|
||||
for(String file : files) {
|
||||
if (file.startsWith(prefix) || file.equals(IndexFileNames.SEGMENTS_GEN)) {
|
||||
if (file.startsWith(prefix)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -156,13 +156,12 @@ final class IndexFileDeleter implements Closeable {
|
|||
Matcher m = IndexFileNames.CODEC_FILE_PATTERN.matcher("");
|
||||
for (String fileName : files) {
|
||||
m.reset(fileName);
|
||||
if (!fileName.endsWith("write.lock") && !fileName.equals(IndexFileNames.SEGMENTS_GEN)
|
||||
&& (m.matches() || fileName.startsWith(IndexFileNames.SEGMENTS))) {
|
||||
if (!fileName.endsWith("write.lock") && (m.matches() || fileName.startsWith(IndexFileNames.SEGMENTS) || fileName.startsWith(IndexFileNames.PENDING_SEGMENTS))) {
|
||||
|
||||
// Add this file to refCounts with initial count 0:
|
||||
getRefCount(fileName);
|
||||
|
||||
if (fileName.startsWith(IndexFileNames.SEGMENTS)) {
|
||||
if (fileName.startsWith(IndexFileNames.SEGMENTS) && !fileName.equals(IndexFileNames.OLD_SEGMENTS_GEN)) {
|
||||
|
||||
// This is a commit (segments or segments_N), and
|
||||
// it's valid (<= the max gen). Load it, then
|
||||
|
@ -237,7 +236,7 @@ final class IndexFileDeleter implements Closeable {
|
|||
// We keep commits list in sorted order (oldest to newest):
|
||||
CollectionUtil.timSort(commits);
|
||||
|
||||
// refCounts only includes "normal" filenames (does not include segments.gen, write.lock)
|
||||
// refCounts only includes "normal" filenames (does not include write.lock)
|
||||
inflateGens(segmentInfos, refCounts.keySet(), infoStream);
|
||||
|
||||
// Now delete anything with ref count at 0. These are
|
||||
|
@ -282,7 +281,7 @@ final class IndexFileDeleter implements Closeable {
|
|||
Map<String,Long> maxPerSegmentGen = new HashMap<>();
|
||||
|
||||
for(String fileName : files) {
|
||||
if (fileName.equals(IndexFileNames.SEGMENTS_GEN) || fileName.equals(IndexWriter.WRITE_LOCK_NAME)) {
|
||||
if (fileName.equals(IndexFileNames.OLD_SEGMENTS_GEN) || fileName.equals(IndexWriter.WRITE_LOCK_NAME)) {
|
||||
// do nothing
|
||||
} else if (fileName.startsWith(IndexFileNames.SEGMENTS)) {
|
||||
try {
|
||||
|
@ -290,6 +289,12 @@ final class IndexFileDeleter implements Closeable {
|
|||
} catch (NumberFormatException ignore) {
|
||||
// trash file: we have to handle this since we allow anything starting with 'segments' here
|
||||
}
|
||||
} else if (fileName.startsWith(IndexFileNames.PENDING_SEGMENTS)) {
|
||||
try {
|
||||
maxSegmentGen = Math.max(SegmentInfos.generationFromSegmentsFileName(fileName.substring(8)), maxSegmentGen);
|
||||
} catch (NumberFormatException ignore) {
|
||||
// trash file: we have to handle this since we allow anything starting with 'pending_segments' here
|
||||
}
|
||||
} else {
|
||||
String segmentName = IndexFileNames.parseSegmentName(fileName);
|
||||
assert segmentName.startsWith("_"): "wtf? file=" + fileName;
|
||||
|
@ -417,7 +422,7 @@ final class IndexFileDeleter implements Closeable {
|
|||
* is non-null, we will only delete files corresponding to
|
||||
* that segment.
|
||||
*/
|
||||
public void refresh(String segmentName) throws IOException {
|
||||
void refresh(String segmentName) throws IOException {
|
||||
assert locked();
|
||||
|
||||
String[] files = directory.listAll();
|
||||
|
@ -439,8 +444,11 @@ final class IndexFileDeleter implements Closeable {
|
|||
if ((segmentName == null || fileName.startsWith(segmentPrefix1) || fileName.startsWith(segmentPrefix2)) &&
|
||||
!fileName.endsWith("write.lock") &&
|
||||
!refCounts.containsKey(fileName) &&
|
||||
!fileName.equals(IndexFileNames.SEGMENTS_GEN) &&
|
||||
(m.matches() || fileName.startsWith(IndexFileNames.SEGMENTS))) {
|
||||
(m.matches() || fileName.startsWith(IndexFileNames.SEGMENTS)
|
||||
// we only try to clear out pending_segments_N during rollback(), because we don't ref-count it
|
||||
// TODO: this is sneaky, should we do this, or change TestIWExceptions? rollback closes anyway, and
|
||||
// any leftover file will be deleted/retried on next IW bootup anyway...
|
||||
|| (segmentName == null && fileName.startsWith(IndexFileNames.PENDING_SEGMENTS)))) {
|
||||
// Unreferenced file, so remove it
|
||||
if (infoStream.isEnabled("IFD")) {
|
||||
infoStream.message("IFD", "refresh [prefix=" + segmentName + "]: removing newly created unreferenced file \"" + fileName + "\"");
|
||||
|
@ -450,7 +458,7 @@ final class IndexFileDeleter implements Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
public void refresh() throws IOException {
|
||||
void refresh() throws IOException {
|
||||
// Set to null so that we regenerate the list of pending
|
||||
// files; else we can accumulate same file more than
|
||||
// once
|
||||
|
|
|
@ -47,11 +47,11 @@ public final class IndexFileNames {
|
|||
/** Name of the index segment file */
|
||||
public static final String SEGMENTS = "segments";
|
||||
|
||||
/** Extension of gen file */
|
||||
public static final String GEN_EXTENSION = "gen";
|
||||
/** Name of pending index segment file */
|
||||
public static final String PENDING_SEGMENTS = "pending_segments";
|
||||
|
||||
/** Name of the generation reference file name */
|
||||
public static final String SEGMENTS_GEN = "segments." + GEN_EXTENSION;
|
||||
public static final String OLD_SEGMENTS_GEN = "segments.gen";
|
||||
|
||||
/** Extension of compound file */
|
||||
public static final String COMPOUND_FILE_EXTENSION = "cfs";
|
||||
|
@ -59,19 +59,6 @@ public final class IndexFileNames {
|
|||
/** Extension of compound file entries */
|
||||
public static final String COMPOUND_FILE_ENTRIES_EXTENSION = "cfe";
|
||||
|
||||
/**
|
||||
* This array contains all filename extensions used by
|
||||
* Lucene's index files, with one exception, namely the
|
||||
* extension made up from <code>.s</code> + a number.
|
||||
* Also note that Lucene's <code>segments_N</code> files
|
||||
* do not have any filename extension.
|
||||
*/
|
||||
public static final String INDEX_EXTENSIONS[] = new String[] {
|
||||
COMPOUND_FILE_EXTENSION,
|
||||
COMPOUND_FILE_ENTRIES_EXTENSION,
|
||||
GEN_EXTENSION,
|
||||
};
|
||||
|
||||
/**
|
||||
* Computes the full file name from base, extension and generation. If the
|
||||
* generation is -1, the file name is null. If it's 0, the file name is
|
||||
|
|
|
@ -53,25 +53,13 @@ import org.apache.lucene.util.StringHelper;
|
|||
* <tt>segments_N</tt>. There may be one or more <tt>segments_N</tt> files in
|
||||
* the index; however, the one with the largest generation is the active one
|
||||
* (when older segments_N files are present it's because they temporarily cannot
|
||||
* be deleted, or, a writer is in the process of committing, or a custom
|
||||
* {@link org.apache.lucene.index.IndexDeletionPolicy IndexDeletionPolicy} is in
|
||||
* be deleted, or a custom {@link IndexDeletionPolicy} is in
|
||||
* use). This file lists each segment by name and has details about the codec
|
||||
* and generation of deletes.
|
||||
* </p>
|
||||
* <p>
|
||||
* There is also a file <tt>segments.gen</tt>. This file contains the current
|
||||
* generation (the <tt>_N</tt> in <tt>segments_N</tt>) of the index. This is
|
||||
* used only as a fallback in case the current generation cannot be accurately
|
||||
* determined by directory listing alone (as is the case for some NFS clients
|
||||
* with time-based directory cache expiration). This file simply contains an
|
||||
* {@link DataOutput#writeInt Int32} version header (
|
||||
* {@link #FORMAT_SEGMENTS_GEN_CURRENT}), followed by the generation recorded as
|
||||
* {@link DataOutput#writeLong Int64}, written twice.
|
||||
* </p>
|
||||
* <p>
|
||||
* Files:
|
||||
* <ul>
|
||||
* <li><tt>segments.gen</tt>: GenHeader, Generation, Generation, Footer
|
||||
* <li><tt>segments_N</tt>: Header, Version, NameCounter, SegCount, <SegName,
|
||||
* SegCodec, DelGen, DeletionCount, FieldInfosGen, DocValuesGen,
|
||||
* UpdatesFiles><sup>SegCount</sup>, CommitUserData, Footer
|
||||
|
@ -141,14 +129,6 @@ public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo
|
|||
/** The file format version for the segments_N codec header, since 4.11+ */
|
||||
public static final int VERSION_411 = 4;
|
||||
|
||||
// Used for the segments.gen file only!
|
||||
// Whenever you add a new format, make it 1 smaller (negative version logic)!
|
||||
private static final int FORMAT_SEGMENTS_GEN_47 = -2;
|
||||
private static final int FORMAT_SEGMENTS_GEN_CHECKSUM = -3;
|
||||
private static final int FORMAT_SEGMENTS_GEN_START = FORMAT_SEGMENTS_GEN_47;
|
||||
/** Current format of segments.gen */
|
||||
public static final int FORMAT_SEGMENTS_GEN_CURRENT = FORMAT_SEGMENTS_GEN_CHECKSUM;
|
||||
|
||||
/** Used to name new segments. */
|
||||
// TODO: should this be a long ...?
|
||||
public int counter;
|
||||
|
@ -201,7 +181,7 @@ public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo
|
|||
}
|
||||
long max = -1;
|
||||
for (String file : files) {
|
||||
if (file.startsWith(IndexFileNames.SEGMENTS) && !file.equals(IndexFileNames.SEGMENTS_GEN)) {
|
||||
if (file.startsWith(IndexFileNames.SEGMENTS) && !file.equals(IndexFileNames.OLD_SEGMENTS_GEN)) {
|
||||
long gen = generationFromSegmentsFileName(file);
|
||||
if (gen > max) {
|
||||
max = gen;
|
||||
|
@ -275,39 +255,9 @@ public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo
|
|||
}
|
||||
|
||||
/**
|
||||
* A utility for writing the {@link IndexFileNames#SEGMENTS_GEN} file to a
|
||||
* {@link Directory}.
|
||||
*
|
||||
* <p>
|
||||
* <b>NOTE:</b> this is an internal utility which is kept public so that it's
|
||||
* accessible by code from other packages. You should avoid calling this
|
||||
* method unless you're absolutely sure what you're doing!
|
||||
*
|
||||
* @lucene.internal
|
||||
* Get the next pending_segments_N filename that will be written.
|
||||
*/
|
||||
public static void writeSegmentsGen(Directory dir, long generation) {
|
||||
try {
|
||||
IndexOutput genOutput = dir.createOutput(IndexFileNames.SEGMENTS_GEN, IOContext.READONCE);
|
||||
try {
|
||||
genOutput.writeInt(FORMAT_SEGMENTS_GEN_CURRENT);
|
||||
genOutput.writeLong(generation);
|
||||
genOutput.writeLong(generation);
|
||||
CodecUtil.writeFooter(genOutput);
|
||||
} finally {
|
||||
genOutput.close();
|
||||
dir.sync(Collections.singleton(IndexFileNames.SEGMENTS_GEN));
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
// It's OK if we fail to write this file since it's
|
||||
// used only as one of the retry fallbacks.
|
||||
IOUtils.deleteFilesIgnoringExceptions(dir, IndexFileNames.SEGMENTS_GEN);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the next segments_N filename that will be written.
|
||||
*/
|
||||
public String getNextSegmentFileName() {
|
||||
public String getNextPendingSegmentFileName() {
|
||||
long nextGeneration;
|
||||
|
||||
if (generation == -1) {
|
||||
|
@ -315,7 +265,7 @@ public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo
|
|||
} else {
|
||||
nextGeneration = generation+1;
|
||||
}
|
||||
return IndexFileNames.fileNameFromGeneration(IndexFileNames.SEGMENTS,
|
||||
return IndexFileNames.fileNameFromGeneration(IndexFileNames.PENDING_SEGMENTS,
|
||||
"",
|
||||
nextGeneration);
|
||||
}
|
||||
|
@ -462,13 +412,13 @@ public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo
|
|||
}.run();
|
||||
}
|
||||
|
||||
// Only non-null after prepareCommit has been called and
|
||||
// Only true after prepareCommit has been called and
|
||||
// before finishCommit is called
|
||||
IndexOutput pendingSegnOutput;
|
||||
boolean pendingCommit;
|
||||
|
||||
private void write(Directory directory) throws IOException {
|
||||
|
||||
String segmentFileName = getNextSegmentFileName();
|
||||
String segmentFileName = getNextPendingSegmentFileName();
|
||||
|
||||
// Always advance the generation on write:
|
||||
if (generation == -1) {
|
||||
|
@ -509,10 +459,14 @@ public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo
|
|||
}
|
||||
segnOutput.writeStringStringMap(userData);
|
||||
segnOutput.writeString(StringHelper.randomId());
|
||||
pendingSegnOutput = segnOutput;
|
||||
CodecUtil.writeFooter(segnOutput);
|
||||
segnOutput.close();
|
||||
directory.sync(Collections.singleton(segmentFileName));
|
||||
success = true;
|
||||
} finally {
|
||||
if (!success) {
|
||||
if (success) {
|
||||
pendingCommit = true;
|
||||
} else {
|
||||
// We hit an exception above; try to close the file
|
||||
// but suppress any exception:
|
||||
IOUtils.closeWhileHandlingException(segnOutput);
|
||||
|
@ -570,34 +524,6 @@ public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo
|
|||
SegmentInfos.infoStream = infoStream;
|
||||
}
|
||||
|
||||
/* Advanced configuration of retry logic in loading
|
||||
segments_N file */
|
||||
private static int defaultGenLookaheadCount = 10;
|
||||
|
||||
/**
|
||||
* Advanced: set how many times to try incrementing the
|
||||
* gen when loading the segments file. This only runs if
|
||||
* the primary (listing directory) and secondary (opening
|
||||
* segments.gen file) methods fail to find the segments
|
||||
* file.
|
||||
*
|
||||
* @lucene.experimental
|
||||
*/
|
||||
public static void setDefaultGenLookaheadCount(int count) {
|
||||
defaultGenLookaheadCount = count;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the {@code defaultGenLookaheadCount}.
|
||||
*
|
||||
* @see #setDefaultGenLookaheadCount
|
||||
*
|
||||
* @lucene.experimental
|
||||
*/
|
||||
public static int getDefaultGenLookahedCount() {
|
||||
return defaultGenLookaheadCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns {@code infoStream}.
|
||||
*
|
||||
|
@ -649,18 +575,13 @@ public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo
|
|||
return doBody(commit.getSegmentsFileName());
|
||||
}
|
||||
|
||||
String segmentFileName = null;
|
||||
long lastGen = -1;
|
||||
long gen = 0;
|
||||
int genLookaheadCount = 0;
|
||||
long gen = -1;
|
||||
IOException exc = null;
|
||||
int retryCount = 0;
|
||||
|
||||
boolean useFirstMethod = true;
|
||||
|
||||
// Loop until we succeed in calling doBody() without
|
||||
// hitting an IOException. An IOException most likely
|
||||
// means a commit was in process and has finished, in
|
||||
// means an IW deleted our commit while opening
|
||||
// the time it took us to load the now-old infos files
|
||||
// (and segments files). It's also possible it's a
|
||||
// true error (corrupt index). To distinguish these,
|
||||
|
@ -669,127 +590,26 @@ public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo
|
|||
// don't, then the original error is real and we throw
|
||||
// it.
|
||||
|
||||
// We have three methods for determining the current
|
||||
// generation. We try the first two in parallel (when
|
||||
// useFirstMethod is true), and fall back to the third
|
||||
// when necessary.
|
||||
|
||||
while(true) {
|
||||
|
||||
if (useFirstMethod) {
|
||||
|
||||
// List the directory and use the highest
|
||||
// segments_N file. This method works well as long
|
||||
// as there is no stale caching on the directory
|
||||
// contents (NOTE: NFS clients often have such stale
|
||||
// caching):
|
||||
String[] files = null;
|
||||
|
||||
long genA = -1;
|
||||
|
||||
files = directory.listAll();
|
||||
|
||||
if (files != null) {
|
||||
genA = getLastCommitGeneration(files);
|
||||
for (;;) {
|
||||
lastGen = gen;
|
||||
String files[] = directory.listAll();
|
||||
String files2[] = directory.listAll();
|
||||
Arrays.sort(files);
|
||||
Arrays.sort(files2);
|
||||
if (!Arrays.equals(files, files2)) {
|
||||
// listAll() is weakly consistent, this means we hit "concurrent modification exception"
|
||||
continue;
|
||||
}
|
||||
gen = getLastCommitGeneration(files);
|
||||
|
||||
if (infoStream != null) {
|
||||
message("directory listing genA=" + genA);
|
||||
message("directory listing gen=" + gen);
|
||||
}
|
||||
|
||||
// Also open segments.gen and read its
|
||||
// contents. Then we take the larger of the two
|
||||
// gens. This way, if either approach is hitting
|
||||
// a stale cache (NFS) we have a better chance of
|
||||
// getting the right generation.
|
||||
long genB = -1;
|
||||
ChecksumIndexInput genInput = null;
|
||||
try {
|
||||
genInput = directory.openChecksumInput(IndexFileNames.SEGMENTS_GEN, IOContext.READONCE);
|
||||
} catch (IOException e) {
|
||||
if (infoStream != null) {
|
||||
message("segments.gen open: IOException " + e);
|
||||
}
|
||||
}
|
||||
|
||||
if (genInput != null) {
|
||||
try {
|
||||
int version = genInput.readInt();
|
||||
if (version == FORMAT_SEGMENTS_GEN_47 || version == FORMAT_SEGMENTS_GEN_CHECKSUM) {
|
||||
long gen0 = genInput.readLong();
|
||||
long gen1 = genInput.readLong();
|
||||
if (infoStream != null) {
|
||||
message("fallback check: " + gen0 + "; " + gen1);
|
||||
}
|
||||
if (version == FORMAT_SEGMENTS_GEN_CHECKSUM) {
|
||||
CodecUtil.checkFooter(genInput);
|
||||
} else {
|
||||
CodecUtil.checkEOF(genInput);
|
||||
}
|
||||
if (gen0 == gen1) {
|
||||
// The file is consistent.
|
||||
genB = gen0;
|
||||
}
|
||||
} else {
|
||||
throw new IndexFormatTooNewException(genInput, version, FORMAT_SEGMENTS_GEN_START, FORMAT_SEGMENTS_GEN_CURRENT);
|
||||
}
|
||||
} catch (IOException err2) {
|
||||
// rethrow any format exception
|
||||
if (err2 instanceof CorruptIndexException) throw err2;
|
||||
} finally {
|
||||
genInput.close();
|
||||
}
|
||||
}
|
||||
|
||||
if (infoStream != null) {
|
||||
message(IndexFileNames.SEGMENTS_GEN + " check: genB=" + genB);
|
||||
}
|
||||
|
||||
// Pick the larger of the two gen's:
|
||||
gen = Math.max(genA, genB);
|
||||
|
||||
if (gen == -1) {
|
||||
// Neither approach found a generation
|
||||
throw new IndexNotFoundException("no segments* file found in " + directory + ": files: " + Arrays.toString(files));
|
||||
}
|
||||
}
|
||||
|
||||
if (useFirstMethod && lastGen == gen && retryCount >= 2) {
|
||||
// Give up on first method -- this is 3rd cycle on
|
||||
// listing directory and checking gen file to
|
||||
// attempt to locate the segments file.
|
||||
useFirstMethod = false;
|
||||
}
|
||||
|
||||
// Second method: since both directory cache and
|
||||
// file contents cache seem to be stale, just
|
||||
// advance the generation.
|
||||
if (!useFirstMethod) {
|
||||
if (genLookaheadCount < defaultGenLookaheadCount) {
|
||||
gen++;
|
||||
genLookaheadCount++;
|
||||
if (infoStream != null) {
|
||||
message("look ahead increment gen to " + gen);
|
||||
}
|
||||
} else {
|
||||
// All attempts have failed -- throw first exc:
|
||||
throw exc;
|
||||
}
|
||||
} else if (lastGen == gen) {
|
||||
// This means we're about to try the same
|
||||
// segments_N last tried.
|
||||
retryCount++;
|
||||
} else {
|
||||
// Segment file has advanced since our last loop
|
||||
// (we made "progress"), so reset retryCount:
|
||||
retryCount = 0;
|
||||
}
|
||||
|
||||
lastGen = gen;
|
||||
|
||||
segmentFileName = IndexFileNames.fileNameFromGeneration(IndexFileNames.SEGMENTS,
|
||||
"",
|
||||
gen);
|
||||
} else if (gen > lastGen) {
|
||||
String segmentFileName = IndexFileNames.fileNameFromGeneration(IndexFileNames.SEGMENTS, "", gen);
|
||||
|
||||
try {
|
||||
Object v = doBody(segmentFileName);
|
||||
|
@ -798,56 +618,17 @@ public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo
|
|||
}
|
||||
return v;
|
||||
} catch (IOException err) {
|
||||
|
||||
// TODO: we should use the new IO apis in Java7 to get better exceptions on why the open failed. E.g. we don't want to fall back
|
||||
// if the open failed for a "different" reason (too many open files, access denied) than "the commit was in progress"
|
||||
|
||||
// Save the original root cause:
|
||||
if (exc == null) {
|
||||
exc = err;
|
||||
}
|
||||
|
||||
if (infoStream != null) {
|
||||
message("primary Exception on '" + segmentFileName + "': " + err + "'; will retry: retryCount=" + retryCount + "; gen = " + gen);
|
||||
}
|
||||
|
||||
if (gen > 1 && useFirstMethod && retryCount == 1) {
|
||||
|
||||
// This is our second time trying this same segments
|
||||
// file (because retryCount is 1), and, there is
|
||||
// possibly a segments_(N-1) (because gen > 1).
|
||||
// So, check if the segments_(N-1) exists and
|
||||
// try it if so:
|
||||
String prevSegmentFileName = IndexFileNames.fileNameFromGeneration(IndexFileNames.SEGMENTS,
|
||||
"",
|
||||
gen-1);
|
||||
|
||||
boolean prevExists;
|
||||
|
||||
try {
|
||||
directory.openInput(prevSegmentFileName, IOContext.DEFAULT).close();
|
||||
prevExists = true;
|
||||
} catch (IOException ioe) {
|
||||
prevExists = false;
|
||||
}
|
||||
|
||||
if (prevExists) {
|
||||
if (infoStream != null) {
|
||||
message("fallback to prior segment file '" + prevSegmentFileName + "'");
|
||||
}
|
||||
try {
|
||||
Object v = doBody(prevSegmentFileName);
|
||||
if (infoStream != null) {
|
||||
message("success on fallback " + prevSegmentFileName);
|
||||
}
|
||||
return v;
|
||||
} catch (IOException err2) {
|
||||
if (infoStream != null) {
|
||||
message("secondary Exception on '" + prevSegmentFileName + "': " + err2 + "'; will retry");
|
||||
}
|
||||
}
|
||||
message("primary Exception on '" + segmentFileName + "': " + err + "'; will retry: gen = " + gen);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
throw exc;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -873,20 +654,18 @@ public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo
|
|||
}
|
||||
|
||||
final void rollbackCommit(Directory dir) {
|
||||
if (pendingSegnOutput != null) {
|
||||
// Suppress so we keep throwing the original exception
|
||||
// in our caller
|
||||
IOUtils.closeWhileHandlingException(pendingSegnOutput);
|
||||
pendingSegnOutput = null;
|
||||
if (pendingCommit) {
|
||||
pendingCommit = false;
|
||||
|
||||
// we try to clean up our pending_segments_N
|
||||
|
||||
// Must carefully compute fileName from "generation"
|
||||
// since lastGeneration isn't incremented:
|
||||
final String segmentFileName = IndexFileNames.fileNameFromGeneration(IndexFileNames.SEGMENTS,
|
||||
"",
|
||||
generation);
|
||||
final String pending = IndexFileNames.fileNameFromGeneration(IndexFileNames.PENDING_SEGMENTS, "", generation);
|
||||
|
||||
// Suppress so we keep throwing the original exception
|
||||
// in our caller
|
||||
IOUtils.deleteFilesIgnoringExceptions(dir, segmentFileName);
|
||||
IOUtils.deleteFilesIgnoringExceptions(dir, pending);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -901,7 +680,7 @@ public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo
|
|||
* </p>
|
||||
**/
|
||||
final void prepareCommit(Directory dir) throws IOException {
|
||||
if (pendingSegnOutput != null) {
|
||||
if (pendingCommit) {
|
||||
throw new IllegalStateException("prepareCommit was already called");
|
||||
}
|
||||
write(dir);
|
||||
|
@ -933,56 +712,24 @@ public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo
|
|||
}
|
||||
|
||||
final void finishCommit(Directory dir) throws IOException {
|
||||
if (pendingSegnOutput == null) {
|
||||
if (pendingCommit == false) {
|
||||
throw new IllegalStateException("prepareCommit was not called");
|
||||
}
|
||||
boolean success = false;
|
||||
try {
|
||||
CodecUtil.writeFooter(pendingSegnOutput);
|
||||
final String src = IndexFileNames.fileNameFromGeneration(IndexFileNames.PENDING_SEGMENTS, "", generation);
|
||||
final String dest = IndexFileNames.fileNameFromGeneration(IndexFileNames.SEGMENTS, "", generation);
|
||||
dir.renameFile(src, dest);
|
||||
success = true;
|
||||
} finally {
|
||||
if (!success) {
|
||||
// Closes pendingSegnOutput & deletes partial segments_N:
|
||||
// deletes pending_segments_N:
|
||||
rollbackCommit(dir);
|
||||
} else {
|
||||
success = false;
|
||||
try {
|
||||
pendingSegnOutput.close();
|
||||
success = true;
|
||||
} finally {
|
||||
if (!success) {
|
||||
// Closes pendingSegnOutput & deletes partial segments_N:
|
||||
rollbackCommit(dir);
|
||||
} else {
|
||||
pendingSegnOutput = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE: if we crash here, we have left a segments_N
|
||||
// file in the directory in a possibly corrupt state (if
|
||||
// some bytes made it to stable storage and others
|
||||
// didn't). But, the segments_N file includes checksum
|
||||
// at the end, which should catch this case. So when a
|
||||
// reader tries to read it, it will throw a
|
||||
// CorruptIndexException, which should cause the retry
|
||||
// logic in SegmentInfos to kick in and load the last
|
||||
// good (previous) segments_N-1 file.
|
||||
|
||||
final String fileName = IndexFileNames.fileNameFromGeneration(IndexFileNames.SEGMENTS, "", generation);
|
||||
success = false;
|
||||
try {
|
||||
dir.sync(Collections.singleton(fileName));
|
||||
success = true;
|
||||
} finally {
|
||||
if (!success) {
|
||||
IOUtils.deleteFilesIgnoringExceptions(dir, fileName);
|
||||
}
|
||||
}
|
||||
|
||||
pendingCommit = false;
|
||||
lastGeneration = generation;
|
||||
writeSegmentsGen(dir, generation);
|
||||
}
|
||||
|
||||
/** Writes & syncs to the Directory dir, taking care to
|
||||
|
|
|
@ -90,6 +90,18 @@ public abstract class Directory implements Closeable {
|
|||
*/
|
||||
public abstract void sync(Collection<String> names) throws IOException;
|
||||
|
||||
/**
|
||||
* Renames {@code source} to {@code dest} as an atomic operation,
|
||||
* where {@code dest} does not yet exist in the directory.
|
||||
* <p>
|
||||
* Notes: This method is used by IndexWriter to publish commits.
|
||||
* It is ok if this operation is not truly atomic, for example
|
||||
* both {@code source} and {@code dest} can be visible temporarily.
|
||||
* It is just important that the contents of {@code dest} appear
|
||||
* atomically, or an exception is thrown.
|
||||
*/
|
||||
public abstract void renameFile(String source, String dest) throws IOException;
|
||||
|
||||
/** Returns a stream reading an existing file, with the
|
||||
* specified read buffer size. The particular Directory
|
||||
* implementation may ignore the buffer size. Currently
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.io.FilenameFilter;
|
|||
import java.io.FilterOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.StandardCopyOption;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
@ -292,13 +293,16 @@ public abstract class FSDirectory extends BaseDirectory {
|
|||
fsync(name);
|
||||
}
|
||||
|
||||
// fsync the directory itself, but only if there was any file fsynced before
|
||||
// (otherwise it can happen that the directory does not yet exist)!
|
||||
if (!toSync.isEmpty()) {
|
||||
IOUtils.fsync(directory, true);
|
||||
staleFiles.removeAll(toSync);
|
||||
}
|
||||
|
||||
staleFiles.removeAll(toSync);
|
||||
/**
 * Moves {@code source} to {@code dest}, both resolved against {@code directory},
 * using {@code Files.move} with {@code StandardCopyOption.ATOMIC_MOVE}, and then
 * calls {@code IOUtils.fsync(directory, true)}.
 *
 * @param source name of the existing file
 * @param dest name the file should have afterwards
 * @throws IOException propagated from the move or from the fsync call
 */
@Override
|
||||
public void renameFile(String source, String dest) throws IOException {
|
||||
ensureOpen();
|
||||
// ATOMIC_MOVE: per the JDK docs the move fails with AtomicMoveNotSupportedException
// instead of silently degrading to a non-atomic copy + delete.
Files.move(new File(directory, source).toPath(), new File(directory, dest).toPath(), StandardCopyOption.ATOMIC_MOVE);
|
||||
// TODO: should we move directory fsync to a separate 'syncMetadata' method?
|
||||
// for example, to improve listCommits(), IndexFileDeleter could also call that after deleting segments_Ns
|
||||
// NOTE(review): presumably the 'true' flag tells fsync the target is a directory,
// so the rename itself is made durable -- confirm against IOUtils.fsync.
IOUtils.fsync(directory, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.lucene.store;
|
|||
*/
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.AtomicMoveNotSupportedException;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
@ -162,6 +163,17 @@ public class FileSwitchDirectory extends BaseDirectory {
|
|||
secondaryDir.sync(secondaryNames);
|
||||
}
|
||||
|
||||
/**
 * Renames a file inside whichever directory {@code getDirectory(source)} selects.
 * The rename is refused when {@code getDirectory(dest)} returns a different
 * directory instance.
 *
 * @param source current file name
 * @param dest new file name
 * @throws AtomicMoveNotSupportedException if source and dest resolve to different directories
 * @throws IOException propagated from the selected directory's {@code renameFile}
 */
@Override
|
||||
public void renameFile(String source, String dest) throws IOException {
|
||||
Directory sourceDir = getDirectory(source);
|
||||
// won't happen with standard lucene index files since pending and commit will
|
||||
// always have the same extension ("")
|
||||
// Identity comparison: both names must map to the very same Directory instance.
if (sourceDir != getDirectory(dest)) {
|
||||
throw new AtomicMoveNotSupportedException(source, dest, "source and dest are in different directories");
|
||||
}
|
||||
sourceDir.renameFile(source, dest);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexInput openInput(String name, IOContext context) throws IOException {
|
||||
return getDirectory(name).openInput(name, context);
|
||||
|
|
|
@ -69,6 +69,11 @@ public class FilterDirectory extends Directory {
|
|||
in.sync(names);
|
||||
}
|
||||
|
||||
/**
 * Forwards the rename unchanged to the delegate {@code in}.
 *
 * @param source current file name
 * @param dest new file name
 * @throws IOException propagated from the delegate
 */
@Override
|
||||
public void renameFile(String source, String dest) throws IOException {
|
||||
in.renameFile(source, dest);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexInput openInput(String name, IOContext context)
|
||||
throws IOException {
|
||||
|
|
|
@ -178,6 +178,14 @@ public class NRTCachingDirectory extends FilterDirectory implements Accountable
|
|||
in.sync(fileNames);
|
||||
}
|
||||
|
||||
/**
 * Calls {@code unCache(source)} and then renames the file through the
 * delegate {@code in}.
 *
 * @param source current file name
 * @param dest new file name
 * @throws IOException propagated from {@code unCache} or from the delegate
 */
@Override
|
||||
public void renameFile(String source, String dest) throws IOException {
|
||||
// NOTE: uncache is unnecessary for lucene's usage, as we always sync() before renaming.
|
||||
// NOTE(review): presumably unCache pushes any cached copy of source down to the
// delegate so the delegate can see the file it is asked to rename -- confirm against unCache().
unCache(source);
|
||||
in.renameFile(source, dest);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public synchronized IndexInput openInput(String name, IOContext context) throws IOException {
|
||||
if (VERBOSE) {
|
||||
|
@ -221,7 +229,7 @@ public class NRTCachingDirectory extends FilterDirectory implements Accountable
|
|||
bytes = context.flushInfo.estimatedSegmentSize;
|
||||
}
|
||||
|
||||
return !name.equals(IndexFileNames.SEGMENTS_GEN) && (bytes <= maxMergeSizeBytes) && (bytes + cache.ramBytesUsed()) <= maxCachedBytes;
|
||||
return (bytes <= maxMergeSizeBytes) && (bytes + cache.ramBytesUsed()) <= maxCachedBytes;
|
||||
}
|
||||
|
||||
private final Object uncacheLock = new Object();
|
||||
|
|
|
@ -112,6 +112,8 @@ public class RAMDirectory extends BaseDirectory implements Accountable {
|
|||
@Override
|
||||
public final String[] listAll() {
|
||||
ensureOpen();
|
||||
// NOTE: this returns a "weakly consistent view". Unless we change Dir API, keep this,
|
||||
// and do not synchronize or anything stronger. It's great for testing!
|
||||
// NOTE: fileMap.keySet().toArray(new String[0]) is broken in non Sun JDKs,
|
||||
// and the code below is resilient to map changes during the array population.
|
||||
Set<String> fileNames = fileMap.keySet();
|
||||
|
@ -190,6 +192,17 @@ public class RAMDirectory extends BaseDirectory implements Accountable {
|
|||
public void sync(Collection<String> names) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void renameFile(String source, String dest) throws IOException {
|
||||
ensureOpen();
|
||||
RAMFile file = fileMap.get(source);
|
||||
if (file == null) {
|
||||
throw new FileNotFoundException(source);
|
||||
}
|
||||
fileMap.put(dest, file);
|
||||
fileMap.remove(source);
|
||||
}
|
||||
|
||||
/** Returns a stream reading an existing file. */
|
||||
@Override
|
||||
public IndexInput openInput(String name, IOContext context) throws IOException {
|
||||
|
@ -207,5 +220,4 @@ public class RAMDirectory extends BaseDirectory implements Accountable {
|
|||
isOpen = false;
|
||||
fileMap.clear();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1115,8 +1115,8 @@ public class TestAddIndexes extends LuceneTestCase {
|
|||
w3.addIndexes(readers);
|
||||
w3.close();
|
||||
// we should now see segments_X,
|
||||
// segments.gen,_Y.cfs,_Y.cfe, _Z.si
|
||||
assertEquals("Only one compound segment should exist, but got: " + Arrays.toString(dir.listAll()), 5, dir.listAll().length);
|
||||
// _Y.cfs,_Y.cfe, _Z.si
|
||||
assertEquals("Only one compound segment should exist, but got: " + Arrays.toString(dir.listAll()), 4, dir.listAll().length);
|
||||
dir.close();
|
||||
}
|
||||
|
||||
|
|
|
@ -78,9 +78,6 @@ public class TestAllFilesHaveCodecHeader extends LuceneTestCase {
|
|||
if (file.equals(IndexWriter.WRITE_LOCK_NAME)) {
|
||||
continue; // write.lock has no header, thats ok
|
||||
}
|
||||
if (file.equals(IndexFileNames.SEGMENTS_GEN)) {
|
||||
continue; // segments.gen has no header, thats ok
|
||||
}
|
||||
if (file.endsWith(IndexFileNames.COMPOUND_FILE_EXTENSION)) {
|
||||
CompoundFileDirectory cfsDir = new CompoundFileDirectory(dir, file, newIOContext(random()), false);
|
||||
checkHeaders(cfsDir); // recurse into cfs
|
||||
|
|
|
@ -70,7 +70,7 @@ public class TestCrashCausesCorruptIndex extends LuceneTestCase {
|
|||
// writes segments_1:
|
||||
indexWriter.commit();
|
||||
|
||||
crashAfterCreateOutput.setCrashAfterCreateOutput("segments_2");
|
||||
crashAfterCreateOutput.setCrashAfterCreateOutput("pending_segments_2");
|
||||
indexWriter.addDocument(getDocument());
|
||||
try {
|
||||
// tries to write segments_2 but hits fake exc:
|
||||
|
|
|
@ -272,8 +272,6 @@ public class TestDeletionPolicy extends LuceneTestCase {
|
|||
String fileName = IndexFileNames.fileNameFromGeneration(IndexFileNames.SEGMENTS,
|
||||
"",
|
||||
gen);
|
||||
dir.deleteFile(IndexFileNames.SEGMENTS_GEN);
|
||||
|
||||
boolean oneSecondResolution = true;
|
||||
|
||||
while(gen > 0) {
|
||||
|
@ -377,7 +375,6 @@ public class TestDeletionPolicy extends LuceneTestCase {
|
|||
|
||||
// Simplistic check: just verify all segments_N's still
|
||||
// exist, and, I can open a reader on each:
|
||||
dir.deleteFile(IndexFileNames.SEGMENTS_GEN);
|
||||
long gen = SegmentInfos.getLastCommitGeneration(dir);
|
||||
while(gen > 0) {
|
||||
IndexReader reader = DirectoryReader.open(dir);
|
||||
|
@ -599,7 +596,6 @@ public class TestDeletionPolicy extends LuceneTestCase {
|
|||
|
||||
// Simplistic check: just verify only the past N segments_N's still
|
||||
// exist, and, I can open a reader on each:
|
||||
dir.deleteFile(IndexFileNames.SEGMENTS_GEN);
|
||||
long gen = SegmentInfos.getLastCommitGeneration(dir);
|
||||
for(int i=0;i<N+1;i++) {
|
||||
try {
|
||||
|
@ -702,7 +698,6 @@ public class TestDeletionPolicy extends LuceneTestCase {
|
|||
// exist, and, I can open a reader on each:
|
||||
long gen = SegmentInfos.getLastCommitGeneration(dir);
|
||||
|
||||
dir.deleteFile(IndexFileNames.SEGMENTS_GEN);
|
||||
int expectedCount = 0;
|
||||
|
||||
rwReader.close();
|
||||
|
|
|
@ -652,13 +652,11 @@ public class TestDirectoryReaderReopen extends LuceneTestCase {
|
|||
// Fail when reopen tries to open the live docs file:
|
||||
dir.failOn(new MockDirectoryWrapper.Failure() {
|
||||
|
||||
int failCount;
|
||||
boolean failed;
|
||||
|
||||
@Override
|
||||
public void eval(MockDirectoryWrapper dir) throws IOException {
|
||||
// Need to throw exc three times so the logic in
|
||||
// SegmentInfos.FindSegmentsFile "really believes" us:
|
||||
if (failCount >= 3) {
|
||||
if (failed) {
|
||||
return;
|
||||
}
|
||||
//System.out.println("failOn: ");
|
||||
|
@ -670,7 +668,7 @@ public class TestDirectoryReaderReopen extends LuceneTestCase {
|
|||
System.out.println("TEST: now fail; exc:");
|
||||
new Throwable().printStackTrace(System.out);
|
||||
}
|
||||
failCount++;
|
||||
failed = true;
|
||||
throw new FakeIOException();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -145,6 +145,11 @@ public class TestFieldsReader extends LuceneTestCase {
|
|||
fsDir.sync(names);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void renameFile(String source, String dest) throws IOException {
|
||||
fsDir.renameFile(source, dest);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
fsDir.close();
|
||||
|
|
|
@ -1366,8 +1366,7 @@ public class TestIndexWriter extends LuceneTestCase {
|
|||
if (iter == 1) {
|
||||
// we run a full commit so there should be a segments file etc.
|
||||
assertTrue(files.contains("segments_1"));
|
||||
assertTrue(files.contains("segments.gen"));
|
||||
assertEquals(files.toString(), files.size(), 5);
|
||||
assertEquals(files.toString(), files.size(), 4);
|
||||
} else {
|
||||
// this is an NRT reopen - no segments files yet
|
||||
|
||||
|
|
|
@ -1111,7 +1111,8 @@ public class TestIndexWriterExceptions extends LuceneTestCase {
|
|||
|
||||
// LUCENE-1044: Simulate checksum error in segments_N
|
||||
public void testSegmentsChecksumError() throws IOException {
|
||||
Directory dir = newDirectory();
|
||||
BaseDirectoryWrapper dir = newDirectory();
|
||||
dir.setCheckIndexOnClose(false); // we corrupt the index
|
||||
|
||||
IndexWriter writer = null;
|
||||
|
||||
|
@ -1137,17 +1138,12 @@ public class TestIndexWriterExceptions extends LuceneTestCase {
|
|||
out.close();
|
||||
in.close();
|
||||
|
||||
IndexReader reader = null;
|
||||
try {
|
||||
reader = DirectoryReader.open(dir);
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace(System.out);
|
||||
fail("segmentInfos failed to retry fallback to correct segments_N file");
|
||||
DirectoryReader.open(dir);
|
||||
fail("didn't get expected checksum error");
|
||||
} catch (CorruptIndexException expected) {
|
||||
}
|
||||
reader.close();
|
||||
|
||||
// should remove the corrupted segments_N
|
||||
new IndexWriter(dir, newIndexWriterConfig(null)).close();
|
||||
dir.close();
|
||||
}
|
||||
|
||||
|
@ -1260,73 +1256,6 @@ public class TestIndexWriterExceptions extends LuceneTestCase {
|
|||
dir.close();
|
||||
}
|
||||
|
||||
// Simulate a writer that crashed while writing segments
|
||||
// file: make sure we can still open the index (ie,
|
||||
// gracefully fallback to the previous segments file),
|
||||
// and that we can add to the index:
|
||||
public void testSimulatedCrashedWriter() throws IOException {
|
||||
Directory dir = newDirectory();
|
||||
if (dir instanceof MockDirectoryWrapper) {
|
||||
((MockDirectoryWrapper)dir).setPreventDoubleWrite(false);
|
||||
}
|
||||
|
||||
IndexWriter writer = null;
|
||||
|
||||
writer = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())));
|
||||
|
||||
// add 100 documents
|
||||
for (int i = 0; i < 100; i++) {
|
||||
addDoc(writer);
|
||||
}
|
||||
|
||||
// close
|
||||
writer.close();
|
||||
|
||||
long gen = SegmentInfos.getLastCommitGeneration(dir);
|
||||
assertTrue("segment generation should be > 0 but got " + gen, gen > 0);
|
||||
|
||||
// Make the next segments file, with last byte
|
||||
// missing, to simulate a writer that crashed while
|
||||
// writing segments file:
|
||||
String fileNameIn = SegmentInfos.getLastCommitSegmentsFileName(dir);
|
||||
String fileNameOut = IndexFileNames.fileNameFromGeneration(IndexFileNames.SEGMENTS,
|
||||
"",
|
||||
1+gen);
|
||||
IndexInput in = dir.openInput(fileNameIn, newIOContext(random()));
|
||||
IndexOutput out = dir.createOutput(fileNameOut, newIOContext(random()));
|
||||
long length = in.length();
|
||||
for(int i=0;i<length-1;i++) {
|
||||
out.writeByte(in.readByte());
|
||||
}
|
||||
in.close();
|
||||
out.close();
|
||||
|
||||
IndexReader reader = null;
|
||||
try {
|
||||
reader = DirectoryReader.open(dir);
|
||||
} catch (Exception e) {
|
||||
fail("reader failed to open on a crashed index");
|
||||
}
|
||||
reader.close();
|
||||
|
||||
try {
|
||||
writer = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random()))
|
||||
.setOpenMode(OpenMode.CREATE));
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace(System.out);
|
||||
fail("writer failed to open on a crashed index");
|
||||
}
|
||||
|
||||
// add 100 documents
|
||||
for (int i = 0; i < 100; i++) {
|
||||
addDoc(writer);
|
||||
}
|
||||
|
||||
// close
|
||||
writer.close();
|
||||
dir.close();
|
||||
}
|
||||
|
||||
public void testTermVectorExceptions() throws IOException {
|
||||
FailOnTermVectors[] failures = new FailOnTermVectors[] {
|
||||
new FailOnTermVectors(FailOnTermVectors.AFTER_INIT_STAGE),
|
||||
|
|
|
@ -344,6 +344,12 @@ public class TestBufferedIndexInput extends LuceneTestCase {
|
|||
public void sync(Collection<String> names) throws IOException {
|
||||
dir.sync(names);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void renameFile(String source, String dest) throws IOException {
|
||||
dir.renameFile(source, dest);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long fileLength(String name) throws IOException {
|
||||
return dir.fileLength(name);
|
||||
|
|
|
@ -77,7 +77,7 @@ public class TestIndexSplitter extends LuceneTestCase {
|
|||
// now test cmdline
|
||||
File destDir2 = createTempDir(LuceneTestCase.getTestClass().getSimpleName());
|
||||
IndexSplitter.main(new String[] {dir.getAbsolutePath(), destDir2.getAbsolutePath(), splitSegName});
|
||||
assertEquals(5, destDir2.listFiles().length);
|
||||
assertEquals(4, destDir2.listFiles().length);
|
||||
Directory fsDirDest2 = newFSDirectory(destDir2);
|
||||
r = DirectoryReader.open(fsDirDest2);
|
||||
assertEquals(50, r.maxDoc());
|
||||
|
|
|
@ -113,6 +113,8 @@ public class IndexAndTaxonomyReplicationHandler implements ReplicationHandler {
|
|||
List<String> indexFiles = copiedFiles.get(IndexAndTaxonomyRevision.INDEX_SOURCE);
|
||||
String taxoSegmentsFile = IndexReplicationHandler.getSegmentsFile(taxoFiles, true);
|
||||
String indexSegmentsFile = IndexReplicationHandler.getSegmentsFile(indexFiles, false);
|
||||
String taxoPendingFile = taxoSegmentsFile == null ? null : "pending_" + taxoSegmentsFile;
|
||||
String indexPendingFile = "pending_" + indexSegmentsFile;
|
||||
|
||||
boolean success = false;
|
||||
try {
|
||||
|
@ -126,24 +128,35 @@ public class IndexAndTaxonomyReplicationHandler implements ReplicationHandler {
|
|||
}
|
||||
indexDir.sync(indexFiles);
|
||||
|
||||
// now copy and fsync segmentsFile, taxonomy first because it is ok if a
|
||||
// now copy, fsync, and rename segmentsFile, taxonomy first because it is ok if a
|
||||
// reader sees a more advanced taxonomy than the index.
|
||||
if (taxoSegmentsFile != null) {
|
||||
taxoClientDir.copy(taxoDir, taxoSegmentsFile, taxoSegmentsFile, IOContext.READONCE);
|
||||
}
|
||||
indexClientDir.copy(indexDir, indexSegmentsFile, indexSegmentsFile, IOContext.READONCE);
|
||||
|
||||
if (taxoSegmentsFile != null) {
|
||||
taxoDir.sync(Collections.singletonList(taxoSegmentsFile));
|
||||
taxoClientDir.copy(taxoDir, taxoSegmentsFile, taxoPendingFile, IOContext.READONCE);
|
||||
}
|
||||
indexDir.sync(Collections.singletonList(indexSegmentsFile));
|
||||
indexClientDir.copy(indexDir, indexSegmentsFile, indexPendingFile, IOContext.READONCE);
|
||||
|
||||
if (taxoSegmentsFile != null) {
|
||||
taxoDir.sync(Collections.singletonList(taxoPendingFile));
|
||||
}
|
||||
indexDir.sync(Collections.singletonList(indexPendingFile));
|
||||
|
||||
if (taxoSegmentsFile != null) {
|
||||
taxoDir.renameFile(taxoPendingFile, taxoSegmentsFile);
|
||||
}
|
||||
|
||||
indexDir.renameFile(indexPendingFile, indexSegmentsFile);
|
||||
|
||||
success = true;
|
||||
} finally {
|
||||
if (!success) {
|
||||
if (taxoSegmentsFile != null) {
|
||||
taxoFiles.add(taxoSegmentsFile); // add it back so it gets deleted too
|
||||
taxoFiles.add(taxoPendingFile);
|
||||
}
|
||||
IndexReplicationHandler.cleanupFilesOnFailure(taxoDir, taxoFiles);
|
||||
indexFiles.add(indexSegmentsFile); // add it back so it gets deleted too
|
||||
indexFiles.add(indexPendingFile);
|
||||
IndexReplicationHandler.cleanupFilesOnFailure(indexDir, indexFiles);
|
||||
}
|
||||
}
|
||||
|
@ -157,10 +170,6 @@ public class IndexAndTaxonomyReplicationHandler implements ReplicationHandler {
|
|||
+ " currentRevisionFiles=" + currentRevisionFiles);
|
||||
}
|
||||
|
||||
// update the segments.gen file
|
||||
IndexReplicationHandler.writeSegmentsGen(taxoSegmentsFile, taxoDir);
|
||||
IndexReplicationHandler.writeSegmentsGen(indexSegmentsFile, indexDir);
|
||||
|
||||
// Cleanup the index directory from old and unused index files.
|
||||
// NOTE: we don't use IndexWriter.deleteUnusedFiles here since it may have
|
||||
// side-effects, e.g. if it hits sudden IO errors while opening the index
|
||||
|
|
|
@ -31,7 +31,6 @@ import org.apache.lucene.index.IndexCommit;
|
|||
import org.apache.lucene.index.IndexFileNames;
|
||||
import org.apache.lucene.index.IndexNotFoundException;
|
||||
import org.apache.lucene.index.IndexWriter;
|
||||
import org.apache.lucene.index.SegmentInfos;
|
||||
import org.apache.lucene.replicator.ReplicationClient.ReplicationHandler;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.IOContext;
|
||||
|
@ -110,7 +109,7 @@ public class IndexReplicationHandler implements ReplicationHandler {
|
|||
}
|
||||
|
||||
String segmentsFile = files.remove(files.size() - 1);
|
||||
if (!segmentsFile.startsWith(IndexFileNames.SEGMENTS) || segmentsFile.equals(IndexFileNames.SEGMENTS_GEN)) {
|
||||
if (!segmentsFile.startsWith(IndexFileNames.SEGMENTS) || segmentsFile.equals(IndexFileNames.OLD_SEGMENTS_GEN)) {
|
||||
throw new IllegalStateException("last file to copy+sync must be segments_N but got " + segmentsFile
|
||||
+ "; check your Revision implementation!");
|
||||
}
|
||||
|
@ -148,7 +147,6 @@ public class IndexReplicationHandler implements ReplicationHandler {
|
|||
if (commit != null && commit.getSegmentsFileName().equals(segmentsFile)) {
|
||||
Set<String> commitFiles = new HashSet<>();
|
||||
commitFiles.addAll(commit.getFileNames());
|
||||
commitFiles.add(IndexFileNames.SEGMENTS_GEN);
|
||||
Matcher matcher = IndexFileNames.CODEC_FILE_PATTERN.matcher("");
|
||||
for (String file : dir.listAll()) {
|
||||
if (!commitFiles.contains(file)
|
||||
|
@ -180,19 +178,6 @@ public class IndexReplicationHandler implements ReplicationHandler {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes {@link IndexFileNames#SEGMENTS_GEN} file to the directory, reading
|
||||
* the generation from the given {@code segmentsFile}. If it is {@code null},
|
||||
* this method deletes segments.gen from the directory.
|
||||
*/
|
||||
public static void writeSegmentsGen(String segmentsFile, Directory dir) {
|
||||
if (segmentsFile != null) {
|
||||
SegmentInfos.writeSegmentsGen(dir, SegmentInfos.generationFromSegmentsFileName(segmentsFile));
|
||||
} else {
|
||||
IOUtils.deleteFilesIgnoringExceptions(dir, IndexFileNames.SEGMENTS_GEN);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor with the given index directory and callback to notify when the
|
||||
* indexes were updated.
|
||||
|
@ -236,6 +221,7 @@ public class IndexReplicationHandler implements ReplicationHandler {
|
|||
Directory clientDir = sourceDirectory.values().iterator().next();
|
||||
List<String> files = copiedFiles.values().iterator().next();
|
||||
String segmentsFile = getSegmentsFile(files, false);
|
||||
String pendingSegmentsFile = "pending_" + segmentsFile;
|
||||
|
||||
boolean success = false;
|
||||
try {
|
||||
|
@ -245,14 +231,16 @@ public class IndexReplicationHandler implements ReplicationHandler {
|
|||
// fsync all copied files (except segmentsFile)
|
||||
indexDir.sync(files);
|
||||
|
||||
// now copy and fsync segmentsFile
|
||||
clientDir.copy(indexDir, segmentsFile, segmentsFile, IOContext.READONCE);
|
||||
indexDir.sync(Collections.singletonList(segmentsFile));
|
||||
// now copy and fsync segmentsFile as pending, then rename (simulating lucene commit)
|
||||
clientDir.copy(indexDir, segmentsFile, pendingSegmentsFile, IOContext.READONCE);
|
||||
indexDir.sync(Collections.singletonList(pendingSegmentsFile));
|
||||
indexDir.renameFile(pendingSegmentsFile, segmentsFile);
|
||||
|
||||
success = true;
|
||||
} finally {
|
||||
if (!success) {
|
||||
files.add(segmentsFile); // add it back so it gets deleted too
|
||||
files.add(pendingSegmentsFile);
|
||||
cleanupFilesOnFailure(indexDir, files);
|
||||
}
|
||||
}
|
||||
|
@ -266,9 +254,6 @@ public class IndexReplicationHandler implements ReplicationHandler {
|
|||
+ " currentRevisionFiles=" + currentRevisionFiles);
|
||||
}
|
||||
|
||||
// update the segments.gen file
|
||||
writeSegmentsGen(segmentsFile, indexDir);
|
||||
|
||||
// Cleanup the index directory from old and unused index files.
|
||||
// NOTE: we don't use IndexWriter.deleteUnusedFiles here since it may have
|
||||
// side-effects, e.g. if it hits sudden IO errors while opening the index
|
||||
|
|
|
@ -126,7 +126,7 @@ public class IndexAndTaxonomyRevisionTest extends ReplicatorTestCase {
|
|||
assertEquals(2, sourceFiles.size());
|
||||
for (List<RevisionFile> files : sourceFiles.values()) {
|
||||
String lastFile = files.get(files.size() - 1).fileName;
|
||||
assertTrue(lastFile.startsWith(IndexFileNames.SEGMENTS) && !lastFile.equals(IndexFileNames.SEGMENTS_GEN));
|
||||
assertTrue(lastFile.startsWith(IndexFileNames.SEGMENTS));
|
||||
}
|
||||
indexWriter.close();
|
||||
} finally {
|
||||
|
|
|
@ -114,7 +114,7 @@ public class IndexRevisionTest extends ReplicatorTestCase {
|
|||
assertEquals(1, sourceFiles.size());
|
||||
List<RevisionFile> files = sourceFiles.values().iterator().next();
|
||||
String lastFile = files.get(files.size() - 1).fileName;
|
||||
assertTrue(lastFile.startsWith(IndexFileNames.SEGMENTS) && !lastFile.equals(IndexFileNames.SEGMENTS_GEN));
|
||||
assertTrue(lastFile.startsWith(IndexFileNames.SEGMENTS));
|
||||
writer.close();
|
||||
} finally {
|
||||
IOUtils.close(dir);
|
||||
|
|
|
@ -96,6 +96,29 @@ public abstract class BaseDirectoryTestCase extends LuceneTestCase {
|
|||
IOUtils.close(source, dest);
|
||||
}
|
||||
|
||||
public void testRename() throws Exception {
|
||||
Directory dir = getDirectory(createTempDir("testRename"));
|
||||
|
||||
IndexOutput output = dir.createOutput("foobar", newIOContext(random()));
|
||||
int numBytes = random().nextInt(20000);
|
||||
byte bytes[] = new byte[numBytes];
|
||||
random().nextBytes(bytes);
|
||||
output.writeBytes(bytes, bytes.length);
|
||||
output.close();
|
||||
|
||||
dir.renameFile("foobar", "foobaz");
|
||||
|
||||
IndexInput input = dir.openInput("foobaz", newIOContext(random()));
|
||||
byte bytes2[] = new byte[numBytes];
|
||||
input.readBytes(bytes2, 0, bytes2.length);
|
||||
assertEquals(input.length(), numBytes);
|
||||
input.close();
|
||||
|
||||
assertArrayEquals(bytes, bytes2);
|
||||
|
||||
dir.close();
|
||||
}
|
||||
|
||||
// TODO: are these semantics really needed by lucene? can we just throw exception?
|
||||
public void testCopyOverwrite() throws Exception {
|
||||
Directory source = getDirectory(createTempDir("testCopyOverwrite"));
|
||||
|
|
|
@ -238,6 +238,39 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void renameFile(String source, String dest) throws IOException {
|
||||
maybeYield();
|
||||
maybeThrowDeterministicException();
|
||||
|
||||
if (crashed) {
|
||||
throw new IOException("cannot rename after crash");
|
||||
}
|
||||
|
||||
if (openFiles.containsKey(source)) {
|
||||
if (assertNoDeleteOpenFile) {
|
||||
throw (AssertionError) fillOpenTrace(new AssertionError("MockDirectoryWrapper: file \"" + source + "\" is still open: cannot rename"), source, true);
|
||||
} else if (noDeleteOpenFile) {
|
||||
throw (IOException) fillOpenTrace(new IOException("MockDirectoryWrapper: file \"" + source + "\" is still open: cannot rename"), source, true);
|
||||
}
|
||||
}
|
||||
|
||||
boolean success = false;
|
||||
try {
|
||||
in.renameFile(source, dest);
|
||||
success = true;
|
||||
} finally {
|
||||
if (success) {
|
||||
// we don't do this stuff with lucene's commit, but its just for completeness
|
||||
if (unSyncedFiles.contains(source)) {
|
||||
unSyncedFiles.remove(source);
|
||||
unSyncedFiles.add(dest);
|
||||
}
|
||||
openFilesDeleted.remove(source);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized final long sizeInBytes() throws IOException {
|
||||
if (in instanceof RAMDirectory)
|
||||
return ((RAMDirectory) in).ramBytesUsed();
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.io.IOException;
|
|||
import java.util.Collection;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.lucene.index.IndexFileNames;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.FSDirectory;
|
||||
import org.apache.lucene.store.IOContext;
|
||||
|
@ -236,8 +237,6 @@ public class BlockDirectory extends Directory {
|
|||
for (String file : files) {
|
||||
cache.delete(getFileCacheName(file));
|
||||
}
|
||||
// segments.gen won't be removed above
|
||||
cache.delete(dirName + "/" + "segments.gen");
|
||||
|
||||
} catch (FileNotFoundException e) {
|
||||
// the local file system folder may be gone
|
||||
|
@ -342,7 +341,9 @@ public class BlockDirectory extends Directory {
|
|||
* file/context.
|
||||
*/
|
||||
boolean useWriteCache(String name, IOContext context) {
|
||||
if (!blockCacheWriteEnabled) {
|
||||
if (!blockCacheWriteEnabled || name.startsWith(IndexFileNames.PENDING_SEGMENTS)) {
|
||||
// for safety, don't bother caching pending commits.
|
||||
// the cache does support renaming (renameCacheFile), but that's a scary optimization.
|
||||
return false;
|
||||
}
|
||||
if (blockCacheFileTypes != null && !isCachableFile(name)) {
|
||||
|
@ -375,6 +376,11 @@ public class BlockDirectory extends Directory {
|
|||
directory.deleteFile(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void renameFile(String source, String dest) throws IOException {
|
||||
directory.renameFile(source, dest);
|
||||
}
|
||||
|
||||
public long fileLength(String name) throws IOException {
|
||||
return directory.fileLength(name);
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.util.List;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -44,11 +45,11 @@ public class HdfsDirectory extends BaseDirectory {
|
|||
public static final int BUFFER_SIZE = 8192;
|
||||
|
||||
private static final String LF_EXT = ".lf";
|
||||
protected static final String SEGMENTS_GEN = "segments.gen";
|
||||
protected Path hdfsDirPath;
|
||||
protected Configuration configuration;
|
||||
|
||||
private final FileSystem fileSystem;
|
||||
private final FileContext fileContext;
|
||||
|
||||
public HdfsDirectory(Path hdfsDirPath, Configuration configuration)
|
||||
throws IOException {
|
||||
|
@ -56,6 +57,7 @@ public class HdfsDirectory extends BaseDirectory {
|
|||
this.hdfsDirPath = hdfsDirPath;
|
||||
this.configuration = configuration;
|
||||
fileSystem = FileSystem.newInstance(hdfsDirPath.toUri(), configuration);
|
||||
fileContext = FileContext.getFileContext(hdfsDirPath.toUri(), configuration);
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
|
@ -98,9 +100,6 @@ public class HdfsDirectory extends BaseDirectory {
|
|||
|
||||
@Override
|
||||
public IndexOutput createOutput(String name, IOContext context) throws IOException {
|
||||
if (SEGMENTS_GEN.equals(name)) {
|
||||
return new NullIndexOutput();
|
||||
}
|
||||
return new HdfsFileWriter(getFileSystem(), new Path(hdfsDirPath, name));
|
||||
}
|
||||
|
||||
|
@ -138,6 +137,13 @@ public class HdfsDirectory extends BaseDirectory {
|
|||
getFileSystem().delete(path, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void renameFile(String source, String dest) throws IOException {
|
||||
Path sourcePath = new Path(hdfsDirPath, source);
|
||||
Path destPath = new Path(hdfsDirPath, dest);
|
||||
fileContext.rename(sourcePath, destPath);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long fileLength(String name) throws IOException {
|
||||
return HdfsFileReader.getLength(getFileSystem(),
|
||||
|
|
|
@ -1,64 +0,0 @@
|
|||
package org.apache.solr.store.hdfs;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.lucene.store.IndexOutput;
|
||||
|
||||
/**
|
||||
* @lucene.experimental
|
||||
*/
|
||||
public class NullIndexOutput extends IndexOutput {
|
||||
|
||||
private long pos;
|
||||
private long length;
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getFilePointer() {
|
||||
return pos;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeByte(byte b) throws IOException {
|
||||
pos++;
|
||||
updateLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeBytes(byte[] b, int offset, int length) throws IOException {
|
||||
pos += length;
|
||||
updateLength();
|
||||
}
|
||||
|
||||
private void updateLength() {
|
||||
if (pos > length) {
|
||||
length = pos;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getChecksum() throws IOException {
|
||||
return 0; // we don't write anything.
|
||||
}
|
||||
}
|
|
@ -141,9 +141,6 @@ public class CoreAdminCreateDiscoverTest extends SolrTestCaseJ4 {
|
|||
// Should have segments in the directory pointed to by the ${DATA_TEST}.
|
||||
File test = new File(dataDir, "index");
|
||||
assertTrue("Should have found index dir at " + test.getAbsolutePath(), test.exists());
|
||||
File gen = new File(test, "segments.gen");
|
||||
assertTrue("Should be segments.gen in the dir at " + gen.getAbsolutePath(), gen.exists());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -276,9 +273,6 @@ public class CoreAdminCreateDiscoverTest extends SolrTestCaseJ4 {
|
|||
// Should have segments in the directory pointed to by the ${DATA_TEST}.
|
||||
File test = new File(data, "index");
|
||||
assertTrue("Should have found index dir at " + test.getAbsolutePath(), test.exists());
|
||||
File gen = new File(test, "segments.gen");
|
||||
assertTrue("Should be segments.gen in the dir at " + gen.getAbsolutePath(), gen.exists());
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -120,8 +120,6 @@ public class CoreAdminHandlerTest extends SolrTestCaseJ4 {
|
|||
// Should have segments in the directory pointed to by the ${DATA_TEST}.
|
||||
File test = new File(dataDir, "index");
|
||||
assertTrue("Should have found index dir at " + test.getAbsolutePath(), test.exists());
|
||||
test = new File(test,"segments.gen");
|
||||
assertTrue("Should have found segments.gen at " + test.getAbsolutePath(), test.exists());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -119,6 +119,26 @@ public class HdfsDirectoryTest extends SolrTestCaseJ4 {
|
|||
assertFalse(slowFileExists(directory, "testing.test"));
|
||||
}
|
||||
|
||||
public void testRename() throws IOException {
|
||||
String[] listAll = directory.listAll();
|
||||
for (String file : listAll) {
|
||||
directory.deleteFile(file);
|
||||
}
|
||||
|
||||
IndexOutput output = directory.createOutput("testing.test", new IOContext());
|
||||
output.writeInt(12345);
|
||||
output.close();
|
||||
directory.renameFile("testing.test", "testing.test.renamed");
|
||||
assertFalse(slowFileExists(directory, "testing.test"));
|
||||
assertTrue(slowFileExists(directory, "testing.test.renamed"));
|
||||
IndexInput input = directory.openInput("testing.test.renamed", new IOContext());
|
||||
assertEquals(12345, input.readInt());
|
||||
assertEquals(input.getFilePointer(), input.length());
|
||||
input.close();
|
||||
directory.deleteFile("testing.test.renamed");
|
||||
assertFalse(slowFileExists(directory, "testing.test.renamed"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEOF() throws IOException {
|
||||
Directory fsDir = new RAMDirectory();
|
||||
|
|
Loading…
Reference in New Issue