HDFS-7915. The DataNode can sometimes allocate a ShortCircuitShm slot and fail to tell the DFSClient about it because of a network error (cmccabe)

(cherry picked from commit bc9cb3e271)
(cherry picked from commit c71b54fa32)
This commit is contained in:
Colin Patrick Mccabe 2015-03-14 22:36:46 -07:00 committed by Vinod Kumar Vavilapalli
parent e0e93e9f8d
commit 9df4b7c10e
10 changed files with 179 additions and 43 deletions

View File

@ -123,6 +123,9 @@ Release 2.6.1 - UNRELEASED
HDFS-8127. NameNode Failover during HA upgrade can cause DataNode to HDFS-8127. NameNode Failover during HA upgrade can cause DataNode to
finalize upgrade. (jing9) finalize upgrade. (jing9)
HDFS-7915. The DataNode can sometimes allocate a ShortCircuitShm slot and
fail to tell the DFSClient about it because of a network error (cmccabe)
Release 2.6.0 - 2014-11-18 Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.hdfs; package org.apache.hadoop.hdfs;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION;
import java.io.BufferedOutputStream; import java.io.BufferedOutputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
@ -68,12 +70,23 @@ import com.google.common.base.Preconditions;
public class BlockReaderFactory implements ShortCircuitReplicaCreator { public class BlockReaderFactory implements ShortCircuitReplicaCreator {
static final Log LOG = LogFactory.getLog(BlockReaderFactory.class); static final Log LOG = LogFactory.getLog(BlockReaderFactory.class);
/**
 * Hook that lets unit tests inject failures into specific block reader
 * operations. This default implementation injects nothing; a test can
 * subclass it and override the hook to throw.
 */
public static class FailureInjector {
  /**
   * Invoked while short-circuit file descriptors are being requested,
   * after the DataNode's response has been read and before its status
   * is examined.
   *
   * @throws IOException if an overriding implementation chooses to
   *                     simulate an I/O failure at this point.
   */
  public void injectRequestFileDescriptorsFailure() throws IOException {
    // Intentionally empty: no failure is injected by default.
  }
}
@VisibleForTesting @VisibleForTesting
static ShortCircuitReplicaCreator static ShortCircuitReplicaCreator
createShortCircuitReplicaInfoCallback = null; createShortCircuitReplicaInfoCallback = null;
private final DFSClient.Conf conf; private final DFSClient.Conf conf;
/**
* Injects failures into specific operations during unit tests.
*/
private final FailureInjector failureInjector;
/** /**
* The file name, for logging and debugging purposes. * The file name, for logging and debugging purposes.
*/ */
@ -168,6 +181,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
public BlockReaderFactory(DFSClient.Conf conf) { public BlockReaderFactory(DFSClient.Conf conf) {
this.conf = conf; this.conf = conf;
this.failureInjector = conf.brfFailureInjector;
this.remainingCacheTries = conf.nCachedConnRetry; this.remainingCacheTries = conf.nCachedConnRetry;
} }
@ -517,11 +531,12 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
final DataOutputStream out = final DataOutputStream out =
new DataOutputStream(new BufferedOutputStream(peer.getOutputStream())); new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
SlotId slotId = slot == null ? null : slot.getSlotId(); SlotId slotId = slot == null ? null : slot.getSlotId();
new Sender(out).requestShortCircuitFds(block, token, slotId, 1); new Sender(out).requestShortCircuitFds(block, token, slotId, 1, true);
DataInputStream in = new DataInputStream(peer.getInputStream()); DataInputStream in = new DataInputStream(peer.getInputStream());
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
PBHelper.vintPrefixed(in)); PBHelper.vintPrefixed(in));
DomainSocket sock = peer.getDomainSocket(); DomainSocket sock = peer.getDomainSocket();
failureInjector.injectRequestFileDescriptorsFailure();
switch (resp.getStatus()) { switch (resp.getStatus()) {
case SUCCESS: case SUCCESS:
byte buf[] = new byte[1]; byte buf[] = new byte[1];
@ -531,8 +546,13 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
try { try {
ExtendedBlockId key = ExtendedBlockId key =
new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()); new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) {
LOG.trace("Sending receipt verification byte for slot " + slot);
sock.getOutputStream().write(0);
}
replica = new ShortCircuitReplica(key, fis[0], fis[1], cache, replica = new ShortCircuitReplica(key, fis[0], fis[1], cache,
Time.monotonicNow(), slot); Time.monotonicNow(), slot);
return new ShortCircuitReplicaInfo(replica);
} catch (IOException e) { } catch (IOException e) {
// This indicates an error reading from disk, or a format error. Since // This indicates an error reading from disk, or a format error. Since
// it's not a socket communication problem, we return null rather than // it's not a socket communication problem, we return null rather than
@ -544,7 +564,6 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
IOUtils.cleanup(DFSClient.LOG, fis[0], fis[1]); IOUtils.cleanup(DFSClient.LOG, fis[0], fis[1]);
} }
} }
return new ShortCircuitReplicaInfo(replica);
case ERROR_UNSUPPORTED: case ERROR_UNSUPPORTED:
if (!resp.hasShortCircuitAccessVersion()) { if (!resp.hasShortCircuitAccessVersion()) {
LOG.warn("short-circuit read access is disabled for " + LOG.warn("short-circuit read access is disabled for " +

View File

@ -328,6 +328,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
final long shortCircuitMmapCacheRetryTimeout; final long shortCircuitMmapCacheRetryTimeout;
final long shortCircuitCacheStaleThresholdMs; final long shortCircuitCacheStaleThresholdMs;
public BlockReaderFactory.FailureInjector brfFailureInjector =
new BlockReaderFactory.FailureInjector();
public Conf(Configuration conf) { public Conf(Configuration conf) {
// The hdfsTimeout is currently the same as the ipc timeout // The hdfsTimeout is currently the same as the ipc timeout
hdfsTimeout = Client.getTimeout(conf); hdfsTimeout = Client.getTimeout(conf);

View File

@ -134,10 +134,13 @@ public interface DataTransferProtocol {
* to use no slot id. * to use no slot id.
* @param maxVersion Maximum version of the block data the client * @param maxVersion Maximum version of the block data the client
* can understand. * can understand.
* @param supportsReceiptVerification True if the client supports
* receipt verification.
*/ */
public void requestShortCircuitFds(final ExtendedBlock blk, public void requestShortCircuitFds(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken, final Token<BlockTokenIdentifier> blockToken,
SlotId slotId, int maxVersion) throws IOException; SlotId slotId, int maxVersion, boolean supportsReceiptVerification)
throws IOException;
/** /**
* Release a pair of short-circuit FDs requested earlier. * Release a pair of short-circuit FDs requested earlier.

View File

@ -184,7 +184,7 @@ public abstract class Receiver implements DataTransferProtocol {
try { try {
requestShortCircuitFds(PBHelper.convert(proto.getHeader().getBlock()), requestShortCircuitFds(PBHelper.convert(proto.getHeader().getBlock()),
PBHelper.convert(proto.getHeader().getToken()), PBHelper.convert(proto.getHeader().getToken()),
slotId, proto.getMaxVersion()); slotId, proto.getMaxVersion(), true);
} finally { } finally {
if (traceScope != null) traceScope.close(); if (traceScope != null) traceScope.close();
} }

View File

@ -177,7 +177,8 @@ public class Sender implements DataTransferProtocol {
@Override @Override
public void requestShortCircuitFds(final ExtendedBlock blk, public void requestShortCircuitFds(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken, final Token<BlockTokenIdentifier> blockToken,
SlotId slotId, int maxVersion) throws IOException { SlotId slotId, int maxVersion, boolean supportsReceiptVerification)
throws IOException {
OpRequestShortCircuitAccessProto.Builder builder = OpRequestShortCircuitAccessProto.Builder builder =
OpRequestShortCircuitAccessProto.newBuilder() OpRequestShortCircuitAccessProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildBaseHeader( .setHeader(DataTransferProtoUtil.buildBaseHeader(
@ -185,6 +186,7 @@ public class Sender implements DataTransferProtocol {
if (slotId != null) { if (slotId != null) {
builder.setSlotId(PBHelper.convert(slotId)); builder.setSlotId(PBHelper.convert(slotId));
} }
builder.setSupportsReceiptVerification(supportsReceiptVerification);
OpRequestShortCircuitAccessProto proto = builder.build(); OpRequestShortCircuitAccessProto proto = builder.build();
send(out, Op.REQUEST_SHORT_CIRCUIT_FDS, proto); send(out, Op.REQUEST_SHORT_CIRCUIT_FDS, proto);
} }

View File

@ -22,6 +22,8 @@ import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ER
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_INVALID; import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_INVALID;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_UNSUPPORTED; import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_UNSUPPORTED;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS; import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.DO_NOT_USE_RECEIPT_VERIFICATION;
import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT; import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
import static org.apache.hadoop.util.Time.now; import static org.apache.hadoop.util.Time.now;
@ -268,10 +270,14 @@ class DataXceiver extends Receiver implements Runnable {
@Override @Override
public void requestShortCircuitFds(final ExtendedBlock blk, public void requestShortCircuitFds(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> token, final Token<BlockTokenIdentifier> token,
SlotId slotId, int maxVersion) throws IOException { SlotId slotId, int maxVersion, boolean supportsReceiptVerification)
throws IOException {
updateCurrentThreadName("Passing file descriptors for block " + blk); updateCurrentThreadName("Passing file descriptors for block " + blk);
BlockOpResponseProto.Builder bld = BlockOpResponseProto.newBuilder(); BlockOpResponseProto.Builder bld = BlockOpResponseProto.newBuilder();
FileInputStream fis[] = null; FileInputStream fis[] = null;
SlotId registeredSlotId = null;
boolean success = false;
try {
try { try {
if (peer.getDomainSocket() == null) { if (peer.getDomainSocket() == null) {
throw new IOException("You cannot pass file descriptors over " + throw new IOException("You cannot pass file descriptors over " +
@ -282,14 +288,10 @@ class DataXceiver extends Receiver implements Runnable {
isCached(blk.getBlockPoolId(), blk.getBlockId()); isCached(blk.getBlockPoolId(), blk.getBlockId());
datanode.shortCircuitRegistry.registerSlot( datanode.shortCircuitRegistry.registerSlot(
ExtendedBlockId.fromExtendedBlock(blk), slotId, isCached); ExtendedBlockId.fromExtendedBlock(blk), slotId, isCached);
registeredSlotId = slotId;
} }
try {
fis = datanode.requestShortCircuitFdsForRead(blk, token, maxVersion); fis = datanode.requestShortCircuitFdsForRead(blk, token, maxVersion);
} finally { Preconditions.checkState(fis != null);
if ((fis == null) && (slotId != null)) {
datanode.shortCircuitRegistry.unregisterSlot(slotId);
}
}
bld.setStatus(SUCCESS); bld.setStatus(SUCCESS);
bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION); bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION);
} catch (ShortCircuitFdsVersionException e) { } catch (ShortCircuitFdsVersionException e) {
@ -306,26 +308,45 @@ class DataXceiver extends Receiver implements Runnable {
bld.setStatus(ERROR); bld.setStatus(ERROR);
bld.setMessage(e.getMessage()); bld.setMessage(e.getMessage());
} }
try {
bld.build().writeDelimitedTo(socketOut); bld.build().writeDelimitedTo(socketOut);
if (fis != null) { if (fis != null) {
FileDescriptor fds[] = new FileDescriptor[fis.length]; FileDescriptor fds[] = new FileDescriptor[fis.length];
for (int i = 0; i < fds.length; i++) { for (int i = 0; i < fds.length; i++) {
fds[i] = fis[i].getFD(); fds[i] = fis[i].getFD();
} }
byte buf[] = new byte[] { (byte)0 }; byte buf[] = new byte[1];
peer.getDomainSocket(). if (supportsReceiptVerification) {
sendFileDescriptors(fds, buf, 0, buf.length); buf[0] = (byte)USE_RECEIPT_VERIFICATION.getNumber();
} else {
buf[0] = (byte)DO_NOT_USE_RECEIPT_VERIFICATION.getNumber();
}
DomainSocket sock = peer.getDomainSocket();
sock.sendFileDescriptors(fds, buf, 0, buf.length);
if (supportsReceiptVerification) {
LOG.trace("Reading receipt verification byte for " + slotId);
int val = sock.getInputStream().read();
if (val < 0) {
throw new EOFException();
}
} else {
LOG.trace("Receipt verification is not enabled on the DataNode. " +
"Not verifying " + slotId);
}
success = true;
} }
} finally { } finally {
if ((!success) && (registeredSlotId != null)) {
LOG.info("Unregistering " + registeredSlotId + " because the " +
"requestShortCircuitFdsForRead operation failed.");
datanode.shortCircuitRegistry.unregisterSlot(registeredSlotId);
}
if (ClientTraceLog.isInfoEnabled()) { if (ClientTraceLog.isInfoEnabled()) {
DatanodeRegistration dnR = datanode.getDNRegistrationForBP(blk DatanodeRegistration dnR = datanode.getDNRegistrationForBP(blk
.getBlockPoolId()); .getBlockPoolId());
BlockSender.ClientTraceLog.info(String.format( BlockSender.ClientTraceLog.info(String.format(
"src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_FDS," + "src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_FDS," +
" blockid: %s, srvID: %s, success: %b", " blockid: %s, srvID: %s, success: %b",
blk.getBlockId(), dnR.getDatanodeUuid(), (fis != null) blk.getBlockId(), dnR.getDatanodeUuid(), success));
));
} }
if (fis != null) { if (fis != null) {
IOUtils.cleanup(LOG, fis); IOUtils.cleanup(LOG, fis);

View File

@ -30,6 +30,7 @@ import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.Set; import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -83,7 +84,7 @@ public class ShortCircuitRegistry {
private static final int SHM_LENGTH = 8192; private static final int SHM_LENGTH = 8192;
private static class RegisteredShm extends ShortCircuitShm public static class RegisteredShm extends ShortCircuitShm
implements DomainSocketWatcher.Handler { implements DomainSocketWatcher.Handler {
private final String clientName; private final String clientName;
private final ShortCircuitRegistry registry; private final ShortCircuitRegistry registry;
@ -383,4 +384,14 @@ public class ShortCircuitRegistry {
} }
IOUtils.closeQuietly(watcher); IOUtils.closeQuietly(watcher);
} }
/**
 * Callback used with {@code visit} to inspect the registry's contents.
 */
public interface Visitor {
  /**
   * Receives the registry's current state.
   *
   * @param segments the registered shared memory segments, keyed by id.
   * @param slots    the registered slots, grouped by the block they
   *                 belong to.
   */
  void accept(HashMap<ShmId, RegisteredShm> segments,
      HashMultimap<ExtendedBlockId, Slot> slots);
}
/**
 * Hands this registry's {@code segments} and {@code slots} to the given
 * visitor while holding the registry's monitor. The collections are
 * passed directly; no copy is made here.
 *
 * @param visitor the callback to invoke with the current state.
 */
@VisibleForTesting
public void visit(Visitor visitor) {
  synchronized (this) {
    visitor.accept(segments, slots);
  }
}
} }

View File

@ -176,6 +176,12 @@ message OpRequestShortCircuitAccessProto {
* The shared memory slot to use, if we are using one. * The shared memory slot to use, if we are using one.
*/ */
optional ShortCircuitShmSlotProto slotId = 3; optional ShortCircuitShmSlotProto slotId = 3;
/**
* True if the client supports verifying that the file descriptor has been
* sent successfully.
*/
optional bool supportsReceiptVerification = 4 [default = false];
} }
message ReleaseShortCircuitAccessRequestProto { message ReleaseShortCircuitAccessRequestProto {
@ -226,6 +232,11 @@ enum Status {
IN_PROGRESS = 12; IN_PROGRESS = 12;
} }
/**
 * Value of the single data byte that the DataNode sends together with the
 * short-circuit file descriptors.
 */
enum ShortCircuitFdResponse {
// The DataNode does not wait for an acknowledgement from the client.
DO_NOT_USE_RECEIPT_VERIFICATION = 0;
// The client acknowledges receipt by writing one byte back on the domain
// socket, which the DataNode waits to read.
USE_RECEIPT_VERIFICATION = 1;
}
message PipelineAckProto { message PipelineAckProto {
required sint64 seqno = 1; required sint64 seqno = 1;
repeated Status status = 2; repeated Status status = 2;

View File

@ -36,13 +36,16 @@ import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import com.google.common.collect.HashMultimap;
import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.BlockReaderTestUtil; import org.apache.hadoop.hdfs.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSInputStream; import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
@ -52,11 +55,14 @@ import org.apache.hadoop.hdfs.net.DomainPeer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.RegisteredShm;
import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.PerDatanodeVisitorInfo; import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.PerDatanodeVisitorInfo;
import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.Visitor; import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.Visitor;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.CacheVisitor; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.CacheVisitor;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplicaCreator; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplicaCreator;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory; import org.apache.hadoop.net.unix.TemporarySocketDirectory;
@ -614,4 +620,61 @@ public class TestShortCircuitCache {
cluster.shutdown(); cluster.shutdown();
sockDir.close(); sockDir.close();
} }
/**
 * Failure injector whose file-descriptor-request hook always fails with
 * an {@link IOException}.
 */
public static class TestCleanupFailureInjector
    extends BlockReaderFactory.FailureInjector {
  /**
   * Unconditionally simulates an I/O error.
   *
   * @throws IOException always.
   */
  @Override
  public void injectRequestFileDescriptorsFailure() throws IOException {
    throw new IOException("injected I/O error");
  }
}
// Regression test for HDFS-7915
/**
 * Verifies that the DataNode's ShortCircuitRegistry does not keep a slot
 * registered when the client fails while requesting short-circuit file
 * descriptors.
 *
 * A first, successful read leaves one segment and one slot registered. A
 * second read is then forced to fail on the client through
 * {@link TestCleanupFailureInjector}; afterwards the registry must still
 * contain exactly one segment and one slot.
 *
 * @throws Exception if cluster setup, file creation or cleanup fails.
 */
@Test(timeout=60000)
public void testDataXceiverCleansUpSlotsOnFailure() throws Exception {
  BlockReaderTestUtil.enableShortCircuitShmTracing();
  TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  MiniDFSCluster cluster = null;
  try {
    Configuration conf = createShortCircuitConf(
        "testDataXceiverCleansUpSlotsOnFailure", sockDir);
    conf.setLong(DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
        1000000000L);
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    cluster.waitActive();
    DistributedFileSystem fs = cluster.getFileSystem();
    final Path TEST_PATH1 = new Path("/test_file1");
    final Path TEST_PATH2 = new Path("/test_file2");
    final int TEST_FILE_LEN = 4096;
    final int SEED = 0xFADE1;
    DFSTestUtil.createFile(fs, TEST_PATH1, TEST_FILE_LEN,
        (short)1, SEED);
    DFSTestUtil.createFile(fs, TEST_PATH2, TEST_FILE_LEN,
        (short)1, SEED);

    // The first read should allocate one shared memory segment and slot.
    DFSTestUtil.readFileBuffer(fs, TEST_PATH1);

    // The second read should fail, and we should only have 1 segment and
    // 1 slot left.
    fs.getClient().getConf().brfFailureInjector =
        new TestCleanupFailureInjector();
    // Track the failure with a flag rather than calling Assert.fail()
    // inside the try: the catch clause below catches Throwable and would
    // otherwise swallow the AssertionError.
    boolean readFailed = false;
    try {
      DFSTestUtil.readFileBuffer(fs, TEST_PATH2);
    } catch (Throwable t) {
      readFailed = true;
      GenericTestUtils.assertExceptionContains("TCP reads were disabled for " +
          "testing, but we failed to do a non-TCP read.", t);
    }
    Assert.assertTrue("Expected the read of " + TEST_PATH2 +
        " to fail because of the injected I/O error.", readFailed);

    ShortCircuitRegistry registry =
        cluster.getDataNodes().get(0).getShortCircuitRegistry();
    registry.visit(new ShortCircuitRegistry.Visitor() {
      @Override
      public void accept(HashMap<ShmId, RegisteredShm> segments,
          HashMultimap<ExtendedBlockId, Slot> slots) {
        Assert.assertEquals(1, segments.size());
        Assert.assertEquals(1, slots.size());
      }
    });
  } finally {
    // Always release the mini cluster and the socket directory, even when
    // an assertion above fails, so later tests are not affected.
    try {
      if (cluster != null) {
        cluster.shutdown();
      }
    } finally {
      sockDir.close();
    }
  }
}
} }