HDFS-13448. HDFS Block Placement - Ignore Locality for First Block Replica
(Contributed by BELUGA BEHR via Daniel Templeton)
Change-Id: I965d1cfa642ad24296038b83e3d5c9983545267d
(cherry picked from commit 849c45db18
)
This commit is contained in:
parent
7e7792dd7b
commit
00c476abd8
|
@ -116,7 +116,14 @@ public enum CreateFlag {
|
||||||
* Enforce the file to be a replicated file, no matter what its parent
|
* Enforce the file to be a replicated file, no matter what its parent
|
||||||
* directory's replication or erasure coding policy is.
|
* directory's replication or erasure coding policy is.
|
||||||
*/
|
*/
|
||||||
SHOULD_REPLICATE((short) 0x80);
|
SHOULD_REPLICATE((short) 0x80),
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Advise that the first block replica NOT take into account DataNode
|
||||||
|
* locality. The first block replica should be placed randomly within the
|
||||||
|
* cluster. Subsequent block replicas should follow DataNode locality rules.
|
||||||
|
*/
|
||||||
|
IGNORE_CLIENT_LOCALITY((short) 0x100);
|
||||||
|
|
||||||
private final short mode;
|
private final short mode;
|
||||||
|
|
||||||
|
|
|
@ -36,7 +36,16 @@ public enum AddBlockFlag {
|
||||||
*
|
*
|
||||||
* @see CreateFlag#NO_LOCAL_WRITE
|
* @see CreateFlag#NO_LOCAL_WRITE
|
||||||
*/
|
*/
|
||||||
NO_LOCAL_WRITE((short) 0x01);
|
NO_LOCAL_WRITE((short) 0x01),
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Advise that the first block replica NOT take into account DataNode
|
||||||
|
* locality. The first block replica should be placed randomly within the
|
||||||
|
* cluster. Subsequent block replicas should follow DataNode locality rules.
|
||||||
|
*
|
||||||
|
* @see CreateFlag#IGNORE_CLIENT_LOCALITY
|
||||||
|
*/
|
||||||
|
IGNORE_CLIENT_LOCALITY((short) 0x02);
|
||||||
|
|
||||||
private final short mode;
|
private final short mode;
|
||||||
|
|
||||||
|
|
|
@ -201,6 +201,9 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
if (flag.contains(CreateFlag.NO_LOCAL_WRITE)) {
|
if (flag.contains(CreateFlag.NO_LOCAL_WRITE)) {
|
||||||
this.addBlockFlags.add(AddBlockFlag.NO_LOCAL_WRITE);
|
this.addBlockFlags.add(AddBlockFlag.NO_LOCAL_WRITE);
|
||||||
}
|
}
|
||||||
|
if (flag.contains(CreateFlag.IGNORE_CLIENT_LOCALITY)) {
|
||||||
|
this.addBlockFlags.add(AddBlockFlag.IGNORE_CLIENT_LOCALITY);
|
||||||
|
}
|
||||||
if (progress != null) {
|
if (progress != null) {
|
||||||
DFSClient.LOG.debug("Set non-null progress callback on DFSOutputStream "
|
DFSClient.LOG.debug("Set non-null progress callback on DFSOutputStream "
|
||||||
+"{}", src);
|
+"{}", src);
|
||||||
|
|
|
@ -3205,6 +3205,17 @@ public class DistributedFileSystem extends FileSystem
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Advise that the first block replica be written without regard to the
|
||||||
|
* client locality.
|
||||||
|
*
|
||||||
|
* @see CreateFlag for the details.
|
||||||
|
*/
|
||||||
|
public HdfsDataOutputStreamBuilder ignoreClientLocality() {
|
||||||
|
getFlags().add(CreateFlag.IGNORE_CLIENT_LOCALITY);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
@Override
|
@Override
|
||||||
protected EnumSet<CreateFlag> getFlags() {
|
protected EnumSet<CreateFlag> getFlags() {
|
||||||
|
|
|
@ -167,6 +167,7 @@ message AbandonBlockResponseProto { // void response
|
||||||
|
|
||||||
enum AddBlockFlagProto {
|
enum AddBlockFlagProto {
|
||||||
NO_LOCAL_WRITE = 1; // avoid writing to local node.
|
NO_LOCAL_WRITE = 1; // avoid writing to local node.
|
||||||
|
IGNORE_CLIENT_LOCALITY = 2; // write to a random node
|
||||||
}
|
}
|
||||||
|
|
||||||
message AddBlockRequestProto {
|
message AddBlockRequestProto {
|
||||||
|
|
|
@ -275,7 +275,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
||||||
if (avoidLocalNode) {
|
if (avoidLocalNode) {
|
||||||
results = new ArrayList<>(chosenStorage);
|
results = new ArrayList<>(chosenStorage);
|
||||||
Set<Node> excludedNodeCopy = new HashSet<>(excludedNodes);
|
Set<Node> excludedNodeCopy = new HashSet<>(excludedNodes);
|
||||||
excludedNodeCopy.add(writer);
|
if (writer != null) {
|
||||||
|
excludedNodeCopy.add(writer);
|
||||||
|
}
|
||||||
localNode = chooseTarget(numOfReplicas, writer,
|
localNode = chooseTarget(numOfReplicas, writer,
|
||||||
excludedNodeCopy, blocksize, maxNodesPerRack, results,
|
excludedNodeCopy, blocksize, maxNodesPerRack, results,
|
||||||
avoidStaleNodes, storagePolicy,
|
avoidStaleNodes, storagePolicy,
|
||||||
|
|
|
@ -270,19 +270,27 @@ class FSDirWriteFileOp {
|
||||||
BlockManager bm, String src, DatanodeInfo[] excludedNodes,
|
BlockManager bm, String src, DatanodeInfo[] excludedNodes,
|
||||||
String[] favoredNodes, EnumSet<AddBlockFlag> flags,
|
String[] favoredNodes, EnumSet<AddBlockFlag> flags,
|
||||||
ValidateAddBlockResult r) throws IOException {
|
ValidateAddBlockResult r) throws IOException {
|
||||||
Node clientNode = bm.getDatanodeManager()
|
Node clientNode = null;
|
||||||
.getDatanodeByHost(r.clientMachine);
|
|
||||||
if (clientNode == null) {
|
boolean ignoreClientLocality = (flags != null
|
||||||
clientNode = getClientNode(bm, r.clientMachine);
|
&& flags.contains(AddBlockFlag.IGNORE_CLIENT_LOCALITY));
|
||||||
|
|
||||||
|
// If client locality is ignored, clientNode remains 'null' to indicate
|
||||||
|
if (!ignoreClientLocality) {
|
||||||
|
clientNode = bm.getDatanodeManager().getDatanodeByHost(r.clientMachine);
|
||||||
|
if (clientNode == null) {
|
||||||
|
clientNode = getClientNode(bm, r.clientMachine);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Set<Node> excludedNodesSet = null;
|
Set<Node> excludedNodesSet =
|
||||||
if (excludedNodes != null) {
|
(excludedNodes == null) ? new HashSet<>()
|
||||||
excludedNodesSet = new HashSet<>(excludedNodes.length);
|
: new HashSet<>(Arrays.asList(excludedNodes));
|
||||||
Collections.addAll(excludedNodesSet, excludedNodes);
|
|
||||||
}
|
List<String> favoredNodesList =
|
||||||
List<String> favoredNodesList = (favoredNodes == null) ? null
|
(favoredNodes == null) ? Collections.emptyList()
|
||||||
: Arrays.asList(favoredNodes);
|
: Arrays.asList(favoredNodes);
|
||||||
|
|
||||||
// choose targets for the new block to be allocated.
|
// choose targets for the new block to be allocated.
|
||||||
return bm.chooseTarget4NewBlock(src, r.numTargets, clientNode,
|
return bm.chooseTarget4NewBlock(src, r.numTargets, clientNode,
|
||||||
excludedNodesSet, r.blockSize,
|
excludedNodesSet, r.blockSize,
|
||||||
|
|
|
@ -0,0 +1,79 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Matchers.anyByte;
|
||||||
|
import static org.mockito.Matchers.anyInt;
|
||||||
|
import static org.mockito.Matchers.anyList;
|
||||||
|
import static org.mockito.Matchers.anyLong;
|
||||||
|
import static org.mockito.Matchers.anySet;
|
||||||
|
import static org.mockito.Matchers.anyString;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.EnumSet;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.AddBlockFlag;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.ValidateAddBlockResult;
|
||||||
|
import org.apache.hadoop.net.Node;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.ArgumentCaptor;
|
||||||
|
|
||||||
|
public class TestFSDirWriteFileOp {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public void testIgnoreClientLocality() throws IOException {
|
||||||
|
ValidateAddBlockResult addBlockResult =
|
||||||
|
new ValidateAddBlockResult(1024L, 3, (byte) 0x01, null, null, null);
|
||||||
|
|
||||||
|
EnumSet<AddBlockFlag> addBlockFlags =
|
||||||
|
EnumSet.of(AddBlockFlag.IGNORE_CLIENT_LOCALITY);
|
||||||
|
|
||||||
|
BlockManager bmMock = mock(BlockManager.class);
|
||||||
|
|
||||||
|
ArgumentCaptor<Node> nodeCaptor = ArgumentCaptor.forClass(Node.class);
|
||||||
|
|
||||||
|
when(bmMock.chooseTarget4NewBlock(anyString(), anyInt(), any(), anySet(),
|
||||||
|
anyLong(), anyList(), anyByte(), any(), any(), any())).thenReturn(null);
|
||||||
|
|
||||||
|
FSDirWriteFileOp.chooseTargetForNewBlock(bmMock, "localhost", null, null,
|
||||||
|
addBlockFlags, addBlockResult);
|
||||||
|
|
||||||
|
// There should be no other interactions with the block manager when the
|
||||||
|
// IGNORE_CLIENT_LOCALITY is passed in because there is no need to discover
|
||||||
|
// the local node requesting the new block
|
||||||
|
verify(bmMock, times(1)).chooseTarget4NewBlock(anyString(), anyInt(),
|
||||||
|
nodeCaptor.capture(), anySet(), anyLong(), anyList(), anyByte(), any(),
|
||||||
|
any(), any());
|
||||||
|
|
||||||
|
verifyNoMoreInteractions(bmMock);
|
||||||
|
|
||||||
|
assertNull(
|
||||||
|
"Source node was assigned a value. Expected 'null' value because "
|
||||||
|
+ "chooseTarget was flagged to ignore source node locality",
|
||||||
|
nodeCaptor.getValue());
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue