HDFS-6923. Propagate LazyPersist flag to DNs via DataTransferProtocol. (Arpit Agarwal)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
This commit is contained in:
arp 2014-08-27 15:13:20 -07:00 committed by Jitendra Pandey
parent 1d9d29ad40
commit 8c3c0ec977
9 changed files with 31 additions and 15 deletions

View File

@ -360,6 +360,7 @@ public class DFSOutputStream extends FSOutputSummer
private long restartDeadline = 0; // Deadline of DN restart private long restartDeadline = 0; // Deadline of DN restart
private BlockConstructionStage stage; // block construction stage private BlockConstructionStage stage; // block construction stage
private long bytesSent = 0; // number of bytes that've been sent private long bytesSent = 0; // number of bytes that've been sent
private final boolean isLazyPersistFile;
/** Nodes have been used in the pipeline before and have failed. */ /** Nodes have been used in the pipeline before and have failed. */
private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>(); private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>();
@ -377,14 +378,15 @@ public class DFSOutputStream extends FSOutputSummer
* Default construction for file create * Default construction for file create
*/ */
private DataStreamer() { private DataStreamer() {
this(null); this(null, null);
} }
/** /**
* construction with tracing info * construction with tracing info
*/ */
private DataStreamer(Span span) { private DataStreamer(HdfsFileStatus stat, Span span) {
isAppend = false; isAppend = false;
isLazyPersistFile = stat.isLazyPersist();
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
traceSpan = span; traceSpan = span;
} }
@ -404,6 +406,7 @@ public class DFSOutputStream extends FSOutputSummer
block = lastBlock.getBlock(); block = lastBlock.getBlock();
bytesSent = block.getNumBytes(); bytesSent = block.getNumBytes();
accessToken = lastBlock.getBlockToken(); accessToken = lastBlock.getBlockToken();
isLazyPersistFile = stat.isLazyPersist();
long usedInLastBlock = stat.getLen() % blockSize; long usedInLastBlock = stat.getLen() % blockSize;
int freeInLastBlock = (int)(blockSize - usedInLastBlock); int freeInLastBlock = (int)(blockSize - usedInLastBlock);
@ -1396,7 +1399,7 @@ public class DFSOutputStream extends FSOutputSummer
new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken, new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
dfsClient.clientName, nodes, nodeStorageTypes, null, bcs, dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
nodes.length, block.getNumBytes(), bytesSent, newGS, checksum, nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
cachingStrategy.get()); cachingStrategy.get(), isLazyPersistFile);
// receive ack for connect // receive ack for connect
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
@ -1649,7 +1652,7 @@ public class DFSOutputStream extends FSOutputSummer
if (Trace.isTracing()) { if (Trace.isTracing()) {
traceSpan = Trace.startSpan(this.getClass().getSimpleName()).detach(); traceSpan = Trace.startSpan(this.getClass().getSimpleName()).detach();
} }
streamer = new DataStreamer(traceSpan); streamer = new DataStreamer(stat, traceSpan);
if (favoredNodes != null && favoredNodes.length != 0) { if (favoredNodes != null && favoredNodes.length != 0) {
streamer.setFavoredNodes(favoredNodes); streamer.setFavoredNodes(favoredNodes);
} }
@ -1726,7 +1729,7 @@ public class DFSOutputStream extends FSOutputSummer
} else { } else {
computePacketChunkSize(dfsClient.getConf().writePacketSize, computePacketChunkSize(dfsClient.getConf().writePacketSize,
checksum.getBytesPerChecksum()); checksum.getBytesPerChecksum());
streamer = new DataStreamer(traceSpan); streamer = new DataStreamer(stat, traceSpan);
} }
this.fileEncryptionInfo = stat.getFileEncryptionInfo(); this.fileEncryptionInfo = stat.getFileEncryptionInfo();
} }

View File

