HDFS-4698. Provide client-side metrics for remote reads, local reads, and short-circuit reads. Contributed by Colin Patrick McCabe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1481121 13f79535-47bb-0310-9956-ffa450edef68
Aaron Myers 2013-05-10 17:58:07 +00:00
parent 1161ceb2c9
commit 4ed1fc58c0
14 changed files with 411 additions and 7 deletions

View File: hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -844,6 +844,9 @@ Release 2.0.5-beta - UNRELEASED
HDFS-4804. WARN when users set the block balanced preference percent below
0.5 or above 1.0. (Stephen Chu via atm)
HDFS-4698. Provide client-side metrics for remote reads, local reads, and
short-circuit reads. (Colin Patrick McCabe via atm)
OPTIMIZATIONS
BUG FIXES

View File: hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_web.c

@@ -1197,6 +1197,24 @@ int hdfsFileIsOpenForRead(hdfsFile file)
return (file->type == INPUT);
}
int hdfsFileGetReadStatistics(hdfsFile file,
struct hdfsReadStatistics **stats)
{
errno = ENOTSUP;
return -1;
}
int64_t hdfsReadStatisticsGetRemoteBytesRead(
const struct hdfsReadStatistics *stats)
{
return stats->totalBytesRead - stats->totalLocalBytesRead;
}
void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats)
{
free(stats);
}
int hdfsFileIsOpenForWrite(hdfsFile file)
{
return (file->type == OUTPUT);

View File: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java

@@ -70,4 +70,15 @@ public interface BlockReader extends ByteBufferReadable {
* filled or the next call will return EOF.
*/
int readAll(byte[] buf, int offset, int len) throws IOException;
/**
* @return true only if this is a local read.
*/
boolean isLocal();
/**
* @return true only if this is a short-circuit read.
* All short-circuit reads are also local.
*/
boolean isShortCircuit();
}
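
Taken together, the two flags split readers into three classes: short-circuit (which is always also local), local but not short-circuit, and remote. A hypothetical helper, not part of this change, makes the classification explicit:

static String classify(BlockReader reader) {
  if (reader.isShortCircuit()) {
    return "short-circuit"; // block file read directly, bypassing the DataNode data path
  } else if (reader.isLocal()) {
    return "local"; // served by a DataNode on the same host
  }
  return "remote"; // data crossed the network
}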

View File: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java

@@ -531,4 +531,14 @@ public int available() throws IOException {
// We never do network I/O in BlockReaderLocal.
return Integer.MAX_VALUE;
}
@Override
public boolean isLocal() {
return true;
}
@Override
public boolean isShortCircuit() {
return true;
}
}

View File: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java

@@ -700,4 +700,14 @@ public int available() throws IOException {
// We never do network I/O in BlockReaderLocalLegacy.
return Integer.MAX_VALUE;
}
@Override
public boolean isLocal() {
return true;
}
@Override
public boolean isShortCircuit() {
return true;
}
}

View File: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java

@@ -81,7 +81,74 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
private LocatedBlock currentLocatedBlock = null;
private long pos = 0;
private long blockEnd = -1;
private final ReadStatistics readStatistics = new ReadStatistics();
public static class ReadStatistics {
public ReadStatistics() {
this.totalBytesRead = 0;
this.totalLocalBytesRead = 0;
this.totalShortCircuitBytesRead = 0;
}
public ReadStatistics(ReadStatistics rhs) {
this.totalBytesRead = rhs.getTotalBytesRead();
this.totalLocalBytesRead = rhs.getTotalLocalBytesRead();
this.totalShortCircuitBytesRead = rhs.getTotalShortCircuitBytesRead();
}
/**
* @return The total bytes read. This will always be at least as
* high as the other numbers, since it includes all of them.
*/
public long getTotalBytesRead() {
return totalBytesRead;
}
/**
* @return The total local bytes read. This will always be at least
* as high as totalShortCircuitBytesRead, since all short-circuit
* reads are also local.
*/
public long getTotalLocalBytesRead() {
return totalLocalBytesRead;
}
/**
* @return The total short-circuit local bytes read.
*/
public long getTotalShortCircuitBytesRead() {
return totalShortCircuitBytesRead;
}
/**
* @return The total number of bytes read which were not local.
*/
public long getRemoteBytesRead() {
return totalBytesRead - totalLocalBytesRead;
}
void addRemoteBytes(long amt) {
this.totalBytesRead += amt;
}
void addLocalBytes(long amt) {
this.totalBytesRead += amt;
this.totalLocalBytesRead += amt;
}
void addShortCircuitBytes(long amt) {
this.totalBytesRead += amt;
this.totalLocalBytesRead += amt;
this.totalShortCircuitBytesRead += amt;
}
private long totalBytesRead;
private long totalLocalBytesRead;
private long totalShortCircuitBytesRead;
}
private final FileInputStreamCache fileInputStreamCache;
/**
@@ -546,9 +613,25 @@ public synchronized int read() throws IOException {
* strategy-agnostic.
*/
private interface ReaderStrategy {
- public int doRead(BlockReader blockReader, int off, int len) throws ChecksumException, IOException;
+ public int doRead(BlockReader blockReader, int off, int len,
+     ReadStatistics readStatistics) throws ChecksumException, IOException;
}
private static void updateReadStatistics(ReadStatistics readStatistics,
int nRead, BlockReader blockReader) {
if (nRead <= 0) return;
if (blockReader.isShortCircuit()) {
readStatistics.addShortCircuitBytes(nRead);
} else if (blockReader.isLocal()) {
readStatistics.addLocalBytes(nRead);
} else {
readStatistics.addRemoteBytes(nRead);
}
}
/**
* Used to read bytes into a byte[]
*/
@@ -560,8 +643,11 @@ public ByteArrayStrategy(byte[] buf) {
}
@Override
- public int doRead(BlockReader blockReader, int off, int len) throws ChecksumException, IOException {
-   return blockReader.read(buf, off, len);
+ public int doRead(BlockReader blockReader, int off, int len,
+     ReadStatistics readStatistics) throws ChecksumException, IOException {
+   int nRead = blockReader.read(buf, off, len);
+   updateReadStatistics(readStatistics, nRead, blockReader);
+   return nRead;
}
}
@@ -575,13 +661,15 @@ private static class ByteBufferStrategy implements ReaderStrategy {
}
@Override
- public int doRead(BlockReader blockReader, int off, int len) throws ChecksumException, IOException {
+ public int doRead(BlockReader blockReader, int off, int len,
+     ReadStatistics readStatistics) throws ChecksumException, IOException {
int oldpos = buf.position();
int oldlimit = buf.limit();
boolean success = false;
try {
int ret = blockReader.read(buf);
success = true;
updateReadStatistics(readStatistics, ret, blockReader);
return ret;
} finally {
if (!success) {
@@ -613,7 +701,7 @@ private synchronized int readBuffer(ReaderStrategy reader, int off, int len,
while (true) {
// retry as many times as seekToNewSource allows.
try {
- return reader.doRead(blockReader, off, len);
+ return reader.doRead(blockReader, off, len, readStatistics);
} catch ( ChecksumException ce ) {
DFSClient.LOG.warn("Found Checksum error for "
+ getCurrentBlock() + " from " + currentNode
@@ -1275,4 +1363,11 @@ static class DNAddrPair {
this.addr = addr;
}
}
/**
* Get statistics about the reads which this DFSInputStream has done.
*/
public synchronized ReadStatistics getReadStatistics() {
return new ReadStatistics(readStatistics);
}
}
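
The counters nest: every short-circuit byte also counts as local, and every byte counts in the total, so totalBytesRead >= totalLocalBytesRead >= totalShortCircuitBytesRead always holds, and remote bytes fall out as the difference. A standalone sketch of the arithmetic (it assumes calling code in org.apache.hadoop.hdfs, since the add* mutators are package-private):

ReadStatistics stats = new ReadStatistics();
stats.addShortCircuitBytes(100); // total=100, local=100, shortCircuit=100
stats.addLocalBytes(50);         // total=150, local=150, shortCircuit=100
stats.addRemoteBytes(25);        // total=175, local=150, shortCircuit=100
assert stats.getRemoteBytesRead() == 25; // 175 - 150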

View File: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java

@@ -40,6 +40,7 @@
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
@@ -78,6 +79,11 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
* at the beginning so that the read can begin on a chunk boundary.
*/
private final long bytesNeededToFinish;
/**
* True if we are reading from a local DataNode.
*/
private final boolean isLocal;
private boolean eos = false;
private boolean sentStatusCode = false;
@@ -329,6 +335,9 @@ private RemoteBlockReader(String file, String bpid, long blockId,
checksum.getChecksumSize() > 0? checksum : null,
checksum.getBytesPerChecksum(),
checksum.getChecksumSize());
this.isLocal = DFSClient.isLocalAddress(NetUtils.
createSocketAddr(datanodeID.getXferAddr()));
this.peer = peer;
this.datanodeID = datanodeID;
@@ -477,4 +486,14 @@ public int available() throws IOException {
// to us without doing network I/O.
return DFSClient.TCP_WINDOW_SIZE;
}
@Override
public boolean isLocal() {
return isLocal;
}
@Override
public boolean isShortCircuit() {
return false;
}
}
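
The isLocal flag for TCP readers comes from DFSClient.isLocalAddress. Roughly, an address counts as local when it is a wildcard or loopback address, or when some interface on this host owns it; a minimal sketch of that check, without the caching the real method adds:

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.SocketException;

static boolean looksLocal(InetSocketAddress targetAddr) {
  InetAddress addr = targetAddr.getAddress();
  if (addr.isAnyLocalAddress() || addr.isLoopbackAddress()) {
    return true; // wildcard and loopback are always local
  }
  try {
    // local if the address is bound to one of this host's interfaces
    return NetworkInterface.getByInetAddress(addr) != null;
  } catch (SocketException e) {
    return false;
  }
}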

View File: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java

@@ -44,6 +44,7 @@
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
@@ -106,6 +107,11 @@ public class RemoteBlockReader2 implements BlockReader {
*/
private long bytesNeededToFinish;
/**
* True if we are reading from a local DataNode.
*/
private final boolean isLocal;
private final boolean verifyChecksum;
private boolean sentStatusCode = false;
@@ -255,6 +261,8 @@ protected RemoteBlockReader2(String file, String bpid, long blockId,
DataChecksum checksum, boolean verifyChecksum,
long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
DatanodeID datanodeID, PeerCache peerCache) {
this.isLocal = DFSClient.isLocalAddress(NetUtils.
createSocketAddr(datanodeID.getXferAddr()));
// Path is used only for printing block and file information in debug
this.peer = peer;
this.datanodeID = datanodeID;
@@ -431,4 +439,14 @@ public int available() throws IOException {
// to us without doing network I/O.
return DFSClient.TCP_WINDOW_SIZE;
}
@Override
public boolean isLocal() {
return isLocal;
}
@Override
public boolean isShortCircuit() {
return false;
}
}

View File: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java

@@ -68,4 +68,14 @@ public synchronized List<LocatedBlock> getAllBlocks() throws IOException {
public long getVisibleLength() throws IOException {
return ((DFSInputStream) in).getFileLength();
}
/**
* Get statistics about the reads which this DFSInputStream has done.
* Note that because HdfsDataInputStream is buffered, these stats may
* be higher than you would expect just by adding up the number of
* bytes read through HdfsDataInputStream.
*/
public synchronized DFSInputStream.ReadStatistics getReadStatistics() {
return ((DFSInputStream) in).getReadStatistics();
}
}
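
A usage sketch from the Java side, assuming the default configuration points at an HDFS cluster and that /tmp/sample is an existing file of at least 8 KB (both are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.io.IOUtils;

FileSystem fs = FileSystem.get(new Configuration());
FSDataInputStream in = fs.open(new Path("/tmp/sample"));
byte[] buf = new byte[8192];
IOUtils.readFully(in, buf, 0, buf.length);
// the cast only succeeds when fs is a DistributedFileSystem
DFSInputStream.ReadStatistics stats =
    ((HdfsDataInputStream) in).getReadStatistics();
System.out.println("total=" + stats.getTotalBytesRead()
    + " local=" + stats.getTotalLocalBytesRead()
    + " shortCircuit=" + stats.getTotalShortCircuitBytesRead()
    + " remote=" + stats.getRemoteBytesRead());
in.close();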

View File: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java

@@ -214,7 +214,7 @@ public static void streamBlockInAscii(InetSocketAddress addr, String poolId,
new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken,
offsetIntoBlock, amtToRead, true,
"JspHelper", TcpPeerServer.peerFromSocketAndKey(s, encryptionKey),
- new DatanodeID(addr.getAddress().toString(),
+ new DatanodeID(addr.getAddress().getHostAddress(),
addr.getHostName(), poolId, addr.getPort(), 0, 0), null,
null, null, false);

View File: hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c

@@ -81,6 +81,93 @@ int hdfsFileIsOpenForRead(hdfsFile file)
return (file->type == INPUT);
}
int hdfsFileGetReadStatistics(hdfsFile file,
struct hdfsReadStatistics **stats)
{
jthrowable jthr;
jobject readStats = NULL;
jvalue jVal;
struct hdfsReadStatistics *s = NULL;
int ret;
JNIEnv* env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
if (file->type != INPUT) {
ret = EINVAL;
goto done;
}
jthr = invokeMethod(env, &jVal, INSTANCE, file->file,
"org/apache/hadoop/hdfs/client/HdfsDataInputStream",
"getReadStatistics",
"()Lorg/apache/hadoop/hdfs/DFSInputStream$ReadStatistics;");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsFileGetReadStatistics: getReadStatistics failed");
goto done;
}
readStats = jVal.l;
s = malloc(sizeof(struct hdfsReadStatistics));
if (!s) {
ret = ENOMEM;
goto done;
}
jthr = invokeMethod(env, &jVal, INSTANCE, readStats,
"org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics",
"getTotalBytesRead", "()J");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsFileGetReadStatistics: getTotalBytesRead failed");
goto done;
}
s->totalBytesRead = jVal.j;
jthr = invokeMethod(env, &jVal, INSTANCE, readStats,
"org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics",
"getTotalLocalBytesRead", "()J");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsFileGetReadStatistics: getTotalLocalBytesRead failed");
goto done;
}
s->totalLocalBytesRead = jVal.j;
jthr = invokeMethod(env, &jVal, INSTANCE, readStats,
"org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics",
"getTotalShortCircuitBytesRead", "()J");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsFileGetReadStatistics: getTotalShortCircuitBytesRead failed");
goto done;
}
s->totalShortCircuitBytesRead = jVal.j;
*stats = s;
s = NULL;
ret = 0;
done:
destroyLocalReference(env, readStats);
free(s);
if (ret) {
errno = ret;
return -1;
}
return 0;
}
int64_t hdfsReadStatisticsGetRemoteBytesRead(
const struct hdfsReadStatistics *stats)
{
return stats->totalBytesRead - stats->totalLocalBytesRead;
}
void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats)
{
free(stats);
}
int hdfsFileIsOpenForWrite(hdfsFile file)
{
return (file->type == OUTPUT);

View File: hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h

@@ -81,6 +81,43 @@ extern "C" {
*/
int hdfsFileIsOpenForWrite(hdfsFile file);
struct hdfsReadStatistics {
uint64_t totalBytesRead;
uint64_t totalLocalBytesRead;
uint64_t totalShortCircuitBytesRead;
};
/**
* Get read statistics about a file. This is only applicable to files
* opened for reading.
*
* @param file The HDFS file
* @param stats (out parameter) on a successful return, the read
* statistics. Unchanged otherwise. You must free the
* returned statistics with hdfsFileFreeReadStatistics.
* @return 0 if the statistics were successfully returned,
* -1 otherwise. On a failure, please check errno against
* ENOTSUP. webhdfs, LocalFilesystem, and so forth may
* not support read statistics.
*/
int hdfsFileGetReadStatistics(hdfsFile file,
struct hdfsReadStatistics **stats);
/**
* @param stats HDFS read statistics for a file.
*
* @return the number of remote bytes read.
*/
int64_t hdfsReadStatisticsGetRemoteBytesRead(
const struct hdfsReadStatistics *stats);
/**
* Free some HDFS read statistics.
*
* @param stats The HDFS read statistics to free.
*/
void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats);
/**
* hdfsConnectAsUser - Connect to a hdfs file system as a specific user
* Connect to the hdfs.
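
From C, the calling pattern the header describes, checking errno for ENOTSUP, deriving remote bytes with the getter, and releasing the struct with hdfsFileFreeReadStatistics, looks roughly like this ("default" and /tmp/sample are placeholders; error handling is abbreviated):

#include <errno.h>
#include <fcntl.h>
#include <inttypes.h>
#include <stdio.h>
#include "hdfs.h"

int main(void)
{
    hdfsFS fs = hdfsConnect("default", 0);
    hdfsFile file = hdfsOpenFile(fs, "/tmp/sample", O_RDONLY, 0, 0, 0);
    char buf[8192];
    hdfsRead(fs, file, buf, sizeof(buf));

    struct hdfsReadStatistics *stats = NULL;
    if (hdfsFileGetReadStatistics(file, &stats) == 0) {
        printf("total=%" PRIu64 " local=%" PRIu64 " shortCircuit=%" PRIu64
               " remote=%" PRId64 "\n",
               stats->totalBytesRead, stats->totalLocalBytesRead,
               stats->totalShortCircuitBytesRead,
               hdfsReadStatisticsGetRemoteBytesRead(stats));
        hdfsFileFreeReadStatistics(stats);
    } else if (errno == ENOTSUP) {
        /* e.g. webhdfs does not track read statistics */
        printf("read statistics not supported here\n");
    }
    hdfsCloseFile(fs, file);
    hdfsDisconnect(fs);
    return 0;
}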

View File: hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test_libhdfs_threaded.c

@@ -116,6 +116,7 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs)
hdfsFile file;
int ret, expected;
hdfsFileInfo *fileInfo;
struct hdfsReadStatistics *readStats = NULL;
snprintf(prefix, sizeof(prefix), "/tlhData%04d", ti->threadIdx);
@@ -157,6 +158,12 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs)
file = hdfsOpenFile(fs, tmp, O_RDONLY, 0, 0, 0);
EXPECT_NONNULL(file);
EXPECT_ZERO(hdfsFileGetReadStatistics(file, &readStats));
errno = 0;
EXPECT_ZERO(readStats->totalBytesRead);
EXPECT_ZERO(readStats->totalLocalBytesRead);
EXPECT_ZERO(readStats->totalShortCircuitBytesRead);
hdfsFileFreeReadStatistics(readStats);
/* TODO: implement readFully and use it here */
ret = hdfsRead(fs, file, tmp, sizeof(tmp));
if (ret < 0) {
@@ -169,6 +176,10 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs)
"it read %d\n", ret, expected);
return EIO;
}
EXPECT_ZERO(hdfsFileGetReadStatistics(file, &readStats));
errno = 0;
EXPECT_INT_EQ(expected, readStats->totalBytesRead);
hdfsFileFreeReadStatistics(readStats);
EXPECT_ZERO(memcmp(prefix, tmp, expected));
EXPECT_ZERO(hdfsCloseFile(fs, file));

View File: hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java

@@ -25,14 +25,19 @@
import java.nio.ByteBuffer;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdfs.DFSInputStream.ReadStatistics;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
public class TestBlockReaderLocal {
@@ -339,11 +344,81 @@ public void doTest(BlockReaderLocal reader, byte original[])
}
}
}
@Test
public void testBlockReaderLocalReadCorrupt()
throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true);
runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false);
}
@Test(timeout=60000)
public void TestStatisticsForShortCircuitLocalRead() throws Exception {
testStatistics(true);
}
@Test(timeout=60000)
public void TestStatisticsForLocalRead() throws Exception {
testStatistics(false);
}
private void testStatistics(boolean isShortCircuit) throws Exception {
Assume.assumeTrue(DomainSocket.getLoadingFailureReason() == null);
HdfsConfiguration conf = new HdfsConfiguration();
TemporarySocketDirectory sockDir = null;
if (isShortCircuit) {
DFSInputStream.tcpReadsDisabledForTesting = true;
sockDir = new TemporarySocketDirectory();
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
new File(sockDir.getDir(), "TestStatisticsForLocalRead.%d.sock").
getAbsolutePath());
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
DomainSocket.disableBindPathValidation();
} else {
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
}
MiniDFSCluster cluster = null;
final Path TEST_PATH = new Path("/a");
final long RANDOM_SEED = 4567L;
FSDataInputStream fsIn = null;
byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, TEST_PATH,
BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED);
try {
DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
} catch (InterruptedException e) {
Assert.fail("unexpected InterruptedException during " +
"waitReplication: " + e);
} catch (TimeoutException e) {
Assert.fail("unexpected TimeoutException during " +
"waitReplication: " + e);
}
fsIn = fs.open(TEST_PATH);
IOUtils.readFully(fsIn, original, 0,
BlockReaderLocalTest.TEST_LENGTH);
HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
Assert.assertEquals(BlockReaderLocalTest.TEST_LENGTH,
dfsIn.getReadStatistics().getTotalBytesRead());
Assert.assertEquals(BlockReaderLocalTest.TEST_LENGTH,
dfsIn.getReadStatistics().getTotalLocalBytesRead());
if (isShortCircuit) {
Assert.assertEquals(BlockReaderLocalTest.TEST_LENGTH,
dfsIn.getReadStatistics().getTotalShortCircuitBytesRead());
} else {
Assert.assertEquals(0,
dfsIn.getReadStatistics().getTotalShortCircuitBytesRead());
}
fsIn.close();
fsIn = null;
} finally {
DFSInputStream.tcpReadsDisabledForTesting = false;
if (fsIn != null) fsIn.close();
if (cluster != null) cluster.shutdown();
if (sockDir != null) sockDir.close();
}
}
}