HDFS-7698. Fix locking on HDFS read statistics and add a method for clearing them. (Colin P. McCabe via yliu)
This commit is contained in:
parent
3ffe5a1ed2
commit
44d800b353
|
@ -566,6 +566,9 @@ Release 2.7.0 - UNRELEASED
|
||||||
|
|
||||||
HDFS-7709. Fix findbug warnings in httpfs. (Rakesh R via ozawa)
|
HDFS-7709. Fix findbug warnings in httpfs. (Rakesh R via ozawa)
|
||||||
|
|
||||||
|
HDFS-7698. Fix locking on HDFS read statistics and add a method for
|
||||||
|
clearing them. (Colin P. McCabe via yliu)
|
||||||
|
|
||||||
Release 2.6.1 - UNRELEASED
|
Release 2.6.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -131,10 +131,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
||||||
|
|
||||||
public static class ReadStatistics {
|
public static class ReadStatistics {
|
||||||
public ReadStatistics() {
|
public ReadStatistics() {
|
||||||
this.totalBytesRead = 0;
|
clear();
|
||||||
this.totalLocalBytesRead = 0;
|
|
||||||
this.totalShortCircuitBytesRead = 0;
|
|
||||||
this.totalZeroCopyBytesRead = 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public ReadStatistics(ReadStatistics rhs) {
|
public ReadStatistics(ReadStatistics rhs) {
|
||||||
|
@ -203,6 +200,13 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
||||||
this.totalShortCircuitBytesRead += amt;
|
this.totalShortCircuitBytesRead += amt;
|
||||||
this.totalZeroCopyBytesRead += amt;
|
this.totalZeroCopyBytesRead += amt;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void clear() {
|
||||||
|
this.totalBytesRead = 0;
|
||||||
|
this.totalLocalBytesRead = 0;
|
||||||
|
this.totalShortCircuitBytesRead = 0;
|
||||||
|
this.totalZeroCopyBytesRead = 0;
|
||||||
|
}
|
||||||
|
|
||||||
private long totalBytesRead;
|
private long totalBytesRead;
|
||||||
|
|
||||||
|
@ -412,7 +416,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
||||||
/**
|
/**
|
||||||
* Return collection of blocks that has already been located.
|
* Return collection of blocks that has already been located.
|
||||||
*/
|
*/
|
||||||
public synchronized List<LocatedBlock> getAllBlocks() throws IOException {
|
public List<LocatedBlock> getAllBlocks() throws IOException {
|
||||||
return getBlockRange(0, getFileLength());
|
return getBlockRange(0, getFileLength());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -700,26 +704,28 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
||||||
* strategy-agnostic.
|
* strategy-agnostic.
|
||||||
*/
|
*/
|
||||||
private interface ReaderStrategy {
|
private interface ReaderStrategy {
|
||||||
public int doRead(BlockReader blockReader, int off, int len,
|
public int doRead(BlockReader blockReader, int off, int len)
|
||||||
ReadStatistics readStatistics) throws ChecksumException, IOException;
|
throws ChecksumException, IOException;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void updateReadStatistics(ReadStatistics readStatistics,
|
private void updateReadStatistics(ReadStatistics readStatistics,
|
||||||
int nRead, BlockReader blockReader) {
|
int nRead, BlockReader blockReader) {
|
||||||
if (nRead <= 0) return;
|
if (nRead <= 0) return;
|
||||||
if (blockReader.isShortCircuit()) {
|
synchronized(infoLock) {
|
||||||
readStatistics.addShortCircuitBytes(nRead);
|
if (blockReader.isShortCircuit()) {
|
||||||
} else if (blockReader.isLocal()) {
|
readStatistics.addShortCircuitBytes(nRead);
|
||||||
readStatistics.addLocalBytes(nRead);
|
} else if (blockReader.isLocal()) {
|
||||||
} else {
|
readStatistics.addLocalBytes(nRead);
|
||||||
readStatistics.addRemoteBytes(nRead);
|
} else {
|
||||||
|
readStatistics.addRemoteBytes(nRead);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used to read bytes into a byte[]
|
* Used to read bytes into a byte[]
|
||||||
*/
|
*/
|
||||||
private static class ByteArrayStrategy implements ReaderStrategy {
|
private class ByteArrayStrategy implements ReaderStrategy {
|
||||||
final byte[] buf;
|
final byte[] buf;
|
||||||
|
|
||||||
public ByteArrayStrategy(byte[] buf) {
|
public ByteArrayStrategy(byte[] buf) {
|
||||||
|
@ -727,26 +733,26 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int doRead(BlockReader blockReader, int off, int len,
|
public int doRead(BlockReader blockReader, int off, int len)
|
||||||
ReadStatistics readStatistics) throws ChecksumException, IOException {
|
throws ChecksumException, IOException {
|
||||||
int nRead = blockReader.read(buf, off, len);
|
int nRead = blockReader.read(buf, off, len);
|
||||||
updateReadStatistics(readStatistics, nRead, blockReader);
|
updateReadStatistics(readStatistics, nRead, blockReader);
|
||||||
return nRead;
|
return nRead;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used to read bytes into a user-supplied ByteBuffer
|
* Used to read bytes into a user-supplied ByteBuffer
|
||||||
*/
|
*/
|
||||||
private static class ByteBufferStrategy implements ReaderStrategy {
|
private class ByteBufferStrategy implements ReaderStrategy {
|
||||||
final ByteBuffer buf;
|
final ByteBuffer buf;
|
||||||
ByteBufferStrategy(ByteBuffer buf) {
|
ByteBufferStrategy(ByteBuffer buf) {
|
||||||
this.buf = buf;
|
this.buf = buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int doRead(BlockReader blockReader, int off, int len,
|
public int doRead(BlockReader blockReader, int off, int len)
|
||||||
ReadStatistics readStatistics) throws ChecksumException, IOException {
|
throws ChecksumException, IOException {
|
||||||
int oldpos = buf.position();
|
int oldpos = buf.position();
|
||||||
int oldlimit = buf.limit();
|
int oldlimit = buf.limit();
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
|
@ -785,7 +791,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
||||||
while (true) {
|
while (true) {
|
||||||
// retry as many times as seekToNewSource allows.
|
// retry as many times as seekToNewSource allows.
|
||||||
try {
|
try {
|
||||||
return reader.doRead(blockReader, off, len, readStatistics);
|
return reader.doRead(blockReader, off, len);
|
||||||
} catch ( ChecksumException ce ) {
|
} catch ( ChecksumException ce ) {
|
||||||
DFSClient.LOG.warn("Found Checksum error for "
|
DFSClient.LOG.warn("Found Checksum error for "
|
||||||
+ getCurrentBlock() + " from " + currentNode
|
+ getCurrentBlock() + " from " + currentNode
|
||||||
|
@ -1612,8 +1618,19 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
||||||
/**
|
/**
|
||||||
* Get statistics about the reads which this DFSInputStream has done.
|
* Get statistics about the reads which this DFSInputStream has done.
|
||||||
*/
|
*/
|
||||||
public synchronized ReadStatistics getReadStatistics() {
|
public ReadStatistics getReadStatistics() {
|
||||||
return new ReadStatistics(readStatistics);
|
synchronized(infoLock) {
|
||||||
|
return new ReadStatistics(readStatistics);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clear statistics about the reads which this DFSInputStream has done.
|
||||||
|
*/
|
||||||
|
public void clearReadStatistics() {
|
||||||
|
synchronized(infoLock) {
|
||||||
|
readStatistics.clear();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public FileEncryptionInfo getFileEncryptionInfo() {
|
public FileEncryptionInfo getFileEncryptionInfo() {
|
||||||
|
@ -1775,7 +1792,9 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
||||||
buffer.position((int)blockPos);
|
buffer.position((int)blockPos);
|
||||||
buffer.limit((int)(blockPos + length));
|
buffer.limit((int)(blockPos + length));
|
||||||
extendedReadBuffers.put(buffer, clientMmap);
|
extendedReadBuffers.put(buffer, clientMmap);
|
||||||
readStatistics.addZeroCopyBytes(length);
|
synchronized (infoLock) {
|
||||||
|
readStatistics.addZeroCopyBytes(length);
|
||||||
|
}
|
||||||
if (DFSClient.LOG.isDebugEnabled()) {
|
if (DFSClient.LOG.isDebugEnabled()) {
|
||||||
DFSClient.LOG.debug("readZeroCopy read " + length +
|
DFSClient.LOG.debug("readZeroCopy read " + length +
|
||||||
" bytes from offset " + curPos + " via the zero-copy read " +
|
" bytes from offset " + curPos + " via the zero-copy read " +
|
||||||
|
|
|
@ -103,7 +103,11 @@ public class HdfsDataInputStream extends FSDataInputStream {
|
||||||
* be higher than you would expect just by adding up the number of
|
* be higher than you would expect just by adding up the number of
|
||||||
* bytes read through HdfsDataInputStream.
|
* bytes read through HdfsDataInputStream.
|
||||||
*/
|
*/
|
||||||
public synchronized DFSInputStream.ReadStatistics getReadStatistics() {
|
public DFSInputStream.ReadStatistics getReadStatistics() {
|
||||||
return getDFSInputStream().getReadStatistics();
|
return getDFSInputStream().getReadStatistics();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void clearReadStatistics() {
|
||||||
|
getDFSInputStream().clearReadStatistics();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -79,6 +79,11 @@ static const struct ExceptionInfo gExceptionInfo[] = {
|
||||||
0,
|
0,
|
||||||
EDQUOT,
|
EDQUOT,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"java.lang.UnsupportedOperationException",
|
||||||
|
0,
|
||||||
|
ENOTSUP,
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException",
|
"org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException",
|
||||||
0,
|
0,
|
||||||
|
|
|
@ -181,7 +181,38 @@ done:
|
||||||
int64_t hdfsReadStatisticsGetRemoteBytesRead(
|
int64_t hdfsReadStatisticsGetRemoteBytesRead(
|
||||||
const struct hdfsReadStatistics *stats)
|
const struct hdfsReadStatistics *stats)
|
||||||
{
|
{
|
||||||
return stats->totalBytesRead - stats->totalLocalBytesRead;
|
return stats->totalBytesRead - stats->totalLocalBytesRead;
|
||||||
|
}
|
||||||
|
|
||||||
|
int hdfsFileClearReadStatistics(hdfsFile file)
|
||||||
|
{
|
||||||
|
jthrowable jthr;
|
||||||
|
int ret;
|
||||||
|
JNIEnv* env = getJNIEnv();
|
||||||
|
|
||||||
|
if (env == NULL) {
|
||||||
|
errno = EINTERNAL;
|
||||||
|
return EINTERNAL;
|
||||||
|
}
|
||||||
|
if (file->type != HDFS_STREAM_INPUT) {
|
||||||
|
ret = EINVAL;
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
|
jthr = invokeMethod(env, NULL, INSTANCE, file->file,
|
||||||
|
"org/apache/hadoop/hdfs/client/HdfsDataInputStream",
|
||||||
|
"clearReadStatistics", "()V");
|
||||||
|
if (jthr) {
|
||||||
|
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
|
||||||
|
"hdfsFileClearReadStatistics: clearReadStatistics failed");
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
|
ret = 0;
|
||||||
|
done:
|
||||||
|
if (ret) {
|
||||||
|
errno = ret;
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats)
|
void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats)
|
||||||
|
|
|
@ -118,6 +118,19 @@ extern "C" {
|
||||||
int64_t hdfsReadStatisticsGetRemoteBytesRead(
|
int64_t hdfsReadStatisticsGetRemoteBytesRead(
|
||||||
const struct hdfsReadStatistics *stats);
|
const struct hdfsReadStatistics *stats);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clear the read statistics for a file.
|
||||||
|
*
|
||||||
|
* @param file The file to clear the read statistics of.
|
||||||
|
*
|
||||||
|
* @return 0 on success; the error code otherwise.
|
||||||
|
* EINVAL: the file is not open for reading.
|
||||||
|
* ENOTSUP: the file does not support clearing the read
|
||||||
|
* statistics.
|
||||||
|
* Errno will also be set to this code on failure.
|
||||||
|
*/
|
||||||
|
int hdfsFileClearReadStatistics(hdfsFile file);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Free some HDFS read statistics.
|
* Free some HDFS read statistics.
|
||||||
*
|
*
|
||||||
|
|
|
@ -205,6 +205,10 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs,
|
||||||
errno = 0;
|
errno = 0;
|
||||||
EXPECT_UINT64_EQ((uint64_t)expected, readStats->totalBytesRead);
|
EXPECT_UINT64_EQ((uint64_t)expected, readStats->totalBytesRead);
|
||||||
hdfsFileFreeReadStatistics(readStats);
|
hdfsFileFreeReadStatistics(readStats);
|
||||||
|
EXPECT_ZERO(hdfsFileClearReadStatistics(file));
|
||||||
|
EXPECT_ZERO(hdfsFileGetReadStatistics(file, &readStats));
|
||||||
|
EXPECT_UINT64_EQ((uint64_t)0, readStats->totalBytesRead);
|
||||||
|
hdfsFileFreeReadStatistics(readStats);
|
||||||
EXPECT_ZERO(memcmp(paths->prefix, tmp, expected));
|
EXPECT_ZERO(memcmp(paths->prefix, tmp, expected));
|
||||||
EXPECT_ZERO(hdfsCloseFile(fs, file));
|
EXPECT_ZERO(hdfsCloseFile(fs, file));
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue