From 3210b3d8aa7f1927ac2ef5f2d9e5d83969ae2c48 Mon Sep 17 00:00:00 2001 From: Daniel Templeton Date: Tue, 24 Jul 2018 15:34:19 -0700 Subject: [PATCH] HDFS-13448. HDFS Block Placement - Ignore Locality for First Block Replica (Contributed by BELUGA BEHR via Daniel Templeton) Change-Id: I965d1cfa642ad24296038b83e3d5c9983545267d (cherry picked from commit 849c45db187224095b13fe297a4d7377fbb9d2cd) (cherry picked from commit 00c476abd8f1d34414b646219856859477558458) --- .../java/org/apache/hadoop/fs/CreateFlag.java | 9 ++- .../org/apache/hadoop/hdfs/AddBlockFlag.java | 11 ++- .../apache/hadoop/hdfs/DFSOutputStream.java | 3 + .../hadoop/hdfs/DistributedFileSystem.java | 11 +++ .../main/proto/ClientNamenodeProtocol.proto | 1 + .../BlockPlacementPolicyDefault.java | 4 +- .../server/namenode/FSDirWriteFileOp.java | 30 ++++--- .../server/namenode/TestFSDirWriteFileOp.java | 79 +++++++++++++++++++ 8 files changed, 134 insertions(+), 14 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSDirWriteFileOp.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java index 383d65a06a3..c3e088b66d8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java @@ -116,7 +116,14 @@ public enum CreateFlag { * Enforce the file to be a replicated file, no matter what its parent * directory's replication or erasure coding policy is. */ - SHOULD_REPLICATE((short) 0x80); + SHOULD_REPLICATE((short) 0x80), + + /** + * Advise that the first block replica NOT take into account DataNode + * locality. The first block replica should be placed randomly within the + * cluster. Subsequent block replicas should follow DataNode locality rules. + */ + IGNORE_CLIENT_LOCALITY((short) 0x100); private final short mode; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AddBlockFlag.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AddBlockFlag.java index 6a0805bb71b..b0686d7c4b4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AddBlockFlag.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AddBlockFlag.java @@ -36,7 +36,16 @@ public enum AddBlockFlag { * * @see CreateFlag#NO_LOCAL_WRITE */ - NO_LOCAL_WRITE((short) 0x01); + NO_LOCAL_WRITE((short) 0x01), + + /** + * Advise that the first block replica NOT take into account DataNode + * locality. The first block replica should be placed randomly within the + * cluster. Subsequent block replicas should follow DataNode locality rules. + * + * @see CreateFlag#IGNORE_CLIENT_LOCALITY + */ + IGNORE_CLIENT_LOCALITY((short) 0x02); private final short mode; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 9734752052a..e9770548a7b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -201,6 +201,9 @@ public class DFSOutputStream extends FSOutputSummer if (flag.contains(CreateFlag.NO_LOCAL_WRITE)) { this.addBlockFlags.add(AddBlockFlag.NO_LOCAL_WRITE); } + if (flag.contains(CreateFlag.IGNORE_CLIENT_LOCALITY)) { + this.addBlockFlags.add(AddBlockFlag.IGNORE_CLIENT_LOCALITY); + } if (progress != null) { DFSClient.LOG.debug("Set non-null progress callback on DFSOutputStream " +"{}", src); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index c2cd4d74b63..84d840f623d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -2987,6 +2987,17 @@ public class DistributedFileSystem extends FileSystem return this; } + /** + * Advise that the first block replica be written without regard to the + * client locality. + * + * @see CreateFlag for the details. + */ + public HdfsDataOutputStreamBuilder ignoreClientLocality() { + getFlags().add(CreateFlag.IGNORE_CLIENT_LOCALITY); + return this; + } + @VisibleForTesting @Override protected EnumSet getFlags() { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto index 5c30f2c4fa1..fad814e8945 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto @@ -167,6 +167,7 @@ message AbandonBlockResponseProto { // void response enum AddBlockFlagProto { NO_LOCAL_WRITE = 1; // avoid writing to local node. + IGNORE_CLIENT_LOCALITY = 2; // write to a random node } message AddBlockRequestProto { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java index 25c7cfc21e3..e2a9c5519f0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java @@ -275,7 +275,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { if (avoidLocalNode) { results = new ArrayList<>(chosenStorage); Set excludedNodeCopy = new HashSet<>(excludedNodes); - excludedNodeCopy.add(writer); + if (writer != null) { + excludedNodeCopy.add(writer); + } localNode = chooseTarget(numOfReplicas, writer, excludedNodeCopy, blocksize, maxNodesPerRack, results, avoidStaleNodes, storagePolicy, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java index 1c310563122..68a00326013 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java @@ -270,19 +270,27 @@ class FSDirWriteFileOp { BlockManager bm, String src, DatanodeInfo[] excludedNodes, String[] favoredNodes, EnumSet flags, ValidateAddBlockResult r) throws IOException { - Node clientNode = bm.getDatanodeManager() - .getDatanodeByHost(r.clientMachine); - if (clientNode == null) { - clientNode = getClientNode(bm, r.clientMachine); + Node clientNode = null; + + boolean ignoreClientLocality = (flags != null + && flags.contains(AddBlockFlag.IGNORE_CLIENT_LOCALITY)); + + // If client locality is ignored, clientNode remains 'null' to indicate + if (!ignoreClientLocality) { + clientNode = bm.getDatanodeManager().getDatanodeByHost(r.clientMachine); + if (clientNode == null) { + clientNode = getClientNode(bm, r.clientMachine); + } } - Set excludedNodesSet = null; - if (excludedNodes != null) { - excludedNodesSet = new HashSet<>(excludedNodes.length); - Collections.addAll(excludedNodesSet, excludedNodes); - } - List favoredNodesList = (favoredNodes == null) ? null - : Arrays.asList(favoredNodes); + Set excludedNodesSet = + (excludedNodes == null) ? new HashSet<>() + : new HashSet<>(Arrays.asList(excludedNodes)); + + List favoredNodesList = + (favoredNodes == null) ? Collections.emptyList() + : Arrays.asList(favoredNodes); + // choose targets for the new block to be allocated. return bm.chooseTarget4NewBlock(src, r.numTargets, clientNode, excludedNodesSet, r.blockSize, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSDirWriteFileOp.java new file mode 100644 index 00000000000..762fa61dd6d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSDirWriteFileOp.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import static org.junit.Assert.assertNull; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyByte; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyList; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anySet; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.EnumSet; + +import org.apache.hadoop.hdfs.AddBlockFlag; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.ValidateAddBlockResult; +import org.apache.hadoop.net.Node; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +public class TestFSDirWriteFileOp { + + @Test + @SuppressWarnings("unchecked") + public void testIgnoreClientLocality() throws IOException { + ValidateAddBlockResult addBlockResult = + new ValidateAddBlockResult(1024L, 3, (byte) 0x01, null, null, null); + + EnumSet addBlockFlags = + EnumSet.of(AddBlockFlag.IGNORE_CLIENT_LOCALITY); + + BlockManager bmMock = mock(BlockManager.class); + + ArgumentCaptor nodeCaptor = ArgumentCaptor.forClass(Node.class); + + when(bmMock.chooseTarget4NewBlock(anyString(), anyInt(), any(), anySet(), + anyLong(), anyList(), anyByte(), any(), any(), any())).thenReturn(null); + + FSDirWriteFileOp.chooseTargetForNewBlock(bmMock, "localhost", null, null, + addBlockFlags, addBlockResult); + + // There should be no other interactions with the block manager when the + // IGNORE_CLIENT_LOCALITY is passed in because there is no need to discover + // the local node requesting the new block + verify(bmMock, times(1)).chooseTarget4NewBlock(anyString(), anyInt(), + nodeCaptor.capture(), anySet(), anyLong(), anyList(), anyByte(), any(), + any(), any()); + + verifyNoMoreInteractions(bmMock); + + assertNull( + "Source node was assigned a value. Expected 'null' value because " + + "chooseTarget was flagged to ignore source node locality", + nodeCaptor.getValue()); + } +}