diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index acc6e4df855..edf805f29fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -353,6 +353,8 @@ Release 2.4.0 - UNRELEASED
     HDFS-5698. Use protobuf to serialize / deserialize FSImage. (See breakdown
     of tasks below for features and contributors)
 
+    HDFS-5776 Support 'hedged' reads in DFSClient (Liang Xie via stack)
+
   IMPROVEMENTS
 
     HDFS-5781. Use an array to record the mapping between FSEditLogOpCode and
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 62d2706ae47..a8fe6d132c9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -80,6 +80,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.net.SocketFactory;
 
@@ -173,6 +177,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenRenewer;
+import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DataChecksum.Type;
 import org.apache.hadoop.util.Progressable;
@@ -222,6 +227,10 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
   private final CachingStrategy defaultReadCachingStrategy;
   private final CachingStrategy defaultWriteCachingStrategy;
   private final ClientContext clientContext;
+  private volatile long hedgedReadThresholdMillis;
+  private static DFSHedgedReadMetrics HEDGED_READ_METRIC =
+      new DFSHedgedReadMetrics();
+  private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
 
   /**
    * DFSClient configuration
@@ -574,6 +583,15 @@ public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
     this.clientContext = ClientContext.get(
         conf.get(DFS_CLIENT_CONTEXT, DFS_CLIENT_CONTEXT_DEFAULT),
         dfsClientConf);
+    this.hedgedReadThresholdMillis = conf.getLong(
+        DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS,
+        DFSConfigKeys.DEFAULT_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS);
+    int numThreads = conf.getInt(
+        DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE,
+        DFSConfigKeys.DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE);
+    if (numThreads > 0) {
+      this.initThreadsNumForHedgedReads(numThreads);
+    }
   }
 
   /**
@@ -2714,4 +2732,64 @@ public Peer newConnectedPeer(InetSocketAddress addr) throws IOException {
       }
     }
   }
+
+  /**
+   * Create hedged reads thread pool, HEDGED_READ_THREAD_POOL, if
+   * it does not already exist.
+   * @param num Number of threads for hedged reads thread pool.
+   * If zero, skip hedged reads thread pool creation.
+   */
+  private synchronized void initThreadsNumForHedgedReads(int num) {
+    if (num <= 0 || HEDGED_READ_THREAD_POOL != null) return;
+    HEDGED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60,
+        TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+        new Daemon.DaemonFactory() {
+          private final AtomicInteger threadIndex =
+              new AtomicInteger(0);
+          @Override
+          public Thread newThread(Runnable r) {
+            Thread t = super.newThread(r);
+            t.setName("hedgedRead-" +
+                threadIndex.getAndIncrement());
+            return t;
+          }
+        },
+        new ThreadPoolExecutor.CallerRunsPolicy() {
+
+          @Override
+          public void rejectedExecution(Runnable runnable,
+              ThreadPoolExecutor e) {
+            LOG.info("Execution rejected, Executing in current thread");
+            HEDGED_READ_METRIC.incHedgedReadOpsInCurThread();
+            // will run in the current thread
+            super.rejectedExecution(runnable, e);
+          }
+        });
+    HEDGED_READ_THREAD_POOL.allowCoreThreadTimeOut(true);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Using hedged reads; pool threads=" + num);
+    }
+  }
+
+  long getHedgedReadTimeout() {
+    return this.hedgedReadThresholdMillis;
+  }
+
+  @VisibleForTesting
+  void setHedgedReadTimeout(long timeoutMillis) {
+    this.hedgedReadThresholdMillis = timeoutMillis;
+  }
+
+  ThreadPoolExecutor getHedgedReadsThreadPool() {
+    return HEDGED_READ_THREAD_POOL;
+  }
+
+  boolean isHedgedReadsEnabled() {
+    return (HEDGED_READ_THREAD_POOL != null) &&
+        HEDGED_READ_THREAD_POOL.getMaximumPoolSize() > 0;
+  }
+
+  DFSHedgedReadMetrics getHedgedReadMetrics() {
+    return HEDGED_READ_METRIC;
+  }
 }
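The executor built above is easiest to understand in isolation: core size 1, a bounded maximum, a SynchronousQueue handoff (so a hedged read either starts on a thread immediately or is rejected), daemon worker threads, and CallerRunsPolicy as the rejection fallback. The stand-alone sketch below mirrors that configuration so the sizing behaviour can be tried outside HDFS. It is an illustration only, not DFSClient code; the class name HedgedPoolSketch and the plain ThreadFactory (standing in for Hadoop's Daemon.DaemonFactory) are invented for the example.

    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    public class HedgedPoolSketch {
      public static void main(String[] args) {
        final AtomicInteger threadIndex = new AtomicInteger(0);
        // SynchronousQueue never holds tasks: a hedged read either gets a
        // thread right away or is rejected and runs in the caller's thread.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 5, 60, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(),
            new ThreadFactory() {
              @Override
              public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "hedgedRead-" + threadIndex.getAndIncrement());
                t.setDaemon(true); // daemon threads, as Daemon.DaemonFactory provides
                return t;
              }
            },
            new ThreadPoolExecutor.CallerRunsPolicy());
        // Idle core threads time out too, so an idle client holds no threads.
        pool.allowCoreThreadTimeOut(true);
        pool.execute(new Runnable() {
          @Override
          public void run() {
            System.out.println("ran on " + Thread.currentThread().getName());
          }
        });
        pool.shutdown();
      }
    }

Because allowCoreThreadTimeOut(true) lets even the single core thread expire, a client that never hedges eventually holds no hedged-read threads at all, which is why the pool can be created eagerly in the constructor.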
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
index c5c6d5c5492..dba0c36d339 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 package org.apache.hadoop.hdfs;
-import java.io.IOException;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -46,4 +45,6 @@ public boolean uncorruptPacket() {
   public boolean failPacket() {
     return false;
   }
+
+  public void startFetchFromDatanode() {}
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 9f2a76a16ee..1d31d57533b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -591,4 +591,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT = 500;
   public static final String DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY = "dfs.http.client.failover.sleep.max.millis";
   public static final int DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT = 15000;
+
+  // hedged read properties
+  public static final String DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS =
+      "dfs.client.hedged.read.threshold.millis";
+  public static final long DEFAULT_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS =
+      500;
+
+  public static final String DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE =
+      "dfs.client.hedged.read.threadpool.size";
+  public static final int DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE = 0;
 }
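For context, the sketch below shows how a client might turn the two new keys on. It is a hypothetical usage example, not part of the patch; the file path argument and class name are made up. A thread pool size greater than zero enables the feature, and the threshold is how long the client waits for the first datanode before it hedges.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HedgedReadConfigExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // A pool size > 0 enables hedged reads for DFSClients built from this conf.
        conf.setInt("dfs.client.hedged.read.threadpool.size", 5);
        // Wait this long for the first datanode before spawning a hedged read.
        conf.setLong("dfs.client.hedged.read.threshold.millis", 100);
        FileSystem fs = FileSystem.get(conf);
        FSDataInputStream in = fs.open(new Path(args[0]));
        byte[] buf = new byte[4096];
        // Hedged reads apply to positional reads such as this one.
        int n = in.read(0L, buf, 0, buf.length);
        System.out.println("read " + n + " bytes");
        in.close();
        fs.close();
      }
    }

The same two properties can equivalently be set in the client-side hdfs-site.xml, since they are read from the client's Configuration.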
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 10420035d26..541d4c8b2e6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -17,13 +17,12 @@
  */
 package org.apache.hadoop.hdfs;
 
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.util.AbstractMap;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -32,9 +31,14 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.ByteBufferUtil;
@@ -54,15 +58,12 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
-import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.io.ByteBufferPool;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.unix.DomainSocket;
-import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.IdentityHashStore;
@@ -555,7 +556,7 @@ private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
     assert (target==pos) : "Wrong postion " + pos + " expect " + target;
     long offsetIntoBlock = target - targetBlock.getStartOffset();
 
-    DNAddrPair retval = chooseDataNode(targetBlock);
+    DNAddrPair retval = chooseDataNode(targetBlock, null);
     chosenNode = retval.info;
     InetSocketAddress targetAddr = retval.addr;
 
@@ -863,32 +864,30 @@ private void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node,
       corruptedBlockMap.put(blk, dnSet);
     }
   }
-  
-  private DNAddrPair chooseDataNode(LocatedBlock block)
-      throws IOException {
+
+  private DNAddrPair chooseDataNode(LocatedBlock block,
+      Collection<DatanodeInfo> ignoredNodes) throws IOException {
     while (true) {
       DatanodeInfo[] nodes = block.getLocations();
       try {
-        DatanodeInfo chosenNode = bestNode(nodes, deadNodes);
-        final String dnAddr =
-            chosenNode.getXferAddr(dfsClient.getConf().connectToDnViaHostname);
-        if (DFSClient.LOG.isDebugEnabled()) {
-          DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
-        }
-        InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
-        return new DNAddrPair(chosenNode, targetAddr);
+        return getBestNodeDNAddrPair(nodes, ignoredNodes);
       } catch (IOException ie) {
+        String errMsg =
+            getBestNodeDNAddrPairErrorString(nodes, deadNodes, ignoredNodes);
         String blockInfo = block.getBlock() + " file=" + src;
         if (failures >= dfsClient.getMaxBlockAcquireFailures()) {
-          throw new BlockMissingException(src, "Could not obtain block: " + blockInfo,
-                                          block.getStartOffset());
+          String description = "Could not obtain block: " + blockInfo;
+          DFSClient.LOG.warn(description + errMsg
+              + ". Throwing a BlockMissingException");
+          throw new BlockMissingException(src, description,
+              block.getStartOffset());
         }
 
         if (nodes == null || nodes.length == 0) {
           DFSClient.LOG.info("No node available for " + blockInfo);
         }
         DFSClient.LOG.info("Could not obtain " + block.getBlock()
-            + " from any node: " + ie
+            + " from any node: " + ie + errMsg
             + ". Will get new block locations from namenode and retry...");
         try {
           // Introducing a random factor to the wait time before another retry.
@@ -914,21 +913,99 @@ private DNAddrPair chooseDataNode(LocatedBlock block)
         continue;
       }
     }
-  }
-  
+  }
+
+  /**
+   * Get the best node.
+   * @param nodes Nodes to choose from.
+   * @param ignoredNodes Do not choose nodes in this array (may be null)
+   * @return The DNAddrPair of the best node.
+   * @throws IOException
+   */
+  private DNAddrPair getBestNodeDNAddrPair(final DatanodeInfo[] nodes,
+      Collection<DatanodeInfo> ignoredNodes) throws IOException {
+    DatanodeInfo chosenNode = bestNode(nodes, deadNodes, ignoredNodes);
+    final String dnAddr =
+        chosenNode.getXferAddr(dfsClient.getConf().connectToDnViaHostname);
+    if (DFSClient.LOG.isDebugEnabled()) {
+      DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
+    }
+    InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
+    return new DNAddrPair(chosenNode, targetAddr);
+  }
+
+  private static String getBestNodeDNAddrPairErrorString(
+      DatanodeInfo nodes[], AbstractMap<DatanodeInfo,
+      DatanodeInfo> deadNodes, Collection<DatanodeInfo> ignoredNodes) {
+    StringBuilder errMsgr = new StringBuilder(
+        " No live nodes contain current block ");
+    errMsgr.append("Block locations:");
+    for (DatanodeInfo datanode : nodes) {
+      errMsgr.append(" ");
+      errMsgr.append(datanode.toString());
+    }
+    errMsgr.append(" Dead nodes: ");
+    for (DatanodeInfo datanode : deadNodes.keySet()) {
+      errMsgr.append(" ");
+      errMsgr.append(datanode.toString());
+    }
+    if (ignoredNodes != null) {
+      errMsgr.append(" Ignored nodes: ");
+      for (DatanodeInfo datanode : ignoredNodes) {
+        errMsgr.append(" ");
+        errMsgr.append(datanode.toString());
      }
+    }
+    return errMsgr.toString();
+  }
+
   private void fetchBlockByteRange(LocatedBlock block, long start, long end,
       byte[] buf, int offset,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
-    //
-    // Connect to best DataNode for desired Block, with potential offset
-    //
+    block = getBlockAt(block.getStartOffset(), false);
+    while (true) {
+      DNAddrPair addressPair = chooseDataNode(block, null);
+      try {
+        actualGetFromOneDataNode(addressPair, block, start, end, buf, offset,
+            corruptedBlockMap);
+        return;
+      } catch (IOException e) {
+        // Ignore. Already processed inside the function.
+        // Loop through to try the next node.
+      }
+    }
+  }
+
+  private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
+      final LocatedBlock block, final long start, final long end,
+      final ByteBuffer bb,
+      final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
+      final CountDownLatch latch) {
+    return new Callable<ByteBuffer>() {
+      @Override
+      public ByteBuffer call() throws Exception {
+        byte[] buf = bb.array();
+        int offset = bb.position();
+        actualGetFromOneDataNode(datanode, block, start, end, buf, offset,
+            corruptedBlockMap);
+        latch.countDown();
+        return bb;
+      }
+    };
+  }
+
+  private void actualGetFromOneDataNode(final DNAddrPair datanode,
+      LocatedBlock block, final long start, final long end, byte[] buf,
+      int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      throws IOException {
+    DFSClientFaultInjector.get().startFetchFromDatanode();
     int refetchToken = 1; // only need to get a new access token once
     int refetchEncryptionKey = 1; // only need to get a new encryption key once
-    
+
     while (true) {
       // cached block locations may have been updated by chooseDataNode()
-      // or fetchBlockAt(). Always get the latest list of locations at the 
+      // or fetchBlockAt(). Always get the latest list of locations at the
       // start of the loop.
       CachingStrategy curCachingStrategy;
       boolean allowShortCircuitLocalReads;
@@ -937,11 +1014,10 @@ private void fetchBlockByteRange(LocatedBlock block, long start, long end,
         curCachingStrategy = cachingStrategy;
         allowShortCircuitLocalReads = !shortCircuitForbidden();
       }
-      DNAddrPair retval = chooseDataNode(block);
-      DatanodeInfo chosenNode = retval.info;
-      InetSocketAddress targetAddr = retval.addr;
+      DatanodeInfo chosenNode = datanode.info;
+      InetSocketAddress targetAddr = datanode.addr;
       BlockReader reader = null;
-      
+
       try {
         Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
         int len = (int) (end - start + 1);
@@ -969,11 +1045,14 @@ private void fetchBlockByteRange(LocatedBlock block, long start, long end,
         }
         return;
       } catch (ChecksumException e) {
-        DFSClient.LOG.warn("fetchBlockByteRange(). Got a checksum exception for " +
-            src + " at " + block.getBlock() + ":" +
-            e.getPos() + " from " + chosenNode);
+        String msg = "fetchBlockByteRange(). Got a checksum exception for " +
+            src + " at " + block.getBlock() + ":" + e.getPos() + " from " +
+            chosenNode;
+        DFSClient.LOG.warn(msg);
         // we want to remember what we have tried
         addIntoCorruptedBlockMap(block.getBlock(), chosenNode, corruptedBlockMap);
+        addToDeadNodes(chosenNode);
+        throw new IOException(msg);
       } catch (IOException e) {
         if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
           DFSClient.LOG.info("Will fetch a new encryption key and retry, "
@@ -985,22 +1064,164 @@ private void fetchBlockByteRange(LocatedBlock block, long start, long end,
           }
           continue;
         } else if (refetchToken > 0 && tokenRefetchNeeded(e, targetAddr)) {
           refetchToken--;
-          fetchBlockAt(block.getStartOffset());
+          try {
+            fetchBlockAt(block.getStartOffset());
+          } catch (IOException fbae) {
+            // ignore IOE, since we can retry it later in a loop
+          }
           continue;
         } else {
-          DFSClient.LOG.warn("Failed to connect to " + targetAddr +
-              " for file " + src + " for block " + block.getBlock() + ":" + e);
-          if (DFSClient.LOG.isDebugEnabled()) {
-            DFSClient.LOG.debug("Connection failure ", e);
-          }
+          String msg = "Failed to connect to " + targetAddr + " for file "
+              + src + " for block " + block.getBlock() + ":" + e;
+          DFSClient.LOG.warn("Connection failure: " + msg, e);
+          addToDeadNodes(chosenNode);
+          throw new IOException(msg);
         }
       } finally {
         if (reader != null) {
          reader.close();
        }
       }
-      // Put chosen node into dead list, continue
-      addToDeadNodes(chosenNode);
+    }
+  }
+
+  /**
+   * Like {@link #fetchBlockByteRange(LocatedBlock, long, long, byte[],
+   * int, Map)} except we start up a second, parallel, 'hedged' read
+   * if the first read is taking longer than the configured amount of
+   * time. We then wait on whichever read returns first.
+   *
+   * @param block
+   * @param start
+   * @param end
+   * @param buf
+   * @param offset
+   * @param corruptedBlockMap
+   * @throws IOException
+   */
+  private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
+      long end, byte[] buf, int offset,
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      throws IOException {
+    ArrayList<Future<ByteBuffer>> futures = null;
+    ArrayList<DatanodeInfo> ignored = new ArrayList<DatanodeInfo>();
+    ByteBuffer bb = null;
+    int len = (int) (end - start + 1);
+    block = getBlockAt(block.getStartOffset(), false);
+    // Latch shared by all outstanding reads. First to finish counts it down.
+    CountDownLatch hasReceivedResult = new CountDownLatch(1);
+    while (true) {
+      DNAddrPair chosenNode = null;
+      Future<ByteBuffer> future = null;
+      // futures is null if there is no request already executing.
+      if (futures == null) {
+        // chooseDataNode is a commitment. If no node, we go to
+        // the NN to reget block locations. Only go here on first read.
+        chosenNode = chooseDataNode(block, ignored);
+        bb = ByteBuffer.wrap(buf, offset, len);
+        future = getHedgedReadFuture(chosenNode, block, start, end, bb,
+            corruptedBlockMap, hasReceivedResult);
+        try {
+          future.get(dfsClient.getHedgedReadTimeout(), TimeUnit.MILLISECONDS);
+          return;
+        } catch (TimeoutException e) {
+          if (DFSClient.LOG.isDebugEnabled()) {
+            DFSClient.LOG.debug("Waited " + dfsClient.getHedgedReadTimeout() +
+                "ms to read from " + chosenNode.info + "; spawning hedged read");
+          }
+          // Ignore this node on next go around.
+          ignored.add(chosenNode.info);
+          dfsClient.getHedgedReadMetrics().incHedgedReadOps();
+          futures = new ArrayList<Future<ByteBuffer>>();
+          futures.add(future);
+          continue; // no need to refresh block locations
+        } catch (InterruptedException e) {
+          // Ignore
+        } catch (ExecutionException e) {
+          // Ignore already logged in the call.
+        }
+      } else {
+        // We are starting up a 'hedged' read. We have a read already
+        // ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
+        // If no nodes to do hedged reads against, pass.
+        try {
+          chosenNode = getBestNodeDNAddrPair(block.getLocations(), ignored);
+          bb = ByteBuffer.allocate(len);
+          future = getHedgedReadFuture(chosenNode, block, start, end, bb,
+              corruptedBlockMap, hasReceivedResult);
+          futures.add(future);
+        } catch (IOException ioe) {
+          if (DFSClient.LOG.isDebugEnabled()) {
+            DFSClient.LOG.debug("Failed getting node for hedged read: " +
+                ioe.getMessage());
+          }
+        }
+        // if not succeeded. Submit callables for each datanode in a loop, wait
+        // for a fixed interval and get the result from the fastest one.
+        try {
+          ByteBuffer result = getFirstToComplete(futures, hasReceivedResult);
+          // cancel the rest.
+          cancelAll(futures);
+          if (result.array() != buf) { // compare the array pointers
+            dfsClient.getHedgedReadMetrics().incHedgedReadWins();
+            System.arraycopy(result.array(), result.position(), buf, offset,
+                len);
+          } else {
+            dfsClient.getHedgedReadMetrics().incHedgedReadOps();
+          }
+          return;
+        } catch (InterruptedException ie) {
+          // Ignore
+        } catch (ExecutionException e) {
+          // exception already handled in the call method. getFirstToComplete
+          // will remove the failing future from the list. nothing more to do.
+        }
+        // We got here if exception. Ignore this node on next go around.
+        ignored.add(chosenNode.info);
+      }
+      // executed if we get an error from a data node
+      block = getBlockAt(block.getStartOffset(), false);
+    }
+  }
+
+  private Future<ByteBuffer> getHedgedReadFuture(final DNAddrPair chosenNode,
+      final LocatedBlock block, long start,
+      final long end, final ByteBuffer bb,
+      final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
+      final CountDownLatch hasReceivedResult) {
+    Callable<ByteBuffer> getFromDataNodeCallable =
+        getFromOneDataNode(chosenNode, block, start, end, bb,
+            corruptedBlockMap, hasReceivedResult);
+    return dfsClient.getHedgedReadsThreadPool().submit(getFromDataNodeCallable);
+  }
+
+  private ByteBuffer getFirstToComplete(ArrayList<Future<ByteBuffer>> futures,
+      CountDownLatch latch) throws ExecutionException, InterruptedException {
+    latch.await();
+    for (Future<ByteBuffer> future : futures) {
+      if (future.isDone()) {
+        try {
+          return future.get();
+        } catch (ExecutionException e) {
+          // already logged in the Callable
+          futures.remove(future);
+          throw e;
+        }
+      }
+    }
+    throw new InterruptedException("latch has counted down to zero but no"
+        + " result available yet, for safety try to request another one from"
+        + " outside loop, this should be rare");
+  }
+
+  private void cancelAll(List<Future<ByteBuffer>> futures) {
+    for (Future<ByteBuffer> future : futures) {
+      // Unfortunately, hdfs reads do not take kindly to interruption.
+      // Threads return a variety of interrupted-type exceptions but
+      // also complaints about invalid pbs -- likely because read
+      // is interrupted before gets whole pb. Also verbose WARN
+      // logging. So, for now, do not interrupt running read.
+      future.cancel(false);
     }
   }
@@ -1070,8 +1291,13 @@ public int read(long position, byte[] buffer, int offset, int length)
       long targetStart = position - blk.getStartOffset();
       long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
       try {
-        fetchBlockByteRange(blk, targetStart,
-            targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap);
+        if (dfsClient.isHedgedReadsEnabled()) {
+          hedgedFetchBlockByteRange(blk, targetStart, targetStart + bytesToRead
+              - 1, buffer, offset, corruptedBlockMap);
+        } else {
+          fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1,
+              buffer, offset, corruptedBlockMap);
+        }
       } finally {
         // Check and report if any block replicas are corrupted.
         // BlockMissingException may be caught if all block replicas are
@@ -1265,12 +1491,13 @@ public void reset() throws IOException {
    * Pick the best node from which to stream the data.
    * Entries in nodes are already in the priority order
    */
-  static DatanodeInfo bestNode(DatanodeInfo nodes[], 
-      AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes)
-      throws IOException {
-    if (nodes != null) { 
+  static DatanodeInfo bestNode(DatanodeInfo nodes[],
+      AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes,
+      Collection<DatanodeInfo> ignoredNodes) throws IOException {
+    if (nodes != null) {
       for (int i = 0; i < nodes.length; i++) {
-        if (!deadNodes.containsKey(nodes[i])) {
+        if (!deadNodes.containsKey(nodes[i])
+            && (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
           return nodes[i];
         }
       }
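Before the test changes, it may help to see the control flow of hedgedFetchBlockByteRange() reduced to its essentials: issue one read, wait up to the threshold, start a second read against a different replica if the first has not returned, and take whichever finishes first while cancelling the loser without interruption. The stand-alone sketch below illustrates only that pattern; it uses an ExecutorCompletionService and invented names (HedgedRequestSketch, replica) rather than the patch's CountDownLatch-based getFirstToComplete().

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    public class HedgedRequestSketch {
      // Simulates reading one replica; the delay stands in for a slow datanode.
      static Callable<String> replica(final String name, final long delayMillis) {
        return new Callable<String>() {
          @Override
          public String call() throws Exception {
            Thread.sleep(delayMillis);
            return "data from " + name;
          }
        };
      }

      public static void main(String[] args) throws Exception {
        final long thresholdMillis = 100; // like dfs.client.hedged.read.threshold.millis
        ExecutorService pool = Executors.newFixedThreadPool(2);
        ExecutorCompletionService<String> ecs =
            new ExecutorCompletionService<String>(pool);
        List<Future<String>> futures = new ArrayList<Future<String>>();

        // First attempt against the "primary" replica.
        futures.add(ecs.submit(replica("replica-1", 500)));
        // If it does not finish within the threshold, hedge with a second
        // replica, then take whichever attempt completes first.
        Future<String> first = ecs.poll(thresholdMillis, TimeUnit.MILLISECONDS);
        if (first == null) {
          futures.add(ecs.submit(replica("replica-2", 50)));
          first = ecs.take();
        }
        System.out.println(first.get());

        // Cancel the straggler without interrupting it, as the patch does.
        for (Future<String> f : futures) {
          f.cancel(false);
        }
        pool.shutdown();
      }
    }

The tests that follow exercise the real implementation end to end against a MiniDFSCluster.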
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
index 9afa493391a..e3b321c4abf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
@@ -20,9 +20,14 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
@@ -33,6 +38,9 @@ import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.log4j.Level;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 /**
  * This class tests the DFS positional read functionality in a single node
@@ -44,9 +52,10 @@ public class TestPread {
   boolean simulatedStorage = false;
 
   private void writeFile(FileSystem fileSys, Path name) throws IOException {
+    int replication = 3;  // We need > 1 replicas to test out the hedged reads.
     // test empty file open and read
     DFSTestUtil.createFile(fileSys, name, 12 * blockSize, 0,
-        blockSize, (short) 1, seed);
+        blockSize, (short)replication, seed);
     FSDataInputStream in = fileSys.open(name);
     byte[] buffer = new byte[12 * blockSize];
     in.readFully(0, buffer, 0, 0);
@@ -191,26 +200,128 @@ private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
     assertTrue(fileSys.delete(name, true));
     assertTrue(!fileSys.exists(name));
   }
-  
+
+  private Callable<Void> getPReadFileCallable(final FileSystem fileSys,
+      final Path file) {
+    return new Callable<Void>() {
+      public Void call() throws IOException {
+        pReadFile(fileSys, file);
+        return null;
+      }
+    };
+  }
+
   /**
    * Tests positional read in DFS.
    */
   @Test
   public void testPreadDFS() throws IOException {
-    dfsPreadTest(false, true); //normal pread
-    dfsPreadTest(true, true); //trigger read code path without transferTo.
+    Configuration conf = new Configuration();
+    dfsPreadTest(conf, false, true); // normal pread
+    dfsPreadTest(conf, true, true); // trigger read code path without
+                                    // transferTo.
   }
 
   @Test
   public void testPreadDFSNoChecksum() throws IOException {
+    Configuration conf = new Configuration();
     ((Log4JLogger)DataTransferProtocol.LOG).getLogger().setLevel(Level.ALL);
-    dfsPreadTest(false, false);
-    dfsPreadTest(true, false);
+    dfsPreadTest(conf, false, false);
+    dfsPreadTest(conf, true, false);
   }
 
-  private void dfsPreadTest(boolean disableTransferTo, boolean verifyChecksum)
+  /**
+   * Tests positional read in DFS, with hedged reads enabled.
+   */
+  @Test
+  public void testHedgedPreadDFSBasic() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5);
+    conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 100);
+    dfsPreadTest(conf, false, true); // normal pread
+    dfsPreadTest(conf, true, true); // trigger read code path without
+                                    // transferTo.
+  }
+
+  @Test
+  public void testMaxOutHedgedReadPool() throws IOException,
+      InterruptedException, ExecutionException {
+    Configuration conf = new Configuration();
+    int numHedgedReadPoolThreads = 5;
+    final int initialHedgedReadTimeoutMillis = 500;
+    final int fixedSleepIntervalMillis = 50;
+    conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE,
+        numHedgedReadPoolThreads);
+    conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS,
+        initialHedgedReadTimeoutMillis);
+
+    // Set up the InjectionHandler
+    DFSClientFaultInjector.instance = Mockito
+        .mock(DFSClientFaultInjector.class);
+    DFSClientFaultInjector injector = DFSClientFaultInjector.instance;
+    // make preads sleep for 50ms
+    Mockito.doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        Thread.sleep(fixedSleepIntervalMillis);
+        return null;
+      }
+    }).when(injector).startFetchFromDatanode();
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+        .format(true).build();
+    DistributedFileSystem fileSys = cluster.getFileSystem();
+    DFSClient dfsClient = fileSys.getClient();
+    DFSHedgedReadMetrics metrics = dfsClient.getHedgedReadMetrics();
+
+    try {
+      Path file1 = new Path("hedgedReadMaxOut.dat");
+      writeFile(fileSys, file1);
+      // Basic test. Reads complete within timeout. Assert that there were no
+      // hedged reads.
+      pReadFile(fileSys, file1);
+      // assert that there were no hedged reads. 50ms + delta < 500ms
+      assertTrue(metrics.getHedgedReadOps() == 0);
+      assertTrue(metrics.getHedgedReadOpsInCurThread() == 0);
+      /*
+       * Reads take longer than timeout. But, only one thread reading. Assert
+       * that there were hedged reads. But, none of the reads had to run in the
+       * current thread.
+       */
+      dfsClient.setHedgedReadTimeout(50); // 50ms
+      pReadFile(fileSys, file1);
+      // assert that there were hedged reads
+      assertTrue(metrics.getHedgedReadOps() > 0);
+      assertTrue(metrics.getHedgedReadOpsInCurThread() == 0);
+      /*
+       * Multiple threads reading. Reads take longer than timeout. Assert that
+       * there were hedged reads. And that reads had to run in the current
+       * thread.
+       */
+      int factor = 10;
+      int numHedgedReads = numHedgedReadPoolThreads * factor;
+      long initialReadOpsValue = metrics.getHedgedReadOps();
+      ExecutorService executor = Executors.newFixedThreadPool(numHedgedReads);
+      ArrayList<Future<Void>> futures = new ArrayList<Future<Void>>();
+      for (int i = 0; i < numHedgedReads; i++) {
+        futures.add(executor.submit(getPReadFileCallable(fileSys, file1)));
+      }
+      for (int i = 0; i < numHedgedReads; i++) {
+        futures.get(i).get();
+      }
+      assertTrue(metrics.getHedgedReadOps() > initialReadOpsValue);
+      assertTrue(metrics.getHedgedReadOpsInCurThread() > 0);
+      cleanupFile(fileSys, file1);
+      executor.shutdown();
+    } finally {
+      fileSys.close();
+      cluster.shutdown();
+      Mockito.reset(injector);
+    }
+  }
+
+  private void dfsPreadTest(Configuration conf, boolean disableTransferTo, boolean verifyChecksum)
       throws IOException {
-    Configuration conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
     conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096);
     if (simulatedStorage) {