From 67002a5fc07e2f91c3967801ca532988130660a4 Mon Sep 17 00:00:00 2001
From: Haohui Mai
Date: Thu, 5 Feb 2015 10:58:58 -0800
Subject: [PATCH] HDFS-7270. Add congestion signaling capability to DataNode
 write protocol. Contributed by Haohui Mai.
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt   |  3 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java |  2 +
 .../apache/hadoop/hdfs/DFSOutputStream.java   |  3 +-
 .../protocol/datatransfer/PipelineAck.java    | 98 ++++++++++++++++---
 .../hdfs/server/datanode/BlockReceiver.java   | 49 +++++-----
 .../hadoop/hdfs/server/datanode/DataNode.java | 19 ++++
 .../src/main/proto/datatransfer.proto         |  3 +-
 .../hadoop/hdfs/TestDataTransferProtocol.java | 10 +-
 8 files changed, 147 insertions(+), 40 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 34d80182b13..038fbddd455 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -293,6 +293,9 @@ Release 2.7.0 - UNRELEASED
 
     HDFS-7712. Switch blockStateChangeLog to use slf4j. (wang)
 
+    HDFS-7270. Add congestion signaling capability to DataNode write protocol.
+    (wheat9)
+
   OPTIMIZATIONS
 
     HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 666ec04e6ca..36d1a77156b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -767,4 +767,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String NNTOP_WINDOWS_MINUTES_KEY =
       "dfs.namenode.top.windows.minutes";
   public static final String[] NNTOP_WINDOWS_MINUTES_DEFAULT = {"1","5","25"};
+  public static final String DFS_PIPELINE_ECN_ENABLED = "dfs.pipeline.ecn";
+  public static final boolean DFS_PIPELINE_ECN_ENABLED_DEFAULT = false;
 }

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 58995db8422..601560cb638 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -891,7 +891,8 @@ public class DFSOutputStream extends FSOutputSummer
             long seqno = ack.getSeqno();
             // processes response status from datanodes.
             for (int i = ack.getNumOfReplies()-1; i >=0  && dfsClient.clientRunning; i--) {
-              final Status reply = ack.getReply(i);
+              final Status reply = PipelineAck.getStatusFromHeader(ack
+                  .getReply(i));
               // Restart will not be treated differently unless it is
               // the local node or the only one in the pipeline.
               if (PipelineAck.isRestartOOBStatus(reply) &&
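
Note: the DFSOutputStream hunk above is the client half of the protocol change.
getReply(i) now returns a packed integer header rather than a Status enum, so
the status must be unwrapped with PipelineAck.getStatusFromHeader. The sketch
below is not part of the patch; it only illustrates the decode path. The
firstBadNode helper and its InputStream argument are hypothetical names
introduced for this example.

    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;

    // Hypothetical helper: read one pipeline ack from `in` and return the
    // index of the first datanode that did not report SUCCESS, or -1.
    static int firstBadNode(InputStream in) throws IOException {
      PipelineAck ack = new PipelineAck();
      ack.readFields(in);                    // parses the PipelineAckProto
      for (int i = 0; i < ack.getNumOfReplies(); i++) {
        int header = ack.getReply(i);        // packed status + reserved + ECN bits
        if (PipelineAck.getStatusFromHeader(header) != Status.SUCCESS) {
          return i;                          // replies run from this node downstream
        }
      }
      return -1;
    }
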
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
index 6d4065386bf..35e5bb84dc7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
@@ -22,17 +22,21 @@ import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_DEFAULT;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
+
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import com.google.protobuf.TextFormat;
+import org.apache.hadoop.hdfs.util.LongBitFormat;
 
 /** Pipeline Acknowledgment **/
 @InterfaceAudience.Private
@@ -46,6 +50,55 @@ public class PipelineAck {
   // place holder for timeout value of each OOB type
   final static long[] OOB_TIMEOUT;
 
+  public enum ECN {
+    DISABLED(0),
+    SUPPORTED(1),
+    SUPPORTED2(2),
+    CONGESTED(3);
+
+    private final int value;
+    private static final ECN[] VALUES = values();
+    static ECN valueOf(int value) {
+      return VALUES[value];
+    }
+
+    ECN(int value) {
+      this.value = value;
+    }
+
+    public int getValue() {
+      return value;
+    }
+  }
+
+  private enum StatusFormat {
+    STATUS(null, 4),
+    RESERVED(STATUS.BITS, 1),
+    ECN_BITS(RESERVED.BITS, 2);
+
+    private final LongBitFormat BITS;
+
+    StatusFormat(LongBitFormat prev, int bits) {
+      BITS = new LongBitFormat(name(), prev, bits, 0);
+    }
+
+    static Status getStatus(int header) {
+      return Status.valueOf((int) STATUS.BITS.retrieve(header));
+    }
+
+    static ECN getECN(int header) {
+      return ECN.valueOf((int) ECN_BITS.BITS.retrieve(header));
+    }
+
+    public static int setStatus(int old, Status status) {
+      return (int) STATUS.BITS.combine(status.getNumber(), old);
+    }
+
+    public static int setECN(int old, ECN ecn) {
+      return (int) ECN_BITS.BITS.combine(ecn.getValue(), old);
+    }
+  }
+
   static {
     OOB_TIMEOUT = new long[NUM_OOB_TYPES];
     HdfsConfiguration conf = new HdfsConfiguration();
@@ -65,7 +118,7 @@ public class PipelineAck {
    * @param seqno sequence number
    * @param replies an array of replies
    */
-  public PipelineAck(long seqno, Status[] replies) {
+  public PipelineAck(long seqno, int[] replies) {
     this(seqno, replies, 0L);
   }
 
@@ -75,10 +128,15 @@ public class PipelineAck {
    * @param replies an array of replies
    * @param downstreamAckTimeNanos ack RTT in nanoseconds, 0 if no next DN in pipeline
    */
-  public PipelineAck(long seqno, Status[] replies, long downstreamAckTimeNanos) {
+  public PipelineAck(long seqno, int[] replies,
+                     long downstreamAckTimeNanos) {
+    ArrayList<Integer> replyList = Lists.newArrayList();
+    for (int r : replies) {
+      replyList.add(r);
+    }
     proto = PipelineAckProto.newBuilder()
       .setSeqno(seqno)
-      .addAllStatus(Arrays.asList(replies))
+      .addAllReply(replyList)
       .setDownstreamAckTimeNanos(downstreamAckTimeNanos)
       .build();
   }
@@ -96,15 +154,15 @@ public class PipelineAck {
    * @return the number of replies
    */
   public short getNumOfReplies() {
-    return (short)proto.getStatusCount();
+    return (short)proto.getReplyCount();
   }
 
   /**
    * get the ith reply
    * @return the the ith reply
    */
-  public Status getReply(int i) {
-    return proto.getStatus(i);
+  public int getReply(int i) {
+    return proto.getReply(i);
   }
 
   /**
@@ -120,8 +178,8 @@ public class PipelineAck {
    * @return true if all statuses are SUCCESS
    */
   public boolean isSuccess() {
-    for (Status reply : proto.getStatusList()) {
-      if (reply != Status.SUCCESS) {
+    for (int reply : proto.getReplyList()) {
+      if (StatusFormat.getStatus(reply) != Status.SUCCESS) {
         return false;
       }
     }
@@ -138,11 +196,12 @@ public class PipelineAck {
     if (getSeqno() != UNKOWN_SEQNO) {
       return null;
     }
-    for (Status reply : proto.getStatusList()) {
+    for (int reply : proto.getReplyList()) {
       // The following check is valid because protobuf guarantees to
       // preserve the ordering of enum elements.
-      if (reply.getNumber() >= OOB_START && reply.getNumber() <= OOB_END) {
-        return reply;
+      Status s = StatusFormat.getStatus(reply);
+      if (s.getNumber() >= OOB_START && s.getNumber() <= OOB_END) {
+        return s;
       }
     }
     return null;
@@ -184,4 +243,19 @@ public class PipelineAck {
   public String toString() {
     return TextFormat.shortDebugString(proto);
   }
+
+  public static Status getStatusFromHeader(int header) {
+    return StatusFormat.getStatus(header);
+  }
+
+  public static int setStatusForHeader(int old, Status status) {
+    return StatusFormat.setStatus(old, status);
+  }
+
+  public static int combineHeader(ECN ecn, Status status) {
+    int header = 0;
+    header = StatusFormat.setStatus(header, status);
+    header = StatusFormat.setECN(header, ecn);
+    return header;
+  }
 }
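
Note: LongBitFormat allocates fields from the least-significant bit upward, so
the layout implied by StatusFormat is: bits 0-3 carry the Status value, bit 4
is reserved, and bits 5-6 carry the ECN state. A minimal round-trip, with the
manual masks shown only to illustrate that assumed layout (they are not part
of the patch):

    import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;

    // Pack a local status together with the node's ECN state.
    int header = PipelineAck.combineHeader(PipelineAck.ECN.CONGESTED,
        Status.SUCCESS);

    // Unpack through the public API...
    Status status = PipelineAck.getStatusFromHeader(header);   // SUCCESS

    // ...or manually, assuming the bit layout described above.
    int statusBits = header & 0xF;          // 0 == Status.SUCCESS
    int ecnBits = (header >>> 5) & 0x3;     // 3 == ECN.CONGESTED
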
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 6e37c2367e9..cfd2442732e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -986,9 +986,7 @@ class BlockReceiver implements Closeable {
   private static enum PacketResponderType {
     NON_PIPELINE, LAST_IN_PIPELINE, HAS_DOWNSTREAM_IN_PIPELINE
   }
-  
-  private static final Status[] MIRROR_ERROR_STATUS = {Status.SUCCESS, Status.ERROR};
-  
+
   /**
    * Processes responses from downstream datanodes in the pipeline
    * and sends back replies to the originator.
@@ -1092,7 +1090,7 @@ class BlockReceiver implements Closeable {
       LOG.info("Sending an out of band ack of type " + ackStatus);
       try {
         sendAckUpstreamUnprotected(null, PipelineAck.UNKOWN_SEQNO, 0L, 0L,
-            ackStatus);
+            PipelineAck.combineHeader(datanode.getECN(), ackStatus));
       } finally {
         // Let others send ack. Unless there are miltiple OOB send
         // calls, there can be only one waiter, the responder thread.
@@ -1175,7 +1173,8 @@ class BlockReceiver implements Closeable {
             if (oobStatus != null) {
               LOG.info("Relaying an out of band ack of type " + oobStatus);
               sendAckUpstream(ack, PipelineAck.UNKOWN_SEQNO, 0L, 0L,
-                  Status.SUCCESS);
+                  PipelineAck.combineHeader(datanode.getECN(),
+                    Status.SUCCESS));
               continue;
             }
             seqno = ack.getSeqno();
@@ -1249,9 +1248,10 @@ class BlockReceiver implements Closeable {
             finalizeBlock(startTime);
           }
 
+          Status myStatus = pkt != null ? pkt.ackStatus : Status.SUCCESS;
           sendAckUpstream(ack, expected, totalAckTimeNanos,
-            (pkt != null ? pkt.offsetInBlock : 0), 
-            (pkt != null ? pkt.ackStatus : Status.SUCCESS));
+              (pkt != null ? pkt.offsetInBlock : 0),
+              PipelineAck.combineHeader(datanode.getECN(), myStatus));
           if (pkt != null) {
             // remove the packet from the ack queue
             removeAckHead();
@@ -1311,11 +1311,11 @@ class BlockReceiver implements Closeable {
      * @param totalAckTimeNanos total ack time including all the downstream
      *          nodes
      * @param offsetInBlock offset in block for the data in packet
-     * @param myStatus the local ack status
+     * @param myHeader the local ack header
      */
     private void sendAckUpstream(PipelineAck ack, long seqno,
         long totalAckTimeNanos, long offsetInBlock,
-        Status myStatus) throws IOException {
+        int myHeader) throws IOException {
       try {
         // Wait for other sender to finish. Unless there is an OOB being sent,
         // the responder won't have to wait.
@@ -1329,7 +1329,7 @@ class BlockReceiver implements Closeable {
         try {
           if (!running) return;
           sendAckUpstreamUnprotected(ack, seqno, totalAckTimeNanos,
-              offsetInBlock, myStatus);
+              offsetInBlock, myHeader);
         } finally {
           synchronized(this) {
             sending = false;
@@ -1349,32 +1349,34 @@ class BlockReceiver implements Closeable {
      * @param totalAckTimeNanos total ack time including all the downstream
      *          nodes
      * @param offsetInBlock offset in block for the data in packet
-     * @param myStatus the local ack status
+     * @param myHeader the local ack header
      */
     private void sendAckUpstreamUnprotected(PipelineAck ack, long seqno,
-        long totalAckTimeNanos, long offsetInBlock, Status myStatus)
+        long totalAckTimeNanos, long offsetInBlock, int myHeader)
         throws IOException {
-      Status[] replies = null;
+      final int[] replies;
       if (ack == null) {
         // A new OOB response is being sent from this node. Regardless of
        // downstream nodes, reply should contain one reply.
-        replies = new Status[1];
-        replies[0] = myStatus;
+        replies = new int[] { myHeader };
       } else if (mirrorError) { // ack read error
-        replies = MIRROR_ERROR_STATUS;
+        int h = PipelineAck.combineHeader(datanode.getECN(), Status.SUCCESS);
+        int h1 = PipelineAck.combineHeader(datanode.getECN(), Status.ERROR);
+        replies = new int[] {h, h1};
       } else {
         short ackLen = type == PacketResponderType.LAST_IN_PIPELINE ? 0 : ack
             .getNumOfReplies();
-        replies = new Status[1 + ackLen];
-        replies[0] = myStatus;
-        for (int i = 0; i < ackLen; i++) {
+        replies = new int[ackLen + 1];
+        replies[0] = myHeader;
+        for (int i = 0; i < ackLen; ++i) {
          replies[i + 1] = ack.getReply(i);
         }
         // If the mirror has reported that it received a corrupt packet,
-        // do self-destruct to mark myself bad, instead of making the 
+        // do self-destruct to mark myself bad, instead of making the
         // mirror node bad. The mirror is guaranteed to be good without
         // corrupt data on disk.
-        if (ackLen > 0 && replies[1] == Status.ERROR_CHECKSUM) {
+        if (ackLen > 0 && PipelineAck.getStatusFromHeader(replies[1]) ==
+            Status.ERROR_CHECKSUM) {
           throw new IOException("Shutting down writer and responder "
               + "since the down streams reported the data sent by this "
               + "thread is corrupt");
@@ -1400,7 +1402,8 @@ class BlockReceiver implements Closeable {
       }
 
       // If a corruption was detected in the received data, terminate after
-      // sending ERROR_CHECKSUM back. 
+      // sending ERROR_CHECKSUM back.
+      Status myStatus = PipelineAck.getStatusFromHeader(myHeader);
       if (myStatus == Status.ERROR_CHECKSUM) {
         throw new IOException("Shutting down writer and responder "
             + "due to a checksum error in received data. The error "
@@ -1420,7 +1423,7 @@ class BlockReceiver implements Closeable {
       }
     }
   }
-  
+
   /**
    * This information is cached by the Datanode in the ackQueue.
    */
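
Note: the responder changes above all follow one invariant: each datanode
prepends its own packed header and forwards the downstream headers untouched,
so replies[0] always describes the local node and replies[i + 1] the i-th node
downstream. A condensed sketch of that aggregation follows; `datanode`,
`downstreamAck`, and `upstreamOut` are stand-ins for the fields used inside
sendAckUpstreamUnprotected, not code from the patch.

    // Sketch only: mirrors the non-OOB, non-mirrorError branch above.
    int myHeader = PipelineAck.combineHeader(datanode.getECN(), Status.SUCCESS);
    int[] replies = new int[downstreamAck.getNumOfReplies() + 1];
    replies[0] = myHeader;                        // local node comes first
    for (int i = 0; i < downstreamAck.getNumOfReplies(); i++) {
      replies[i + 1] = downstreamAck.getReply(i); // downstream headers pass through
    }
    new PipelineAck(downstreamAck.getSeqno(), replies).write(upstreamOut);
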
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index dc6734c0e38..b872bbb4729 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -125,6 +125,7 @@ import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
@@ -336,6 +337,7 @@ public class DataNode extends ReconfigurableBase
   private Configuration conf;
   private final String confVersion;
   private final long maxNumberOfBlocksToLog;
+  private final boolean pipelineSupportECN;
 
   private final List<String> usersWithLocalPathAccess;
   private final boolean connectToDnViaHostname;
@@ -368,6 +370,7 @@ public class DataNode extends ReconfigurableBase
     this.usersWithLocalPathAccess = null;
     this.connectToDnViaHostname = false;
     this.getHdfsBlockLocationsEnabled = false;
+    this.pipelineSupportECN = false;
   }
 
   /**
@@ -395,6 +398,9 @@ public class DataNode extends ReconfigurableBase
     this.isPermissionEnabled = conf.getBoolean(
         DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY,
         DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT);
+    this.pipelineSupportECN = conf.getBoolean(
+        DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED,
+        DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED_DEFAULT);
 
     confVersion = "core-" +
         conf.get("hadoop.common.configuration.version", "UNSPECIFIED") +
@@ -469,6 +475,19 @@ public class DataNode extends ReconfigurableBase
     return reconfigurable;
   }
 
+  /**
+   * The ECN bit for the DataNode. The DataNode should return:
+   * <ul>
+   *   <li>ECN.DISABLED when ECN is disabled.</li>
+   *   <li>ECN.SUPPORTED when ECN is enabled but the DN still has capacity.</li>
+   *   <li>ECN.CONGESTED when ECN is enabled and the DN is congested.</li>
+   * </ul>
+   */
+  public PipelineAck.ECN getECN() {
+    return pipelineSupportECN ? PipelineAck.ECN.SUPPORTED : PipelineAck.ECN
+        .DISABLED;
+  }
+
   /**
    * Contains the StorageLocations for changed data volumes.
    */
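
Note: in this patch getECN() only distinguishes DISABLED from SUPPORTED;
actually reporting CONGESTED under load is left to follow-up work. Enabling
the feature is a one-line configuration change. A hedged example, with the key
name taken from the DFSConfigKeys change above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    Configuration conf = new HdfsConfiguration();
    // "dfs.pipeline.ecn"; defaults to false, i.e. every ack header
    // carries ECN.DISABLED.
    conf.setBoolean(DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED, true);
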
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
index 4bd7bda1ad4..95126882d66 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
@@ -210,6 +210,7 @@ message PacketHeaderProto {
   optional bool syncBlock = 5 [default = false];
 }
 
+// Status is a 4-bit enum
 enum Status {
   SUCCESS = 0;
   ERROR = 1;
@@ -228,7 +229,7 @@ enum Status {
 
 message PipelineAckProto {
   required sint64 seqno = 1;
-  repeated Status status = 2;
+  repeated uint32 reply = 2;
   optional uint64 downstreamAckTimeNanos = 3 [default = 0];
 }

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
index 6405b5a0e45..fd4f1a52cc7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
@@ -168,7 +168,9 @@ public class TestDataTransferProtocol {
 
     //ok finally write a block with 0 len
     sendResponse(Status.SUCCESS, "", null, recvOut);
-    new PipelineAck(100, new Status[]{Status.SUCCESS}).write(recvOut);
+    new PipelineAck(100, new int[] {PipelineAck.combineHeader
+      (PipelineAck.ECN.DISABLED, Status.SUCCESS)}).write
+      (recvOut);
     sendRecvData(description, false);
   }
 
@@ -399,7 +401,8 @@ public class TestDataTransferProtocol {
     hdr.write(sendOut);
 
     sendResponse(Status.SUCCESS, "", null, recvOut);
-    new PipelineAck(100, new Status[]{Status.ERROR}).write(recvOut);
+    new PipelineAck(100, new int[] {PipelineAck.combineHeader
+      (PipelineAck.ECN.DISABLED, Status.ERROR)}).write(recvOut);
     sendRecvData("negative DATA_CHUNK len while writing block " + newBlockId,
         true);
 
@@ -420,7 +423,8 @@ public class TestDataTransferProtocol {
     sendOut.flush();
     //ok finally write a block with 0 len
     sendResponse(Status.SUCCESS, "", null, recvOut);
-    new PipelineAck(100, new Status[]{Status.SUCCESS}).write(recvOut);
+    new PipelineAck(100, new int[] {PipelineAck.combineHeader
+      (PipelineAck.ECN.DISABLED, Status.SUCCESS)}).write(recvOut);
     sendRecvData("Writing a zero len block blockid " + newBlockId, false);
 
     /* Test OP_READ_BLOCK */
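
Note: switching the proto field from repeated Status to repeated uint32 keeps
the same field number and the same varint wire type, so an old ack whose reply
is a bare Status value should decode as a header whose ECN bits are zero,
i.e. ECN.DISABLED (all Status values fit in the 4-bit status field). The
sketch below round-trips a single-node ack through the vint-prefixed wire
format, in the style of the tests above; it is an illustration, not part of
the patch.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;

    import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    int header = PipelineAck.combineHeader(PipelineAck.ECN.SUPPORTED,
        Status.SUCCESS);
    new PipelineAck(100, new int[] { header }).write(out);

    // Read it back and verify that seqno and status survive the round trip.
    PipelineAck decoded = new PipelineAck();
    decoded.readFields(new ByteArrayInputStream(out.toByteArray()));
    assert decoded.getSeqno() == 100;
    assert PipelineAck.getStatusFromHeader(decoded.getReply(0)) == Status.SUCCESS;
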