diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index e7af2cbeab4..7f04125824f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -762,6 +762,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8816. Improve visualization for the Datanode tab in the NN UI.
     (wheat9)
 
+    HDFS-7192. DN should ignore lazyPersist hint if the writer is
+    not local. (Arpit Agarwal)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 0e569f0d560..1e5bf0deb8e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -96,6 +96,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final Class<? extends RamDiskReplicaTracker> DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_DEFAULT = RamDiskReplicaLruTracker.class;
   public static final String DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY = "dfs.datanode.network.counts.cache.max.size";
   public static final int DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT = Integer.MAX_VALUE;
+  public static final String DFS_DATANODE_NON_LOCAL_LAZY_PERSIST =
+      "dfs.datanode.non.local.lazy.persist";
+  public static final boolean DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT =
+      false;
 
   // This setting is for testing/internal use only.
   public static final String DFS_DATANODE_DUPLICATE_REPLICA_DELETION = "dfs.datanode.duplicate.replica.deletion";
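Since the new key defaults to false, a DataNode now silently drops the LAZY_PERSIST hint from any writer that is not on the same host. A minimal sketch of opting back in programmatically, as the new test at the end of this patch also does (the wrapper class below is hypothetical, not part of the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST;

    public class NonLocalLazyPersistConfig { // hypothetical helper class
      public static Configuration withNonLocalLazyPersist() {
        Configuration conf = new HdfsConfiguration();
        // Equivalent to setting dfs.datanode.non.local.lazy.persist=true in
        // hdfs-site.xml; the shipped default of false keeps the new gating.
        conf.setBoolean(DFS_DATANODE_NON_LOCAL_LAZY_PERSIST, true);
        return conf;
      }
    }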
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index 42b1b46e31c..abc9390dc88 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -30,6 +30,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
@@ -99,6 +101,9 @@ public class DNConf {
 
   final long maxLockedMemory;
 
+  // Allow LAZY_PERSIST writes from non-local clients?
+  private final boolean allowNonLocalLazyPersist;
+
   public DNConf(Configuration conf) {
     this.conf = conf;
     socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
@@ -192,6 +197,10 @@ public class DNConf {
     this.restartReplicaExpiry = conf.getLong(
         DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY,
         DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT) * 1000L;
+
+    this.allowNonLocalLazyPersist = conf.getBoolean(
+        DFS_DATANODE_NON_LOCAL_LAZY_PERSIST,
+        DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT);
   }
 
   // We get minimumNameNodeVersion via a method so it can be mocked out in tests.
@@ -265,4 +274,8 @@ public class DNConf {
   public boolean getIgnoreSecurePortsForTesting() {
     return ignoreSecurePortsForTesting;
   }
+
+  public boolean getAllowNonLocalLazyPersist() {
+    return allowNonLocalLazyPersist;
+  }
 }
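The heart of the change is a single expression in DataXceiver#writeBlock(), shown in the next file: the client's hint survives only if the writer is local or the operator opted in, and the already-gated value (rather than a hardcoded false) is then forwarded to downstream DataNodes in the pipeline. Restated as a standalone predicate (a hypothetical method, for illustration only):

    /**
     * Hypothetical restatement of the gating expression added to
     * DataXceiver#writeBlock() below.
     */
    static boolean effectiveLazyPersist(boolean requestedByClient,
                                        boolean allowNonLocal,
                                        boolean peerIsLocal) {
      // requested + local peer             -> true  (hint honored)
      // requested + remote peer + override -> true  (operator opted in)
      // requested + remote peer            -> false (hint silently dropped)
      // not requested                      -> false (never upgraded)
      return requestedByClient && (allowNonLocal || peerIsLocal);
    }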
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 26d669cb5fb..089b7cddfed 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -46,11 +46,11 @@ import java.nio.channels.ClosedChannelException;
 import java.security.MessageDigest;
 import java.util.Arrays;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -620,7 +620,7 @@ class DataXceiver extends Receiver implements Runnable {
       final long latestGenerationStamp,
       DataChecksum requestedChecksum,
       CachingStrategy cachingStrategy,
-      final boolean allowLazyPersist,
+      boolean allowLazyPersist,
       final boolean pinning,
       final boolean[] targetPinnings) throws IOException {
     previousOpClientName = clientname;
@@ -629,6 +629,8 @@ class DataXceiver extends Receiver implements Runnable {
     final boolean isClient = !isDatanode;
     final boolean isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
         || stage == BlockConstructionStage.TRANSFER_FINALIZED;
+    allowLazyPersist = allowLazyPersist &&
+        (dnConf.getAllowNonLocalLazyPersist() || peer.isLocal());
     long size = 0;
     // check single target for transfer-RBW/Finalized
     if (isTransfer && targets.length > 0) {
@@ -661,10 +663,7 @@ class DataXceiver extends Receiver implements Runnable {
         + localAddress);
 
     // reply to upstream datanode or client
-    final DataOutputStream replyOut = new DataOutputStream(
-        new BufferedOutputStream(
-            getOutputStream(),
-            smallBufferSize));
+    final DataOutputStream replyOut = getBufferedOutputStream();
     checkAccess(replyOut, isClient, block, blockToken,
         Op.WRITE_BLOCK, BlockTokenIdentifier.AccessMode.WRITE);
 
@@ -679,7 +678,7 @@ class DataXceiver extends Receiver implements Runnable {
       if (isDatanode ||
           stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
         // open a block receiver
-        blockReceiver = new BlockReceiver(block, storageType, in,
+        blockReceiver = getBlockReceiver(block, storageType, in,
             peer.getRemoteAddressString(),
             peer.getLocalAddressString(),
             stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
@@ -726,19 +725,18 @@ class DataXceiver extends Receiver implements Runnable {
                   smallBufferSize));
           mirrorIn = new DataInputStream(unbufMirrorIn);
 
-          // Do not propagate allowLazyPersist to downstream DataNodes.
           if (targetPinnings != null && targetPinnings.length > 0) {
             new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
                 blockToken, clientname, targets, targetStorageTypes, srcDataNode,
                 stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
                 latestGenerationStamp, requestedChecksum, cachingStrategy,
-                false, targetPinnings[0], targetPinnings);
+                allowLazyPersist, targetPinnings[0], targetPinnings);
           } else {
             new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
                 blockToken, clientname, targets, targetStorageTypes, srcDataNode,
                 stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
                 latestGenerationStamp, requestedChecksum, cachingStrategy,
-                false, false, targetPinnings);
+                allowLazyPersist, false, targetPinnings);
           }
 
           mirrorOut.flush();
@@ -853,8 +851,8 @@ class DataXceiver extends Receiver implements Runnable {
     }
 
     //update metrics
-    datanode.metrics.addWriteBlockOp(elapsed());
-    datanode.metrics.incrWritesFromClient(peer.isLocal(), size);
+    datanode.getMetrics().addWriteBlockOp(elapsed());
+    datanode.getMetrics().incrWritesFromClient(peer.isLocal(), size);
   }
 
   @Override
@@ -1161,7 +1159,7 @@ class DataXceiver extends Receiver implements Runnable {
       DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(
           checksumInfo.getChecksum());
       // open a block receiver and check if the block does not exist
-      blockReceiver = getBlockReceiver(block, storageType,
          proxyReply, proxySock.getRemoteSocketAddress().toString(),
+      blockReceiver = getBlockReceiver(block, storageType,
           proxyReply, proxySock.getRemoteSocketAddress().toString(),
           proxySock.getLocalSocketAddress().toString(),
           null, 0, 0, 0, "", null, datanode, remoteChecksum,
@@ -1216,6 +1214,37 @@ class DataXceiver extends Receiver implements Runnable {
     datanode.metrics.addReplaceBlockOp(elapsed());
   }
 
+  /**
+   * Separated for testing.
+   */
+  @VisibleForTesting
+  BlockReceiver getBlockReceiver(
+      final ExtendedBlock block, final StorageType storageType,
+      final DataInputStream in,
+      final String inAddr, final String myAddr,
+      final BlockConstructionStage stage,
+      final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
+      final String clientname, final DatanodeInfo srcDataNode,
+      final DataNode dn, DataChecksum requestedChecksum,
+      CachingStrategy cachingStrategy,
+      final boolean allowLazyPersist,
+      final boolean pinning) throws IOException {
+    return new BlockReceiver(block, storageType, in,
+        inAddr, myAddr, stage, newGs, minBytesRcvd, maxBytesRcvd,
+        clientname, srcDataNode, dn, requestedChecksum,
+        cachingStrategy, allowLazyPersist, pinning);
+  }
+
+  /**
+   * Separated for testing.
+   */
+  @VisibleForTesting
+  DataOutputStream getBufferedOutputStream() {
+    return new DataOutputStream(
+        new BufferedOutputStream(getOutputStream(), smallBufferSize));
+  }
+
   private long elapsed() {
     return monotonicNow() - opStartTime;
   }
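For context, the hint filtered above enters the write pipeline when a client creates a file with CreateFlag.LAZY_PERSIST; the unit test below bypasses that path and drives writeBlock() directly. A minimal client-side sketch, assuming a reachable HDFS and a made-up path:

    import java.util.EnumSet;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CreateFlag;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class LazyPersistWriteExample { // hypothetical, not part of the patch
      public static void main(String[] args) throws Exception {
        Configuration conf = new HdfsConfiguration();
        FileSystem fs = FileSystem.get(conf);
        // CREATE + LAZY_PERSIST asks for RAM_DISK placement; after this patch
        // the DataNode honors it only for local writers (or with the override).
        FSDataOutputStream out = fs.create(
            new Path("/tmp/scratch.dat"),     // made-up path
            FsPermission.getFileDefault(),
            EnumSet.of(CreateFlag.CREATE, CreateFlag.LAZY_PERSIST),
            4096,                             // buffer size
            (short) 1,                        // lazy-persist files use a single replica
            128L * 1024 * 1024,               // block size
            null);                            // no progress callback
        out.write(new byte[]{1, 2, 3});
        out.close();
      }
    }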
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
new file mode 100644
index 00000000000..d8a7188951e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.hdfs.net.*;
+import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.protocol.datatransfer.*;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
+import org.apache.hadoop.util.DataChecksum;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.mockito.ArgumentCaptor;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.mockito.Mockito.*;
+
+/**
+ * Mock-based unit test to verify that the DataXceiver correctly handles
+ * the LazyPersist hint from clients.
+ */
+public class TestDataXceiverLazyPersistHint {
+  @Rule
+  public Timeout timeout = new Timeout(300000);
+
+  private enum PeerLocality {
+    LOCAL,
+    REMOTE
+  }
+
+  private enum NonLocalLazyPersist {
+    ALLOWED,
+    NOT_ALLOWED
+  }
+
+  /**
+   * Ensure that the correct hint is passed to the block receiver when
+   * the client is local.
+   */
+  @Test
+  public void testWithLocalClient() throws IOException {
+    ArgumentCaptor<Boolean> captor = ArgumentCaptor.forClass(Boolean.class);
+    DataXceiver xceiver = makeStubDataXceiver(
+        PeerLocality.LOCAL, NonLocalLazyPersist.NOT_ALLOWED, captor);
+
+    for (Boolean lazyPersistSetting : Arrays.asList(true, false)) {
+      issueWriteBlockCall(xceiver, lazyPersistSetting);
+      assertThat(captor.getValue(), is(lazyPersistSetting));
+    }
+  }
+
+  /**
+   * Ensure that the hint is always false when the client is remote.
+   */
+  @Test
+  public void testWithRemoteClient() throws IOException {
+    ArgumentCaptor<Boolean> captor = ArgumentCaptor.forClass(Boolean.class);
+    DataXceiver xceiver = makeStubDataXceiver(
+        PeerLocality.REMOTE, NonLocalLazyPersist.NOT_ALLOWED, captor);
+
+    for (Boolean lazyPersistSetting : Arrays.asList(true, false)) {
+      issueWriteBlockCall(xceiver, lazyPersistSetting);
+      assertThat(captor.getValue(), is(false));
+    }
+  }
+
+  /**
+   * Ensure that the correct hint is passed to the block receiver when
+   * the client is remote AND dfs.datanode.non.local.lazy.persist is
+   * set to true.
+   */
+  @Test
+  public void testOverrideWithRemoteClient() throws IOException {
+    ArgumentCaptor<Boolean> captor = ArgumentCaptor.forClass(Boolean.class);
+    DataXceiver xceiver = makeStubDataXceiver(
+        PeerLocality.REMOTE, NonLocalLazyPersist.ALLOWED, captor);
+
+    for (Boolean lazyPersistSetting : Arrays.asList(true, false)) {
+      issueWriteBlockCall(xceiver, lazyPersistSetting);
+      assertThat(captor.getValue(), is(lazyPersistSetting));
+    }
+  }
+
+  /**
+   * Issue a write block call with dummy parameters. The only parameter
+   * that matters for this test is the value of lazyPersist.
+   */
+  private void issueWriteBlockCall(DataXceiver xceiver, boolean lazyPersist)
+      throws IOException {
+    xceiver.writeBlock(
+        new ExtendedBlock("Dummy-pool", 0L),
+        StorageType.RAM_DISK,
+        null,
+        "Dummy-Client",
+        new DatanodeInfo[0],
+        new StorageType[0],
+        mock(DatanodeInfo.class),
+        BlockConstructionStage.PIPELINE_SETUP_CREATE,
+        0, 0, 0, 0,
+        DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 0),
+        CachingStrategy.newDefaultStrategy(),
+        lazyPersist,
+        false, null);
+  }
+
+  // Helper functions to set up the mock objects.
+
+  private static DataXceiver makeStubDataXceiver(
+      PeerLocality locality,
+      NonLocalLazyPersist nonLocalLazyPersist,
+      final ArgumentCaptor<Boolean> captor) throws IOException {
+    DataXceiver xceiverSpy = spy(DataXceiver.create(
+        getMockPeer(locality),
+        getMockDn(nonLocalLazyPersist),
+        mock(DataXceiverServer.class)));
+
+    doReturn(mock(BlockReceiver.class)).when(xceiverSpy).getBlockReceiver(
+        any(ExtendedBlock.class), any(StorageType.class),
+        any(DataInputStream.class), anyString(), anyString(),
+        any(BlockConstructionStage.class), anyLong(), anyLong(), anyLong(),
+        anyString(), any(DatanodeInfo.class), any(DataNode.class),
+        any(DataChecksum.class), any(CachingStrategy.class),
+        captor.capture(), anyBoolean());
+    doReturn(mock(DataOutputStream.class)).when(xceiverSpy)
+        .getBufferedOutputStream();
+    return xceiverSpy;
+  }
+
+  private static Peer getMockPeer(PeerLocality locality) {
+    Peer peer = mock(Peer.class);
+    when(peer.isLocal()).thenReturn(locality == PeerLocality.LOCAL);
+    when(peer.getRemoteAddressString()).thenReturn("1.1.1.1:1000");
+    when(peer.getLocalAddressString()).thenReturn("2.2.2.2:2000");
+    return peer;
+  }
+
+  private static DataNode getMockDn(NonLocalLazyPersist nonLocalLazyPersist) {
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(
+        DFS_DATANODE_NON_LOCAL_LAZY_PERSIST,
+        nonLocalLazyPersist == NonLocalLazyPersist.ALLOWED);
+    DNConf dnConf = new DNConf(conf);
+    DataNodeMetrics mockMetrics = mock(DataNodeMetrics.class);
+    DataNode mockDn = mock(DataNode.class);
+    when(mockDn.getDnConf()).thenReturn(dnConf);
+    when(mockDn.getConf()).thenReturn(conf);
+    when(mockDn.getMetrics()).thenReturn(mockMetrics);
+    return mockDn;
+  }
+}