HDFS-13448. HDFS Block Placement - Ignore Locality for First Block Replica

(Contributed by BELUGA BEHR via Daniel Templeton)

Change-Id: I965d1cfa642ad24296038b83e3d5c9983545267d
(cherry picked from commit 849c45db18)
(cherry picked from commit 00c476abd8)
This commit is contained in:
Daniel Templeton 2018-07-24 15:34:19 -07:00
parent 1d8fce0d2f
commit 3210b3d8aa
8 changed files with 134 additions and 14 deletions

View File

@ -116,7 +116,14 @@ public enum CreateFlag {
* Enforce the file to be a replicated file, no matter what its parent * Enforce the file to be a replicated file, no matter what its parent
* directory's replication or erasure coding policy is. * directory's replication or erasure coding policy is.
*/ */
SHOULD_REPLICATE((short) 0x80); SHOULD_REPLICATE((short) 0x80),
/**
* Advise that the first block replica NOT take into account DataNode
* locality. The first block replica should be placed randomly within the
* cluster. Subsequent block replicas should follow DataNode locality rules.
*/
IGNORE_CLIENT_LOCALITY((short) 0x100);
private final short mode; private final short mode;

View File

@ -36,7 +36,16 @@ public enum AddBlockFlag {
* *
* @see CreateFlag#NO_LOCAL_WRITE * @see CreateFlag#NO_LOCAL_WRITE
*/ */
NO_LOCAL_WRITE((short) 0x01); NO_LOCAL_WRITE((short) 0x01),
/**
* Advise that the first block replica NOT take into account DataNode
* locality. The first block replica should be placed randomly within the
* cluster. Subsequent block replicas should follow DataNode locality rules.
*
* @see CreateFlag#IGNORE_CLIENT_LOCALITY
*/
IGNORE_CLIENT_LOCALITY((short) 0x02);
private final short mode; private final short mode;

View File

@ -201,6 +201,9 @@ public class DFSOutputStream extends FSOutputSummer
if (flag.contains(CreateFlag.NO_LOCAL_WRITE)) { if (flag.contains(CreateFlag.NO_LOCAL_WRITE)) {
this.addBlockFlags.add(AddBlockFlag.NO_LOCAL_WRITE); this.addBlockFlags.add(AddBlockFlag.NO_LOCAL_WRITE);
} }
if (flag.contains(CreateFlag.IGNORE_CLIENT_LOCALITY)) {
this.addBlockFlags.add(AddBlockFlag.IGNORE_CLIENT_LOCALITY);
}
if (progress != null) { if (progress != null) {
DFSClient.LOG.debug("Set non-null progress callback on DFSOutputStream " DFSClient.LOG.debug("Set non-null progress callback on DFSOutputStream "
+"{}", src); +"{}", src);

View File

@ -2987,6 +2987,17 @@ public class DistributedFileSystem extends FileSystem
return this; return this;
} }
/**
 * Requests that placement of the first block replica disregard the
 * locality of the writing client. This is done by adding
 * {@link CreateFlag#IGNORE_CLIENT_LOCALITY} to the flags held by this
 * builder.
 *
 * @return this builder, so that calls can be chained.
 * @see CreateFlag#IGNORE_CLIENT_LOCALITY
 */
public HdfsDataOutputStreamBuilder ignoreClientLocality() {
  final EnumSet<CreateFlag> createFlags = getFlags();
  createFlags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
  return this;
}
@VisibleForTesting @VisibleForTesting
@Override @Override
protected EnumSet<CreateFlag> getFlags() { protected EnumSet<CreateFlag> getFlags() {

View File

@ -167,6 +167,7 @@ message AbandonBlockResponseProto { // void response
enum AddBlockFlagProto { enum AddBlockFlagProto {
NO_LOCAL_WRITE = 1; // avoid writing to local node. NO_LOCAL_WRITE = 1; // avoid writing to local node.
IGNORE_CLIENT_LOCALITY = 2; // write to a random node
} }
message AddBlockRequestProto { message AddBlockRequestProto {

View File

@ -275,7 +275,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
if (avoidLocalNode) { if (avoidLocalNode) {
results = new ArrayList<>(chosenStorage); results = new ArrayList<>(chosenStorage);
Set<Node> excludedNodeCopy = new HashSet<>(excludedNodes); Set<Node> excludedNodeCopy = new HashSet<>(excludedNodes);
excludedNodeCopy.add(writer); if (writer != null) {
excludedNodeCopy.add(writer);
}
localNode = chooseTarget(numOfReplicas, writer, localNode = chooseTarget(numOfReplicas, writer,
excludedNodeCopy, blocksize, maxNodesPerRack, results, excludedNodeCopy, blocksize, maxNodesPerRack, results,
avoidStaleNodes, storagePolicy, avoidStaleNodes, storagePolicy,

View File

@ -270,19 +270,27 @@ class FSDirWriteFileOp {
BlockManager bm, String src, DatanodeInfo[] excludedNodes, BlockManager bm, String src, DatanodeInfo[] excludedNodes,
String[] favoredNodes, EnumSet<AddBlockFlag> flags, String[] favoredNodes, EnumSet<AddBlockFlag> flags,
ValidateAddBlockResult r) throws IOException { ValidateAddBlockResult r) throws IOException {
Node clientNode = bm.getDatanodeManager() Node clientNode = null;
.getDatanodeByHost(r.clientMachine);
if (clientNode == null) { boolean ignoreClientLocality = (flags != null
clientNode = getClientNode(bm, r.clientMachine); && flags.contains(AddBlockFlag.IGNORE_CLIENT_LOCALITY));
// If client locality is ignored, clientNode remains 'null' to indicate
// that no client node should be taken into account for block placement.
if (!ignoreClientLocality) {
clientNode = bm.getDatanodeManager().getDatanodeByHost(r.clientMachine);
if (clientNode == null) {
clientNode = getClientNode(bm, r.clientMachine);
}
} }
Set<Node> excludedNodesSet = null; Set<Node> excludedNodesSet =
if (excludedNodes != null) { (excludedNodes == null) ? new HashSet<>()
excludedNodesSet = new HashSet<>(excludedNodes.length); : new HashSet<>(Arrays.asList(excludedNodes));
Collections.addAll(excludedNodesSet, excludedNodes);
} List<String> favoredNodesList =
List<String> favoredNodesList = (favoredNodes == null) ? null (favoredNodes == null) ? Collections.emptyList()
: Arrays.asList(favoredNodes); : Arrays.asList(favoredNodes);
// choose targets for the new block to be allocated. // choose targets for the new block to be allocated.
return bm.chooseTarget4NewBlock(src, r.numTargets, clientNode, return bm.chooseTarget4NewBlock(src, r.numTargets, clientNode,
excludedNodesSet, r.blockSize, excludedNodesSet, r.blockSize,

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.assertNull;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyByte;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyList;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anySet;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.EnumSet;
import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.ValidateAddBlockResult;
import org.apache.hadoop.net.Node;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
/**
 * Unit tests for {@link FSDirWriteFileOp}.
 */
public class TestFSDirWriteFileOp {

  /**
   * Checks that when {@link AddBlockFlag#IGNORE_CLIENT_LOCALITY} is passed
   * to {@code FSDirWriteFileOp.chooseTargetForNewBlock}, the block manager
   * is asked to choose targets with a {@code null} client node and is not
   * touched in any other way.
   */
  @Test
  @SuppressWarnings("unchecked")
  public void testIgnoreClientLocality() throws IOException {
    // Arrange: a mocked block manager whose target selection yields nothing.
    final BlockManager blockManager = mock(BlockManager.class);
    when(blockManager.chooseTarget4NewBlock(anyString(), anyInt(), any(),
        anySet(), anyLong(), anyList(), anyByte(), any(), any(), any()))
        .thenReturn(null);

    final EnumSet<AddBlockFlag> flags =
        EnumSet.of(AddBlockFlag.IGNORE_CLIENT_LOCALITY);
    final ValidateAddBlockResult validated =
        new ValidateAddBlockResult(1024L, 3, (byte) 0x01, null, null, null);

    // Act: request targets with neither excluded nor favored nodes.
    FSDirWriteFileOp.chooseTargetForNewBlock(blockManager, "localhost", null,
        null, flags, validated);

    // Assert: exactly one call reached the block manager. Nothing else may
    // have been invoked on it, since with IGNORE_CLIENT_LOCALITY there is no
    // need to look up the node that requested the new block.
    final ArgumentCaptor<Node> clientNode =
        ArgumentCaptor.forClass(Node.class);
    verify(blockManager, times(1)).chooseTarget4NewBlock(anyString(),
        anyInt(), clientNode.capture(), anySet(), anyLong(), anyList(),
        anyByte(), any(), any(), any());
    verifyNoMoreInteractions(blockManager);

    final String failureMessage =
        "Source node was assigned a value. Expected 'null' value because "
            + "chooseTarget was flagged to ignore source node locality";
    assertNull(failureMessage, clientNode.getValue());
  }
}