HDFS-7270. Add congestion signaling capability to DataNode write protocol. Contributed by Haohui Mai.

Haohui Mai 2015-02-05 10:58:58 -08:00
parent 84df660af4
commit 67002a5fc0
8 changed files with 147 additions and 40 deletions

CHANGES.txt

@@ -293,6 +293,9 @@ Release 2.7.0 - UNRELEASED
 
     HDFS-7712. Switch blockStateChangeLog to use slf4j. (wang)
 
+    HDFS-7270. Add congestion signaling capability to DataNode write protocol.
+    (wheat9)
+
   OPTIMIZATIONS
 
     HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

DFSConfigKeys.java

@@ -767,4 +767,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String NNTOP_WINDOWS_MINUTES_KEY =
       "dfs.namenode.top.windows.minutes";
   public static final String[] NNTOP_WINDOWS_MINUTES_DEFAULT = {"1","5","25"};
+  public static final String DFS_PIPELINE_ECN_ENABLED = "dfs.pipeline.ecn";
+  public static final boolean DFS_PIPELINE_ECN_ENABLED_DEFAULT = false;
 }
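
Note: the new flag is off by default. A minimal sketch of flipping it through the Configuration API (class name is illustrative; a real deployment would set dfs.pipeline.ecn in hdfs-site.xml instead):

import org.apache.hadoop.conf.Configuration;

public class EcnConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Same key as DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED above.
    conf.setBoolean("dfs.pipeline.ecn", true);
    System.out.println("ECN enabled: "
        + conf.getBoolean("dfs.pipeline.ecn", false));
  }
}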

DFSOutputStream.java

@@ -891,7 +891,8 @@ public class DFSOutputStream extends FSOutputSummer
         long seqno = ack.getSeqno();
         // processes response status from datanodes.
         for (int i = ack.getNumOfReplies()-1; i >=0 && dfsClient.clientRunning; i--) {
-          final Status reply = ack.getReply(i);
+          final Status reply = PipelineAck.getStatusFromHeader(ack
+              .getReply(i));
           // Restart will not be treated differently unless it is
           // the local node or the only one in the pipeline.
           if (PipelineAck.isRestartOOBStatus(reply) &&

PipelineAck.java

@@ -22,17 +22,21 @@ import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_DEFAULT;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import com.google.protobuf.TextFormat;
+import org.apache.hadoop.hdfs.util.LongBitFormat;
 
 /** Pipeline Acknowledgment **/
 @InterfaceAudience.Private
@@ -46,6 +50,55 @@ public class PipelineAck {
   // place holder for timeout value of each OOB type
   final static long[] OOB_TIMEOUT;
 
+  public enum ECN {
+    DISABLED(0),
+    SUPPORTED(1),
+    SUPPORTED2(2),
+    CONGESTED(3);
+
+    private final int value;
+    private static final ECN[] VALUES = values();
+    static ECN valueOf(int value) {
+      return VALUES[value];
+    }
+
+    ECN(int value) {
+      this.value = value;
+    }
+
+    public int getValue() {
+      return value;
+    }
+  }
+
+  private enum StatusFormat {
+    STATUS(null, 4),
+    RESERVED(STATUS.BITS, 1),
+    ECN_BITS(RESERVED.BITS, 2);
+
+    private final LongBitFormat BITS;
+
+    StatusFormat(LongBitFormat prev, int bits) {
+      BITS = new LongBitFormat(name(), prev, bits, 0);
+    }
+
+    static Status getStatus(int header) {
+      return Status.valueOf((int) STATUS.BITS.retrieve(header));
+    }
+
+    static ECN getECN(int header) {
+      return ECN.valueOf((int) ECN_BITS.BITS.retrieve(header));
+    }
+
+    public static int setStatus(int old, Status status) {
+      return (int) STATUS.BITS.combine(status.getNumber(), old);
+    }
+
+    public static int setECN(int old, ECN ecn) {
+      return (int) ECN_BITS.BITS.combine(ecn.getValue(), old);
+    }
+  }
+
   static {
     OOB_TIMEOUT = new long[NUM_OOB_TYPES];
     HdfsConfiguration conf = new HdfsConfiguration();
@@ -65,7 +118,7 @@ public class PipelineAck {
    * @param seqno sequence number
    * @param replies an array of replies
    */
-  public PipelineAck(long seqno, Status[] replies) {
+  public PipelineAck(long seqno, int[] replies) {
     this(seqno, replies, 0L);
   }
@@ -75,10 +128,15 @@ public class PipelineAck {
    * @param replies an array of replies
    * @param downstreamAckTimeNanos ack RTT in nanoseconds, 0 if no next DN in pipeline
    */
-  public PipelineAck(long seqno, Status[] replies, long downstreamAckTimeNanos) {
+  public PipelineAck(long seqno, int[] replies,
+                     long downstreamAckTimeNanos) {
+    ArrayList<Integer> replyList = Lists.newArrayList();
+    for (int r : replies) {
+      replyList.add(r);
+    }
     proto = PipelineAckProto.newBuilder()
       .setSeqno(seqno)
-      .addAllStatus(Arrays.asList(replies))
+      .addAllReply(replyList)
       .setDownstreamAckTimeNanos(downstreamAckTimeNanos)
       .build();
   }
@@ -96,15 +154,15 @@ public class PipelineAck {
    * @return the number of replies
    */
   public short getNumOfReplies() {
-    return (short)proto.getStatusCount();
+    return (short)proto.getReplyCount();
   }
 
   /**
    * get the ith reply
    * @return the the ith reply
    */
-  public Status getReply(int i) {
-    return proto.getStatus(i);
+  public int getReply(int i) {
+    return proto.getReply(i);
   }
 
   /**
@@ -120,8 +178,8 @@ public class PipelineAck {
    * @return true if all statuses are SUCCESS
    */
   public boolean isSuccess() {
-    for (Status reply : proto.getStatusList()) {
-      if (reply != Status.SUCCESS) {
+    for (int reply : proto.getReplyList()) {
+      if (StatusFormat.getStatus(reply) != Status.SUCCESS) {
         return false;
       }
     }
@@ -138,11 +196,12 @@ public class PipelineAck {
     if (getSeqno() != UNKOWN_SEQNO) {
       return null;
     }
-    for (Status reply : proto.getStatusList()) {
+    for (int reply : proto.getReplyList()) {
       // The following check is valid because protobuf guarantees to
       // preserve the ordering of enum elements.
-      if (reply.getNumber() >= OOB_START && reply.getNumber() <= OOB_END) {
-        return reply;
+      Status s = StatusFormat.getStatus(reply);
+      if (s.getNumber() >= OOB_START && s.getNumber() <= OOB_END) {
+        return s;
       }
     }
     return null;
@@ -184,4 +243,19 @@ public class PipelineAck {
   public String toString() {
     return TextFormat.shortDebugString(proto);
   }
+
+  public static Status getStatusFromHeader(int header) {
+    return StatusFormat.getStatus(header);
+  }
+
+  public static int setStatusForHeader(int old, Status status) {
+    return StatusFormat.setStatus(old, status);
+  }
+
+  public static int combineHeader(ECN ecn, Status status) {
+    int header = 0;
+    header = StatusFormat.setStatus(header, status);
+    header = StatusFormat.setECN(header, ecn);
+    return header;
+  }
 }
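
Note: each reply is now a packed int header rather than a bare Status: the low 4 bits hold the Status, bit 4 is reserved, and bits 5-6 hold the ECN value. A minimal round-trip sketch using only the public helpers added above (class name is illustrative; requires hadoop-hdfs on the classpath):

import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;

public class HeaderRoundTrip {
  public static void main(String[] args) {
    // CONGESTED (3) lands in bits 5-6; SUCCESS (0) fills the low 4 bits.
    int header = PipelineAck.combineHeader(PipelineAck.ECN.CONGESTED,
        Status.SUCCESS);
    System.out.println(Integer.toBinaryString(header));          // 1100000
    System.out.println(PipelineAck.getStatusFromHeader(header)); // SUCCESS
  }
}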

BlockReceiver.java

@@ -987,8 +987,6 @@ class BlockReceiver implements Closeable {
     NON_PIPELINE, LAST_IN_PIPELINE, HAS_DOWNSTREAM_IN_PIPELINE
   }
 
-  private static final Status[] MIRROR_ERROR_STATUS = {Status.SUCCESS,
-      Status.ERROR};
 
   /**
    * Processes responses from downstream datanodes in the pipeline
    * and sends back replies to the originator.
@@ -1092,7 +1090,7 @@ class BlockReceiver implements Closeable {
         LOG.info("Sending an out of band ack of type " + ackStatus);
         try {
           sendAckUpstreamUnprotected(null, PipelineAck.UNKOWN_SEQNO, 0L, 0L,
-              ackStatus);
+              PipelineAck.combineHeader(datanode.getECN(), ackStatus));
         } finally {
           // Let others send ack. Unless there are miltiple OOB send
           // calls, there can be only one waiter, the responder thread.
@@ -1175,7 +1173,8 @@ class BlockReceiver implements Closeable {
             if (oobStatus != null) {
               LOG.info("Relaying an out of band ack of type " + oobStatus);
               sendAckUpstream(ack, PipelineAck.UNKOWN_SEQNO, 0L, 0L,
-                  Status.SUCCESS);
+                  PipelineAck.combineHeader(datanode.getECN(),
+                    Status.SUCCESS));
               continue;
             }
             seqno = ack.getSeqno();
@@ -1249,9 +1248,10 @@ class BlockReceiver implements Closeable {
             finalizeBlock(startTime);
           }
 
+          Status myStatus = pkt != null ? pkt.ackStatus : Status.SUCCESS;
           sendAckUpstream(ack, expected, totalAckTimeNanos,
             (pkt != null ? pkt.offsetInBlock : 0),
-            (pkt != null ? pkt.ackStatus : Status.SUCCESS));
+            PipelineAck.combineHeader(datanode.getECN(), myStatus));
           if (pkt != null) {
             // remove the packet from the ack queue
             removeAckHead();
@@ -1311,11 +1311,11 @@ class BlockReceiver implements Closeable {
    * @param totalAckTimeNanos total ack time including all the downstream
    *          nodes
    * @param offsetInBlock offset in block for the data in packet
-   * @param myStatus the local ack status
+   * @param myHeader the local ack header
    */
   private void sendAckUpstream(PipelineAck ack, long seqno,
       long totalAckTimeNanos, long offsetInBlock,
-      Status myStatus) throws IOException {
+      int myHeader) throws IOException {
     try {
       // Wait for other sender to finish. Unless there is an OOB being sent,
       // the responder won't have to wait.
@@ -1329,7 +1329,7 @@ class BlockReceiver implements Closeable {
       try {
         if (!running) return;
         sendAckUpstreamUnprotected(ack, seqno, totalAckTimeNanos,
-            offsetInBlock, myStatus);
+            offsetInBlock, myHeader);
       } finally {
         synchronized(this) {
           sending = false;
@@ -1349,32 +1349,34 @@ class BlockReceiver implements Closeable {
    * @param totalAckTimeNanos total ack time including all the downstream
    *          nodes
    * @param offsetInBlock offset in block for the data in packet
-   * @param myStatus the local ack status
+   * @param myHeader the local ack header
    */
   private void sendAckUpstreamUnprotected(PipelineAck ack, long seqno,
-      long totalAckTimeNanos, long offsetInBlock, Status myStatus)
+      long totalAckTimeNanos, long offsetInBlock, int myHeader)
       throws IOException {
-    Status[] replies = null;
+    final int[] replies;
     if (ack == null) {
       // A new OOB response is being sent from this node. Regardless of
       // downstream nodes, reply should contain one reply.
-      replies = new Status[1];
-      replies[0] = myStatus;
+      replies = new int[] { myHeader };
     } else if (mirrorError) { // ack read error
-      replies = MIRROR_ERROR_STATUS;
+      int h = PipelineAck.combineHeader(datanode.getECN(), Status.SUCCESS);
+      int h1 = PipelineAck.combineHeader(datanode.getECN(), Status.ERROR);
+      replies = new int[] {h, h1};
     } else {
       short ackLen = type == PacketResponderType.LAST_IN_PIPELINE ? 0 : ack
          .getNumOfReplies();
-      replies = new Status[1 + ackLen];
-      replies[0] = myStatus;
-      for (int i = 0; i < ackLen; i++) {
+      replies = new int[ackLen + 1];
+      replies[0] = myHeader;
+      for (int i = 0; i < ackLen; ++i) {
        replies[i + 1] = ack.getReply(i);
       }
       // If the mirror has reported that it received a corrupt packet,
       // do self-destruct to mark myself bad, instead of making the
       // mirror node bad. The mirror is guaranteed to be good without
       // corrupt data on disk.
-      if (ackLen > 0 && replies[1] == Status.ERROR_CHECKSUM) {
+      if (ackLen > 0 && PipelineAck.getStatusFromHeader(replies[1]) ==
+          Status.ERROR_CHECKSUM) {
         throw new IOException("Shutting down writer and responder "
             + "since the down streams reported the data sent by this "
             + "thread is corrupt");
@@ -1401,6 +1403,7 @@ class BlockReceiver implements Closeable {
       // If a corruption was detected in the received data, terminate after
       // sending ERROR_CHECKSUM back.
+      Status myStatus = PipelineAck.getStatusFromHeader(myHeader);
       if (myStatus == Status.ERROR_CHECKSUM) {
         throw new IOException("Shutting down writer and responder "
             + "due to a checksum error in received data. The error "

DataNode.java

@@ -125,6 +125,7 @@ import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
@@ -336,6 +337,7 @@ public class DataNode extends ReconfigurableBase
   private Configuration conf;
   private final String confVersion;
   private final long maxNumberOfBlocksToLog;
+  private final boolean pipelineSupportECN;
 
   private final List<String> usersWithLocalPathAccess;
   private final boolean connectToDnViaHostname;
@@ -368,6 +370,7 @@ public class DataNode extends ReconfigurableBase
     this.usersWithLocalPathAccess = null;
     this.connectToDnViaHostname = false;
     this.getHdfsBlockLocationsEnabled = false;
+    this.pipelineSupportECN = false;
   }
 
   /**
@@ -395,6 +398,9 @@ public class DataNode extends ReconfigurableBase
     this.isPermissionEnabled = conf.getBoolean(
         DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY,
         DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT);
+    this.pipelineSupportECN = conf.getBoolean(
+        DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED,
+        DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED_DEFAULT);
 
     confVersion = "core-" +
         conf.get("hadoop.common.configuration.version", "UNSPECIFIED") +
@ -469,6 +475,19 @@ public class DataNode extends ReconfigurableBase
return reconfigurable; return reconfigurable;
} }
/**
* The ECN bit for the DataNode. The DataNode should return:
* <ul>
* <li>ECN.DISABLED when ECN is disabled.</li>
* <li>ECN.SUPPORTED when ECN is enabled but the DN still has capacity.</li>
* <li>ECN.CONGESTED when ECN is enabled and the DN is congested.</li>
* </ul>
*/
public PipelineAck.ECN getECN() {
return pipelineSupportECN ? PipelineAck.ECN.SUPPORTED : PipelineAck.ECN
.DISABLED;
}
/** /**
* Contains the StorageLocations for changed data volumes. * Contains the StorageLocations for changed data volumes.
*/ */
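
Note: with this commit the DataNode only ever reports SUPPORTED or DISABLED; nothing returns CONGESTED yet, and no back-off policy is implemented. The diff also adds no public ECN accessor, so a hedged sketch of how an upstream reader might pull the ECN bits back out of a reply header has to mask bits 5-6 directly, assuming the StatusFormat layout above (class and method names are illustrative):

import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;

public class EcnProbe {
  // Assumed layout from StatusFormat: 4 status bits, 1 reserved bit,
  // then 2 ECN bits starting at offset 5.
  private static final int ECN_SHIFT = 5;
  private static final int ECN_MASK = 0x03;

  static boolean anyCongested(PipelineAck ack) {
    for (int i = 0; i < ack.getNumOfReplies(); i++) {
      int ecn = (ack.getReply(i) >>> ECN_SHIFT) & ECN_MASK;
      if (ecn == PipelineAck.ECN.CONGESTED.getValue()) {
        return true;
      }
    }
    return false;
  }
}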

datatransfer.proto

@@ -210,6 +210,7 @@ message PacketHeaderProto {
   optional bool syncBlock = 5 [default = false];
 }
 
+// Status is a 4-bit enum
 enum Status {
   SUCCESS = 0;
   ERROR = 1;
@@ -228,7 +229,7 @@ enum Status {
 
 message PipelineAckProto {
   required sint64 seqno = 1;
-  repeated Status status = 2;
+  repeated uint32 reply = 2;
   optional uint64 downstreamAckTimeNanos = 3 [default = 0];
 }
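
Note: switching field 2 from repeated Status to repeated uint32 keeps the same field number and varint wire encoding, and every Status value fits in the low 4 bits of the new header, so a bare legacy Status value decodes as the same Status with the ECN bits left at 0 (DISABLED). A small self-check under that assumption (run with -ea; class name is illustrative):

import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;

public class WireCompatCheck {
  public static void main(String[] args) {
    for (Status s : Status.values()) {
      int legacyHeader = s.getNumber(); // what an old sender puts on the wire
      assert PipelineAck.getStatusFromHeader(legacyHeader) == s;
      assert PipelineAck.combineHeader(PipelineAck.ECN.DISABLED, s)
          == legacyHeader;
    }
    System.out.println("legacy Status values decode unchanged");
  }
}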

TestDataTransferProtocol.java

@@ -168,7 +168,9 @@ public class TestDataTransferProtocol {
     //ok finally write a block with 0 len
     sendResponse(Status.SUCCESS, "", null, recvOut);
-    new PipelineAck(100, new Status[]{Status.SUCCESS}).write(recvOut);
+    new PipelineAck(100, new int[] {PipelineAck.combineHeader
+      (PipelineAck.ECN.DISABLED, Status.SUCCESS)}).write
+      (recvOut);
     sendRecvData(description, false);
   }
@@ -399,7 +401,8 @@ public class TestDataTransferProtocol {
     hdr.write(sendOut);
 
     sendResponse(Status.SUCCESS, "", null, recvOut);
-    new PipelineAck(100, new Status[]{Status.ERROR}).write(recvOut);
+    new PipelineAck(100, new int[] {PipelineAck.combineHeader
+      (PipelineAck.ECN.DISABLED, Status.ERROR)}).write(recvOut);
     sendRecvData("negative DATA_CHUNK len while writing block " + newBlockId,
         true);
@@ -420,7 +423,8 @@ public class TestDataTransferProtocol {
     sendOut.flush();
     //ok finally write a block with 0 len
     sendResponse(Status.SUCCESS, "", null, recvOut);
-    new PipelineAck(100, new Status[]{Status.SUCCESS}).write(recvOut);
+    new PipelineAck(100, new int[] {PipelineAck.combineHeader
+      (PipelineAck.ECN.DISABLED, Status.SUCCESS)}).write(recvOut);
     sendRecvData("Writing a zero len block blockid " + newBlockId, false);
 
     /* Test OP_READ_BLOCK */