@ -106,8 +106,8 @@ public interface DataTransferProtocol {
final long maxBytesRcvd, final long maxBytesRcvd,
final long latestGenerationStamp, final long latestGenerationStamp,
final DataChecksum requestedChecksum, final DataChecksum requestedChecksum,
final CachingStrategy cachingStrategy) throws IOException; final CachingStrategy cachingStrategy,
final boolean allowLazyPersist) throws IOException;
/** /**
* Transfer a block to another datanode. * Transfer a block to another datanode.
* The block stage must be * The block stage must be

View File

@ -148,7 +148,8 @@ public abstract class Receiver implements DataTransferProtocol {
fromProto(proto.getRequestedChecksum()), fromProto(proto.getRequestedChecksum()),
(proto.hasCachingStrategy() ? (proto.hasCachingStrategy() ?
getCachingStrategy(proto.getCachingStrategy()) : getCachingStrategy(proto.getCachingStrategy()) :
CachingStrategy.newDefaultStrategy())); CachingStrategy.newDefaultStrategy()),
(proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false));
} finally { } finally {
if (traceScope != null) traceScope.close(); if (traceScope != null) traceScope.close();
} }

View File

@ -128,7 +128,8 @@ public class Sender implements DataTransferProtocol {
final long maxBytesRcvd, final long maxBytesRcvd,
final long latestGenerationStamp, final long latestGenerationStamp,
DataChecksum requestedChecksum, DataChecksum requestedChecksum,
final CachingStrategy cachingStrategy) throws IOException { final CachingStrategy cachingStrategy,
final boolean allowLazyPersist) throws IOException {
ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader( ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
blk, clientName, blockToken); blk, clientName, blockToken);
@ -146,7 +147,8 @@ public class Sender implements DataTransferProtocol {
.setMaxBytesRcvd(maxBytesRcvd) .setMaxBytesRcvd(maxBytesRcvd)
.setLatestGenerationStamp(latestGenerationStamp) .setLatestGenerationStamp(latestGenerationStamp)
.setRequestedChecksum(checksumProto) .setRequestedChecksum(checksumProto)
.setCachingStrategy(getCachingStrategy(cachingStrategy)); .setCachingStrategy(getCachingStrategy(cachingStrategy))
.setAllowLazyPersist(allowLazyPersist);
if (source != null) { if (source != null) {
proto.setSource(PBHelper.convertDatanodeInfo(source)); proto.setSource(PBHelper.convertDatanodeInfo(source));

View File

@ -2010,7 +2010,8 @@ public class DataNode extends ReconfigurableBase
new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken, new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
clientname, targets, targetStorageTypes, srcNode, clientname, targets, targetStorageTypes, srcNode,
stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy); stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy,
false);
// send data & checksum // send data & checksum
blockSender.sendBlock(out, unbufOut, null); blockSender.sendBlock(out, unbufOut, null);

View File

@ -544,7 +544,8 @@ class DataXceiver extends Receiver implements Runnable {
final long maxBytesRcvd, final long maxBytesRcvd,
final long latestGenerationStamp, final long latestGenerationStamp,
DataChecksum requestedChecksum, DataChecksum requestedChecksum,
CachingStrategy cachingStrategy) throws IOException { CachingStrategy cachingStrategy,
final boolean allowLazyPersist) throws IOException {
previousOpClientName = clientname; previousOpClientName = clientname;
updateCurrentThreadName("Receiving block " + block); updateCurrentThreadName("Receiving block " + block);
final boolean isDatanode = clientname.length() == 0; final boolean isDatanode = clientname.length() == 0;
@ -648,10 +649,11 @@ class DataXceiver extends Receiver implements Runnable {
HdfsConstants.SMALL_BUFFER_SIZE)); HdfsConstants.SMALL_BUFFER_SIZE));
mirrorIn = new DataInputStream(unbufMirrorIn); mirrorIn = new DataInputStream(unbufMirrorIn);
// Do not propagate allowLazyPersist to downstream DataNodes.
new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0], new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
blockToken, clientname, targets, targetStorageTypes, srcDataNode, blockToken, clientname, targets, targetStorageTypes, srcDataNode,
stage, pipelineSize, minBytesRcvd, maxBytesRcvd, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
latestGenerationStamp, requestedChecksum, cachingStrategy); latestGenerationStamp, requestedChecksum, cachingStrategy, false);
mirrorOut.flush(); mirrorOut.flush();

View File

@ -115,6 +115,13 @@ message OpWriteBlockProto {
optional CachingStrategyProto cachingStrategy = 10; optional CachingStrategyProto cachingStrategy = 10;
optional StorageTypeProto storageType = 11 [default = DISK]; optional StorageTypeProto storageType = 11 [default = DISK];
repeated StorageTypeProto targetStorageTypes = 12; repeated StorageTypeProto targetStorageTypes = 12;
/**
* Hint to the DataNode that the block can be allocated on transient
* storage i.e. memory and written to disk lazily. The DataNode is free
* to ignore this hint.
*/
optional bool allowLazyPersist = 13 [default = false];
} }
message OpTransferBlockProto { message OpTransferBlockProto {

View File

@ -530,6 +530,6 @@ public class TestDataTransferProtocol {
BlockTokenSecretManager.DUMMY_TOKEN, "cl", BlockTokenSecretManager.DUMMY_TOKEN, "cl",
new DatanodeInfo[1], new StorageType[1], null, stage, new DatanodeInfo[1], new StorageType[1], null, stage,
0, block.getNumBytes(), block.getNumBytes(), newGS, 0, block.getNumBytes(), block.getNumBytes(), newGS,
checksum, CachingStrategy.newDefaultStrategy()); checksum, CachingStrategy.newDefaultStrategy(), false);
} }
} }

View File

@ -152,7 +152,7 @@ public class TestDiskError {
BlockTokenSecretManager.DUMMY_TOKEN, "", BlockTokenSecretManager.DUMMY_TOKEN, "",
new DatanodeInfo[0], new StorageType[0], null, new DatanodeInfo[0], new StorageType[0], null,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L, BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
checksum, CachingStrategy.newDefaultStrategy()); checksum, CachingStrategy.newDefaultStrategy(), false);
out.flush(); out.flush();
// close the connection before sending the content of the block // close the connection before sending the content of the block