HDFS-5776 Support 'hedged' reads in DFSClient (Liang Xie via stack)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1571479 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2014-02-24 22:53:07 +00:00
parent ae456f408a
commit 5f3f90a640
7 changed files with 545 additions and 60 deletions

View File

@ -21,6 +21,8 @@ Release 2.4.0 - UNRELEASED
HDFS-5698. Use protobuf to serialize / deserialize FSImage. (See breakdown
of tasks below for features and contributors)
HDFS-5776 Support 'hedged' reads in DFSClient (Liang Xie via stack)
IMPROVEMENTS
HDFS-5781. Use an array to record the mapping between FSEditLogOpCode and

View File

@ -82,6 +82,10 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.SocketFactory;
@ -172,6 +176,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DataChecksum.Type;
import org.apache.hadoop.util.Progressable;
@ -221,6 +226,10 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
private final CachingStrategy defaultReadCachingStrategy;
private final CachingStrategy defaultWriteCachingStrategy;
private final ClientContext clientContext;
private volatile long hedgedReadThresholdMillis;
private static DFSHedgedReadMetrics HEDGED_READ_METRIC =
new DFSHedgedReadMetrics();
private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
/**
* DFSClient configuration
@ -573,6 +582,15 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
this.clientContext = ClientContext.get(
conf.get(DFS_CLIENT_CONTEXT, DFS_CLIENT_CONTEXT_DEFAULT),
dfsClientConf);
this.hedgedReadThresholdMillis = conf.getLong(
DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS,
DFSConfigKeys.DEFAULT_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS);
int numThreads = conf.getInt(
DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE,
DFSConfigKeys.DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE);
if (numThreads > 0) {
this.initThreadsNumForHedgedReads(numThreads);
}
}
/**
@ -2640,4 +2658,64 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
}
}
}
/**
* Create hedged reads thread pool, HEDGED_READ_THREAD_POOL, if
* it does not already exist.
* @param num Number of threads for hedged reads thread pool.
* If zero, skip hedged reads thread pool creation.
*/
private synchronized void initThreadsNumForHedgedReads(int num) {
if (num <= 0 || HEDGED_READ_THREAD_POOL != null) return;
HEDGED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60,
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new Daemon.DaemonFactory() {
private final AtomicInteger threadIndex =
new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = super.newThread(r);
t.setName("hedgedRead-" +
threadIndex.getAndIncrement());
return t;
}
},
new ThreadPoolExecutor.CallerRunsPolicy() {
@Override
public void rejectedExecution(Runnable runnable,
ThreadPoolExecutor e) {
LOG.info("Execution rejected, Executing in current thread");
HEDGED_READ_METRIC.incHedgedReadOpsInCurThread();
// will run in the current thread
super.rejectedExecution(runnable, e);
}
});
HEDGED_READ_THREAD_POOL.allowCoreThreadTimeOut(true);
if (LOG.isDebugEnabled()) {
LOG.debug("Using hedged reads; pool threads=" + num);
}
}
long getHedgedReadTimeout() {
return this.hedgedReadThresholdMillis;
}
@VisibleForTesting
void setHedgedReadTimeout(long timeoutMillis) {
this.hedgedReadThresholdMillis = timeoutMillis;
}
ThreadPoolExecutor getHedgedReadsThreadPool() {
return HEDGED_READ_THREAD_POOL;
}
boolean isHedgedReadsEnabled() {
return (HEDGED_READ_THREAD_POOL != null) &&
HEDGED_READ_THREAD_POOL.getMaximumPoolSize() > 0;
}
DFSHedgedReadMetrics getHedgedReadMetrics() {
return HEDGED_READ_METRIC;
}
}

View File

@ -16,7 +16,6 @@
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.IOException;
import com.google.common.annotations.VisibleForTesting;
@ -46,4 +45,6 @@ public class DFSClientFaultInjector {
public boolean failPacket() {
return false;
}
public void startFetchFromDatanode() {}
}

View File

