Reverted

Merged revision(s) 1617784 from hadoop/common/trunk:
HDFS-6847. Avoid timeouts for replaceBlock() call by sending intermediate responses to Balancer (Contributed by Vinayakumar B.)
........

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1617794 13f79535-47bb-0310-9956-ffa450edef68

This commit is contained in:
parent 471b1368e2
commit 6554994fab
@@ -500,9 +500,6 @@ Release 2.6.0 - UNRELEASED
     HDFS-6830. BlockInfo.addStorage fails when DN changes the storage for a
     block replica (Arpit Agarwal)
 
-    HDFS-6847. Avoid timeouts for replaceBlock() call by sending intermediate
-    responses to Balancer (vinayakumarb)
-
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -87,6 +87,8 @@ public class Dispatcher {
 
   private static final int MAX_NO_PENDING_MOVE_ITERATIONS = 5;
   private static final long DELAY_AFTER_ERROR = 10 * 1000L; // 10 seconds
+  private static final int BLOCK_MOVE_READ_TIMEOUT = 20 * 60 * 1000; // 20
+                                                                     // minutes
 
   private final NameNodeConnector nnc;
   private final SaslDataTransferClient saslClient;
@@ -276,6 +278,13 @@ public class Dispatcher {
         sock.connect(
             NetUtils.createSocketAddr(target.getDatanodeInfo().getXferAddr()),
             HdfsServerConstants.READ_TIMEOUT);
+        /*
+         * Unfortunately we don't have a good way to know if the Datanode is
+         * taking a really long time to move a block, OR something has gone
+         * wrong and it's never going to finish. To deal with this scenario, we
+         * set a long timeout (20 minutes) to avoid hanging indefinitely.
+         */
+        sock.setSoTimeout(BLOCK_MOVE_READ_TIMEOUT);
 
         sock.setKeepAlive(true);
 
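The lines restored above fall back to a long fixed read timeout because the Balancer cannot distinguish a slow block move from a hung one. A minimal, self-contained sketch of that pattern with plain java.net sockets; the host, port, and timeout values are illustrative, not Hadoop defaults:

    import java.io.DataInputStream;
    import java.net.InetSocketAddress;
    import java.net.Socket;

    public class LongOpClient {
      // Illustrative values: a short connect timeout, a long read timeout.
      private static final int CONNECT_TIMEOUT_MS = 60 * 1000;             // 1 minute
      private static final int BLOCK_MOVE_READ_TIMEOUT_MS = 20 * 60 * 1000; // 20 minutes

      public static void main(String[] args) throws Exception {
        try (Socket sock = new Socket()) {
          // Fail fast if the peer is unreachable...
          sock.connect(new InetSocketAddress("datanode.example", 1004), CONNECT_TIMEOUT_MS);
          // ...but tolerate a long-running block move before read() gives up.
          sock.setSoTimeout(BLOCK_MOVE_READ_TIMEOUT_MS);
          sock.setKeepAlive(true);
          DataInputStream in = new DataInputStream(sock.getInputStream());
          // Blocks until data arrives or the 20-minute read timeout expires.
          int finalStatus = in.readInt();
          System.out.println("final status: " + finalStatus);
        }
      }
    }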
@@ -332,12 +341,8 @@
 
     /** Receive a block copy response from the input stream */
     private void receiveResponse(DataInputStream in) throws IOException {
-      BlockOpResponseProto response =
-          BlockOpResponseProto.parseFrom(vintPrefixed(in));
-      while (response.getStatus() == Status.IN_PROGRESS) {
-        // read intermediate responses
-        response = BlockOpResponseProto.parseFrom(vintPrefixed(in));
-      }
+      BlockOpResponseProto response = BlockOpResponseProto
+          .parseFrom(vintPrefixed(in));
       if (response.getStatus() != Status.SUCCESS) {
         if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) {
           throw new IOException("block move failed due to access token error");
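The removed while-loop is the receiving half of the intermediate-response scheme: keep reading until a terminal status arrives. A hedged sketch of the same idea over a plain DataInputStream, using made-up integer codes instead of Hadoop's BlockOpResponseProto and vintPrefixed() framing:

    import java.io.DataInputStream;
    import java.io.IOException;

    public class MoveResponseReader {
      // Illustrative status codes, not the DataTransferProtos.Status wire values.
      static final int SUCCESS = 0;
      static final int ERROR_ACCESS_TOKEN = 5;
      static final int IN_PROGRESS = 12;

      /** Reads status words until a terminal one arrives, then validates it. */
      static void receiveResponse(DataInputStream in) throws IOException {
        int status = in.readInt();
        while (status == IN_PROGRESS) {
          // Intermediate responses only exist to keep the connection alive;
          // discard them and keep waiting for the real result.
          status = in.readInt();
        }
        if (status != SUCCESS) {
          if (status == ERROR_ACCESS_TOKEN) {
            throw new IOException("block move failed due to access token error");
          }
          throw new IOException("block move failed, status=" + status);
        }
      }
    }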
@@ -45,7 +45,6 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
@@ -124,14 +123,6 @@ class BlockReceiver implements Closeable {
   private boolean syncOnClose;
   private long restartBudget;
 
-  /**
-   * for replaceBlock response
-   */
-  private final long responseInterval;
-  private long lastResponseTime = 0;
-  private boolean isReplaceBlock = false;
-  private DataOutputStream replyOut = null;
-
   BlockReceiver(final ExtendedBlock block, final StorageType storageType,
       final DataInputStream in,
       final String inAddr, final String myAddr,
@@ -153,9 +144,6 @@ class BlockReceiver implements Closeable {
     this.isClient = !this.isDatanode;
     this.restartBudget = datanode.getDnConf().restartReplicaExpiry;
     this.datanodeSlowLogThresholdMs = datanode.getDnConf().datanodeSlowIoWarningThresholdMs;
-    // For replaceBlock() calls response should be sent to avoid socketTimeout
-    // at clients. So sending with the interval of 0.5 * socketTimeout
-    this.responseInterval = (long) (datanode.getDnConf().socketTimeout * 0.5);
     //for datanode, we have
     //1: clientName.length() == 0, and
     //2: stage == null or PIPELINE_SETUP_CREATE
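The removed constructor lines size the reporting interval at half of the configured socket timeout, so at least one intermediate response reaches the peer well before its read timeout can fire. A tiny worked example of that sizing rule; the 60-second timeout below is an assumed value, not an HDFS default being asserted here:

    public class KeepaliveInterval {
      public static void main(String[] args) {
        // Assumed example value: a 60s read timeout configured on the peer.
        int socketTimeoutMs = 60_000;
        // Report at half the timeout so one message is always in flight
        // before the peer would give up, even if a send is slightly delayed.
        long responseIntervalMs = (long) (socketTimeoutMs * 0.5);
        System.out.println("send a progress response every " + responseIntervalMs + " ms");
      }
    }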
@@ -663,20 +651,6 @@ class BlockReceiver implements Closeable {
           lastPacketInBlock, offsetInBlock, Status.SUCCESS);
     }
 
-    /*
-     * Send in-progress responses for the replaceBlock() calls back to caller to
-     * avoid timeouts due to balancer throttling. HDFS-6247
-     */
-    if (isReplaceBlock
-        && (Time.monotonicNow() - lastResponseTime > responseInterval)) {
-      BlockOpResponseProto.Builder response = BlockOpResponseProto.newBuilder()
-          .setStatus(Status.IN_PROGRESS);
-      response.build().writeDelimitedTo(replyOut);
-      replyOut.flush();
-
-      lastResponseTime = Time.monotonicNow();
-    }
-
     if (throttler != null) { // throttle I/O
       throttler.throttle(len);
     }
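The block deleted above is the sending half: while a replaced block is still streaming in, the DataNode periodically writes an IN_PROGRESS response and flushes it. A self-contained sketch of that pattern using System.nanoTime() as the monotonic clock and a bare DataOutputStream in place of BlockOpResponseProto; all names and wire values are illustrative:

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.concurrent.TimeUnit;

    public class ProgressReporter {
      // Illustrative wire value for a non-terminal "still working" status.
      private static final int IN_PROGRESS = 12;

      private final DataOutputStream replyOut;
      private final long responseIntervalNanos;
      private long lastResponseNanos;

      ProgressReporter(DataOutputStream replyOut, long responseIntervalMs) {
        this.replyOut = replyOut;
        this.responseIntervalNanos = TimeUnit.MILLISECONDS.toNanos(responseIntervalMs);
        this.lastResponseNanos = System.nanoTime();
      }

      /** Call once per packet/chunk processed; sends a keepalive if one is due. */
      void maybeReportProgress() throws IOException {
        long now = System.nanoTime();
        if (now - lastResponseNanos > responseIntervalNanos) {
          replyOut.writeInt(IN_PROGRESS); // intermediate, non-terminal response
          replyOut.flush();               // must reach the peer before its read timeout
          lastResponseNanos = now;
        }
      }
    }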
@@ -744,8 +718,7 @@
       DataInputStream mirrIn, // input from next datanode
       DataOutputStream replyOut, // output to previous datanode
       String mirrAddr, DataTransferThrottler throttlerArg,
-      DatanodeInfo[] downstreams,
-      boolean isReplaceBlock) throws IOException {
+      DatanodeInfo[] downstreams) throws IOException {
 
     syncOnClose = datanode.getDnConf().syncOnClose;
     boolean responderClosed = false;
@@ -753,9 +726,6 @@
     mirrorAddr = mirrAddr;
     throttler = throttlerArg;
 
-    this.replyOut = replyOut;
-    this.isReplaceBlock = isReplaceBlock;
-
     try {
       if (isClient && !isTransfer) {
         responder = new Daemon(datanode.threadGroup,
@@ -708,7 +708,7 @@ class DataXceiver extends Receiver implements Runnable {
       if (blockReceiver != null) {
         String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
         blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
-            mirrorAddr, null, targets, false);
+            mirrorAddr, null, targets);
 
         // send close-ack for transfer-RBW/Finalized
         if (isTransfer) {
@@ -983,7 +983,7 @@ class DataXceiver extends Receiver implements Runnable {
     String errMsg = null;
     BlockReceiver blockReceiver = null;
     DataInputStream proxyReply = null;
-    DataOutputStream replyOut = new DataOutputStream(getOutputStream());
+
     try {
       // get the output stream to the proxy
       final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname);
@@ -1040,8 +1040,8 @@
           CachingStrategy.newDropBehind());
 
       // receive a block
-      blockReceiver.receiveBlock(null, null, replyOut, null,
-          dataXceiverServer.balanceThrottler, null, true);
+      blockReceiver.receiveBlock(null, null, null, null,
+          dataXceiverServer.balanceThrottler, null);
 
       // notify name node
       datanode.notifyNamenodeReceivedBlock(
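The call-site change above shows how the reverted patch plumbed the reply stream and an isReplaceBlock flag into receiveBlock(). A hedged sketch of one alternative way to express the same idea without widening the signature for every caller: pass an optional progress callback that only the replaceBlock path supplies. The names here are illustrative, not Hadoop's API:

    import java.io.IOException;

    public class BlockIntake {
      /** Optional hook invoked periodically while a block is being received. */
      interface ProgressListener {
        void onProgress() throws IOException;
      }

      /** Callers that need no keepalives (e.g. ordinary pipeline writes) pass null. */
      void receiveBlock(ProgressListener listener) throws IOException {
        for (int packet = 0; packet < 10; packet++) { // stand-in for the packet loop
          // ... receive and persist one packet ...
          if (listener != null) {
            listener.onProgress(); // the replaceBlock path would send IN_PROGRESS here
          }
        }
      }
    }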
@@ -1076,7 +1076,6 @@ class DataXceiver extends Receiver implements Runnable {
       IOUtils.closeStream(proxyOut);
       IOUtils.closeStream(blockReceiver);
       IOUtils.closeStream(proxyReply);
-      IOUtils.closeStream(replyOut);
     }
 
     //update metrics
@@ -207,7 +207,6 @@ enum Status {
   OOB_RESERVED1 = 9;          // Reserved
   OOB_RESERVED2 = 10;         // Reserved
   OOB_RESERVED3 = 11;         // Reserved
-  IN_PROGRESS = 12;
 }
 
 message PipelineAckProto {
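The proto hunk drops the IN_PROGRESS = 12 value from the Status enum. What matters for compatibility is that such a value is non-terminal and takes a previously unused number. A hedged Java sketch of that distinction; the codes are illustrative and not the DataTransferProtos numbering:

    public enum MoveStatus {
      SUCCESS(0),
      ERROR(1),
      ERROR_ACCESS_TOKEN(5),
      IN_PROGRESS(12); // non-terminal: "keep waiting", never a final outcome

      private final int wireCode;

      MoveStatus(int wireCode) {
        this.wireCode = wireCode;
      }

      public int code() {
        return wireCode;
      }

      /** Terminal statuses end the exchange; IN_PROGRESS only resets the timeout clock. */
      public boolean isTerminal() {
        return this != IN_PROGRESS;
      }
    }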
@@ -272,10 +272,8 @@ public class TestBlockReplacement {
     // receiveResponse
     DataInputStream reply = new DataInputStream(sock.getInputStream());
 
-    BlockOpResponseProto proto = BlockOpResponseProto.parseDelimitedFrom(reply);
-    while (proto.getStatus() == Status.IN_PROGRESS) {
-      proto = BlockOpResponseProto.parseDelimitedFrom(reply);
-    }
+    BlockOpResponseProto proto =
+        BlockOpResponseProto.parseDelimitedFrom(reply);
     return proto.getStatus() == Status.SUCCESS;
   }
 