HDFS-2521. Remove custom checksum headers from data transfer protocol. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1195829 13f79535-47bb-0310-9956-ffa450edef68
Author: Todd Lipcon
Date: 2011-11-01 05:17:08 +00:00
Parent: 8e04fa8b84
Commit: 1c940637b1

16 changed files with 1713 additions and 150 deletions


@@ -839,6 +839,9 @@ Release 0.23.0 - Unreleased
     HDFS-2512. Add textual error message to data transfer protocol responses
     (todd)
 
+    HDFS-2521. Remove custom checksum headers from data transfer protocol
+    (todd)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image


@@ -1033,9 +1033,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
           // send the request
           new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
               nodes, null, recoveryFlag? stage.getRecoveryStage() : stage,
-              nodes.length, block.getNumBytes(), bytesSent, newGS);
-          checksum.writeHeader(out);
-          out.flush();
+              nodes.length, block.getNumBytes(), bytesSent, newGS, checksum);
 
           // receive ack for connect
           BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
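
The client no longer serializes the checksum header by hand; the DataChecksum object rides along in the writeBlock() call and is encoded by the protobuf layer (see the Sender change below). A minimal sketch of the new call shape, with placeholder stream and block values rather than code from this commit:

    // Request CRC32 with 512-byte chunks when setting up a write pipeline.
    DataChecksum checksum = DataChecksum.newDataChecksum(
        DataChecksum.CHECKSUM_CRC32, 512);
    new Sender(out).writeBlock(block, accessToken, clientName, nodes,
        null, stage, nodes.length, block.getNumBytes(), bytesSent,
        newGS, checksum);
    // No checksum.writeHeader(out) / out.flush() afterwards -- the header
    // is now part of OpWriteBlockProto itself.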


@ -33,10 +33,13 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FSInputChecker; import org.apache.hadoop.fs.FSInputChecker;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
@@ -408,11 +411,14 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
     BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
         vintPrefixed(in));
     checkSuccess(status, sock, block, file);
-    DataChecksum checksum = DataChecksum.newDataChecksum( in );
+    ReadOpChecksumInfoProto checksumInfo =
+      status.getReadOpChecksumInfo();
+    DataChecksum checksum = DataTransferProtoUtil.fromProto(
+        checksumInfo.getChecksum());
     //Warning when we get CHECKSUM_NULL?
 
     // Read the first chunk offset.
-    long firstChunkOffset = in.readLong();
+    long firstChunkOffset = checksumInfo.getChunkOffset();
 
     if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
         firstChunkOffset >= (startOffset + checksum.getBytesPerChecksum())) {
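
The bounds check is easier to see with numbers: the datanode aligns the read start backwards to a checksum chunk boundary (see ReadOpChecksumInfoProto below), so the returned chunkOffset must be non-negative and no greater than the requested startOffset. A worked sketch of that alignment, not code from this commit:

    // Server-side alignment the client is validating.
    long startOffset = 1000;      // byte the client asked for
    int bytesPerChecksum = 512;   // chunk size from the response
    long firstChunkOffset = startOffset - (startOffset % bytesPerChecksum);
    // firstChunkOffset == 512: non-negative and <= startOffset, so the
    // bounds check above accepts it; the reader then skips 488 bytes of
    // the first chunk to reach startOffset.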


@ -23,10 +23,16 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil; import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto.ChecksumType;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import com.google.common.collect.BiMap;
import com.google.common.collect.ImmutableBiMap;
/** /**
@@ -35,7 +41,19 @@ import org.apache.hadoop.security.token.Token;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-abstract class DataTransferProtoUtil {
+public abstract class DataTransferProtoUtil {
+
+  /**
+   * Map between the internal DataChecksum identifiers and the protobuf-
+   * generated identifiers on the wire.
+   */
+  static BiMap<Integer, ChecksumProto.ChecksumType> checksumTypeMap =
+    ImmutableBiMap.<Integer, ChecksumProto.ChecksumType>builder()
+      .put(DataChecksum.CHECKSUM_CRC32, ChecksumProto.ChecksumType.CRC32)
+      .put(DataChecksum.CHECKSUM_CRC32C, ChecksumProto.ChecksumType.CRC32C)
+      .put(DataChecksum.CHECKSUM_NULL, ChecksumProto.ChecksumType.NULL)
+      .build();
 
   static BlockConstructionStage fromProto(
       OpWriteBlockProto.BlockConstructionStage stage) {
@@ -49,6 +67,28 @@ abstract class DataTransferProtoUtil {
         stage.name());
   }
 
+  public static ChecksumProto toProto(DataChecksum checksum) {
+    ChecksumType type = checksumTypeMap.get(checksum.getChecksumType());
+    if (type == null) {
+      throw new IllegalArgumentException(
+          "Can't convert checksum to protobuf: " + checksum);
+    }
+
+    return ChecksumProto.newBuilder()
+      .setBytesPerChecksum(checksum.getBytesPerChecksum())
+      .setType(type)
+      .build();
+  }
+
+  public static DataChecksum fromProto(ChecksumProto proto) {
+    if (proto == null) return null;
+
+    int bytesPerChecksum = proto.getBytesPerChecksum();
+    int type = checksumTypeMap.inverse().get(proto.getType());
+
+    return DataChecksum.newDataChecksum(type, bytesPerChecksum);
+  }
+
   static ClientOperationHeaderProto buildClientHeader(ExtendedBlock blk,
       String client, Token<BlockTokenIdentifier> blockToken) {
     ClientOperationHeaderProto header =
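
The two new helpers are inverses of each other through the checksumTypeMap BiMap. A round-trip sketch, assuming the generated DataTransferProtos classes are on the classpath:

    // Encode an internal DataChecksum as its wire form and decode it back.
    DataChecksum before = DataChecksum.newDataChecksum(
        DataChecksum.CHECKSUM_CRC32C, 512);
    ChecksumProto onWire = DataTransferProtoUtil.toProto(before);
    DataChecksum after = DataTransferProtoUtil.fromProto(onWire);
    assert after.getChecksumType() == before.getChecksumType();
    assert after.getBytesPerChecksum() == before.getBytesPerChecksum();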


@@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
 
 /**
  * Transfer data to/from datanode using a streaming protocol.
@@ -84,7 +85,8 @@ public interface DataTransferProtocol {
       final int pipelineSize,
       final long minBytesRcvd,
       final long maxBytesRcvd,
-      final long latestGenerationStamp) throws IOException;
+      final long latestGenerationStamp,
+      final DataChecksum requestedChecksum) throws IOException;
 
   /**
    * Transfer a block to another datanode.


@@ -103,7 +103,8 @@ public abstract class Receiver implements DataTransferProtocol {
         fromProto(proto.getStage()),
         proto.getPipelineSize(),
         proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
-        proto.getLatestGenerationStamp());
+        proto.getLatestGenerationStamp(),
+        fromProto(proto.getRequestedChecksum()));
   }
 
   /** Receive {@link Op#TRANSFER_BLOCK} */


@@ -29,6 +29,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
@@ -38,6 +39,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockP
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
 
 import com.google.protobuf.Message;
@@ -93,10 +95,14 @@ public class Sender implements DataTransferProtocol {
       final int pipelineSize,
       final long minBytesRcvd,
       final long maxBytesRcvd,
-      final long latestGenerationStamp) throws IOException {
+      final long latestGenerationStamp,
+      DataChecksum requestedChecksum) throws IOException {
     ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
         blk, clientName, blockToken);
+
+    ChecksumProto checksumProto =
+      DataTransferProtoUtil.toProto(requestedChecksum);
+
     OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder()
       .setHeader(header)
       .addAllTargets(toProtos(targets, 1))
@@ -104,7 +110,8 @@ public class Sender implements DataTransferProtocol {
       .setPipelineSize(pipelineSize)
       .setMinBytesRcvd(minBytesRcvd)
       .setMaxBytesRcvd(maxBytesRcvd)
-      .setLatestGenerationStamp(latestGenerationStamp);
+      .setLatestGenerationStamp(latestGenerationStamp)
+      .setRequestedChecksum(checksumProto);
 
     if (source != null) {
       proto.setSource(toProto(source));


@@ -403,8 +403,8 @@ class BlockPoolSliceScanner {
     try {
       adjustThrottler();
 
-      blockSender = new BlockSender(block, 0, -1, false, false, true,
-          datanode, null);
+      blockSender = new BlockSender(block, 0, -1, false, true, datanode,
+          null);
 
       DataOutputStream out =
           new DataOutputStream(new IOUtils.NullOutputStream());


@@ -108,7 +108,8 @@ class BlockReceiver implements Closeable {
       final BlockConstructionStage stage,
       final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
       final String clientname, final DatanodeInfo srcDataNode,
-      final DataNode datanode) throws IOException {
+      final DataNode datanode, DataChecksum requestedChecksum)
+      throws IOException {
     try{
       this.block = block;
       this.in = in;
@@ -177,7 +178,7 @@ class BlockReceiver implements Closeable {
         }
       }
       // read checksum meta information
-      this.checksum = DataChecksum.newDataChecksum(in);
+      this.checksum = requestedChecksum;
       this.bytesPerChecksum = checksum.getBytesPerChecksum();
       this.checksumSize = checksum.getChecksumSize();
       this.dropCacheBehindWrites = datanode.shouldDropCacheBehindWrites();
@@ -687,11 +688,6 @@ class BlockReceiver implements Closeable {
       }
     }
 
-  void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
-    checksum.writeHeader(mirrorOut);
-  }
-
-
   void receiveBlock(
       DataOutputStream mirrOut, // output to next datanode
       DataInputStream mirrIn, // input from next datanode


@@ -134,8 +134,6 @@ class BlockSender implements java.io.Closeable {
   private final int checksumSize;
   /** If true, failure to read checksum is ignored */
   private final boolean corruptChecksumOk;
-  /** true if chunk offset is needed to be sent in Checksum header */
-  private final boolean chunkOffsetOK;
   /** Sequence number of packet being sent */
   private long seqno;
   /** Set to true if transferTo is allowed for sending data to the client */
@@ -173,19 +171,17 @@ class BlockSender implements java.io.Closeable {
    * @param startOffset starting offset to read from
    * @param length length of data to read
    * @param corruptChecksumOk
-   * @param chunkOffsetOK need to send check offset in checksum header
    * @param verifyChecksum verify checksum while reading the data
    * @param datanode datanode from which the block is being read
    * @param clientTraceFmt format string used to print client trace logs
    * @throws IOException
    */
   BlockSender(ExtendedBlock block, long startOffset, long length,
-              boolean corruptChecksumOk, boolean chunkOffsetOK,
-              boolean verifyChecksum, DataNode datanode, String clientTraceFmt)
+              boolean corruptChecksumOk, boolean verifyChecksum,
+              DataNode datanode, String clientTraceFmt)
       throws IOException {
     try {
       this.block = block;
-      this.chunkOffsetOK = chunkOffsetOK;
       this.corruptChecksumOk = corruptChecksumOk;
       this.verifyChecksum = verifyChecksum;
       this.clientTraceFmt = clientTraceFmt;
@@ -600,8 +596,6 @@ class BlockSender implements java.io.Closeable {
     final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
     try {
-      writeChecksumHeader(out);
-
       int maxChunksPerPacket;
       int pktSize = PacketHeader.PKT_HEADER_LEN;
       boolean transferTo = transferToAllowed && !verifyChecksum
@@ -691,22 +685,6 @@ class BlockSender implements java.io.Closeable {
     return (endOffset - offset) > LONG_READ_THRESHOLD_BYTES;
   }
 
-  /**
-   * Write checksum header to the output stream
-   */
-  private void writeChecksumHeader(DataOutputStream out) throws IOException {
-    try {
-      checksum.writeHeader(out);
-      if (chunkOffsetOK) {
-        out.writeLong(offset);
-      }
-      out.flush();
-    } catch (IOException e) { //socket error
-      throw ioeToSocketException(e);
-    }
-  }
-
   /**
    * Write packet header into {@code pkt}
   */
@@ -720,4 +698,19 @@ class BlockSender implements java.io.Closeable {
   boolean didSendEntireByteRange() {
     return sentEntireByteRange;
   }
+
+  /**
+   * @return the checksum type that will be used with this block transfer.
+   */
+  DataChecksum getChecksum() {
+    return checksum;
+  }
+
+  /**
+   * @return the offset into the block file where the sender is currently
+   * reading.
+   */
+  long getOffset() {
+    return offset;
+  }
 }
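
For contrast, the custom header these getters replace was written raw onto the socket by the deleted writeChecksumHeader() above: a type byte, a bytesPerChecksum int, and (when chunkOffsetOK was set) a chunk-offset long. A sketch of that legacy encoding, reconstructed from the removed lines:

    // Legacy wire header, removed by this commit: unframed, fixed-layout,
    // and impossible to extend without breaking every reader.
    out.writeByte((byte) DataChecksum.CHECKSUM_CRC32); // checksum type
    out.writeInt(512);                                 // bytesPerChecksum
    out.writeLong(offset);    // chunk offset (reads only, chunkOffsetOK)
    out.flush();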


@@ -2097,7 +2097,7 @@ public class DataNode extends Configured
         out = new DataOutputStream(new BufferedOutputStream(baseStream,
             HdfsConstants.SMALL_BUFFER_SIZE));
         blockSender = new BlockSender(b, 0, b.getNumBytes(),
-            false, false, false, DataNode.this, null);
+            false, false, DataNode.this, null);
         DatanodeInfo srcNode = new DatanodeInfo(bpReg);
 
         //
@@ -2110,7 +2110,7 @@ public class DataNode extends Configured
         }
 
         new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode,
-            stage, 0, 0, 0, 0);
+            stage, 0, 0, 0, 0, blockSender.getChecksum());
 
         // send data & checksum
         blockSender.sendBlock(out, baseStream, null);


@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil; import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op; import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver; import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@ -52,6 +53,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProtoOrBuilder; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProtoOrBuilder;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
@@ -225,7 +227,7 @@ class DataXceiver extends Receiver implements Runnable {
     try {
       try {
         blockSender = new BlockSender(block, blockOffset, length,
-            true, true, false, datanode, clientTraceFmt);
+            true, false, datanode, clientTraceFmt);
       } catch(IOException e) {
         String msg = "opReadBlock " + block + " received exception " + e;
         LOG.info(msg);
@ -234,7 +236,8 @@ class DataXceiver extends Receiver implements Runnable {
} }
// send op status // send op status
sendResponse(s, SUCCESS, null, datanode.socketWriteTimeout); writeSuccessWithChecksumInfo(blockSender,
getStreamWithTimeout(s, datanode.socketWriteTimeout));
long read = blockSender.sendBlock(out, baseStream, null); // send data long read = blockSender.sendBlock(out, baseStream, null); // send data
@@ -292,7 +295,8 @@ class DataXceiver extends Receiver implements Runnable {
       final int pipelineSize,
       final long minBytesRcvd,
       final long maxBytesRcvd,
-      final long latestGenerationStamp) throws IOException {
+      final long latestGenerationStamp,
+      DataChecksum requestedChecksum) throws IOException {
     updateCurrentThreadName("Receiving block " + block + " client=" + clientname);
     final boolean isDatanode = clientname.length() == 0;
     final boolean isClient = !isDatanode;
@@ -351,7 +355,7 @@ class DataXceiver extends Receiver implements Runnable {
             s.getRemoteSocketAddress().toString(),
             s.getLocalSocketAddress().toString(),
             stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
-            clientname, srcDataNode, datanode);
+            clientname, srcDataNode, datanode, requestedChecksum);
       } else {
         datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd);
       }
@@ -381,11 +385,8 @@ class DataXceiver extends Receiver implements Runnable {
 
           new Sender(mirrorOut).writeBlock(originalBlock, blockToken,
               clientname, targets, srcDataNode, stage, pipelineSize,
-              minBytesRcvd, maxBytesRcvd, latestGenerationStamp);
+              minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum);
 
-          if (blockReceiver != null) { // send checksum header
-            blockReceiver.writeChecksumHeader(mirrorOut);
-          }
           mirrorOut.flush();
 
           // read connect ack (only for clients, not for replication req)
@@ -600,8 +601,8 @@ class DataXceiver extends Receiver implements Runnable {
     try {
       // check if the block exists or not
-      blockSender = new BlockSender(block, 0, -1, false, false, false,
-          datanode, null);
+      blockSender = new BlockSender(block, 0, -1, false, false, datanode,
+          null);
 
       // set up response stream
       OutputStream baseStream = NetUtils.getOutputStream(
@@ -610,7 +611,7 @@ class DataXceiver extends Receiver implements Runnable {
           baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
 
       // send status first
-      writeResponse(SUCCESS, null, reply);
+      writeSuccessWithChecksumInfo(blockSender, reply);
       // send block content to the target
       long read = blockSender.sendBlock(reply, baseStream,
           dataXceiverServer.balanceThrottler);
@@ -709,11 +710,16 @@ class DataXceiver extends Receiver implements Runnable {
         throw new IOException("Copy block " + block + " from "
             + proxySock.getRemoteSocketAddress() + " failed");
       }
+
+      // get checksum info about the block we're copying
+      ReadOpChecksumInfoProto checksumInfo = copyResponse.getReadOpChecksumInfo();
+      DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(
+          checksumInfo.getChecksum());
 
       // open a block receiver and check if the block does not exist
       blockReceiver = new BlockReceiver(
           block, proxyReply, proxySock.getRemoteSocketAddress().toString(),
           proxySock.getLocalSocketAddress().toString(),
-          null, 0, 0, 0, "", null, datanode);
+          null, 0, 0, 0, "", null, datanode, remoteChecksum);
 
       // receive a block
       blockReceiver.receiveBlock(null, null, null, null,
@@ -767,15 +773,19 @@ class DataXceiver extends Receiver implements Runnable {
    * @param opStatus status message to write
    * @param timeout send timeout
    **/
-  private void sendResponse(Socket s, Status status, String message,
+  private static void sendResponse(Socket s, Status status, String message,
       long timeout) throws IOException {
-    DataOutputStream reply =
-      new DataOutputStream(NetUtils.getOutputStream(s, timeout));
+    DataOutputStream reply = getStreamWithTimeout(s, timeout);
     writeResponse(status, message, reply);
   }
 
-  private void writeResponse(Status status, String message, OutputStream out)
+  private static DataOutputStream getStreamWithTimeout(Socket s, long timeout)
+      throws IOException {
+    return new DataOutputStream(NetUtils.getOutputStream(s, timeout));
+  }
+
+  private static void writeResponse(Status status, String message, OutputStream out)
   throws IOException {
     BlockOpResponseProto.Builder response = BlockOpResponseProto.newBuilder()
       .setStatus(status);
@@ -786,6 +796,22 @@ class DataXceiver extends Receiver implements Runnable {
     out.flush();
   }
 
+  private void writeSuccessWithChecksumInfo(BlockSender blockSender,
+      DataOutputStream out) throws IOException {
+
+    ReadOpChecksumInfoProto ckInfo = ReadOpChecksumInfoProto.newBuilder()
+      .setChecksum(DataTransferProtoUtil.toProto(blockSender.getChecksum()))
+      .setChunkOffset(blockSender.getOffset())
+      .build();
+
+    BlockOpResponseProto response = BlockOpResponseProto.newBuilder()
+      .setStatus(SUCCESS)
+      .setReadOpChecksumInfo(ckInfo)
+      .build();
+
+    response.writeDelimitedTo(out);
+    out.flush();
+  }
+
   private void checkAccess(DataOutputStream out, final boolean reply,
       final ExtendedBlock blk,
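
writeSuccessWithChecksumInfo() pairs with BlockOpResponseProto.parseFrom(vintPrefixed(in)) in RemoteBlockReader above: writeDelimitedTo() prefixes the message with its varint-encoded length, which is what lets a new optional field like readOpChecksumInfo travel without any hand-rolled framing. A self-contained sketch of that framing using in-memory streams instead of a socket (assumes java.io and the generated DataTransferProtos classes are imported):

    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    BlockOpResponseProto.newBuilder()
        .setStatus(Status.SUCCESS)
        .build()
        .writeDelimitedTo(buf);    // varint length, then the message bytes

    BlockOpResponseProto parsed = BlockOpResponseProto.parseDelimitedFrom(
        new ByteArrayInputStream(buf.toByteArray()));
    assert parsed.getStatus() == Status.SUCCESS;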


@@ -41,6 +41,17 @@ message OpReadBlockProto {
   required uint64 len = 3;
 }
 
+message ChecksumProto {
+  enum ChecksumType {
+    NULL = 0;
+    CRC32 = 1;
+    CRC32C = 2;
+  }
+
+  required ChecksumType type = 1;
+  required uint32 bytesPerChecksum = 2;
+}
+
 message OpWriteBlockProto {
   required ClientOperationHeaderProto header = 1;
   repeated DatanodeInfoProto targets = 2;
@@ -69,6 +80,11 @@ message OpWriteBlockProto {
   required uint64 minBytesRcvd = 6;
   required uint64 maxBytesRcvd = 7;
   required uint64 latestGenerationStamp = 8;
+
+  /**
+   * The requested checksum mechanism for this block write.
+   */
+  required ChecksumProto requestedChecksum = 9;
 }
 
 message OpTransferBlockProto {
@@ -114,14 +130,30 @@ message PipelineAckProto {
   repeated Status status = 2;
 }
 
+/**
+ * Sent as part of the BlockOpResponseProto
+ * for READ_BLOCK and COPY_BLOCK operations.
+ */
+message ReadOpChecksumInfoProto {
+  required ChecksumProto checksum = 1;
+
+  /**
+   * The offset into the block at which the first packet
+   * will start. This is necessary since reads will align
+   * backwards to a checksum chunk boundary.
+   */
+  required uint64 chunkOffset = 2;
+}
+
 message BlockOpResponseProto {
   required Status status = 1;
 
   optional string firstBadLink = 2;
   optional OpBlockChecksumResponseProto checksumResponse = 3;
+  optional ReadOpChecksumInfoProto readOpChecksumInfo = 4;
 
   /** explanatory text which may be useful to log on the client side */
-  optional string message = 4;
+  optional string message = 5;
 }
 
 /**
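
Renumbering the existing message field from tag 4 to tag 5 to make room for readOpChecksumInfo would normally break wire compatibility; it is only tolerable here because 0.23.0 is still unreleased (see the CHANGES.txt hunk above). The new sub-message is also tiny on the wire; a sketch, assuming the generated Java classes:

    // A CRC32/512 ChecksumProto is two varint key/value pairs: five bytes.
    ChecksumProto ck = ChecksumProto.newBuilder()
        .setType(ChecksumProto.ChecksumType.CRC32)
        .setBytesPerChecksum(512)
        .build();
    assert ck.getSerializedSize() == 5;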


@@ -31,6 +31,7 @@ import java.util.Random;
 
 import junit.framework.TestCase;
 
+import org.apache.commons.digester.SetRootRule;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
@@ -50,6 +52,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto.Builder;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -59,6 +62,7 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 /**
  * This tests data transfer protocol handling in the Datanode. It sends
@@ -69,6 +73,9 @@ public class TestDataTransferProtocol extends TestCase {
   private static final Log LOG = LogFactory.getLog(
                     "org.apache.hadoop.hdfs.TestDataTransferProtocol");
 
+  private static final DataChecksum DEFAULT_CHECKSUM =
+    DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 512);
+
   DatanodeID datanode;
   InetSocketAddress dnAddr;
   ByteArrayOutputStream sendBuf = new ByteArrayOutputStream(128);
@@ -149,9 +156,6 @@ public class TestDataTransferProtocol extends TestCase {
 
   private void writeZeroLengthPacket(ExtendedBlock block, String description)
   throws IOException {
-    sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
-    sendOut.writeInt(512);         // checksum size
-
     PacketHeader hdr = new PacketHeader(
       8,                   // size of packet
       block.getNumBytes(), // OffsetInBlock
@@ -188,7 +192,8 @@ public class TestDataTransferProtocol extends TestCase {
     recvBuf.reset();
     sender.writeBlock(block, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         new DatanodeInfo[1], null, stage,
-        0, block.getNumBytes(), block.getNumBytes(), newGS);
+        0, block.getNumBytes(), block.getNumBytes(), newGS,
+        DEFAULT_CHECKSUM);
     if (eofExcepted) {
       sendResponse(Status.ERROR, null, null, recvOut);
       sendRecvData(description, true);
@@ -373,15 +378,16 @@ public class TestDataTransferProtocol extends TestCase {
 
     /* Test OP_WRITE_BLOCK */
     sendBuf.reset();
+
+    DataChecksum badChecksum = Mockito.spy(DEFAULT_CHECKSUM);
+    Mockito.doReturn(-1).when(badChecksum).getBytesPerChecksum();
+
     sender.writeBlock(new ExtendedBlock(poolId, newBlockId),
         BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         new DatanodeInfo[1], null,
         BlockConstructionStage.PIPELINE_SETUP_CREATE,
-        0, 0L, 0L, 0L);
-    sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
-
-    // bad bytes per checksum
-    sendOut.writeInt(-1-random.nextInt(oneMil));
+        0, 0L, 0L, 0L,
+        badChecksum);
     recvBuf.reset();
     sendResponse(Status.ERROR, null, null, recvOut);
     sendRecvData("wrong bytesPerChecksum while writing", true);
@@ -391,9 +397,8 @@ public class TestDataTransferProtocol extends TestCase {
     sender.writeBlock(new ExtendedBlock(poolId, ++newBlockId),
         BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         new DatanodeInfo[1], null,
-        BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L);
-    sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
-    sendOut.writeInt(512);
+        BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L,
+        DEFAULT_CHECKSUM);
 
     PacketHeader hdr = new PacketHeader(
       4,     // size of packet
@@ -414,9 +419,8 @@ public class TestDataTransferProtocol extends TestCase {
     sender.writeBlock(new ExtendedBlock(poolId, ++newBlockId),
         BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         new DatanodeInfo[1], null,
-        BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L);
-    sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
-    sendOut.writeInt(512);         // checksum size
+        BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L,
+        DEFAULT_CHECKSUM);
 
     hdr = new PacketHeader(
       8,     // size of packet
// negative length is ok. Datanode assumes we want to read the whole block. // negative length is ok. Datanode assumes we want to read the whole block.
recvBuf.reset(); recvBuf.reset();
sendResponse(Status.SUCCESS, null, null, recvOut);
BlockOpResponseProto.newBuilder()
.setStatus(Status.SUCCESS)
.setReadOpChecksumInfo(ReadOpChecksumInfoProto.newBuilder()
.setChecksum(DataTransferProtoUtil.toProto(DEFAULT_CHECKSUM))
.setChunkOffset(0L))
.build()
.writeDelimitedTo(recvOut);
sendBuf.reset(); sendBuf.reset();
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl", sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
0L, -1L-random.nextInt(oneMil)); 0L, -1L-random.nextInt(oneMil));


@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.util.DataChecksum;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@@ -140,14 +141,13 @@ public class TestDiskError {
 
     // write the header.
     DataOutputStream out = new DataOutputStream(s.getOutputStream());
+    DataChecksum checksum = DataChecksum.newDataChecksum(
+        DataChecksum.CHECKSUM_CRC32, 512);
     new Sender(out).writeBlock(block.getBlock(),
         BlockTokenSecretManager.DUMMY_TOKEN, "",
         new DatanodeInfo[0], null,
-        BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L);
-
-    // write check header
-    out.writeByte( 1 );
-    out.writeInt( 512 );
+        BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
+        checksum);
     out.flush();
 
     // close the connection before sending the content of the block
// close the connection before sending the content of the block // close the connection before sending the content of the block