HDFS-9574. Reduce client failures during datanode restart. Contributed by Kihwal Lee.
(cherry picked from commit 38c4c14472)

Conflicts:
    hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

parent 852033ca62
commit d1cbc8442c
@@ -29,6 +29,7 @@ import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -73,10 +74,12 @@ import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
 import org.apache.hadoop.io.ByteBufferPool;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.IdentityHashStore;
+import org.apache.hadoop.util.StopWatch;
 import org.apache.htrace.core.SpanId;
 import org.apache.htrace.core.TraceScope;
 import org.apache.htrace.core.Tracer;
@@ -358,12 +361,18 @@ public class DFSInputStream extends FSInputStream
     int replicaNotFoundCount = locatedblock.getLocations().length;

     final DfsClientConf conf = dfsClient.getConf();
-    for(DatanodeInfo datanode : locatedblock.getLocations()) {
+    final int timeout = conf.getSocketTimeout();
+    LinkedList<DatanodeInfo> nodeList = new LinkedList<DatanodeInfo>(
+        Arrays.asList(locatedblock.getLocations()));
+    LinkedList<DatanodeInfo> retryList = new LinkedList<DatanodeInfo>();
+    boolean isRetry = false;
+    StopWatch sw = new StopWatch();
+    while (nodeList.size() > 0) {
+      DatanodeInfo datanode = nodeList.pop();
       ClientDatanodeProtocol cdp = null;

       try {
         cdp = DFSUtilClient.createClientDatanodeProtocolProxy(datanode,
-            dfsClient.getConfiguration(), conf.getSocketTimeout(),
+            dfsClient.getConfiguration(), timeout,
             conf.isConnectToDnViaHostname(), locatedblock);

         final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
@@ -371,15 +380,19 @@ public class DFSInputStream extends FSInputStream
         if (n >= 0) {
           return n;
         }
-      }
-      catch(IOException ioe) {
-        if (ioe instanceof RemoteException &&
-          (((RemoteException) ioe).unwrapRemoteException() instanceof
-            ReplicaNotFoundException)) {
-          // special case : replica might not be on the DN, treat as 0 length
-          replicaNotFoundCount--;
+      } catch (IOException ioe) {
+        if (ioe instanceof RemoteException) {
+          if (((RemoteException) ioe).unwrapRemoteException() instanceof
+              ReplicaNotFoundException) {
+            // replica is not on the DN. We will treat it as 0 length
+            // if no one actually has a replica.
+            replicaNotFoundCount--;
+          } else if (((RemoteException) ioe).unwrapRemoteException() instanceof
+              RetriableException) {
+            // add to the list to be retried if necessary.
+            retryList.add(datanode);
+          }
         }
-
         DFSClient.LOG.debug("Failed to getReplicaVisibleLength from datanode {}"
             + " for block {}", datanode, locatedblock.getBlock(), ioe);
       } finally {
@@ -387,6 +400,30 @@ public class DFSInputStream extends FSInputStream
           RPC.stopProxy(cdp);
         }
       }
+
+      // Ran out of nodes, but there are retriable nodes.
+      if (nodeList.size() == 0 && retryList.size() > 0) {
+        nodeList.addAll(retryList);
+        retryList.clear();
+        isRetry = true;
+      }
+
+      if (isRetry) {
+        // start the stop watch if not already running.
+        if (!sw.isRunning()) {
+          sw.start();
+        }
+        try {
+          Thread.sleep(500); // delay between retries.
+        } catch (InterruptedException e) {
+          throw new IOException("Interrupted while getting the length.");
+        }
+      }
+
+      // see if we ran out of retry time
+      if (sw.isRunning() && sw.now(TimeUnit.MILLISECONDS) > timeout) {
+        break;
+      }
     }

     // Namenode told us about these locations, but none know about the replica
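The three hunks above are the heart of the client-side change: readBlockLength() now probes each located datanode once, parks any datanode that answers with a RetriableException on a retry list, sleeps 500 ms between passes, and stops once the elapsed retry time exceeds the existing client socket timeout. The following is a minimal standalone sketch of that retry shape for readers who do not want to trace the diff. It is illustrative only, not Hadoop code; the LengthProbe interface, RetriablePeerException class, and readLength() helper are invented names.

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

/** Standalone illustration of the bounded retry loop added to readBlockLength(). */
public class BlockLengthRetrySketch {

  /** Hypothetical stand-in for the ClientDatanodeProtocol length RPC. */
  interface LengthProbe {
    long visibleLength(String datanode) throws IOException;
  }

  /** Stand-in for a RetriableException unwrapped from a RemoteException. */
  static class RetriablePeerException extends IOException { }

  static long readLength(List<String> datanodes, LengthProbe probe,
      long timeoutMillis) throws IOException {
    Deque<String> nodes = new ArrayDeque<>(datanodes);
    Deque<String> retryList = new ArrayDeque<>();
    boolean isRetry = false;
    long retryStart = 0;

    while (!nodes.isEmpty()) {
      String datanode = nodes.pop();
      try {
        long n = probe.visibleLength(datanode);
        if (n >= 0) {
          return n;                 // first definite answer wins
        }
      } catch (RetriablePeerException e) {
        retryList.add(datanode);    // datanode not ready yet; try it again later
      }

      // Ran out of nodes, but some of them asked to be retried.
      if (nodes.isEmpty() && !retryList.isEmpty()) {
        nodes.addAll(retryList);
        retryList.clear();
        isRetry = true;
      }
      if (isRetry) {
        if (retryStart == 0) {
          retryStart = System.currentTimeMillis();  // start the clock once
        }
        try {
          Thread.sleep(500);                        // delay between retries
        } catch (InterruptedException ie) {
          throw new IOException("Interrupted while getting the length.");
        }
      }
      // Stop once the retry budget (the client socket timeout) is spent.
      if (retryStart != 0
          && System.currentTimeMillis() - retryStart > timeoutMillis) {
        break;
      }
    }
    throw new IOException("No datanode reported a usable replica length.");
  }
}

The design point mirrored here is that the retries are bounded by the socket timeout the client already has, so a restarting datanode does not make a read wait longer than a dead connection would.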
@@ -1651,6 +1651,8 @@ Release 2.7.3 - UNRELEASED
     HDFS-7163. WebHdfsFileSystem should retry reads according to the configured
     retry policy. (Eric Payne via kihwal)

+    HDFS-9574. Reduce client failures during datanode restart (kihwal)
+
   OPTIMIZATIONS

   BUG FIXES
@@ -492,6 +492,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY = "dfs.datanode.min.supported.namenode.version";
   public static final String DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_DEFAULT = "2.1.0-beta";
   public static final String DFS_NAMENODE_INODE_ATTRIBUTES_PROVIDER_KEY = "dfs.namenode.inode.attributes.provider.class";
+  public static final String DFS_DATANODE_BP_READY_TIMEOUT_KEY = "dfs.datanode.bp-ready.timeout";
+  public static final long DFS_DATANODE_BP_READY_TIMEOUT_DEFAULT = 20;

   public static final String DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY = "dfs.block.access.token.enable";
   public static final boolean DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT = false;
@@ -52,6 +52,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BP_READY_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BP_READY_TIMEOUT_DEFAULT;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -104,6 +106,8 @@ public class DNConf {

   final long maxLockedMemory;

+  private final long bpReadyTimeout;
+
   // Allow LAZY_PERSIST writes from non-local clients?
   private final boolean allowNonLocalLazyPersist;

@@ -210,6 +214,10 @@ public class DNConf {
     this.allowNonLocalLazyPersist = conf.getBoolean(
         DFS_DATANODE_NON_LOCAL_LAZY_PERSIST,
         DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT);
+
+    this.bpReadyTimeout = conf.getLong(
+        DFS_DATANODE_BP_READY_TIMEOUT_KEY,
+        DFS_DATANODE_BP_READY_TIMEOUT_DEFAULT);
   }

   // We get minimumNameNodeVersion via a method so it can be mocked out in tests.
@@ -295,4 +303,8 @@ public class DNConf {
   public int getTransferSocketSendBufferSize() {
     return transferSocketSendBufferSize;
   }
+
+  public long getBpReadyTimeout() {
+    return bpReadyTimeout;
+  }
 }
@@ -1582,6 +1582,7 @@ public class DataNode extends ReconfigurableBase
   @VisibleForTesting
   public DatanodeRegistration getDNRegistrationForBP(String bpid)
   throws IOException {
+    DataNodeFaultInjector.get().noRegistration();
     BPOfferService bpos = blockPoolManager.get(bpid);
     if(bpos==null || bpos.bpRegistration==null) {
       throw new IOException("cannot find BPOfferService for bpid="+bpid);
@@ -1709,7 +1710,6 @@ public class DataNode extends ReconfigurableBase
       throw new ShortCircuitFdsUnsupportedException(
           fileDescriptorPassingDisabledReason);
     }
-    checkBlockToken(blk, token, BlockTokenIdentifier.AccessMode.READ);
     int blkVersion = CURRENT_BLOCK_FORMAT_VERSION;
     if (maxVersion < blkVersion) {
       throw new ShortCircuitFdsVersionException("Your client is too old " +
@@ -2695,6 +2695,15 @@ public class DataNode extends ReconfigurableBase
   }

   private void checkReadAccess(final ExtendedBlock block) throws IOException {
+    // Make sure this node has registered for the block pool.
+    try {
+      getDNRegistrationForBP(block.getBlockPoolId());
+    } catch (IOException e) {
+      // if it has not registered with the NN, throw an exception back.
+      throw new org.apache.hadoop.ipc.RetriableException(
+          "Datanode not registered. Try again later.");
+    }
+
     if (isBlockTokenEnabled) {
       Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser()
           .getTokenIdentifiers();
@@ -49,4 +49,6 @@ public class DataNodeFaultInjector {
   public boolean dropHeartbeatPacket() {
     return false;
   }
+
+  public void noRegistration() throws IOException { }
 }
@@ -45,6 +45,7 @@ import java.net.SocketTimeoutException;
 import java.nio.channels.ClosedChannelException;
 import java.security.MessageDigest;
 import java.util.Arrays;
+import java.util.concurrent.TimeUnit;

 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
@@ -85,6 +86,7 @@ import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.StopWatch;

 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
@@ -298,6 +300,9 @@ class DataXceiver extends Receiver implements Runnable {
       SlotId slotId, int maxVersion, boolean supportsReceiptVerification)
         throws IOException {
     updateCurrentThreadName("Passing file descriptors for block " + blk);
+    DataOutputStream out = getBufferedOutputStream();
+    checkAccess(out, true, blk, token,
+        Op.REQUEST_SHORT_CIRCUIT_FDS, BlockTokenIdentifier.AccessMode.READ);
     BlockOpResponseProto.Builder bld = BlockOpResponseProto.newBuilder();
     FileInputStream fis[] = null;
     SlotId registeredSlotId = null;
@@ -326,9 +331,6 @@ class DataXceiver extends Receiver implements Runnable {
     } catch (ShortCircuitFdsUnsupportedException e) {
       bld.setStatus(ERROR_UNSUPPORTED);
       bld.setMessage(e.getMessage());
-    } catch (InvalidToken e) {
-      bld.setStatus(ERROR_ACCESS_TOKEN);
-      bld.setMessage(e.getMessage());
     } catch (IOException e) {
       bld.setStatus(ERROR);
       bld.setMessage(e.getMessage());
@@ -516,9 +518,9 @@ class DataXceiver extends Receiver implements Runnable {
       final CachingStrategy cachingStrategy) throws IOException {
     previousOpClientName = clientName;
     long read = 0;
+    updateCurrentThreadName("Sending block " + block);
     OutputStream baseStream = getOutputStream();
-    DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
-        baseStream, smallBufferSize));
+    DataOutputStream out = getBufferedOutputStream();
     checkAccess(out, true, block, blockToken,
         Op.READ_BLOCK, BlockTokenIdentifier.AccessMode.READ);

@@ -534,7 +536,6 @@ class DataXceiver extends Receiver implements Runnable {
         : dnR + " Served block " + block + " to " +
             remoteAddress;

-    updateCurrentThreadName("Sending block " + block);
     try {
       try {
         blockSender = new BlockSender(block, blockOffset, length,
@@ -630,6 +631,10 @@ class DataXceiver extends Receiver implements Runnable {
     allowLazyPersist = allowLazyPersist &&
         (dnConf.getAllowNonLocalLazyPersist() || peer.isLocal());
     long size = 0;
+    // reply to upstream datanode or client
+    final DataOutputStream replyOut = getBufferedOutputStream();
+    checkAccess(replyOut, isClient, block, blockToken,
+        Op.WRITE_BLOCK, BlockTokenIdentifier.AccessMode.WRITE);
     // check single target for transfer-RBW/Finalized
     if (isTransfer && targets.length > 0) {
       throw new IOException(stage + " does not support multiple targets "
@@ -660,11 +665,6 @@ class DataXceiver extends Receiver implements Runnable {
     LOG.info("Receiving " + block + " src: " + remoteAddress + " dest: "
         + localAddress);

-    // reply to upstream datanode or client
-    final DataOutputStream replyOut = getBufferedOutputStream();
-    checkAccess(replyOut, isClient, block, blockToken,
-        Op.WRITE_BLOCK, BlockTokenIdentifier.AccessMode.WRITE);
-
     DataOutputStream mirrorOut = null; // stream to next target
     DataInputStream mirrorIn = null; // reply from next target
     Socket mirrorSock = null; // socket to next target
@@ -863,13 +863,13 @@ class DataXceiver extends Receiver implements Runnable {
       final String clientName,
       final DatanodeInfo[] targets,
       final StorageType[] targetStorageTypes) throws IOException {
-    checkAccess(socketOut, true, blk, blockToken,
-        Op.TRANSFER_BLOCK, BlockTokenIdentifier.AccessMode.COPY);
     previousOpClientName = clientName;
     updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk);

     final DataOutputStream out = new DataOutputStream(
         getOutputStream());
+    checkAccess(out, true, blk, blockToken,
+        Op.TRANSFER_BLOCK, BlockTokenIdentifier.AccessMode.COPY);
     try {
       datanode.transferReplicaForPipelineRecovery(blk, targets,
           targetStorageTypes, clientName);
@@ -923,6 +923,7 @@ class DataXceiver extends Receiver implements Runnable {
   @Override
   public void blockChecksum(final ExtendedBlock block,
       final Token<BlockTokenIdentifier> blockToken) throws IOException {
+    updateCurrentThreadName("Getting checksum for block " + block);
     final DataOutputStream out = new DataOutputStream(
         getOutputStream());
     checkAccess(out, true, block, blockToken,
@@ -933,13 +934,11 @@ class DataXceiver extends Receiver implements Runnable {
     long visibleLength = datanode.data.getReplicaVisibleLength(block);
     boolean partialBlk = requestLength < visibleLength;

-    updateCurrentThreadName("Reading metadata for block " + block);
     final LengthInputStream metadataIn = datanode.data
         .getMetaDataInputStream(block);

     final DataInputStream checksumIn = new DataInputStream(
         new BufferedInputStream(metadataIn, ioFileBufferSize));
-    updateCurrentThreadName("Getting checksum for block " + block);
     try {
       //read metadata file
       final BlockMetadataHeader header = BlockMetadataHeader
@@ -987,20 +986,9 @@ class DataXceiver extends Receiver implements Runnable {
   public void copyBlock(final ExtendedBlock block,
       final Token<BlockTokenIdentifier> blockToken) throws IOException {
     updateCurrentThreadName("Copying block " + block);
-    // Read in the header
-    if (datanode.isBlockTokenEnabled) {
-      try {
-        datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null, block,
-            BlockTokenIdentifier.AccessMode.COPY);
-      } catch (InvalidToken e) {
-        LOG.warn("Invalid access token in request from " + remoteAddress
-            + " for OP_COPY_BLOCK for block " + block + " : "
-            + e.getLocalizedMessage());
-        sendResponse(ERROR_ACCESS_TOKEN, "Invalid access token");
-        return;
-      }
-
-    }
+    DataOutputStream reply = getBufferedOutputStream();
+    checkAccess(reply, true, block, blockToken,
+        Op.COPY_BLOCK, BlockTokenIdentifier.AccessMode.COPY);

     if (datanode.data.getPinning(block)) {
       String msg = "Not able to copy block " + block.getBlockId() + " " +
@@ -1019,7 +1007,6 @@ class DataXceiver extends Receiver implements Runnable {
     }

     BlockSender blockSender = null;
-    DataOutputStream reply = null;
     boolean isOpSuccess = true;

     try {
@@ -1027,10 +1014,7 @@ class DataXceiver extends Receiver implements Runnable {
       blockSender = new BlockSender(block, 0, -1, false, false, true, datanode,
           null, CachingStrategy.newDropBehind());

-      // set up response stream
       OutputStream baseStream = getOutputStream();
-      reply = new DataOutputStream(new BufferedOutputStream(
-          baseStream, smallBufferSize));

       // send status first
       writeSuccessWithChecksumInfo(blockSender, reply);
@@ -1074,20 +1058,9 @@ class DataXceiver extends Receiver implements Runnable {
       final String delHint,
       final DatanodeInfo proxySource) throws IOException {
     updateCurrentThreadName("Replacing block " + block + " from " + delHint);
-    /* read header */
-    if (datanode.isBlockTokenEnabled) {
-      try {
-        datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null, block,
-            BlockTokenIdentifier.AccessMode.REPLACE);
-      } catch (InvalidToken e) {
-        LOG.warn("Invalid access token in request from " + remoteAddress
-            + " for OP_REPLACE_BLOCK for block " + block + " : "
-            + e.getLocalizedMessage());
-        sendResponse(ERROR_ACCESS_TOKEN, "Invalid access token");
-        return;
-      }
-
-    }
+    DataOutputStream replyOut = new DataOutputStream(getOutputStream());
+    checkAccess(replyOut, true, block, blockToken,
+        Op.REPLACE_BLOCK, BlockTokenIdentifier.AccessMode.REPLACE);

     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
       String msg = "Not able to receive block " + block.getBlockId() +
@@ -1104,7 +1077,6 @@ class DataXceiver extends Receiver implements Runnable {
     String errMsg = null;
     BlockReceiver blockReceiver = null;
     DataInputStream proxyReply = null;
-    DataOutputStream replyOut = new DataOutputStream(getOutputStream());
     boolean IoeDuringCopyBlockOperation = false;
     try {
       // Move the block to different storage in the same datanode
@@ -1296,11 +1268,52 @@ class DataXceiver extends Receiver implements Runnable {
       datanode.incrDatanodeNetworkErrors(remoteAddressWithoutPort);
     }

+  /**
+   * Wait until the BP is registered, up to the configured amount of time.
+   * Throws an exception if it times out, which should fail the client request.
+   * @param block the requested block
+   */
+  void checkAndWaitForBP(final ExtendedBlock block)
+      throws IOException {
+    String bpId = block.getBlockPoolId();
+
+    // The registration is only missing in relatively short time window.
+    // Optimistically perform this first.
+    try {
+      datanode.getDNRegistrationForBP(bpId);
+      return;
+    } catch (IOException ioe) {
+      // not registered
+    }
+
+    // retry
+    long bpReadyTimeout = dnConf.getBpReadyTimeout();
+    StopWatch sw = new StopWatch();
+    sw.start();
+    while (sw.now(TimeUnit.SECONDS) <= bpReadyTimeout) {
+      try {
+        datanode.getDNRegistrationForBP(bpId);
+        return;
+      } catch (IOException ioe) {
+        // not registered
+      }
+      // sleep before trying again
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException ie) {
+        throw new IOException("Interrupted while serving request. Aborting.");
+      }
+    }
+    // failed to obtain registration.
+    throw new IOException("Not ready to serve the block pool, " + bpId + ".");
+  }
+
   private void checkAccess(OutputStream out, final boolean reply,
       final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> t,
       final Op op,
       final BlockTokenIdentifier.AccessMode mode) throws IOException {
+    checkAndWaitForBP(blk);
     if (datanode.isBlockTokenEnabled) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Checking block access token for block '" + blk.getBlockId()
@@ -2696,4 +2696,14 @@
 </description>
 </property>

+<property>
+  <name>dfs.datanode.bp-ready.timeout</name>
+  <value>20</value>
+  <description>
+    The maximum wait time for datanode to be ready before failing the
+    received request. Setting this to 0 fails requests right away if the
+    datanode is not yet registered with the namenode. This wait time
+    reduces initial request failures after datanode restart.
+  </description>
+</property>
 </configuration>
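The new dfs.datanode.bp-ready.timeout value is read as seconds (DNConf loads it with conf.getLong and a default of 20). Besides hdfs-site.xml, it can also be set programmatically, mirroring how the test added later in this commit configures it. The snippet below is a hedged sketch only; the 60-second value and the class and method names are illustrative, not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

/** Illustrative only: raising the bp-ready timeout before starting a datanode. */
public class BpReadyTimeoutExample {
  public static Configuration withLongerBpReadyTimeout() {
    Configuration conf = new HdfsConfiguration();
    // The value is in seconds; the shipped default is 20, and 0 means
    // "fail immediately if the datanode has not registered yet".
    conf.setLong(DFSConfigKeys.DFS_DATANODE_BP_READY_TIMEOUT_KEY, 60);
    return conf;
  }
}

As the description above notes, setting the key to 0 keeps the pre-patch behavior of failing a request right away when the datanode has not yet registered with the namenode.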
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.net.*;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.datatransfer.*;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.util.DataChecksum;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.*;

@@ -162,17 +163,20 @@ public class TestDataXceiverLazyPersistHint {
     return peer;
   }

-  private static DataNode getMockDn(NonLocalLazyPersist nonLocalLazyPersist) {
+  private static DataNode getMockDn(NonLocalLazyPersist nonLocalLazyPersist)
+      throws IOException {
     Configuration conf = new HdfsConfiguration();
     conf.setBoolean(
         DFS_DATANODE_NON_LOCAL_LAZY_PERSIST,
         nonLocalLazyPersist == NonLocalLazyPersist.ALLOWED);
     DNConf dnConf = new DNConf(conf);
+    DatanodeRegistration mockDnReg = mock(DatanodeRegistration.class);
     DataNodeMetrics mockMetrics = mock(DataNodeMetrics.class);
     DataNode mockDn = mock(DataNode.class);
     when(mockDn.getDnConf()).thenReturn(dnConf);
     when(mockDn.getConf()).thenReturn(conf);
     when(mockDn.getMetrics()).thenReturn(mockMetrics);
+    when(mockDn.getDNRegistrationForBP("Dummy-pool")).thenReturn(mockDnReg);
     return mockDn;
   }
 }
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
@@ -146,4 +147,75 @@ public class TestDatanodeRestart {
   private static FsDatasetImpl dataset(DataNode dn) {
     return (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
   }
+
+  @Test
+  public void testWaitForRegistrationOnRestart() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_BP_READY_TIMEOUT_KEY, 5);
+    conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000);
+
+    // This makes the datanode appear registered to the NN, but it won't be
+    // able to get to the saved dn reg internally.
+    DataNodeFaultInjector dnFaultInjector = new DataNodeFaultInjector() {
+      @Override
+      public void noRegistration() throws IOException {
+        throw new IOException("no reg found for testing");
+      }
+    };
+    DataNodeFaultInjector oldDnInjector = DataNodeFaultInjector.get();
+    DataNodeFaultInjector.set(dnFaultInjector);
+    MiniDFSCluster cluster = null;
+    long start = 0;
+    Path file = new Path("/reg");
+    try {
+      int numDNs = 1;
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+      cluster.waitActive();
+
+      start = System.currentTimeMillis();
+      FileSystem fileSys = cluster.getFileSystem();
+      try {
+        DFSTestUtil.createFile(fileSys, file, 10240L, (short)1, 0L);
+        // It is a bug if this does not fail.
+        throw new IOException("Did not fail!");
+      } catch (org.apache.hadoop.ipc.RemoteException e) {
+        long elapsed = System.currentTimeMillis() - start;
+        // timers have at-least semantics, so it should be at least 5 seconds.
+        if (elapsed < 5000 || elapsed > 10000) {
+          throw new IOException(elapsed + " seconds passed.", e);
+        }
+      }
+      DataNodeFaultInjector.set(oldDnInjector);
+      // this should succeed now.
+      DFSTestUtil.createFile(fileSys, file, 10240L, (short)1, 0L);
+
+      // turn it back to under-construction, so that the client calls
+      // getReplicaVisibleLength() rpc method against the datanode.
+      fileSys.append(file);
+      // back to simulating unregistered node.
+      DataNodeFaultInjector.set(dnFaultInjector);
+      byte[] buffer = new byte[8];
+      start = System.currentTimeMillis();
+      try {
+        fileSys.open(file).read(0L, buffer, 0, 1);
+        throw new IOException("Did not fail!");
+      } catch (IOException e) {
+        long elapsed = System.currentTimeMillis() - start;
+        if (e.getMessage().contains("readBlockLength")) {
+          throw new IOException("Failed, but with unexpected exception:", e);
+        }
+        // timers have at-least semantics, so it should be at least 5 seconds.
+        if (elapsed < 5000 || elapsed > 10000) {
+          throw new IOException(elapsed + " seconds passed.", e);
+        }
+      }
+      DataNodeFaultInjector.set(oldDnInjector);
+      fileSys.open(file).read(0L, buffer, 0, 1);
+    } finally {
+      DataNodeFaultInjector.set(oldDnInjector);
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }