HDFS-7748. Separate ECN flags from the Status in the DataTransferPipelineAck. Contributed by Anu Engineer and Haohui Mai.

This commit is contained in:
Haohui Mai 2015-03-30 11:59:21 -07:00
parent 24d879026d
commit dd5b2dac5a
6 changed files with 58 additions and 14 deletions

View File

@ -1009,6 +1009,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7963. Fix expected tracing spans in TestTracing along with HDFS-7054.
(Masatake Iwasaki via kihwal)
HDFS-7748. Separate ECN flags from the Status in the DataTransferPipelineAck.
(Anu Engineer and Haohui Mai via wheat9)
BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
HDFS-7720. Quota by Storage Type API, tools and ClientNameNode

View File

@ -817,7 +817,7 @@ class DataStreamer extends Daemon {
// processes response status from datanodes.
for (int i = ack.getNumOfReplies()-1; i >=0 && dfsClient.clientRunning; i--) {
final Status reply = PipelineAck.getStatusFromHeader(ack
.getReply(i));
.getHeaderFlag(i));
// Restart will not be treated differently unless it is
// the local node or the only one in the pipeline.
if (PipelineAck.isRestartOOBStatus(reply) &&

View File

@ -130,13 +130,16 @@ public class PipelineAck {
*/
public PipelineAck(long seqno, int[] replies,
long downstreamAckTimeNanos) {
ArrayList<Integer> replyList = Lists.newArrayList();
ArrayList<Status> statusList = Lists.newArrayList();
ArrayList<Integer> flagList = Lists.newArrayList();
for (int r : replies) {
replyList.add(r);
statusList.add(StatusFormat.getStatus(r));
flagList.add(r);
}
proto = PipelineAckProto.newBuilder()
.setSeqno(seqno)
.addAllReply(replyList)
.addAllReply(statusList)
.addAllFlag(flagList)
.setDownstreamAckTimeNanos(downstreamAckTimeNanos)
.build();
}
@ -158,11 +161,18 @@ public class PipelineAck {
}
/**
* get the ith reply
* @return the the ith reply
* get the header flag of ith reply
*/
public int getReply(int i) {
return proto.getReply(i);
public int getHeaderFlag(int i) {
if (proto.getFlagCount() > 0) {
return proto.getFlag(i);
} else {
return combineHeader(ECN.DISABLED, proto.getReply(i));
}
}
public int getFlag(int i) {
return proto.getFlag(i);
}
/**
@ -178,8 +188,8 @@ public class PipelineAck {
* @return true if all statuses are SUCCESS
*/
public boolean isSuccess() {
for (int reply : proto.getReplyList()) {
if (StatusFormat.getStatus(reply) != Status.SUCCESS) {
for (Status s : proto.getReplyList()) {
if (s != Status.SUCCESS) {
return false;
}
}
@ -196,10 +206,9 @@ public class PipelineAck {
if (getSeqno() != UNKOWN_SEQNO) {
return null;
}
for (int reply : proto.getReplyList()) {
for (Status s : proto.getReplyList()) {
// The following check is valid because protobuf guarantees to
// preserve the ordering of enum elements.
Status s = StatusFormat.getStatus(reply);
if (s.getNumber() >= OOB_START && s.getNumber() <= OOB_END) {
return s;
}

View File

@ -1372,7 +1372,7 @@ class BlockReceiver implements Closeable {
replies = new int[ackLen + 1];
replies[0] = myHeader;
for (int i = 0; i < ackLen; ++i) {
replies[i + 1] = ack.getReply(i);
replies[i + 1] = ack.getHeaderFlag(i);
}
// If the mirror has reported that it received a corrupt packet,
// do self-destruct to mark myself bad, instead of making the

View File

@ -243,8 +243,9 @@ enum ShortCircuitFdResponse {
message PipelineAckProto {
required sint64 seqno = 1;
repeated uint32 reply = 2;
repeated Status reply = 2;
optional uint64 downstreamAckTimeNanos = 3 [default = 0];
repeated uint32 flag = 4 [packed=true];
}
/**

View File

@ -33,6 +33,7 @@ import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.Random;
import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -52,6 +53,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto.Builder;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
@ -524,6 +526,35 @@ public class TestDataTransferProtocol {
assertFalse(hdr.sanityCheck(100));
}
@Test
public void TestPipeLineAckCompatibility() throws IOException {
DataTransferProtos.PipelineAckProto proto = DataTransferProtos
.PipelineAckProto.newBuilder()
.setSeqno(0)
.addReply(Status.CHECKSUM_OK)
.build();
DataTransferProtos.PipelineAckProto newProto = DataTransferProtos
.PipelineAckProto.newBuilder().mergeFrom(proto)
.addFlag(PipelineAck.combineHeader(PipelineAck.ECN.SUPPORTED,
Status.CHECKSUM_OK))
.build();
ByteOutputStream oldAckBytes = new ByteOutputStream();
proto.writeDelimitedTo(oldAckBytes);
PipelineAck oldAck = new PipelineAck();
oldAck.readFields(new ByteArrayInputStream(oldAckBytes.getBytes()));
assertEquals(PipelineAck.combineHeader(PipelineAck.ECN.DISABLED, Status
.CHECKSUM_OK), oldAck.getHeaderFlag(0));
PipelineAck newAck = new PipelineAck();
ByteOutputStream newAckBytes = new ByteOutputStream();
newProto.writeDelimitedTo(newAckBytes);
newAck.readFields(new ByteArrayInputStream(newAckBytes.getBytes()));
assertEquals(PipelineAck.combineHeader(PipelineAck.ECN.SUPPORTED, Status
.CHECKSUM_OK), newAck.getHeaderFlag(0));
}
void writeBlock(String poolId, long blockId, DataChecksum checksum) throws IOException {
writeBlock(new ExtendedBlock(poolId, blockId),
BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, checksum);