From 195961a7c1da86421761162836766b1de07930fd Mon Sep 17 00:00:00 2001 From: Vinayakumar B Date: Wed, 13 Aug 2014 18:43:05 +0000 Subject: [PATCH] HDFS-6247. Avoid timeouts for replaceBlock() call by sending intermediate responses to Balancer (vinayakumarb) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1617799 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 ++ .../hdfs/server/balancer/Dispatcher.java | 17 ++++------ .../hdfs/server/datanode/BlockReceiver.java | 32 ++++++++++++++++++- .../hdfs/server/datanode/DataXceiver.java | 9 +++--- .../src/main/proto/datatransfer.proto | 1 + .../server/datanode/TestBlockReplacement.java | 6 ++-- 6 files changed, 50 insertions(+), 18 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 685cd6211d6..9253b8b88a6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -500,6 +500,9 @@ Release 2.6.0 - UNRELEASED HDFS-6830. BlockInfo.addStorage fails when DN changes the storage for a block replica (Arpit Agarwal) + HDFS-6247. Avoid timeouts for replaceBlock() call by sending intermediate + responses to Balancer (vinayakumarb) + Release 2.5.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index 4a6f96be7e5..b7dceb6f48d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -87,8 +87,6 @@ public class Dispatcher { private static final int MAX_NO_PENDING_MOVE_ITERATIONS = 5; private static final long DELAY_AFTER_ERROR = 10 * 1000L; // 10 seconds - private static final int BLOCK_MOVE_READ_TIMEOUT = 20 * 60 * 1000; // 20 - // minutes private final NameNodeConnector nnc; private final SaslDataTransferClient saslClient; @@ -278,13 +276,6 @@ public class Dispatcher { sock.connect( NetUtils.createSocketAddr(target.getDatanodeInfo().getXferAddr()), HdfsServerConstants.READ_TIMEOUT); - /* - * Unfortunately we don't have a good way to know if the Datanode is - * taking a really long time to move a block, OR something has gone - * wrong and it's never going to finish. To deal with this scenario, we - * set a long timeout (20 minutes) to avoid hanging indefinitely. - */ - sock.setSoTimeout(BLOCK_MOVE_READ_TIMEOUT); sock.setKeepAlive(true); @@ -341,8 +332,12 @@ public class Dispatcher { /** Receive a block copy response from the input stream */ private void receiveResponse(DataInputStream in) throws IOException { - BlockOpResponseProto response = BlockOpResponseProto - .parseFrom(vintPrefixed(in)); + BlockOpResponseProto response = + BlockOpResponseProto.parseFrom(vintPrefixed(in)); + while (response.getStatus() == Status.IN_PROGRESS) { + // read intermediate responses + response = BlockOpResponseProto.parseFrom(vintPrefixed(in)); + } if (response.getStatus() != Status.SUCCESS) { if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) { throw new IOException("block move failed due to access token error"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index fb7ecd69e9a..9dcd006ed51 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver; import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; @@ -123,6 +124,14 @@ class BlockReceiver implements Closeable { private boolean syncOnClose; private long restartBudget; + /** + * for replaceBlock response + */ + private final long responseInterval; + private long lastResponseTime = 0; + private boolean isReplaceBlock = false; + private DataOutputStream replyOut = null; + BlockReceiver(final ExtendedBlock block, final StorageType storageType, final DataInputStream in, final String inAddr, final String myAddr, @@ -144,6 +153,9 @@ class BlockReceiver implements Closeable { this.isClient = !this.isDatanode; this.restartBudget = datanode.getDnConf().restartReplicaExpiry; this.datanodeSlowLogThresholdMs = datanode.getDnConf().datanodeSlowIoWarningThresholdMs; + // For replaceBlock() calls response should be sent to avoid socketTimeout + // at clients. So sending with the interval of 0.5 * socketTimeout + this.responseInterval = (long) (datanode.getDnConf().socketTimeout * 0.5); //for datanode, we have //1: clientName.length() == 0, and //2: stage == null or PIPELINE_SETUP_CREATE @@ -651,6 +663,20 @@ class BlockReceiver implements Closeable { lastPacketInBlock, offsetInBlock, Status.SUCCESS); } + /* + * Send in-progress responses for the replaceBlock() calls back to caller to + * avoid timeouts due to balancer throttling. HDFS-6247 + */ + if (isReplaceBlock + && (Time.monotonicNow() - lastResponseTime > responseInterval)) { + BlockOpResponseProto.Builder response = BlockOpResponseProto.newBuilder() + .setStatus(Status.IN_PROGRESS); + response.build().writeDelimitedTo(replyOut); + replyOut.flush(); + + lastResponseTime = Time.monotonicNow(); + } + if (throttler != null) { // throttle I/O throttler.throttle(len); } @@ -718,7 +744,8 @@ class BlockReceiver implements Closeable { DataInputStream mirrIn, // input from next datanode DataOutputStream replyOut, // output to previous datanode String mirrAddr, DataTransferThrottler throttlerArg, - DatanodeInfo[] downstreams) throws IOException { + DatanodeInfo[] downstreams, + boolean isReplaceBlock) throws IOException { syncOnClose = datanode.getDnConf().syncOnClose; boolean responderClosed = false; @@ -726,6 +753,9 @@ class BlockReceiver implements Closeable { mirrorAddr = mirrAddr; throttler = throttlerArg; + this.replyOut = replyOut; + this.isReplaceBlock = isReplaceBlock; + try { if (isClient && !isTransfer) { responder = new Daemon(datanode.threadGroup, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index 5ef6cc7ee22..01fe036f1d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -708,7 +708,7 @@ class DataXceiver extends Receiver implements Runnable { if (blockReceiver != null) { String mirrorAddr = (mirrorSock == null) ? null : mirrorNode; blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut, - mirrorAddr, null, targets); + mirrorAddr, null, targets, false); // send close-ack for transfer-RBW/Finalized if (isTransfer) { @@ -983,7 +983,7 @@ class DataXceiver extends Receiver implements Runnable { String errMsg = null; BlockReceiver blockReceiver = null; DataInputStream proxyReply = null; - + DataOutputStream replyOut = new DataOutputStream(getOutputStream()); try { // get the output stream to the proxy final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname); @@ -1040,8 +1040,8 @@ class DataXceiver extends Receiver implements Runnable { CachingStrategy.newDropBehind()); // receive a block - blockReceiver.receiveBlock(null, null, null, null, - dataXceiverServer.balanceThrottler, null); + blockReceiver.receiveBlock(null, null, replyOut, null, + dataXceiverServer.balanceThrottler, null, true); // notify name node datanode.notifyNamenodeReceivedBlock( @@ -1076,6 +1076,7 @@ class DataXceiver extends Receiver implements Runnable { IOUtils.closeStream(proxyOut); IOUtils.closeStream(blockReceiver); IOUtils.closeStream(proxyReply); + IOUtils.closeStream(replyOut); } //update metrics diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto index 9b4ba339d23..6283b569dda 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto @@ -207,6 +207,7 @@ enum Status { OOB_RESERVED1 = 9; // Reserved OOB_RESERVED2 = 10; // Reserved OOB_RESERVED3 = 11; // Reserved + IN_PROGRESS = 12; } message PipelineAckProto { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java index 478b6d1546b..e0d79648a88 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java @@ -272,8 +272,10 @@ public class TestBlockReplacement { // receiveResponse DataInputStream reply = new DataInputStream(sock.getInputStream()); - BlockOpResponseProto proto = - BlockOpResponseProto.parseDelimitedFrom(reply); + BlockOpResponseProto proto = BlockOpResponseProto.parseDelimitedFrom(reply); + while (proto.getStatus() == Status.IN_PROGRESS) { + proto = BlockOpResponseProto.parseDelimitedFrom(reply); + } return proto.getStatus() == Status.SUCCESS; }