HDFS-13926. ThreadLocal aggregations for FileSystem.Statistics are incorrect with striped reads.

Contributed by Xiao Chen, Hrishikesh Gadre.

Signed-off-by: Xiao Chen <xiao@apache.org>
(cherry picked from commit 08bb6c49a5)
This commit is contained in:
Hrishikesh Gadre 2018-10-08 20:30:53 -07:00 committed by Xiao Chen
parent b6698e2a82
commit a99658cd85
11 changed files with 178 additions and 46 deletions

View File

@ -90,6 +90,8 @@ import com.google.common.annotations.VisibleForTesting;
import javax.annotation.Nonnull;
import static org.apache.hadoop.hdfs.util.IOUtilsClient.updateReadStatistics;
/****************************************************************
* DFSInputStream provides bytes from a named file. It handles
* negotiation of the namenode and various datanodes as necessary.
@ -769,6 +771,12 @@ public class DFSInputStream extends FSInputStream
// got a EOS from reader though we expect more data on it.
throw new IOException("Unexpected EOS from the reader");
}
updateReadStatistics(readStatistics, result, blockReader);
dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
result);
if (readStatistics.getBlockType() == BlockType.STRIPED) {
dfsClient.updateFileSystemECReadStats(result);
}
return result;
} catch (ChecksumException ce) {
throw ce;

View File

@ -54,6 +54,8 @@ import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import static org.apache.hadoop.hdfs.util.IOUtilsClient.updateReadStatistics;
/**
* DFSStripedInputStream reads from striped block groups.
*/
@ -328,6 +330,26 @@ public class DFSStripedInputStream extends DFSInputStream {
curStripeRange = stripeRange;
}
/**
* Update read statistics. Note that this has to be done on the thread that
* initiates the read, rather than inside each async thread, for
* {@link org.apache.hadoop.fs.FileSystem.Statistics} to work correctly with
* its ThreadLocal.
*
* @param stats striped read stats
*/
void updateReadStats(final StripedBlockUtil.BlockReadStats stats) {
if (stats == null) {
return;
}
updateReadStatistics(readStatistics, stats.getBytesRead(),
stats.isShortCircuit(), stats.getNetworkDistance());
dfsClient.updateFileSystemReadStats(stats.getNetworkDistance(),
stats.getBytesRead());
assert readStatistics.getBlockType() == BlockType.STRIPED;
dfsClient.updateFileSystemECReadStats(stats.getBytesRead());
}
/**
* Seek to a new arbitrary location.
*/

View File

