HDFS-6247. Avoid timeouts for replaceBlock() call by sending intermediate responses to Balancer (vinayakumarb)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1617799 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6554994fab
commit
195961a7c1
|
@ -500,6 +500,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
HDFS-6830. BlockInfo.addStorage fails when DN changes the storage for a
|
HDFS-6830. BlockInfo.addStorage fails when DN changes the storage for a
|
||||||
block replica (Arpit Agarwal)
|
block replica (Arpit Agarwal)
|
||||||
|
|
||||||
|
HDFS-6247. Avoid timeouts for replaceBlock() call by sending intermediate
|
||||||
|
responses to Balancer (vinayakumarb)
|
||||||
|
|
||||||
Release 2.5.0 - UNRELEASED
|
Release 2.5.0 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -87,8 +87,6 @@ public class Dispatcher {
|
||||||
|
|
||||||
private static final int MAX_NO_PENDING_MOVE_ITERATIONS = 5;
|
private static final int MAX_NO_PENDING_MOVE_ITERATIONS = 5;
|
||||||
private static final long DELAY_AFTER_ERROR = 10 * 1000L; // 10 seconds
|
private static final long DELAY_AFTER_ERROR = 10 * 1000L; // 10 seconds
|
||||||
private static final int BLOCK_MOVE_READ_TIMEOUT = 20 * 60 * 1000; // 20
|
|
||||||
// minutes
|
|
||||||
|
|
||||||
private final NameNodeConnector nnc;
|
private final NameNodeConnector nnc;
|
||||||
private final SaslDataTransferClient saslClient;
|
private final SaslDataTransferClient saslClient;
|
||||||
|
@ -278,13 +276,6 @@ public class Dispatcher {
|
||||||
sock.connect(
|
sock.connect(
|
||||||
NetUtils.createSocketAddr(target.getDatanodeInfo().getXferAddr()),
|
NetUtils.createSocketAddr(target.getDatanodeInfo().getXferAddr()),
|
||||||
HdfsServerConstants.READ_TIMEOUT);
|
HdfsServerConstants.READ_TIMEOUT);
|
||||||
/*
|
|
||||||
* Unfortunately we don't have a good way to know if the Datanode is
|
|
||||||
* taking a really long time to move a block, OR something has gone
|
|
||||||
* wrong and it's never going to finish. To deal with this scenario, we
|
|
||||||
* set a long timeout (20 minutes) to avoid hanging indefinitely.
|
|
||||||
*/
|
|
||||||
sock.setSoTimeout(BLOCK_MOVE_READ_TIMEOUT);
|
|
||||||
|
|
||||||
sock.setKeepAlive(true);
|
sock.setKeepAlive(true);
|
||||||
|
|
||||||
|
@ -341,8 +332,12 @@ public class Dispatcher {
|
||||||
|
|
||||||
/** Receive a block copy response from the input stream */
|
/** Receive a block copy response from the input stream */
|
||||||
private void receiveResponse(DataInputStream in) throws IOException {
|
private void receiveResponse(DataInputStream in) throws IOException {
|
||||||
BlockOpResponseProto response = BlockOpResponseProto
|
BlockOpResponseProto response =
|
||||||
.parseFrom(vintPrefixed(in));
|
BlockOpResponseProto.parseFrom(vintPrefixed(in));
|
||||||
|
while (response.getStatus() == Status.IN_PROGRESS) {
|
||||||
|
// read intermediate responses
|
||||||
|
response = BlockOpResponseProto.parseFrom(vintPrefixed(in));
|
||||||
|
}
|
||||||
if (response.getStatus() != Status.SUCCESS) {
|
if (response.getStatus() != Status.SUCCESS) {
|
||||||
if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) {
|
if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) {
|
||||||
throw new IOException("block move failed due to access token error");
|
throw new IOException("block move failed due to access token error");
|
||||||
|
|
|
@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
|
||||||
|
@ -123,6 +124,14 @@ class BlockReceiver implements Closeable {
|
||||||
private boolean syncOnClose;
|
private boolean syncOnClose;
|
||||||
private long restartBudget;
|
private long restartBudget;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* for replaceBlock response
|
||||||
|
*/
|
||||||
|
private final long responseInterval;
|
||||||
|
private long lastResponseTime = 0;
|
||||||
|
private boolean isReplaceBlock = false;
|
||||||
|
private DataOutputStream replyOut = null;
|
||||||
|
|
||||||
BlockReceiver(final ExtendedBlock block, final StorageType storageType,
|
BlockReceiver(final ExtendedBlock block, final StorageType storageType,
|
||||||
final DataInputStream in,
|
final DataInputStream in,
|
||||||
final String inAddr, final String myAddr,
|
final String inAddr, final String myAddr,
|
||||||
|
@ -144,6 +153,9 @@ class BlockReceiver implements Closeable {
|
||||||
this.isClient = !this.isDatanode;
|
this.isClient = !this.isDatanode;
|
||||||
this.restartBudget = datanode.getDnConf().restartReplicaExpiry;
|
this.restartBudget = datanode.getDnConf().restartReplicaExpiry;
|
||||||
this.datanodeSlowLogThresholdMs = datanode.getDnConf().datanodeSlowIoWarningThresholdMs;
|
this.datanodeSlowLogThresholdMs = datanode.getDnConf().datanodeSlowIoWarningThresholdMs;
|
||||||
|
// For replaceBlock() calls response should be sent to avoid socketTimeout
|
||||||
|
// at clients. So sending with the interval of 0.5 * socketTimeout
|
||||||
|
this.responseInterval = (long) (datanode.getDnConf().socketTimeout * 0.5);
|
||||||
//for datanode, we have
|
//for datanode, we have
|
||||||
//1: clientName.length() == 0, and
|
//1: clientName.length() == 0, and
|
||||||
//2: stage == null or PIPELINE_SETUP_CREATE
|
//2: stage == null or PIPELINE_SETUP_CREATE
|
||||||
|
@ -651,6 +663,20 @@ class BlockReceiver implements Closeable {
|
||||||
lastPacketInBlock, offsetInBlock, Status.SUCCESS);
|
lastPacketInBlock, offsetInBlock, Status.SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Send in-progress responses for the replaceBlock() calls back to caller to
|
||||||
|
* avoid timeouts due to balancer throttling. HDFS-6247
|
||||||
|
*/
|
||||||
|
if (isReplaceBlock
|
||||||
|
&& (Time.monotonicNow() - lastResponseTime > responseInterval)) {
|
||||||
|
BlockOpResponseProto.Builder response = BlockOpResponseProto.newBuilder()
|
||||||
|
.setStatus(Status.IN_PROGRESS);
|
||||||
|
response.build().writeDelimitedTo(replyOut);
|
||||||
|
replyOut.flush();
|
||||||
|
|
||||||
|
lastResponseTime = Time.monotonicNow();
|
||||||
|
}
|
||||||
|
|
||||||
if (throttler != null) { // throttle I/O
|
if (throttler != null) { // throttle I/O
|
||||||
throttler.throttle(len);
|
throttler.throttle(len);
|
||||||
}
|
}
|
||||||
|
@ -718,7 +744,8 @@ class BlockReceiver implements Closeable {
|
||||||
DataInputStream mirrIn, // input from next datanode
|
DataInputStream mirrIn, // input from next datanode
|
||||||
DataOutputStream replyOut, // output to previous datanode
|
DataOutputStream replyOut, // output to previous datanode
|
||||||
String mirrAddr, DataTransferThrottler throttlerArg,
|
String mirrAddr, DataTransferThrottler throttlerArg,
|
||||||
DatanodeInfo[] downstreams) throws IOException {
|
DatanodeInfo[] downstreams,
|
||||||
|
boolean isReplaceBlock) throws IOException {
|
||||||
|
|
||||||
syncOnClose = datanode.getDnConf().syncOnClose;
|
syncOnClose = datanode.getDnConf().syncOnClose;
|
||||||
boolean responderClosed = false;
|
boolean responderClosed = false;
|
||||||
|
@ -726,6 +753,9 @@ class BlockReceiver implements Closeable {
|
||||||
mirrorAddr = mirrAddr;
|
mirrorAddr = mirrAddr;
|
||||||
throttler = throttlerArg;
|
throttler = throttlerArg;
|
||||||
|
|
||||||
|
this.replyOut = replyOut;
|
||||||
|
this.isReplaceBlock = isReplaceBlock;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (isClient && !isTransfer) {
|
if (isClient && !isTransfer) {
|
||||||
responder = new Daemon(datanode.threadGroup,
|
responder = new Daemon(datanode.threadGroup,
|
||||||
|
|
|
@ -708,7 +708,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
if (blockReceiver != null) {
|
if (blockReceiver != null) {
|
||||||
String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
|
String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
|
||||||
blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
|
blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
|
||||||
mirrorAddr, null, targets);
|
mirrorAddr, null, targets, false);
|
||||||
|
|
||||||
// send close-ack for transfer-RBW/Finalized
|
// send close-ack for transfer-RBW/Finalized
|
||||||
if (isTransfer) {
|
if (isTransfer) {
|
||||||
|
@ -983,7 +983,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
String errMsg = null;
|
String errMsg = null;
|
||||||
BlockReceiver blockReceiver = null;
|
BlockReceiver blockReceiver = null;
|
||||||
DataInputStream proxyReply = null;
|
DataInputStream proxyReply = null;
|
||||||
|
DataOutputStream replyOut = new DataOutputStream(getOutputStream());
|
||||||
try {
|
try {
|
||||||
// get the output stream to the proxy
|
// get the output stream to the proxy
|
||||||
final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname);
|
final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname);
|
||||||
|
@ -1040,8 +1040,8 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
CachingStrategy.newDropBehind());
|
CachingStrategy.newDropBehind());
|
||||||
|
|
||||||
// receive a block
|
// receive a block
|
||||||
blockReceiver.receiveBlock(null, null, null, null,
|
blockReceiver.receiveBlock(null, null, replyOut, null,
|
||||||
dataXceiverServer.balanceThrottler, null);
|
dataXceiverServer.balanceThrottler, null, true);
|
||||||
|
|
||||||
// notify name node
|
// notify name node
|
||||||
datanode.notifyNamenodeReceivedBlock(
|
datanode.notifyNamenodeReceivedBlock(
|
||||||
|
@ -1076,6 +1076,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
IOUtils.closeStream(proxyOut);
|
IOUtils.closeStream(proxyOut);
|
||||||
IOUtils.closeStream(blockReceiver);
|
IOUtils.closeStream(blockReceiver);
|
||||||
IOUtils.closeStream(proxyReply);
|
IOUtils.closeStream(proxyReply);
|
||||||
|
IOUtils.closeStream(replyOut);
|
||||||
}
|
}
|
||||||
|
|
||||||
//update metrics
|
//update metrics
|
||||||
|
|
|
@ -207,6 +207,7 @@ enum Status {
|
||||||
OOB_RESERVED1 = 9; // Reserved
|
OOB_RESERVED1 = 9; // Reserved
|
||||||
OOB_RESERVED2 = 10; // Reserved
|
OOB_RESERVED2 = 10; // Reserved
|
||||||
OOB_RESERVED3 = 11; // Reserved
|
OOB_RESERVED3 = 11; // Reserved
|
||||||
|
IN_PROGRESS = 12;
|
||||||
}
|
}
|
||||||
|
|
||||||
message PipelineAckProto {
|
message PipelineAckProto {
|
||||||
|
|
|
@ -272,8 +272,10 @@ public class TestBlockReplacement {
|
||||||
// receiveResponse
|
// receiveResponse
|
||||||
DataInputStream reply = new DataInputStream(sock.getInputStream());
|
DataInputStream reply = new DataInputStream(sock.getInputStream());
|
||||||
|
|
||||||
BlockOpResponseProto proto =
|
BlockOpResponseProto proto = BlockOpResponseProto.parseDelimitedFrom(reply);
|
||||||
BlockOpResponseProto.parseDelimitedFrom(reply);
|
while (proto.getStatus() == Status.IN_PROGRESS) {
|
||||||
|
proto = BlockOpResponseProto.parseDelimitedFrom(reply);
|
||||||
|
}
|
||||||
return proto.getStatus() == Status.SUCCESS;
|
return proto.getStatus() == Status.SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue