HDFS-9574. Reduce client failures during datanode restart. Contributed by Kihwal Lee.

This commit is contained in:
Kihwal Lee 2016-01-08 11:13:25 -06:00
parent ed18527e38
commit 38c4c14472
10 changed files with 224 additions and 61 deletions

View File

@ -29,6 +29,7 @@ import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
@ -72,10 +73,12 @@ import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
import org.apache.hadoop.io.ByteBufferPool; import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.IdentityHashStore; import org.apache.hadoop.util.IdentityHashStore;
import org.apache.hadoop.util.StopWatch;
import org.apache.htrace.core.SpanId; import org.apache.htrace.core.SpanId;
import org.apache.htrace.core.TraceScope; import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.Tracer; import org.apache.htrace.core.Tracer;
@ -357,12 +360,18 @@ public class DFSInputStream extends FSInputStream
int replicaNotFoundCount = locatedblock.getLocations().length; int replicaNotFoundCount = locatedblock.getLocations().length;
final DfsClientConf conf = dfsClient.getConf(); final DfsClientConf conf = dfsClient.getConf();
for(DatanodeInfo datanode : locatedblock.getLocations()) { final int timeout = conf.getSocketTimeout();
LinkedList<DatanodeInfo> nodeList = new LinkedList<DatanodeInfo>(
Arrays.asList(locatedblock.getLocations()));
LinkedList<DatanodeInfo> retryList = new LinkedList<DatanodeInfo>();
boolean isRetry = false;
StopWatch sw = new StopWatch();
while (nodeList.size() > 0) {
DatanodeInfo datanode = nodeList.pop();
ClientDatanodeProtocol cdp = null; ClientDatanodeProtocol cdp = null;
try { try {
cdp = DFSUtilClient.createClientDatanodeProtocolProxy(datanode, cdp = DFSUtilClient.createClientDatanodeProtocolProxy(datanode,
dfsClient.getConfiguration(), conf.getSocketTimeout(), dfsClient.getConfiguration(), timeout,
conf.isConnectToDnViaHostname(), locatedblock); conf.isConnectToDnViaHostname(), locatedblock);
final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock()); final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
@ -370,15 +379,19 @@ public class DFSInputStream extends FSInputStream
if (n >= 0) { if (n >= 0) {
return n; return n;
} }
} } catch (IOException ioe) {
catch(IOException ioe) { if (ioe instanceof RemoteException) {
if (ioe instanceof RemoteException && if (((RemoteException) ioe).unwrapRemoteException() instanceof
(((RemoteException) ioe).unwrapRemoteException() instanceof ReplicaNotFoundException) {
ReplicaNotFoundException)) { // replica is not on the DN. We will treat it as 0 length
// special case : replica might not be on the DN, treat as 0 length // if no one actually has a replica.
replicaNotFoundCount--; replicaNotFoundCount--;
} else if (((RemoteException) ioe).unwrapRemoteException() instanceof
RetriableException) {
// add to the list to be retried if necessary.
retryList.add(datanode);
}
} }
DFSClient.LOG.debug("Failed to getReplicaVisibleLength from datanode {}" DFSClient.LOG.debug("Failed to getReplicaVisibleLength from datanode {}"
+ " for block {}", datanode, locatedblock.getBlock(), ioe); + " for block {}", datanode, locatedblock.getBlock(), ioe);
} finally { } finally {
@ -386,6 +399,30 @@ public class DFSInputStream extends FSInputStream
RPC.stopProxy(cdp); RPC.stopProxy(cdp);
} }
} }
// Ran out of nodes, but there are retriable nodes.
if (nodeList.size() == 0 && retryList.size() > 0) {
nodeList.addAll(retryList);
retryList.clear();
isRetry = true;
}
if (isRetry) {
// start the stop watch if not already running.
if (!sw.isRunning()) {
sw.start();
}
try {
Thread.sleep(500); // delay between retries.
} catch (InterruptedException e) {
throw new IOException("Interrupted while getting the length.");
}
}
// see if we ran out of retry time
if (sw.isRunning() && sw.now(TimeUnit.MILLISECONDS) > timeout) {
break;
}
} }
// Namenode told us about these locations, but none know about the replica // Namenode told us about these locations, but none know about the replica

View File

@ -2581,6 +2581,8 @@ Release 2.7.3 - UNRELEASED
HDFS-7163. WebHdfsFileSystem should retry reads according to the configured HDFS-7163. WebHdfsFileSystem should retry reads according to the configured
retry policy. (Eric Payne via kihwal) retry policy. (Eric Payne via kihwal)
HDFS-9574. Reduce client failures during datanode restart (kihwal)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -507,6 +507,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY = "dfs.datanode.min.supported.namenode.version"; public static final String DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY = "dfs.datanode.min.supported.namenode.version";
public static final String DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_DEFAULT = "3.0.0-SNAPSHOT"; public static final String DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_DEFAULT = "3.0.0-SNAPSHOT";
public static final String DFS_NAMENODE_INODE_ATTRIBUTES_PROVIDER_KEY = "dfs.namenode.inode.attributes.provider.class"; public static final String DFS_NAMENODE_INODE_ATTRIBUTES_PROVIDER_KEY = "dfs.namenode.inode.attributes.provider.class";
public static final String DFS_DATANODE_BP_READY_TIMEOUT_KEY = "dfs.datanode.bp-ready.timeout";
public static final long DFS_DATANODE_BP_READY_TIMEOUT_DEFAULT = 20;
public static final String DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY = "dfs.block.access.token.enable"; public static final String DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY = "dfs.block.access.token.enable";
public static final boolean DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT = false; public static final boolean DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT = false;

View File

@ -52,6 +52,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BP_READY_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BP_READY_TIMEOUT_DEFAULT;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
@ -104,6 +106,8 @@ public class DNConf {
final long maxLockedMemory; final long maxLockedMemory;
private final long bpReadyTimeout;
// Allow LAZY_PERSIST writes from non-local clients? // Allow LAZY_PERSIST writes from non-local clients?
private final boolean allowNonLocalLazyPersist; private final boolean allowNonLocalLazyPersist;
@ -210,6 +214,10 @@ public class DNConf {
this.allowNonLocalLazyPersist = conf.getBoolean( this.allowNonLocalLazyPersist = conf.getBoolean(
DFS_DATANODE_NON_LOCAL_LAZY_PERSIST, DFS_DATANODE_NON_LOCAL_LAZY_PERSIST,
DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT); DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT);
this.bpReadyTimeout = conf.getLong(
DFS_DATANODE_BP_READY_TIMEOUT_KEY,
DFS_DATANODE_BP_READY_TIMEOUT_DEFAULT);
} }
// We get minimumNameNodeVersion via a method so it can be mocked out in tests. // We get minimumNameNodeVersion via a method so it can be mocked out in tests.
@ -322,4 +330,8 @@ public class DNConf {
public int getTransferSocketSendBufferSize() { public int getTransferSocketSendBufferSize() {
return transferSocketSendBufferSize; return transferSocketSendBufferSize;
} }
public long getBpReadyTimeout() {
return bpReadyTimeout;
}
} }

View File

