LUCENE-5678: Use FileOutputStream instead of RandomAccessFile to write index data. BufferedIndexOutput and related APIs were removed.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1596057 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Uwe Schindler 2014-05-19 21:08:16 +00:00
parent acedb179f4
commit c4bdb2dad0
8 changed files with 157 additions and 337 deletions

View File

@ -80,6 +80,12 @@ API Changes
* LUCENE-4371: Removed IndexInputSlicer and Directory.createSlicer() and replaced
with IndexInput.slice(). (Robert Muir)
* LUCENE-5678: IndexOutput no longer allows seeking, so it is no longer required
to use RandomAccessFile to write Indexes. Lucene now uses standard FileOutputStream
wrapped with OutputStreamIndexOutput to write index data. BufferedIndexOutput was
removed, because buffering and checksumming is provided by FilterOutputStreams,
provided by the JDK. (Uwe Schindler, Mike McCandless)
Documentation
* LUCENE-5392: Add/improve analysis package documentation to reflect

View File

@ -1,150 +0,0 @@
package org.apache.lucene.store;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.util.zip.CRC32;
/** Base implementation class for buffered {@link IndexOutput}. */
public abstract class BufferedIndexOutput extends IndexOutput {
/** The default buffer size in bytes ({@value #DEFAULT_BUFFER_SIZE}). */
public static final int DEFAULT_BUFFER_SIZE = 16384;
private final int bufferSize;
private final byte[] buffer;
private long bufferStart = 0; // position in file of buffer
private int bufferPosition = 0; // position in buffer
private final CRC32 crc = new CRC32();
/**
* Creates a new {@link BufferedIndexOutput} with the default buffer size
* ({@value #DEFAULT_BUFFER_SIZE} bytes see {@link #DEFAULT_BUFFER_SIZE})
*/
public BufferedIndexOutput() {
this(DEFAULT_BUFFER_SIZE);
}
/**
* Creates a new {@link BufferedIndexOutput} with the given buffer size.
* @param bufferSize the buffer size in bytes used to buffer writes internally.
* @throws IllegalArgumentException if the given buffer size is less or equal to <tt>0</tt>
*/
public BufferedIndexOutput(int bufferSize) {
if (bufferSize <= 0) {
throw new IllegalArgumentException("bufferSize must be greater than 0 (got " + bufferSize + ")");
}
this.bufferSize = bufferSize;
buffer = new byte[bufferSize];
}
@Override
public void writeByte(byte b) throws IOException {
if (bufferPosition >= bufferSize)
flush();
buffer[bufferPosition++] = b;
}
@Override
public void writeBytes(byte[] b, int offset, int length) throws IOException {
int bytesLeft = bufferSize - bufferPosition;
// is there enough space in the buffer?
if (bytesLeft >= length) {
// we add the data to the end of the buffer
System.arraycopy(b, offset, buffer, bufferPosition, length);
bufferPosition += length;
// if the buffer is full, flush it
if (bufferSize - bufferPosition == 0)
flush();
} else {
// is data larger then buffer?
if (length > bufferSize) {
// we flush the buffer
if (bufferPosition > 0)
flush();
// and write data at once
crc.update(b, offset, length);
flushBuffer(b, offset, length);
bufferStart += length;
} else {
// we fill/flush the buffer (until the input is written)
int pos = 0; // position in the input data
int pieceLength;
while (pos < length) {
pieceLength = (length - pos < bytesLeft) ? length - pos : bytesLeft;
System.arraycopy(b, pos + offset, buffer, bufferPosition, pieceLength);
pos += pieceLength;
bufferPosition += pieceLength;
// if the buffer is full, flush it
bytesLeft = bufferSize - bufferPosition;
if (bytesLeft == 0) {
flush();
bytesLeft = bufferSize;
}
}
}
}
}
/** Forces any buffered output to be written. */
protected void flush() throws IOException {
crc.update(buffer, 0, bufferPosition);
flushBuffer(buffer, bufferPosition);
bufferStart += bufferPosition;
bufferPosition = 0;
}
/** Expert: implements buffer write. Writes bytes at the current position in
* the output.
* @param b the bytes to write
* @param len the number of bytes to write
*/
private void flushBuffer(byte[] b, int len) throws IOException {
flushBuffer(b, 0, len);
}
/** Expert: implements buffer write. Writes bytes at the current position in
* the output.
* @param b the bytes to write
* @param offset the offset in the byte array
* @param len the number of bytes to write
*/
protected abstract void flushBuffer(byte[] b, int offset, int len) throws IOException;
@Override
public void close() throws IOException {
flush();
}
@Override
public long getFilePointer() {
return bufferStart + bufferPosition;
}
/**
* Returns size of the used output buffer in bytes.
* */
public final int getBufferSize() {
return bufferSize;
}
@Override
public long getChecksum() throws IOException {
flush();
return crc.getValue();
}
}

