LUCENE-5969: simplify cfs for 5.0

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/branches/lucene5969@1629008 13f79535-47bb-0310-9956-ffa450edef68
Robert Muir 2014-10-02 16:09:12 +00:00
parent 54473ee555
commit 5893b1da29
3 changed files with 70 additions and 488 deletions

org/apache/lucene/codecs/lucene50/CompoundFileWriter.java (deleted)

@@ -1,368 +0,0 @@
package org.apache.lucene.codecs.lucene50;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FlushInfo;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.IOUtils;
/**
* Combines multiple files into a single compound file.
*
* @see CompoundFileDirectory
* @lucene.internal
*/
final class CompoundFileWriter implements Closeable {
private static final class FileEntry {
/** source file */
String file;
long length;
/** temporary holder for the start of this file's data section */
long offset;
/** the directory which contains the file. */
Directory dir;
}
// versioning for the .cfs file
static final String DATA_CODEC = "CompoundFileWriterData";
static final int VERSION_START = 0;
static final int VERSION_CHECKSUM = 1;
static final int VERSION_SEGMENTHEADER = 2;
static final int VERSION_CURRENT = VERSION_SEGMENTHEADER;
// versioning for the .cfe file
static final String ENTRY_CODEC = "CompoundFileWriterEntries";
private final Directory directory;
private final Map<String, FileEntry> entries = new HashMap<>();
private final Set<String> seenIDs = new HashSet<>();
// all entries that are written to a sep. file but not yet moved into CFS
private final Queue<FileEntry> pendingEntries = new LinkedList<>();
private boolean closed = false;
private IndexOutput dataOut;
private final AtomicBoolean outputTaken = new AtomicBoolean(false);
final String entryTableName;
final String dataFileName;
final byte[] segmentID;
/**
* Create the compound stream in the specified file. The file name is the
* entire name (no extensions are added).
*
* @throws NullPointerException
* if <code>dir</code> or <code>name</code> is null
*/
CompoundFileWriter(byte segmentID[], Directory dir, String name) {
if (dir == null) {
throw new NullPointerException("directory cannot be null");
}
if (name == null) {
throw new NullPointerException("name cannot be null");
}
if (segmentID == null) {
throw new NullPointerException("segmentID cannot be null");
}
this.segmentID = segmentID;
directory = dir;
entryTableName = IndexFileNames.segmentFileName(
IndexFileNames.stripExtension(name), "",
IndexFileNames.COMPOUND_FILE_ENTRIES_EXTENSION);
dataFileName = name;
}
private synchronized IndexOutput getOutput(IOContext context) throws IOException {
if (dataOut == null) {
boolean success = false;
try {
dataOut = directory.createOutput(dataFileName, context);
CodecUtil.writeSegmentHeader(dataOut, DATA_CODEC, VERSION_CURRENT, segmentID, "");
success = true;
} finally {
if (!success) {
IOUtils.closeWhileHandlingException(dataOut);
}
}
}
return dataOut;
}
/** Returns the directory of the compound file. */
Directory getDirectory() {
return directory;
}
/** Returns the name of the compound file. */
String getName() {
return dataFileName;
}
/**
* Closes all resources and writes the entry table
*
* @throws IllegalStateException
* if close() had been called before or if no file has been added to
* this object
*/
@Override
public void close() throws IOException {
if (closed) {
return;
}
IndexOutput entryTableOut = null;
// TODO this code should clean up after itself
// (remove partial .cfs/.cfe)
boolean success = false;
try {
if (!pendingEntries.isEmpty() || outputTaken.get()) {
throw new IllegalStateException("CFS has pending open files");
}
closed = true;
// open the compound stream; we can safely use IOContext.DEFAULT
// here because this will only open the output if no file was
// added to the CFS
getOutput(IOContext.DEFAULT);
assert dataOut != null;
CodecUtil.writeFooter(dataOut);
success = true;
} finally {
if (success) {
IOUtils.close(dataOut);
} else {
IOUtils.closeWhileHandlingException(dataOut);
}
}
success = false;
try {
entryTableOut = directory.createOutput(entryTableName, IOContext.DEFAULT);
writeEntryTable(entries.values(), entryTableOut);
success = true;
} finally {
if (success) {
IOUtils.close(entryTableOut);
} else {
IOUtils.closeWhileHandlingException(entryTableOut);
}
}
}
private final void ensureOpen() {
if (closed) {
throw new AlreadyClosedException("CFS Directory is already closed");
}
}
/**
* Copy the contents of the file with specified extension into the provided
* output stream.
*/
private final long copyFileEntry(IndexOutput dataOut, FileEntry fileEntry)
throws IOException {
final IndexInput is = fileEntry.dir.openInput(fileEntry.file, IOContext.READONCE);
boolean success = false;
try {
final long startPtr = dataOut.getFilePointer();
final long length = fileEntry.length;
dataOut.copyBytes(is, length);
// Verify that the output length diff is equal to original file
long endPtr = dataOut.getFilePointer();
long diff = endPtr - startPtr;
if (diff != length)
throw new IOException("Difference in the output file offsets " + diff
+ " does not match the original file length " + length);
fileEntry.offset = startPtr;
success = true;
return length;
} finally {
if (success) {
IOUtils.close(is);
// copy successful - delete file
// if we can't we rely on IFD to pick up and retry
IOUtils.deleteFilesIgnoringExceptions(fileEntry.dir, fileEntry.file);
} else {
IOUtils.closeWhileHandlingException(is);
}
}
}
protected void writeEntryTable(Collection<FileEntry> entries,
IndexOutput entryOut) throws IOException {
CodecUtil.writeSegmentHeader(entryOut, ENTRY_CODEC, VERSION_CURRENT, segmentID, "");
entryOut.writeVInt(entries.size());
for (FileEntry fe : entries) {
entryOut.writeString(IndexFileNames.stripSegmentName(fe.file));
entryOut.writeLong(fe.offset);
entryOut.writeLong(fe.length);
}
CodecUtil.writeFooter(entryOut);
}
IndexOutput createOutput(String name, IOContext context) throws IOException {
ensureOpen();
boolean success = false;
boolean outputLocked = false;
try {
assert name != null : "name must not be null";
if (entries.containsKey(name)) {
throw new IllegalArgumentException("File " + name + " already exists");
}
final FileEntry entry = new FileEntry();
entry.file = name;
entries.put(name, entry);
final String id = IndexFileNames.stripSegmentName(name);
assert !seenIDs.contains(id): "file=\"" + name + "\" maps to id=\"" + id + "\", which was already written";
seenIDs.add(id);
final DirectCFSIndexOutput out;
if ((outputLocked = outputTaken.compareAndSet(false, true))) {
out = new DirectCFSIndexOutput(getOutput(context), entry, false);
} else {
entry.dir = this.directory;
out = new DirectCFSIndexOutput(directory.createOutput(name, context), entry,
true);
}
success = true;
return out;
} finally {
if (!success) {
entries.remove(name);
if (outputLocked) { // release the output lock if not successful
assert outputTaken.get();
releaseOutputLock();
}
}
}
}
final void releaseOutputLock() {
outputTaken.compareAndSet(true, false);
}
private final void prunePendingEntries() throws IOException {
// claim the output and copy all pending files in
if (outputTaken.compareAndSet(false, true)) {
try {
while (!pendingEntries.isEmpty()) {
FileEntry entry = pendingEntries.poll();
copyFileEntry(getOutput(new IOContext(new FlushInfo(0, entry.length))), entry);
entries.put(entry.file, entry);
}
} finally {
final boolean compareAndSet = outputTaken.compareAndSet(true, false);
assert compareAndSet;
}
}
}
long fileLength(String name) throws IOException {
FileEntry fileEntry = entries.get(name);
if (fileEntry == null) {
throw new FileNotFoundException(name + " does not exist");
}
return fileEntry.length;
}
boolean fileExists(String name) {
return entries.containsKey(name);
}
String[] listAll() {
return entries.keySet().toArray(new String[0]);
}
private final class DirectCFSIndexOutput extends IndexOutput {
private final IndexOutput delegate;
private final long offset;
private boolean closed;
private FileEntry entry;
private long writtenBytes;
private final boolean isSeparate;
DirectCFSIndexOutput(IndexOutput delegate, FileEntry entry,
boolean isSeparate) {
super();
this.delegate = delegate;
this.entry = entry;
entry.offset = offset = delegate.getFilePointer();
this.isSeparate = isSeparate;
}
@Override
public void close() throws IOException {
if (!closed) {
closed = true;
entry.length = writtenBytes;
if (isSeparate) {
delegate.close();
// we are a separate file - push into the pending entries
pendingEntries.add(entry);
} else {
// we have been written into the CFS directly - release the lock
releaseOutputLock();
}
// now prune all pending entries and push them into the CFS
prunePendingEntries();
}
}
@Override
public long getFilePointer() {
return delegate.getFilePointer() - offset;
}
@Override
public void writeByte(byte b) throws IOException {
assert !closed;
writtenBytes++;
delegate.writeByte(b);
}
@Override
public void writeBytes(byte[] b, int offset, int length) throws IOException {
assert !closed;
writtenBytes += length;
delegate.writeBytes(b, offset, length);
}
@Override
public long getChecksum() throws IOException {
return delegate.getChecksum();
}
}
}
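
Note for readers of this deletion: the .cfe layout that writeEntryTable() produces above survives the rewrite; the new Lucene50CompoundFormat below writes the same shape (segment header, VInt file count, one <name, offset, length> entry per file, codec footer). A minimal decoding sketch of that table, mirroring the write loop; the readEntryTable name and the long[] pair are illustrations, not code from this patch:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.ChecksumIndexInput;

class EntryTableSketch {
  // Decodes the .cfe table written by writeEntryTable() above:
  // segment header, VInt count, then <name, offset, length> per file, footer.
  static Map<String, long[]> readEntryTable(ChecksumIndexInput in, byte[] segmentID) throws IOException {
    CodecUtil.checkSegmentHeader(in, CompoundFileWriter.ENTRY_CODEC,
                                 CompoundFileWriter.VERSION_START,
                                 CompoundFileWriter.VERSION_CURRENT, segmentID, "");
    int numEntries = in.readVInt();               // how many sub-files follow
    Map<String, long[]> entries = new HashMap<>(numEntries);
    for (int i = 0; i < numEntries; i++) {
      String name = in.readString();              // segment name already stripped
      long offset = in.readLong();                // start of this file's bytes in the .cfs
      long length = in.readLong();                // byte count of the data section
      entries.put(name, new long[] { offset, length });
    }
    CodecUtil.checkFooter(in);                    // verifies the checksum footer
    return entries;
  }
}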

org/apache/lucene/codecs/lucene50/Lucene50CompoundFormat.java

@@ -28,6 +28,8 @@ import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
/**
* Lucene 5.0 compound file format
@@ -64,17 +66,47 @@ public final class Lucene50CompoundFormat extends CompoundFormat {
@Override
public Directory getCompoundReader(Directory dir, SegmentInfo si, IOContext context) throws IOException {
String fileName = IndexFileNames.segmentFileName(si.name, "", IndexFileNames.COMPOUND_FILE_EXTENSION);
return new CompoundFileDirectory(si.getId(), dir, fileName, context, false);
return new Lucene50CompoundReader(si.getId(), dir, fileName, context);
}
@Override
public void write(Directory dir, SegmentInfo si, Collection<String> files, CheckAbort checkAbort, IOContext context) throws IOException {
String fileName = IndexFileNames.segmentFileName(si.name, "", IndexFileNames.COMPOUND_FILE_EXTENSION);
try (CompoundFileDirectory cfs = new CompoundFileDirectory(si.getId(), dir, fileName, context, true)) {
String dataFile = IndexFileNames.segmentFileName(si.name, "", IndexFileNames.COMPOUND_FILE_EXTENSION);
String entriesFile = IndexFileNames.segmentFileName(si.name, "", IndexFileNames.COMPOUND_FILE_ENTRIES_EXTENSION);
try (IndexOutput data = dir.createOutput(dataFile, context);
IndexOutput entries = dir.createOutput(entriesFile, context)) {
CodecUtil.writeSegmentHeader(data, DATA_CODEC, VERSION_CURRENT, si.getId(), "");
CodecUtil.writeSegmentHeader(entries, ENTRY_CODEC, VERSION_CURRENT, si.getId(), "");
// write number of files
entries.writeVInt(files.size());
for (String file : files) {
dir.copy(cfs, file, file, context);
checkAbort.work(dir.fileLength(file));
// write bytes for file
long startOffset = data.getFilePointer();
try (IndexInput in = dir.openInput(file, IOContext.READONCE)) {
data.copyBytes(in, in.length());
}
long endOffset = data.getFilePointer();
long length = endOffset - startOffset;
// write entry for file
entries.writeString(IndexFileNames.stripSegmentName(file));
entries.writeLong(startOffset);
entries.writeLong(length);
checkAbort.work(length);
}
CodecUtil.writeFooter(data);
CodecUtil.writeFooter(entries);
}
}
static final String DATA_CODEC = "Lucene50CompoundData";
static final String ENTRY_CODEC = "Lucene50CompoundEntries";
static final int VERSION_START = 0;
static final int VERSION_CURRENT = VERSION_START;
}
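
Since the new write() and getCompoundReader() are now plain and symmetrical, a round trip is short. A hedged usage sketch, where dir, si, files, and checkAbort are assumed to be supplied by the caller (a real Directory, SegmentInfo, file list, and CheckAbort; the CheckAbort import is not shown in this hunk):

import java.io.IOException;
import java.util.Collection;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;

class CompoundRoundTripSketch {
  // Writes the .cfs/.cfe pair for a segment, then reads every sub-file back.
  static void roundTrip(Directory dir, SegmentInfo si, Collection<String> files,
                        CheckAbort checkAbort) throws IOException {
    Lucene50CompoundFormat format = new Lucene50CompoundFormat();
    format.write(dir, si, files, checkAbort, IOContext.DEFAULT);
    try (Directory cfs = format.getCompoundReader(dir, si, IOContext.DEFAULT)) {
      for (String file : cfs.listAll()) {               // names carry the segment prefix again
        try (IndexInput in = cfs.openInput(file, IOContext.READONCE)) {
          assert in.length() == cfs.fileLength(file);   // sub-file reads like a standalone file
        }
      }
    }
  }
}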

org/apache/lucene/codecs/lucene50/CompoundFileDirectory.java (class renamed to Lucene50CompoundReader)

@@ -17,15 +17,11 @@ package org.apache.lucene.codecs.lucene50;
* limitations under the License.
*/
import org.apache.lucene.codecs.Codec; // javadocs
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.LiveDocsFormat; // javadocs
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.store.BaseDirectory;
import org.apache.lucene.store.BufferedIndexInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.DataOutput; // javadocs
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
@@ -44,43 +40,9 @@ import java.io.IOException;
* Class for accessing a compound stream.
* This class implements a directory, but is limited to only read operations.
* Directory methods that would normally modify data throw an exception.
* <p>
* All files belonging to a segment have the same name with varying extensions.
* The extensions correspond to the different file formats used by the {@link Codec}.
* When using the Compound File format these files are collapsed into a
* single <tt>.cfs</tt> file (except for the {@link LiveDocsFormat}), with a
* corresponding <tt>.cfe</tt> file indexing its sub-files.
* <p>
* Files:
* <ul>
* <li><tt>.cfs</tt>: An optional "virtual" file consisting of all the other
* index files for systems that frequently run out of file handles.
* <li><tt>.cfe</tt>: The "virtual" compound file's entry table holding all
* entries in the corresponding .cfs file.
* </ul>
* <p>Description:</p>
* <ul>
* <li>Compound (.cfs) --&gt; Header, FileData <sup>FileCount</sup>, Footer</li>
* <li>Compound Entry Table (.cfe) --&gt; Header, FileCount, &lt;FileName,
* DataOffset, DataLength&gt; <sup>FileCount</sup></li>
* <li>Header --&gt; {@link CodecUtil#writeSegmentHeader SegmentHeader}</li>
* <li>FileCount --&gt; {@link DataOutput#writeVInt VInt}</li>
* <li>DataOffset,DataLength,Checksum --&gt; {@link DataOutput#writeLong UInt64}</li>
* <li>FileName --&gt; {@link DataOutput#writeString String}</li>
* <li>FileData --&gt; raw file data</li>
* <li>Footer --&gt; {@link CodecUtil#writeFooter CodecFooter}</li>
* </ul>
* <p>Notes:</p>
* <ul>
* <li>FileCount indicates how many files are contained in this compound file.
* The entry table that follows has that many entries.
* <li>Each directory entry contains a long pointer to the start of this file's data
* section, the file's length, and a String with that file's name.
* </ul>
*
* @lucene.experimental
*/
final class CompoundFileDirectory extends BaseDirectory {
final class Lucene50CompoundReader extends BaseDirectory {
/** Offset/Length for a slice inside of a compound file */
public static final class FileEntry {
@@ -90,65 +52,47 @@ final class CompoundFileDirectory extends BaseDirectory {
private final Directory directory;
private final String fileName;
protected final int readBufferSize;
private final Map<String,FileEntry> entries;
private final boolean openForWrite;
private static final Map<String,FileEntry> SENTINEL = Collections.emptyMap();
private final CompoundFileWriter writer;
private final IndexInput handle;
private int version;
private final byte[] segmentID;
/**
* Create a new CompoundFileDirectory.
*/
public CompoundFileDirectory(byte[] segmentID, Directory directory, String fileName, IOContext context, boolean openForWrite) throws IOException {
public Lucene50CompoundReader(byte[] segmentID, Directory directory, String fileName, IOContext context) throws IOException {
this.directory = directory;
this.segmentID = segmentID;
this.fileName = fileName;
this.readBufferSize = BufferedIndexInput.bufferSize(context);
this.isOpen = false;
this.openForWrite = openForWrite;
if (!openForWrite) {
boolean success = false;
handle = directory.openInput(fileName, context);
try {
this.entries = readEntries(directory, fileName);
CodecUtil.checkSegmentHeader(handle, CompoundFileWriter.DATA_CODEC, version, version, segmentID, "");
// NOTE: data file is too costly to verify checksum against all the bytes on open,
// but for now we at least verify proper structure of the checksum footer: which looks
// for FOOTER_MAGIC + algorithmID. This is cheap and can detect some forms of corruption
// such as file truncation.
CodecUtil.retrieveChecksum(handle);
success = true;
} finally {
if (!success) {
IOUtils.closeWhileHandlingException(handle);
}
this.entries = readEntries(segmentID, directory, fileName);
boolean success = false;
handle = directory.openInput(fileName, context);
try {
CodecUtil.checkSegmentHeader(handle, Lucene50CompoundFormat.DATA_CODEC, version, version, segmentID, "");
// NOTE: data file is too costly to verify checksum against all the bytes on open,
// but for now we at least verify proper structure of the checksum footer: which looks
// for FOOTER_MAGIC + algorithmID. This is cheap and can detect some forms of corruption
// such as file truncation.
CodecUtil.retrieveChecksum(handle);
success = true;
} finally {
if (!success) {
IOUtils.closeWhileHandlingException(handle);
}
this.isOpen = true;
writer = null;
} else {
assert !(directory instanceof CompoundFileDirectory) : "compound file inside of compound file: " + fileName;
this.entries = SENTINEL;
this.isOpen = true;
writer = new CompoundFileWriter(segmentID, directory, fileName);
handle = null;
}
this.isOpen = true;
}
/** Helper method that reads CFS entries from an input stream */
private final Map<String, FileEntry> readEntries(Directory dir, String name) throws IOException {
private final Map<String, FileEntry> readEntries(byte[] segmentID, Directory dir, String name) throws IOException {
Map<String,FileEntry> mapping = null;
final String entriesFileName = IndexFileNames.segmentFileName(IndexFileNames.stripExtension(name), "",
IndexFileNames.COMPOUND_FILE_ENTRIES_EXTENSION);
try (ChecksumIndexInput entriesStream = dir.openChecksumInput(entriesFileName, IOContext.READONCE)) {
Throwable priorE = null;
try {
version = CodecUtil.checkSegmentHeader(entriesStream, CompoundFileWriter.ENTRY_CODEC,
CompoundFileWriter.VERSION_START,
CompoundFileWriter.VERSION_CURRENT, segmentID, "");
version = CodecUtil.checkSegmentHeader(entriesStream, Lucene50CompoundFormat.ENTRY_CODEC,
Lucene50CompoundFormat.VERSION_START,
Lucene50CompoundFormat.VERSION_CURRENT, segmentID, "");
final int numEntries = entriesStream.readVInt();
mapping = new HashMap<>(numEntries);
for (int i = 0; i < numEntries; i++) {
@@ -167,36 +111,18 @@ final class CompoundFileDirectory extends BaseDirectory {
CodecUtil.checkFooter(entriesStream, priorE);
}
}
return mapping;
}
public Directory getDirectory() {
return directory;
}
public String getName() {
return fileName;
return Collections.unmodifiableMap(mapping);
}
@Override
public synchronized void close() throws IOException {
if (!isOpen) {
// allow double close - usually to be consistent with other closeables
return; // already closed
}
public void close() throws IOException {
isOpen = false;
if (writer != null) {
assert openForWrite;
writer.close();
} else {
IOUtils.close(handle);
}
IOUtils.close(handle);
}
@Override
public synchronized IndexInput openInput(String name, IOContext context) throws IOException {
public IndexInput openInput(String name, IOContext context) throws IOException {
ensureOpen();
assert !openForWrite;
final String id = IndexFileNames.stripSegmentName(name);
final FileEntry entry = entries.get(id);
if (entry == null) {
@@ -209,16 +135,12 @@ final class CompoundFileDirectory extends BaseDirectory {
@Override
public String[] listAll() {
ensureOpen();
String[] res;
if (writer != null) {
res = writer.listAll();
} else {
res = entries.keySet().toArray(new String[entries.size()]);
// Add the segment name
String seg = IndexFileNames.parseSegmentName(fileName);
for (int i = 0; i < res.length; i++) {
res[i] = seg + res[i];
}
String[] res = entries.keySet().toArray(new String[entries.size()]);
// Add the segment name
String seg = IndexFileNames.parseSegmentName(fileName);
for (int i = 0; i < res.length; i++) {
res[i] = seg + res[i];
}
return res;
}
@@ -241,9 +163,6 @@ final class CompoundFileDirectory extends BaseDirectory {
@Override
public long fileLength(String name) throws IOException {
ensureOpen();
if (this.writer != null) {
return writer.fileLength(name);
}
FileEntry e = entries.get(IndexFileNames.stripSegmentName(name));
if (e == null)
throw new FileNotFoundException(name);
@@ -252,8 +171,7 @@ final class CompoundFileDirectory extends BaseDirectory {
@Override
public IndexOutput createOutput(String name, IOContext context) throws IOException {
ensureOpen();
return writer.createOutput(name, context);
throw new UnsupportedOperationException();
}
@Override
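
The listing is cut off above, but after this change openInput() reduces to a map lookup plus a slice of the shared handle. A minimal sketch of that body, consistent with the fields and the openInput fragment shown earlier; the slice() call and the exception message are assumptions, since the hunk containing the real body is not shown:

  // Sketch of the post-change openInput(): look up the stripped name,
  // then expose the sub-file as a view over the one shared .cfs input.
  public IndexInput openInput(String name, IOContext context) throws IOException {
    ensureOpen();
    final String id = IndexFileNames.stripSegmentName(name);
    final FileEntry entry = entries.get(id);
    if (entry == null) {
      throw new FileNotFoundException("No sub-file with id " + id
          + " found in compound file \"" + fileName + "\"");
    }
    return handle.slice(name, entry.offset, entry.length);  // [offset, offset+length)
  }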