HDFS-6923. Propagate LazyPersist flag to DNs via DataTransferProtocol. (Arpit Agarwal)
Conflicts: hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
This commit is contained in:
parent
183f4a4dfb
commit
3b7d4715a1
|
@ -360,6 +360,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
private long restartDeadline = 0; // Deadline of DN restart
|
||||
private BlockConstructionStage stage; // block construction stage
|
||||
private long bytesSent = 0; // number of bytes that've been sent
|
||||
private final boolean isLazyPersistFile;
|
||||
|
||||
/** Nodes have been used in the pipeline before and have failed. */
|
||||
private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>();
|
||||
|
@ -377,14 +378,15 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
* Default construction for file create
|
||||
*/
|
||||
private DataStreamer() {
|
||||
this(null);
|
||||
this(null, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* construction with tracing info
|
||||
*/
|
||||
private DataStreamer(Span span) {
|
||||
private DataStreamer(HdfsFileStatus stat, Span span) {
|
||||
isAppend = false;
|
||||
isLazyPersistFile = stat.isLazyPersist();
|
||||
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
|
||||
traceSpan = span;
|
||||
}
|
||||
|
@ -404,6 +406,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
block = lastBlock.getBlock();
|
||||
bytesSent = block.getNumBytes();
|
||||
accessToken = lastBlock.getBlockToken();
|
||||
isLazyPersistFile = stat.isLazyPersist();
|
||||
long usedInLastBlock = stat.getLen() % blockSize;
|
||||
int freeInLastBlock = (int)(blockSize - usedInLastBlock);
|
||||
|
||||
|
@ -1396,7 +1399,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
|
||||
dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
|
||||
nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
|
||||
cachingStrategy.get());
|
||||
cachingStrategy.get(), isLazyPersistFile);
|
||||
|
||||
// receive ack for connect
|
||||
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
|
||||
|
@ -1649,7 +1652,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
if (Trace.isTracing()) {
|
||||
traceSpan = Trace.startSpan(this.getClass().getSimpleName()).detach();
|
||||
}
|
||||
streamer = new DataStreamer(traceSpan);
|
||||
streamer = new DataStreamer(stat, traceSpan);
|
||||
if (favoredNodes != null && favoredNodes.length != 0) {
|
||||
streamer.setFavoredNodes(favoredNodes);
|
||||
}
|
||||
|
@ -1726,7 +1729,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
} else {
|
||||
computePacketChunkSize(dfsClient.getConf().writePacketSize,
|
||||
checksum.getBytesPerChecksum());
|
||||
streamer = new DataStreamer(traceSpan);
|
||||
streamer = new DataStreamer(stat, traceSpan);
|
||||
}
|
||||
this.fileEncryptionInfo = stat.getFileEncryptionInfo();
|
||||
}
|
||||
|
|
|
@ -106,8 +106,8 @@ public interface DataTransferProtocol {
|
|||
final long maxBytesRcvd,
|
||||
final long latestGenerationStamp,
|
||||
final DataChecksum requestedChecksum,
|
||||
final CachingStrategy cachingStrategy) throws IOException;
|
||||
|
||||
final CachingStrategy cachingStrategy,
|
||||
final boolean allowLazyPersist) throws IOException;
|
||||
/**
|
||||
* Transfer a block to another datanode.
|
||||
* The block stage must be
|
||||
|
|
|
@ -148,7 +148,8 @@ public abstract class Receiver implements DataTransferProtocol {
|
|||
fromProto(proto.getRequestedChecksum()),
|
||||
(proto.hasCachingStrategy() ?
|
||||
getCachingStrategy(proto.getCachingStrategy()) :
|
||||
CachingStrategy.newDefaultStrategy()));
|
||||
CachingStrategy.newDefaultStrategy()),
|
||||
(proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false));
|
||||
} finally {
|
||||
if (traceScope != null) traceScope.close();
|
||||
}
|
||||
|
|
|
@ -128,7 +128,8 @@ public class Sender implements DataTransferProtocol {
|
|||
final long maxBytesRcvd,
|
||||
final long latestGenerationStamp,
|
||||
DataChecksum requestedChecksum,
|
||||
final CachingStrategy cachingStrategy) throws IOException {
|
||||
final CachingStrategy cachingStrategy,
|
||||
final boolean allowLazyPersist) throws IOException {
|
||||
ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
|
||||
blk, clientName, blockToken);
|
||||
|
||||
|
@ -146,7 +147,8 @@ public class Sender implements DataTransferProtocol {
|
|||
.setMaxBytesRcvd(maxBytesRcvd)
|
||||
.setLatestGenerationStamp(latestGenerationStamp)
|
||||
.setRequestedChecksum(checksumProto)
|
||||
.setCachingStrategy(getCachingStrategy(cachingStrategy));
|
||||
.setCachingStrategy(getCachingStrategy(cachingStrategy))
|
||||
.setAllowLazyPersist(allowLazyPersist);
|
||||
|
||||
if (source != null) {
|
||||
proto.setSource(PBHelper.convertDatanodeInfo(source));
|
||||
|
|
|
@ -2010,7 +2010,8 @@ public class DataNode extends ReconfigurableBase
|
|||
|
||||
new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
|
||||
clientname, targets, targetStorageTypes, srcNode,
|
||||
stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy);
|
||||
stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy,
|
||||
false);
|
||||
|
||||
// send data & checksum
|
||||
blockSender.sendBlock(out, unbufOut, null);
|
||||
|
|
|
@ -544,7 +544,8 @@ class DataXceiver extends Receiver implements Runnable {
|
|||
final long maxBytesRcvd,
|
||||
final long latestGenerationStamp,
|
||||
DataChecksum requestedChecksum,
|
||||
CachingStrategy cachingStrategy) throws IOException {
|
||||
CachingStrategy cachingStrategy,
|
||||
final boolean allowLazyPersist) throws IOException {
|
||||
previousOpClientName = clientname;
|
||||
updateCurrentThreadName("Receiving block " + block);
|
||||
final boolean isDatanode = clientname.length() == 0;
|
||||
|
@ -648,10 +649,11 @@ class DataXceiver extends Receiver implements Runnable {
|
|||
HdfsConstants.SMALL_BUFFER_SIZE));
|
||||
mirrorIn = new DataInputStream(unbufMirrorIn);
|
||||
|
||||
// Do not propagate allowLazyPersist to downstream DataNodes.
|
||||
new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
|
||||
blockToken, clientname, targets, targetStorageTypes, srcDataNode,
|
||||
stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
|
||||
latestGenerationStamp, requestedChecksum, cachingStrategy);
|
||||
latestGenerationStamp, requestedChecksum, cachingStrategy, false);
|
||||
|
||||
mirrorOut.flush();
|
||||
|
||||
|
|
|
@ -115,6 +115,13 @@ message OpWriteBlockProto {
|
|||
optional CachingStrategyProto cachingStrategy = 10;
|
||||
optional StorageTypeProto storageType = 11 [default = DISK];
|
||||
repeated StorageTypeProto targetStorageTypes = 12;
|
||||
|
||||
/**
|
||||
* Hint to the DataNode that the block can be allocated on transient
|
||||
* storage i.e. memory and written to disk lazily. The DataNode is free
|
||||
* to ignore this hint.
|
||||
*/
|
||||
optional bool allowLazyPersist = 13 [default = false];
|
||||
}
|
||||
|
||||
message OpTransferBlockProto {
|
||||
|
|
|
@ -530,6 +530,6 @@ public class TestDataTransferProtocol {
|
|||
BlockTokenSecretManager.DUMMY_TOKEN, "cl",
|
||||
new DatanodeInfo[1], new StorageType[1], null, stage,
|
||||
0, block.getNumBytes(), block.getNumBytes(), newGS,
|
||||
checksum, CachingStrategy.newDefaultStrategy());
|
||||
checksum, CachingStrategy.newDefaultStrategy(), false);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -152,7 +152,7 @@ public class TestDiskError {
|
|||
BlockTokenSecretManager.DUMMY_TOKEN, "",
|
||||
new DatanodeInfo[0], new StorageType[0], null,
|
||||
BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
|
||||
checksum, CachingStrategy.newDefaultStrategy());
|
||||
checksum, CachingStrategy.newDefaultStrategy(), false);
|
||||
out.flush();
|
||||
|
||||
// close the connection before sending the content of the block
|
||||
|
|
Loading…
Reference in New Issue