HDFS-6065. HDFS zero-copy reads should return null on EOF when doing ZCR (cmccabe)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1575109 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
591be56052
commit
173c115951
|
@ -50,13 +50,15 @@ public interface HasEnhancedByteBufferAccess {
|
|||
* Options to use when reading.
|
||||
*
|
||||
* @return
|
||||
* We will return null on EOF (and only on EOF).
|
||||
* Otherwise, we will return a direct ByteBuffer containing at
|
||||
* least one byte. You must free this ByteBuffer when you are
|
||||
* done with it by calling releaseBuffer on it.
|
||||
* The buffer will continue to be readable until it is released
|
||||
* in this manner. However, the input stream's close method may
|
||||
* warn about unclosed buffers.
|
||||
* We will always return an empty buffer if maxLength was 0,
|
||||
* whether or not we are at EOF.
|
||||
* If maxLength > 0, we will return null if the stream has
|
||||
* reached EOF.
|
||||
* Otherwise, we will return a ByteBuffer containing at least one
|
||||
* byte. You must free this ByteBuffer when you are done with it
|
||||
* by calling releaseBuffer on it. The buffer will continue to be
|
||||
* readable until it is released in this manner. However, the
|
||||
* input stream's close method may warn about unclosed buffers.
|
||||
* @throws
|
||||
* IOException: if there was an error reading.
|
||||
* UnsupportedOperationException: if factory was null, and we
|
||||
|
@ -76,4 +78,4 @@ public interface HasEnhancedByteBufferAccess {
|
|||
* The ByteBuffer to release.
|
||||
*/
|
||||
public void releaseBuffer(ByteBuffer buffer);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -711,6 +711,9 @@ Release 2.4.0 - UNRELEASED
|
|||
|
||||
HDFS-6067. TestPread.testMaxOutHedgedReadPool is flaky (cmccabe)
|
||||
|
||||
HDFS-6065. HDFS zero-copy reads should return null on EOF when doing ZCR
|
||||
(cmccabe)
|
||||
|
||||
BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
|
||||
|
||||
HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)
|
||||
|
|
|
@ -1556,13 +1556,27 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
|||
closeCurrentBlockReader();
|
||||
}
|
||||
|
||||
/**
|
||||
* The immutable empty buffer we return when we reach EOF when doing a
|
||||
* zero-copy read.
|
||||
*/
|
||||
private static final ByteBuffer EMPTY_BUFFER =
|
||||
ByteBuffer.allocateDirect(0).asReadOnlyBuffer();
|
||||
|
||||
@Override
|
||||
public synchronized ByteBuffer read(ByteBufferPool bufferPool,
|
||||
int maxLength, EnumSet<ReadOption> opts)
|
||||
throws IOException, UnsupportedOperationException {
|
||||
assert(maxLength > 0);
|
||||
if (((blockReader == null) || (blockEnd == -1)) &&
|
||||
(pos < getFileLength())) {
|
||||
if (maxLength == 0) {
|
||||
return EMPTY_BUFFER;
|
||||
} else if (maxLength < 0) {
|
||||
throw new IllegalArgumentException("can't read a negative " +
|
||||
"number of bytes.");
|
||||
}
|
||||
if ((blockReader == null) || (blockEnd == -1)) {
|
||||
if (pos >= getFileLength()) {
|
||||
return null;
|
||||
}
|
||||
/*
|
||||
* If we don't have a blockReader, or the one we have has no more bytes
|
||||
* left to read, we call seekToBlockSource to get a new blockReader and
|
||||
|
@ -1645,6 +1659,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
|||
|
||||
@Override
|
||||
public synchronized void releaseBuffer(ByteBuffer buffer) {
|
||||
if (buffer == EMPTY_BUFFER) return;
|
||||
Object val = extendedReadBuffers.remove(buffer);
|
||||
if (val == null) {
|
||||
throw new IllegalArgumentException("tried to release a buffer " +
|
||||
|
|
|
@ -277,7 +277,11 @@ public class ShortCircuitReplica {
|
|||
MappedByteBuffer loadMmapInternal() {
|
||||
try {
|
||||
FileChannel channel = dataStream.getChannel();
|
||||
return channel.map(MapMode.READ_ONLY, 0, channel.size());
|
||||
MappedByteBuffer mmap = channel.map(MapMode.READ_ONLY, 0, channel.size());
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(this + ": created mmap of size " + channel.size());
|
||||
}
|
||||
return mmap;
|
||||
} catch (IOException e) {
|
||||
LOG.warn(this + ": mmap error", e);
|
||||
return null;
|
||||
|
|
|
@ -38,7 +38,7 @@ struct hdfsFile_internal;
|
|||
|
||||
#define EXPECT_NULL(x) \
|
||||
do { \
|
||||
void* __my_ret__ = x; \
|
||||
const void* __my_ret__ = x; \
|
||||
int __my_errno__ = errno; \
|
||||
if (__my_ret__ != NULL) { \
|
||||
fprintf(stderr, "TEST_ERROR: failed on %s:%d (errno: %d): " \
|
||||
|
@ -50,7 +50,7 @@ struct hdfsFile_internal;
|
|||
|
||||
#define EXPECT_NONNULL(x) \
|
||||
do { \
|
||||
void* __my_ret__ = x; \
|
||||
const void* __my_ret__ = x; \
|
||||
int __my_errno__ = errno; \
|
||||
if (__my_ret__ == NULL) { \
|
||||
fprintf(stderr, "TEST_ERROR: failed on %s:%d (errno: %d): " \
|
||||
|
|
|
@ -746,12 +746,16 @@ extern "C" {
|
|||
* @param maxLength The maximum length to read. We may read fewer bytes
|
||||
* than this length.
|
||||
*
|
||||
* @return On success, returns a new hadoopRzBuffer.
|
||||
* @return On success, we will return a new hadoopRzBuffer.
|
||||
* This buffer will continue to be valid and readable
|
||||
* until it is released by readZeroBufferFree. Failure to
|
||||
* release a buffer will lead to a memory leak.
|
||||
* You can access the data within the hadoopRzBuffer with
|
||||
* hadoopRzBufferGet. If you have reached EOF, the data
|
||||
* within the hadoopRzBuffer will be NULL. You must still
|
||||
* free hadoopRzBuffer instances containing NULL.
|
||||
*
|
||||
* NULL plus an errno code on an error.
|
||||
* On failure, we will return NULL plus an errno code.
|
||||
* errno = EOPNOTSUPP indicates that we could not do a
|
||||
* zero-copy read, and there was no ByteBufferPool
|
||||
* supplied.
|
||||
|
|
|
@ -38,6 +38,9 @@
|
|||
#define TEST_ZEROCOPY_LAST_BLOCK_SIZE 3215
|
||||
#define TEST_ZEROCOPY_NUM_BLOCKS 6
|
||||
#define SMALL_READ_LEN 16
|
||||
#define TEST_ZEROCOPY_FILE_LEN \
|
||||
(((TEST_ZEROCOPY_NUM_BLOCKS - 1) * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + \
|
||||
TEST_ZEROCOPY_LAST_BLOCK_SIZE)
|
||||
|
||||
#define ZC_BUF_LEN 32768
|
||||
|
||||
|
@ -165,6 +168,22 @@ static int doTestZeroCopyReads(hdfsFS fs, const char *fileName)
|
|||
EXPECT_ZERO(memcmp(block, hadoopRzBufferGet(buffer) +
|
||||
(TEST_ZEROCOPY_FULL_BLOCK_SIZE - SMALL_READ_LEN), SMALL_READ_LEN));
|
||||
hadoopRzBufferFree(file, buffer);
|
||||
|
||||
/* Check the result of a zero-length read. */
|
||||
buffer = hadoopReadZero(file, opts, 0);
|
||||
EXPECT_NONNULL(buffer);
|
||||
EXPECT_NONNULL(hadoopRzBufferGet(buffer));
|
||||
EXPECT_INT_EQ(0, hadoopRzBufferLength(buffer));
|
||||
hadoopRzBufferFree(file, buffer);
|
||||
|
||||
/* Check the result of reading past EOF */
|
||||
EXPECT_INT_EQ(0, hdfsSeek(fs, file, TEST_ZEROCOPY_FILE_LEN));
|
||||
buffer = hadoopReadZero(file, opts, 1);
|
||||
EXPECT_NONNULL(buffer);
|
||||
EXPECT_NULL(hadoopRzBufferGet(buffer));
|
||||
hadoopRzBufferFree(file, buffer);
|
||||
|
||||
/* Cleanup */
|
||||
free(block);
|
||||
hadoopRzOptionsFree(opts);
|
||||
EXPECT_ZERO(hdfsCloseFile(fs, file));
|
||||
|
|
|
@ -753,6 +753,10 @@ public class TestEnhancedByteBufferAccess {
|
|||
fsIn = fs.open(TEST_PATH);
|
||||
ByteBuffer buf = fsIn.read(null, 1, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
||||
fsIn.releaseBuffer(buf);
|
||||
// Test EOF behavior
|
||||
IOUtils.skipFully(fsIn, TEST_FILE_LENGTH - 1);
|
||||
buf = fsIn.read(null, 1, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
||||
Assert.assertEquals(null, buf);
|
||||
} finally {
|
||||
if (fsIn != null) fsIn.close();
|
||||
if (fs != null) fs.close();
|
||||
|
|
Loading…
Reference in New Issue