diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java index 539b5113d39..d480fc9f4c3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java @@ -103,7 +103,14 @@ public enum CreateFlag { * Append data to a new block instead of the end of the last partial block. * This is only useful for APPEND. */ - NEW_BLOCK((short) 0x20); + NEW_BLOCK((short) 0x20), + + /** + * Advise that a block replica NOT be written to the local DataNode where + * 'local' means the same host as the client is being run on. + */ + @InterfaceAudience.LimitedPrivate({"HBase"}) + NO_LOCAL_WRITE((short) 0x40); private final short mode; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AddBlockFlag.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AddBlockFlag.java new file mode 100644 index 00000000000..6a0805bb71b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AddBlockFlag.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.CreateFlag; + +/** + * AddBlockFlag provides hints for new block allocation and placement. + * Users can use this flag to control per DFSOutputStream + * {@see ClientProtocol#addBlock()} behavior. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public enum AddBlockFlag { + + /** + * Advise that a block replica NOT be written to the local DataNode where + * 'local' means the same host as the client is being run on. + * + * @see CreateFlag#NO_LOCAL_WRITE + */ + NO_LOCAL_WRITE((short) 0x01); + + private final short mode; + + AddBlockFlag(short mode) { + this.mode = mode; + } + + public static AddBlockFlag valueOf(short mode) { + for (AddBlockFlag flag : AddBlockFlag.values()) { + if (flag.getMode() == mode) { + return flag; + } + } + return null; + } + + public short getMode() { + return mode; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 0f8279943d2..cc919da812b 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -117,6 +117,7 @@ public class DFSOutputStream extends FSOutputSummer private long initialFileSize = 0; // at time of file open private final short blockReplication; // replication factor of file protected boolean shouldSyncBlock = false; // force blocks to disk upon close + private final EnumSet addBlockFlags; protected final AtomicReference cachingStrategy; private FileEncryptionInfo fileEncryptionInfo; @@ -179,6 +180,7 @@ public class DFSOutputStream extends FSOutputSummer } private DFSOutputStream(DFSClient dfsClient, String src, + EnumSet flag, Progressable progress, HdfsFileStatus stat, DataChecksum checksum) { super(getChecksum4Compute(checksum, stat)); this.dfsClient = dfsClient; @@ -189,6 +191,10 @@ public class DFSOutputStream extends FSOutputSummer this.fileEncryptionInfo = stat.getFileEncryptionInfo(); this.cachingStrategy = new AtomicReference<>( dfsClient.getDefaultWriteCachingStrategy()); + this.addBlockFlags = EnumSet.noneOf(AddBlockFlag.class); + if (flag.contains(CreateFlag.NO_LOCAL_WRITE)) { + this.addBlockFlags.add(AddBlockFlag.NO_LOCAL_WRITE); + } if (progress != null) { DFSClient.LOG.debug("Set non-null progress callback on DFSOutputStream " +"{}", src); @@ -212,7 +218,7 @@ public class DFSOutputStream extends FSOutputSummer protected DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat, EnumSet flag, Progressable progress, DataChecksum checksum, String[] favoredNodes, boolean createStreamer) { - this(dfsClient, src, progress, stat, checksum); + this(dfsClient, src, flag, progress, stat, checksum); this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK); computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), @@ -220,7 +226,8 @@ public class DFSOutputStream extends FSOutputSummer if (createStreamer) { streamer = new DataStreamer(stat, null, dfsClient, src, progress, - checksum, cachingStrategy, byteArrayManager, favoredNodes); + checksum, cachingStrategy, byteArrayManager, favoredNodes, + addBlockFlags); } } @@ -289,7 +296,7 @@ public class DFSOutputStream extends FSOutputSummer EnumSet flags, Progressable progress, LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes) throws IOException { - this(dfsClient, src, progress, stat, checksum); + this(dfsClient, src, flags, progress, stat, checksum); initialFileSize = stat.getLen(); // length of file when opened this.shouldSyncBlock = flags.contains(CreateFlag.SYNC_BLOCK); @@ -310,7 +317,8 @@ public class DFSOutputStream extends FSOutputSummer bytesPerChecksum); streamer = new DataStreamer(stat, lastBlock != null ? lastBlock.getBlock() : null, dfsClient, src, - progress, checksum, cachingStrategy, byteArrayManager, favoredNodes); + progress, checksum, cachingStrategy, byteArrayManager, favoredNodes, + addBlockFlags); } } @@ -844,6 +852,10 @@ public class DFSOutputStream extends FSOutputSummer return initialFileSize; } + protected EnumSet getAddBlockFlags() { + return addBlockFlags; + } + /** * @return the FileEncryptionInfo for this stream, or null if not encrypted. */ @@ -916,7 +928,8 @@ public class DFSOutputStream extends FSOutputSummer static LocatedBlock addBlock(DatanodeInfo[] excludedNodes, DFSClient dfsClient, String src, ExtendedBlock prevBlock, long fileId, - String[] favoredNodes) throws IOException { + String[] favoredNodes, EnumSet allocFlags) + throws IOException { final DfsClientConf conf = dfsClient.getConf(); int retries = conf.getNumBlockWriteLocateFollowingRetry(); long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs(); @@ -924,7 +937,7 @@ public class DFSOutputStream extends FSOutputSummer while (true) { try { return dfsClient.namenode.addBlock(src, dfsClient.clientName, prevBlock, - excludedNodes, fileId, favoredNodes); + excludedNodes, fileId, favoredNodes, allocFlags); } catch (RemoteException e) { IOException ue = e.unwrapRemoteException(FileNotFoundException.class, AccessControlException.class, diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index b049286e412..403e50fa94f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -301,7 +301,7 @@ public class DFSStripedOutputStream extends DFSOutputStream { for (short i = 0; i < numAllBlocks; i++) { StripedDataStreamer streamer = new StripedDataStreamer(stat, dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager, - favoredNodes, i, coordinator); + favoredNodes, i, coordinator, getAddBlockFlags()); streamers.add(streamer); } currentPackets = new DFSPacket[streamers.size()]; @@ -406,7 +406,7 @@ public class DFSStripedOutputStream extends DFSOutputStream { StripedDataStreamer streamer = new StripedDataStreamer(oldStreamer.stat, dfsClient, src, oldStreamer.progress, oldStreamer.checksum4WriteBlock, cachingStrategy, byteArrayManager, - favoredNodes, i, coordinator); + favoredNodes, i, coordinator, getAddBlockFlags()); streamers.set(i, streamer); currentPackets[i] = null; if (i == currentIndex) { @@ -458,7 +458,7 @@ public class DFSStripedOutputStream extends DFSOutputStream { LOG.debug("Allocating new block group. The previous block group: " + currentBlockGroup); final LocatedBlock lb = addBlock(excludedNodes, dfsClient, src, - currentBlockGroup, fileId, favoredNodes); + currentBlockGroup, fileId, favoredNodes, getAddBlockFlags()); assert lb.isStriped(); if (lb.getLocations().length < numDataBlocks) { throw new IOException("Failed to get " + numDataBlocks diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 9ae443d2d54..4d5f16cbf13 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -32,6 +32,7 @@ import java.net.Socket; import java.nio.channels.ClosedChannelException; import java.util.ArrayList; import java.util.Arrays; +import java.util.EnumSet; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -412,13 +413,15 @@ class DataStreamer extends Daemon { protected final LoadingCache excludedNodes; private final String[] favoredNodes; + private final EnumSet addBlockFlags; private DataStreamer(HdfsFileStatus stat, ExtendedBlock block, DFSClient dfsClient, String src, Progressable progress, DataChecksum checksum, AtomicReference cachingStrategy, ByteArrayManager byteArrayManage, - boolean isAppend, String[] favoredNodes) { + boolean isAppend, String[] favoredNodes, + EnumSet flags) { this.block = block; this.dfsClient = dfsClient; this.src = src; @@ -430,11 +433,11 @@ class DataStreamer extends Daemon { this.isLazyPersistFile = isLazyPersist(stat); this.isAppend = isAppend; this.favoredNodes = favoredNodes; - final DfsClientConf conf = dfsClient.getConf(); this.dfsclientSlowLogThresholdMs = conf.getSlowIoWarningThresholdMs(); this.excludedNodes = initExcludedNodes(conf.getExcludedNodesCacheExpiry()); this.errorState = new ErrorState(conf.getDatanodeRestartTimeout()); + this.addBlockFlags = flags; } /** @@ -443,9 +446,10 @@ class DataStreamer extends Daemon { DataStreamer(HdfsFileStatus stat, ExtendedBlock block, DFSClient dfsClient, String src, Progressable progress, DataChecksum checksum, AtomicReference cachingStrategy, - ByteArrayManager byteArrayManage, String[] favoredNodes) { + ByteArrayManager byteArrayManage, String[] favoredNodes, + EnumSet flags) { this(stat, block, dfsClient, src, progress, checksum, cachingStrategy, - byteArrayManage, false, favoredNodes); + byteArrayManage, false, favoredNodes, flags); stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; } @@ -459,7 +463,7 @@ class DataStreamer extends Daemon { AtomicReference cachingStrategy, ByteArrayManager byteArrayManage) { this(stat, lastBlock.getBlock(), dfsClient, src, progress, checksum, cachingStrategy, - byteArrayManage, true, null); + byteArrayManage, true, null, null); stage = BlockConstructionStage.PIPELINE_SETUP_APPEND; bytesSent = block.getNumBytes(); accessToken = lastBlock.getBlockToken(); @@ -1679,7 +1683,7 @@ class DataStreamer extends Daemon { private LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) throws IOException { return DFSOutputStream.addBlock(excludedNodes, dfsClient, src, block, - stat.getFileId(), favoredNodes); + stat.getFileId(), favoredNodes, addBlockFlags); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java index ebd04d67a1e..8f4efccaa5b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs; import java.io.IOException; +import java.util.EnumSet; import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.classification.InterfaceAudience; @@ -52,9 +53,10 @@ public class StripedDataStreamer extends DataStreamer { Progressable progress, DataChecksum checksum, AtomicReference cachingStrategy, ByteArrayManager byteArrayManage, String[] favoredNodes, - short index, Coordinator coordinator) { + short index, Coordinator coordinator, + final EnumSet flags) { super(stat, null, dfsClient, src, progress, checksum, cachingStrategy, - byteArrayManage, favoredNodes); + byteArrayManage, favoredNodes, flags); this.index = index; this.coordinator = coordinator; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index f524d7c6416..15bbe5161b1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -25,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.crypto.CryptoProtocolVersion; import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; +import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.fs.CacheFlag; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.CreateFlag; @@ -390,6 +391,8 @@ public interface ClientProtocol { * @param fileId the id uniquely identifying a file * @param favoredNodes the list of nodes where the client wants the blocks. * Nodes are identified by either host name or address. + * @param addBlockFlags flags to advise the behavior of allocating and placing + * a new block. * * @return LocatedBlock allocated block information. * @@ -408,7 +411,7 @@ public interface ClientProtocol { @Idempotent LocatedBlock addBlock(String src, String clientName, ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, - String[] favoredNodes) + String[] favoredNodes, EnumSet addBlockFlags) throws IOException; /** diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index e3255974bc2..9c2b54d2cab 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -404,7 +404,8 @@ public class ClientNamenodeProtocolTranslatorPB implements @Override public LocatedBlock addBlock(String src, String clientName, ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, - String[] favoredNodes) throws IOException { + String[] favoredNodes, EnumSet addBlockFlags) + throws IOException { AddBlockRequestProto.Builder req = AddBlockRequestProto.newBuilder() .setSrc(src).setClientName(clientName).setFileId(fileId); if (previous != null) @@ -414,6 +415,10 @@ public class ClientNamenodeProtocolTranslatorPB implements if (favoredNodes != null) { req.addAllFavoredNodes(Arrays.asList(favoredNodes)); } + if (addBlockFlags != null) { + req.addAllFlags(PBHelperClient.convertAddBlockFlags( + addBlockFlags)); + } try { return PBHelperClient.convertLocatedBlockProto( rpcProxy.addBlock(null, req.build()).getBlock()); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java index 47593732123..56ec6c01fd3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java @@ -37,6 +37,7 @@ import com.google.protobuf.CodedInputStream; import org.apache.hadoop.crypto.CipherOption; import org.apache.hadoop.crypto.CipherSuite; import org.apache.hadoop.crypto.CryptoProtocolVersion; +import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.fs.CacheFlag; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.CreateFlag; @@ -97,6 +98,7 @@ import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEntryProto.AclEntryTyp import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEntryProto.FsActionProto; import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclStatusProto; import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockFlagProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveEntryProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoExpirationProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto; @@ -2528,4 +2530,29 @@ public class PBHelperClient { } return builder.build(); } + + public static EnumSet convertAddBlockFlags( + List addBlockFlags) { + EnumSet flags = + EnumSet.noneOf(AddBlockFlag.class); + for (AddBlockFlagProto af : addBlockFlags) { + AddBlockFlag flag = AddBlockFlag.valueOf((short)af.getNumber()); + if (flag != null) { + flags.add(flag); + } + } + return flags; + } + + public static List convertAddBlockFlags( + EnumSet flags) { + List ret = new ArrayList<>(); + for (AddBlockFlag flag : flags) { + AddBlockFlagProto abfp = AddBlockFlagProto.valueOf(flag.getMode()); + if (abfp != null) { + ret.add(abfp); + } + } + return ret; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto index 30732efc21d..7acb394e09e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto @@ -162,6 +162,10 @@ message AbandonBlockRequestProto { message AbandonBlockResponseProto { // void response } +enum AddBlockFlagProto { + NO_LOCAL_WRITE = 1; // avoid writing to local node. +} + message AddBlockRequestProto { required string src = 1; required string clientName = 2; @@ -169,6 +173,7 @@ message AddBlockRequestProto { repeated DatanodeInfoProto excludeNodes = 4; optional uint64 fileId = 5 [default = 0]; // default as a bogus id repeated string favoredNodes = 6; //the set of datanodes to use for the block + repeated AddBlockFlagProto flags = 7; // default to empty. } message AddBlockResponseProto { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index da7c524eb4f..7a4ff8259ad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; +import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FsServerDefaults; @@ -505,6 +506,8 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements try { List excl = req.getExcludeNodesList(); List favor = req.getFavoredNodesList(); + EnumSet flags = + PBHelperClient.convertAddBlockFlags(req.getFlagsList()); LocatedBlock result = server.addBlock( req.getSrc(), req.getClientName(), @@ -512,7 +515,8 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements (excl == null || excl.size() == 0) ? null : PBHelperClient.convert(excl .toArray(new DatanodeInfoProto[excl.size()])), req.getFileId(), (favor == null || favor.size() == 0) ? null : favor - .toArray(new String[favor.size()])); + .toArray(new String[favor.size()]), + flags); return AddBlockResponseProto.newBuilder() .setBlock(PBHelperClient.convertLocatedBlock(result)).build(); } catch (IOException e) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index ff54f489130..70086e6518b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -48,6 +48,7 @@ import javax.management.ObjectName; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSUtilClient; @@ -1782,7 +1783,7 @@ public class BlockManager implements BlockStatsMXBean { DatanodeDescriptor clientnode, Set excludes, long blocksize) { return placementPolicies.getPolicy(false).chooseTarget(src, 1, clientnode, Collections.emptyList(), false, excludes, - blocksize, storagePolicySuite.getDefaultPolicy()); + blocksize, storagePolicySuite.getDefaultPolicy(), null); } /** Choose target for getting additional datanodes for an existing pipeline. */ @@ -1797,7 +1798,7 @@ public class BlockManager implements BlockStatsMXBean { final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(storagePolicyID); final BlockPlacementPolicy blockplacement = placementPolicies.getPolicy(isStriped); return blockplacement.chooseTarget(src, numAdditionalNodes, clientnode, - chosen, true, excludes, blocksize, storagePolicy); + chosen, true, excludes, blocksize, storagePolicy, null); } /** @@ -1814,14 +1815,15 @@ public class BlockManager implements BlockStatsMXBean { final long blocksize, final List favoredNodes, final byte storagePolicyID, - final boolean isStriped) throws IOException { + final boolean isStriped, + final EnumSet flags) throws IOException { List favoredDatanodeDescriptors = getDatanodeDescriptors(favoredNodes); final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(storagePolicyID); final BlockPlacementPolicy blockplacement = placementPolicies.getPolicy(isStriped); final DatanodeStorageInfo[] targets = blockplacement.chooseTarget(src, numOfReplicas, client, excludedNodes, blocksize, - favoredDatanodeDescriptors, storagePolicy); + favoredDatanodeDescriptors, storagePolicy, flags); if (targets.length < minReplication) { throw new IOException("File " + src + " could only be replicated to " + targets.length + " nodes instead of minReplication (=" diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java index 92b03d22d86..732a2dc7b09 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import java.util.ArrayList; import java.util.Collection; +import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -26,6 +27,7 @@ import java.util.Set; import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -64,6 +66,7 @@ public abstract class BlockPlacementPolicy { * @param returnChosenNodes decide if the chosenNodes are returned. * @param excludedNodes datanodes that should not be considered as targets. * @param blocksize size of the data to be written. + * @param flags Block placement flags. * @return array of DatanodeDescriptor instances chosen as target * and sorted as a pipeline. */ @@ -74,7 +77,8 @@ public abstract class BlockPlacementPolicy { boolean returnChosenNodes, Set excludedNodes, long blocksize, - BlockStoragePolicy storagePolicy); + BlockStoragePolicy storagePolicy, + EnumSet flags); /** * @param favoredNodes datanodes that should be favored as targets. This @@ -86,14 +90,15 @@ public abstract class BlockPlacementPolicy { Set excludedNodes, long blocksize, List favoredNodes, - BlockStoragePolicy storagePolicy) { + BlockStoragePolicy storagePolicy, + EnumSet flags) { // This class does not provide the functionality of placing // a block in favored datanodes. The implementations of this class // are expected to provide this functionality return chooseTarget(src, numOfReplicas, writer, new ArrayList(numOfReplicas), false, - excludedNodes, blocksize, storagePolicy); + excludedNodes, blocksize, storagePolicy, flags); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java index 474a5e7799a..3f1e09ae129 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java @@ -23,6 +23,7 @@ import java.util.*; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -111,9 +112,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { boolean returnChosenNodes, Set excludedNodes, long blocksize, - final BlockStoragePolicy storagePolicy) { + final BlockStoragePolicy storagePolicy, + EnumSet flags) { return chooseTarget(numOfReplicas, writer, chosenNodes, returnChosenNodes, - excludedNodes, blocksize, storagePolicy); + excludedNodes, blocksize, storagePolicy, flags); } @Override @@ -123,13 +125,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { Set excludedNodes, long blocksize, List favoredNodes, - BlockStoragePolicy storagePolicy) { + BlockStoragePolicy storagePolicy, + EnumSet flags) { try { if (favoredNodes == null || favoredNodes.size() == 0) { // Favored nodes not specified, fall back to regular block placement. return chooseTarget(src, numOfReplicas, writer, new ArrayList(numOfReplicas), false, - excludedNodes, blocksize, storagePolicy); + excludedNodes, blocksize, storagePolicy, flags); } Set favoriteAndExcludedNodes = excludedNodes == null ? @@ -164,7 +167,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { DatanodeStorageInfo[] remainingTargets = chooseTarget(src, numOfReplicas, writer, new ArrayList(numOfReplicas), false, - favoriteAndExcludedNodes, blocksize, storagePolicy); + favoriteAndExcludedNodes, blocksize, storagePolicy, flags); for (int i = 0; i < remainingTargets.length; i++) { results.add(remainingTargets[i]); } @@ -179,7 +182,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { // Fall back to regular block placement disregarding favored nodes hint return chooseTarget(src, numOfReplicas, writer, new ArrayList(numOfReplicas), false, - excludedNodes, blocksize, storagePolicy); + excludedNodes, blocksize, storagePolicy, flags); } } @@ -213,7 +216,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { boolean returnChosenNodes, Set excludedNodes, long blocksize, - final BlockStoragePolicy storagePolicy) { + final BlockStoragePolicy storagePolicy, + EnumSet addBlockFlags) { if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) { return DatanodeStorageInfo.EMPTY_ARRAY; } @@ -226,17 +230,42 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { numOfReplicas = result[0]; int maxNodesPerRack = result[1]; - final List results = new ArrayList<>(chosenStorage); for (DatanodeStorageInfo storage : chosenStorage) { // add localMachine and related nodes to excludedNodes addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes); } + List results = null; + Node localNode = null; boolean avoidStaleNodes = (stats != null && stats.isAvoidingStaleDataNodesForWrite()); - final Node localNode = chooseTarget(numOfReplicas, writer, excludedNodes, - blocksize, maxNodesPerRack, results, avoidStaleNodes, storagePolicy, - EnumSet.noneOf(StorageType.class), results.isEmpty()); + boolean avoidLocalNode = (addBlockFlags != null + && addBlockFlags.contains(AddBlockFlag.NO_LOCAL_WRITE) + && writer != null + && !excludedNodes.contains(writer)); + // Attempt to exclude local node if the client suggests so. If no enough + // nodes can be obtained, it falls back to the default block placement + // policy. + if (avoidLocalNode) { + results = new ArrayList<>(chosenStorage); + Set excludedNodeCopy = new HashSet<>(excludedNodes); + excludedNodeCopy.add(writer); + localNode = chooseTarget(numOfReplicas, writer, + excludedNodeCopy, blocksize, maxNodesPerRack, results, + avoidStaleNodes, storagePolicy, + EnumSet.noneOf(StorageType.class), results.isEmpty()); + if (results.size() < numOfReplicas) { + // not enough nodes; discard results and fall back + results = null; + } + } + if (results == null) { + results = new ArrayList<>(chosenStorage); + localNode = chooseTarget(numOfReplicas, writer, excludedNodes, + blocksize, maxNodesPerRack, results, avoidStaleNodes, + storagePolicy, EnumSet.noneOf(StorageType.class), results.isEmpty()); + } + if (!returnChosenNodes) { results.removeAll(chosenStorage); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java index d110b300983..082e949e84c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java @@ -61,7 +61,7 @@ class ErasureCodingWork extends BlockReconstructionWork { getBc().getName(), getAdditionalReplRequired(), getSrcNodes()[0], getLiveReplicaStorages(), false, excludedNodes, getBlock().getNumBytes(), - storagePolicySuite.getPolicy(getBc().getStoragePolicyID())); + storagePolicySuite.getPolicy(getBc().getStoragePolicyID()), null); setTargets(chosenTargets); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java index f4d274a4b65..6078b1dfb75 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java @@ -47,7 +47,8 @@ class ReplicationWork extends BlockReconstructionWork { getBc().getName(), getAdditionalReplRequired(), getSrcNodes()[0], getLiveReplicaStorages(), false, excludedNodes, getBlock().getNumBytes(), - storagePolicySuite.getPolicy(getBc().getStoragePolicyID())); + storagePolicySuite.getPolicy(getBc().getStoragePolicyID()), + null); setTargets(chosenTargets); } finally { getSrcNodes()[0].decrementPendingReplicationWithoutTargets(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java index ab2f0fa32f7..01743ba6c1f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java @@ -23,6 +23,7 @@ import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.crypto.CipherSuite; import org.apache.hadoop.crypto.CryptoProtocolVersion; import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; +import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileEncryptionInfo; @@ -282,8 +283,9 @@ class FSDirWriteFileOp { } static DatanodeStorageInfo[] chooseTargetForNewBlock( - BlockManager bm, String src, DatanodeInfo[] excludedNodes, String[] - favoredNodes, ValidateAddBlockResult r) throws IOException { + BlockManager bm, String src, DatanodeInfo[] excludedNodes, + String[] favoredNodes, EnumSet flags, + ValidateAddBlockResult r) throws IOException { Node clientNode = bm.getDatanodeManager() .getDatanodeByHost(r.clientMachine); if (clientNode == null) { @@ -297,12 +299,11 @@ class FSDirWriteFileOp { } List favoredNodesList = (favoredNodes == null) ? null : Arrays.asList(favoredNodes); - // choose targets for the new block to be allocated. return bm.chooseTarget4NewBlock(src, r.numTargets, clientNode, excludedNodesSet, r.blockSize, favoredNodesList, r.storagePolicyID, - r.isStriped); + r.isStriped, flags); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 722d283e9ea..b06ea0acf4c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -136,6 +136,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.CryptoProtocolVersion; import org.apache.hadoop.crypto.key.KeyProvider.Metadata; import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; +import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries; import org.apache.hadoop.fs.CacheFlag; import org.apache.hadoop.fs.ContentSummary; @@ -2468,7 +2469,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, */ LocatedBlock getAdditionalBlock( String src, long fileId, String clientName, ExtendedBlock previous, - DatanodeInfo[] excludedNodes, String[] favoredNodes) throws IOException { + DatanodeInfo[] excludedNodes, String[] favoredNodes, + EnumSet flags) throws IOException { NameNode.stateChangeLog.debug("BLOCK* getAdditionalBlock: {} inodeId {}" + " for {}", src, fileId, clientName); @@ -2492,7 +2494,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, } DatanodeStorageInfo[] targets = FSDirWriteFileOp.chooseTargetForNewBlock( - blockManager, src, excludedNodes, favoredNodes, r); + blockManager, src, excludedNodes, favoredNodes, flags, r); checkOperation(OperationCategory.WRITE); writeLock(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 0cd76be3578..2501be948b7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -48,6 +48,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.ReconfigurationTaskStatus; import org.apache.hadoop.crypto.CryptoProtocolVersion; import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; +import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.fs.CacheFlag; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.ContentSummary; @@ -837,11 +838,11 @@ class NameNodeRpcServer implements NamenodeProtocols { @Override public LocatedBlock addBlock(String src, String clientName, ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId, - String[] favoredNodes) + String[] favoredNodes, EnumSet addBlockFlags) throws IOException { checkNNStartup(); LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId, - clientName, previous, excludedNodes, favoredNodes); + clientName, previous, excludedNodes, favoredNodes, addBlockFlags); if (locatedBlock != null) { metrics.incrAddBlockOps(); } @@ -1149,7 +1150,7 @@ class NameNodeRpcServer implements NamenodeProtocols { DatanodeInfo results[] = namesystem.datanodeReport(type); return results; } - + @Override // ClientProtocol public DatanodeStorageReport[] getDatanodeStorageReport( DatanodeReportType type) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index d159fc55d0e..eda99107f23 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -1967,7 +1967,7 @@ public class DFSTestUtil { String clientName, ExtendedBlock previous, int numStripes, int len) throws Exception { fs.getClient().namenode.addBlock(file, clientName, previous, null, - fileNode.getId(), null); + fileNode.getId(), null, null); final BlockInfo lastBlock = fileNode.getLastBlock(); final int groupSize = fileNode.getPreferredBlockReplication(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java index 62d1a3e2dea..624fc745693 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java @@ -1237,12 +1237,12 @@ public class TestBlockStoragePolicy { DatanodeStorageInfo[] targets = replicator.chooseTarget("/foo", 3, dataNodes[0], Collections.emptyList(), false, - new HashSet(), 0, policy1); + new HashSet(), 0, policy1, null); System.out.println(Arrays.asList(targets)); Assert.assertEquals(3, targets.length); targets = replicator.chooseTarget("/foo", 3, dataNodes[0], Collections.emptyList(), false, - new HashSet(), 0, policy2); + new HashSet(), 0, policy2, null); System.out.println(Arrays.asList(targets)); Assert.assertEquals(3, targets.length); } @@ -1284,7 +1284,7 @@ public class TestBlockStoragePolicy { DatanodeStorageInfo[] targets = replicator.chooseTarget("/foo", 3, dataNodes[0], Collections.emptyList(), false, - new HashSet(), 0, policy); + new HashSet(), 0, policy, null); System.out.println(policy.getName() + ": " + Arrays.asList(targets)); Assert.assertEquals(2, targets.length); Assert.assertEquals(StorageType.SSD, targets[0].getStorageType()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java index 62dfe790f8a..c7997d7d0fd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java @@ -44,6 +44,7 @@ import java.net.URI; import java.security.MessageDigest; import java.util.ArrayList; import java.util.Arrays; +import java.util.EnumSet; import java.util.List; import java.util.Random; import java.util.concurrent.TimeUnit; @@ -96,6 +97,7 @@ import org.apache.log4j.Level; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Matchers; import org.mockito.Mockito; import org.mockito.internal.stubbing.answers.ThrowsException; import org.mockito.invocation.InvocationOnMock; @@ -252,7 +254,9 @@ public class TestDFSClientRetries { anyString(), any(ExtendedBlock.class), any(DatanodeInfo[].class), - anyLong(), any(String[].class))).thenAnswer(answer); + anyLong(), any(String[].class), + Matchers.>any())) + .thenAnswer(answer); Mockito.doReturn( new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission( @@ -471,7 +475,8 @@ public class TestDFSClientRetries { } }).when(spyNN).addBlock(Mockito.anyString(), Mockito.anyString(), Mockito. any(), Mockito. any(), - Mockito.anyLong(), Mockito. any()); + Mockito.anyLong(), Mockito. any(), + Mockito.> any()); doAnswer(new Answer() { @@ -513,7 +518,8 @@ public class TestDFSClientRetries { Mockito.verify(spyNN, Mockito.atLeastOnce()).addBlock( Mockito.anyString(), Mockito.anyString(), Mockito. any(), Mockito. any(), - Mockito.anyLong(), Mockito. any()); + Mockito.anyLong(), Mockito. any(), + Mockito.> any()); Mockito.verify(spyNN, Mockito.atLeastOnce()).complete( Mockito.anyString(), Mockito.anyString(), Mockito.any(), anyLong()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java index a404ac8174f..d9df1ff4073 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java @@ -22,19 +22,29 @@ import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.util.ArrayList; +import java.util.EnumSet; import java.util.LinkedList; -import java.util.concurrent.atomic.AtomicReference; +import java.util.Map; +import java.util.Random; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FsTracer; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DataStreamer.LastExceptionInStreamer; import org.apache.hadoop.hdfs.client.impl.DfsClientConf; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.htrace.core.SpanId; import org.junit.AfterClass; import org.junit.Assert; @@ -42,8 +52,11 @@ import org.junit.BeforeClass; import org.junit.Test; import org.mockito.internal.util.reflection.Whitebox; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; public class TestDFSOutputStream { @@ -52,7 +65,7 @@ public class TestDFSOutputStream { @BeforeClass public static void setup() throws IOException { Configuration conf = new Configuration(); - cluster = new MiniDFSCluster.Builder(conf).build(); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); } /** @@ -80,7 +93,7 @@ public class TestDFSOutputStream { try { dos.close(); } catch (IOException e) { - Assert.assertEquals(e, dummy); + assertEquals(e, dummy); } thrown = (Throwable) Whitebox.getInternalState(ex, "thrown"); Assert.assertNull(thrown); @@ -127,7 +140,7 @@ public class TestDFSOutputStream { mock(HdfsFileStatus.class), mock(ExtendedBlock.class), client, - "foo", null, null, null, null, null); + "foo", null, null, null, null, null, null); DataOutputStream blockStream = mock(DataOutputStream.class); doThrow(new IOException()).when(blockStream).flush(); @@ -148,6 +161,47 @@ public class TestDFSOutputStream { Assert.assertTrue(congestedNodes.isEmpty()); } + @Test + public void testNoLocalWriteFlag() throws IOException { + DistributedFileSystem fs = cluster.getFileSystem(); + EnumSet flags = EnumSet.of(CreateFlag.NO_LOCAL_WRITE, + CreateFlag.CREATE); + BlockManager bm = cluster.getNameNode().getNamesystem().getBlockManager(); + DatanodeManager dm = bm.getDatanodeManager(); + try(FSDataOutputStream os = fs.create(new Path("/test-no-local"), + FsPermission.getDefault(), + flags, 512, (short)2, 512, null)) { + // Inject a DatanodeManager that returns one DataNode as local node for + // the client. + DatanodeManager spyDm = spy(dm); + DatanodeDescriptor dn1 = dm.getDatanodeListForReport + (HdfsConstants.DatanodeReportType.LIVE).get(0); + doReturn(dn1).when(spyDm).getDatanodeByHost("127.0.0.1"); + Whitebox.setInternalState(bm, "datanodeManager", spyDm); + byte[] buf = new byte[512 * 16]; + new Random().nextBytes(buf); + os.write(buf); + } finally { + Whitebox.setInternalState(bm, "datanodeManager", dm); + } + cluster.triggerBlockReports(); + final String bpid = cluster.getNamesystem().getBlockPoolId(); + // Total number of DataNodes is 3. + assertEquals(3, cluster.getAllBlockReports(bpid).size()); + int numDataNodesWithData = 0; + for (Map dnBlocks : + cluster.getAllBlockReports(bpid)) { + for (BlockListAsLongs blocks : dnBlocks.values()) { + if (blocks.getNumberOfBlocks() > 0) { + numDataNodesWithData++; + break; + } + } + } + // Verify that only one DN has no data. + assertEquals(1, 3 - numDataNodesWithData); + } + @AfterClass public static void tearDown() { if (cluster != null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java index 05c98ac3e8b..e47c8b13e1c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java @@ -531,7 +531,7 @@ public class TestFileCreation { // add one block to the file LocatedBlock location = client.getNamenode().addBlock(file1.toString(), - client.clientName, null, null, HdfsConstants.GRANDFATHER_INODE_ID, null); + client.clientName, null, null, HdfsConstants.GRANDFATHER_INODE_ID, null, null); System.out.println("testFileCreationError2: " + "Added block " + location.getBlock()); @@ -582,7 +582,7 @@ public class TestFileCreation { createFile(dfs, f, 3); try { cluster.getNameNodeRpc().addBlock(f.toString(), client.clientName, - null, null, HdfsConstants.GRANDFATHER_INODE_ID, null); + null, null, HdfsConstants.GRANDFATHER_INODE_ID, null, null); fail(); } catch(IOException ioe) { FileSystem.LOG.info("GOOD!", ioe); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java index 7dc52fa5fda..99986e6ac84 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java @@ -156,6 +156,6 @@ abstract public class BaseReplicationPolicyTest { Set excludedNodes) { return replicator.chooseTarget(filename, numOfReplicas, writer, chosenNodes, false, excludedNodes, BLOCK_SIZE, - TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY); + TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null); } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestAvailableSpaceBlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestAvailableSpaceBlockPlacementPolicy.java index f1e4e1cc612..a5090ccbbdf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestAvailableSpaceBlockPlacementPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestAvailableSpaceBlockPlacementPolicy.java @@ -142,7 +142,7 @@ public class TestAvailableSpaceBlockPlacementPolicy { .getBlockManager() .getBlockPlacementPolicy() .chooseTarget(file, replica, null, new ArrayList(), false, null, - blockSize, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY); + blockSize, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null); Assert.assertTrue(targets.length == replica); for (int j = 0; j < replica; j++) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java index 9f8985ab088..9c623833c45 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -30,6 +31,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -38,6 +40,7 @@ import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -290,7 +293,8 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest { excludedNodes.add(dataNodes[1]); chosenNodes.add(storages[2]); targets = replicator.chooseTarget(filename, 1, dataNodes[0], chosenNodes, true, - excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY); + excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, + null); System.out.println("targets=" + Arrays.asList(targets)); assertEquals(2, targets.length); //make sure that the chosen node is in the target. @@ -667,7 +671,8 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest { .getNamesystem().getBlockManager().getBlockPlacementPolicy(); DatanodeStorageInfo[] targets = replicator.chooseTarget(filename, 3, staleNodeInfo, new ArrayList(), false, null, - BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY); + BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, + null); assertEquals(targets.length, 3); assertFalse(isOnSameRack(targets[0], staleNodeInfo)); @@ -693,7 +698,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest { // Call chooseTarget targets = replicator.chooseTarget(filename, 3, staleNodeInfo, new ArrayList(), false, null, BLOCK_SIZE, - TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY); + TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null); assertEquals(targets.length, 3); assertTrue(isOnSameRack(targets[0], staleNodeInfo)); @@ -1501,8 +1506,42 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest { private DatanodeStorageInfo[] chooseTarget(int numOfReplicas, DatanodeDescriptor writer, Set excludedNodes, List favoredNodes) { + return chooseTarget(numOfReplicas, writer, excludedNodes, + favoredNodes, null); + } + + private DatanodeStorageInfo[] chooseTarget(int numOfReplicas, + DatanodeDescriptor writer, Set excludedNodes, + List favoredNodes, EnumSet flags) { return replicator.chooseTarget(filename, numOfReplicas, writer, excludedNodes, BLOCK_SIZE, favoredNodes, - TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY); + TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, flags); + } + + @Test + public void testAvoidLocalWrite() throws IOException { + DatanodeDescriptor writer = dataNodes[2]; + EnumSet flags = EnumSet.of(AddBlockFlag.NO_LOCAL_WRITE); + DatanodeStorageInfo[] targets; + targets = chooseTarget(5, writer, null, null, flags); + for (DatanodeStorageInfo info : targets) { + assertNotEquals(info.getDatanodeDescriptor(), writer); + } + } + + @Test + public void testAvoidLocalWriteNoEnoughNodes() throws IOException { + DatanodeDescriptor writer = dataNodes[2]; + EnumSet flags = EnumSet.of(AddBlockFlag.NO_LOCAL_WRITE); + DatanodeStorageInfo[] targets; + targets = chooseTarget(6, writer, null, null, flags); + assertEquals(6, targets.length); + boolean found = false; + for (DatanodeStorageInfo info : targets) { + if (info.getDatanodeDescriptor().equals(writer)) { + found = true; + } + } + assertTrue(found); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java index 47ff0ac910d..8336fec2985 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java @@ -112,7 +112,7 @@ public class TestReplicationPolicyConsiderLoad DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager() .getBlockPlacementPolicy().chooseTarget("testFile.txt", 3, writerDn, new ArrayList(), false, null, - 1024, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY); + 1024, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null); assertEquals(3, targets.length); Set targetSet = new HashSet<>( @@ -171,7 +171,7 @@ public class TestReplicationPolicyConsiderLoad DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager() .getBlockPlacementPolicy().chooseTarget("testFile.txt", 3, writerDn, new ArrayList(), false, null, - 1024, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY); + 1024, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null); for(DatanodeStorageInfo info : targets) { assertTrue("The node "+info.getDatanodeDescriptor().getName()+ " has higher load and should not have been picked!", diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java index ce210e62859..2f184bb143f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java @@ -269,7 +269,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes List favoredNodes) { return replicator.chooseTarget(filename, numOfReplicas, writer, excludedNodes, BLOCK_SIZE, favoredNodes, - TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY); + TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null); } /** @@ -351,7 +351,8 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes Set excludedNodes = new HashSet<>(); excludedNodes.add(dataNodes[1]); targets = repl.chooseTarget(filename, 4, dataNodes[0], chosenNodes, false, - excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY); + excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, + null); assertEquals(targets.length, 4); assertEquals(storages[0], targets[0]); @@ -369,7 +370,8 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes excludedNodes.add(dataNodes[1]); chosenNodes.add(storages[2]); targets = repl.chooseTarget(filename, 1, dataNodes[0], chosenNodes, true, - excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY); + excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, + null); System.out.println("targets=" + Arrays.asList(targets)); assertEquals(2, targets.length); //make sure that the chosen node is in the target. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java index 69bc228bdaa..6f7d67a8af6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java @@ -187,7 +187,7 @@ public class TestReplicationPolicyWithUpgradeDomain chosenNodes.add(storages[2]); targets = replicator.chooseTarget(filename, 1, dataNodes[0], chosenNodes, true, excludedNodes, BLOCK_SIZE, - TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY); + TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null); System.out.println("targets=" + Arrays.asList(targets)); assertEquals(2, targets.length); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java index a8a8ba574bb..efd731e52a5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java @@ -1204,7 +1204,7 @@ public class NNThroughputBenchmark implements Tool { for (int i = 0; i < 30; i++) { try { return clientProto.addBlock(src, clientName, - previous, excludeNodes, fileId, favoredNodes); + previous, excludeNodes, fileId, favoredNodes, null); } catch (NotReplicatedYetException|RemoteException e) { if (e instanceof RemoteException) { String className = ((RemoteException) e).getClassName(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java index a8cd5b9a825..94abe3ef837 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java @@ -101,13 +101,13 @@ public class TestAddBlockRetry { ns.readUnlock();; } DatanodeStorageInfo targets[] = FSDirWriteFileOp.chooseTargetForNewBlock( - ns.getBlockManager(), src, null, null, r); + ns.getBlockManager(), src, null, null, null, r); assertNotNull("Targets must be generated", targets); // run second addBlock() LOG.info("Starting second addBlock for " + src); nn.addBlock(src, "clientName", null, null, - HdfsConstants.GRANDFATHER_INODE_ID, null); + HdfsConstants.GRANDFATHER_INODE_ID, null, null); assertTrue("Penultimate block must be complete", checkFileProgress(src, false)); LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE); @@ -161,14 +161,14 @@ public class TestAddBlockRetry { // start first addBlock() LOG.info("Starting first addBlock for " + src); LocatedBlock lb1 = nameNodeRpc.addBlock(src, "clientName", null, null, - HdfsConstants.GRANDFATHER_INODE_ID, null); + HdfsConstants.GRANDFATHER_INODE_ID, null, null); assertTrue("Block locations should be present", lb1.getLocations().length > 0); cluster.restartNameNode(); nameNodeRpc = cluster.getNameNodeRpc(); LocatedBlock lb2 = nameNodeRpc.addBlock(src, "clientName", null, null, - HdfsConstants.GRANDFATHER_INODE_ID, null); + HdfsConstants.GRANDFATHER_INODE_ID, null, null); assertEquals("Blocks are not equal", lb1.getBlock(), lb2.getBlock()); assertTrue("Wrong locations with retry", lb2.getLocations().length > 0); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java index b206f8351e1..768533ace3f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java @@ -285,7 +285,7 @@ public class TestAddStripedBlocks { FSDirectory fsdir = cluster.getNamesystem().getFSDirectory(); INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile(); cluster.getNamesystem().getAdditionalBlock(file.toString(), - fileNode.getId(), dfs.getClient().getClientName(), null, null, null); + fileNode.getId(), dfs.getClient().getClientName(), null, null, null, null); BlockInfo lastBlock = fileNode.getLastBlock(); DatanodeStorageInfo[] locs = lastBlock.getUnderConstructionFeature() diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java index 9f844d7fc7e..f40c464e24d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java @@ -113,7 +113,7 @@ public class TestBlockPlacementPolicyRackFaultTolerant { //test chooseTarget for new file LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, - null, null, fileStatus.getFileId(), null); + null, null, fileStatus.getFileId(), null, null); doTestLocatedBlock(replication, locatedBlock); //test chooseTarget for existing file. @@ -143,7 +143,7 @@ public class TestBlockPlacementPolicyRackFaultTolerant { //test chooseTarget for new file LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, - null, null, fileStatus.getFileId(), null); + null, null, fileStatus.getFileId(), null, null); doTestLocatedBlock(20, locatedBlock); DatanodeInfo[] locs = locatedBlock.getLocations(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java index ff8f81b27c2..04f0b91a2af 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java @@ -166,7 +166,7 @@ public class TestDeadDatanode { // part of the cluster anymore DatanodeStorageInfo[] results = bm.chooseTarget4NewBlock("/hello", 3, clientNode, new HashSet(), 256 * 1024 * 1024L, null, (byte) 7, - false); + false, null); for (DatanodeStorageInfo datanodeStorageInfo : results) { assertFalse("Dead node should not be choosen", datanodeStorageInfo .getDatanodeDescriptor().equals(clientNode)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java index a54040b2e8d..1a10b7a5351 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java @@ -140,7 +140,7 @@ public class TestDefaultBlockPlacementPolicy { clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true, REPLICATION_FACTOR, DEFAULT_BLOCK_SIZE, null, false); LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, - null, null, fileStatus.getFileId(), null); + null, null, fileStatus.getFileId(), null, null); assertEquals("Block should be allocated sufficient locations", REPLICATION_FACTOR, locatedBlock.getLocations().length); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeleteRace.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeleteRace.java index 7d4eb31bd31..15f697a9a06 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeleteRace.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeleteRace.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode; import java.io.FileNotFoundException; import java.util.AbstractMap; import java.util.ArrayList; +import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -28,6 +29,7 @@ import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -123,10 +125,11 @@ public class TestDeleteRace { boolean returnChosenNodes, Set excludedNodes, long blocksize, - final BlockStoragePolicy storagePolicy) { + final BlockStoragePolicy storagePolicy, + EnumSet flags) { DatanodeStorageInfo[] results = super.chooseTarget(srcPath, numOfReplicas, writer, chosenNodes, returnChosenNodes, excludedNodes, - blocksize, storagePolicy); + blocksize, storagePolicy, flags); try { Thread.sleep(3000); } catch (InterruptedException e) {} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestUpgradeDomainBlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestUpgradeDomainBlockPlacementPolicy.java index cc14fcbb04b..f9a2503b004 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestUpgradeDomainBlockPlacementPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestUpgradeDomainBlockPlacementPolicy.java @@ -152,7 +152,7 @@ public class TestUpgradeDomainBlockPlacementPolicy { clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true, REPLICATION_FACTOR, DEFAULT_BLOCK_SIZE, null, false); LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, - null, null, fileStatus.getFileId(), null); + null, null, fileStatus.getFileId(), null, null); assertEquals("Block should be allocated sufficient locations", REPLICATION_FACTOR, locatedBlock.getLocations().length); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java index 8b8343c7c7c..ee8ee902386 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java @@ -837,7 +837,7 @@ public class TestHASafeMode { new ExtendedBlock(previousBlock), new DatanodeInfo[0], DFSClientAdapter.getFileId((DFSOutputStream) create - .getWrappedStream()), null); + .getWrappedStream()), null, null); cluster.restartNameNode(0, true); cluster.restartDataNode(0); cluster.transitionToActive(0); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java index 694d15e63f3..812bcc5b386 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java @@ -198,7 +198,7 @@ public class TestOpenFilesWithSnapshot { String clientName = fs.getClient().getClientName(); // create one empty block nameNodeRpc.addBlock(fileWithEmptyBlock.toString(), clientName, null, null, - HdfsConstants.GRANDFATHER_INODE_ID, null); + HdfsConstants.GRANDFATHER_INODE_ID, null, null); fs.createSnapshot(path, "s2"); fs.rename(new Path("/test/test"), new Path("/test/test-renamed")); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotBlocksMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotBlocksMap.java index 76f1e8f8beb..ff45c0a707d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotBlocksMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotBlocksMap.java @@ -282,7 +282,7 @@ public class TestSnapshotBlocksMap { ExtendedBlock previous = new ExtendedBlock(fsn.getBlockPoolId(), blks[0]); cluster.getNameNodeRpc() .addBlock(bar.toString(), hdfs.getClient().getClientName(), previous, - null, barNode.getId(), null); + null, barNode.getId(), null, null); SnapshotTestHelper.createSnapshot(hdfs, foo, "s1"); @@ -319,7 +319,7 @@ public class TestSnapshotBlocksMap { ExtendedBlock previous = new ExtendedBlock(fsn.getBlockPoolId(), blks[0]); cluster.getNameNodeRpc() .addBlock(bar.toString(), hdfs.getClient().getClientName(), previous, - null, barNode.getId(), null); + null, barNode.getId(), null, null); SnapshotTestHelper.createSnapshot(hdfs, foo, "s1"); @@ -358,7 +358,7 @@ public class TestSnapshotBlocksMap { ExtendedBlock previous = new ExtendedBlock(fsn.getBlockPoolId(), blks[0]); cluster.getNameNodeRpc() .addBlock(bar.toString(), hdfs.getClient().getClientName(), previous, - null, barNode.getId(), null); + null, barNode.getId(), null, null); SnapshotTestHelper.createSnapshot(hdfs, foo, "s1");