HDFS-7192. DN should ignore lazyPersist hint if the writer is not local. (Contributed by Arpit Agarwal)

This commit is contained in:
Arpit Agarwal 2015-07-30 13:16:46 -07:00
parent 91b42e7d6e
commit 88d8736dde
5 changed files with 242 additions and 13 deletions

View File

@ -762,6 +762,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8816. Improve visualization for the Datanode tab in the NN UI. (wheat9)
HDFS-7192. DN should ignore lazyPersist hint if the writer is
not local. (Arpit Agarwal)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -96,6 +96,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final Class<RamDiskReplicaLruTracker> DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_DEFAULT = RamDiskReplicaLruTracker.class;
public static final String DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY = "dfs.datanode.network.counts.cache.max.size";
public static final int DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT = Integer.MAX_VALUE;
public static final String DFS_DATANODE_NON_LOCAL_LAZY_PERSIST =
"dfs.datanode.non.local.lazy.persist";
public static final boolean DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT =
false;
// This setting is for testing/internal use only.
public static final String DFS_DATANODE_DUPLICATE_REPLICA_DELETION = "dfs.datanode.duplicate.replica.deletion";

View File

@ -30,6 +30,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
@ -99,6 +101,9 @@ public class DNConf {
final long maxLockedMemory;
// Allow LAZY_PERSIST writes from non-local clients?
private final boolean allowNonLocalLazyPersist;
public DNConf(Configuration conf) {
this.conf = conf;
socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
@ -192,6 +197,10 @@ public class DNConf {
this.restartReplicaExpiry = conf.getLong(
DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY,
DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT) * 1000L;
this.allowNonLocalLazyPersist = conf.getBoolean(
DFS_DATANODE_NON_LOCAL_LAZY_PERSIST,
DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT);
}
// We get minimumNameNodeVersion via a method so it can be mocked out in tests.
@ -265,4 +274,8 @@ public class DNConf {
public boolean getIgnoreSecurePortsForTesting() {
return ignoreSecurePortsForTesting;
}
public boolean getAllowNonLocalLazyPersist() {
return allowNonLocalLazyPersist;
}
}

View File

@ -46,11 +46,11 @@ import java.nio.channels.ClosedChannelException;
import java.security.MessageDigest;
import java.util.Arrays;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@ -620,7 +620,7 @@ class DataXceiver extends Receiver implements Runnable {
final long latestGenerationStamp,
DataChecksum requestedChecksum,
CachingStrategy cachingStrategy,
final boolean allowLazyPersist,
boolean allowLazyPersist,
final boolean pinning,
final boolean[] targetPinnings) throws IOException {
previousOpClientName = clientname;
@ -629,6 +629,8 @@ class DataXceiver extends Receiver implements Runnable {
final boolean isClient = !isDatanode;
final boolean isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
|| stage == BlockConstructionStage.TRANSFER_FINALIZED;
allowLazyPersist = allowLazyPersist &&
(dnConf.getAllowNonLocalLazyPersist() || peer.isLocal());
long size = 0;
// check single target for transfer-RBW/Finalized
if (isTransfer && targets.length > 0) {
@ -661,10 +663,7 @@ class DataXceiver extends Receiver implements Runnable {
+ localAddress);
// reply to upstream datanode or client
final DataOutputStream replyOut = new DataOutputStream(
new BufferedOutputStream(
getOutputStream(),
smallBufferSize));
final DataOutputStream replyOut = getBufferedOutputStream();
checkAccess(replyOut, isClient, block, blockToken,
Op.WRITE_BLOCK, BlockTokenIdentifier.AccessMode.WRITE);
@ -679,7 +678,7 @@ class DataXceiver extends Receiver implements Runnable {
if (isDatanode ||
stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
// open a block receiver
blockReceiver = new BlockReceiver(block, storageType, in,
blockReceiver = getBlockReceiver(block, storageType, in,
peer.getRemoteAddressString(),
peer.getLocalAddressString(),
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
@ -726,19 +725,18 @@ class DataXceiver extends Receiver implements Runnable {
smallBufferSize));
mirrorIn = new DataInputStream(unbufMirrorIn);
// Do not propagate allowLazyPersist to downstream DataNodes.
if (targetPinnings != null && targetPinnings.length > 0) {
new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
blockToken, clientname, targets, targetStorageTypes, srcDataNode,
stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
latestGenerationStamp, requestedChecksum, cachingStrategy,
false, targetPinnings[0], targetPinnings);
allowLazyPersist, targetPinnings[0], targetPinnings);
} else {
new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
blockToken, clientname, targets, targetStorageTypes, srcDataNode,
stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
latestGenerationStamp, requestedChecksum, cachingStrategy,
false, false, targetPinnings);
allowLazyPersist, false, targetPinnings);
}
mirrorOut.flush();
@ -853,8 +851,8 @@ class DataXceiver extends Receiver implements Runnable {
}
//update metrics
datanode.metrics.addWriteBlockOp(elapsed());
datanode.metrics.incrWritesFromClient(peer.isLocal(), size);
datanode.getMetrics().addWriteBlockOp(elapsed());
datanode.getMetrics().incrWritesFromClient(peer.isLocal(), size);
}
@Override
@ -1161,7 +1159,7 @@ class DataXceiver extends Receiver implements Runnable {
DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(
checksumInfo.getChecksum());
// open a block receiver and check if the block does not exist
blockReceiver = new BlockReceiver(block, storageType,
blockReceiver = getBlockReceiver(block, storageType,
proxyReply, proxySock.getRemoteSocketAddress().toString(),
proxySock.getLocalSocketAddress().toString(),
null, 0, 0, 0, "", null, datanode, remoteChecksum,
@ -1216,6 +1214,39 @@ class DataXceiver extends Receiver implements Runnable {
datanode.metrics.addReplaceBlockOp(elapsed());
}
/**
* Separated for testing.
*/
@VisibleForTesting
BlockReceiver getBlockReceiver(
final ExtendedBlock block, final StorageType storageType,
final DataInputStream in,
final String inAddr, final String myAddr,
final BlockConstructionStage stage,
final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
final String clientname, final DatanodeInfo srcDataNode,
final DataNode dn, DataChecksum requestedChecksum,
CachingStrategy cachingStrategy,
final boolean allowLazyPersist,
final boolean pinning) throws IOException {
return new BlockReceiver(block, storageType, in,
inAddr, myAddr, stage, newGs, minBytesRcvd, maxBytesRcvd,
clientname, srcDataNode, dn, requestedChecksum,
cachingStrategy, allowLazyPersist, pinning);
}
/**
* Separated for testing.
* @return
*/
@VisibleForTesting
DataOutputStream getBufferedOutputStream() {
return new DataOutputStream(
new BufferedOutputStream(getOutputStream(), smallBufferSize));
}
private long elapsed() {
return monotonicNow() - opStartTime;
}

View File

@ -0,0 +1,178 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.net.*;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.datatransfer.*;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.util.DataChecksum;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.mockito.ArgumentCaptor;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.mockito.Mockito.*;
/**
* Mock-based unit test to verify that the DataXceiver correctly handles the
* LazyPersist hint from clients.
*/
public class TestDataXceiverLazyPersistHint {
@Rule
public Timeout timeout = new Timeout(300000);
private enum PeerLocality {
LOCAL,
REMOTE
}
private enum NonLocalLazyPersist {
ALLOWED,
NOT_ALLOWED
}
/**
* Ensure that the correct hint is passed to the block receiver when
* the client is local.
*/
@Test
public void testWithLocalClient() throws IOException {
ArgumentCaptor<Boolean> captor = ArgumentCaptor.forClass(Boolean.class);
DataXceiver xceiver = makeStubDataXceiver(
PeerLocality.LOCAL, NonLocalLazyPersist.NOT_ALLOWED, captor);
for (Boolean lazyPersistSetting : Arrays.asList(true, false)) {
issueWriteBlockCall(xceiver, lazyPersistSetting);
assertThat(captor.getValue(), is(lazyPersistSetting));
}
}
/**
* Ensure that hint is always false when the client is remote.
*/
@Test
public void testWithRemoteClient() throws IOException {
ArgumentCaptor<Boolean> captor = ArgumentCaptor.forClass(Boolean.class);
DataXceiver xceiver = makeStubDataXceiver(
PeerLocality.REMOTE, NonLocalLazyPersist.NOT_ALLOWED, captor);
for (Boolean lazyPersistSetting : Arrays.asList(true, false)) {
issueWriteBlockCall(xceiver, lazyPersistSetting);
assertThat(captor.getValue(), is(false));
}
}
/**
* Ensure that the correct hint is passed to the block receiver when
* the client is remote AND dfs.datanode.allow.non.local.lazy.persist
* is set to true.
*/
@Test
public void testOverrideWithRemoteClient() throws IOException {
ArgumentCaptor<Boolean> captor = ArgumentCaptor.forClass(Boolean.class);
DataXceiver xceiver = makeStubDataXceiver(
PeerLocality.REMOTE, NonLocalLazyPersist.ALLOWED, captor);
for (Boolean lazyPersistSetting : Arrays.asList(true, false)) {
issueWriteBlockCall(xceiver, lazyPersistSetting);
assertThat(captor.getValue(), is(lazyPersistSetting));
}
}
/**
* Issue a write block call with dummy parameters. The only parameter useful
* for this test is the value of lazyPersist.
*/
private void issueWriteBlockCall(DataXceiver xceiver, boolean lazyPersist)
throws IOException {
xceiver.writeBlock(
new ExtendedBlock("Dummy-pool", 0L),
StorageType.RAM_DISK,
null,
"Dummy-Client",
new DatanodeInfo[0],
new StorageType[0],
mock(DatanodeInfo.class),
BlockConstructionStage.PIPELINE_SETUP_CREATE,
0, 0, 0, 0,
DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 0),
CachingStrategy.newDefaultStrategy(),
lazyPersist,
false, null);
}
// Helper functions to setup the mock objects.
private static DataXceiver makeStubDataXceiver(
PeerLocality locality,
NonLocalLazyPersist nonLocalLazyPersist,
final ArgumentCaptor<Boolean> captor) throws IOException {
DataXceiver xceiverSpy = spy(DataXceiver.create(
getMockPeer(locality),
getMockDn(nonLocalLazyPersist),
mock(DataXceiverServer.class)));
doReturn(mock(BlockReceiver.class)).when(xceiverSpy).getBlockReceiver(
any(ExtendedBlock.class), any(StorageType.class),
any(DataInputStream.class), anyString(), anyString(),
any(BlockConstructionStage.class), anyLong(), anyLong(), anyLong(),
anyString(), any(DatanodeInfo.class), any(DataNode.class),
any(DataChecksum.class), any(CachingStrategy.class),
captor.capture(), anyBoolean());
doReturn(mock(DataOutputStream.class)).when(xceiverSpy)
.getBufferedOutputStream();
return xceiverSpy;
}
private static Peer getMockPeer(PeerLocality locality) {
Peer peer = mock(Peer.class);
when(peer.isLocal()).thenReturn(locality == PeerLocality.LOCAL);
when(peer.getRemoteAddressString()).thenReturn("1.1.1.1:1000");
when(peer.getLocalAddressString()).thenReturn("2.2.2.2:2000");
return peer;
}
private static DataNode getMockDn(NonLocalLazyPersist nonLocalLazyPersist) {
Configuration conf = new HdfsConfiguration();
conf.setBoolean(
DFS_DATANODE_NON_LOCAL_LAZY_PERSIST,
nonLocalLazyPersist == NonLocalLazyPersist.ALLOWED);
DNConf dnConf = new DNConf(conf);
DataNodeMetrics mockMetrics = mock(DataNodeMetrics.class);
DataNode mockDn = mock(DataNode.class);
when(mockDn.getDnConf()).thenReturn(dnConf);
when(mockDn.getConf()).thenReturn(conf);
when(mockDn.getMetrics()).thenReturn(mockMetrics);
return mockDn;
}
}