@ -1594,6 +1594,7 @@ public class DataNode extends ReconfigurableBase
@VisibleForTesting @VisibleForTesting
public DatanodeRegistration getDNRegistrationForBP(String bpid) public DatanodeRegistration getDNRegistrationForBP(String bpid)
throws IOException { throws IOException {
DataNodeFaultInjector.get().noRegistration();
BPOfferService bpos = blockPoolManager.get(bpid); BPOfferService bpos = blockPoolManager.get(bpid);
if(bpos==null || bpos.bpRegistration==null) { if(bpos==null || bpos.bpRegistration==null) {
throw new IOException("cannot find BPOfferService for bpid="+bpid); throw new IOException("cannot find BPOfferService for bpid="+bpid);
@ -1721,7 +1722,6 @@ public class DataNode extends ReconfigurableBase
throw new ShortCircuitFdsUnsupportedException( throw new ShortCircuitFdsUnsupportedException(
fileDescriptorPassingDisabledReason); fileDescriptorPassingDisabledReason);
} }
checkBlockToken(blk, token, BlockTokenIdentifier.AccessMode.READ);
int blkVersion = CURRENT_BLOCK_FORMAT_VERSION; int blkVersion = CURRENT_BLOCK_FORMAT_VERSION;
if (maxVersion < blkVersion) { if (maxVersion < blkVersion) {
throw new ShortCircuitFdsVersionException("Your client is too old " + throw new ShortCircuitFdsVersionException("Your client is too old " +
@ -2709,6 +2709,15 @@ public class DataNode extends ReconfigurableBase
} }
private void checkReadAccess(final ExtendedBlock block) throws IOException { private void checkReadAccess(final ExtendedBlock block) throws IOException {
// Make sure this node has registered for the block pool.
try {
getDNRegistrationForBP(block.getBlockPoolId());
} catch (IOException e) {
// if it has not registered with the NN, throw an exception back.
throw new org.apache.hadoop.ipc.RetriableException(
"Datanode not registered. Try again later.");
}
if (isBlockTokenEnabled) { if (isBlockTokenEnabled) {
Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser() Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser()
.getTokenIdentifiers(); .getTokenIdentifiers();

View File

@ -49,4 +49,6 @@ public class DataNodeFaultInjector {
public boolean dropHeartbeatPacket() { public boolean dropHeartbeatPacket() {
return false; return false;
} }
public void noRegistration() throws IOException { }
} }

View File

@ -45,6 +45,7 @@ import java.net.SocketTimeoutException;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.security.MessageDigest; import java.security.MessageDigest;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -85,6 +86,7 @@ import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StopWatch;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
@ -298,6 +300,9 @@ class DataXceiver extends Receiver implements Runnable {
SlotId slotId, int maxVersion, boolean supportsReceiptVerification) SlotId slotId, int maxVersion, boolean supportsReceiptVerification)
throws IOException { throws IOException {
updateCurrentThreadName("Passing file descriptors for block " + blk); updateCurrentThreadName("Passing file descriptors for block " + blk);
DataOutputStream out = getBufferedOutputStream();
checkAccess(out, true, blk, token,
Op.REQUEST_SHORT_CIRCUIT_FDS, BlockTokenIdentifier.AccessMode.READ);
BlockOpResponseProto.Builder bld = BlockOpResponseProto.newBuilder(); BlockOpResponseProto.Builder bld = BlockOpResponseProto.newBuilder();
FileInputStream fis[] = null; FileInputStream fis[] = null;
SlotId registeredSlotId = null; SlotId registeredSlotId = null;
@ -326,9 +331,6 @@ class DataXceiver extends Receiver implements Runnable {
} catch (ShortCircuitFdsUnsupportedException e) { } catch (ShortCircuitFdsUnsupportedException e) {
bld.setStatus(ERROR_UNSUPPORTED); bld.setStatus(ERROR_UNSUPPORTED);
bld.setMessage(e.getMessage()); bld.setMessage(e.getMessage());
} catch (InvalidToken e) {
bld.setStatus(ERROR_ACCESS_TOKEN);
bld.setMessage(e.getMessage());
} catch (IOException e) { } catch (IOException e) {
bld.setStatus(ERROR); bld.setStatus(ERROR);
bld.setMessage(e.getMessage()); bld.setMessage(e.getMessage());
@ -516,9 +518,9 @@ class DataXceiver extends Receiver implements Runnable {
final CachingStrategy cachingStrategy) throws IOException { final CachingStrategy cachingStrategy) throws IOException {
previousOpClientName = clientName; previousOpClientName = clientName;
long read = 0; long read = 0;
updateCurrentThreadName("Sending block " + block);
OutputStream baseStream = getOutputStream(); OutputStream baseStream = getOutputStream();
DataOutputStream out = new DataOutputStream(new BufferedOutputStream( DataOutputStream out = getBufferedOutputStream();
baseStream, smallBufferSize));
checkAccess(out, true, block, blockToken, checkAccess(out, true, block, blockToken,
Op.READ_BLOCK, BlockTokenIdentifier.AccessMode.READ); Op.READ_BLOCK, BlockTokenIdentifier.AccessMode.READ);
@ -534,7 +536,6 @@ class DataXceiver extends Receiver implements Runnable {
: dnR + " Served block " + block + " to " + : dnR + " Served block " + block + " to " +
remoteAddress; remoteAddress;
updateCurrentThreadName("Sending block " + block);
try { try {
try { try {
blockSender = new BlockSender(block, blockOffset, length, blockSender = new BlockSender(block, blockOffset, length,
@ -630,6 +631,10 @@ class DataXceiver extends Receiver implements Runnable {
allowLazyPersist = allowLazyPersist && allowLazyPersist = allowLazyPersist &&
(dnConf.getAllowNonLocalLazyPersist() || peer.isLocal()); (dnConf.getAllowNonLocalLazyPersist() || peer.isLocal());
long size = 0; long size = 0;
// reply to upstream datanode or client
final DataOutputStream replyOut = getBufferedOutputStream();
checkAccess(replyOut, isClient, block, blockToken,
Op.WRITE_BLOCK, BlockTokenIdentifier.AccessMode.WRITE);
// check single target for transfer-RBW/Finalized // check single target for transfer-RBW/Finalized
if (isTransfer && targets.length > 0) { if (isTransfer && targets.length > 0) {
throw new IOException(stage + " does not support multiple targets " throw new IOException(stage + " does not support multiple targets "
@ -660,11 +665,6 @@ class DataXceiver extends Receiver implements Runnable {
LOG.info("Receiving " + block + " src: " + remoteAddress + " dest: " LOG.info("Receiving " + block + " src: " + remoteAddress + " dest: "
+ localAddress); + localAddress);
// reply to upstream datanode or client
final DataOutputStream replyOut = getBufferedOutputStream();
checkAccess(replyOut, isClient, block, blockToken,
Op.WRITE_BLOCK, BlockTokenIdentifier.AccessMode.WRITE);
DataOutputStream mirrorOut = null; // stream to next target DataOutputStream mirrorOut = null; // stream to next target
DataInputStream mirrorIn = null; // reply from next target DataInputStream mirrorIn = null; // reply from next target
Socket mirrorSock = null; // socket to next target Socket mirrorSock = null; // socket to next target
@ -863,13 +863,13 @@ class DataXceiver extends Receiver implements Runnable {
final String clientName, final String clientName,
final DatanodeInfo[] targets, final DatanodeInfo[] targets,
final StorageType[] targetStorageTypes) throws IOException { final StorageType[] targetStorageTypes) throws IOException {
checkAccess(socketOut, true, blk, blockToken,
Op.TRANSFER_BLOCK, BlockTokenIdentifier.AccessMode.COPY);
previousOpClientName = clientName; previousOpClientName = clientName;
updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk); updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk);
final DataOutputStream out = new DataOutputStream( final DataOutputStream out = new DataOutputStream(
getOutputStream()); getOutputStream());
checkAccess(out, true, blk, blockToken,
Op.TRANSFER_BLOCK, BlockTokenIdentifier.AccessMode.COPY);
try { try {
datanode.transferReplicaForPipelineRecovery(blk, targets, datanode.transferReplicaForPipelineRecovery(blk, targets,
targetStorageTypes, clientName); targetStorageTypes, clientName);
@ -923,6 +923,7 @@ class DataXceiver extends Receiver implements Runnable {
@Override @Override
public void blockChecksum(final ExtendedBlock block, public void blockChecksum(final ExtendedBlock block,
final Token<BlockTokenIdentifier> blockToken) throws IOException { final Token<BlockTokenIdentifier> blockToken) throws IOException {
updateCurrentThreadName("Getting checksum for block " + block);
final DataOutputStream out = new DataOutputStream( final DataOutputStream out = new DataOutputStream(
getOutputStream()); getOutputStream());
checkAccess(out, true, block, blockToken, checkAccess(out, true, block, blockToken,
@ -933,13 +934,11 @@ class DataXceiver extends Receiver implements Runnable {
long visibleLength = datanode.data.getReplicaVisibleLength(block); long visibleLength = datanode.data.getReplicaVisibleLength(block);
boolean partialBlk = requestLength < visibleLength; boolean partialBlk = requestLength < visibleLength;
updateCurrentThreadName("Reading metadata for block " + block);
final LengthInputStream metadataIn = datanode.data final LengthInputStream metadataIn = datanode.data
.getMetaDataInputStream(block); .getMetaDataInputStream(block);
final DataInputStream checksumIn = new DataInputStream( final DataInputStream checksumIn = new DataInputStream(
new BufferedInputStream(metadataIn, ioFileBufferSize)); new BufferedInputStream(metadataIn, ioFileBufferSize));
updateCurrentThreadName("Getting checksum for block " + block);
try { try {
//read metadata file //read metadata file
final BlockMetadataHeader header = BlockMetadataHeader final BlockMetadataHeader header = BlockMetadataHeader
@ -987,21 +986,10 @@ class DataXceiver extends Receiver implements Runnable {
public void copyBlock(final ExtendedBlock block, public void copyBlock(final ExtendedBlock block,
final Token<BlockTokenIdentifier> blockToken) throws IOException { final Token<BlockTokenIdentifier> blockToken) throws IOException {
updateCurrentThreadName("Copying block " + block); updateCurrentThreadName("Copying block " + block);
// Read in the header DataOutputStream reply = getBufferedOutputStream();
if (datanode.isBlockTokenEnabled) { checkAccess(reply, true, block, blockToken,
try { Op.COPY_BLOCK, BlockTokenIdentifier.AccessMode.COPY);
datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null, block,
BlockTokenIdentifier.AccessMode.COPY);
} catch (InvalidToken e) {
LOG.warn("Invalid access token in request from " + remoteAddress
+ " for OP_COPY_BLOCK for block " + block + " : "
+ e.getLocalizedMessage());
sendResponse(ERROR_ACCESS_TOKEN, "Invalid access token");
return;
}
}
if (datanode.data.getPinning(block)) { if (datanode.data.getPinning(block)) {
String msg = "Not able to copy block " + block.getBlockId() + " " + String msg = "Not able to copy block " + block.getBlockId() + " " +
"to " + peer.getRemoteAddressString() + " because it's pinned "; "to " + peer.getRemoteAddressString() + " because it's pinned ";
@ -1019,7 +1007,6 @@ class DataXceiver extends Receiver implements Runnable {
} }
BlockSender blockSender = null; BlockSender blockSender = null;
DataOutputStream reply = null;
boolean isOpSuccess = true; boolean isOpSuccess = true;
try { try {
@ -1027,10 +1014,7 @@ class DataXceiver extends Receiver implements Runnable {
blockSender = new BlockSender(block, 0, -1, false, false, true, datanode, blockSender = new BlockSender(block, 0, -1, false, false, true, datanode,
null, CachingStrategy.newDropBehind()); null, CachingStrategy.newDropBehind());
// set up response stream
OutputStream baseStream = getOutputStream(); OutputStream baseStream = getOutputStream();
reply = new DataOutputStream(new BufferedOutputStream(
baseStream, smallBufferSize));
// send status first // send status first
writeSuccessWithChecksumInfo(blockSender, reply); writeSuccessWithChecksumInfo(blockSender, reply);
@ -1074,20 +1058,9 @@ class DataXceiver extends Receiver implements Runnable {
final String delHint, final String delHint,
final DatanodeInfo proxySource) throws IOException { final DatanodeInfo proxySource) throws IOException {
updateCurrentThreadName("Replacing block " + block + " from " + delHint); updateCurrentThreadName("Replacing block " + block + " from " + delHint);
DataOutputStream replyOut = new DataOutputStream(getOutputStream());
/* read header */ checkAccess(replyOut, true, block, blockToken,
if (datanode.isBlockTokenEnabled) { Op.REPLACE_BLOCK, BlockTokenIdentifier.AccessMode.REPLACE);
try {
datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null, block,
BlockTokenIdentifier.AccessMode.REPLACE);
} catch (InvalidToken e) {
LOG.warn("Invalid access token in request from " + remoteAddress
+ " for OP_REPLACE_BLOCK for block " + block + " : "
+ e.getLocalizedMessage());
sendResponse(ERROR_ACCESS_TOKEN, "Invalid access token");
return;
}
}
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
String msg = "Not able to receive block " + block.getBlockId() + String msg = "Not able to receive block " + block.getBlockId() +
@ -1104,7 +1077,6 @@ class DataXceiver extends Receiver implements Runnable {
String errMsg = null; String errMsg = null;
BlockReceiver blockReceiver = null; BlockReceiver blockReceiver = null;
DataInputStream proxyReply = null; DataInputStream proxyReply = null;
DataOutputStream replyOut = new DataOutputStream(getOutputStream());
boolean IoeDuringCopyBlockOperation = false; boolean IoeDuringCopyBlockOperation = false;
try { try {
// Move the block to different storage in the same datanode // Move the block to different storage in the same datanode
@ -1296,11 +1268,52 @@ class DataXceiver extends Receiver implements Runnable {
datanode.incrDatanodeNetworkErrors(remoteAddressWithoutPort); datanode.incrDatanodeNetworkErrors(remoteAddressWithoutPort);
} }
/**
* Wait until the BP is registered, upto the configured amount of time.
* Throws an exception if times out, which should fail the client request.
* @param the requested block
*/
void checkAndWaitForBP(final ExtendedBlock block)
throws IOException {
String bpId = block.getBlockPoolId();
// The registration is only missing in relatively short time window.
// Optimistically perform this first.
try {
datanode.getDNRegistrationForBP(bpId);
return;
} catch (IOException ioe) {
// not registered
}
// retry
long bpReadyTimeout = dnConf.getBpReadyTimeout();
StopWatch sw = new StopWatch();
sw.start();
while (sw.now(TimeUnit.SECONDS) <= bpReadyTimeout) {
try {
datanode.getDNRegistrationForBP(bpId);
return;
} catch (IOException ioe) {
// not registered
}
// sleep before trying again
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
throw new IOException("Interrupted while serving request. Aborting.");
}
}
// failed to obtain registration.
throw new IOException("Not ready to serve the block pool, " + bpId + ".");
}
private void checkAccess(OutputStream out, final boolean reply, private void checkAccess(OutputStream out, final boolean reply,
final ExtendedBlock blk, final ExtendedBlock blk,
final Token<BlockTokenIdentifier> t, final Token<BlockTokenIdentifier> t,
final Op op, final Op op,
final BlockTokenIdentifier.AccessMode mode) throws IOException { final BlockTokenIdentifier.AccessMode mode) throws IOException {
checkAndWaitForBP(blk);
if (datanode.isBlockTokenEnabled) { if (datanode.isBlockTokenEnabled) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Checking block access token for block '" + blk.getBlockId() LOG.debug("Checking block access token for block '" + blk.getBlockId()

View File

@ -2721,4 +2721,14 @@
</description> </description>
</property> </property>
<property>
<name>dfs.datanode.bp-ready.timeout</name>
<value>20</value>
<description>
The maximum wait time for datanode to be ready before failing the
received request. Setting this to 0 fails requests right away if the
datanode is not yet registered with the namenode. This wait time
reduces initial request failures after datanode restart.
</description>
</property>
</configuration> </configuration>

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.net.*;
import org.apache.hadoop.hdfs.protocol.*; import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.datatransfer.*; import org.apache.hadoop.hdfs.protocol.datatransfer.*;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*; import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
@ -162,17 +163,20 @@ public class TestDataXceiverLazyPersistHint {
return peer; return peer;
} }
private static DataNode getMockDn(NonLocalLazyPersist nonLocalLazyPersist) { private static DataNode getMockDn(NonLocalLazyPersist nonLocalLazyPersist)
throws IOException {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
conf.setBoolean( conf.setBoolean(
DFS_DATANODE_NON_LOCAL_LAZY_PERSIST, DFS_DATANODE_NON_LOCAL_LAZY_PERSIST,
nonLocalLazyPersist == NonLocalLazyPersist.ALLOWED); nonLocalLazyPersist == NonLocalLazyPersist.ALLOWED);
DNConf dnConf = new DNConf(conf); DNConf dnConf = new DNConf(conf);
DatanodeRegistration mockDnReg = mock(DatanodeRegistration.class);
DataNodeMetrics mockMetrics = mock(DataNodeMetrics.class); DataNodeMetrics mockMetrics = mock(DataNodeMetrics.class);
DataNode mockDn = mock(DataNode.class); DataNode mockDn = mock(DataNode.class);
when(mockDn.getDnConf()).thenReturn(dnConf); when(mockDn.getDnConf()).thenReturn(dnConf);
when(mockDn.getConf()).thenReturn(conf); when(mockDn.getConf()).thenReturn(conf);
when(mockDn.getMetrics()).thenReturn(mockMetrics); when(mockDn.getMetrics()).thenReturn(mockMetrics);
when(mockDn.getDNRegistrationForBP("Dummy-pool")).thenReturn(mockDnReg);
return mockDn; return mockDn;
} }
} }

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
@ -146,4 +147,75 @@ public class TestDatanodeRestart {
private static FsDatasetImpl dataset(DataNode dn) { private static FsDatasetImpl dataset(DataNode dn) {
return (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn); return (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
} }
@Test
public void testWaitForRegistrationOnRestart() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_DATANODE_BP_READY_TIMEOUT_KEY, 5);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000);
// This makes the datanode appear registered to the NN, but it won't be
// able to get to the saved dn reg internally.
DataNodeFaultInjector dnFaultInjector = new DataNodeFaultInjector() {
@Override
public void noRegistration() throws IOException {
throw new IOException("no reg found for testing");
}
};
DataNodeFaultInjector oldDnInjector = DataNodeFaultInjector.get();
DataNodeFaultInjector.set(dnFaultInjector);
MiniDFSCluster cluster = null;
long start = 0;
Path file = new Path("/reg");
try {
int numDNs = 1;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
cluster.waitActive();
start = System.currentTimeMillis();
FileSystem fileSys = cluster.getFileSystem();
try {
DFSTestUtil.createFile(fileSys, file, 10240L, (short)1, 0L);
// It is a bug if this does not fail.
throw new IOException("Did not fail!");
} catch (org.apache.hadoop.ipc.RemoteException e) {
long elapsed = System.currentTimeMillis() - start;
// timers have at-least semantics, so it should be at least 5 seconds.
if (elapsed < 5000 || elapsed > 10000) {
throw new IOException(elapsed + " seconds passed.", e);
}
}
DataNodeFaultInjector.set(oldDnInjector);
// this should succeed now.
DFSTestUtil.createFile(fileSys, file, 10240L, (short)1, 0L);
// turn it back to under-construction, so that the client calls
// getReplicaVisibleLength() rpc method against the datanode.
fileSys.append(file);
// back to simulating unregistered node.
DataNodeFaultInjector.set(dnFaultInjector);
byte[] buffer = new byte[8];
start = System.currentTimeMillis();
try {
fileSys.open(file).read(0L, buffer, 0, 1);
throw new IOException("Did not fail!");
} catch (IOException e) {
long elapsed = System.currentTimeMillis() - start;
if (e.getMessage().contains("readBlockLength")) {
throw new IOException("Failed, but with unexpected exception:", e);
}
// timers have at-least semantics, so it should be at least 5 seconds.
if (elapsed < 5000 || elapsed > 10000) {
throw new IOException(elapsed + " seconds passed.", e);
}
}
DataNodeFaultInjector.set(oldDnInjector);
fileSys.open(file).read(0L, buffer, 0, 1);
} finally {
DataNodeFaultInjector.set(oldDnInjector);
if (cluster != null) {
cluster.shutdown();
}
}
}
} }