@ -589,4 +589,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT = 500;
public static final String DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY = "dfs.http.client.failover.sleep.max.millis";
public static final int DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT = 15000;
// hedged read properties
public static final String DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS =
"dfs.client.hedged.read.threshold.millis";
public static final long DEFAULT_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS =
500;
public static final String DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE =
"dfs.client.hedged.read.threadpool.size";
public static final int DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE = 0;
}

View File

@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.util.concurrent.atomic.AtomicLong;
/**
* The client-side metrics for hedged read feature.
* This class has a number of metrics variables that are publicly accessible,
* we can grab them from client side, like HBase.
*/
public class DFSHedgedReadMetrics {
public AtomicLong hedgedReadOps = new AtomicLong();
public AtomicLong hedgedReadOpsWin = new AtomicLong();
public AtomicLong hedgedReadOpsInCurThread = new AtomicLong();
public void incHedgedReadOps() {
hedgedReadOps.incrementAndGet();
}
public void incHedgedReadOpsInCurThread() {
hedgedReadOpsInCurThread.incrementAndGet();
}
public void incHedgedReadWins() {
hedgedReadOpsWin.incrementAndGet();
}
public long getHedgedReadOps() {
return hedgedReadOps.longValue();
}
public long getHedgedReadOpsInCurThread() {
return hedgedReadOpsInCurThread.longValue();
}
public long getHedgedReadWins() {
return hedgedReadOpsWin.longValue();
}
}

View File