@ -17,11 +17,8 @@
*/
package org.apache.hadoop.hdfs;
import org.apache.hadoop.hdfs.protocol.BlockType;
import java.io.IOException;
import java.nio.ByteBuffer;
import static org.apache.hadoop.hdfs.util.IOUtilsClient.updateReadStatistics;
/**
* Wraps different possible read implementations so that callers can be
@ -120,12 +117,6 @@ class ByteArrayStrategy implements ReaderStrategy {
int length) throws IOException {
int nRead = blockReader.read(readBuf, offset, length);
if (nRead > 0) {
updateReadStatistics(readStatistics, nRead, blockReader);
dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
nRead);
if (readStatistics.getBlockType() == BlockType.STRIPED) {
dfsClient.updateFileSystemECReadStats(nRead);
}
offset += nRead;
}
return nRead;
@ -190,12 +181,6 @@ class ByteBufferStrategy implements ReaderStrategy {
// Only when data are read, update the position
if (nRead > 0) {
readBuf.position(readBuf.position() + nRead);
updateReadStatistics(readStatistics, nRead, blockReader);
dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
nRead);
if (readStatistics.getBlockType() == BlockType.STRIPED) {
dfsClient.updateFileSystemECReadStats(nRead);
}
}
return nRead;

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
import org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
@ -105,9 +106,10 @@ abstract class StripeReader {
}
}
protected final Map<Future<Void>, Integer> futures = new HashMap<>();
private final Map<Future<BlockReadStats>, Integer> futures =
new HashMap<>();
protected final AlignedStripe alignedStripe;
protected final CompletionService<Void> service;
private final CompletionService<BlockReadStats> service;
protected final LocatedBlock[] targetBlocks;
protected final CorruptedBlocks corruptedBlocks;
protected final BlockReaderInfo[] readerInfos;
@ -257,7 +259,7 @@ abstract class StripeReader {
}
}
private Callable<Void> readCells(final BlockReader reader,
private Callable<BlockReadStats> readCells(final BlockReader reader,
final DatanodeInfo datanode, final long currentReaderOffset,
final long targetReaderOffset, final ByteBufferStrategy[] strategies,
final ExtendedBlock currentBlock) {
@ -275,10 +277,13 @@ abstract class StripeReader {
skipped == targetReaderOffset - currentReaderOffset);
}
int ret = 0;
for (ByteBufferStrategy strategy : strategies) {
readToBuffer(reader, datanode, strategy, currentBlock);
int bytesReead = readToBuffer(reader, datanode, strategy, currentBlock);
ret += bytesReead;
}
return null;
return new BlockReadStats(ret, reader.isShortCircuit(),
reader.getNetworkDistance());
};
}
@ -303,13 +308,14 @@ abstract class StripeReader {
}
chunk.state = StripingChunk.PENDING;
Callable<Void> readCallable = readCells(readerInfos[chunkIndex].reader,
Callable<BlockReadStats> readCallable =
readCells(readerInfos[chunkIndex].reader,
readerInfos[chunkIndex].datanode,
readerInfos[chunkIndex].blockReaderOffset,
alignedStripe.getOffsetInBlock(), getReadStrategies(chunk),
block.getBlock());
Future<Void> request = service.submit(readCallable);
Future<BlockReadStats> request = service.submit(readCallable);
futures.put(request, chunkIndex);
return true;
}
@ -342,6 +348,7 @@ abstract class StripeReader {
try {
StripingChunkReadResult r = StripedBlockUtil
.getNextCompletedStripedRead(service, futures, 0);
dfsStripedInputStream.updateReadStats(r.getReadStats());
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Read task returned: " + r + ", for stripe "
+ alignedStripe);
@ -460,7 +467,7 @@ abstract class StripeReader {
}
void clearFutures() {
for (Future<Void> future : futures.keySet()) {
for (Future future : futures.keySet()) {
future.cancel(false);
}
futures.clear();

View File

@ -48,13 +48,19 @@ public class IOUtilsClient {
public static void updateReadStatistics(ReadStatistics readStatistics,
int nRead, BlockReader blockReader) {
updateReadStatistics(readStatistics, nRead, blockReader.isShortCircuit(),
blockReader.getNetworkDistance());
}
public static void updateReadStatistics(ReadStatistics readStatistics,
int nRead, boolean isShortCircuit, int networkDistance) {
if (nRead <= 0) {
return;
}
if (blockReader.isShortCircuit()) {
if (isShortCircuit) {
readStatistics.addShortCircuitBytes(nRead);
} else if (blockReader.getNetworkDistance() == 0) {
} else if (networkDistance == 0) {
readStatistics.addLocalBytes(nRead);
} else {
readStatistics.addRemoteBytes(nRead);

View File

@ -76,6 +76,48 @@ public class StripedBlockUtil {
public static final Logger LOG =
LoggerFactory.getLogger(StripedBlockUtil.class);
/**
* Struct holding the read statistics. This is used when reads are done
* asynchronously, to allow the async threads return the read stats and let
* the main reading thread to update the stats. This is important for the
* ThreadLocal stats for the main reading thread to be correct.
*/
public static class BlockReadStats {
private final int bytesRead;
private final boolean isShortCircuit;
private final int networkDistance;
public BlockReadStats(int numBytesRead, boolean shortCircuit,
int distance) {
bytesRead = numBytesRead;
isShortCircuit = shortCircuit;
networkDistance = distance;
}
public int getBytesRead() {
return bytesRead;
}
public boolean isShortCircuit() {
return isShortCircuit;
}
public int getNetworkDistance() {
return networkDistance;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
sb.append("bytesRead=").append(bytesRead);
sb.append(',');
sb.append("isShortCircuit=").append(isShortCircuit);
sb.append(',');
sb.append("networkDistance=").append(networkDistance);
return sb.toString();
}
}
/**
* This method parses a striped block group into individual blocks.
*
@ -245,10 +287,11 @@ public class StripedBlockUtil {
* @throws InterruptedException
*/
public static StripingChunkReadResult getNextCompletedStripedRead(
CompletionService<Void> readService, Map<Future<Void>, Integer> futures,
CompletionService<BlockReadStats> readService,
Map<Future<BlockReadStats>, Integer> futures,
final long timeoutMillis) throws InterruptedException {
Preconditions.checkArgument(!futures.isEmpty());
Future<Void> future = null;
Future<BlockReadStats> future = null;
try {
if (timeoutMillis > 0) {
future = readService.poll(timeoutMillis, TimeUnit.MILLISECONDS);
@ -256,9 +299,9 @@ public class StripedBlockUtil {
future = readService.take();
}
if (future != null) {
future.get();
final BlockReadStats stats = future.get();
return new StripingChunkReadResult(futures.remove(future),
StripingChunkReadResult.SUCCESSFUL);
StripingChunkReadResult.SUCCESSFUL, stats);
} else {
return new StripingChunkReadResult(StripingChunkReadResult.TIMEOUT);
}
@ -881,24 +924,36 @@ public class StripedBlockUtil {
public final int index;
public final int state;
private final BlockReadStats readStats;
public StripingChunkReadResult(int state) {
Preconditions.checkArgument(state == TIMEOUT,
"Only timeout result should return negative index.");
this.index = -1;
this.state = state;
this.readStats = null;
}
public StripingChunkReadResult(int index, int state) {
this(index, state, null);
}
public StripingChunkReadResult(int index, int state, BlockReadStats stats) {
Preconditions.checkArgument(state != TIMEOUT,
"Timeout result should return negative index.");
this.index = index;
this.state = state;
this.readStats = stats;
}
public BlockReadStats getReadStats() {
return readStats;
}
@Override
public String toString() {
return "(index=" + index + ", state =" + state + ")";
return "(index=" + index + ", state =" + state + ", readStats ="
+ readStats + ")";
}
}

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;
@ -161,7 +162,7 @@ public final class ErasureCodingWorker {
return conf;
}
CompletionService<Void> createReadService() {
CompletionService<BlockReadStats> createReadService() {
return new ExecutorCompletionService<>(stripedReadPool);
}

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
@ -161,16 +162,15 @@ class StripedBlockReader {
}
}
Callable<Void> readFromBlock(final int length,
Callable<BlockReadStats> readFromBlock(final int length,
final CorruptedBlocks corruptedBlocks) {
return new Callable<Void>() {
return new Callable<BlockReadStats>() {
@Override
public Void call() throws Exception {
public BlockReadStats call() throws Exception {
try {
getReadBuffer().limit(length);
actualReadFromBlock();
return null;
return actualReadFromBlock();
} catch (ChecksumException e) {
LOG.warn("Found Checksum error for {} from {} at {}", block,
source, e.getPos());
@ -187,7 +187,7 @@ class StripedBlockReader {
/**
* Perform actual reading of bytes from block.
*/
private void actualReadFromBlock() throws IOException {
private BlockReadStats actualReadFromBlock() throws IOException {
int len = buffer.remaining();
int n = 0;
while (n < len) {
@ -198,6 +198,8 @@ class StripedBlockReader {
n += nread;
stripedReader.getReconstructor().incrBytesRead(isLocal, nread);
}
return new BlockReadStats(n, blockReader.isShortCircuit(),
blockReader.getNetworkDistance());
}
// close block reader

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
import org.apache.hadoop.util.DataChecksum;
import org.slf4j.Logger;
@ -80,8 +81,8 @@ class StripedReader {
private final List<StripedBlockReader> readers;
private final Map<Future<Void>, Integer> futures = new HashMap<>();
private final CompletionService<Void> readService;
private final Map<Future<BlockReadStats>, Integer> futures = new HashMap<>();
private final CompletionService<BlockReadStats> readService;
StripedReader(StripedReconstructor reconstructor, DataNode datanode,
Configuration conf, StripedReconstructionInfo stripedReconInfo) {
@ -289,9 +290,9 @@ class StripedReader {
int toRead = getReadLength(liveIndices[successList[i]],
reconstructLength);
if (toRead > 0) {
Callable<Void> readCallable =
Callable<BlockReadStats> readCallable =
reader.readFromBlock(toRead, corruptedBlocks);
Future<Void> f = readService.submit(readCallable);
Future<BlockReadStats> f = readService.submit(readCallable);
futures.put(f, successList[i]);
} else {
// If the read length is 0, we don't need to do real read
@ -411,9 +412,9 @@ class StripedReader {
// step3: schedule if find a correct source DN and need to do real read.
if (reader != null) {
Callable<Void> readCallable =
Callable<BlockReadStats> readCallable =
reader.readFromBlock(toRead, corruptedBlocks);
Future<Void> f = readService.submit(readCallable);
Future<BlockReadStats> f = readService.submit(readCallable);
futures.put(f, m);
used.set(m);
}
@ -422,8 +423,8 @@ class StripedReader {
}
// Cancel all reads.
private static void cancelReads(Collection<Future<Void>> futures) {
for (Future<Void> future : futures) {
private static void cancelReads(Collection<Future<BlockReadStats>> futures) {
for (Future<BlockReadStats> future : futures) {
future.cancel(true);
}
}

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.io.erasurecode.CodecUtil;
@ -222,7 +223,7 @@ abstract class StripedReconstructor {
return cachingStrategy;
}
CompletionService<Void> createReadService() {
CompletionService<BlockReadStats> createReadService() {
return erasureCodingWorker.createReadService();
}

View File

@ -21,16 +21,21 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
@ -61,6 +66,9 @@ public class TestDistributedFileSystemWithECFile {
return StripedFileTestUtil.getDefaultECPolicy();
}
@Rule
public final Timeout globalTimeout = new Timeout(60000 * 3);
@Before
public void setup() throws IOException {
ecPolicy = getEcPolicy();
@ -249,4 +257,40 @@ public class TestDistributedFileSystemWithECFile {
assertEquals(rs63, fs.getErasureCodingPolicy(ecFile));
assertEquals(rs32, fs.getErasureCodingPolicy(ecFile2));
}
@SuppressWarnings("deprecation")
@Test
public void testStatistics() throws Exception {
final String fileName = "/ec/file";
final int size = 3200;
createFile(fileName, size);
InputStream in = null;
try {
in = fs.open(new Path(fileName));
IOUtils.copyBytes(in, System.out, 4096, false);
} finally {
IOUtils.closeStream(in);
}
// verify stats are correct
Long totalBytesRead = 0L;
Long ecBytesRead = 0L;
for (FileSystem.Statistics stat : FileSystem.getAllStatistics()) {
totalBytesRead += stat.getBytesRead();
ecBytesRead += stat.getBytesReadErasureCoded();
}
assertEquals(Long.valueOf(size), totalBytesRead);
assertEquals(Long.valueOf(size), ecBytesRead);
// verify thread local stats are correct
Long totalBytesReadThread = 0L;
Long ecBytesReadThread = 0L;
for (FileSystem.Statistics stat : FileSystem.getAllStatistics()) {
FileSystem.Statistics.StatisticsData data = stat.getThreadStatistics();
totalBytesReadThread += data.getBytesRead();
ecBytesReadThread += data.getBytesReadErasureCoded();
}
assertEquals(Long.valueOf(size), totalBytesReadThread);
assertEquals(Long.valueOf(size), ecBytesReadThread);
}
}