From 44d800b353a56dd43c793b70f6a08146d38d4dd5 Mon Sep 17 00:00:00 2001 From: yliu Date: Thu, 5 Feb 2015 23:57:36 +0800 Subject: [PATCH] HDFS-7698. Fix locking on HDFS read statistics and add a method for clearing them. (Colin P. McCabe via yliu) --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../apache/hadoop/hdfs/DFSInputStream.java | 73 ++++++++++++------- .../hdfs/client/HdfsDataInputStream.java | 6 +- .../src/main/native/libhdfs/exception.c | 5 ++ .../src/main/native/libhdfs/hdfs.c | 33 ++++++++- .../src/main/native/libhdfs/hdfs.h | 13 ++++ .../native/libhdfs/test_libhdfs_threaded.c | 4 + 7 files changed, 108 insertions(+), 29 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 767bb51008f..fe9ddb6ab0a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -566,6 +566,9 @@ Release 2.7.0 - UNRELEASED HDFS-7709. Fix findbug warnings in httpfs. (Rakesh R via ozawa) + HDFS-7698. Fix locking on HDFS read statistics and add a method for + clearing them. (Colin P. McCabe via yliu) + Release 2.6.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index c9b86d0741b..9e7533348d1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -131,10 +131,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, public static class ReadStatistics { public ReadStatistics() { - this.totalBytesRead = 0; - this.totalLocalBytesRead = 0; - this.totalShortCircuitBytesRead = 0; - this.totalZeroCopyBytesRead = 0; + clear(); } public ReadStatistics(ReadStatistics rhs) { @@ -203,6 +200,13 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, this.totalShortCircuitBytesRead += amt; this.totalZeroCopyBytesRead += amt; } + + void clear() { + this.totalBytesRead = 0; + this.totalLocalBytesRead = 0; + this.totalShortCircuitBytesRead = 0; + this.totalZeroCopyBytesRead = 0; + } private long totalBytesRead; @@ -412,7 +416,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, /** * Return collection of blocks that has already been located. */ - public synchronized List getAllBlocks() throws IOException { + public List getAllBlocks() throws IOException { return getBlockRange(0, getFileLength()); } @@ -700,26 +704,28 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, * strategy-agnostic. */ private interface ReaderStrategy { - public int doRead(BlockReader blockReader, int off, int len, - ReadStatistics readStatistics) throws ChecksumException, IOException; + public int doRead(BlockReader blockReader, int off, int len) + throws ChecksumException, IOException; } - private static void updateReadStatistics(ReadStatistics readStatistics, + private void updateReadStatistics(ReadStatistics readStatistics, int nRead, BlockReader blockReader) { if (nRead <= 0) return; - if (blockReader.isShortCircuit()) { - readStatistics.addShortCircuitBytes(nRead); - } else if (blockReader.isLocal()) { - readStatistics.addLocalBytes(nRead); - } else { - readStatistics.addRemoteBytes(nRead); + synchronized(infoLock) { + if (blockReader.isShortCircuit()) { + readStatistics.addShortCircuitBytes(nRead); + } else if (blockReader.isLocal()) { + readStatistics.addLocalBytes(nRead); + } else { + readStatistics.addRemoteBytes(nRead); + } } } /** * Used to read bytes into a byte[] */ - private static class ByteArrayStrategy implements ReaderStrategy { + private class ByteArrayStrategy implements ReaderStrategy { final byte[] buf; public ByteArrayStrategy(byte[] buf) { @@ -727,26 +733,26 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, } @Override - public int doRead(BlockReader blockReader, int off, int len, - ReadStatistics readStatistics) throws ChecksumException, IOException { - int nRead = blockReader.read(buf, off, len); - updateReadStatistics(readStatistics, nRead, blockReader); - return nRead; + public int doRead(BlockReader blockReader, int off, int len) + throws ChecksumException, IOException { + int nRead = blockReader.read(buf, off, len); + updateReadStatistics(readStatistics, nRead, blockReader); + return nRead; } } /** * Used to read bytes into a user-supplied ByteBuffer */ - private static class ByteBufferStrategy implements ReaderStrategy { + private class ByteBufferStrategy implements ReaderStrategy { final ByteBuffer buf; ByteBufferStrategy(ByteBuffer buf) { this.buf = buf; } @Override - public int doRead(BlockReader blockReader, int off, int len, - ReadStatistics readStatistics) throws ChecksumException, IOException { + public int doRead(BlockReader blockReader, int off, int len) + throws ChecksumException, IOException { int oldpos = buf.position(); int oldlimit = buf.limit(); boolean success = false; @@ -785,7 +791,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, while (true) { // retry as many times as seekToNewSource allows. try { - return reader.doRead(blockReader, off, len, readStatistics); + return reader.doRead(blockReader, off, len); } catch ( ChecksumException ce ) { DFSClient.LOG.warn("Found Checksum error for " + getCurrentBlock() + " from " + currentNode @@ -1612,8 +1618,19 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, /** * Get statistics about the reads which this DFSInputStream has done. */ - public synchronized ReadStatistics getReadStatistics() { - return new ReadStatistics(readStatistics); + public ReadStatistics getReadStatistics() { + synchronized(infoLock) { + return new ReadStatistics(readStatistics); + } + } + + /** + * Clear statistics about the reads which this DFSInputStream has done. + */ + public void clearReadStatistics() { + synchronized(infoLock) { + readStatistics.clear(); + } } public FileEncryptionInfo getFileEncryptionInfo() { @@ -1775,7 +1792,9 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, buffer.position((int)blockPos); buffer.limit((int)(blockPos + length)); extendedReadBuffers.put(buffer, clientMmap); - readStatistics.addZeroCopyBytes(length); + synchronized (infoLock) { + readStatistics.addZeroCopyBytes(length); + } if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("readZeroCopy read " + length + " bytes from offset " + curPos + " via the zero-copy read " + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java index e1269c49a3b..72c57a860bc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java @@ -103,7 +103,11 @@ public class HdfsDataInputStream extends FSDataInputStream { * be higher than you would expect just by adding up the number of * bytes read through HdfsDataInputStream. */ - public synchronized DFSInputStream.ReadStatistics getReadStatistics() { + public DFSInputStream.ReadStatistics getReadStatistics() { return getDFSInputStream().getReadStatistics(); } + + public void clearReadStatistics() { + getDFSInputStream().clearReadStatistics(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/exception.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/exception.c index 2373aa78024..eb7115c5684 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/exception.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/exception.c @@ -79,6 +79,11 @@ static const struct ExceptionInfo gExceptionInfo[] = { 0, EDQUOT, }, + { + "java.lang.UnsupportedOperationException", + 0, + ENOTSUP, + }, { "org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException", 0, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c index 21f9c2ba970..6e6dd216473 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c @@ -181,7 +181,38 @@ done: int64_t hdfsReadStatisticsGetRemoteBytesRead( const struct hdfsReadStatistics *stats) { - return stats->totalBytesRead - stats->totalLocalBytesRead; + return stats->totalBytesRead - stats->totalLocalBytesRead; +} + +int hdfsFileClearReadStatistics(hdfsFile file) +{ + jthrowable jthr; + int ret; + JNIEnv* env = getJNIEnv(); + + if (env == NULL) { + errno = EINTERNAL; + return EINTERNAL; + } + if (file->type != HDFS_STREAM_INPUT) { + ret = EINVAL; + goto done; + } + jthr = invokeMethod(env, NULL, INSTANCE, file->file, + "org/apache/hadoop/hdfs/client/HdfsDataInputStream", + "clearReadStatistics", "()V"); + if (jthr) { + ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, + "hdfsFileClearReadStatistics: clearReadStatistics failed"); + goto done; + } + ret = 0; +done: + if (ret) { + errno = ret; + return ret; + } + return 0; } void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h index 0625da37591..072051f2614 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h @@ -118,6 +118,19 @@ extern "C" { int64_t hdfsReadStatisticsGetRemoteBytesRead( const struct hdfsReadStatistics *stats); + /** + * Clear the read statistics for a file. + * + * @param file The file to clear the read statistics of. + * + * @return 0 on success; the error code otherwise. + * EINVAL: the file is not open for reading. + * ENOTSUP: the file does not support clearing the read + * statistics. + * Errno will also be set to this code on failure. + */ + int hdfsFileClearReadStatistics(hdfsFile file); + /** * Free some HDFS read statistics. * diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test_libhdfs_threaded.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test_libhdfs_threaded.c index 016f0b19dde..17ff7a8367d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test_libhdfs_threaded.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test_libhdfs_threaded.c @@ -205,6 +205,10 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs, errno = 0; EXPECT_UINT64_EQ((uint64_t)expected, readStats->totalBytesRead); hdfsFileFreeReadStatistics(readStats); + EXPECT_ZERO(hdfsFileClearReadStatistics(file)); + EXPECT_ZERO(hdfsFileGetReadStatistics(file, &readStats)); + EXPECT_UINT64_EQ((uint64_t)0, readStats->totalBytesRead); + hdfsFileFreeReadStatistics(readStats); EXPECT_ZERO(memcmp(paths->prefix, tmp, expected)); EXPECT_ZERO(hdfsCloseFile(fs, file));