From a99658cd8504edfe7b80f979eb25795c16726dcd Mon Sep 17 00:00:00 2001
From: Hrishikesh Gadre
Date: Mon, 8 Oct 2018 20:30:53 -0700
Subject: [PATCH] HDFS-13926. ThreadLocal aggregations for
 FileSystem.Statistics are incorrect with striped reads. Contributed by Xiao
 Chen, Hrishikesh Gadre.

Signed-off-by: Xiao Chen
(cherry picked from commit 08bb6c49a5aec32b7d9f29238560f947420405d6)
---
 .../apache/hadoop/hdfs/DFSInputStream.java    |  8 +++
 .../hadoop/hdfs/DFSStripedInputStream.java    | 22 +++++++
 .../apache/hadoop/hdfs/ReaderStrategy.java    | 15 -----
 .../org/apache/hadoop/hdfs/StripeReader.java  | 23 ++++---
 .../hadoop/hdfs/util/IOUtilsClient.java       | 10 ++-
 .../hadoop/hdfs/util/StripedBlockUtil.java    | 65 +++++++++++++++++--
 .../erasurecode/ErasureCodingWorker.java      |  3 +-
 .../erasurecode/StripedBlockReader.java       | 14 ++--
 .../datanode/erasurecode/StripedReader.java   | 17 ++---
 .../erasurecode/StripedReconstructor.java     |  3 +-
 .../TestDistributedFileSystemWithECFile.java  | 44 +++++++++++++
 11 files changed, 178 insertions(+), 46 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index e5640d2ba6a..52ed1d432ac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -90,6 +90,8 @@
 import com.google.common.annotations.VisibleForTesting;
 
 import javax.annotation.Nonnull;
 
+import static org.apache.hadoop.hdfs.util.IOUtilsClient.updateReadStatistics;
+
 /****************************************************************
  * DFSInputStream provides bytes from a named file. It handles
  * negotiation of the namenode and various datanodes as necessary.
@@ -769,6 +771,12 @@ public class DFSInputStream extends FSInputStream
           // got a EOS from reader though we expect more data on it.
           throw new IOException("Unexpected EOS from the reader");
         }
+        updateReadStatistics(readStatistics, result, blockReader);
+        dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
+            result);
+        if (readStatistics.getBlockType() == BlockType.STRIPED) {
+          dfsClient.updateFileSystemECReadStats(result);
+        }
         return result;
       } catch (ChecksumException ce) {
         throw ce;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 5557a502cef..3f688d410be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -54,6 +54,8 @@ import java.util.Collection;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadPoolExecutor;
 
+import static org.apache.hadoop.hdfs.util.IOUtilsClient.updateReadStatistics;
+
 /**
  * DFSStripedInputStream reads from striped block groups.
  */
@@ -328,6 +330,26 @@ public class DFSStripedInputStream extends DFSInputStream {
     curStripeRange = stripeRange;
   }
 
+  /**
+   * Update read statistics. Note that this has to be done on the thread that
+   * initiates the read, rather than inside each async thread, for
+   * {@link org.apache.hadoop.fs.FileSystem.Statistics} to work correctly with
+   * its ThreadLocal.
+   *
+   * @param stats striped read stats
+   */
+  void updateReadStats(final StripedBlockUtil.BlockReadStats stats) {
+    if (stats == null) {
+      return;
+    }
+    updateReadStatistics(readStatistics, stats.getBytesRead(),
+        stats.isShortCircuit(), stats.getNetworkDistance());
+    dfsClient.updateFileSystemReadStats(stats.getNetworkDistance(),
+        stats.getBytesRead());
+    assert readStatistics.getBlockType() == BlockType.STRIPED;
+    dfsClient.updateFileSystemECReadStats(stats.getBytesRead());
+  }
+
   /**
    * Seek to a new arbitrary location.
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReaderStrategy.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReaderStrategy.java
index 39ad2ff4a9e..4d5e741befb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReaderStrategy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReaderStrategy.java
@@ -17,11 +17,8 @@
  */
 package org.apache.hadoop.hdfs;
 
-import org.apache.hadoop.hdfs.protocol.BlockType;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import static org.apache.hadoop.hdfs.util.IOUtilsClient.updateReadStatistics;
 
 /**
  * Wraps different possible read implementations so that callers can be
@@ -120,12 +117,6 @@ class ByteArrayStrategy implements ReaderStrategy {
       int length) throws IOException {
     int nRead = blockReader.read(readBuf, offset, length);
     if (nRead > 0) {
-      updateReadStatistics(readStatistics, nRead, blockReader);
-      dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
-          nRead);
-      if (readStatistics.getBlockType() == BlockType.STRIPED) {
-        dfsClient.updateFileSystemECReadStats(nRead);
-      }
       offset += nRead;
     }
     return nRead;
@@ -190,12 +181,6 @@ class ByteBufferStrategy implements ReaderStrategy {
     // Only when data are read, update the position
     if (nRead > 0) {
       readBuf.position(readBuf.position() + nRead);
-      updateReadStatistics(readStatistics, nRead, blockReader);
-      dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
-          nRead);
-      if (readStatistics.getBlockType() == BlockType.STRIPED) {
-        dfsClient.updateFileSystemECReadStats(nRead);
-      }
     }
 
     return nRead;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
index 0554ebed0ee..e90af846d7d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
@@ -105,9 +106,10 @@ abstract class StripeReader {
     }
   }
 
-  protected final Map<Future<Void>, Integer> futures = new HashMap<>();
+  private final Map<Future<BlockReadStats>, Integer> futures =
+      new HashMap<>();
   protected final AlignedStripe alignedStripe;
-  protected final CompletionService<Void> service;
+  private final CompletionService<BlockReadStats> service;
   protected final LocatedBlock[] targetBlocks;
   protected final CorruptedBlocks corruptedBlocks;
   protected final BlockReaderInfo[] readerInfos;
@@ -257,7 +259,7 @@ abstract class StripeReader {
     }
   }
 
-  private Callable<Void> readCells(final BlockReader reader,
+  private Callable<BlockReadStats> readCells(final BlockReader reader,
       final DatanodeInfo datanode, final long currentReaderOffset,
       final long targetReaderOffset, final ByteBufferStrategy[] strategies,
       final ExtendedBlock currentBlock) {
@@ -275,10 +277,13 @@ abstract class StripeReader {
             skipped == targetReaderOffset - currentReaderOffset);
       }
 
+      int ret = 0;
       for (ByteBufferStrategy strategy : strategies) {
-        readToBuffer(reader, datanode, strategy, currentBlock);
+        int bytesRead = readToBuffer(reader, datanode, strategy, currentBlock);
+        ret += bytesRead;
       }
-      return null;
+      return new BlockReadStats(ret, reader.isShortCircuit(),
+          reader.getNetworkDistance());
     };
   }
 
@@ -303,13 +308,14 @@ abstract class StripeReader {
     }
 
     chunk.state = StripingChunk.PENDING;
-    Callable<Void> readCallable = readCells(readerInfos[chunkIndex].reader,
+    Callable<BlockReadStats> readCallable =
+        readCells(readerInfos[chunkIndex].reader,
         readerInfos[chunkIndex].datanode,
         readerInfos[chunkIndex].blockReaderOffset,
         alignedStripe.getOffsetInBlock(), getReadStrategies(chunk),
         block.getBlock());
 
-    Future<Void> request = service.submit(readCallable);
+    Future<BlockReadStats> request = service.submit(readCallable);
     futures.put(request, chunkIndex);
     return true;
   }
@@ -342,6 +348,7 @@ abstract class StripeReader {
       try {
         StripingChunkReadResult r = StripedBlockUtil
             .getNextCompletedStripedRead(service, futures, 0);
+        dfsStripedInputStream.updateReadStats(r.getReadStats());
         if (DFSClient.LOG.isDebugEnabled()) {
           DFSClient.LOG.debug("Read task returned: " + r +
               ", for stripe " + alignedStripe);
@@ -460,7 +467,7 @@ abstract class StripeReader {
   }
 
   void clearFutures() {
-    for (Future future : futures.keySet()) {
+    for (Future<BlockReadStats> future : futures.keySet()) {
       future.cancel(false);
     }
     futures.clear();
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java
index 85e9cee748c..25d8a0f17a5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java
@@ -48,13 +48,19 @@ public class IOUtilsClient {
 
   public static void updateReadStatistics(ReadStatistics readStatistics,
       int nRead, BlockReader blockReader) {
+    updateReadStatistics(readStatistics, nRead, blockReader.isShortCircuit(),
+        blockReader.getNetworkDistance());
+  }
+
+  public static void updateReadStatistics(ReadStatistics readStatistics,
+      int nRead, boolean isShortCircuit, int networkDistance) {
     if (nRead <= 0) {
       return;
     }
 
-    if (blockReader.isShortCircuit()) {
+    if (isShortCircuit) {
       readStatistics.addShortCircuitBytes(nRead);
-    } else if (blockReader.getNetworkDistance() == 0) {
+    } else if (networkDistance == 0) {
       readStatistics.addLocalBytes(nRead);
     } else {
       readStatistics.addRemoteBytes(nRead);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index 042592779b6..22457570e44 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -76,6 +76,48 @@ public class StripedBlockUtil {
   public static final Logger LOG =
       LoggerFactory.getLogger(StripedBlockUtil.class);
 
+  /**
+   * Struct holding the read statistics. This is used when reads are done
+   * asynchronously, to allow the async threads to return the read stats and
+   * let the main reading thread update the stats. This is important for the
+   * ThreadLocal stats of the main reading thread to be correct.
+   */
+  public static class BlockReadStats {
+    private final int bytesRead;
+    private final boolean isShortCircuit;
+    private final int networkDistance;
+
+    public BlockReadStats(int numBytesRead, boolean shortCircuit,
+        int distance) {
+      bytesRead = numBytesRead;
+      isShortCircuit = shortCircuit;
+      networkDistance = distance;
+    }
+
+    public int getBytesRead() {
+      return bytesRead;
+    }
+
+    public boolean isShortCircuit() {
+      return isShortCircuit;
+    }
+
+    public int getNetworkDistance() {
+      return networkDistance;
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder sb = new StringBuilder();
+      sb.append("bytesRead=").append(bytesRead);
+      sb.append(',');
+      sb.append("isShortCircuit=").append(isShortCircuit);
+      sb.append(',');
+      sb.append("networkDistance=").append(networkDistance);
+      return sb.toString();
+    }
+  }
+
   /**
    * This method parses a striped block group into individual blocks.
    *
@@ -245,10 +287,11 @@ public class StripedBlockUtil {
    * @throws InterruptedException
    */
   public static StripingChunkReadResult getNextCompletedStripedRead(
-      CompletionService<Void> readService, Map<Future<Void>, Integer> futures,
+      CompletionService<BlockReadStats> readService,
+      Map<Future<BlockReadStats>, Integer> futures,
       final long timeoutMillis) throws InterruptedException {
     Preconditions.checkArgument(!futures.isEmpty());
-    Future<Void> future = null;
+    Future<BlockReadStats> future = null;
     try {
       if (timeoutMillis > 0) {
         future = readService.poll(timeoutMillis, TimeUnit.MILLISECONDS);
@@ -256,9 +299,9 @@ public class StripedBlockUtil {
         future = readService.take();
       }
       if (future != null) {
-        future.get();
+        final BlockReadStats stats = future.get();
         return new StripingChunkReadResult(futures.remove(future),
-            StripingChunkReadResult.SUCCESSFUL);
+            StripingChunkReadResult.SUCCESSFUL, stats);
       } else {
         return new StripingChunkReadResult(StripingChunkReadResult.TIMEOUT);
       }
@@ -881,24 +924,36 @@ public class StripedBlockUtil {
 
     public final int index;
     public final int state;
+    private final BlockReadStats readStats;
 
     public StripingChunkReadResult(int state) {
       Preconditions.checkArgument(state == TIMEOUT,
           "Only timeout result should return negative index.");
       this.index = -1;
       this.state = state;
+      this.readStats = null;
     }
 
     public StripingChunkReadResult(int index, int state) {
+      this(index, state, null);
+    }
+
+    public StripingChunkReadResult(int index, int state, BlockReadStats stats) {
       Preconditions.checkArgument(state != TIMEOUT,
           "Timeout result should return negative index.");
       this.index = index;
       this.state = state;
+      this.readStats = stats;
+    }
+
+    public BlockReadStats getReadStats() {
+      return readStats;
     }
 
     @Override
     public String toString() {
-      return "(index=" + index + ", state =" + state + ")";
+      return "(index=" + index + ", state =" + state + ", readStats ="
+          + readStats + ")";
     }
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
index 45e29fffe0c..f9063b7a892 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
 import org.apache.hadoop.util.Daemon;
 import org.slf4j.Logger;
 
@@ -161,7 +162,7 @@ public final class ErasureCodingWorker {
     return conf;
   }
 
-  CompletionService<Void> createReadService() {
+  CompletionService<BlockReadStats> createReadService() {
     return new ExecutorCompletionService<>(stripedReadPool);
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
index cbef31807ea..0db8a6f499b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
@@ -161,16 +162,15 @@ class StripedBlockReader {
     }
   }
 
-  Callable<Void> readFromBlock(final int length,
+  Callable<BlockReadStats> readFromBlock(final int length,
       final CorruptedBlocks corruptedBlocks) {
-    return new Callable<Void>() {
+    return new Callable<BlockReadStats>() {
 
       @Override
-      public Void call() throws Exception {
+      public BlockReadStats call() throws Exception {
         try {
           getReadBuffer().limit(length);
-          actualReadFromBlock();
-          return null;
+          return actualReadFromBlock();
         } catch (ChecksumException e) {
           LOG.warn("Found Checksum error for {} from {} at {}", block,
               source, e.getPos());
@@ -187,7 +187,7 @@ class StripedBlockReader {
   /**
    * Perform actual reading of bytes from block.
   */
-  private void actualReadFromBlock() throws IOException {
+  private BlockReadStats actualReadFromBlock() throws IOException {
     int len = buffer.remaining();
     int n = 0;
     while (n < len) {
@@ -198,6 +198,8 @@ class StripedBlockReader {
       n += nread;
       stripedReader.getReconstructor().incrBytesRead(isLocal, nread);
     }
+    return new BlockReadStats(n, blockReader.isShortCircuit(),
+        blockReader.getNetworkDistance());
   }
 
   // close block reader
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
index 96f97915455..98edf724a8e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
 import org.apache.hadoop.util.DataChecksum;
 import org.slf4j.Logger;
@@ -80,8 +81,8 @@ class StripedReader {
 
   private final List<StripedBlockReader> readers;
 
-  private final Map<Future<Void>, Integer> futures = new HashMap<>();
-  private final CompletionService<Void> readService;
+  private final Map<Future<BlockReadStats>, Integer> futures = new HashMap<>();
+  private final CompletionService<BlockReadStats> readService;
 
   StripedReader(StripedReconstructor reconstructor, DataNode datanode,
       Configuration conf, StripedReconstructionInfo stripedReconInfo) {
@@ -289,9 +290,9 @@ class StripedReader {
       int toRead = getReadLength(liveIndices[successList[i]],
           reconstructLength);
       if (toRead > 0) {
-        Callable<Void> readCallable =
+        Callable<BlockReadStats> readCallable =
             reader.readFromBlock(toRead, corruptedBlocks);
-        Future<Void> f = readService.submit(readCallable);
+        Future<BlockReadStats> f = readService.submit(readCallable);
         futures.put(f, successList[i]);
       } else {
         // If the read length is 0, we don't need to do real read
@@ -411,9 +412,9 @@ class StripedReader {
 
     // step3: schedule if find a correct source DN and need to do real read.
     if (reader != null) {
-      Callable<Void> readCallable =
+      Callable<BlockReadStats> readCallable =
           reader.readFromBlock(toRead, corruptedBlocks);
-      Future<Void> f = readService.submit(readCallable);
+      Future<BlockReadStats> f = readService.submit(readCallable);
       futures.put(f, m);
       used.set(m);
     }
@@ -422,8 +423,8 @@ class StripedReader {
   }
 
   // Cancel all reads.
-  private static void cancelReads(Collection<Future<Void>> futures) {
-    for (Future<Void> future : futures) {
+  private static void cancelReads(Collection<Future<BlockReadStats>> futures) {
+    for (Future<BlockReadStats> future : futures) {
       future.cancel(true);
     }
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
index bbffcf52716..a1f4c7ff55e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
 import org.apache.hadoop.io.ByteBufferPool;
 import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.io.erasurecode.CodecUtil;
@@ -222,7 +223,7 @@ abstract class StripedReconstructor {
     return cachingStrategy;
   }
 
-  CompletionService<Void> createReadService() {
+  CompletionService<BlockReadStats> createReadService() {
     return erasureCodingWorker.createReadService();
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystemWithECFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystemWithECFile.java
index 0a3010f0e49..1a2c4de3974 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystemWithECFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystemWithECFile.java
@@ -21,16 +21,21 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
+import org.apache.hadoop.io.IOUtils;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -61,6 +66,9 @@ public class TestDistributedFileSystemWithECFile {
     return StripedFileTestUtil.getDefaultECPolicy();
   }
 
+  @Rule
+  public final Timeout globalTimeout = new Timeout(60000 * 3);
+
   @Before
   public void setup() throws IOException {
     ecPolicy = getEcPolicy();
@@ -249,4 +257,40 @@ public class TestDistributedFileSystemWithECFile {
     assertEquals(rs63, fs.getErasureCodingPolicy(ecFile));
     assertEquals(rs32, fs.getErasureCodingPolicy(ecFile2));
   }
+
+  @SuppressWarnings("deprecation")
+  @Test
+  public void testStatistics() throws Exception {
+    final String fileName = "/ec/file";
+    final int size = 3200;
+    createFile(fileName, size);
+    InputStream in = null;
+    try {
+      in = fs.open(new Path(fileName));
+      IOUtils.copyBytes(in, System.out, 4096, false);
+    } finally {
+      IOUtils.closeStream(in);
+    }
+
+    // verify stats are correct
+    Long totalBytesRead = 0L;
+    Long ecBytesRead = 0L;
+    for (FileSystem.Statistics stat : FileSystem.getAllStatistics()) {
+      totalBytesRead += stat.getBytesRead();
+      ecBytesRead += stat.getBytesReadErasureCoded();
+    }
+    assertEquals(Long.valueOf(size), totalBytesRead);
+    assertEquals(Long.valueOf(size), ecBytesRead);
+
+    // verify thread local stats are correct
+    Long totalBytesReadThread = 0L;
+    Long ecBytesReadThread = 0L;
+    for (FileSystem.Statistics stat : FileSystem.getAllStatistics()) {
+      FileSystem.Statistics.StatisticsData data = stat.getThreadStatistics();
+      totalBytesReadThread += data.getBytesRead();
+      ecBytesReadThread += data.getBytesReadErasureCoded();
+    }
+    assertEquals(Long.valueOf(size), totalBytesReadThread);
+    assertEquals(Long.valueOf(size), ecBytesReadThread);
+  }
 }
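--
The sketch below, which is not part of the patch, shows how the corrected
thread-local aggregation can be observed from a client once this change is
in. It is a minimal example under stated assumptions: the path /ec/file and
the pre-existing erasure-coded directory are hypothetical, while the
statistics accessors (FileSystem.getAllStatistics(),
Statistics#getThreadStatistics(), StatisticsData#getBytesReadErasureCoded())
are the same ones exercised by testStatistics() above.

import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class ECReadStatsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    // Read a striped file end to end. The path is hypothetical; any file
    // under an erasure-coded directory works.
    try (InputStream in = fs.open(new Path("/ec/file"))) {
      IOUtils.copyBytes(in, System.out, 4096, false);
    }

    // With this patch the per-thread view agrees with the aggregate,
    // because stats are now updated on the thread that initiated the read
    // rather than inside the striped reader's async worker threads.
    for (FileSystem.Statistics stat : FileSystem.getAllStatistics()) {
      FileSystem.Statistics.StatisticsData data = stat.getThreadStatistics();
      System.out.println(stat.getScheme()
          + ": threadBytesRead=" + data.getBytesRead()
          + ", threadBytesReadEC=" + data.getBytesReadErasureCoded());
    }

    fs.close();
  }
}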