@ -17,13 +17,12 @@
*/
package org.apache.hadoop.hdfs;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@ -32,9 +31,14 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.ByteBufferUtil;
@ -54,15 +58,12 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.IdentityHashStore;
@ -555,7 +556,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
assert (target==pos) : "Wrong postion " + pos + " expect " + target;
long offsetIntoBlock = target - targetBlock.getStartOffset();
DNAddrPair retval = chooseDataNode(targetBlock);
DNAddrPair retval = chooseDataNode(targetBlock, null);
chosenNode = retval.info;
InetSocketAddress targetAddr = retval.addr;
@ -864,31 +865,29 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
}
private DNAddrPair chooseDataNode(LocatedBlock block)
throws IOException {
private DNAddrPair chooseDataNode(LocatedBlock block,
Collection<DatanodeInfo> ignoredNodes) throws IOException {
while (true) {
DatanodeInfo[] nodes = block.getLocations();
try {
DatanodeInfo chosenNode = bestNode(nodes, deadNodes);
final String dnAddr =
chosenNode.getXferAddr(dfsClient.getConf().connectToDnViaHostname);
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
}
InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
return new DNAddrPair(chosenNode, targetAddr);
return getBestNodeDNAddrPair(nodes, ignoredNodes);
} catch (IOException ie) {
String errMsg =
getBestNodeDNAddrPairErrorString(nodes, deadNodes, ignoredNodes);
String blockInfo = block.getBlock() + " file=" + src;
if (failures >= dfsClient.getMaxBlockAcquireFailures()) {
throw new BlockMissingException(src, "Could not obtain block: " + blockInfo,
block.getStartOffset());
String description = "Could not obtain block: " + blockInfo;
DFSClient.LOG.warn(description + errMsg
+ ". Throwing a BlockMissingException");
throw new BlockMissingException(src, description,
block.getStartOffset());
}
if (nodes == null || nodes.length == 0) {
DFSClient.LOG.info("No node available for " + blockInfo);
}
DFSClient.LOG.info("Could not obtain " + block.getBlock()
+ " from any node: " + ie
+ " from any node: " + ie + errMsg
+ ". Will get new block locations from namenode and retry...");
try {
// Introducing a random factor to the wait time before another retry.
@ -916,13 +915,91 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
}
/**
* Get the best node.
* @param nodes Nodes to choose from.
* @param ignoredNodes Do not chose nodes in this array (may be null)
* @return The DNAddrPair of the best node.
* @throws IOException
*/
private DNAddrPair getBestNodeDNAddrPair(final DatanodeInfo[] nodes,
Collection<DatanodeInfo> ignoredNodes) throws IOException {
DatanodeInfo chosenNode = bestNode(nodes, deadNodes, ignoredNodes);
final String dnAddr =
chosenNode.getXferAddr(dfsClient.getConf().connectToDnViaHostname);
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
}
InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
return new DNAddrPair(chosenNode, targetAddr);
}
private static String getBestNodeDNAddrPairErrorString(
DatanodeInfo nodes[], AbstractMap<DatanodeInfo,
DatanodeInfo> deadNodes, Collection<DatanodeInfo> ignoredNodes) {
StringBuilder errMsgr = new StringBuilder(
" No live nodes contain current block ");
errMsgr.append("Block locations:");
for (DatanodeInfo datanode : nodes) {
errMsgr.append(" ");
errMsgr.append(datanode.toString());
}
errMsgr.append(" Dead nodes: ");
for (DatanodeInfo datanode : deadNodes.keySet()) {
errMsgr.append(" ");
errMsgr.append(datanode.toString());
}
if (ignoredNodes != null) {
errMsgr.append(" Ignored nodes: ");
for (DatanodeInfo datanode : ignoredNodes) {
errMsgr.append(" ");
errMsgr.append(datanode.toString());
}
}
return errMsgr.toString();
}
private void fetchBlockByteRange(LocatedBlock block, long start, long end,
byte[] buf, int offset,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
//
// Connect to best DataNode for desired Block, with potential offset
//
block = getBlockAt(block.getStartOffset(), false);
while (true) {
DNAddrPair addressPair = chooseDataNode(block, null);
try {
actualGetFromOneDataNode(addressPair, block, start, end, buf, offset,
corruptedBlockMap);
return;
} catch (IOException e) {
// Ignore. Already processed inside the function.
// Loop through to try the next node.
}
}
}
private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
final LocatedBlock block, final long start, final long end,
final ByteBuffer bb,
final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
final CountDownLatch latch) {
return new Callable<ByteBuffer>() {
@Override
public ByteBuffer call() throws Exception {
byte[] buf = bb.array();
int offset = bb.position();
actualGetFromOneDataNode(datanode, block, start, end, buf, offset,
corruptedBlockMap);
latch.countDown();
return bb;
}
};
}
private void actualGetFromOneDataNode(final DNAddrPair datanode,
LocatedBlock block, final long start, final long end, byte[] buf,
int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
DFSClientFaultInjector.get().startFetchFromDatanode();
int refetchToken = 1; // only need to get a new access token once
int refetchEncryptionKey = 1; // only need to get a new encryption key once
@ -937,9 +1014,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
curCachingStrategy = cachingStrategy;
allowShortCircuitLocalReads = !shortCircuitForbidden();
}
DNAddrPair retval = chooseDataNode(block);
DatanodeInfo chosenNode = retval.info;
InetSocketAddress targetAddr = retval.addr;
DatanodeInfo chosenNode = datanode.info;
InetSocketAddress targetAddr = datanode.addr;
BlockReader reader = null;
try {
@ -969,11 +1045,14 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
return;
} catch (ChecksumException e) {
DFSClient.LOG.warn("fetchBlockByteRange(). Got a checksum exception for " +
src + " at " + block.getBlock() + ":" +
e.getPos() + " from " + chosenNode);
String msg = "fetchBlockByteRange(). Got a checksum exception for "
+ src + " at " + block.getBlock() + ":" + e.getPos() + " from "
+ chosenNode;
DFSClient.LOG.warn(msg);
// we want to remember what we have tried
addIntoCorruptedBlockMap(block.getBlock(), chosenNode, corruptedBlockMap);
addToDeadNodes(chosenNode);
throw new IOException(msg);
} catch (IOException e) {
if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
DFSClient.LOG.info("Will fetch a new encryption key and retry, "
@ -985,22 +1064,164 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
continue;
} else if (refetchToken > 0 && tokenRefetchNeeded(e, targetAddr)) {
refetchToken--;
fetchBlockAt(block.getStartOffset());
try {
fetchBlockAt(block.getStartOffset());
} catch (IOException fbae) {
// ignore IOE, since we can retry it later in a loop
}
continue;
} else {
DFSClient.LOG.warn("Failed to connect to " + targetAddr +
" for file " + src + " for block " + block.getBlock() + ":" + e);
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Connection failure ", e);
}
String msg = "Failed to connect to " + targetAddr + " for file "
+ src + " for block " + block.getBlock() + ":" + e;
DFSClient.LOG.warn("Connection failure: " + msg, e);
addToDeadNodes(chosenNode);
throw new IOException(msg);
}
} finally {
if (reader != null) {
reader.close();
}
}
// Put chosen node into dead list, continue
addToDeadNodes(chosenNode);
}
}
/**
* Like {@link #fetchBlockByteRange(LocatedBlock, long, long, byte[],
* int, Map)} except we start up a second, parallel, 'hedged' read
* if the first read is taking longer than configured amount of
* time. We then wait on which ever read returns first.
*
* @param block
* @param start
* @param end
* @param buf
* @param offset
* @param corruptedBlockMap
* @throws IOException
*/
private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
long end, byte[] buf, int offset,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
ArrayList<Future<ByteBuffer>> futures = null;
ArrayList<DatanodeInfo> ignored = new ArrayList<DatanodeInfo>();
ByteBuffer bb = null;
int len = (int) (end - start + 1);
block = getBlockAt(block.getStartOffset(), false);
// Latch shared by all outstanding reads. First to finish closes
CountDownLatch hasReceivedResult = new CountDownLatch(1);
while (true) {
DNAddrPair chosenNode = null;
Future<ByteBuffer> future = null;
// futures is null if there is no request already executing.
if (futures == null) {
// chooseDataNode is a commitment. If no node, we go to
// the NN to reget block locations. Only go here on first read.
chosenNode = chooseDataNode(block, ignored);
bb = ByteBuffer.wrap(buf, offset, len);
future = getHedgedReadFuture(chosenNode, block, start, end, bb,
corruptedBlockMap, hasReceivedResult);
try {
future.get(dfsClient.getHedgedReadTimeout(), TimeUnit.MILLISECONDS);
return;
} catch (TimeoutException e) {
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Waited " + dfsClient.getHedgedReadTimeout() +
"ms to read from " + chosenNode.info + "; spawning hedged read");
}
// Ignore this node on next go around.
ignored.add(chosenNode.info);
dfsClient.getHedgedReadMetrics().incHedgedReadOps();
futures = new ArrayList<Future<ByteBuffer>>();
futures.add(future);
continue; // no need to refresh block locations
} catch (InterruptedException e) {
// Ignore
} catch (ExecutionException e) {
// Ignore already logged in the call.
}
} else {
// We are starting up a 'hedged' read. We have a read already
// ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
// If no nodes to do hedged reads against, pass.
try {
chosenNode = getBestNodeDNAddrPair(block.getLocations(), ignored);
bb = ByteBuffer.allocate(len);
future = getHedgedReadFuture(chosenNode, block, start, end, bb,
corruptedBlockMap, hasReceivedResult);
futures.add(future);
} catch (IOException ioe) {
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Failed getting node for hedged read: " +
ioe.getMessage());
}
}
// if not succeeded. Submit callables for each datanode in a loop, wait
// for a fixed interval and get the result from the fastest one.
try {
ByteBuffer result = getFirstToComplete(futures, hasReceivedResult);
// cancel the rest.
cancelAll(futures);
if (result.array() != buf) { // compare the array pointers
dfsClient.getHedgedReadMetrics().incHedgedReadWins();
System.arraycopy(result.array(), result.position(), buf, offset,
len);
} else {
dfsClient.getHedgedReadMetrics().incHedgedReadOps();
}
return;
} catch (InterruptedException ie) {
// Ignore
} catch (ExecutionException e) {
// exception already handled in the call method. getFirstToComplete
// will remove the failing future from the list. nothing more to do.
}
// We got here if exception. Ignore this node on next go around.
ignored.add(chosenNode.info);
}
// executed if we get an error from a data node
block = getBlockAt(block.getStartOffset(), false);
}
}
private Future<ByteBuffer> getHedgedReadFuture(final DNAddrPair chosenNode,
final LocatedBlock block, long start,
final long end, final ByteBuffer bb,
final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
final CountDownLatch hasReceivedResult) {
Callable<ByteBuffer> getFromDataNodeCallable =
getFromOneDataNode(chosenNode, block, start, end, bb,
corruptedBlockMap, hasReceivedResult);
return dfsClient.getHedgedReadsThreadPool().submit(getFromDataNodeCallable);
}
private ByteBuffer getFirstToComplete(ArrayList<Future<ByteBuffer>> futures,
CountDownLatch latch) throws ExecutionException, InterruptedException {
latch.await();
for (Future<ByteBuffer> future : futures) {
if (future.isDone()) {
try {
return future.get();
} catch (ExecutionException e) {
// already logged in the Callable
futures.remove(future);
throw e;
}
}
}
throw new InterruptedException("latch has counted down to zero but no"
+ "result available yet, for safety try to request another one from"
+ "outside loop, this should be rare");
}
private void cancelAll(List<Future<ByteBuffer>> futures) {
for (Future<ByteBuffer> future : futures) {
// Unfortunately, hdfs reads do not take kindly to interruption.
// Threads return a variety of interrupted-type exceptions but
// also complaints about invalid pbs -- likely because read
// is interrupted before gets whole pb. Also verbose WARN
// logging. So, for now, do not interrupt running read.
future.cancel(false);
}
}
@ -1070,8 +1291,13 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
long targetStart = position - blk.getStartOffset();
long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
try {
fetchBlockByteRange(blk, targetStart,
targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap);
if (dfsClient.isHedgedReadsEnabled()) {
hedgedFetchBlockByteRange(blk, targetStart, targetStart + bytesToRead
- 1, buffer, offset, corruptedBlockMap);
} else {
fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1,
buffer, offset, corruptedBlockMap);
}
} finally {
// Check and report if any block replicas are corrupted.
// BlockMissingException may be caught if all block replicas are
@ -1266,11 +1492,12 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* Entries in <i>nodes</i> are already in the priority order
*/
static DatanodeInfo bestNode(DatanodeInfo nodes[],
AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes)
throws IOException {
AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes,
Collection<DatanodeInfo> ignoredNodes) throws IOException {
if (nodes != null) {
for (int i = 0; i < nodes.length; i++) {
if (!deadNodes.containsKey(nodes[i])) {
if (!deadNodes.containsKey(nodes[i])
&& (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
return nodes[i];
}
}

View File

@ -22,7 +22,13 @@ import static org.junit.Assert.assertTrue;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
@ -33,6 +39,9 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.log4j.Level;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/**
* This class tests the DFS positional read functionality in a single node
@ -44,9 +53,10 @@ public class TestPread {
boolean simulatedStorage = false;
private void writeFile(FileSystem fileSys, Path name) throws IOException {
int replication = 3;// We need > 1 blocks to test out the hedged reads.
// create and write a file that contains three blocks of data
DataOutputStream stm = fileSys.create(name, true, 4096, (short)1,
blockSize);
DataOutputStream stm = fileSys.create(name, true, 4096,
(short)replication, blockSize);
// test empty file open and read
stm.close();
FSDataInputStream in = fileSys.open(name);
@ -197,25 +207,127 @@ public class TestPread {
assertTrue(!fileSys.exists(name));
}
private Callable<Void> getPReadFileCallable(final FileSystem fileSys,
final Path file) {
return new Callable<Void>() {
public Void call() throws IOException {
pReadFile(fileSys, file);
return null;
}
};
}
/**
* Tests positional read in DFS.
*/
@Test
public void testPreadDFS() throws IOException {
dfsPreadTest(false, true); //normal pread
dfsPreadTest(true, true); //trigger read code path without transferTo.
Configuration conf = new Configuration();
dfsPreadTest(conf, false, true); // normal pread
dfsPreadTest(conf, true, true); // trigger read code path without
// transferTo.
}
@Test
public void testPreadDFSNoChecksum() throws IOException {
Configuration conf = new Configuration();
((Log4JLogger)DataTransferProtocol.LOG).getLogger().setLevel(Level.ALL);
dfsPreadTest(false, false);
dfsPreadTest(true, false);
dfsPreadTest(conf, false, false);
dfsPreadTest(conf, true, false);
}
private void dfsPreadTest(boolean disableTransferTo, boolean verifyChecksum)
/**
* Tests positional read in DFS, with hedged reads enabled.
*/
@Test
public void testHedgedPreadDFSBasic() throws IOException {
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5);
conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 100);
dfsPreadTest(conf, false, true); // normal pread
dfsPreadTest(conf, true, true); // trigger read code path without
// transferTo.
}
@Test
public void testMaxOutHedgedReadPool() throws IOException,
InterruptedException, ExecutionException {
Configuration conf = new Configuration();
int numHedgedReadPoolThreads = 5;
final int initialHedgedReadTimeoutMillis = 500;
final int fixedSleepIntervalMillis = 50;
conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE,
numHedgedReadPoolThreads);
conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS,
initialHedgedReadTimeoutMillis);
// Set up the InjectionHandler
DFSClientFaultInjector.instance = Mockito
.mock(DFSClientFaultInjector.class);
DFSClientFaultInjector injector = DFSClientFaultInjector.instance;
// make preads sleep for 50ms
Mockito.doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
Thread.sleep(fixedSleepIntervalMillis);
return null;
}
}).when(injector).startFetchFromDatanode();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
.format(true).build();
DistributedFileSystem fileSys = cluster.getFileSystem();
DFSClient dfsClient = fileSys.getClient();
DFSHedgedReadMetrics metrics = dfsClient.getHedgedReadMetrics();
try {
Path file1 = new Path("hedgedReadMaxOut.dat");
writeFile(fileSys, file1);
// Basic test. Reads complete within timeout. Assert that there were no
// hedged reads.
pReadFile(fileSys, file1);
// assert that there were no hedged reads. 50ms + delta < 500ms
assertTrue(metrics.getHedgedReadOps() == 0);
assertTrue(metrics.getHedgedReadOpsInCurThread() == 0);
/*
* Reads take longer than timeout. But, only one thread reading. Assert
* that there were hedged reads. But, none of the reads had to run in the
* current thread.
*/
dfsClient.setHedgedReadTimeout(50); // 50ms
pReadFile(fileSys, file1);
// assert that there were hedged reads
assertTrue(metrics.getHedgedReadOps() > 0);
assertTrue(metrics.getHedgedReadOpsInCurThread() == 0);
/*
* Multiple threads reading. Reads take longer than timeout. Assert that
* there were hedged reads. And that reads had to run in the current
* thread.
*/
int factor = 10;
int numHedgedReads = numHedgedReadPoolThreads * factor;
long initialReadOpsValue = metrics.getHedgedReadOps();
ExecutorService executor = Executors.newFixedThreadPool(numHedgedReads);
ArrayList<Future<Void>> futures = new ArrayList<Future<Void>>();
for (int i = 0; i < numHedgedReads; i++) {
futures.add(executor.submit(getPReadFileCallable(fileSys, file1)));
}
for (int i = 0; i < numHedgedReads; i++) {
futures.get(i).get();
}
assertTrue(metrics.getHedgedReadOps() > initialReadOpsValue);
assertTrue(metrics.getHedgedReadOpsInCurThread() > 0);
cleanupFile(fileSys, file1);
executor.shutdown();
} finally {
fileSys.close();
cluster.shutdown();
Mockito.reset(injector);
}
}
private void dfsPreadTest(Configuration conf, boolean disableTransferTo, boolean verifyChecksum)
throws IOException {
Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096);
if (simulatedStorage) {