From 4289cb8b36bcb96510b9e63e3e966e306c6e3893 Mon Sep 17 00:00:00 2001
From: Lei Xu
Date: Wed, 27 Apr 2016 14:22:51 -0700
Subject: [PATCH] HDFS-3702. Add an option for NOT writing the blocks locally
 if there is a datanode on the same box as the client. (Contributed by Lei
 (Eddy) Xu)

(cherry picked from commit 0a152103f19a3e8e1b7f33aeb9dd115ba231d7b7)
---
 .../java/org/apache/hadoop/fs/CreateFlag.java | 9 ++-
 .../org/apache/hadoop/hdfs/AddBlockFlag.java | 59 ++++++++++++++++++
 .../apache/hadoop/hdfs/DFSOutputStream.java | 19 ++++--
 .../org/apache/hadoop/hdfs/DataStreamer.java | 18 ++++--
 .../hadoop/hdfs/protocol/ClientProtocol.java | 5 +-
 .../ClientNamenodeProtocolTranslatorPB.java | 8 ++-
 .../hdfs/protocolPB/PBHelperClient.java | 27 ++++++++
 .../main/proto/ClientNamenodeProtocol.proto | 5 ++
 ...amenodeProtocolServerSideTranslatorPB.java | 6 +-
 .../server/blockmanagement/BlockManager.java | 13 ++--
 .../blockmanagement/BlockPlacementPolicy.java | 11 +++-
 .../BlockPlacementPolicyDefault.java | 51 +++++++++++----
 .../blockmanagement/ReplicationWork.java | 2 +-
 .../server/namenode/FSDirWriteFileOp.java | 9 +--
 .../hdfs/server/namenode/FSNamesystem.java | 6 +-
 .../server/namenode/NameNodeRpcServer.java | 7 ++-
 .../org/apache/hadoop/hdfs/DFSTestUtil.java | 2 +-
 .../hadoop/hdfs/TestBlockStoragePolicy.java | 6 +-
 .../hadoop/hdfs/TestDFSClientRetries.java | 12 +++-
 .../hadoop/hdfs/TestDFSOutputStream.java | 62 +++++++++++++++++--
 .../apache/hadoop/hdfs/TestFileCreation.java | 4 +-
 .../BaseReplicationPolicyTest.java | 2 +-
 ...estAvailableSpaceBlockPlacementPolicy.java | 2 +-
 .../TestReplicationPolicy.java | 47 ++++++++++++--
 .../TestReplicationPolicyConsiderLoad.java | 4 +-
 .../TestReplicationPolicyWithNodeGroup.java | 8 ++-
 ...estReplicationPolicyWithUpgradeDomain.java | 2 +-
 .../namenode/NNThroughputBenchmark.java | 2 +-
 .../server/namenode/TestAddBlockRetry.java | 8 +--
 ...BlockPlacementPolicyRackFaultTolerant.java | 4 +-
 .../server/namenode/TestDeadDatanode.java | 3 +-
 .../TestDefaultBlockPlacementPolicy.java | 2 +-
 .../hdfs/server/namenode/TestDeleteRace.java | 7 ++-
 ...TestUpgradeDomainBlockPlacementPolicy.java | 2 +-
 .../server/namenode/ha/TestHASafeMode.java | 2 +-
 .../snapshot/TestOpenFilesWithSnapshot.java | 2 +-
 .../snapshot/TestSnapshotBlocksMap.java | 6 +-
 37 files changed, 359 insertions(+), 85 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AddBlockFlag.java

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java
index 539b5113d39..d480fc9f4c3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java
@@ -103,7 +103,14 @@ public enum CreateFlag {
    * Append data to a new block instead of the end of the last partial block.
    * This is only useful for APPEND.
    */
-  NEW_BLOCK((short) 0x20);
+  NEW_BLOCK((short) 0x20),
+
+  /**
+   * Advise that a block replica NOT be written to the local DataNode where
+   * 'local' means the same host as the client is being run on.
+ */ + @InterfaceAudience.LimitedPrivate({"HBase"}) + NO_LOCAL_WRITE((short) 0x40); private final short mode; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AddBlockFlag.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AddBlockFlag.java new file mode 100644 index 00000000000..6a0805bb71b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AddBlockFlag.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.CreateFlag; + +/** + * AddBlockFlag provides hints for new block allocation and placement. + * Users can use this flag to control per DFSOutputStream + * {@see ClientProtocol#addBlock()} behavior. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public enum AddBlockFlag { + + /** + * Advise that a block replica NOT be written to the local DataNode where + * 'local' means the same host as the client is being run on. 
+ * + * @see CreateFlag#NO_LOCAL_WRITE + */ + NO_LOCAL_WRITE((short) 0x01); + + private final short mode; + + AddBlockFlag(short mode) { + this.mode = mode; + } + + public static AddBlockFlag valueOf(short mode) { + for (AddBlockFlag flag : AddBlockFlag.values()) { + if (flag.getMode() == mode) { + return flag; + } + } + return null; + } + + public short getMode() { + return mode; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 18509f8127c..472c41f4c33 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -116,6 +116,7 @@ public class DFSOutputStream extends FSOutputSummer private long initialFileSize = 0; // at time of file open private final short blockReplication; // replication factor of file protected boolean shouldSyncBlock = false; // force blocks to disk upon close + private final EnumSet addBlockFlags; protected final AtomicReference cachingStrategy; private FileEncryptionInfo fileEncryptionInfo; @@ -178,6 +179,7 @@ public class DFSOutputStream extends FSOutputSummer } private DFSOutputStream(DFSClient dfsClient, String src, + EnumSet flag, Progressable progress, HdfsFileStatus stat, DataChecksum checksum) { super(getChecksum4Compute(checksum, stat)); this.dfsClient = dfsClient; @@ -188,6 +190,10 @@ public class DFSOutputStream extends FSOutputSummer this.fileEncryptionInfo = stat.getFileEncryptionInfo(); this.cachingStrategy = new AtomicReference<>( dfsClient.getDefaultWriteCachingStrategy()); + this.addBlockFlags = EnumSet.noneOf(AddBlockFlag.class); + if (flag.contains(CreateFlag.NO_LOCAL_WRITE)) { + this.addBlockFlags.add(AddBlockFlag.NO_LOCAL_WRITE); + } if (progress != null) { DFSClient.LOG.debug("Set non-null progress callback on DFSOutputStream " +"{}", src); @@ -211,14 +217,14 @@ public class DFSOutputStream extends FSOutputSummer protected DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat, EnumSet flag, Progressable progress, DataChecksum checksum, String[] favoredNodes) throws IOException { - this(dfsClient, src, progress, stat, checksum); + this(dfsClient, src, flag, progress, stat, checksum); this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK); computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), bytesPerChecksum); streamer = new DataStreamer(stat, null, dfsClient, src, progress, checksum, - cachingStrategy, byteArrayManager, favoredNodes); + cachingStrategy, byteArrayManager, favoredNodes, addBlockFlags); } static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, @@ -280,7 +286,7 @@ public class DFSOutputStream extends FSOutputSummer EnumSet flags, Progressable progress, LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes) throws IOException { - this(dfsClient, src, progress, stat, checksum); + this(dfsClient, src, flags, progress, stat, checksum); initialFileSize = stat.getLen(); // length of file when opened this.shouldSyncBlock = flags.contains(CreateFlag.SYNC_BLOCK); @@ -301,7 +307,8 @@ public class DFSOutputStream extends FSOutputSummer bytesPerChecksum); streamer = new DataStreamer(stat, lastBlock != null ? 
lastBlock.getBlock() : null, dfsClient, src, - progress, checksum, cachingStrategy, byteArrayManager, favoredNodes); + progress, checksum, cachingStrategy, byteArrayManager, favoredNodes, + addBlockFlags); } } @@ -848,6 +855,10 @@ public class DFSOutputStream extends FSOutputSummer return initialFileSize; } + protected EnumSet getAddBlockFlags() { + return addBlockFlags; + } + /** * @return the FileEncryptionInfo for this stream, or null if not encrypted. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 9ccb89a0058..0d8cc662942 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -33,6 +33,7 @@ import java.net.Socket; import java.nio.channels.ClosedChannelException; import java.util.ArrayList; import java.util.Arrays; +import java.util.EnumSet; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -392,12 +393,15 @@ class DataStreamer extends Daemon { private final LoadingCache excludedNodes; private final String[] favoredNodes; + private final EnumSet addBlockFlags; private DataStreamer(HdfsFileStatus stat, DFSClient dfsClient, String src, Progressable progress, DataChecksum checksum, AtomicReference cachingStrategy, ByteArrayManager byteArrayManage, - boolean isAppend, String[] favoredNodes) { + boolean isAppend, String[] favoredNodes, + EnumSet flags) { + this.block = block; this.dfsClient = dfsClient; this.src = src; this.progress = progress; @@ -408,11 +412,11 @@ class DataStreamer extends Daemon { this.isLazyPersistFile = isLazyPersist(stat); this.isAppend = isAppend; this.favoredNodes = favoredNodes; - final DfsClientConf conf = dfsClient.getConf(); this.dfsclientSlowLogThresholdMs = conf.getSlowIoWarningThresholdMs(); this.excludedNodes = initExcludedNodes(conf.getExcludedNodesCacheExpiry()); this.errorState = new ErrorState(conf.getDatanodeRestartTimeout()); + this.addBlockFlags = flags; } /** @@ -421,9 +425,10 @@ class DataStreamer extends Daemon { DataStreamer(HdfsFileStatus stat, ExtendedBlock block, DFSClient dfsClient, String src, Progressable progress, DataChecksum checksum, AtomicReference cachingStrategy, - ByteArrayManager byteArrayManage, String[] favoredNodes) { + ByteArrayManager byteArrayManage, String[] favoredNodes, + EnumSet flags) { this(stat, dfsClient, src, progress, checksum, cachingStrategy, - byteArrayManage, false, favoredNodes); + byteArrayManage, false, favoredNodes, flags); this.block = block; stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; } @@ -438,7 +443,7 @@ class DataStreamer extends Daemon { AtomicReference cachingStrategy, ByteArrayManager byteArrayManage) throws IOException { this(stat, dfsClient, src, progress, checksum, cachingStrategy, - byteArrayManage, true, null); + byteArrayManage, true, null, null); stage = BlockConstructionStage.PIPELINE_SETUP_APPEND; block = lastBlock.getBlock(); bytesSent = block.getNumBytes(); @@ -1643,7 +1648,8 @@ class DataStreamer extends Daemon { while (true) { try { return dfsClient.namenode.addBlock(src, dfsClient.clientName, - block, excludedNodes, stat.getFileId(), favoredNodes); + block, excludedNodes, stat.getFileId(), favoredNodes, + addBlockFlags); } catch (RemoteException e) { IOException ue = e.unwrapRemoteException(FileNotFoundException.class, diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index 93d409d0927..a0371dc2cb8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -25,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.crypto.CryptoProtocolVersion; import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; +import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.fs.CacheFlag; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.CreateFlag; @@ -390,6 +391,8 @@ public interface ClientProtocol { * @param fileId the id uniquely identifying a file * @param favoredNodes the list of nodes where the client wants the blocks. * Nodes are identified by either host name or address. + * @param addBlockFlags flags to advise the behavior of allocating and placing + * a new block. * * @return LocatedBlock allocated block information. * @@ -408,7 +411,7 @@ public interface ClientProtocol { @Idempotent LocatedBlock addBlock(String src, String clientName, ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, - String[] favoredNodes) + String[] favoredNodes, EnumSet addBlockFlags) throws IOException; /** diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index 1bfd6b25446..9a52d9398e8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -45,6 +45,7 @@ import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.hdfs.inotify.EventBatchList; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; @@ -396,7 +397,8 @@ public class ClientNamenodeProtocolTranslatorPB implements @Override public LocatedBlock addBlock(String src, String clientName, ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, - String[] favoredNodes) throws IOException { + String[] favoredNodes, EnumSet addBlockFlags) + throws IOException { AddBlockRequestProto.Builder req = AddBlockRequestProto.newBuilder() .setSrc(src).setClientName(clientName).setFileId(fileId); if (previous != null) @@ -406,6 +408,10 @@ public class ClientNamenodeProtocolTranslatorPB implements if (favoredNodes != null) { req.addAllFavoredNodes(Arrays.asList(favoredNodes)); } + if (addBlockFlags != null) { + req.addAllFlags(PBHelperClient.convertAddBlockFlags( + addBlockFlags)); + } try { return PBHelperClient.convert(rpcProxy.addBlock(null, req.build()).getBlock()); } catch (ServiceException e) { diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java index 5d81c1acbe1..80f0d4fdde7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java @@ -36,6 +36,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import org.apache.hadoop.crypto.CipherOption; import org.apache.hadoop.crypto.CipherSuite; import org.apache.hadoop.crypto.CryptoProtocolVersion; +import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.fs.CacheFlag; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.CreateFlag; @@ -94,6 +95,7 @@ import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEntryProto.AclEntryTyp import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEntryProto.FsActionProto; import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclStatusProto; import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockFlagProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveEntryProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoExpirationProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto; @@ -2394,4 +2396,29 @@ public class PBHelperClient { } return Arrays.asList(ret); } + + public static EnumSet convertAddBlockFlags( + List addBlockFlags) { + EnumSet flags = + EnumSet.noneOf(AddBlockFlag.class); + for (AddBlockFlagProto af : addBlockFlags) { + AddBlockFlag flag = AddBlockFlag.valueOf((short)af.getNumber()); + if (flag != null) { + flags.add(flag); + } + } + return flags; + } + + public static List convertAddBlockFlags( + EnumSet flags) { + List ret = new ArrayList<>(); + for (AddBlockFlag flag : flags) { + AddBlockFlagProto abfp = AddBlockFlagProto.valueOf(flag.getMode()); + if (abfp != null) { + ret.add(abfp); + } + } + return ret; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto index 87a76f25ba7..c0c02f20979 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto @@ -161,6 +161,10 @@ message AbandonBlockRequestProto { message AbandonBlockResponseProto { // void response } +enum AddBlockFlagProto { + NO_LOCAL_WRITE = 1; // avoid writing to local node. +} + message AddBlockRequestProto { required string src = 1; required string clientName = 2; @@ -168,6 +172,7 @@ message AddBlockRequestProto { repeated DatanodeInfoProto excludeNodes = 4; optional uint64 fileId = 5 [default = 0]; // default as a bogus id repeated string favoredNodes = 6; //the set of datanodes to use for the block + repeated AddBlockFlagProto flags = 7; // default to empty. 
} message AddBlockResponseProto { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index 2c19db9e0bd..de5a4dd28c5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; +import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FsServerDefaults; @@ -500,6 +501,8 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements try { List excl = req.getExcludeNodesList(); List favor = req.getFavoredNodesList(); + EnumSet flags = + PBHelperClient.convertAddBlockFlags(req.getFlagsList()); LocatedBlock result = server.addBlock( req.getSrc(), req.getClientName(), @@ -507,7 +510,8 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements (excl == null || excl.size() == 0) ? null : PBHelperClient.convert(excl .toArray(new DatanodeInfoProto[excl.size()])), req.getFileId(), (favor == null || favor.size() == 0) ? null : favor - .toArray(new String[favor.size()])); + .toArray(new String[favor.size()]), + flags); return AddBlockResponseProto.newBuilder() .setBlock(PBHelperClient.convert(result)).build(); } catch (IOException e) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 6288db9b37c..d811d1df278 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -47,6 +47,8 @@ import javax.management.ObjectName; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.AddBlockFlag; +import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -1591,7 +1593,7 @@ public class BlockManager implements BlockStatsMXBean { DatanodeDescriptor clientnode, Set excludes, long blocksize) { return blockplacement.chooseTarget(src, 1, clientnode, Collections.emptyList(), false, excludes, - blocksize, storagePolicySuite.getDefaultPolicy()); + blocksize, storagePolicySuite.getDefaultPolicy(), null); } /** Choose target for getting additional datanodes for an existing pipeline. 
*/ @@ -1605,7 +1607,7 @@ public class BlockManager implements BlockStatsMXBean { final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(storagePolicyID); return blockplacement.chooseTarget(src, numAdditionalNodes, clientnode, - chosen, true, excludes, blocksize, storagePolicy); + chosen, true, excludes, blocksize, storagePolicy, null); } /** @@ -1621,13 +1623,14 @@ public class BlockManager implements BlockStatsMXBean { final Set excludedNodes, final long blocksize, final List favoredNodes, - final byte storagePolicyID) throws IOException { - List favoredDatanodeDescriptors = + final byte storagePolicyID, + final EnumSet flags) throws IOException { + List favoredDatanodeDescriptors = getDatanodeDescriptors(favoredNodes); final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(storagePolicyID); final DatanodeStorageInfo[] targets = blockplacement.chooseTarget(src, numOfReplicas, client, excludedNodes, blocksize, - favoredDatanodeDescriptors, storagePolicy); + favoredDatanodeDescriptors, storagePolicy, flags); if (targets.length < minReplication) { throw new IOException("File " + src + " could only be replicated to " + targets.length + " nodes instead of minReplication (=" diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java index d35b24623cd..1b614f4c0ad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import java.util.ArrayList; import java.util.Collection; +import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -26,6 +27,7 @@ import java.util.Set; import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -66,6 +68,7 @@ public abstract class BlockPlacementPolicy { * @param returnChosenNodes decide if the chosenNodes are returned. * @param excludedNodes datanodes that should not be considered as targets. * @param blocksize size of the data to be written. + * @param flags Block placement flags. * @return array of DatanodeDescriptor instances chosen as target * and sorted as a pipeline. */ @@ -76,7 +79,8 @@ public abstract class BlockPlacementPolicy { boolean returnChosenNodes, Set excludedNodes, long blocksize, - BlockStoragePolicy storagePolicy); + BlockStoragePolicy storagePolicy, + EnumSet flags); /** * Same as {@link #chooseTarget(String, int, Node, Set, long, List, StorageType)} @@ -90,14 +94,15 @@ public abstract class BlockPlacementPolicy { Set excludedNodes, long blocksize, List favoredNodes, - BlockStoragePolicy storagePolicy) { + BlockStoragePolicy storagePolicy, + EnumSet flags) { // This class does not provide the functionality of placing // a block in favored datanodes. 
The implementations of this class // are expected to provide this functionality return chooseTarget(src, numOfReplicas, writer, new ArrayList(numOfReplicas), false, - excludedNodes, blocksize, storagePolicy); + excludedNodes, blocksize, storagePolicy, flags); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java index 63e96c5a328..e936abd9520 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java @@ -23,6 +23,7 @@ import java.util.*; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -111,9 +112,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { boolean returnChosenNodes, Set excludedNodes, long blocksize, - final BlockStoragePolicy storagePolicy) { + final BlockStoragePolicy storagePolicy, + EnumSet flags) { return chooseTarget(numOfReplicas, writer, chosenNodes, returnChosenNodes, - excludedNodes, blocksize, storagePolicy); + excludedNodes, blocksize, storagePolicy, flags); } @Override @@ -123,13 +125,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { Set excludedNodes, long blocksize, List favoredNodes, - BlockStoragePolicy storagePolicy) { + BlockStoragePolicy storagePolicy, + EnumSet flags) { try { if (favoredNodes == null || favoredNodes.size() == 0) { // Favored nodes not specified, fall back to regular block placement. return chooseTarget(src, numOfReplicas, writer, new ArrayList(numOfReplicas), false, - excludedNodes, blocksize, storagePolicy); + excludedNodes, blocksize, storagePolicy, flags); } Set favoriteAndExcludedNodes = excludedNodes == null ? 
@@ -164,7 +167,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { DatanodeStorageInfo[] remainingTargets = chooseTarget(src, numOfReplicas, writer, new ArrayList(numOfReplicas), false, - favoriteAndExcludedNodes, blocksize, storagePolicy); + favoriteAndExcludedNodes, blocksize, storagePolicy, flags); for (int i = 0; i < remainingTargets.length; i++) { results.add(remainingTargets[i]); } @@ -179,7 +182,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { // Fall back to regular block placement disregarding favored nodes hint return chooseTarget(src, numOfReplicas, writer, new ArrayList(numOfReplicas), false, - excludedNodes, blocksize, storagePolicy); + excludedNodes, blocksize, storagePolicy, flags); } } @@ -213,7 +216,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { boolean returnChosenNodes, Set excludedNodes, long blocksize, - final BlockStoragePolicy storagePolicy) { + final BlockStoragePolicy storagePolicy, + EnumSet addBlockFlags) { if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) { return DatanodeStorageInfo.EMPTY_ARRAY; } @@ -226,17 +230,42 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { numOfReplicas = result[0]; int maxNodesPerRack = result[1]; - final List results = new ArrayList<>(chosenStorage); for (DatanodeStorageInfo storage : chosenStorage) { // add localMachine and related nodes to excludedNodes addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes); } + List results = null; + Node localNode = null; boolean avoidStaleNodes = (stats != null && stats.isAvoidingStaleDataNodesForWrite()); - final Node localNode = chooseTarget(numOfReplicas, writer, excludedNodes, - blocksize, maxNodesPerRack, results, avoidStaleNodes, storagePolicy, - EnumSet.noneOf(StorageType.class), results.isEmpty()); + boolean avoidLocalNode = (addBlockFlags != null + && addBlockFlags.contains(AddBlockFlag.NO_LOCAL_WRITE) + && writer != null + && !excludedNodes.contains(writer)); + // Attempt to exclude local node if the client suggests so. If no enough + // nodes can be obtained, it falls back to the default block placement + // policy. 
+ if (avoidLocalNode) { + results = new ArrayList<>(chosenStorage); + Set excludedNodeCopy = new HashSet<>(excludedNodes); + excludedNodeCopy.add(writer); + localNode = chooseTarget(numOfReplicas, writer, + excludedNodeCopy, blocksize, maxNodesPerRack, results, + avoidStaleNodes, storagePolicy, + EnumSet.noneOf(StorageType.class), results.isEmpty()); + if (results.size() < numOfReplicas) { + // not enough nodes; discard results and fall back + results = null; + } + } + if (results == null) { + results = new ArrayList<>(chosenStorage); + localNode = chooseTarget(numOfReplicas, writer, excludedNodes, + blocksize, maxNodesPerRack, results, avoidStaleNodes, + storagePolicy, EnumSet.noneOf(StorageType.class), results.isEmpty()); + } + if (!returnChosenNodes) { results.removeAll(chosenStorage); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java index f8a6dad1d1d..258dfddafeb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java @@ -55,7 +55,7 @@ class ReplicationWork { targets = blockplacement.chooseTarget(bc.getName(), additionalReplRequired, srcNode, liveReplicaStorages, false, excludedNodes, block.getNumBytes(), - storagePolicySuite.getPolicy(bc.getStoragePolicyID())); + storagePolicySuite.getPolicy(bc.getStoragePolicyID()), null); } finally { srcNode.decrementPendingReplicationWithoutTargets(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java index 10cb5558c40..683d3b5dfe2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java @@ -23,6 +23,7 @@ import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.crypto.CipherSuite; import org.apache.hadoop.crypto.CryptoProtocolVersion; import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; +import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileEncryptionInfo; @@ -265,8 +266,9 @@ class FSDirWriteFileOp { } static DatanodeStorageInfo[] chooseTargetForNewBlock( - BlockManager bm, String src, DatanodeInfo[] excludedNodes, String[] - favoredNodes, ValidateAddBlockResult r) throws IOException { + BlockManager bm, String src, DatanodeInfo[] excludedNodes, + String[] favoredNodes, EnumSet flags, + ValidateAddBlockResult r) throws IOException { Node clientNode = bm.getDatanodeManager() .getDatanodeByHost(r.clientMachine); if (clientNode == null) { @@ -280,11 +282,10 @@ class FSDirWriteFileOp { } List favoredNodesList = (favoredNodes == null) ? null : Arrays.asList(favoredNodes); - // choose targets for the new block to be allocated. 
return bm.chooseTarget4NewBlock(src, r.replication, clientNode, excludedNodesSet, r.blockSize, - favoredNodesList, r.storagePolicyID); + favoredNodesList, r.storagePolicyID, flags); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index faebc89f296..5f82d99ce18 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -141,6 +141,7 @@ import org.apache.hadoop.crypto.key.KeyProvider; import org.apache.hadoop.crypto.CryptoCodec; import org.apache.hadoop.crypto.key.KeyProvider.Metadata; import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; +import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries; import org.apache.hadoop.fs.CacheFlag; import org.apache.hadoop.fs.ContentSummary; @@ -2442,7 +2443,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, */ LocatedBlock getAdditionalBlock( String src, long fileId, String clientName, ExtendedBlock previous, - DatanodeInfo[] excludedNodes, String[] favoredNodes) throws IOException { + DatanodeInfo[] excludedNodes, String[] favoredNodes, + EnumSet flags) throws IOException { NameNode.stateChangeLog.debug("BLOCK* getAdditionalBlock: {} inodeId {}" + " for {}", src, fileId, clientName); @@ -2466,7 +2468,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, } DatanodeStorageInfo[] targets = FSDirWriteFileOp.chooseTargetForNewBlock( - blockManager, src, excludedNodes, favoredNodes, r); + blockManager, src, excludedNodes, favoredNodes, flags, r); checkOperation(OperationCategory.WRITE); writeLock(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 55c96098afc..4f628840c79 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -48,6 +48,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.ReconfigurationTaskStatus; import org.apache.hadoop.crypto.CryptoProtocolVersion; import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; +import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.fs.CacheFlag; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.ContentSummary; @@ -832,11 +833,11 @@ class NameNodeRpcServer implements NamenodeProtocols { @Override public LocatedBlock addBlock(String src, String clientName, ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId, - String[] favoredNodes) + String[] favoredNodes, EnumSet addBlockFlags) throws IOException { checkNNStartup(); LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId, - clientName, previous, excludedNodes, favoredNodes); + clientName, previous, excludedNodes, favoredNodes, addBlockFlags); if (locatedBlock != null) { metrics.incrAddBlockOps(); } @@ -1143,7 +1144,7 @@ class NameNodeRpcServer implements NamenodeProtocols { DatanodeInfo results[] = namesystem.datanodeReport(type); return 
results; } - + @Override // ClientProtocol public DatanodeStorageReport[] getDatanodeStorageReport( DatanodeReportType type) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 0d97369e76b..0aa3d05fd04 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -1932,7 +1932,7 @@ public class DFSTestUtil { String clientName, ExtendedBlock previous, int len) throws Exception { fs.getClient().namenode.addBlock(file, clientName, previous, null, - fileNode.getId(), null); + fileNode.getId(), null, null); final BlockInfo lastBlock = fileNode.getLastBlock(); final int groupSize = fileNode.getPreferredBlockReplication(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java index 53b80388daf..861bb2c90e5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java @@ -1237,12 +1237,12 @@ public class TestBlockStoragePolicy { DatanodeStorageInfo[] targets = replicator.chooseTarget("/foo", 3, dataNodes[0], Collections.emptyList(), false, - new HashSet(), 0, policy1); + new HashSet(), 0, policy1, null); System.out.println(Arrays.asList(targets)); Assert.assertEquals(3, targets.length); targets = replicator.chooseTarget("/foo", 3, dataNodes[0], Collections.emptyList(), false, - new HashSet(), 0, policy2); + new HashSet(), 0, policy2, null); System.out.println(Arrays.asList(targets)); Assert.assertEquals(3, targets.length); } @@ -1308,7 +1308,7 @@ public class TestBlockStoragePolicy { DatanodeStorageInfo[] targets = replicator.chooseTarget("/foo", 3, dataNodes[0], Collections.emptyList(), false, - new HashSet(), 0, policy); + new HashSet(), 0, policy, null); System.out.println(policy.getName() + ": " + Arrays.asList(targets)); Assert.assertEquals(2, targets.length); Assert.assertEquals(StorageType.SSD, targets[0].getStorageType()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java index d7b7b410ed1..6325957276b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java @@ -44,6 +44,7 @@ import java.net.URI; import java.security.MessageDigest; import java.util.ArrayList; import java.util.Arrays; +import java.util.EnumSet; import java.util.List; import java.util.Random; import java.util.concurrent.TimeUnit; @@ -96,6 +97,7 @@ import org.apache.log4j.Level; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Matchers; import org.mockito.Mockito; import org.mockito.internal.stubbing.answers.ThrowsException; import org.mockito.invocation.InvocationOnMock; @@ -252,7 +254,9 @@ public class TestDFSClientRetries { anyString(), any(ExtendedBlock.class), any(DatanodeInfo[].class), - anyLong(), any(String[].class))).thenAnswer(answer); + anyLong(), any(String[].class), + 
Matchers.>any())) + .thenAnswer(answer); Mockito.doReturn( new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission( @@ -471,7 +475,8 @@ public class TestDFSClientRetries { } }).when(spyNN).addBlock(Mockito.anyString(), Mockito.anyString(), Mockito. any(), Mockito. any(), - Mockito.anyLong(), Mockito. any()); + Mockito.anyLong(), Mockito. any(), + Mockito.> any()); doAnswer(new Answer() { @@ -513,7 +518,8 @@ public class TestDFSClientRetries { Mockito.verify(spyNN, Mockito.atLeastOnce()).addBlock( Mockito.anyString(), Mockito.anyString(), Mockito. any(), Mockito. any(), - Mockito.anyLong(), Mockito. any()); + Mockito.anyLong(), Mockito. any(), + Mockito.> any()); Mockito.verify(spyNN, Mockito.atLeastOnce()).complete( Mockito.anyString(), Mockito.anyString(), Mockito.any(), anyLong()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java index a404ac8174f..d9df1ff4073 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java @@ -22,19 +22,29 @@ import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.util.ArrayList; +import java.util.EnumSet; import java.util.LinkedList; -import java.util.concurrent.atomic.AtomicReference; +import java.util.Map; +import java.util.Random; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FsTracer; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DataStreamer.LastExceptionInStreamer; import org.apache.hadoop.hdfs.client.impl.DfsClientConf; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.htrace.core.SpanId; import org.junit.AfterClass; import org.junit.Assert; @@ -42,8 +52,11 @@ import org.junit.BeforeClass; import org.junit.Test; import org.mockito.internal.util.reflection.Whitebox; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; public class TestDFSOutputStream { @@ -52,7 +65,7 @@ public class TestDFSOutputStream { @BeforeClass public static void setup() throws IOException { Configuration conf = new Configuration(); - cluster = new MiniDFSCluster.Builder(conf).build(); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); } /** @@ -80,7 +93,7 @@ public class TestDFSOutputStream { try { dos.close(); } catch (IOException e) { - Assert.assertEquals(e, dummy); + assertEquals(e, dummy); } thrown = (Throwable) 
Whitebox.getInternalState(ex, "thrown"); Assert.assertNull(thrown); @@ -127,7 +140,7 @@ public class TestDFSOutputStream { mock(HdfsFileStatus.class), mock(ExtendedBlock.class), client, - "foo", null, null, null, null, null); + "foo", null, null, null, null, null, null); DataOutputStream blockStream = mock(DataOutputStream.class); doThrow(new IOException()).when(blockStream).flush(); @@ -148,6 +161,47 @@ public class TestDFSOutputStream { Assert.assertTrue(congestedNodes.isEmpty()); } + @Test + public void testNoLocalWriteFlag() throws IOException { + DistributedFileSystem fs = cluster.getFileSystem(); + EnumSet flags = EnumSet.of(CreateFlag.NO_LOCAL_WRITE, + CreateFlag.CREATE); + BlockManager bm = cluster.getNameNode().getNamesystem().getBlockManager(); + DatanodeManager dm = bm.getDatanodeManager(); + try(FSDataOutputStream os = fs.create(new Path("/test-no-local"), + FsPermission.getDefault(), + flags, 512, (short)2, 512, null)) { + // Inject a DatanodeManager that returns one DataNode as local node for + // the client. + DatanodeManager spyDm = spy(dm); + DatanodeDescriptor dn1 = dm.getDatanodeListForReport + (HdfsConstants.DatanodeReportType.LIVE).get(0); + doReturn(dn1).when(spyDm).getDatanodeByHost("127.0.0.1"); + Whitebox.setInternalState(bm, "datanodeManager", spyDm); + byte[] buf = new byte[512 * 16]; + new Random().nextBytes(buf); + os.write(buf); + } finally { + Whitebox.setInternalState(bm, "datanodeManager", dm); + } + cluster.triggerBlockReports(); + final String bpid = cluster.getNamesystem().getBlockPoolId(); + // Total number of DataNodes is 3. + assertEquals(3, cluster.getAllBlockReports(bpid).size()); + int numDataNodesWithData = 0; + for (Map dnBlocks : + cluster.getAllBlockReports(bpid)) { + for (BlockListAsLongs blocks : dnBlocks.values()) { + if (blocks.getNumberOfBlocks() > 0) { + numDataNodesWithData++; + break; + } + } + } + // Verify that only one DN has no data. 
+ assertEquals(1, 3 - numDataNodesWithData); + } + @AfterClass public static void tearDown() { if (cluster != null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java index 05c98ac3e8b..e47c8b13e1c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java @@ -531,7 +531,7 @@ public class TestFileCreation { // add one block to the file LocatedBlock location = client.getNamenode().addBlock(file1.toString(), - client.clientName, null, null, HdfsConstants.GRANDFATHER_INODE_ID, null); + client.clientName, null, null, HdfsConstants.GRANDFATHER_INODE_ID, null, null); System.out.println("testFileCreationError2: " + "Added block " + location.getBlock()); @@ -582,7 +582,7 @@ public class TestFileCreation { createFile(dfs, f, 3); try { cluster.getNameNodeRpc().addBlock(f.toString(), client.clientName, - null, null, HdfsConstants.GRANDFATHER_INODE_ID, null); + null, null, HdfsConstants.GRANDFATHER_INODE_ID, null, null); fail(); } catch(IOException ioe) { FileSystem.LOG.info("GOOD!", ioe); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java index 7dc52fa5fda..99986e6ac84 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java @@ -156,6 +156,6 @@ abstract public class BaseReplicationPolicyTest { Set excludedNodes) { return replicator.chooseTarget(filename, numOfReplicas, writer, chosenNodes, false, excludedNodes, BLOCK_SIZE, - TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY); + TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null); } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestAvailableSpaceBlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestAvailableSpaceBlockPlacementPolicy.java index f1e4e1cc612..a5090ccbbdf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestAvailableSpaceBlockPlacementPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestAvailableSpaceBlockPlacementPolicy.java @@ -142,7 +142,7 @@ public class TestAvailableSpaceBlockPlacementPolicy { .getBlockManager() .getBlockPlacementPolicy() .chooseTarget(file, replica, null, new ArrayList(), false, null, - blockSize, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY); + blockSize, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null); Assert.assertTrue(targets.length == replica); for (int j = 0; j < replica; j++) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java index 2bfd3857460..21839c67fa8 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -30,6 +31,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -38,6 +40,7 @@ import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -290,7 +293,8 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest { excludedNodes.add(dataNodes[1]); chosenNodes.add(storages[2]); targets = replicator.chooseTarget(filename, 1, dataNodes[0], chosenNodes, true, - excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY); + excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, + null); System.out.println("targets=" + Arrays.asList(targets)); assertEquals(2, targets.length); //make sure that the chosen node is in the target. @@ -667,7 +671,8 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest { .getNamesystem().getBlockManager().getBlockPlacementPolicy(); DatanodeStorageInfo[] targets = replicator.chooseTarget(filename, 3, staleNodeInfo, new ArrayList(), false, null, - BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY); + BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, + null); assertEquals(targets.length, 3); assertFalse(isOnSameRack(targets[0], staleNodeInfo)); @@ -693,7 +698,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest { // Call chooseTarget targets = replicator.chooseTarget(filename, 3, staleNodeInfo, new ArrayList(), false, null, BLOCK_SIZE, - TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY); + TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null); assertEquals(targets.length, 3); assertTrue(isOnSameRack(targets[0], staleNodeInfo)); @@ -1490,8 +1495,42 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest { private DatanodeStorageInfo[] chooseTarget(int numOfReplicas, DatanodeDescriptor writer, Set excludedNodes, List favoredNodes) { + return chooseTarget(numOfReplicas, writer, excludedNodes, + favoredNodes, null); + } + + private DatanodeStorageInfo[] chooseTarget(int numOfReplicas, + DatanodeDescriptor writer, Set excludedNodes, + List favoredNodes, EnumSet flags) { return replicator.chooseTarget(filename, numOfReplicas, writer, excludedNodes, BLOCK_SIZE, favoredNodes, - TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY); + TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, flags); + } + + @Test + public void testAvoidLocalWrite() throws IOException { + DatanodeDescriptor writer = dataNodes[2]; + EnumSet flags = EnumSet.of(AddBlockFlag.NO_LOCAL_WRITE); + DatanodeStorageInfo[] targets; + targets = chooseTarget(5, writer, null, null, flags); + for (DatanodeStorageInfo info : targets) { + 
      assertNotEquals(info.getDatanodeDescriptor(), writer);
+    }
+  }
+
+  @Test
+  public void testAvoidLocalWriteNoEnoughNodes() throws IOException {
+    DatanodeDescriptor writer = dataNodes[2];
+    EnumSet<AddBlockFlag> flags = EnumSet.of(AddBlockFlag.NO_LOCAL_WRITE);
+    DatanodeStorageInfo[] targets;
+    targets = chooseTarget(6, writer, null, null, flags);
+    assertEquals(6, targets.length);
+    boolean found = false;
+    for (DatanodeStorageInfo info : targets) {
+      if (info.getDatanodeDescriptor().equals(writer)) {
+        found = true;
+      }
+    }
+    assertTrue(found);
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
index 123e6354d1e..1992fcba842 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
@@ -112,7 +112,7 @@ public class TestReplicationPolicyConsiderLoad
     DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager()
         .getBlockPlacementPolicy().chooseTarget("testFile.txt", 3, writerDn,
             new ArrayList(), false, null,
-            1024, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
+            1024, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);
     assertEquals(3, targets.length);
     Set targetSet = new HashSet<>(
@@ -170,7 +170,7 @@ public class TestReplicationPolicyConsiderLoad
       DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager()
           .getBlockPlacementPolicy().chooseTarget("testFile.txt", 3, writerDn,
               new ArrayList(), false, null,
-              1024, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
+              1024, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);
       for(DatanodeStorageInfo info : targets) {
         assertTrue("The node "+info.getDatanodeDescriptor().getName()+
             " has higher load and should not have been picked!",
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
index edcab1000c8..1fb46f9a6bf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
@@ -269,7 +269,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
       List favoredNodes) {
     return replicator.chooseTarget(filename, numOfReplicas, writer,
         excludedNodes, BLOCK_SIZE, favoredNodes,
-        TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
+        TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);
   }
   /**
@@ -351,7 +351,8 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
     Set excludedNodes = new HashSet<>();
     excludedNodes.add(dataNodes[1]);
     targets = repl.chooseTarget(filename, 4, dataNodes[0], chosenNodes, false,
-        excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
+        excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY,
+        null);
     assertEquals(targets.length, 4);
     assertEquals(storages[0], targets[0]);
@@ -369,7 +370,8 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
     excludedNodes.add(dataNodes[1]);
     chosenNodes.add(storages[2]);
     targets = repl.chooseTarget(filename, 1, dataNodes[0], chosenNodes, true,
-        excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
+        excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY,
+        null);
     System.out.println("targets=" + Arrays.asList(targets));
     assertEquals(2, targets.length);
     //make sure that the chosen node is in the target.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java
index c939220701f..c157ed1e185 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java
@@ -187,7 +187,7 @@ public class TestReplicationPolicyWithUpgradeDomain
     chosenNodes.add(storages[2]);
     targets = replicator.chooseTarget(filename, 1, dataNodes[0], chosenNodes,
         true, excludedNodes, BLOCK_SIZE,
-        TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
+        TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);
     System.out.println("targets=" + Arrays.asList(targets));
     assertEquals(2, targets.length);
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
index c93fd89d3d6..67445719754 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
@@ -1204,7 +1204,7 @@ public class NNThroughputBenchmark implements Tool {
       for (int i = 0; i < 30; i++) {
         try {
           return clientProto.addBlock(src, clientName,
-              previous, excludeNodes, fileId, favoredNodes);
+              previous, excludeNodes, fileId, favoredNodes, null);
         } catch (NotReplicatedYetException|RemoteException e) {
           if (e instanceof RemoteException) {
             String className = ((RemoteException) e).getClassName();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java
index a8cd5b9a825..94abe3ef837 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java
@@ -101,13 +101,13 @@ public class TestAddBlockRetry {
       ns.readUnlock();;
     }
     DatanodeStorageInfo targets[] = FSDirWriteFileOp.chooseTargetForNewBlock(
-        ns.getBlockManager(), src, null, null, r);
+        ns.getBlockManager(), src, null, null, null, r);
     assertNotNull("Targets must be generated", targets);
     // run second addBlock()
     LOG.info("Starting second addBlock for " + src);
     nn.addBlock(src, "clientName", null, null,
-        HdfsConstants.GRANDFATHER_INODE_ID, null);
+        HdfsConstants.GRANDFATHER_INODE_ID, null, null);
     assertTrue("Penultimate block must be complete",
         checkFileProgress(src, false));
     LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE);
@@ -161,14 +161,14 @@ public class TestAddBlockRetry {
     // start first addBlock()
     LOG.info("Starting first addBlock for " + src);
     LocatedBlock lb1 = nameNodeRpc.addBlock(src, "clientName", null, null,
-        HdfsConstants.GRANDFATHER_INODE_ID, null);
+        HdfsConstants.GRANDFATHER_INODE_ID, null, null);
     assertTrue("Block locations should be present",
         lb1.getLocations().length > 0);
     cluster.restartNameNode();
     nameNodeRpc = cluster.getNameNodeRpc();
     LocatedBlock lb2 = nameNodeRpc.addBlock(src, "clientName", null, null,
-        HdfsConstants.GRANDFATHER_INODE_ID, null);
+        HdfsConstants.GRANDFATHER_INODE_ID, null, null);
     assertEquals("Blocks are not equal", lb1.getBlock(), lb2.getBlock());
     assertTrue("Wrong locations with retry", lb2.getLocations().length > 0);
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java
index 9f844d7fc7e..f40c464e24d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java
@@ -113,7 +113,7 @@ public class TestBlockPlacementPolicyRackFaultTolerant {
       //test chooseTarget for new file
       LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,
-          null, null, fileStatus.getFileId(), null);
+          null, null, fileStatus.getFileId(), null, null);
       doTestLocatedBlock(replication, locatedBlock);
       //test chooseTarget for existing file.
@@ -143,7 +143,7 @@ public class TestBlockPlacementPolicyRackFaultTolerant {
       //test chooseTarget for new file
       LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,
-          null, null, fileStatus.getFileId(), null);
+          null, null, fileStatus.getFileId(), null, null);
       doTestLocatedBlock(20, locatedBlock);
       DatanodeInfo[] locs = locatedBlock.getLocations();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
index 442873d8106..1455dae8596 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
@@ -165,7 +165,8 @@ public class TestDeadDatanode {
     // choose the targets, but local node should not get selected as this is not
     // part of the cluster anymore
     DatanodeStorageInfo[] results = bm.chooseTarget4NewBlock("/hello", 3,
-        clientNode, new HashSet(), 256 * 1024 * 1024L, null, (byte) 7);
+        clientNode, new HashSet(), 256 * 1024 * 1024L, null, (byte) 7,
+        null);
     for (DatanodeStorageInfo datanodeStorageInfo : results) {
       assertFalse("Dead node should not be choosen", datanodeStorageInfo
           .getDatanodeDescriptor().equals(clientNode));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java
index a54040b2e8d..1a10b7a5351 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java
@@ -140,7 +140,7 @@ public class TestDefaultBlockPlacementPolicy {
         clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true,
         REPLICATION_FACTOR, DEFAULT_BLOCK_SIZE, null, false);
     LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,
-        null, null, fileStatus.getFileId(), null);
+        null, null, fileStatus.getFileId(), null, null);
     assertEquals("Block should be allocated sufficient locations",
         REPLICATION_FACTOR, locatedBlock.getLocations().length);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeleteRace.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeleteRace.java
index 7d4eb31bd31..15f697a9a06 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeleteRace.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeleteRace.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 import java.io.FileNotFoundException;
 import java.util.AbstractMap;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -28,6 +29,7 @@ import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.AddBlockFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -123,10 +125,11 @@ public class TestDeleteRace {
         boolean returnChosenNodes,
         Set excludedNodes,
         long blocksize,
-        final BlockStoragePolicy storagePolicy) {
+        final BlockStoragePolicy storagePolicy,
+        EnumSet<AddBlockFlag> flags) {
       DatanodeStorageInfo[] results = super.chooseTarget(srcPath,
           numOfReplicas, writer, chosenNodes, returnChosenNodes, excludedNodes,
-          blocksize, storagePolicy);
+          blocksize, storagePolicy, flags);
       try {
         Thread.sleep(3000);
       } catch (InterruptedException e) {}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestUpgradeDomainBlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestUpgradeDomainBlockPlacementPolicy.java
index cc14fcbb04b..f9a2503b004 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestUpgradeDomainBlockPlacementPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestUpgradeDomainBlockPlacementPolicy.java
@@ -152,7 +152,7 @@ public class TestUpgradeDomainBlockPlacementPolicy {
         clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true,
         REPLICATION_FACTOR, DEFAULT_BLOCK_SIZE, null, false);
     LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,
-        null, null, fileStatus.getFileId(), null);
+        null, null, fileStatus.getFileId(), null, null);
     assertEquals("Block should be allocated sufficient locations",
         REPLICATION_FACTOR, locatedBlock.getLocations().length);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java
index 8b8343c7c7c..ee8ee902386 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java
@@ -837,7 +837,7 @@ public class TestHASafeMode {
         new ExtendedBlock(previousBlock),
         new DatanodeInfo[0],
         DFSClientAdapter.getFileId((DFSOutputStream) create
-            .getWrappedStream()), null);
+            .getWrappedStream()), null, null);
     cluster.restartNameNode(0, true);
     cluster.restartDataNode(0);
     cluster.transitionToActive(0);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java
index 694d15e63f3..812bcc5b386 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java
@@ -198,7 +198,7 @@ public class TestOpenFilesWithSnapshot {
     String clientName = fs.getClient().getClientName();
     // create one empty block
     nameNodeRpc.addBlock(fileWithEmptyBlock.toString(), clientName, null, null,
-        HdfsConstants.GRANDFATHER_INODE_ID, null);
+        HdfsConstants.GRANDFATHER_INODE_ID, null, null);
     fs.createSnapshot(path, "s2");
     fs.rename(new Path("/test/test"), new Path("/test/test-renamed"));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotBlocksMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotBlocksMap.java
index 3f06d3d9936..44e8b358ec9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotBlocksMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotBlocksMap.java
@@ -282,7 +282,7 @@ public class TestSnapshotBlocksMap {
       ExtendedBlock previous = new ExtendedBlock(fsn.getBlockPoolId(), blks[0]);
       cluster.getNameNodeRpc()
           .addBlock(bar.toString(), hdfs.getClient().getClientName(), previous,
-              null, barNode.getId(), null);
+              null, barNode.getId(), null, null);
       SnapshotTestHelper.createSnapshot(hdfs, foo, "s1");
@@ -319,7 +319,7 @@ public class TestSnapshotBlocksMap {
       ExtendedBlock previous = new ExtendedBlock(fsn.getBlockPoolId(), blks[0]);
       cluster.getNameNodeRpc()
           .addBlock(bar.toString(), hdfs.getClient().getClientName(), previous,
-              null, barNode.getId(), null);
+              null, barNode.getId(), null, null);
       SnapshotTestHelper.createSnapshot(hdfs, foo, "s1");
@@ -358,7 +358,7 @@ public class TestSnapshotBlocksMap {
      ExtendedBlock previous = new ExtendedBlock(fsn.getBlockPoolId(), blks[0]);
       cluster.getNameNodeRpc()
           .addBlock(bar.toString(), hdfs.getClient().getClientName(), previous,
-              null, barNode.getId(), null);
+              null, barNode.getId(), null, null);
       SnapshotTestHelper.createSnapshot(hdfs, foo, "s1");