HDFS-6591. Merging change r1606927 from trunk to branch-2.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1606931 13f79535-47bb-0310-9956-ffa450edef68
Author: Chris Nauroth
Date: 2014-06-30 20:57:50 +00:00
parent 959f5ae65c
commit 5a918b6b09
4 changed files with 168 additions and 77 deletions
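The substance of the fix: DFSInputStream#hedgedFetchBlockByteRange used to coordinate its outstanding reads with a one-shot CountDownLatch that every read counted down in a finally block, on success or failure, so the retry loop could wake with no completed future and spin through tens of thousands of iterations. The patch swaps the latch for a CompletionService, which only hands back futures that have genuinely completed. A minimal standalone sketch of the adopted pattern (HedgedDemo, its thread counts, and its timings are illustrative, not HDFS code):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class HedgedDemo {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    CompletionService<String> service = new ExecutorCompletionService<String>(pool);
    List<Future<String>> futures = new ArrayList<Future<String>>();

    // Primary attempt: simulate a slow replica.
    futures.add(service.submit(() -> { Thread.sleep(500); return "primary"; }));

    // Bounded wait for the primary before hedging.
    Future<String> first = service.poll(50, TimeUnit.MILLISECONDS);
    if (first == null) {
      // Primary is slow: hedge against a second source, then block until
      // some attempt, success or failure, actually completes.
      futures.add(service.submit(() -> "hedged"));
      first = service.take();
    }
    System.out.println("winner: " + first.get());

    for (Future<String> f : futures) {
      f.cancel(true); // cancel the loser; a no-op on the completed winner
    }
    pool.shutdownNow();
  }
}

take() provides the property the latch lacked: it cannot return until a finished future is queued, so a failed attempt is observed as a failed future rather than as a spurious wakeup.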

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -487,6 +487,9 @@ Release 2.5.0 - UNRELEASED
     HDFS-6558. Missing newline in the description of dfsadmin -rollingUpgrade.
     (Chen He via kihwal)
 
+    HDFS-6591. while loop is executed tens of thousands of times in Hedged Read
+    (Liang Xie via cnauroth)
+
   BREAKDOWN OF HDFS-2006 SUBTASKS AND RELATED JIRAS
 
     HDFS-6299. Protobuf for XAttr and client-side implementation. (Yi Liu via umamahesh)

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java

@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs;
 
+import java.util.concurrent.atomic.AtomicLong;
+
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -29,6 +31,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 @InterfaceAudience.Private
 public class DFSClientFaultInjector {
   public static DFSClientFaultInjector instance = new DFSClientFaultInjector();
+  public static AtomicLong exceptionNum = new AtomicLong(0);
 
   public static DFSClientFaultInjector get() {
     return instance;
@@ -47,4 +50,6 @@ public class DFSClientFaultInjector {
   }
 
   public void startFetchFromDatanode() {}
+
+  public void fetchFromDatanodeException() {}
 }
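For context, the class above is a test hook: production code calls no-op methods on a swappable static instance, and a test swaps in an implementation that misbehaves. A simplified, self-contained sketch of the idiom (FaultInjector, Fetcher, and OneShotFailureInjector are illustrative names; TestPread below does the swap with Mockito instead of a subclass):

import java.util.concurrent.atomic.AtomicLong;

class FaultInjector {
  // Swappable singleton: tests may replace this instance.
  static FaultInjector instance = new FaultInjector();
  // compareAndSet(0, 1) on this guard succeeds exactly once per run.
  static final AtomicLong exceptionNum = new AtomicLong(0);

  static FaultInjector get() { return instance; }

  // No-op in production; overridden or mocked in tests.
  void fetchFromDatanodeException() throws Exception {}
}

class Fetcher {
  byte[] fetch() throws Exception {
    FaultInjector.get().fetchFromDatanodeException(); // test hook
    return new byte[1024];                            // real work would go here
  }
}

class OneShotFailureInjector extends FaultInjector {
  @Override
  void fetchFromDatanodeException() throws Exception {
    if (FaultInjector.exceptionNum.compareAndSet(0, 1)) {
      throw new Exception("injected failure");        // only the first call fails
    }
  }
}

public class FaultInjectionDemo {
  public static void main(String[] args) {
    FaultInjector.instance = new OneShotFailureInjector();
    Fetcher fetcher = new Fetcher();
    for (int i = 0; i < 3; i++) {
      try {
        fetcher.fetch();
        System.out.println("attempt " + i + ": ok");
      } catch (Exception e) {
        System.out.println("attempt " + i + ": " + e.getMessage());
      }
    }
  }
}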

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java

@@ -32,12 +32,14 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -81,6 +83,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     HasEnhancedByteBufferAccess {
   @VisibleForTesting
   public static boolean tcpReadsDisabledForTesting = false;
+  private long hedgedReadOpsLoopNumForTesting = 0;
   private final DFSClient dfsClient;
   private boolean closed = false;
   private final String src;
@@ -976,20 +979,15 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
       final LocatedBlock block, final long start, final long end,
       final ByteBuffer bb,
-      final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
-      final CountDownLatch latch) {
+      final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
     return new Callable<ByteBuffer>() {
       @Override
       public ByteBuffer call() throws Exception {
-        try {
-          byte[] buf = bb.array();
-          int offset = bb.position();
-          actualGetFromOneDataNode(datanode, block, start, end, buf, offset,
-              corruptedBlockMap);
-          return bb;
-        } finally {
-          latch.countDown();
-        }
+        byte[] buf = bb.array();
+        int offset = bb.position();
+        actualGetFromOneDataNode(datanode, block, start, end, buf, offset,
+            corruptedBlockMap);
+        return bb;
       }
     };
   }
@@ -1018,6 +1016,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     BlockReader reader = null;
     try {
+      DFSClientFaultInjector.get().fetchFromDatanodeException();
       Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
       int len = (int) (end - start + 1);
       reader = new BlockReaderFactory(dfsClient.getConf()).
@@ -1097,35 +1096,43 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
     ArrayList<Future<ByteBuffer>> futures = new ArrayList<Future<ByteBuffer>>();
+    CompletionService<ByteBuffer> hedgedService =
+        new ExecutorCompletionService<ByteBuffer>(
+        dfsClient.getHedgedReadsThreadPool());
     ArrayList<DatanodeInfo> ignored = new ArrayList<DatanodeInfo>();
     ByteBuffer bb = null;
     int len = (int) (end - start + 1);
     block = getBlockAt(block.getStartOffset(), false);
-    // Latch shared by all outstanding reads.  First to finish closes
-    CountDownLatch hasReceivedResult = new CountDownLatch(1);
     while (true) {
+      // see HDFS-6591, this metric is used to verify/catch unnecessary loops
+      hedgedReadOpsLoopNumForTesting++;
       DNAddrPair chosenNode = null;
-      Future<ByteBuffer> future = null;
-      // futures is null if there is no request already executing.
+      // there is no request already executing.
       if (futures.isEmpty()) {
-        // chooseDataNode is a commitment.  If no node, we go to
-        // the NN to reget block locations.  Only go here on first read.
+        // chooseDataNode is a commitment. If no node, we go to
+        // the NN to reget block locations. Only go here on first read.
         chosenNode = chooseDataNode(block, ignored);
         bb = ByteBuffer.wrap(buf, offset, len);
-        future = getHedgedReadFuture(chosenNode, block, start, end, bb,
-            corruptedBlockMap, hasReceivedResult);
+        Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
+            chosenNode, block, start, end, bb, corruptedBlockMap);
+        Future<ByteBuffer> firstRequest = hedgedService
+            .submit(getFromDataNodeCallable);
+        futures.add(firstRequest);
         try {
-          future.get(dfsClient.getHedgedReadTimeout(), TimeUnit.MILLISECONDS);
-          return;
-        } catch (TimeoutException e) {
+          Future<ByteBuffer> future = hedgedService.poll(
+              dfsClient.getHedgedReadTimeout(), TimeUnit.MILLISECONDS);
+          if (future != null) {
+            future.get();
+            return;
+          }
           if (DFSClient.LOG.isDebugEnabled()) {
-            DFSClient.LOG.debug("Waited " + dfsClient.getHedgedReadTimeout() +
-              "ms to read from " + chosenNode.info + "; spawning hedged read");
+            DFSClient.LOG.debug("Waited " + dfsClient.getHedgedReadTimeout()
+                + "ms to read from " + chosenNode.info
+                + "; spawning hedged read");
           }
           // Ignore this node on next go around.
           ignored.add(chosenNode.info);
           dfsClient.getHedgedReadMetrics().incHedgedReadOps();
-          futures.add(future);
           continue; // no need to refresh block locations
         } catch (InterruptedException e) {
           // Ignore
@@ -1133,25 +1140,31 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
           // Ignore already logged in the call.
         }
       } else {
-        // We are starting up a 'hedged' read.  We have a read already
+        // We are starting up a 'hedged' read. We have a read already
         // ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
         // If no nodes to do hedged reads against, pass.
         try {
-          chosenNode = getBestNodeDNAddrPair(block.getLocations(), ignored);
+          try {
+            chosenNode = getBestNodeDNAddrPair(block.getLocations(), ignored);
+          } catch (IOException ioe) {
+            chosenNode = chooseDataNode(block, ignored);
+          }
           bb = ByteBuffer.allocate(len);
-          future = getHedgedReadFuture(chosenNode, block, start, end, bb,
-              corruptedBlockMap, hasReceivedResult);
-          futures.add(future);
+          Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
+              chosenNode, block, start, end, bb, corruptedBlockMap);
+          Future<ByteBuffer> oneMoreRequest = hedgedService
+              .submit(getFromDataNodeCallable);
+          futures.add(oneMoreRequest);
         } catch (IOException ioe) {
           if (DFSClient.LOG.isDebugEnabled()) {
-            DFSClient.LOG.debug("Failed getting node for hedged read: " +
-                ioe.getMessage());
+            DFSClient.LOG.debug("Failed getting node for hedged read: "
+                + ioe.getMessage());
           }
         }
         // if not succeeded. Submit callables for each datanode in a loop, wait
         // for a fixed interval and get the result from the fastest one.
         try {
-          ByteBuffer result = getFirstToComplete(futures, hasReceivedResult);
+          ByteBuffer result = getFirstToComplete(hedgedService, futures);
           // cancel the rest.
           cancelAll(futures);
           if (result.array() != buf) { // compare the array pointers
@@ -1163,50 +1176,43 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
           }
           return;
         } catch (InterruptedException ie) {
-          // Ignore
-        } catch (ExecutionException e) {
-          // exception already handled in the call method. getFirstToComplete
-          // will remove the failing future from the list. nothing more to do.
+          // Ignore and retry
         }
-        // We got here if exception.  Ignore this node on next go around IFF
+        // We got here if exception. Ignore this node on next go around IFF
         // we found a chosenNode to hedge read against.
         if (chosenNode != null && chosenNode.info != null) {
           ignored.add(chosenNode.info);
         }
       }
+      // executed if we get an error from a data node
+      block = getBlockAt(block.getStartOffset(), false);
     }
   }
 
-  private Future<ByteBuffer> getHedgedReadFuture(final DNAddrPair chosenNode,
-      final LocatedBlock block, long start,
-      final long end, final ByteBuffer bb,
-      final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
-      final CountDownLatch hasReceivedResult) {
-    Callable<ByteBuffer> getFromDataNodeCallable =
-        getFromOneDataNode(chosenNode, block, start, end, bb,
-          corruptedBlockMap, hasReceivedResult);
-    return dfsClient.getHedgedReadsThreadPool().submit(getFromDataNodeCallable);
+  @VisibleForTesting
+  public long getHedgedReadOpsLoopNumForTesting() {
+    return hedgedReadOpsLoopNumForTesting;
   }
 
-  private ByteBuffer getFirstToComplete(ArrayList<Future<ByteBuffer>> futures,
-      CountDownLatch latch) throws ExecutionException, InterruptedException {
-    latch.await();
-    for (Future<ByteBuffer> future : futures) {
-      if (future.isDone()) {
-        try {
-          return future.get();
-        } catch (ExecutionException e) {
-          // already logged in the Callable
-          futures.remove(future);
-          throw e;
-        }
-      }
+  private ByteBuffer getFirstToComplete(
+      CompletionService<ByteBuffer> hedgedService,
+      ArrayList<Future<ByteBuffer>> futures) throws InterruptedException {
+    if (futures.isEmpty()) {
+      throw new InterruptedException("let's retry");
     }
-    throw new InterruptedException("latch has counted down to zero but no"
-        + "result available yet, for safety try to request another one from"
-        + "outside loop, this should be rare");
+    Future<ByteBuffer> future = null;
+    try {
+      future = hedgedService.take();
+      ByteBuffer bb = future.get();
+      futures.remove(future);
+      return bb;
+    } catch (ExecutionException e) {
+      // already logged in the Callable
+      futures.remove(future);
+    } catch (CancellationException ce) {
+      // already logged in the Callable
+      futures.remove(future);
+    }
+    throw new InterruptedException("let's retry");
   }
 
   private void cancelAll(List<Future<ByteBuffer>> futures) {
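The root cause, reduced to a standalone repro (an assumption, simplified from the code above; none of these names are HDFS's): the old getFirstToComplete() awaited the shared latch, but a failing read counted the latch down in a finally block inside call(), before its FutureTask was marked done. So await() could return while future.isDone() was still false for every future, getFirstToComplete() threw its "for safety" InterruptedException, and the enclosing while (true) retried immediately, producing the runaway loop count this patch instruments and fixes. CompletionService.take() has no such window: it blocks until a completed (possibly failed) future is queued.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class LatchRaceDemo {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(1);
    CountDownLatch latch = new CountDownLatch(1);

    Future<String> future = pool.submit(() -> {
      try {
        throw new Exception("simulated datanode read failure");
      } finally {
        latch.countDown(); // fires before the FutureTask is marked done
      }
    });

    latch.await(); // can return while future.isDone() is still false
    System.out.println("done after await? " + future.isDone());
    pool.shutdownNow();
  }
}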

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java

@@ -32,12 +32,16 @@ import java.util.concurrent.Future;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.log4j.Level;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
@@ -50,7 +54,14 @@ import org.mockito.stubbing.Answer;
 public class TestPread {
   static final long seed = 0xDEADBEEFL;
   static final int blockSize = 4096;
-  boolean simulatedStorage = false;
+  boolean simulatedStorage;
+  boolean isHedgedRead;
+
+  @Before
+  public void setup() {
+    simulatedStorage = false;
+    isHedgedRead = false;
+  }
 
   private void writeFile(FileSystem fileSys, Path name) throws IOException {
     int replication = 3;// We need > 1 blocks to test out the hedged reads.
@@ -75,11 +86,8 @@ public class TestPread {
       assertTrue("Cannot delete file", false);
 
     // now create the real file
-    stm = fileSys.create(name, true, 4096, (short)1, blockSize);
-    Random rand = new Random(seed);
-    rand.nextBytes(buffer);
-    stm.write(buffer);
-    stm.close();
+    DFSTestUtil.createFile(fileSys, name, 12 * blockSize, 12 * blockSize,
+        blockSize, (short) replication, seed);
   }
 
   private void checkAndEraseData(byte[] actual, int from, byte[] expected, String message) {
@@ -110,8 +118,13 @@
     }
 
     if (dfstm != null) {
-      assertEquals("Expected read statistic to be incremented", length, dfstm
-          .getReadStatistics().getTotalBytesRead() - totalRead);
+      if (isHedgedRead) {
+        assertTrue("Expected read statistic to be incremented", length <= dfstm
+            .getReadStatistics().getTotalBytesRead() - totalRead);
+      } else {
+        assertEquals("Expected read statistic to be incremented", length, dfstm
+            .getReadStatistics().getTotalBytesRead() - totalRead);
+      }
     }
   }
@@ -214,7 +227,7 @@
     stm.readFully(0, actual);
     checkAndEraseData(actual, 0, expected, "Pread Datanode Restart Test");
   }
-  
+
   private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
     assertTrue(fileSys.exists(name));
     assertTrue(fileSys.delete(name, true));
@@ -255,6 +268,7 @@
    */
   @Test
   public void testHedgedPreadDFSBasic() throws IOException {
+    isHedgedRead = true;
     Configuration conf = new Configuration();
     conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5);
     conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 1);
@@ -263,9 +277,73 @@
     // transferTo.
   }
 
+  @Test
+  public void testHedgedReadLoopTooManyTimes() throws IOException {
+    Configuration conf = new Configuration();
+    int numHedgedReadPoolThreads = 5;
+    final int hedgedReadTimeoutMillis = 50;
+
+    conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE,
+        numHedgedReadPoolThreads);
+    conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS,
+        hedgedReadTimeoutMillis);
+    // Set up the InjectionHandler
+    DFSClientFaultInjector.instance = Mockito
+        .mock(DFSClientFaultInjector.class);
+    DFSClientFaultInjector injector = DFSClientFaultInjector.instance;
+    Mockito.doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        if (true) {
+          Thread.sleep(hedgedReadTimeoutMillis + 1);
+          if (DFSClientFaultInjector.exceptionNum.compareAndSet(0, 1)) {
+            System.out.println("-------------- throw Checksum Exception");
+            throw new ChecksumException("ChecksumException test", 100);
+          }
+        }
+        return null;
+      }
+    }).when(injector).fetchFromDatanodeException();
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+        .format(true).build();
+    DistributedFileSystem fileSys = cluster.getFileSystem();
+    DFSClient dfsClient = fileSys.getClient();
+    FSDataOutputStream output = null;
+    DFSInputStream input = null;
+    String filename = "/hedgedReadMaxOut.dat";
+    try {
+      Path file = new Path(filename);
+      output = fileSys.create(file, (short) 2);
+      byte[] data = new byte[64 * 1024];
+      output.write(data);
+      output.flush();
+      output.write(data);
+      output.flush();
+      output.write(data);
+      output.flush();
+      output.close();
+      byte[] buffer = new byte[64 * 1024];
+      input = dfsClient.open(filename);
+      input.read(0, buffer, 0, 1024);
+      input.close();
+      assertEquals(3, input.getHedgedReadOpsLoopNumForTesting());
+    } catch (BlockMissingException e) {
+      assertTrue(false);
+    } finally {
+      IOUtils.cleanup(null, input);
+      IOUtils.cleanup(null, output);
+      fileSys.close();
+      cluster.shutdown();
+      Mockito.reset(injector);
+    }
+  }
+
   @Test
   public void testMaxOutHedgedReadPool() throws IOException,
       InterruptedException, ExecutionException {
+    isHedgedRead = true;
     Configuration conf = new Configuration();
     int numHedgedReadPoolThreads = 5;
     final int initialHedgedReadTimeoutMillis = 50000;
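Why the assertion above expects exactly 3 loop iterations (a reading of the patched loop, stated as an assumption rather than taken from the patch itself): iteration 1 submits the first read, whose injected 51 ms sleep outlasts the 50 ms hedge threshold, so poll() returns null and the loop continues; iteration 2 submits the hedged read, then getFirstToComplete() surfaces the first read's injected ChecksumException and converts it into a retry; iteration 3 takes the hedged read's successful result and returns. Under the pre-patch latch implementation, the failed first read could instead wake the loop repeatedly with no completed future, which is the runaway count the new hedgedReadOpsLoopNumForTesting counter guards against.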
@@ -373,7 +451,6 @@
   public void testPreadDFSSimulated() throws IOException {
     simulatedStorage = true;
     testPreadDFS();
-    simulatedStorage = false;
   }
 
/**