diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 4df47a92f6a..13fd0334bcd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -487,6 +487,9 @@ Release 2.5.0 - UNRELEASED HDFS-6558. Missing newline in the description of dfsadmin -rollingUpgrade. (Chen He via kihwal) + HDFS-6591. while loop is executed tens of thousands of times in Hedged Read + (Liang Xie via cnauroth) + BREAKDOWN OF HDFS-2006 SUBTASKS AND RELATED JIRAS HDFS-6299. Protobuf for XAttr and client-side implementation. (Yi Liu via umamahesh) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java index dba0c36d339..ebdc98f95f5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs; +import java.util.concurrent.atomic.AtomicLong; + import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; @@ -29,6 +31,7 @@ @InterfaceAudience.Private public class DFSClientFaultInjector { public static DFSClientFaultInjector instance = new DFSClientFaultInjector(); + public static AtomicLong exceptionNum = new AtomicLong(0); public static DFSClientFaultInjector get() { return instance; @@ -47,4 +50,6 @@ public boolean failPacket() { } public void startFetchFromDatanode() {} + + public void fetchFromDatanodeException() {} } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 1750aa7a30b..1dcf373896a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -32,12 +32,14 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletionService; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.io.IOUtils; import org.apache.hadoop.classification.InterfaceAudience; @@ -81,6 +83,7 @@ public class DFSInputStream extends FSInputStream HasEnhancedByteBufferAccess { @VisibleForTesting public static boolean tcpReadsDisabledForTesting = false; + private long hedgedReadOpsLoopNumForTesting = 0; private final DFSClient dfsClient; private boolean closed = false; private final String src; @@ -976,20 +979,15 @@ private void fetchBlockByteRange(LocatedBlock block, long start, long end, private Callable getFromOneDataNode(final DNAddrPair datanode, final LocatedBlock block, final long start, final long end, final ByteBuffer bb, - final Map> corruptedBlockMap, - final CountDownLatch latch) { + final Map> corruptedBlockMap) { return new Callable() { @Override public ByteBuffer call() throws Exception { - try { - byte[] buf = bb.array(); - int offset = bb.position(); - actualGetFromOneDataNode(datanode, block, start, end, buf, offset, - corruptedBlockMap); - return bb; - } finally { - latch.countDown(); - } + byte[] buf = bb.array(); + int offset = bb.position(); + actualGetFromOneDataNode(datanode, block, start, end, buf, offset, + corruptedBlockMap); + return bb; } }; } @@ -1018,6 +1016,7 @@ private void actualGetFromOneDataNode(final DNAddrPair datanode, BlockReader reader = null; try { + DFSClientFaultInjector.get().fetchFromDatanodeException(); Token blockToken = block.getBlockToken(); int len = (int) (end - start + 1); reader = new BlockReaderFactory(dfsClient.getConf()). @@ -1097,35 +1096,43 @@ private void hedgedFetchBlockByteRange(LocatedBlock block, long start, Map> corruptedBlockMap) throws IOException { ArrayList> futures = new ArrayList>(); + CompletionService hedgedService = + new ExecutorCompletionService( + dfsClient.getHedgedReadsThreadPool()); ArrayList ignored = new ArrayList(); ByteBuffer bb = null; int len = (int) (end - start + 1); block = getBlockAt(block.getStartOffset(), false); - // Latch shared by all outstanding reads. First to finish closes - CountDownLatch hasReceivedResult = new CountDownLatch(1); while (true) { + // see HDFS-6591, this metric is used to verify/catch unnecessary loops + hedgedReadOpsLoopNumForTesting++; DNAddrPair chosenNode = null; - Future future = null; - // futures is null if there is no request already executing. + // there is no request already executing. if (futures.isEmpty()) { - // chooseDataNode is a commitment. If no node, we go to - // the NN to reget block locations. Only go here on first read. + // chooseDataNode is a commitment. If no node, we go to + // the NN to reget block locations. Only go here on first read. chosenNode = chooseDataNode(block, ignored); bb = ByteBuffer.wrap(buf, offset, len); - future = getHedgedReadFuture(chosenNode, block, start, end, bb, - corruptedBlockMap, hasReceivedResult); + Callable getFromDataNodeCallable = getFromOneDataNode( + chosenNode, block, start, end, bb, corruptedBlockMap); + Future firstRequest = hedgedService + .submit(getFromDataNodeCallable); + futures.add(firstRequest); try { - future.get(dfsClient.getHedgedReadTimeout(), TimeUnit.MILLISECONDS); - return; - } catch (TimeoutException e) { + Future future = hedgedService.poll( + dfsClient.getHedgedReadTimeout(), TimeUnit.MILLISECONDS); + if (future != null) { + future.get(); + return; + } if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("Waited " + dfsClient.getHedgedReadTimeout() + - "ms to read from " + chosenNode.info + "; spawning hedged read"); + DFSClient.LOG.debug("Waited " + dfsClient.getHedgedReadTimeout() + + "ms to read from " + chosenNode.info + + "; spawning hedged read"); } // Ignore this node on next go around. ignored.add(chosenNode.info); dfsClient.getHedgedReadMetrics().incHedgedReadOps(); - futures.add(future); continue; // no need to refresh block locations } catch (InterruptedException e) { // Ignore @@ -1133,25 +1140,31 @@ private void hedgedFetchBlockByteRange(LocatedBlock block, long start, // Ignore already logged in the call. } } else { - // We are starting up a 'hedged' read. We have a read already + // We are starting up a 'hedged' read. We have a read already // ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode. // If no nodes to do hedged reads against, pass. try { - chosenNode = getBestNodeDNAddrPair(block.getLocations(), ignored); + try { + chosenNode = getBestNodeDNAddrPair(block.getLocations(), ignored); + } catch (IOException ioe) { + chosenNode = chooseDataNode(block, ignored); + } bb = ByteBuffer.allocate(len); - future = getHedgedReadFuture(chosenNode, block, start, end, bb, - corruptedBlockMap, hasReceivedResult); - futures.add(future); + Callable getFromDataNodeCallable = getFromOneDataNode( + chosenNode, block, start, end, bb, corruptedBlockMap); + Future oneMoreRequest = hedgedService + .submit(getFromDataNodeCallable); + futures.add(oneMoreRequest); } catch (IOException ioe) { if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("Failed getting node for hedged read: " + - ioe.getMessage()); + DFSClient.LOG.debug("Failed getting node for hedged read: " + + ioe.getMessage()); } } // if not succeeded. Submit callables for each datanode in a loop, wait // for a fixed interval and get the result from the fastest one. try { - ByteBuffer result = getFirstToComplete(futures, hasReceivedResult); + ByteBuffer result = getFirstToComplete(hedgedService, futures); // cancel the rest. cancelAll(futures); if (result.array() != buf) { // compare the array pointers @@ -1163,50 +1176,43 @@ private void hedgedFetchBlockByteRange(LocatedBlock block, long start, } return; } catch (InterruptedException ie) { - // Ignore - } catch (ExecutionException e) { - // exception already handled in the call method. getFirstToComplete - // will remove the failing future from the list. nothing more to do. + // Ignore and retry } - // We got here if exception. Ignore this node on next go around IFF + // We got here if exception. Ignore this node on next go around IFF // we found a chosenNode to hedge read against. if (chosenNode != null && chosenNode.info != null) { ignored.add(chosenNode.info); } } - // executed if we get an error from a data node - block = getBlockAt(block.getStartOffset(), false); } } - private Future getHedgedReadFuture(final DNAddrPair chosenNode, - final LocatedBlock block, long start, - final long end, final ByteBuffer bb, - final Map> corruptedBlockMap, - final CountDownLatch hasReceivedResult) { - Callable getFromDataNodeCallable = - getFromOneDataNode(chosenNode, block, start, end, bb, - corruptedBlockMap, hasReceivedResult); - return dfsClient.getHedgedReadsThreadPool().submit(getFromDataNodeCallable); + @VisibleForTesting + public long getHedgedReadOpsLoopNumForTesting() { + return hedgedReadOpsLoopNumForTesting; } - private ByteBuffer getFirstToComplete(ArrayList> futures, - CountDownLatch latch) throws ExecutionException, InterruptedException { - latch.await(); - for (Future future : futures) { - if (future.isDone()) { - try { - return future.get(); - } catch (ExecutionException e) { - // already logged in the Callable - futures.remove(future); - throw e; - } - } + private ByteBuffer getFirstToComplete( + CompletionService hedgedService, + ArrayList> futures) throws InterruptedException { + if (futures.isEmpty()) { + throw new InterruptedException("let's retry"); } - throw new InterruptedException("latch has counted down to zero but no" - + "result available yet, for safety try to request another one from" - + "outside loop, this should be rare"); + Future future = null; + try { + future = hedgedService.take(); + ByteBuffer bb = future.get(); + futures.remove(future); + return bb; + } catch (ExecutionException e) { + // already logged in the Callable + futures.remove(future); + } catch (CancellationException ce) { + // already logged in the Callable + futures.remove(future); + } + + throw new InterruptedException("let's retry"); } private void cancelAll(List> futures) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java index 50b95eaa56e..ab665fd2911 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java @@ -32,12 +32,16 @@ import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; +import org.apache.hadoop.io.IOUtils; import org.apache.log4j.Level; +import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; @@ -50,7 +54,14 @@ public class TestPread { static final long seed = 0xDEADBEEFL; static final int blockSize = 4096; - boolean simulatedStorage = false; + boolean simulatedStorage; + boolean isHedgedRead; + + @Before + public void setup() { + simulatedStorage = false; + isHedgedRead = false; + } private void writeFile(FileSystem fileSys, Path name) throws IOException { int replication = 3;// We need > 1 blocks to test out the hedged reads. @@ -75,11 +86,8 @@ private void writeFile(FileSystem fileSys, Path name) throws IOException { assertTrue("Cannot delete file", false); // now create the real file - stm = fileSys.create(name, true, 4096, (short)1, blockSize); - Random rand = new Random(seed); - rand.nextBytes(buffer); - stm.write(buffer); - stm.close(); + DFSTestUtil.createFile(fileSys, name, 12 * blockSize, 12 * blockSize, + blockSize, (short) replication, seed); } private void checkAndEraseData(byte[] actual, int from, byte[] expected, String message) { @@ -110,8 +118,13 @@ private void doPread(FSDataInputStream stm, long position, byte[] buffer, } if (dfstm != null) { - assertEquals("Expected read statistic to be incremented", length, dfstm - .getReadStatistics().getTotalBytesRead() - totalRead); + if (isHedgedRead) { + assertTrue("Expected read statistic to be incremented", length <= dfstm + .getReadStatistics().getTotalBytesRead() - totalRead); + } else { + assertEquals("Expected read statistic to be incremented", length, dfstm + .getReadStatistics().getTotalBytesRead() - totalRead); + } } } @@ -214,7 +227,7 @@ private void datanodeRestartTest(MiniDFSCluster cluster, FileSystem fileSys, stm.readFully(0, actual); checkAndEraseData(actual, 0, expected, "Pread Datanode Restart Test"); } - + private void cleanupFile(FileSystem fileSys, Path name) throws IOException { assertTrue(fileSys.exists(name)); assertTrue(fileSys.delete(name, true)); @@ -255,6 +268,7 @@ public void testPreadDFSNoChecksum() throws IOException { */ @Test public void testHedgedPreadDFSBasic() throws IOException { + isHedgedRead = true; Configuration conf = new Configuration(); conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5); conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 1); @@ -263,9 +277,73 @@ public void testHedgedPreadDFSBasic() throws IOException { // transferTo. } + @Test + public void testHedgedReadLoopTooManyTimes() throws IOException { + Configuration conf = new Configuration(); + int numHedgedReadPoolThreads = 5; + final int hedgedReadTimeoutMillis = 50; + + conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, + numHedgedReadPoolThreads); + conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, + hedgedReadTimeoutMillis); + // Set up the InjectionHandler + DFSClientFaultInjector.instance = Mockito + .mock(DFSClientFaultInjector.class); + DFSClientFaultInjector injector = DFSClientFaultInjector.instance; + Mockito.doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + if (true) { + Thread.sleep(hedgedReadTimeoutMillis + 1); + if (DFSClientFaultInjector.exceptionNum.compareAndSet(0, 1)) { + System.out.println("-------------- throw Checksum Exception"); + throw new ChecksumException("ChecksumException test", 100); + } + } + return null; + } + }).when(injector).fetchFromDatanodeException(); + + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2) + .format(true).build(); + DistributedFileSystem fileSys = cluster.getFileSystem(); + DFSClient dfsClient = fileSys.getClient(); + FSDataOutputStream output = null; + DFSInputStream input = null; + String filename = "/hedgedReadMaxOut.dat"; + try { + + Path file = new Path(filename); + output = fileSys.create(file, (short) 2); + byte[] data = new byte[64 * 1024]; + output.write(data); + output.flush(); + output.write(data); + output.flush(); + output.write(data); + output.flush(); + output.close(); + byte[] buffer = new byte[64 * 1024]; + input = dfsClient.open(filename); + input.read(0, buffer, 0, 1024); + input.close(); + assertEquals(3, input.getHedgedReadOpsLoopNumForTesting()); + } catch (BlockMissingException e) { + assertTrue(false); + } finally { + IOUtils.cleanup(null, input); + IOUtils.cleanup(null, output); + fileSys.close(); + cluster.shutdown(); + Mockito.reset(injector); + } + } + @Test public void testMaxOutHedgedReadPool() throws IOException, InterruptedException, ExecutionException { + isHedgedRead = true; Configuration conf = new Configuration(); int numHedgedReadPoolThreads = 5; final int initialHedgedReadTimeoutMillis = 50000; @@ -373,7 +451,6 @@ private void dfsPreadTest(Configuration conf, boolean disableTransferTo, boolean public void testPreadDFSSimulated() throws IOException { simulatedStorage = true; testPreadDFS(); - simulatedStorage = false; } /**