HDFS-2521. Remove custom checksum headers from data transfer protocol. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1195828 13f79535-47bb-0310-9956-ffa450edef68
Author: Todd Lipcon
Date: 2011-11-01 05:16:53 +00:00
Parent: 63cb9360f1
Commit: f6a4c651a0

16 changed files with 1713 additions and 150 deletions

CHANGES.txt

@@ -761,6 +761,9 @@ Release 0.23.0 - Unreleased
     HDFS-2512. Add textual error message to data transfer protocol responses
     (todd)
 
+    HDFS-2521. Remove custom checksum headers from data transfer protocol
+    (todd)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

DFSOutputStream.java

@@ -1033,9 +1033,7 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
           // send the request
           new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
               nodes, null, recoveryFlag? stage.getRecoveryStage() : stage,
-              nodes.length, block.getNumBytes(), bytesSent, newGS);
-          checksum.writeHeader(out);
-          out.flush();
+              nodes.length, block.getNumBytes(), bytesSent, newGS, checksum);
 
           // receive ack for connect
           BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(

RemoteBlockReader.java

@@ -33,10 +33,13 @@
 import org.apache.hadoop.fs.FSInputChecker;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;

@@ -408,11 +411,14 @@ public static RemoteBlockReader newBlockReader( Socket sock, String file,
     BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
         vintPrefixed(in));
     checkSuccess(status, sock, block, file);
-    DataChecksum checksum = DataChecksum.newDataChecksum( in );
+    ReadOpChecksumInfoProto checksumInfo =
+      status.getReadOpChecksumInfo();
+    DataChecksum checksum = DataTransferProtoUtil.fromProto(
+        checksumInfo.getChecksum());
     //Warning when we get CHECKSUM_NULL?
 
     // Read the first chunk offset.
-    long firstChunkOffset = in.readLong();
+    long firstChunkOffset = checksumInfo.getChunkOffset();
 
     if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
         firstChunkOffset >= (startOffset + checksum.getBytesPerChecksum())) {
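
Note: the sketch below is not part of the patch. The range check above works
because reads align backwards to a checksum chunk boundary, so the chunk
offset the datanode reports should be startOffset rounded down to a multiple
of bytesPerChecksum. Illustrative values, assuming CRC32 with 512-byte chunks:

    public class ChunkAlignmentExample {
      public static void main(String[] args) {
        long startOffset = 1000;     // byte the client asked to read
        int bytesPerChecksum = 512;  // from ChecksumProto.bytesPerChecksum
        // Reads align backwards to the enclosing checksum chunk boundary:
        long chunkOffset = startOffset - (startOffset % bytesPerChecksum);
        // chunkOffset == 512: non-negative and not past startOffset,
        // which is what RemoteBlockReader verifies above.
        System.out.println(chunkOffset);
      }
    }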

DataTransferProtoUtil.java

@@ -23,10 +23,16 @@
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto.ChecksumType;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
 
 /**

@@ -35,8 +41,20 @@
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-abstract class DataTransferProtoUtil {
+public abstract class DataTransferProtoUtil {
+
+  /**
+   * Map between the internal DataChecksum identifiers and the protobuf-
+   * generated identifiers on the wire.
+   */
+  static BiMap<Integer, ChecksumProto.ChecksumType> checksumTypeMap =
+      ImmutableBiMap.<Integer, ChecksumProto.ChecksumType>builder()
+          .put(DataChecksum.CHECKSUM_CRC32, ChecksumProto.ChecksumType.CRC32)
+          .put(DataChecksum.CHECKSUM_CRC32C, ChecksumProto.ChecksumType.CRC32C)
+          .put(DataChecksum.CHECKSUM_NULL, ChecksumProto.ChecksumType.NULL)
+          .build();
+
   static BlockConstructionStage fromProto(
       OpWriteBlockProto.BlockConstructionStage stage) {
     return BlockConstructionStage.valueOf(BlockConstructionStage.class,

@@ -49,6 +67,28 @@ static OpWriteBlockProto.BlockConstructionStage toProto(
         stage.name());
   }
 
+  public static ChecksumProto toProto(DataChecksum checksum) {
+    ChecksumType type = checksumTypeMap.get(checksum.getChecksumType());
+    if (type == null) {
+      throw new IllegalArgumentException(
+          "Can't convert checksum to protobuf: " + checksum);
+    }
+
+    return ChecksumProto.newBuilder()
+        .setBytesPerChecksum(checksum.getBytesPerChecksum())
+        .setType(type)
+        .build();
+  }
+
+  public static DataChecksum fromProto(ChecksumProto proto) {
+    if (proto == null) return null;
+
+    int bytesPerChecksum = proto.getBytesPerChecksum();
+    int type = checksumTypeMap.inverse().get(proto.getType());
+
+    return DataChecksum.newDataChecksum(type, bytesPerChecksum);
+  }
+
   static ClientOperationHeaderProto buildClientHeader(ExtendedBlock blk,
       String client, Token<BlockTokenIdentifier> blockToken) {
     ClientOperationHeaderProto header =
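
The BiMap makes the two conversions above exact inverses. A hedged sketch of
round-tripping a checksum through the new wire form, using only APIs that
appear in this patch (the wrapper class name is hypothetical):

    import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
    import org.apache.hadoop.util.DataChecksum;

    public class ChecksumRoundTrip {
      public static void main(String[] args) {
        DataChecksum original =
            DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 512);
        // Encode for the wire, then decode as the receiving side would.
        ChecksumProto onWire = DataTransferProtoUtil.toProto(original);
        DataChecksum decoded = DataTransferProtoUtil.fromProto(onWire);
        // Type and chunk size survive the round trip via the BiMap.
        assert decoded.getChecksumType() == original.getChecksumType();
        assert decoded.getBytesPerChecksum() == original.getBytesPerChecksum();
      }
    }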

DataTransferProtocol.java

@@ -27,6 +27,7 @@
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
 
 /**
  * Transfer data to/from datanode using a streaming protocol.

@@ -84,7 +85,8 @@ public void writeBlock(final ExtendedBlock blk,
       final int pipelineSize,
       final long minBytesRcvd,
       final long maxBytesRcvd,
-      final long latestGenerationStamp) throws IOException;
+      final long latestGenerationStamp,
+      final DataChecksum requestedChecksum) throws IOException;
 
   /**
    * Transfer a block to another datanode.

Receiver.java

@@ -103,7 +103,8 @@ private void opWriteBlock(DataInputStream in) throws IOException {
         fromProto(proto.getStage()),
         proto.getPipelineSize(),
         proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
-        proto.getLatestGenerationStamp());
+        proto.getLatestGenerationStamp(),
+        fromProto(proto.getRequestedChecksum()));
   }
 
   /** Receive {@link Op#TRANSFER_BLOCK} */

Sender.java

@@ -29,6 +29,7 @@
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;

@@ -38,6 +39,7 @@
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
 
 import com.google.protobuf.Message;

@@ -93,10 +95,14 @@ public void writeBlock(final ExtendedBlock blk,
       final int pipelineSize,
       final long minBytesRcvd,
       final long maxBytesRcvd,
-      final long latestGenerationStamp) throws IOException {
+      final long latestGenerationStamp,
+      DataChecksum requestedChecksum) throws IOException {
     ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
         blk, clientName, blockToken);
+
+    ChecksumProto checksumProto =
+        DataTransferProtoUtil.toProto(requestedChecksum);
 
     OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder()
         .setHeader(header)
         .addAllTargets(toProtos(targets, 1))

@@ -104,7 +110,8 @@ public void writeBlock(final ExtendedBlock blk,
         .setPipelineSize(pipelineSize)
         .setMinBytesRcvd(minBytesRcvd)
         .setMaxBytesRcvd(maxBytesRcvd)
-        .setLatestGenerationStamp(latestGenerationStamp);
+        .setLatestGenerationStamp(latestGenerationStamp)
+        .setRequestedChecksum(checksumProto);
 
     if (source != null) {
       proto.setSource(toProto(source));

BlockPoolSliceScanner.java

@@ -403,8 +403,8 @@ private void verifyBlock(ExtendedBlock block) {
     try {
       adjustThrottler();
 
-      blockSender = new BlockSender(block, 0, -1, false, false, true,
-          datanode, null);
+      blockSender = new BlockSender(block, 0, -1, false, true, datanode,
+          null);
 
       DataOutputStream out =
           new DataOutputStream(new IOUtils.NullOutputStream());

BlockReceiver.java

@@ -108,7 +108,8 @@ class BlockReceiver implements Closeable {
       final BlockConstructionStage stage,
       final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
       final String clientname, final DatanodeInfo srcDataNode,
-      final DataNode datanode) throws IOException {
+      final DataNode datanode, DataChecksum requestedChecksum)
+      throws IOException {
     try{
       this.block = block;
       this.in = in;

@@ -177,7 +178,7 @@ class BlockReceiver implements Closeable {
         }
       }
       // read checksum meta information
-      this.checksum = DataChecksum.newDataChecksum(in);
+      this.checksum = requestedChecksum;
       this.bytesPerChecksum = checksum.getBytesPerChecksum();
       this.checksumSize = checksum.getChecksumSize();
       this.dropCacheBehindWrites = datanode.shouldDropCacheBehindWrites();

@@ -687,11 +688,6 @@ private void dropOsCacheBehindWriter(long offsetInBlock) throws IOException {
     }
   }
 
-  void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
-    checksum.writeHeader(mirrorOut);
-  }
-
   void receiveBlock(
       DataOutputStream mirrOut, // output to next datanode
       DataInputStream mirrIn, // input from next datanode

BlockSender.java

@@ -134,8 +134,6 @@ class BlockSender implements java.io.Closeable {
   private final int checksumSize;
   /** If true, failure to read checksum is ignored */
   private final boolean corruptChecksumOk;
-  /** true if chunk offset is needed to be sent in Checksum header */
-  private final boolean chunkOffsetOK;
   /** Sequence number of packet being sent */
   private long seqno;
   /** Set to true if transferTo is allowed for sending data to the client */

@@ -173,19 +171,17 @@ class BlockSender implements java.io.Closeable {
    * @param startOffset starting offset to read from
    * @param length length of data to read
    * @param corruptChecksumOk
-   * @param chunkOffsetOK need to send check offset in checksum header
    * @param verifyChecksum verify checksum while reading the data
    * @param datanode datanode from which the block is being read
    * @param clientTraceFmt format string used to print client trace logs
    * @throws IOException
    */
   BlockSender(ExtendedBlock block, long startOffset, long length,
-              boolean corruptChecksumOk, boolean chunkOffsetOK,
-              boolean verifyChecksum, DataNode datanode, String clientTraceFmt)
+              boolean corruptChecksumOk, boolean verifyChecksum,
+              DataNode datanode, String clientTraceFmt)
       throws IOException {
     try {
       this.block = block;
-      this.chunkOffsetOK = chunkOffsetOK;
       this.corruptChecksumOk = corruptChecksumOk;
       this.verifyChecksum = verifyChecksum;
       this.clientTraceFmt = clientTraceFmt;

@@ -600,8 +596,6 @@ long sendBlock(DataOutputStream out, OutputStream baseStream,
     final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
     try {
-      writeChecksumHeader(out);
-
       int maxChunksPerPacket;
       int pktSize = PacketHeader.PKT_HEADER_LEN;
       boolean transferTo = transferToAllowed && !verifyChecksum

@@ -691,22 +685,6 @@ private boolean isLongRead() {
     return (endOffset - offset) > LONG_READ_THRESHOLD_BYTES;
   }
 
-  /**
-   * Write checksum header to the output stream
-   */
-  private void writeChecksumHeader(DataOutputStream out) throws IOException {
-    try {
-      checksum.writeHeader(out);
-      if (chunkOffsetOK) {
-        out.writeLong(offset);
-      }
-      out.flush();
-    } catch (IOException e) { //socket error
-      throw ioeToSocketException(e);
-    }
-  }
-
   /**
    * Write packet header into {@code pkt}
    */

@@ -720,4 +698,19 @@ private void writePacketHeader(ByteBuffer pkt, int dataLen, int packetLen) {
   boolean didSendEntireByteRange() {
     return sentEntireByteRange;
   }
+
+  /**
+   * @return the checksum type that will be used with this block transfer.
+   */
+  DataChecksum getChecksum() {
+    return checksum;
+  }
+
+  /**
+   * @return the offset into the block file where the sender is currently
+   * reading.
+   */
+  long getOffset() {
+    return offset;
+  }
 }

DataNode.java

@@ -2053,7 +2053,7 @@ public void run() {
         out = new DataOutputStream(new BufferedOutputStream(baseStream,
             HdfsConstants.SMALL_BUFFER_SIZE));
         blockSender = new BlockSender(b, 0, b.getNumBytes(),
-            false, false, false, DataNode.this, null);
+            false, false, DataNode.this, null);
         DatanodeInfo srcNode = new DatanodeInfo(bpReg);
 
         //

@@ -2066,7 +2066,7 @@ public void run() {
         }
 
         new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode,
-            stage, 0, 0, 0, 0);
+            stage, 0, 0, 0, 0, blockSender.getChecksum());
 
         // send data & checksum
         blockSender.sendBlock(out, baseStream, null);

DataXceiver.java

@@ -44,6 +44,7 @@
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;

@@ -52,6 +53,7 @@
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProtoOrBuilder;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;

@@ -225,7 +227,7 @@ public void readBlock(final ExtendedBlock block,
     try {
       try {
         blockSender = new BlockSender(block, blockOffset, length,
-            true, true, false, datanode, clientTraceFmt);
+            true, false, datanode, clientTraceFmt);
       } catch(IOException e) {
         String msg = "opReadBlock " + block + " received exception " + e;
         LOG.info(msg);

@@ -234,7 +236,8 @@ public void readBlock(final ExtendedBlock block,
       }
 
       // send op status
-      sendResponse(s, SUCCESS, null, datanode.socketWriteTimeout);
+      writeSuccessWithChecksumInfo(blockSender,
+          getStreamWithTimeout(s, datanode.socketWriteTimeout));
 
       long read = blockSender.sendBlock(out, baseStream, null); // send data

@@ -292,7 +295,8 @@ public void writeBlock(final ExtendedBlock block,
       final int pipelineSize,
       final long minBytesRcvd,
       final long maxBytesRcvd,
-      final long latestGenerationStamp) throws IOException {
+      final long latestGenerationStamp,
+      DataChecksum requestedChecksum) throws IOException {
     updateCurrentThreadName("Receiving block " + block + " client=" + clientname);
     final boolean isDatanode = clientname.length() == 0;
     final boolean isClient = !isDatanode;

@@ -351,7 +355,7 @@ public void writeBlock(final ExtendedBlock block,
           s.getRemoteSocketAddress().toString(),
           s.getLocalSocketAddress().toString(),
           stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
-          clientname, srcDataNode, datanode);
+          clientname, srcDataNode, datanode, requestedChecksum);
     } else {
       datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd);
     }

@@ -381,11 +385,8 @@ public void writeBlock(final ExtendedBlock block,
 
           new Sender(mirrorOut).writeBlock(originalBlock, blockToken,
               clientname, targets, srcDataNode, stage, pipelineSize,
-              minBytesRcvd, maxBytesRcvd, latestGenerationStamp);
+              minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum);
 
-          if (blockReceiver != null) { // send checksum header
-            blockReceiver.writeChecksumHeader(mirrorOut);
-          }
           mirrorOut.flush();
 
           // read connect ack (only for clients, not for replication req)

@@ -600,8 +601,8 @@ public void copyBlock(final ExtendedBlock block,
 
     try {
       // check if the block exists or not
-      blockSender = new BlockSender(block, 0, -1, false, false, false,
-          datanode, null);
+      blockSender = new BlockSender(block, 0, -1, false, false, datanode,
+          null);
 
       // set up response stream
       OutputStream baseStream = NetUtils.getOutputStream(

@@ -610,7 +611,7 @@ public void copyBlock(final ExtendedBlock block,
           baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
 
       // send status first
-      writeResponse(SUCCESS, null, reply);
+      writeSuccessWithChecksumInfo(blockSender, reply);
       // send block content to the target
       long read = blockSender.sendBlock(reply, baseStream,
           dataXceiverServer.balanceThrottler);

@@ -709,11 +710,16 @@ public void replaceBlock(final ExtendedBlock block,
         throw new IOException("Copy block " + block + " from "
             + proxySock.getRemoteSocketAddress() + " failed");
       }
+
+      // get checksum info about the block we're copying
+      ReadOpChecksumInfoProto checksumInfo = copyResponse.getReadOpChecksumInfo();
+      DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(
+          checksumInfo.getChecksum());
 
       // open a block receiver and check if the block does not exist
       blockReceiver = new BlockReceiver(
           block, proxyReply, proxySock.getRemoteSocketAddress().toString(),
           proxySock.getLocalSocketAddress().toString(),
-          null, 0, 0, 0, "", null, datanode);
+          null, 0, 0, 0, "", null, datanode, remoteChecksum);
 
       // receive a block
       blockReceiver.receiveBlock(null, null, null, null,

@@ -767,15 +773,19 @@ private long elapsed() {
    * @param opStatus status message to write
    * @param timeout send timeout
    **/
-  private void sendResponse(Socket s, Status status, String message,
+  private static void sendResponse(Socket s, Status status, String message,
       long timeout) throws IOException {
-    DataOutputStream reply =
-      new DataOutputStream(NetUtils.getOutputStream(s, timeout));
+    DataOutputStream reply = getStreamWithTimeout(s, timeout);
     writeResponse(status, message, reply);
   }
 
-  private void writeResponse(Status status, String message, OutputStream out)
+  private static DataOutputStream getStreamWithTimeout(Socket s, long timeout)
+      throws IOException {
+    return new DataOutputStream(NetUtils.getOutputStream(s, timeout));
+  }
+
+  private static void writeResponse(Status status, String message, OutputStream out)
       throws IOException {
     BlockOpResponseProto.Builder response = BlockOpResponseProto.newBuilder()
         .setStatus(status);

@@ -786,6 +796,22 @@ private void writeResponse(Status status, String message, OutputStream out)
     out.flush();
   }
 
+  private void writeSuccessWithChecksumInfo(BlockSender blockSender,
+      DataOutputStream out) throws IOException {
+
+    ReadOpChecksumInfoProto ckInfo = ReadOpChecksumInfoProto.newBuilder()
+        .setChecksum(DataTransferProtoUtil.toProto(blockSender.getChecksum()))
+        .setChunkOffset(blockSender.getOffset())
+        .build();
+
+    BlockOpResponseProto response = BlockOpResponseProto.newBuilder()
+        .setStatus(SUCCESS)
+        .setReadOpChecksumInfo(ckInfo)
+        .build();
+
+    response.writeDelimitedTo(out);
+    out.flush();
+  }
+
   private void checkAccess(DataOutputStream out, final boolean reply,
       final ExtendedBlock blk,
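
Taken together, writeSuccessWithChecksumInfo and the RemoteBlockReader change
mean a reader learns the checksum parameters from the delimited response proto
rather than from raw header bytes on the stream. A hedged sketch of the client
side of that handshake, based on calls visible in this patch (the class and
method names here are illustrative, not from the patch):

    import java.io.DataInputStream;
    import java.io.IOException;
    import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
    import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
    import org.apache.hadoop.util.DataChecksum;

    class ReadHandshakeSketch {
      /** Parse the success response of a READ_BLOCK op under the new protocol. */
      static DataChecksum checksumFromResponse(DataInputStream in)
          throws IOException {
        BlockOpResponseProto resp =
            BlockOpResponseProto.parseFrom(vintPrefixed(in));
        ReadOpChecksumInfoProto info = resp.getReadOpChecksumInfo();
        // Checksum type, chunk size, and chunk offset all ride in the proto;
        // no DataChecksum header bytes follow on the stream anymore.
        return DataTransferProtoUtil.fromProto(info.getChecksum());
      }
    }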

datatransfer.proto

@@ -40,6 +40,17 @@ message OpReadBlockProto {
   required uint64 offset = 2;
   required uint64 len = 3;
 }
 
+message ChecksumProto {
+  enum ChecksumType {
+    NULL = 0;
+    CRC32 = 1;
+    CRC32C = 2;
+  }
+  required ChecksumType type = 1;
+  required uint32 bytesPerChecksum = 2;
+}
+
 message OpWriteBlockProto {
   required ClientOperationHeaderProto header = 1;

@@ -69,6 +80,11 @@ message OpWriteBlockProto {
   required uint64 minBytesRcvd = 6;
   required uint64 maxBytesRcvd = 7;
   required uint64 latestGenerationStamp = 8;
+
+  /**
+   * The requested checksum mechanism for this block write.
+   */
+  required ChecksumProto requestedChecksum = 9;
 }
 
 message OpTransferBlockProto {

@@ -114,14 +130,30 @@ message PipelineAckProto {
   repeated Status status = 2;
 }
 
+/**
+ * Sent as part of the BlockOpResponseProto
+ * for READ_BLOCK and COPY_BLOCK operations.
+ */
+message ReadOpChecksumInfoProto {
+  required ChecksumProto checksum = 1;
+
+  /**
+   * The offset into the block at which the first packet
+   * will start. This is necessary since reads will align
+   * backwards to a checksum chunk boundary.
+   */
+  required uint64 chunkOffset = 2;
+}
+
 message BlockOpResponseProto {
   required Status status = 1;
 
   optional string firstBadLink = 2;
   optional OpBlockChecksumResponseProto checksumResponse = 3;
+  optional ReadOpChecksumInfoProto readOpChecksumInfo = 4;
 
   /** explanatory text which may be useful to log on the client side */
-  optional string message = 4;
+  optional string message = 5;
 }
 
 /**
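
For illustration, the new pieces rendered in protobuf text format (the field
values are hypothetical): a write request now names its checksum up front, and
a successful read response echoes the checksum plus the aligned chunk offset.

    # OpWriteBlockProto fragment (other required fields omitted):
    requestedChecksum {
      type: CRC32
      bytesPerChecksum: 512
    }

    # BlockOpResponseProto for a successful READ_BLOCK:
    status: SUCCESS
    readOpChecksumInfo {
      checksum {
        type: CRC32
        bytesPerChecksum: 512
      }
      chunkOffset: 512
    }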

TestDataTransferProtocol.java

@@ -31,6 +31,7 @@
 import junit.framework.TestCase;
 
+import org.apache.commons.digester.SetRootRule;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;

@@ -43,6 +44,7 @@
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;

@@ -50,6 +52,7 @@
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto.Builder;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;

@@ -59,6 +62,7 @@
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 /**
  * This tests data transfer protocol handling in the Datanode. It sends

@@ -68,6 +72,9 @@ public class TestDataTransferProtocol extends TestCase {
   private static final Log LOG = LogFactory.getLog(
                     "org.apache.hadoop.hdfs.TestDataTransferProtocol");
 
+  private static final DataChecksum DEFAULT_CHECKSUM =
+      DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 512);
+
   DatanodeID datanode;
   InetSocketAddress dnAddr;

@@ -149,9 +156,6 @@ void readFile(FileSystem fs, Path path, int fileLen) throws IOException {
   private void writeZeroLengthPacket(ExtendedBlock block, String description)
       throws IOException {
-    sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
-    sendOut.writeInt(512);         // checksum size
-
     PacketHeader hdr = new PacketHeader(
       8,                   // size of packet
       block.getNumBytes(), // OffsetInBlock

@@ -188,7 +192,8 @@ private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long n
     recvBuf.reset();
     sender.writeBlock(block, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         new DatanodeInfo[1], null, stage,
-        0, block.getNumBytes(), block.getNumBytes(), newGS);
+        0, block.getNumBytes(), block.getNumBytes(), newGS,
+        DEFAULT_CHECKSUM);
     if (eofExcepted) {
       sendResponse(Status.ERROR, null, null, recvOut);
       sendRecvData(description, true);

@@ -373,15 +378,16 @@ private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long n
 
     /* Test OP_WRITE_BLOCK */
     sendBuf.reset();
+
+    DataChecksum badChecksum = Mockito.spy(DEFAULT_CHECKSUM);
+    Mockito.doReturn(-1).when(badChecksum).getBytesPerChecksum();
     sender.writeBlock(new ExtendedBlock(poolId, newBlockId),
         BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         new DatanodeInfo[1], null,
         BlockConstructionStage.PIPELINE_SETUP_CREATE,
-        0, 0L, 0L, 0L);
-    sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
-    // bad bytes per checksum
-    sendOut.writeInt(-1-random.nextInt(oneMil));
+        0, 0L, 0L, 0L,
+        badChecksum);
     recvBuf.reset();
     sendResponse(Status.ERROR, null, null, recvOut);
     sendRecvData("wrong bytesPerChecksum while writing", true);

@@ -391,9 +397,8 @@ private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long n
     sender.writeBlock(new ExtendedBlock(poolId, ++newBlockId),
         BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         new DatanodeInfo[1], null,
-        BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L);
-    sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
-    sendOut.writeInt(512);
+        BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L,
+        DEFAULT_CHECKSUM);
 
     PacketHeader hdr = new PacketHeader(
       4,     // size of packet

@@ -414,9 +419,8 @@ private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long n
     sender.writeBlock(new ExtendedBlock(poolId, ++newBlockId),
         BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         new DatanodeInfo[1], null,
-        BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L);
-    sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
-    sendOut.writeInt(512);         // checksum size
+        BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L,
+        DEFAULT_CHECKSUM);
 
     hdr = new PacketHeader(
       8,     // size of packet

@@ -462,7 +466,15 @@ private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long n
     // negative length is ok. Datanode assumes we want to read the whole block.
     recvBuf.reset();
-    sendResponse(Status.SUCCESS, null, null, recvOut);
+
+    BlockOpResponseProto.newBuilder()
+        .setStatus(Status.SUCCESS)
+        .setReadOpChecksumInfo(ReadOpChecksumInfoProto.newBuilder()
+            .setChecksum(DataTransferProtoUtil.toProto(DEFAULT_CHECKSUM))
+            .setChunkOffset(0L))
+        .build()
+        .writeDelimitedTo(recvOut);
+
     sendBuf.reset();
     sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         0L, -1L-random.nextInt(oneMil));
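
One note on the "wrong bytesPerChecksum" case above: with the raw checksum
header gone, the test can no longer inject a bad value by writing bytes
directly onto the stream, so it spies on a real DataChecksum and forces
getBytesPerChecksum() to return -1, which then travels to the datanode inside
the OpWriteBlockProto.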

TestDiskError.java

@@ -40,6 +40,7 @@
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.util.DataChecksum;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;

@@ -140,14 +141,13 @@ public void testReplicationError() throws Exception {
     // write the header.
     DataOutputStream out = new DataOutputStream(s.getOutputStream());
 
+    DataChecksum checksum = DataChecksum.newDataChecksum(
+        DataChecksum.CHECKSUM_CRC32, 512);
     new Sender(out).writeBlock(block.getBlock(),
         BlockTokenSecretManager.DUMMY_TOKEN, "",
         new DatanodeInfo[0], null,
-        BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L);
-
-    // write check header
-    out.writeByte( 1 );
-    out.writeInt( 512 );
+        BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
+        checksum);
     out.flush();
 
     // close the connection before sending the content of the block