HDFS-6847. Avoid timeouts for replaceBlock() call by sending intermediate responses to Balancer (Contributed by Vinayakumar B.)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1617784 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinayakumar B 2014-08-13 18:06:33 +00:00
parent 03d8ff496a
commit 471b1368e2
6 changed files with 50 additions and 18 deletions

View File

@ -500,6 +500,9 @@ Release 2.6.0 - UNRELEASED
HDFS-6830. BlockInfo.addStorage fails when DN changes the storage for a
block replica (Arpit Agarwal)
HDFS-6847. Avoid timeouts for replaceBlock() call by sending intermediate
responses to Balancer (vinayakumarb)
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -87,8 +87,6 @@ public class Dispatcher {
private static final int MAX_NO_PENDING_MOVE_ITERATIONS = 5;
private static final long DELAY_AFTER_ERROR = 10 * 1000L; // 10 seconds
private static final int BLOCK_MOVE_READ_TIMEOUT = 20 * 60 * 1000; // 20
// minutes
private final NameNodeConnector nnc;
private final SaslDataTransferClient saslClient;
@ -278,13 +276,6 @@ public class Dispatcher {
sock.connect(
NetUtils.createSocketAddr(target.getDatanodeInfo().getXferAddr()),
HdfsServerConstants.READ_TIMEOUT);
/*
* Unfortunately we don't have a good way to know if the Datanode is
* taking a really long time to move a block, OR something has gone
* wrong and it's never going to finish. To deal with this scenario, we
* set a long timeout (20 minutes) to avoid hanging indefinitely.
*/
sock.setSoTimeout(BLOCK_MOVE_READ_TIMEOUT);
sock.setKeepAlive(true);
@ -341,8 +332,12 @@ public class Dispatcher {
/** Receive a block copy response from the input stream */
private void receiveResponse(DataInputStream in) throws IOException {
BlockOpResponseProto response = BlockOpResponseProto
.parseFrom(vintPrefixed(in));
BlockOpResponseProto response =
BlockOpResponseProto.parseFrom(vintPrefixed(in));
while (response.getStatus() == Status.IN_PROGRESS) {
// read intermediate responses
response = BlockOpResponseProto.parseFrom(vintPrefixed(in));
}
if (response.getStatus() != Status.SUCCESS) {
if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) {
throw new IOException("block move failed due to access token error");

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
@ -123,6 +124,14 @@ class BlockReceiver implements Closeable {
private boolean syncOnClose;
private long restartBudget;
/**
* for replaceBlock response
*/
private final long responseInterval;
private long lastResponseTime = 0;
private boolean isReplaceBlock = false;
private DataOutputStream replyOut = null;
BlockReceiver(final ExtendedBlock block, final StorageType storageType,
final DataInputStream in,
final String inAddr, final String myAddr,
@ -144,6 +153,9 @@ class BlockReceiver implements Closeable {
this.isClient = !this.isDatanode;
this.restartBudget = datanode.getDnConf().restartReplicaExpiry;
this.datanodeSlowLogThresholdMs = datanode.getDnConf().datanodeSlowIoWarningThresholdMs;
// For replaceBlock() calls response should be sent to avoid socketTimeout
// at clients. So sending with the interval of 0.5 * socketTimeout
this.responseInterval = (long) (datanode.getDnConf().socketTimeout * 0.5);
//for datanode, we have
//1: clientName.length() == 0, and
//2: stage == null or PIPELINE_SETUP_CREATE
@ -651,6 +663,20 @@ class BlockReceiver implements Closeable {
lastPacketInBlock, offsetInBlock, Status.SUCCESS);
}
/*
* Send in-progress responses for the replaceBlock() calls back to caller to
* avoid timeouts due to balancer throttling. HDFS-6247
*/
if (isReplaceBlock
&& (Time.monotonicNow() - lastResponseTime > responseInterval)) {
BlockOpResponseProto.Builder response = BlockOpResponseProto.newBuilder()
.setStatus(Status.IN_PROGRESS);
response.build().writeDelimitedTo(replyOut);
replyOut.flush();
lastResponseTime = Time.monotonicNow();
}
if (throttler != null) { // throttle I/O
throttler.throttle(len);
}
@ -718,7 +744,8 @@ class BlockReceiver implements Closeable {
DataInputStream mirrIn, // input from next datanode
DataOutputStream replyOut, // output to previous datanode
String mirrAddr, DataTransferThrottler throttlerArg,
DatanodeInfo[] downstreams) throws IOException {
DatanodeInfo[] downstreams,
boolean isReplaceBlock) throws IOException {
syncOnClose = datanode.getDnConf().syncOnClose;
boolean responderClosed = false;
@ -726,6 +753,9 @@ class BlockReceiver implements Closeable {
mirrorAddr = mirrAddr;
throttler = throttlerArg;
this.replyOut = replyOut;
this.isReplaceBlock = isReplaceBlock;
try {
if (isClient && !isTransfer) {
responder = new Daemon(datanode.threadGroup,

View File

@ -708,7 +708,7 @@ class DataXceiver extends Receiver implements Runnable {
if (blockReceiver != null) {
String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
mirrorAddr, null, targets);
mirrorAddr, null, targets, false);
// send close-ack for transfer-RBW/Finalized
if (isTransfer) {
@ -983,7 +983,7 @@ class DataXceiver extends Receiver implements Runnable {
String errMsg = null;
BlockReceiver blockReceiver = null;
DataInputStream proxyReply = null;
DataOutputStream replyOut = new DataOutputStream(getOutputStream());
try {
// get the output stream to the proxy
final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname);
@ -1040,8 +1040,8 @@ class DataXceiver extends Receiver implements Runnable {
CachingStrategy.newDropBehind());
// receive a block
blockReceiver.receiveBlock(null, null, null, null,
dataXceiverServer.balanceThrottler, null);
blockReceiver.receiveBlock(null, null, replyOut, null,
dataXceiverServer.balanceThrottler, null, true);
// notify name node
datanode.notifyNamenodeReceivedBlock(
@ -1076,6 +1076,7 @@ class DataXceiver extends Receiver implements Runnable {
IOUtils.closeStream(proxyOut);
IOUtils.closeStream(blockReceiver);
IOUtils.closeStream(proxyReply);
IOUtils.closeStream(replyOut);
}
//update metrics

View File

@ -207,6 +207,7 @@ enum Status {
OOB_RESERVED1 = 9; // Reserved
OOB_RESERVED2 = 10; // Reserved
OOB_RESERVED3 = 11; // Reserved
IN_PROGRESS = 12;
}
message PipelineAckProto {

View File

@ -272,8 +272,10 @@ public class TestBlockReplacement {
// receiveResponse
DataInputStream reply = new DataInputStream(sock.getInputStream());
BlockOpResponseProto proto =
BlockOpResponseProto.parseDelimitedFrom(reply);
BlockOpResponseProto proto = BlockOpResponseProto.parseDelimitedFrom(reply);
while (proto.getStatus() == Status.IN_PROGRESS) {
proto = BlockOpResponseProto.parseDelimitedFrom(reply);
}
return proto.getStatus() == Status.SUCCESS;
}