HDFS-6065. HDFS zero-copy reads should return null on EOF when doing ZCR (cmccabe)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1575110 13f79535-47bb-0310-9956-ffa450edef68
commit fca36dcb4e
parent a9acce33a7
@@ -50,13 +50,15 @@ public interface HasEnhancedByteBufferAccess {
    * Options to use when reading.
    *
    * @return
-   * We will return null on EOF (and only on EOF).
-   * Otherwise, we will return a direct ByteBuffer containing at
-   * least one byte. You must free this ByteBuffer when you are
-   * done with it by calling releaseBuffer on it.
-   * The buffer will continue to be readable until it is released
-   * in this manner. However, the input stream's close method may
-   * warn about unclosed buffers.
+   * We will always return an empty buffer if maxLength was 0,
+   * whether or not we are at EOF.
+   * If maxLength > 0, we will return null if the stream has
+   * reached EOF.
+   * Otherwise, we will return a ByteBuffer containing at least one
+   * byte. You must free this ByteBuffer when you are done with it
+   * by calling releaseBuffer on it. The buffer will continue to be
+   * readable until it is released in this manner. However, the
+   * input stream's close method may warn about unclosed buffers.
    * @throws
    * IOException: if there was an error reading.
    * UnsupportedOperationException: if factory was null, and we
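
To make the revised read contract concrete, here is a minimal caller sketch (not part of this patch). It goes through FSDataInputStream, which implements HasEnhancedByteBufferAccess; ElasticByteBufferPool is the stock fallback pool from org.apache.hadoop.io, and the FileSystem/Path arguments are assumed to come from the caller:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.EnumSet;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.io.ElasticByteBufferPool;

public class ZeroCopyReadLoop {
  // Reads the whole file via the enhanced byte-buffer API, honoring the
  // revised contract: null signals EOF (and only EOF), and every non-null
  // buffer must be handed back through releaseBuffer().
  static long countBytes(FileSystem fs, Path path) throws IOException {
    ElasticByteBufferPool pool = new ElasticByteBufferPool();
    EnumSet<ReadOption> opts = EnumSet.of(ReadOption.SKIP_CHECKSUMS);
    long total = 0;
    FSDataInputStream in = fs.open(path);
    try {
      while (true) {
        ByteBuffer buf = in.read(pool, 1024 * 1024, opts);
        if (buf == null) {
          break;                     // EOF: the only case that returns null
        }
        try {
          total += buf.remaining();  // at least one byte when maxLength > 0
        } finally {
          in.releaseBuffer(buf);     // required even for short reads
        }
      }
    } finally {
      in.close();
    }
    return total;
  }
}

Note that releaseBuffer() must be called for every non-null buffer, whether it came from the zero-copy mmap path or from the fallback pool.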

@@ -323,6 +323,9 @@ Release 2.4.0 - UNRELEASED
 
     HDFS-6067. TestPread.testMaxOutHedgedReadPool is flaky (cmccabe)
 
+    HDFS-6065. HDFS zero-copy reads should return null on EOF when doing ZCR
+    (cmccabe)
+
   BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
 
     HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)

@@ -1556,13 +1556,27 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     closeCurrentBlockReader();
   }
 
+  /**
+   * The immutable empty buffer we return when we reach EOF when doing a
+   * zero-copy read.
+   */
+  private static final ByteBuffer EMPTY_BUFFER =
+      ByteBuffer.allocateDirect(0).asReadOnlyBuffer();
+
   @Override
   public synchronized ByteBuffer read(ByteBufferPool bufferPool,
       int maxLength, EnumSet<ReadOption> opts)
       throws IOException, UnsupportedOperationException {
-    assert(maxLength > 0);
-    if (((blockReader == null) || (blockEnd == -1)) &&
-        (pos < getFileLength())) {
+    if (maxLength == 0) {
+      return EMPTY_BUFFER;
+    } else if (maxLength < 0) {
+      throw new IllegalArgumentException("can't read a negative " +
+          "number of bytes.");
+    }
+    if ((blockReader == null) || (blockEnd == -1)) {
+      if (pos >= getFileLength()) {
+        return null;
+      }
       /*
        * If we don't have a blockReader, or the one we have has no more bytes
        * left to read, we call seekToBlockSource to get a new blockReader and

@@ -1645,6 +1659,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
 
   @Override
   public synchronized void releaseBuffer(ByteBuffer buffer) {
+    if (buffer == EMPTY_BUFFER) return;
     Object val = extendedReadBuffers.remove(buffer);
     if (val == null) {
       throw new IllegalArgumentException("tried to release a buffer " +
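
A side note on the design above: handing out one shared, read-only, zero-capacity direct buffer makes the maxLength == 0 path allocation-free, and the reference-equality guard added to releaseBuffer() keeps that shared instance out of the extendedReadBuffers bookkeeping. A standalone sketch (plain java.nio, not HDFS code) of the properties this relies on:

import java.nio.ByteBuffer;

public class EmptyBufferDemo {
  // Same construction as the EMPTY_BUFFER field added above.
  private static final ByteBuffer EMPTY_BUFFER =
      ByteBuffer.allocateDirect(0).asReadOnlyBuffer();

  public static void main(String[] args) {
    ByteBuffer buf = EMPTY_BUFFER;            // what read(pool, 0, opts) returns
    System.out.println(buf.remaining());      // 0 -> nothing to consume
    System.out.println(buf.isReadOnly());     // true -> callers cannot write to it
    System.out.println(buf == EMPTY_BUFFER);  // true -> recognized by the ==
                                              // guard in releaseBuffer()
  }
}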

@@ -277,7 +277,11 @@ public class ShortCircuitReplica {
   MappedByteBuffer loadMmapInternal() {
     try {
       FileChannel channel = dataStream.getChannel();
-      return channel.map(MapMode.READ_ONLY, 0, channel.size());
+      MappedByteBuffer mmap = channel.map(MapMode.READ_ONLY, 0, channel.size());
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(this + ": created mmap of size " + channel.size());
+      }
+      return mmap;
     } catch (IOException e) {
       LOG.warn(this + ": mmap error", e);
       return null;

@@ -38,7 +38,7 @@ struct hdfsFile_internal;
 
 #define EXPECT_NULL(x) \
     do { \
-        void* __my_ret__ = x; \
+        const void* __my_ret__ = x; \
        int __my_errno__ = errno; \
        if (__my_ret__ != NULL) { \
            fprintf(stderr, "TEST_ERROR: failed on %s:%d (errno: %d): " \

@@ -50,7 +50,7 @@ struct hdfsFile_internal;
 
 #define EXPECT_NONNULL(x) \
     do { \
-        void* __my_ret__ = x; \
+        const void* __my_ret__ = x; \
        int __my_errno__ = errno; \
        if (__my_ret__ == NULL) { \
            fprintf(stderr, "TEST_ERROR: failed on %s:%d (errno: %d): " \

@@ -746,12 +746,16 @@ extern "C" {
  * @param maxLength  The maximum length to read. We may read fewer bytes
  *                   than this length.
  *
- * @return           On success, returns a new hadoopRzBuffer.
+ * @return           On success, we will return a new hadoopRzBuffer.
  *                   This buffer will continue to be valid and readable
  *                   until it is released by readZeroBufferFree. Failure to
  *                   release a buffer will lead to a memory leak.
+ *                   You can access the data within the hadoopRzBuffer with
+ *                   hadoopRzBufferGet. If you have reached EOF, the data
+ *                   within the hadoopRzBuffer will be NULL. You must still
+ *                   free hadoopRzBuffer instances containing NULL.
  *
- *                   NULL plus an errno code on an error.
+ *                   On failure, we will return NULL plus an errno code.
  *                   errno = EOPNOTSUPP indicates that we could not do a
  *                   zero-copy read, and there was no ByteBufferPool
  *                   supplied.

@@ -38,6 +38,9 @@
 #define TEST_ZEROCOPY_LAST_BLOCK_SIZE 3215
 #define TEST_ZEROCOPY_NUM_BLOCKS 6
 #define SMALL_READ_LEN 16
+#define TEST_ZEROCOPY_FILE_LEN \
+  (((TEST_ZEROCOPY_NUM_BLOCKS - 1) * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + \
+  TEST_ZEROCOPY_LAST_BLOCK_SIZE)
 
 #define ZC_BUF_LEN 32768
 

@@ -165,6 +168,22 @@ static int doTestZeroCopyReads(hdfsFS fs, const char *fileName)
     EXPECT_ZERO(memcmp(block, hadoopRzBufferGet(buffer) +
         (TEST_ZEROCOPY_FULL_BLOCK_SIZE - SMALL_READ_LEN), SMALL_READ_LEN));
     hadoopRzBufferFree(file, buffer);
+
+    /* Check the result of a zero-length read. */
+    buffer = hadoopReadZero(file, opts, 0);
+    EXPECT_NONNULL(buffer);
+    EXPECT_NONNULL(hadoopRzBufferGet(buffer));
+    EXPECT_INT_EQ(0, hadoopRzBufferLength(buffer));
+    hadoopRzBufferFree(file, buffer);
+
+    /* Check the result of reading past EOF */
+    EXPECT_INT_EQ(0, hdfsSeek(fs, file, TEST_ZEROCOPY_FILE_LEN));
+    buffer = hadoopReadZero(file, opts, 1);
+    EXPECT_NONNULL(buffer);
+    EXPECT_NULL(hadoopRzBufferGet(buffer));
+    hadoopRzBufferFree(file, buffer);
+
+    /* Cleanup */
     free(block);
     hadoopRzOptionsFree(opts);
     EXPECT_ZERO(hdfsCloseFile(fs, file));

@@ -753,6 +753,10 @@ public class TestEnhancedByteBufferAccess {
       fsIn = fs.open(TEST_PATH);
       ByteBuffer buf = fsIn.read(null, 1, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
       fsIn.releaseBuffer(buf);
+      // Test EOF behavior
+      IOUtils.skipFully(fsIn, TEST_FILE_LENGTH - 1);
+      buf = fsIn.read(null, 1, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+      Assert.assertEquals(null, buf);
     } finally {
       if (fsIn != null) fsIn.close();
       if (fs != null) fs.close();
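
The Java-level test above covers only the EOF case; the zero-length case is exercised in the C test further up. A hypothetical Java companion check (not part of this patch), reusing the fsIn and imports of the test above, might look like:

      // Hypothetical companion assertions: per the new contract,
      // maxLength == 0 yields a non-null empty buffer even at EOF,
      // and the pool argument is never consulted on that path.
      ByteBuffer empty = fsIn.read(null, 0, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
      Assert.assertNotNull(empty);
      Assert.assertEquals(0, empty.remaining());
      fsIn.releaseBuffer(empty);  // a no-op for the shared empty buffer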