View File

@ -22,9 +22,10 @@ import org.apache.lucene.util.IOUtils;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
@ -262,7 +263,7 @@ public abstract class FSDirectory extends BaseDirectory {
ensureOpen();
ensureCanWrite(name);
return new FSIndexOutput(this, name);
return new FSIndexOutput(name);
}
protected void ensureCanWrite(String name) throws IOException {
@ -338,58 +339,37 @@ public abstract class FSDirectory extends BaseDirectory {
return this.getClass().getSimpleName() + "@" + directory + " lockFactory=" + getLockFactory();
}
/**
* Writes output with {@link RandomAccessFile#write(byte[], int, int)}
*/
protected static class FSIndexOutput extends BufferedIndexOutput {
final class FSIndexOutput extends OutputStreamIndexOutput {
/**
* The maximum chunk size is 8192 bytes, because {@link RandomAccessFile} mallocs
* The maximum chunk size is 8192 bytes, because {@link FileOutputStream} mallocs
* a native buffer outside of stack if the write buffer size is larger.
*/
private static final int CHUNK_SIZE = 8192;
static final int CHUNK_SIZE = 8192;
private final FSDirectory parent;
private final String name;
private final RandomAccessFile file;
private volatile boolean isOpen; // remember if the file is open, so that we don't try to close it more than once
public FSIndexOutput(FSDirectory parent, String name) throws IOException {
super(CHUNK_SIZE);
this.parent = parent;
this.name = name;
file = new RandomAccessFile(new File(parent.directory, name), "rw");
isOpen = true;
}
@Override
protected void flushBuffer(byte[] b, int offset, int size) throws IOException {
assert isOpen;
while (size > 0) {
final int toWrite = Math.min(CHUNK_SIZE, size);
file.write(b, offset, toWrite);
offset += toWrite;
size -= toWrite;
}
assert size == 0;
public FSIndexOutput(String name) throws IOException {
super(new FilterOutputStream(new FileOutputStream(new File(directory, name))) {
// This implementation ensures, that we never write more than CHUNK_SIZE bytes:
@Override
public void write(byte[] b, int offset, int length) throws IOException {
while (length > 0) {
final int chunk = Math.min(length, CHUNK_SIZE);
out.write(b, offset, chunk);
length -= chunk;
offset += chunk;
}
}
}, CHUNK_SIZE);
this.name = name;
}
@Override
public void close() throws IOException {
parent.onIndexOutputClosed(name);
// only close the file if it has not been closed yet
if (isOpen) {
boolean success = false;
try {
super.close();
success = true;
} finally {
isOpen = false;
if (success) {
IOUtils.close(file);
} else {
IOUtils.closeWhileHandlingException(file);
}
}
try {
onIndexOutputClosed(name);
} finally {
super.close();
}
}
}

View File

@ -0,0 +1,76 @@
package org.apache.lucene.store;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.CRC32;
import java.util.zip.CheckedOutputStream;
/** Implementation class for buffered {@link IndexOutput} that writes to an {@link OutputStream}. */
public class OutputStreamIndexOutput extends IndexOutput {
private final CRC32 crc = new CRC32();
private final BufferedOutputStream os;
private long bytesWritten = 0L;
/**
* Creates a new {@link OutputStreamIndexOutput} with the given buffer size.
* @param bufferSize the buffer size in bytes used to buffer writes internally.
* @throws IllegalArgumentException if the given buffer size is less or equal to <tt>0</tt>
*/
public OutputStreamIndexOutput(OutputStream out, int bufferSize) {
this.os = new BufferedOutputStream(new CheckedOutputStream(out, crc), bufferSize);
}
@Override
public final void writeByte(byte b) throws IOException {
os.write(b);
bytesWritten++;
}
@Override
public final void writeBytes(byte[] b, int offset, int length) throws IOException {
os.write(b, offset, length);
bytesWritten += length;
}
@Override
public void close() throws IOException {
try (final OutputStream o = os) {
// We want to make sure that os.flush() was running before close:
// BufferedOutputStream may ignore IOExceptions while flushing on close().
// TODO: this is no longer an issue in Java 8:
// http://hg.openjdk.java.net/jdk8/tl/jdk/rev/759aa847dcaf
o.flush();
}
}
@Override
public final long getFilePointer() {
return bytesWritten;
}
@Override
public final long getChecksum() throws IOException {
os.flush();
return crc.getValue();
}
}

View File

@ -1,4 +1,5 @@
package org.apache.lucene.store;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@ -15,6 +16,7 @@ package org.apache.lucene.store;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
/**
@ -22,48 +24,58 @@ import java.io.IOException;
*
* @lucene.internal
*/
final class RateLimitedIndexOutput extends BufferedIndexOutput {
final class RateLimitedIndexOutput extends IndexOutput {
private final IndexOutput delegate;
private final BufferedIndexOutput bufferedDelegate;
private final RateLimiter rateLimiter;
/** How many bytes we've written since we last called rateLimiter.pause. */
private long bytesSinceLastPause;
/** Cached here not not always have to call RateLimiter#getMinPauseCheckBytes()
* which does volatile read. */
private long currentMinPauseCheckBytes;
RateLimitedIndexOutput(final RateLimiter rateLimiter, final IndexOutput delegate) {
// TODO should we make buffer size configurable
if (delegate instanceof BufferedIndexOutput) {
bufferedDelegate = (BufferedIndexOutput) delegate;
this.delegate = delegate;
} else {
this.delegate = delegate;
bufferedDelegate = null;
}
this.delegate = delegate;
this.rateLimiter = rateLimiter;
this.currentMinPauseCheckBytes = rateLimiter.getMinPauseCheckBytes();
}
@Override
protected void flushBuffer(byte[] b, int offset, int len) throws IOException {
bytesSinceLastPause += len;
if (bytesSinceLastPause > rateLimiter.getMinPauseCheckBytes()) {
rateLimiter.pause(bytesSinceLastPause);
bytesSinceLastPause = 0;
}
if (bufferedDelegate != null) {
bufferedDelegate.flushBuffer(b, offset, len);
} else {
delegate.writeBytes(b, offset, len);
}
public void close() throws IOException {
delegate.close();
}
@Override
public void close() throws IOException {
try {
super.close();
} finally {
delegate.close();
}
public long getFilePointer() {
return delegate.getFilePointer();
}
@Override
public long getChecksum() throws IOException {
return delegate.getChecksum();
}
@Override
public void writeByte(byte b) throws IOException {
bytesSinceLastPause++;
checkRate();
delegate.writeByte(b);
}
@Override
public void writeBytes(byte[] b, int offset, int length) throws IOException {
bytesSinceLastPause += length;
checkRate();
delegate.writeBytes(b, offset, length);
}
private void checkRate() {
if (bytesSinceLastPause > currentMinPauseCheckBytes) {
rateLimiter.pause(bytesSinceLastPause);
bytesSinceLastPause = 0;
currentMinPauseCheckBytes = rateLimiter.getMinPauseCheckBytes();
}
}
}

View File

@ -524,10 +524,6 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
}
//System.out.println(Thread.currentThread().getName() + ": MDW: create " + name);
IndexOutput delegateOutput = in.createOutput(name, LuceneTestCase.newIOContext(randomState, context));
if (randomState.nextInt(10) == 0){
// once in a while wrap the IO in a Buffered IO with random buffer sizes
delegateOutput = new BufferedIndexOutputWrapper(1+randomState.nextInt(BufferedIndexOutput.DEFAULT_BUFFER_SIZE), delegateOutput);
}
final IndexOutput io = new MockIndexOutputWrapper(this, delegateOutput, name);
addFileHandle(io, name, Handle.Output);
openFilesForWrite.add(name);
@ -947,29 +943,6 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
in.copy(to, src, dest, context);
}
final class BufferedIndexOutputWrapper extends BufferedIndexOutput {
private final IndexOutput io;
public BufferedIndexOutputWrapper(int bufferSize, IndexOutput io) {
super(bufferSize);
this.io = io;
}
@Override
protected void flushBuffer(byte[] b, int offset, int len) throws IOException {
io.writeBytes(b, offset, len);
}
@Override
public void close() throws IOException {
try {
super.close();
} finally {
io.close();
}
}
}
/** Use this when throwing fake {@code IOException},
* e.g. from {@link MockDirectoryWrapper.Failure}. */
public static class FakeIOException extends IOException {

View File

@ -30,12 +30,10 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.lucene.store.BaseDirectory;
import org.apache.lucene.store.BufferedIndexOutput;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.NoLockFactory;
import org.apache.lucene.util.IOUtils;
import org.apache.solr.store.blockcache.CustomBufferedIndexInput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -99,14 +97,11 @@ public class HdfsDirectory extends BaseDirectory {
}
@Override
public IndexOutput createOutput(String name, IOContext context)
throws IOException {
public IndexOutput createOutput(String name, IOContext context) throws IOException {
if (SEGMENTS_GEN.equals(name)) {
return new NullIndexOutput();
}
HdfsFileWriter writer = new HdfsFileWriter(getFileSystem(), new Path(
hdfsDirPath, name));
return new HdfsIndexOutput(writer);
return new HdfsFileWriter(getFileSystem(), new Path(hdfsDirPath, name));
}
private String[] getNormalNames(List<String> files) {
@ -233,36 +228,6 @@ public class HdfsDirectory extends BaseDirectory {
}
}
static class HdfsIndexOutput extends BufferedIndexOutput {
private HdfsFileWriter writer;
public HdfsIndexOutput(HdfsFileWriter writer) {
this.writer = writer;
}
@Override
public void close() throws IOException {
boolean success = false;
try {
super.close();
success = true;
} finally {
if (success) {
IOUtils.close(writer);
} else {
IOUtils.closeWhileHandlingException(writer);
}
}
}
@Override
protected void flushBuffer(byte[] b, int offset, int len)
throws IOException {
writer.writeBytes(b, offset, len);
}
}
@Override
public void sync(Collection<String> names) throws IOException {
LOG.debug("Sync called on {}", Arrays.toString(names.toArray()));

View File

@ -17,37 +17,31 @@ package org.apache.solr.store.hdfs;
* limitations under the License.
*/
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.lucene.store.DataOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.lucene.store.OutputStreamIndexOutput;
/**
* @lucene.experimental
*/
public class HdfsFileWriter extends DataOutput implements Closeable {
public static Logger LOG = LoggerFactory.getLogger(HdfsFileWriter.class);
public class HdfsFileWriter extends OutputStreamIndexOutput {
public static final String HDFS_SYNC_BLOCK = "solr.hdfs.sync.block";
private final Path path;
private FSDataOutputStream outputStream;
private long currentPosition;
public static final int BUFFER_SIZE = 16384;
public HdfsFileWriter(FileSystem fileSystem, Path path) throws IOException {
LOG.debug("Creating writer on {}", path);
this.path = path;
super(getOutputStream(fileSystem, path), BUFFER_SIZE);
}
private static final OutputStream getOutputStream(FileSystem fileSystem, Path path) throws IOException {
Configuration conf = fileSystem.getConf();
FsServerDefaults fsDefaults = fileSystem.getServerDefaults(path);
EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.CREATE,
@ -55,45 +49,9 @@ public class HdfsFileWriter extends DataOutput implements Closeable {
if (Boolean.getBoolean(HDFS_SYNC_BLOCK)) {
flags.add(CreateFlag.SYNC_BLOCK);
}
outputStream = fileSystem.create(path, FsPermission.getDefault()
return fileSystem.create(path, FsPermission.getDefault()
.applyUMask(FsPermission.getUMask(conf)), flags, fsDefaults
.getFileBufferSize(), fsDefaults.getReplication(), fsDefaults
.getBlockSize(), null);
}
public long length() {
return currentPosition;
}
public void seek(long pos) throws IOException {
LOG.error("Invalid seek called on {}", path);
throw new IOException("Seek not supported");
}
public void flush() throws IOException {
// flush to the network, not guarantees it makes it to the DN (vs hflush)
outputStream.flush();
LOG.debug("Flushed file {}", path);
}
public void close() throws IOException {
outputStream.close();
LOG.debug("Closed writer on {}", path);
}
@Override
public void writeByte(byte b) throws IOException {
outputStream.write(b & 0xFF);
currentPosition++;
}
@Override
public void writeBytes(byte[] b, int offset, int length) throws IOException {
outputStream.write(b, offset, length);
currentPosition += length;
}
public long getPosition() {
return currentPosition;
}
}