HDFS-7915. The DataNode can sometimes allocate a ShortCircuitShm slot and fail to tell the DFSClient about it because of a network error (cmccabe)
This commit is contained in:
parent
6fdef76cc3
commit
5aa892ed48
|
@ -1177,6 +1177,9 @@ Release 2.7.0 - UNRELEASED
|
||||||
HDFS-7722. DataNode#checkDiskError should also remove Storage when error
|
HDFS-7722. DataNode#checkDiskError should also remove Storage when error
|
||||||
is found. (Lei Xu via Colin P. McCabe)
|
is found. (Lei Xu via Colin P. McCabe)
|
||||||
|
|
||||||
|
HDFS-7915. The DataNode can sometimes allocate a ShortCircuitShm slot and
|
||||||
|
fail to tell the DFSClient about it because of a network error (cmccabe)
|
||||||
|
|
||||||
Release 2.6.1 - UNRELEASED
|
Release 2.6.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -17,6 +17,8 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION;
|
||||||
|
|
||||||
import java.io.BufferedOutputStream;
|
import java.io.BufferedOutputStream;
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
import java.io.DataOutputStream;
|
import java.io.DataOutputStream;
|
||||||
|
@ -69,12 +71,23 @@ import com.google.common.base.Preconditions;
|
||||||
public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
||||||
static final Log LOG = LogFactory.getLog(BlockReaderFactory.class);
|
static final Log LOG = LogFactory.getLog(BlockReaderFactory.class);
|
||||||
|
|
||||||
|
public static class FailureInjector {
|
||||||
|
public void injectRequestFileDescriptorsFailure() throws IOException {
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
static ShortCircuitReplicaCreator
|
static ShortCircuitReplicaCreator
|
||||||
createShortCircuitReplicaInfoCallback = null;
|
createShortCircuitReplicaInfoCallback = null;
|
||||||
|
|
||||||
private final DFSClient.Conf conf;
|
private final DFSClient.Conf conf;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Injects failures into specific operations during unit tests.
|
||||||
|
*/
|
||||||
|
private final FailureInjector failureInjector;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The file name, for logging and debugging purposes.
|
* The file name, for logging and debugging purposes.
|
||||||
*/
|
*/
|
||||||
|
@ -169,6 +182,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
||||||
|
|
||||||
public BlockReaderFactory(DFSClient.Conf conf) {
|
public BlockReaderFactory(DFSClient.Conf conf) {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
|
this.failureInjector = conf.brfFailureInjector;
|
||||||
this.remainingCacheTries = conf.nCachedConnRetry;
|
this.remainingCacheTries = conf.nCachedConnRetry;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -518,11 +532,12 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
||||||
final DataOutputStream out =
|
final DataOutputStream out =
|
||||||
new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
|
new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
|
||||||
SlotId slotId = slot == null ? null : slot.getSlotId();
|
SlotId slotId = slot == null ? null : slot.getSlotId();
|
||||||
new Sender(out).requestShortCircuitFds(block, token, slotId, 1);
|
new Sender(out).requestShortCircuitFds(block, token, slotId, 1, true);
|
||||||
DataInputStream in = new DataInputStream(peer.getInputStream());
|
DataInputStream in = new DataInputStream(peer.getInputStream());
|
||||||
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
|
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
|
||||||
PBHelper.vintPrefixed(in));
|
PBHelper.vintPrefixed(in));
|
||||||
DomainSocket sock = peer.getDomainSocket();
|
DomainSocket sock = peer.getDomainSocket();
|
||||||
|
failureInjector.injectRequestFileDescriptorsFailure();
|
||||||
switch (resp.getStatus()) {
|
switch (resp.getStatus()) {
|
||||||
case SUCCESS:
|
case SUCCESS:
|
||||||
byte buf[] = new byte[1];
|
byte buf[] = new byte[1];
|
||||||
|
@ -532,8 +547,13 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
||||||
try {
|
try {
|
||||||
ExtendedBlockId key =
|
ExtendedBlockId key =
|
||||||
new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
|
new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
|
||||||
|
if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) {
|
||||||
|
LOG.trace("Sending receipt verification byte for slot " + slot);
|
||||||
|
sock.getOutputStream().write(0);
|
||||||
|
}
|
||||||
replica = new ShortCircuitReplica(key, fis[0], fis[1], cache,
|
replica = new ShortCircuitReplica(key, fis[0], fis[1], cache,
|
||||||
Time.monotonicNow(), slot);
|
Time.monotonicNow(), slot);
|
||||||
|
return new ShortCircuitReplicaInfo(replica);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
// This indicates an error reading from disk, or a format error. Since
|
// This indicates an error reading from disk, or a format error. Since
|
||||||
// it's not a socket communication problem, we return null rather than
|
// it's not a socket communication problem, we return null rather than
|
||||||
|
@ -545,7 +565,6 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
||||||
IOUtils.cleanup(DFSClient.LOG, fis[0], fis[1]);
|
IOUtils.cleanup(DFSClient.LOG, fis[0], fis[1]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return new ShortCircuitReplicaInfo(replica);
|
|
||||||
case ERROR_UNSUPPORTED:
|
case ERROR_UNSUPPORTED:
|
||||||
if (!resp.hasShortCircuitAccessVersion()) {
|
if (!resp.hasShortCircuitAccessVersion()) {
|
||||||
LOG.warn("short-circuit read access is disabled for " +
|
LOG.warn("short-circuit read access is disabled for " +
|
||||||
|
|
|
@ -337,6 +337,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||||
final long shortCircuitCacheStaleThresholdMs;
|
final long shortCircuitCacheStaleThresholdMs;
|
||||||
|
|
||||||
final long keyProviderCacheExpiryMs;
|
final long keyProviderCacheExpiryMs;
|
||||||
|
public BlockReaderFactory.FailureInjector brfFailureInjector =
|
||||||
|
new BlockReaderFactory.FailureInjector();
|
||||||
|
|
||||||
public Conf(Configuration conf) {
|
public Conf(Configuration conf) {
|
||||||
// The hdfsTimeout is currently the same as the ipc timeout
|
// The hdfsTimeout is currently the same as the ipc timeout
|
||||||
|
|
|
@ -138,10 +138,13 @@ public interface DataTransferProtocol {
|
||||||
* to use no slot id.
|
* to use no slot id.
|
||||||
* @param maxVersion Maximum version of the block data the client
|
* @param maxVersion Maximum version of the block data the client
|
||||||
* can understand.
|
* can understand.
|
||||||
|
* @param supportsReceiptVerification True if the client supports
|
||||||
|
* receipt verification.
|
||||||
*/
|
*/
|
||||||
public void requestShortCircuitFds(final ExtendedBlock blk,
|
public void requestShortCircuitFds(final ExtendedBlock blk,
|
||||||
final Token<BlockTokenIdentifier> blockToken,
|
final Token<BlockTokenIdentifier> blockToken,
|
||||||
SlotId slotId, int maxVersion) throws IOException;
|
SlotId slotId, int maxVersion, boolean supportsReceiptVerification)
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Release a pair of short-circuit FDs requested earlier.
|
* Release a pair of short-circuit FDs requested earlier.
|
||||||
|
|
|
@ -186,7 +186,7 @@ public abstract class Receiver implements DataTransferProtocol {
|
||||||
try {
|
try {
|
||||||
requestShortCircuitFds(PBHelper.convert(proto.getHeader().getBlock()),
|
requestShortCircuitFds(PBHelper.convert(proto.getHeader().getBlock()),
|
||||||
PBHelper.convert(proto.getHeader().getToken()),
|
PBHelper.convert(proto.getHeader().getToken()),
|
||||||
slotId, proto.getMaxVersion());
|
slotId, proto.getMaxVersion(), true);
|
||||||
} finally {
|
} finally {
|
||||||
if (traceScope != null) traceScope.close();
|
if (traceScope != null) traceScope.close();
|
||||||
}
|
}
|
||||||
|
|
|
@ -181,7 +181,8 @@ public class Sender implements DataTransferProtocol {
|
||||||
@Override
|
@Override
|
||||||
public void requestShortCircuitFds(final ExtendedBlock blk,
|
public void requestShortCircuitFds(final ExtendedBlock blk,
|
||||||
final Token<BlockTokenIdentifier> blockToken,
|
final Token<BlockTokenIdentifier> blockToken,
|
||||||
SlotId slotId, int maxVersion) throws IOException {
|
SlotId slotId, int maxVersion, boolean supportsReceiptVerification)
|
||||||
|
throws IOException {
|
||||||
OpRequestShortCircuitAccessProto.Builder builder =
|
OpRequestShortCircuitAccessProto.Builder builder =
|
||||||
OpRequestShortCircuitAccessProto.newBuilder()
|
OpRequestShortCircuitAccessProto.newBuilder()
|
||||||
.setHeader(DataTransferProtoUtil.buildBaseHeader(
|
.setHeader(DataTransferProtoUtil.buildBaseHeader(
|
||||||
|
@ -189,6 +190,7 @@ public class Sender implements DataTransferProtocol {
|
||||||
if (slotId != null) {
|
if (slotId != null) {
|
||||||
builder.setSlotId(PBHelper.convert(slotId));
|
builder.setSlotId(PBHelper.convert(slotId));
|
||||||
}
|
}
|
||||||
|
builder.setSupportsReceiptVerification(supportsReceiptVerification);
|
||||||
OpRequestShortCircuitAccessProto proto = builder.build();
|
OpRequestShortCircuitAccessProto proto = builder.build();
|
||||||
send(out, Op.REQUEST_SHORT_CIRCUIT_FDS, proto);
|
send(out, Op.REQUEST_SHORT_CIRCUIT_FDS, proto);
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,8 @@ import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ER
|
||||||
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_INVALID;
|
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_INVALID;
|
||||||
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_UNSUPPORTED;
|
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_UNSUPPORTED;
|
||||||
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
|
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
|
||||||
|
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION;
|
||||||
|
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.DO_NOT_USE_RECEIPT_VERIFICATION;
|
||||||
import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
|
import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
|
||||||
import static org.apache.hadoop.util.Time.now;
|
import static org.apache.hadoop.util.Time.now;
|
||||||
|
|
||||||
|
@ -291,10 +293,14 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
@Override
|
@Override
|
||||||
public void requestShortCircuitFds(final ExtendedBlock blk,
|
public void requestShortCircuitFds(final ExtendedBlock blk,
|
||||||
final Token<BlockTokenIdentifier> token,
|
final Token<BlockTokenIdentifier> token,
|
||||||
SlotId slotId, int maxVersion) throws IOException {
|
SlotId slotId, int maxVersion, boolean supportsReceiptVerification)
|
||||||
|
throws IOException {
|
||||||
updateCurrentThreadName("Passing file descriptors for block " + blk);
|
updateCurrentThreadName("Passing file descriptors for block " + blk);
|
||||||
BlockOpResponseProto.Builder bld = BlockOpResponseProto.newBuilder();
|
BlockOpResponseProto.Builder bld = BlockOpResponseProto.newBuilder();
|
||||||
FileInputStream fis[] = null;
|
FileInputStream fis[] = null;
|
||||||
|
SlotId registeredSlotId = null;
|
||||||
|
boolean success = false;
|
||||||
|
try {
|
||||||
try {
|
try {
|
||||||
if (peer.getDomainSocket() == null) {
|
if (peer.getDomainSocket() == null) {
|
||||||
throw new IOException("You cannot pass file descriptors over " +
|
throw new IOException("You cannot pass file descriptors over " +
|
||||||
|
@ -305,14 +311,10 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
isCached(blk.getBlockPoolId(), blk.getBlockId());
|
isCached(blk.getBlockPoolId(), blk.getBlockId());
|
||||||
datanode.shortCircuitRegistry.registerSlot(
|
datanode.shortCircuitRegistry.registerSlot(
|
||||||
ExtendedBlockId.fromExtendedBlock(blk), slotId, isCached);
|
ExtendedBlockId.fromExtendedBlock(blk), slotId, isCached);
|
||||||
|
registeredSlotId = slotId;
|
||||||
}
|
}
|
||||||
try {
|
|
||||||
fis = datanode.requestShortCircuitFdsForRead(blk, token, maxVersion);
|
fis = datanode.requestShortCircuitFdsForRead(blk, token, maxVersion);
|
||||||
} finally {
|
Preconditions.checkState(fis != null);
|
||||||
if ((fis == null) && (slotId != null)) {
|
|
||||||
datanode.shortCircuitRegistry.unregisterSlot(slotId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
bld.setStatus(SUCCESS);
|
bld.setStatus(SUCCESS);
|
||||||
bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION);
|
bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION);
|
||||||
} catch (ShortCircuitFdsVersionException e) {
|
} catch (ShortCircuitFdsVersionException e) {
|
||||||
|
@ -329,26 +331,45 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
bld.setStatus(ERROR);
|
bld.setStatus(ERROR);
|
||||||
bld.setMessage(e.getMessage());
|
bld.setMessage(e.getMessage());
|
||||||
}
|
}
|
||||||
try {
|
|
||||||
bld.build().writeDelimitedTo(socketOut);
|
bld.build().writeDelimitedTo(socketOut);
|
||||||
if (fis != null) {
|
if (fis != null) {
|
||||||
FileDescriptor fds[] = new FileDescriptor[fis.length];
|
FileDescriptor fds[] = new FileDescriptor[fis.length];
|
||||||
for (int i = 0; i < fds.length; i++) {
|
for (int i = 0; i < fds.length; i++) {
|
||||||
fds[i] = fis[i].getFD();
|
fds[i] = fis[i].getFD();
|
||||||
}
|
}
|
||||||
byte buf[] = new byte[] { (byte)0 };
|
byte buf[] = new byte[1];
|
||||||
peer.getDomainSocket().
|
if (supportsReceiptVerification) {
|
||||||
sendFileDescriptors(fds, buf, 0, buf.length);
|
buf[0] = (byte)USE_RECEIPT_VERIFICATION.getNumber();
|
||||||
|
} else {
|
||||||
|
buf[0] = (byte)DO_NOT_USE_RECEIPT_VERIFICATION.getNumber();
|
||||||
|
}
|
||||||
|
DomainSocket sock = peer.getDomainSocket();
|
||||||
|
sock.sendFileDescriptors(fds, buf, 0, buf.length);
|
||||||
|
if (supportsReceiptVerification) {
|
||||||
|
LOG.trace("Reading receipt verification byte for " + slotId);
|
||||||
|
int val = sock.getInputStream().read();
|
||||||
|
if (val < 0) {
|
||||||
|
throw new EOFException();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
LOG.trace("Receipt verification is not enabled on the DataNode. " +
|
||||||
|
"Not verifying " + slotId);
|
||||||
|
}
|
||||||
|
success = true;
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
if ((!success) && (registeredSlotId != null)) {
|
||||||
|
LOG.info("Unregistering " + registeredSlotId + " because the " +
|
||||||
|
"requestShortCircuitFdsForRead operation failed.");
|
||||||
|
datanode.shortCircuitRegistry.unregisterSlot(registeredSlotId);
|
||||||
|
}
|
||||||
if (ClientTraceLog.isInfoEnabled()) {
|
if (ClientTraceLog.isInfoEnabled()) {
|
||||||
DatanodeRegistration dnR = datanode.getDNRegistrationForBP(blk
|
DatanodeRegistration dnR = datanode.getDNRegistrationForBP(blk
|
||||||
.getBlockPoolId());
|
.getBlockPoolId());
|
||||||
BlockSender.ClientTraceLog.info(String.format(
|
BlockSender.ClientTraceLog.info(String.format(
|
||||||
"src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_FDS," +
|
"src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_FDS," +
|
||||||
" blockid: %s, srvID: %s, success: %b",
|
" blockid: %s, srvID: %s, success: %b",
|
||||||
blk.getBlockId(), dnR.getDatanodeUuid(), (fis != null)
|
blk.getBlockId(), dnR.getDatanodeUuid(), success));
|
||||||
));
|
|
||||||
}
|
}
|
||||||
if (fis != null) {
|
if (fis != null) {
|
||||||
IOUtils.cleanup(LOG, fis);
|
IOUtils.cleanup(LOG, fis);
|
||||||
|
|
|
@ -30,6 +30,7 @@ import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -83,7 +84,7 @@ public class ShortCircuitRegistry {
|
||||||
|
|
||||||
private static final int SHM_LENGTH = 8192;
|
private static final int SHM_LENGTH = 8192;
|
||||||
|
|
||||||
private static class RegisteredShm extends ShortCircuitShm
|
public static class RegisteredShm extends ShortCircuitShm
|
||||||
implements DomainSocketWatcher.Handler {
|
implements DomainSocketWatcher.Handler {
|
||||||
private final String clientName;
|
private final String clientName;
|
||||||
private final ShortCircuitRegistry registry;
|
private final ShortCircuitRegistry registry;
|
||||||
|
@ -383,4 +384,14 @@ public class ShortCircuitRegistry {
|
||||||
}
|
}
|
||||||
IOUtils.closeQuietly(watcher);
|
IOUtils.closeQuietly(watcher);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static interface Visitor {
|
||||||
|
void accept(HashMap<ShmId, RegisteredShm> segments,
|
||||||
|
HashMultimap<ExtendedBlockId, Slot> slots);
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public synchronized void visit(Visitor visitor) {
|
||||||
|
visitor.accept(segments, slots);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -179,6 +179,12 @@ message OpRequestShortCircuitAccessProto {
|
||||||
* The shared memory slot to use, if we are using one.
|
* The shared memory slot to use, if we are using one.
|
||||||
*/
|
*/
|
||||||
optional ShortCircuitShmSlotProto slotId = 3;
|
optional ShortCircuitShmSlotProto slotId = 3;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* True if the client supports verifying that the file descriptor has been
|
||||||
|
* sent successfully.
|
||||||
|
*/
|
||||||
|
optional bool supportsReceiptVerification = 4 [default = false];
|
||||||
}
|
}
|
||||||
|
|
||||||
message ReleaseShortCircuitAccessRequestProto {
|
message ReleaseShortCircuitAccessRequestProto {
|
||||||
|
@ -230,6 +236,11 @@ enum Status {
|
||||||
IN_PROGRESS = 12;
|
IN_PROGRESS = 12;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
enum ShortCircuitFdResponse {
|
||||||
|
DO_NOT_USE_RECEIPT_VERIFICATION = 0;
|
||||||
|
USE_RECEIPT_VERIFICATION = 1;
|
||||||
|
}
|
||||||
|
|
||||||
message PipelineAckProto {
|
message PipelineAckProto {
|
||||||
required sint64 seqno = 1;
|
required sint64 seqno = 1;
|
||||||
repeated uint32 reply = 2;
|
repeated uint32 reply = 2;
|
||||||
|
|
|
@ -36,13 +36,16 @@ import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import com.google.common.collect.HashMultimap;
|
||||||
import org.apache.commons.lang.mutable.MutableBoolean;
|
import org.apache.commons.lang.mutable.MutableBoolean;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.BlockReaderFactory;
|
||||||
import org.apache.hadoop.hdfs.BlockReaderTestUtil;
|
import org.apache.hadoop.hdfs.BlockReaderTestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
import org.apache.hadoop.hdfs.DFSInputStream;
|
import org.apache.hadoop.hdfs.DFSInputStream;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
|
@ -52,11 +55,14 @@ import org.apache.hadoop.hdfs.net.DomainPeer;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
|
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.RegisteredShm;
|
||||||
import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.PerDatanodeVisitorInfo;
|
import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.PerDatanodeVisitorInfo;
|
||||||
import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.Visitor;
|
import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.Visitor;
|
||||||
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.CacheVisitor;
|
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.CacheVisitor;
|
||||||
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplicaCreator;
|
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplicaCreator;
|
||||||
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
|
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
|
||||||
|
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.net.unix.DomainSocket;
|
import org.apache.hadoop.net.unix.DomainSocket;
|
||||||
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
|
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
|
||||||
|
@ -615,4 +621,61 @@ public class TestShortCircuitCache {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
sockDir.close();
|
sockDir.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class TestCleanupFailureInjector
|
||||||
|
extends BlockReaderFactory.FailureInjector {
|
||||||
|
@Override
|
||||||
|
public void injectRequestFileDescriptorsFailure() throws IOException {
|
||||||
|
throw new IOException("injected I/O error");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Regression test for HDFS-7915
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void testDataXceiverCleansUpSlotsOnFailure() throws Exception {
|
||||||
|
BlockReaderTestUtil.enableShortCircuitShmTracing();
|
||||||
|
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
|
||||||
|
Configuration conf = createShortCircuitConf(
|
||||||
|
"testDataXceiverCleansUpSlotsOnFailure", sockDir);
|
||||||
|
conf.setLong(DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
|
||||||
|
1000000000L);
|
||||||
|
MiniDFSCluster cluster =
|
||||||
|
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||||
|
cluster.waitActive();
|
||||||
|
DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
|
final Path TEST_PATH1 = new Path("/test_file1");
|
||||||
|
final Path TEST_PATH2 = new Path("/test_file2");
|
||||||
|
final int TEST_FILE_LEN = 4096;
|
||||||
|
final int SEED = 0xFADE1;
|
||||||
|
DFSTestUtil.createFile(fs, TEST_PATH1, TEST_FILE_LEN,
|
||||||
|
(short)1, SEED);
|
||||||
|
DFSTestUtil.createFile(fs, TEST_PATH2, TEST_FILE_LEN,
|
||||||
|
(short)1, SEED);
|
||||||
|
|
||||||
|
// The first read should allocate one shared memory segment and slot.
|
||||||
|
DFSTestUtil.readFileBuffer(fs, TEST_PATH1);
|
||||||
|
|
||||||
|
// The second read should fail, and we should only have 1 segment and 1 slot
|
||||||
|
// left.
|
||||||
|
fs.getClient().getConf().brfFailureInjector =
|
||||||
|
new TestCleanupFailureInjector();
|
||||||
|
try {
|
||||||
|
DFSTestUtil.readFileBuffer(fs, TEST_PATH2);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
GenericTestUtils.assertExceptionContains("TCP reads were disabled for " +
|
||||||
|
"testing, but we failed to do a non-TCP read.", t);
|
||||||
|
}
|
||||||
|
ShortCircuitRegistry registry =
|
||||||
|
cluster.getDataNodes().get(0).getShortCircuitRegistry();
|
||||||
|
registry.visit(new ShortCircuitRegistry.Visitor() {
|
||||||
|
@Override
|
||||||
|
public void accept(HashMap<ShmId, RegisteredShm> segments,
|
||||||
|
HashMultimap<ExtendedBlockId, Slot> slots) {
|
||||||
|
Assert.assertEquals(1, segments.size());
|
||||||
|
Assert.assertEquals(1, slots.size());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
cluster.shutdown();
|
||||||
|
sockDir.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue