HDFS-13926. ThreadLocal aggregations for FileSystem.Statistics are incorrect with striped reads.
Contributed by Xiao Chen, Hrishikesh Gadre.
Signed-off-by: Xiao Chen <xiao@apache.org>
(cherry picked from commit 323b76bccf)
This commit is contained in:
parent
af85ce6ae4
commit
b170de8be5
|
@ -89,6 +89,8 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
|
||||
import javax.annotation.Nonnull;
|
||||
|
||||
import static org.apache.hadoop.hdfs.util.IOUtilsClient.updateReadStatistics;
|
||||
|
||||
/****************************************************************
|
||||
* DFSInputStream provides bytes from a named file. It handles
|
||||
* negotiation of the namenode and various datanodes as necessary.
|
||||
|
@ -768,6 +770,9 @@ public class DFSInputStream extends FSInputStream
|
|||
// got a EOS from reader though we expect more data on it.
|
||||
throw new IOException("Unexpected EOS from the reader");
|
||||
}
|
||||
updateReadStatistics(readStatistics, result, blockReader);
|
||||
dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
|
||||
result);
|
||||
return result;
|
||||
} catch (ChecksumException ce) {
|
||||
throw ce;
|
||||
|
|
|
@ -53,6 +53,8 @@ import java.util.Collection;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
import static org.apache.hadoop.hdfs.util.IOUtilsClient.updateReadStatistics;
|
||||
|
||||
/**
|
||||
* DFSStripedInputStream reads from striped block groups.
|
||||
*/
|
||||
|
@ -326,6 +328,24 @@ public class DFSStripedInputStream extends DFSInputStream {
|
|||
curStripeRange = stripeRange;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update read statistics. Note that this has to be done on the thread that
|
||||
* initiates the read, rather than inside each async thread, for
|
||||
* {@link org.apache.hadoop.fs.FileSystem.Statistics} to work correctly with
|
||||
* its ThreadLocal.
|
||||
*
|
||||
* @param stats striped read stats
|
||||
*/
|
||||
void updateReadStats(final StripedBlockUtil.BlockReadStats stats) {
|
||||
if (stats == null) {
|
||||
return;
|
||||
}
|
||||
updateReadStatistics(readStatistics, stats.getBytesRead(),
|
||||
stats.isShortCircuit(), stats.getNetworkDistance());
|
||||
dfsClient.updateFileSystemReadStats(stats.getNetworkDistance(),
|
||||
stats.getBytesRead());
|
||||
}
|
||||
|
||||
/**
|
||||
* Seek to a new arbitrary location.
|
||||
*/
|
||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import static org.apache.hadoop.hdfs.util.IOUtilsClient.updateReadStatistics;
|
||||
|
||||
/**
|
||||
* Wraps different possible read implementations so that callers can be
|
||||
|
@ -118,9 +117,6 @@ class ByteArrayStrategy implements ReaderStrategy {
|
|||
int length) throws IOException {
|
||||
int nRead = blockReader.read(readBuf, offset, length);
|
||||
if (nRead > 0) {
|
||||
updateReadStatistics(readStatistics, nRead, blockReader);
|
||||
dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
|
||||
nRead);
|
||||
offset += nRead;
|
||||
}
|
||||
return nRead;
|
||||
|
@ -185,9 +181,6 @@ class ByteBufferStrategy implements ReaderStrategy {
|
|||
// Only when data are read, update the position
|
||||
if (nRead > 0) {
|
||||
readBuf.position(readBuf.position() + nRead);
|
||||
updateReadStatistics(readStatistics, nRead, blockReader);
|
||||
dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
|
||||
nRead);
|
||||
}
|
||||
|
||||
return nRead;
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
|||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
|
||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
|
||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
|
||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
|
||||
|
@ -104,9 +105,10 @@ abstract class StripeReader {
|
|||
}
|
||||
}
|
||||
|
||||
protected final Map<Future<Void>, Integer> futures = new HashMap<>();
|
||||
private final Map<Future<BlockReadStats>, Integer> futures =
|
||||
new HashMap<>();
|
||||
protected final AlignedStripe alignedStripe;
|
||||
protected final CompletionService<Void> service;
|
||||
private final CompletionService<BlockReadStats> service;
|
||||
protected final LocatedBlock[] targetBlocks;
|
||||
protected final CorruptedBlocks corruptedBlocks;
|
||||
protected final BlockReaderInfo[] readerInfos;
|
||||
|
@ -256,7 +258,7 @@ abstract class StripeReader {
|
|||
}
|
||||
}
|
||||
|
||||
private Callable<Void> readCells(final BlockReader reader,
|
||||
private Callable<BlockReadStats> readCells(final BlockReader reader,
|
||||
final DatanodeInfo datanode, final long currentReaderOffset,
|
||||
final long targetReaderOffset, final ByteBufferStrategy[] strategies,
|
||||
final ExtendedBlock currentBlock) {
|
||||
|
@ -274,10 +276,13 @@ abstract class StripeReader {
|
|||
skipped == targetReaderOffset - currentReaderOffset);
|
||||
}
|
||||
|
||||
int ret = 0;
|
||||
for (ByteBufferStrategy strategy : strategies) {
|
||||
readToBuffer(reader, datanode, strategy, currentBlock);
|
||||
int bytesReead = readToBuffer(reader, datanode, strategy, currentBlock);
|
||||
ret += bytesReead;
|
||||
}
|
||||
return null;
|
||||
return new BlockReadStats(ret, reader.isShortCircuit(),
|
||||
reader.getNetworkDistance());
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -302,13 +307,14 @@ abstract class StripeReader {
|
|||
}
|
||||
|
||||
chunk.state = StripingChunk.PENDING;
|
||||
Callable<Void> readCallable = readCells(readerInfos[chunkIndex].reader,
|
||||
Callable<BlockReadStats> readCallable =
|
||||
readCells(readerInfos[chunkIndex].reader,
|
||||
readerInfos[chunkIndex].datanode,
|
||||
readerInfos[chunkIndex].blockReaderOffset,
|
||||
alignedStripe.getOffsetInBlock(), getReadStrategies(chunk),
|
||||
block.getBlock());
|
||||
|
||||
Future<Void> request = service.submit(readCallable);
|
||||
Future<BlockReadStats> request = service.submit(readCallable);
|
||||
futures.put(request, chunkIndex);
|
||||
return true;
|
||||
}
|
||||
|
@ -341,6 +347,7 @@ abstract class StripeReader {
|
|||
try {
|
||||
StripingChunkReadResult r = StripedBlockUtil
|
||||
.getNextCompletedStripedRead(service, futures, 0);
|
||||
dfsStripedInputStream.updateReadStats(r.getReadStats());
|
||||
if (DFSClient.LOG.isDebugEnabled()) {
|
||||
DFSClient.LOG.debug("Read task returned: " + r + ", for stripe "
|
||||
+ alignedStripe);
|
||||
|
@ -452,7 +459,7 @@ abstract class StripeReader {
|
|||
}
|
||||
|
||||
void clearFutures() {
|
||||
for (Future<Void> future : futures.keySet()) {
|
||||
for (Future future : futures.keySet()) {
|
||||
future.cancel(false);
|
||||
}
|
||||
futures.clear();
|
||||
|
|
|
@ -47,13 +47,19 @@ public class IOUtilsClient {
|
|||
|
||||
public static void updateReadStatistics(ReadStatistics readStatistics,
|
||||
int nRead, BlockReader blockReader) {
|
||||
updateReadStatistics(readStatistics, nRead, blockReader.isShortCircuit(),
|
||||
blockReader.getNetworkDistance());
|
||||
}
|
||||
|
||||
public static void updateReadStatistics(ReadStatistics readStatistics,
|
||||
int nRead, boolean isShortCircuit, int networkDistance) {
|
||||
if (nRead <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (blockReader.isShortCircuit()) {
|
||||
if (isShortCircuit) {
|
||||
readStatistics.addShortCircuitBytes(nRead);
|
||||
} else if (blockReader.getNetworkDistance() == 0) {
|
||||
} else if (networkDistance == 0) {
|
||||
readStatistics.addLocalBytes(nRead);
|
||||
} else {
|
||||
readStatistics.addRemoteBytes(nRead);
|
||||
|
|
|
@ -75,6 +75,48 @@ public class StripedBlockUtil {
|
|||
public static final Logger LOG =
|
||||
LoggerFactory.getLogger(StripedBlockUtil.class);
|
||||
|
||||
/**
|
||||
* Struct holding the read statistics. This is used when reads are done
|
||||
* asynchronously, to allow the async threads to return the read stats and let
|
||||
* the main reading thread update the stats. This is important for the
|
||||
* ThreadLocal stats for the main reading thread to be correct.
|
||||
*/
|
||||
public static class BlockReadStats {
|
||||
private final int bytesRead;
|
||||
private final boolean isShortCircuit;
|
||||
private final int networkDistance;
|
||||
|
||||
public BlockReadStats(int numBytesRead, boolean shortCircuit,
|
||||
int distance) {
|
||||
bytesRead = numBytesRead;
|
||||
isShortCircuit = shortCircuit;
|
||||
networkDistance = distance;
|
||||
}
|
||||
|
||||
public int getBytesRead() {
|
||||
return bytesRead;
|
||||
}
|
||||
|
||||
public boolean isShortCircuit() {
|
||||
return isShortCircuit;
|
||||
}
|
||||
|
||||
public int getNetworkDistance() {
|
||||
return networkDistance;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
final StringBuilder sb = new StringBuilder();
|
||||
sb.append("bytesRead=").append(bytesRead);
|
||||
sb.append(',');
|
||||
sb.append("isShortCircuit=").append(isShortCircuit);
|
||||
sb.append(',');
|
||||
sb.append("networkDistance=").append(networkDistance);
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method parses a striped block group into individual blocks.
|
||||
*
|
||||
|
@ -244,10 +286,11 @@ public class StripedBlockUtil {
|
|||
* @throws InterruptedException
|
||||
*/
|
||||
public static StripingChunkReadResult getNextCompletedStripedRead(
|
||||
CompletionService<Void> readService, Map<Future<Void>, Integer> futures,
|
||||
CompletionService<BlockReadStats> readService,
|
||||
Map<Future<BlockReadStats>, Integer> futures,
|
||||
final long timeoutMillis) throws InterruptedException {
|
||||
Preconditions.checkArgument(!futures.isEmpty());
|
||||
Future<Void> future = null;
|
||||
Future<BlockReadStats> future = null;
|
||||
try {
|
||||
if (timeoutMillis > 0) {
|
||||
future = readService.poll(timeoutMillis, TimeUnit.MILLISECONDS);
|
||||
|
@ -255,9 +298,9 @@ public class StripedBlockUtil {
|
|||
future = readService.take();
|
||||
}
|
||||
if (future != null) {
|
||||
future.get();
|
||||
final BlockReadStats stats = future.get();
|
||||
return new StripingChunkReadResult(futures.remove(future),
|
||||
StripingChunkReadResult.SUCCESSFUL);
|
||||
StripingChunkReadResult.SUCCESSFUL, stats);
|
||||
} else {
|
||||
return new StripingChunkReadResult(StripingChunkReadResult.TIMEOUT);
|
||||
}
|
||||
|
@ -874,24 +917,36 @@ public class StripedBlockUtil {
|
|||
|
||||
public final int index;
|
||||
public final int state;
|
||||
private final BlockReadStats readStats;
|
||||
|
||||
public StripingChunkReadResult(int state) {
|
||||
Preconditions.checkArgument(state == TIMEOUT,
|
||||
"Only timeout result should return negative index.");
|
||||
this.index = -1;
|
||||
this.state = state;
|
||||
this.readStats = null;
|
||||
}
|
||||
|
||||
public StripingChunkReadResult(int index, int state) {
|
||||
this(index, state, null);
|
||||
}
|
||||
|
||||
public StripingChunkReadResult(int index, int state, BlockReadStats stats) {
|
||||
Preconditions.checkArgument(state != TIMEOUT,
|
||||
"Timeout result should return negative index.");
|
||||
this.index = index;
|
||||
this.state = state;
|
||||
this.readStats = stats;
|
||||
}
|
||||
|
||||
public BlockReadStats getReadStats() {
|
||||
return readStats;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "(index=" + index + ", state =" + state + ")";
|
||||
return "(index=" + index + ", state =" + state + ", readStats ="
|
||||
+ readStats + ")";
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
|
||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
|
||||
import org.apache.hadoop.util.Daemon;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
|
@ -161,7 +162,7 @@ public final class ErasureCodingWorker {
|
|||
return conf;
|
||||
}
|
||||
|
||||
CompletionService<Void> createReadService() {
|
||||
CompletionService<BlockReadStats> createReadService() {
|
||||
return new ExecutorCompletionService<>(stripedReadPool);
|
||||
}
|
||||
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
|
@ -161,16 +162,15 @@ class StripedBlockReader {
|
|||
}
|
||||
}
|
||||
|
||||
Callable<Void> readFromBlock(final int length,
|
||||
Callable<BlockReadStats> readFromBlock(final int length,
|
||||
final CorruptedBlocks corruptedBlocks) {
|
||||
return new Callable<Void>() {
|
||||
return new Callable<BlockReadStats>() {
|
||||
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
public BlockReadStats call() throws Exception {
|
||||
try {
|
||||
getReadBuffer().limit(length);
|
||||
actualReadFromBlock();
|
||||
return null;
|
||||
return actualReadFromBlock();
|
||||
} catch (ChecksumException e) {
|
||||
LOG.warn("Found Checksum error for {} from {} at {}", block,
|
||||
source, e.getPos());
|
||||
|
@ -187,7 +187,7 @@ class StripedBlockReader {
|
|||
/**
|
||||
* Perform actual reading of bytes from block.
|
||||
*/
|
||||
private void actualReadFromBlock() throws IOException {
|
||||
private BlockReadStats actualReadFromBlock() throws IOException {
|
||||
int len = buffer.remaining();
|
||||
int n = 0;
|
||||
while (n < len) {
|
||||
|
@ -198,6 +198,8 @@ class StripedBlockReader {
|
|||
n += nread;
|
||||
stripedReader.getReconstructor().incrBytesRead(isLocal, nread);
|
||||
}
|
||||
return new BlockReadStats(n, blockReader.isShortCircuit(),
|
||||
blockReader.getNetworkDistance());
|
||||
}
|
||||
|
||||
// close block reader
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|||
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
|
||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -80,8 +81,8 @@ class StripedReader {
|
|||
|
||||
private final List<StripedBlockReader> readers;
|
||||
|
||||
private final Map<Future<Void>, Integer> futures = new HashMap<>();
|
||||
private final CompletionService<Void> readService;
|
||||
private final Map<Future<BlockReadStats>, Integer> futures = new HashMap<>();
|
||||
private final CompletionService<BlockReadStats> readService;
|
||||
|
||||
StripedReader(StripedReconstructor reconstructor, DataNode datanode,
|
||||
Configuration conf, StripedReconstructionInfo stripedReconInfo) {
|
||||
|
@ -289,9 +290,9 @@ class StripedReader {
|
|||
int toRead = getReadLength(liveIndices[successList[i]],
|
||||
reconstructLength);
|
||||
if (toRead > 0) {
|
||||
Callable<Void> readCallable =
|
||||
Callable<BlockReadStats> readCallable =
|
||||
reader.readFromBlock(toRead, corruptedBlocks);
|
||||
Future<Void> f = readService.submit(readCallable);
|
||||
Future<BlockReadStats> f = readService.submit(readCallable);
|
||||
futures.put(f, successList[i]);
|
||||
} else {
|
||||
// If the read length is 0, we don't need to do real read
|
||||
|
@ -411,9 +412,9 @@ class StripedReader {
|
|||
|
||||
// step3: schedule if find a correct source DN and need to do real read.
|
||||
if (reader != null) {
|
||||
Callable<Void> readCallable =
|
||||
Callable<BlockReadStats> readCallable =
|
||||
reader.readFromBlock(toRead, corruptedBlocks);
|
||||
Future<Void> f = readService.submit(readCallable);
|
||||
Future<BlockReadStats> f = readService.submit(readCallable);
|
||||
futures.put(f, m);
|
||||
used.set(m);
|
||||
}
|
||||
|
@ -422,8 +423,8 @@ class StripedReader {
|
|||
}
|
||||
|
||||
// Cancel all reads.
|
||||
private static void cancelReads(Collection<Future<Void>> futures) {
|
||||
for (Future<Void> future : futures) {
|
||||
private static void cancelReads(Collection<Future<BlockReadStats>> futures) {
|
||||
for (Future<BlockReadStats> future : futures) {
|
||||
future.cancel(true);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
|||
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
|
||||
import org.apache.hadoop.io.ByteBufferPool;
|
||||
import org.apache.hadoop.io.ElasticByteBufferPool;
|
||||
import org.apache.hadoop.io.erasurecode.CodecUtil;
|
||||
|
@ -222,7 +223,7 @@ abstract class StripedReconstructor {
|
|||
return cachingStrategy;
|
||||
}
|
||||
|
||||
CompletionService<Void> createReadService() {
|
||||
CompletionService<BlockReadStats> createReadService() {
|
||||
return erasureCodingWorker.createReadService();
|
||||
}
|
||||
|
||||
|
|
|
@ -21,16 +21,21 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.Timeout;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
|
@ -61,6 +66,9 @@ public class TestDistributedFileSystemWithECFile {
|
|||
return StripedFileTestUtil.getDefaultECPolicy();
|
||||
}
|
||||
|
||||
@Rule
|
||||
public final Timeout globalTimeout = new Timeout(60000 * 3);
|
||||
|
||||
@Before
|
||||
public void setup() throws IOException {
|
||||
ecPolicy = getEcPolicy();
|
||||
|
@ -249,4 +257,34 @@ public class TestDistributedFileSystemWithECFile {
|
|||
assertEquals(rs63, fs.getErasureCodingPolicy(ecFile));
|
||||
assertEquals(rs32, fs.getErasureCodingPolicy(ecFile2));
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Test
|
||||
public void testStatistics() throws Exception {
|
||||
final String fileName = "/ec/file";
|
||||
final int size = 3200;
|
||||
createFile(fileName, size);
|
||||
InputStream in = null;
|
||||
try {
|
||||
in = fs.open(new Path(fileName));
|
||||
IOUtils.copyBytes(in, System.out, 4096, false);
|
||||
} finally {
|
||||
IOUtils.closeStream(in);
|
||||
}
|
||||
|
||||
// verify stats are correct
|
||||
Long totalBytesRead = 0L;
|
||||
for (FileSystem.Statistics stat : FileSystem.getAllStatistics()) {
|
||||
totalBytesRead += stat.getBytesRead();
|
||||
}
|
||||
assertEquals(Long.valueOf(size), totalBytesRead);
|
||||
|
||||
// verify thread local stats are correct
|
||||
Long totalBytesReadThread = 0L;
|
||||
for (FileSystem.Statistics stat : FileSystem.getAllStatistics()) {
|
||||
FileSystem.Statistics.StatisticsData data = stat.getThreadStatistics();
|
||||
totalBytesReadThread += data.getBytesRead();
|
||||
}
|
||||
assertEquals(Long.valueOf(size), totalBytesReadThread);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue