diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index fbe81712fb9..2bcda8b08ed 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -360,6 +360,7 @@ public class DFSOutputStream extends FSOutputSummer
     private long restartDeadline = 0; // Deadline of DN restart
     private BlockConstructionStage stage;  // block construction stage
     private long bytesSent = 0; // number of bytes that've been sent
+    private final boolean isLazyPersistFile;
 
     /** Nodes have been used in the pipeline before and have failed. */
     private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>();
@@ -377,14 +378,15 @@ public class DFSOutputStream extends FSOutputSummer
      * Default construction for file create
      */
    private DataStreamer() {
-      this(null);
+      this(null, null);
    }
 
    /**
     * construction with tracing info
     */
-    private DataStreamer(Span span) {
+    private DataStreamer(HdfsFileStatus stat, Span span) {
      isAppend = false;
+      isLazyPersistFile = stat != null && stat.isLazyPersist();
      stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
      traceSpan = span;
    }
@@ -404,6 +406,7 @@ public class DFSOutputStream extends FSOutputSummer
       block = lastBlock.getBlock();
       bytesSent = block.getNumBytes();
       accessToken = lastBlock.getBlockToken();
+      isLazyPersistFile = stat.isLazyPersist();
       long usedInLastBlock = stat.getLen() % blockSize;
       int freeInLastBlock = (int)(blockSize - usedInLastBlock);
@@ -1396,7 +1399,7 @@ public class DFSOutputStream extends FSOutputSummer
         new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
             dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
             nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
-            cachingStrategy.get());
+            cachingStrategy.get(), isLazyPersistFile);
 
         // receive ack for connect
         BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
@@ -1649,7 +1652,7 @@ public class DFSOutputStream extends FSOutputSummer
     if (Trace.isTracing()) {
       traceSpan = Trace.startSpan(this.getClass().getSimpleName()).detach();
     }
-    streamer = new DataStreamer(traceSpan);
+    streamer = new DataStreamer(stat, traceSpan);
     if (favoredNodes != null && favoredNodes.length != 0) {
       streamer.setFavoredNodes(favoredNodes);
     }
@@ -1726,7 +1729,7 @@ public class DFSOutputStream extends FSOutputSummer
     } else {
       computePacketChunkSize(dfsClient.getConf().writePacketSize,
           checksum.getBytesPerChecksum());
-      streamer = new DataStreamer(traceSpan);
+      streamer = new DataStreamer(stat, traceSpan);
     }
     this.fileEncryptionInfo = stat.getFileEncryptionInfo();
   }
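The isLazyPersistFile flag introduced above is derived from the file's
HdfsFileStatus, so it can only be true when the file was flagged for lazy
persistence at create time. As a rough illustration (not part of this patch),
a client would opt in along these lines, assuming the CreateFlag.LAZY_PERSIST
flag from the same feature branch; the path, sizes, and class name are
placeholders:

import java.util.EnumSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public class LazyPersistCreateSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    // Passing LAZY_PERSIST at create time is what makes
    // stat.isLazyPersist() return true in the DataStreamer
    // constructors above.
    try (FSDataOutputStream out = fs.create(
        new Path("/tmp/lazy-persist-example"),           // placeholder path
        FsPermission.getFileDefault(),
        EnumSet.of(CreateFlag.CREATE, CreateFlag.LAZY_PERSIST),
        4096,                                            // buffer size
        (short) 1,                                       // replication
        128L * 1024 * 1024,                              // block size
        null)) {                                         // no Progressable
      out.writeBytes("replica targeted at transient storage");
    }
  }
}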
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
index d54d5bed002..f6b99e61601 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
@@ -106,8 +106,8 @@
       final long maxBytesRcvd,
       final long latestGenerationStamp,
       final DataChecksum requestedChecksum,
-      final CachingStrategy cachingStrategy) throws IOException;
-
+      final CachingStrategy cachingStrategy,
+      final boolean allowLazyPersist) throws IOException;
   /**
    * Transfer a block to another datanode.
    * The block stage must be
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
index daae9b71129..538c82d7f78 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
@@ -148,7 +148,8 @@ public abstract class Receiver implements DataTransferProtocol {
           fromProto(proto.getRequestedChecksum()),
           (proto.hasCachingStrategy() ?
               getCachingStrategy(proto.getCachingStrategy()) :
-              CachingStrategy.newDefaultStrategy()));
+              CachingStrategy.newDefaultStrategy()),
+          (proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false));
     } finally {
       if (traceScope != null) traceScope.close();
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
index fb6cf2cc3fd..1ae9da53ca0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
@@ -128,7 +128,8 @@ public class Sender implements DataTransferProtocol {
       final long maxBytesRcvd,
       final long latestGenerationStamp,
       DataChecksum requestedChecksum,
-      final CachingStrategy cachingStrategy) throws IOException {
+      final CachingStrategy cachingStrategy,
+      final boolean allowLazyPersist) throws IOException {
     ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
         blk, clientName, blockToken);
@@ -146,7 +147,8 @@ public class Sender implements DataTransferProtocol {
         .setMaxBytesRcvd(maxBytesRcvd)
         .setLatestGenerationStamp(latestGenerationStamp)
         .setRequestedChecksum(checksumProto)
-        .setCachingStrategy(getCachingStrategy(cachingStrategy));
+        .setCachingStrategy(getCachingStrategy(cachingStrategy))
+        .setAllowLazyPersist(allowLazyPersist);
 
     if (source != null) {
       proto.setSource(PBHelper.convertDatanodeInfo(source));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 83371025585..a4db56a21b3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -2010,7 +2010,8 @@ public class DataNode extends ReconfigurableBase
         new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
             clientname, targets, targetStorageTypes, srcNode,
-            stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy);
+            stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy,
+            false);
 
         // send data & checksum
         blockSender.sendBlock(out, unbufOut, null);
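At this point every implementation and caller of writeBlock carries the new
trailing boolean: only the client-side DFSOutputStream can pass true, while
the DataNode-to-DataNode transfer above and the pipeline mirroring below
hard-code false. For readability, here is the client call from the
DFSOutputStream hunk restated with one argument per line and commented; all
variables are as defined in that hunk, and only the final argument is new:

new Sender(out).writeBlock(
    blockCopy,               // block being written
    nodeStorageTypes[0],     // storage type for the first pipeline node
    accessToken,             // block access token
    dfsClient.clientName,    // an empty name would mark a DN-initiated write
    nodes,                   // pipeline targets
    nodeStorageTypes,        // per-target storage types
    null,                    // source DataNode (none for a client write)
    bcs,                     // block construction stage
    nodes.length,            // pipeline size
    block.getNumBytes(),     // minBytesRcvd
    bytesSent,               // maxBytesRcvd
    newGS,                   // latest generation stamp
    checksum,                // requested checksum
    cachingStrategy.get(),   // caching strategy
    isLazyPersistFile);      // new: lazy-persist hint for the first DataNode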
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 4575c9353ce..3b8304e7187 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -544,7 +544,8 @@ class DataXceiver extends Receiver implements Runnable {
       final long maxBytesRcvd,
       final long latestGenerationStamp,
       DataChecksum requestedChecksum,
-      CachingStrategy cachingStrategy) throws IOException {
+      CachingStrategy cachingStrategy,
+      final boolean allowLazyPersist) throws IOException {
     previousOpClientName = clientname;
     updateCurrentThreadName("Receiving block " + block);
     final boolean isDatanode = clientname.length() == 0;
@@ -648,10 +649,11 @@
               HdfsConstants.SMALL_BUFFER_SIZE));
           mirrorIn = new DataInputStream(unbufMirrorIn);
 
+          // Do not propagate allowLazyPersist to downstream DataNodes.
           new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
               blockToken, clientname, targets, targetStorageTypes, srcDataNode,
               stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
-              latestGenerationStamp, requestedChecksum, cachingStrategy);
+              latestGenerationStamp, requestedChecksum, cachingStrategy, false);
 
           mirrorOut.flush();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
index 098d10ab9a4..fb774b78faa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
@@ -115,6 +115,13 @@ message OpWriteBlockProto {
   optional CachingStrategyProto cachingStrategy = 10;
   optional StorageTypeProto storageType = 11 [default = DISK];
   repeated StorageTypeProto targetStorageTypes = 12;
+
+  /**
+   * Hint to the DataNode that the block can be allocated on transient
+   * storage i.e. memory and written to disk lazily. The DataNode is free
+   * to ignore this hint.
+   */
+  optional bool allowLazyPersist = 13 [default = false];
 }
 
 message OpTransferBlockProto {
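Because allowLazyPersist is an optional field with an explicit default of
false, the protocol change is wire-compatible: a request serialized by a
pre-upgrade client simply lacks field 13, and the generated accessors fall
back to the default. A minimal sketch of the receiving side (the class name,
method name, and requestBytes parameter are hypothetical; the body mirrors
the check Receiver.java performs above):

import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;

class LazyPersistCompatSketch {
  static boolean readLazyPersistHint(byte[] requestBytes) throws Exception {
    OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(requestBytes);
    // For a pre-upgrade client hasAllowLazyPersist() is false, so the
    // DataNode sees false and behaves exactly as before this patch.
    return proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false;
  }
}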
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
index 494dd1d5316..6405b5a0e45 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
@@ -530,6 +530,6 @@ public class TestDataTransferProtocol {
         BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         new DatanodeInfo[1], new StorageType[1], null, stage,
         0, block.getNumBytes(), block.getNumBytes(), newGS,
-        checksum, CachingStrategy.newDefaultStrategy());
+        checksum, CachingStrategy.newDefaultStrategy(), false);
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
index 4b5b6e1ec4f..f440bb6fe5f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
@@ -152,7 +152,7 @@ public class TestDiskError {
         BlockTokenSecretManager.DUMMY_TOKEN, "",
         new DatanodeInfo[0], new StorageType[0], null,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
-        checksum, CachingStrategy.newDefaultStrategy());
+        checksum, CachingStrategy.newDefaultStrategy(), false);
     out.flush();
 
     // close the connection before sending the content of the block
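Both test updates exercise only the default (false) path, so the existing
protocol tests pass unchanged. A true-path variant would differ only in the
final argument; sketched below against the TestDiskError call, with the
leading arguments (block.getBlock(), StorageType.DEFAULT) assumed from the
surrounding test code rather than shown in the hunk. Since the field is only
a hint, such a test could assert that the DataNode accepts the request, but
not that the replica actually lands in memory:

new Sender(out).writeBlock(block.getBlock(), StorageType.DEFAULT,
    BlockTokenSecretManager.DUMMY_TOKEN, "",
    new DatanodeInfo[0], new StorageType[0], null,
    BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
    checksum, CachingStrategy.newDefaultStrategy(),
    true); // hint enabled; the DataNode is free to ignore it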