HDFS-3702. Add an option for NOT writing the blocks locally if there is a datanode on the same box as the client. (Contributed by Lei (Eddy) Xu)

This commit is contained in:
Lei Xu 2016-04-27 14:22:51 -07:00
parent f16722d2ef
commit 0a152103f1
41 changed files with 367 additions and 93 deletions

View File

@ -103,7 +103,14 @@ public enum CreateFlag {
* Append data to a new block instead of the end of the last partial block.
* This is only useful for APPEND.
*/
NEW_BLOCK((short) 0x20);
NEW_BLOCK((short) 0x20),
/**
* Advise that a block replica NOT be written to the local DataNode where
* 'local' means the same host as the client is being run on.
*/
@InterfaceAudience.LimitedPrivate({"HBase"})
NO_LOCAL_WRITE((short) 0x40);
private final short mode;

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CreateFlag;
/**
* AddBlockFlag provides hints for new block allocation and placement.
* Users can use this flag to control <em>per DFSOutputStream</em>
* {@see ClientProtocol#addBlock()} behavior.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public enum AddBlockFlag {
/**
* Advise that a block replica NOT be written to the local DataNode where
* 'local' means the same host as the client is being run on.
*
* @see CreateFlag#NO_LOCAL_WRITE
*/
NO_LOCAL_WRITE((short) 0x01);
private final short mode;
AddBlockFlag(short mode) {
this.mode = mode;
}
public static AddBlockFlag valueOf(short mode) {
for (AddBlockFlag flag : AddBlockFlag.values()) {
if (flag.getMode() == mode) {
return flag;
}
}
return null;
}
public short getMode() {
return mode;
}
}

View File

@ -117,6 +117,7 @@ public class DFSOutputStream extends FSOutputSummer
private long initialFileSize = 0; // at time of file open
private final short blockReplication; // replication factor of file
protected boolean shouldSyncBlock = false; // force blocks to disk upon close
private final EnumSet<AddBlockFlag> addBlockFlags;
protected final AtomicReference<CachingStrategy> cachingStrategy;
private FileEncryptionInfo fileEncryptionInfo;
@ -179,6 +180,7 @@ public class DFSOutputStream extends FSOutputSummer
}
private DFSOutputStream(DFSClient dfsClient, String src,
EnumSet<CreateFlag> flag,
Progressable progress, HdfsFileStatus stat, DataChecksum checksum) {
super(getChecksum4Compute(checksum, stat));
this.dfsClient = dfsClient;
@ -189,6 +191,10 @@ public class DFSOutputStream extends FSOutputSummer
this.fileEncryptionInfo = stat.getFileEncryptionInfo();
this.cachingStrategy = new AtomicReference<>(
dfsClient.getDefaultWriteCachingStrategy());
this.addBlockFlags = EnumSet.noneOf(AddBlockFlag.class);
if (flag.contains(CreateFlag.NO_LOCAL_WRITE)) {
this.addBlockFlags.add(AddBlockFlag.NO_LOCAL_WRITE);
}
if (progress != null) {
DFSClient.LOG.debug("Set non-null progress callback on DFSOutputStream "
+"{}", src);
@ -212,7 +218,7 @@ public class DFSOutputStream extends FSOutputSummer
protected DFSOutputStream(DFSClient dfsClient, String src,
HdfsFileStatus stat, EnumSet<CreateFlag> flag, Progressable progress,
DataChecksum checksum, String[] favoredNodes, boolean createStreamer) {
this(dfsClient, src, progress, stat, checksum);
this(dfsClient, src, flag, progress, stat, checksum);
this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
@ -220,7 +226,8 @@ public class DFSOutputStream extends FSOutputSummer
if (createStreamer) {
streamer = new DataStreamer(stat, null, dfsClient, src, progress,
checksum, cachingStrategy, byteArrayManager, favoredNodes);
checksum, cachingStrategy, byteArrayManager, favoredNodes,
addBlockFlags);
}
}
@ -289,7 +296,7 @@ public class DFSOutputStream extends FSOutputSummer
EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock,
HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes)
throws IOException {
this(dfsClient, src, progress, stat, checksum);
this(dfsClient, src, flags, progress, stat, checksum);
initialFileSize = stat.getLen(); // length of file when opened
this.shouldSyncBlock = flags.contains(CreateFlag.SYNC_BLOCK);
@ -310,7 +317,8 @@ public class DFSOutputStream extends FSOutputSummer
bytesPerChecksum);
streamer = new DataStreamer(stat,
lastBlock != null ? lastBlock.getBlock() : null, dfsClient, src,
progress, checksum, cachingStrategy, byteArrayManager, favoredNodes);
progress, checksum, cachingStrategy, byteArrayManager, favoredNodes,
addBlockFlags);
}
}
@ -844,6 +852,10 @@ public class DFSOutputStream extends FSOutputSummer
return initialFileSize;
}
protected EnumSet<AddBlockFlag> getAddBlockFlags() {
return addBlockFlags;
}
/**
* @return the FileEncryptionInfo for this stream, or null if not encrypted.
*/
@ -916,7 +928,8 @@ public class DFSOutputStream extends FSOutputSummer
static LocatedBlock addBlock(DatanodeInfo[] excludedNodes,
DFSClient dfsClient, String src, ExtendedBlock prevBlock, long fileId,
String[] favoredNodes) throws IOException {
String[] favoredNodes, EnumSet<AddBlockFlag> allocFlags)
throws IOException {
final DfsClientConf conf = dfsClient.getConf();
int retries = conf.getNumBlockWriteLocateFollowingRetry();
long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs();
@ -924,7 +937,7 @@ public class DFSOutputStream extends FSOutputSummer
while (true) {
try {
return dfsClient.namenode.addBlock(src, dfsClient.clientName, prevBlock,
excludedNodes, fileId, favoredNodes);
excludedNodes, fileId, favoredNodes, allocFlags);
} catch (RemoteException e) {
IOException ue = e.unwrapRemoteException(FileNotFoundException.class,
AccessControlException.class,

View File

@ -301,7 +301,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
for (short i = 0; i < numAllBlocks; i++) {
StripedDataStreamer streamer = new StripedDataStreamer(stat,
dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
favoredNodes, i, coordinator);
favoredNodes, i, coordinator, getAddBlockFlags());
streamers.add(streamer);
}
currentPackets = new DFSPacket[streamers.size()];
@ -406,7 +406,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
StripedDataStreamer streamer = new StripedDataStreamer(oldStreamer.stat,
dfsClient, src, oldStreamer.progress,
oldStreamer.checksum4WriteBlock, cachingStrategy, byteArrayManager,
favoredNodes, i, coordinator);
favoredNodes, i, coordinator, getAddBlockFlags());
streamers.set(i, streamer);
currentPackets[i] = null;
if (i == currentIndex) {
@ -458,7 +458,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
LOG.debug("Allocating new block group. The previous block group: "
+ currentBlockGroup);
final LocatedBlock lb = addBlock(excludedNodes, dfsClient, src,
currentBlockGroup, fileId, favoredNodes);
currentBlockGroup, fileId, favoredNodes, getAddBlockFlags());
assert lb.isStriped();
if (lb.getLocations().length < numDataBlocks) {
throw new IOException("Failed to get " + numDataBlocks

View File

@ -32,6 +32,7 @@ import java.net.Socket;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@ -412,13 +413,15 @@ class DataStreamer extends Daemon {
protected final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes;
private final String[] favoredNodes;
private final EnumSet<AddBlockFlag> addBlockFlags;
private DataStreamer(HdfsFileStatus stat, ExtendedBlock block,
DFSClient dfsClient, String src,
Progressable progress, DataChecksum checksum,
AtomicReference<CachingStrategy> cachingStrategy,
ByteArrayManager byteArrayManage,
boolean isAppend, String[] favoredNodes) {
boolean isAppend, String[] favoredNodes,
EnumSet<AddBlockFlag> flags) {
this.block = block;
this.dfsClient = dfsClient;
this.src = src;
@ -430,11 +433,11 @@ class DataStreamer extends Daemon {
this.isLazyPersistFile = isLazyPersist(stat);
this.isAppend = isAppend;
this.favoredNodes = favoredNodes;
final DfsClientConf conf = dfsClient.getConf();
this.dfsclientSlowLogThresholdMs = conf.getSlowIoWarningThresholdMs();
this.excludedNodes = initExcludedNodes(conf.getExcludedNodesCacheExpiry());
this.errorState = new ErrorState(conf.getDatanodeRestartTimeout());
this.addBlockFlags = flags;
}
/**
@ -443,9 +446,10 @@ class DataStreamer extends Daemon {
DataStreamer(HdfsFileStatus stat, ExtendedBlock block, DFSClient dfsClient,
String src, Progressable progress, DataChecksum checksum,
AtomicReference<CachingStrategy> cachingStrategy,
ByteArrayManager byteArrayManage, String[] favoredNodes) {
ByteArrayManager byteArrayManage, String[] favoredNodes,
EnumSet<AddBlockFlag> flags) {
this(stat, block, dfsClient, src, progress, checksum, cachingStrategy,
byteArrayManage, false, favoredNodes);
byteArrayManage, false, favoredNodes, flags);
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
}
@ -459,7 +463,7 @@ class DataStreamer extends Daemon {
AtomicReference<CachingStrategy> cachingStrategy,
ByteArrayManager byteArrayManage) {
this(stat, lastBlock.getBlock(), dfsClient, src, progress, checksum, cachingStrategy,
byteArrayManage, true, null);
byteArrayManage, true, null, null);
stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
bytesSent = block.getNumBytes();
accessToken = lastBlock.getBlockToken();
@ -1679,7 +1683,7 @@ class DataStreamer extends Daemon {
private LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
throws IOException {
return DFSOutputStream.addBlock(excludedNodes, dfsClient, src, block,
stat.getFileId(), favoredNodes);
stat.getFileId(), favoredNodes, addBlockFlags);
}
/**

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hdfs;
import java.io.IOException;
import java.util.EnumSet;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.classification.InterfaceAudience;
@ -52,9 +53,10 @@ public class StripedDataStreamer extends DataStreamer {
Progressable progress, DataChecksum checksum,
AtomicReference<CachingStrategy> cachingStrategy,
ByteArrayManager byteArrayManage, String[] favoredNodes,
short index, Coordinator coordinator) {
short index, Coordinator coordinator,
final EnumSet<AddBlockFlag> flags) {
super(stat, null, dfsClient, src, progress, checksum, cachingStrategy,
byteArrayManage, favoredNodes);
byteArrayManage, favoredNodes, flags);
this.index = index;
this.coordinator = coordinator;
}

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
@ -390,6 +391,8 @@ public interface ClientProtocol {
* @param fileId the id uniquely identifying a file
* @param favoredNodes the list of nodes where the client wants the blocks.
* Nodes are identified by either host name or address.
* @param addBlockFlags flags to advise the behavior of allocating and placing
* a new block.
*
* @return LocatedBlock allocated block information.
*
@ -408,7 +411,7 @@ public interface ClientProtocol {
@Idempotent
LocatedBlock addBlock(String src, String clientName,
ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
String[] favoredNodes)
String[] favoredNodes, EnumSet<AddBlockFlag> addBlockFlags)
throws IOException;
/**

View File

@ -404,7 +404,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
@Override
public LocatedBlock addBlock(String src, String clientName,
ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
String[] favoredNodes) throws IOException {
String[] favoredNodes, EnumSet<AddBlockFlag> addBlockFlags)
throws IOException {
AddBlockRequestProto.Builder req = AddBlockRequestProto.newBuilder()
.setSrc(src).setClientName(clientName).setFileId(fileId);
if (previous != null)
@ -414,6 +415,10 @@ public class ClientNamenodeProtocolTranslatorPB implements
if (favoredNodes != null) {
req.addAllFavoredNodes(Arrays.asList(favoredNodes));
}
if (addBlockFlags != null) {
req.addAllFlags(PBHelperClient.convertAddBlockFlags(
addBlockFlags));
}
try {
return PBHelperClient.convertLocatedBlockProto(
rpcProxy.addBlock(null, req.build()).getBlock());

View File

@ -37,6 +37,7 @@ import com.google.protobuf.CodedInputStream;
import org.apache.hadoop.crypto.CipherOption;
import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
@ -97,6 +98,7 @@ import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEntryProto.AclEntryTyp
import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEntryProto.FsActionProto;
import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockFlagProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveEntryProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoExpirationProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto;
@ -2528,4 +2530,29 @@ public class PBHelperClient {
}
return builder.build();
}
public static EnumSet<AddBlockFlag> convertAddBlockFlags(
List<AddBlockFlagProto> addBlockFlags) {
EnumSet<AddBlockFlag> flags =
EnumSet.noneOf(AddBlockFlag.class);
for (AddBlockFlagProto af : addBlockFlags) {
AddBlockFlag flag = AddBlockFlag.valueOf((short)af.getNumber());
if (flag != null) {
flags.add(flag);
}
}
return flags;
}
public static List<AddBlockFlagProto> convertAddBlockFlags(
EnumSet<AddBlockFlag> flags) {
List<AddBlockFlagProto> ret = new ArrayList<>();
for (AddBlockFlag flag : flags) {
AddBlockFlagProto abfp = AddBlockFlagProto.valueOf(flag.getMode());
if (abfp != null) {
ret.add(abfp);
}
}
return ret;
}
}

View File

@ -162,6 +162,10 @@ message AbandonBlockRequestProto {
message AbandonBlockResponseProto { // void response
}
enum AddBlockFlagProto {
NO_LOCAL_WRITE = 1; // avoid writing to local node.
}
message AddBlockRequestProto {
required string src = 1;
required string clientName = 2;
@ -169,6 +173,7 @@ message AddBlockRequestProto {
repeated DatanodeInfoProto excludeNodes = 4;
optional uint64 fileId = 5 [default = 0]; // default as a bogus id
repeated string favoredNodes = 6; //the set of datanodes to use for the block
repeated AddBlockFlagProto flags = 7; // default to empty.
}
message AddBlockResponseProto {

View File

@ -24,6 +24,7 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FsServerDefaults;
@ -505,6 +506,8 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
try {
List<DatanodeInfoProto> excl = req.getExcludeNodesList();
List<String> favor = req.getFavoredNodesList();
EnumSet<AddBlockFlag> flags =
PBHelperClient.convertAddBlockFlags(req.getFlagsList());
LocatedBlock result = server.addBlock(
req.getSrc(),
req.getClientName(),
@ -512,7 +515,8 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
(excl == null || excl.size() == 0) ? null : PBHelperClient.convert(excl
.toArray(new DatanodeInfoProto[excl.size()])), req.getFileId(),
(favor == null || favor.size() == 0) ? null : favor
.toArray(new String[favor.size()]));
.toArray(new String[favor.size()]),
flags);
return AddBlockResponseProto.newBuilder()
.setBlock(PBHelperClient.convertLocatedBlock(result)).build();
} catch (IOException e) {

View File

@ -48,6 +48,7 @@ import javax.management.ObjectName;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSUtilClient;
@ -1782,7 +1783,7 @@ public class BlockManager implements BlockStatsMXBean {
DatanodeDescriptor clientnode, Set<Node> excludes, long blocksize) {
return placementPolicies.getPolicy(false).chooseTarget(src, 1, clientnode,
Collections.<DatanodeStorageInfo>emptyList(), false, excludes,
blocksize, storagePolicySuite.getDefaultPolicy());
blocksize, storagePolicySuite.getDefaultPolicy(), null);
}
/** Choose target for getting additional datanodes for an existing pipeline. */
@ -1797,7 +1798,7 @@ public class BlockManager implements BlockStatsMXBean {
final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(storagePolicyID);
final BlockPlacementPolicy blockplacement = placementPolicies.getPolicy(isStriped);
return blockplacement.chooseTarget(src, numAdditionalNodes, clientnode,
chosen, true, excludes, blocksize, storagePolicy);
chosen, true, excludes, blocksize, storagePolicy, null);
}
/**
@ -1814,14 +1815,15 @@ public class BlockManager implements BlockStatsMXBean {
final long blocksize,
final List<String> favoredNodes,
final byte storagePolicyID,
final boolean isStriped) throws IOException {
final boolean isStriped,
final EnumSet<AddBlockFlag> flags) throws IOException {
List<DatanodeDescriptor> favoredDatanodeDescriptors =
getDatanodeDescriptors(favoredNodes);
final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(storagePolicyID);
final BlockPlacementPolicy blockplacement = placementPolicies.getPolicy(isStriped);
final DatanodeStorageInfo[] targets = blockplacement.chooseTarget(src,
numOfReplicas, client, excludedNodes, blocksize,
favoredDatanodeDescriptors, storagePolicy);
favoredDatanodeDescriptors, storagePolicy, flags);
if (targets.length < minReplication) {
throw new IOException("File " + src + " could only be replicated to "
+ targets.length + " nodes instead of minReplication (="

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -26,6 +27,7 @@ import java.util.Set;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -64,6 +66,7 @@ public abstract class BlockPlacementPolicy {
* @param returnChosenNodes decide if the chosenNodes are returned.
* @param excludedNodes datanodes that should not be considered as targets.
* @param blocksize size of the data to be written.
* @param flags Block placement flags.
* @return array of DatanodeDescriptor instances chosen as target
* and sorted as a pipeline.
*/
@ -74,7 +77,8 @@ public abstract class BlockPlacementPolicy {
boolean returnChosenNodes,
Set<Node> excludedNodes,
long blocksize,
BlockStoragePolicy storagePolicy);
BlockStoragePolicy storagePolicy,
EnumSet<AddBlockFlag> flags);
/**
* @param favoredNodes datanodes that should be favored as targets. This
@ -86,14 +90,15 @@ public abstract class BlockPlacementPolicy {
Set<Node> excludedNodes,
long blocksize,
List<DatanodeDescriptor> favoredNodes,
BlockStoragePolicy storagePolicy) {
BlockStoragePolicy storagePolicy,
EnumSet<AddBlockFlag> flags) {
// This class does not provide the functionality of placing
// a block in favored datanodes. The implementations of this class
// are expected to provide this functionality
return chooseTarget(src, numOfReplicas, writer,
new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
excludedNodes, blocksize, storagePolicy);
excludedNodes, blocksize, storagePolicy, flags);
}
/**

View File

@ -23,6 +23,7 @@ import java.util.*;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@ -111,9 +112,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
boolean returnChosenNodes,
Set<Node> excludedNodes,
long blocksize,
final BlockStoragePolicy storagePolicy) {
final BlockStoragePolicy storagePolicy,
EnumSet<AddBlockFlag> flags) {
return chooseTarget(numOfReplicas, writer, chosenNodes, returnChosenNodes,
excludedNodes, blocksize, storagePolicy);
excludedNodes, blocksize, storagePolicy, flags);
}
@Override
@ -123,13 +125,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
Set<Node> excludedNodes,
long blocksize,
List<DatanodeDescriptor> favoredNodes,
BlockStoragePolicy storagePolicy) {
BlockStoragePolicy storagePolicy,
EnumSet<AddBlockFlag> flags) {
try {
if (favoredNodes == null || favoredNodes.size() == 0) {
// Favored nodes not specified, fall back to regular block placement.
return chooseTarget(src, numOfReplicas, writer,
new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
excludedNodes, blocksize, storagePolicy);
excludedNodes, blocksize, storagePolicy, flags);
}
Set<Node> favoriteAndExcludedNodes = excludedNodes == null ?
@ -164,7 +167,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
DatanodeStorageInfo[] remainingTargets =
chooseTarget(src, numOfReplicas, writer,
new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
favoriteAndExcludedNodes, blocksize, storagePolicy);
favoriteAndExcludedNodes, blocksize, storagePolicy, flags);
for (int i = 0; i < remainingTargets.length; i++) {
results.add(remainingTargets[i]);
}
@ -179,7 +182,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
// Fall back to regular block placement disregarding favored nodes hint
return chooseTarget(src, numOfReplicas, writer,
new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
excludedNodes, blocksize, storagePolicy);
excludedNodes, blocksize, storagePolicy, flags);
}
}
@ -213,7 +216,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
boolean returnChosenNodes,
Set<Node> excludedNodes,
long blocksize,
final BlockStoragePolicy storagePolicy) {
final BlockStoragePolicy storagePolicy,
EnumSet<AddBlockFlag> addBlockFlags) {
if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
return DatanodeStorageInfo.EMPTY_ARRAY;
}
@ -226,17 +230,42 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
numOfReplicas = result[0];
int maxNodesPerRack = result[1];
final List<DatanodeStorageInfo> results = new ArrayList<>(chosenStorage);
for (DatanodeStorageInfo storage : chosenStorage) {
// add localMachine and related nodes to excludedNodes
addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes);
}
List<DatanodeStorageInfo> results = null;
Node localNode = null;
boolean avoidStaleNodes = (stats != null
&& stats.isAvoidingStaleDataNodesForWrite());
final Node localNode = chooseTarget(numOfReplicas, writer, excludedNodes,
blocksize, maxNodesPerRack, results, avoidStaleNodes, storagePolicy,
EnumSet.noneOf(StorageType.class), results.isEmpty());
boolean avoidLocalNode = (addBlockFlags != null
&& addBlockFlags.contains(AddBlockFlag.NO_LOCAL_WRITE)
&& writer != null
&& !excludedNodes.contains(writer));
// Attempt to exclude local node if the client suggests so. If no enough
// nodes can be obtained, it falls back to the default block placement
// policy.
if (avoidLocalNode) {
results = new ArrayList<>(chosenStorage);
Set<Node> excludedNodeCopy = new HashSet<>(excludedNodes);
excludedNodeCopy.add(writer);
localNode = chooseTarget(numOfReplicas, writer,
excludedNodeCopy, blocksize, maxNodesPerRack, results,
avoidStaleNodes, storagePolicy,
EnumSet.noneOf(StorageType.class), results.isEmpty());
if (results.size() < numOfReplicas) {
// not enough nodes; discard results and fall back
results = null;
}
}
if (results == null) {
results = new ArrayList<>(chosenStorage);
localNode = chooseTarget(numOfReplicas, writer, excludedNodes,
blocksize, maxNodesPerRack, results, avoidStaleNodes,
storagePolicy, EnumSet.noneOf(StorageType.class), results.isEmpty());
}
if (!returnChosenNodes) {
results.removeAll(chosenStorage);
}

View File

@ -61,7 +61,7 @@ class ErasureCodingWork extends BlockReconstructionWork {
getBc().getName(), getAdditionalReplRequired(), getSrcNodes()[0],
getLiveReplicaStorages(), false, excludedNodes,
getBlock().getNumBytes(),
storagePolicySuite.getPolicy(getBc().getStoragePolicyID()));
storagePolicySuite.getPolicy(getBc().getStoragePolicyID()), null);
setTargets(chosenTargets);
}

View File

@ -47,7 +47,8 @@ class ReplicationWork extends BlockReconstructionWork {
getBc().getName(), getAdditionalReplRequired(), getSrcNodes()[0],
getLiveReplicaStorages(), false, excludedNodes,
getBlock().getNumBytes(),
storagePolicySuite.getPolicy(getBc().getStoragePolicyID()));
storagePolicySuite.getPolicy(getBc().getStoragePolicyID()),
null);
setTargets(chosenTargets);
} finally {
getSrcNodes()[0].decrementPendingReplicationWithoutTargets();

View File

@ -23,6 +23,7 @@ import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileEncryptionInfo;
@ -282,8 +283,9 @@ class FSDirWriteFileOp {
}
static DatanodeStorageInfo[] chooseTargetForNewBlock(
BlockManager bm, String src, DatanodeInfo[] excludedNodes, String[]
favoredNodes, ValidateAddBlockResult r) throws IOException {
BlockManager bm, String src, DatanodeInfo[] excludedNodes,
String[] favoredNodes, EnumSet<AddBlockFlag> flags,
ValidateAddBlockResult r) throws IOException {
Node clientNode = bm.getDatanodeManager()
.getDatanodeByHost(r.clientMachine);
if (clientNode == null) {
@ -297,12 +299,11 @@ class FSDirWriteFileOp {
}
List<String> favoredNodesList = (favoredNodes == null) ? null
: Arrays.asList(favoredNodes);
// choose targets for the new block to be allocated.
return bm.chooseTarget4NewBlock(src, r.numTargets, clientNode,
excludedNodesSet, r.blockSize,
favoredNodesList, r.storagePolicyID,
r.isStriped);
r.isStriped, flags);
}
/**

View File

@ -136,6 +136,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.crypto.key.KeyProvider.Metadata;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.ContentSummary;
@ -2468,7 +2469,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
*/
LocatedBlock getAdditionalBlock(
String src, long fileId, String clientName, ExtendedBlock previous,
DatanodeInfo[] excludedNodes, String[] favoredNodes) throws IOException {
DatanodeInfo[] excludedNodes, String[] favoredNodes,
EnumSet<AddBlockFlag> flags) throws IOException {
NameNode.stateChangeLog.debug("BLOCK* getAdditionalBlock: {} inodeId {}" +
" for {}", src, fileId, clientName);
@ -2492,7 +2494,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
DatanodeStorageInfo[] targets = FSDirWriteFileOp.chooseTargetForNewBlock(
blockManager, src, excludedNodes, favoredNodes, r);
blockManager, src, excludedNodes, favoredNodes, flags, r);
checkOperation(OperationCategory.WRITE);
writeLock();

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationTaskStatus;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.ContentSummary;
@ -837,11 +838,11 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override
public LocatedBlock addBlock(String src, String clientName,
ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId,
String[] favoredNodes)
String[] favoredNodes, EnumSet<AddBlockFlag> addBlockFlags)
throws IOException {
checkNNStartup();
LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId,
clientName, previous, excludedNodes, favoredNodes);
clientName, previous, excludedNodes, favoredNodes, addBlockFlags);
if (locatedBlock != null) {
metrics.incrAddBlockOps();
}
@ -1149,7 +1150,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
DatanodeInfo results[] = namesystem.datanodeReport(type);
return results;
}
@Override // ClientProtocol
public DatanodeStorageReport[] getDatanodeStorageReport(
DatanodeReportType type) throws IOException {

View File

@ -1967,7 +1967,7 @@ public class DFSTestUtil {
String clientName, ExtendedBlock previous, int numStripes, int len)
throws Exception {
fs.getClient().namenode.addBlock(file, clientName, previous, null,
fileNode.getId(), null);
fileNode.getId(), null, null);
final BlockInfo lastBlock = fileNode.getLastBlock();
final int groupSize = fileNode.getPreferredBlockReplication();

View File

@ -1237,12 +1237,12 @@ public class TestBlockStoragePolicy {
DatanodeStorageInfo[] targets = replicator.chooseTarget("/foo", 3,
dataNodes[0], Collections.<DatanodeStorageInfo>emptyList(), false,
new HashSet<Node>(), 0, policy1);
new HashSet<Node>(), 0, policy1, null);
System.out.println(Arrays.asList(targets));
Assert.assertEquals(3, targets.length);
targets = replicator.chooseTarget("/foo", 3,
dataNodes[0], Collections.<DatanodeStorageInfo>emptyList(), false,
new HashSet<Node>(), 0, policy2);
new HashSet<Node>(), 0, policy2, null);
System.out.println(Arrays.asList(targets));
Assert.assertEquals(3, targets.length);
}
@ -1284,7 +1284,7 @@ public class TestBlockStoragePolicy {
DatanodeStorageInfo[] targets = replicator.chooseTarget("/foo", 3,
dataNodes[0], Collections.<DatanodeStorageInfo>emptyList(), false,
new HashSet<Node>(), 0, policy);
new HashSet<Node>(), 0, policy, null);
System.out.println(policy.getName() + ": " + Arrays.asList(targets));
Assert.assertEquals(2, targets.length);
Assert.assertEquals(StorageType.SSD, targets[0].getStorageType());

View File

@ -44,6 +44,7 @@ import java.net.URI;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
@ -96,6 +97,7 @@ import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.internal.stubbing.answers.ThrowsException;
import org.mockito.invocation.InvocationOnMock;
@ -252,7 +254,9 @@ public class TestDFSClientRetries {
anyString(),
any(ExtendedBlock.class),
any(DatanodeInfo[].class),
anyLong(), any(String[].class))).thenAnswer(answer);
anyLong(), any(String[].class),
Matchers.<EnumSet<AddBlockFlag>>any()))
.thenAnswer(answer);
Mockito.doReturn(
new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
@ -471,7 +475,8 @@ public class TestDFSClientRetries {
}
}).when(spyNN).addBlock(Mockito.anyString(), Mockito.anyString(),
Mockito.<ExtendedBlock> any(), Mockito.<DatanodeInfo[]> any(),
Mockito.anyLong(), Mockito.<String[]> any());
Mockito.anyLong(), Mockito.<String[]> any(),
Mockito.<EnumSet<AddBlockFlag>> any());
doAnswer(new Answer<Boolean>() {
@ -513,7 +518,8 @@ public class TestDFSClientRetries {
Mockito.verify(spyNN, Mockito.atLeastOnce()).addBlock(
Mockito.anyString(), Mockito.anyString(),
Mockito.<ExtendedBlock> any(), Mockito.<DatanodeInfo[]> any(),
Mockito.anyLong(), Mockito.<String[]> any());
Mockito.anyLong(), Mockito.<String[]> any(),
Mockito.<EnumSet<AddBlockFlag>> any());
Mockito.verify(spyNN, Mockito.atLeastOnce()).complete(
Mockito.anyString(), Mockito.anyString(),
Mockito.<ExtendedBlock>any(), anyLong());

View File

@ -22,19 +22,29 @@ import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FsTracer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DataStreamer.LastExceptionInStreamer;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.htrace.core.SpanId;
import org.junit.AfterClass;
import org.junit.Assert;
@ -42,8 +52,11 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.internal.util.reflection.Whitebox;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
public class TestDFSOutputStream {
@ -52,7 +65,7 @@ public class TestDFSOutputStream {
@BeforeClass
public static void setup() throws IOException {
Configuration conf = new Configuration();
cluster = new MiniDFSCluster.Builder(conf).build();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
}
/**
@ -80,7 +93,7 @@ public class TestDFSOutputStream {
try {
dos.close();
} catch (IOException e) {
Assert.assertEquals(e, dummy);
assertEquals(e, dummy);
}
thrown = (Throwable) Whitebox.getInternalState(ex, "thrown");
Assert.assertNull(thrown);
@ -127,7 +140,7 @@ public class TestDFSOutputStream {
mock(HdfsFileStatus.class),
mock(ExtendedBlock.class),
client,
"foo", null, null, null, null, null);
"foo", null, null, null, null, null, null);
DataOutputStream blockStream = mock(DataOutputStream.class);
doThrow(new IOException()).when(blockStream).flush();
@ -148,6 +161,47 @@ public class TestDFSOutputStream {
Assert.assertTrue(congestedNodes.isEmpty());
}
@Test
public void testNoLocalWriteFlag() throws IOException {
DistributedFileSystem fs = cluster.getFileSystem();
EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.NO_LOCAL_WRITE,
CreateFlag.CREATE);
BlockManager bm = cluster.getNameNode().getNamesystem().getBlockManager();
DatanodeManager dm = bm.getDatanodeManager();
try(FSDataOutputStream os = fs.create(new Path("/test-no-local"),
FsPermission.getDefault(),
flags, 512, (short)2, 512, null)) {
// Inject a DatanodeManager that returns one DataNode as local node for
// the client.
DatanodeManager spyDm = spy(dm);
DatanodeDescriptor dn1 = dm.getDatanodeListForReport
(HdfsConstants.DatanodeReportType.LIVE).get(0);
doReturn(dn1).when(spyDm).getDatanodeByHost("127.0.0.1");
Whitebox.setInternalState(bm, "datanodeManager", spyDm);
byte[] buf = new byte[512 * 16];
new Random().nextBytes(buf);
os.write(buf);
} finally {
Whitebox.setInternalState(bm, "datanodeManager", dm);
}
cluster.triggerBlockReports();
final String bpid = cluster.getNamesystem().getBlockPoolId();
// Total number of DataNodes is 3.
assertEquals(3, cluster.getAllBlockReports(bpid).size());
int numDataNodesWithData = 0;
for (Map<DatanodeStorage, BlockListAsLongs> dnBlocks :
cluster.getAllBlockReports(bpid)) {
for (BlockListAsLongs blocks : dnBlocks.values()) {
if (blocks.getNumberOfBlocks() > 0) {
numDataNodesWithData++;
break;
}
}
}
// Verify that only one DN has no data.
assertEquals(1, 3 - numDataNodesWithData);
}
@AfterClass
public static void tearDown() {
if (cluster != null) {

View File

@ -531,7 +531,7 @@ public class TestFileCreation {
// add one block to the file
LocatedBlock location = client.getNamenode().addBlock(file1.toString(),
client.clientName, null, null, HdfsConstants.GRANDFATHER_INODE_ID, null);
client.clientName, null, null, HdfsConstants.GRANDFATHER_INODE_ID, null, null);
System.out.println("testFileCreationError2: "
+ "Added block " + location.getBlock());
@ -582,7 +582,7 @@ public class TestFileCreation {
createFile(dfs, f, 3);
try {
cluster.getNameNodeRpc().addBlock(f.toString(), client.clientName,
null, null, HdfsConstants.GRANDFATHER_INODE_ID, null);
null, null, HdfsConstants.GRANDFATHER_INODE_ID, null, null);
fail();
} catch(IOException ioe) {
FileSystem.LOG.info("GOOD!", ioe);

View File

@ -156,6 +156,6 @@ abstract public class BaseReplicationPolicyTest {
Set<Node> excludedNodes) {
return replicator.chooseTarget(filename, numOfReplicas, writer,
chosenNodes, false, excludedNodes, BLOCK_SIZE,
TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);
}
}

View File

@ -142,7 +142,7 @@ public class TestAvailableSpaceBlockPlacementPolicy {
.getBlockManager()
.getBlockPlacementPolicy()
.chooseTarget(file, replica, null, new ArrayList<DatanodeStorageInfo>(), false, null,
blockSize, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
blockSize, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);
Assert.assertTrue(targets.length == replica);
for (int j = 0; j < replica; j++) {

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@ -30,6 +31,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -38,6 +40,7 @@ import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@ -290,7 +293,8 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
excludedNodes.add(dataNodes[1]);
chosenNodes.add(storages[2]);
targets = replicator.chooseTarget(filename, 1, dataNodes[0], chosenNodes, true,
excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY,
null);
System.out.println("targets=" + Arrays.asList(targets));
assertEquals(2, targets.length);
//make sure that the chosen node is in the target.
@ -667,7 +671,8 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
.getNamesystem().getBlockManager().getBlockPlacementPolicy();
DatanodeStorageInfo[] targets = replicator.chooseTarget(filename, 3,
staleNodeInfo, new ArrayList<DatanodeStorageInfo>(), false, null,
BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY,
null);
assertEquals(targets.length, 3);
assertFalse(isOnSameRack(targets[0], staleNodeInfo));
@ -693,7 +698,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
// Call chooseTarget
targets = replicator.chooseTarget(filename, 3, staleNodeInfo,
new ArrayList<DatanodeStorageInfo>(), false, null, BLOCK_SIZE,
TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);
assertEquals(targets.length, 3);
assertTrue(isOnSameRack(targets[0], staleNodeInfo));
@ -1501,8 +1506,42 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
DatanodeDescriptor writer, Set<Node> excludedNodes,
List<DatanodeDescriptor> favoredNodes) {
return chooseTarget(numOfReplicas, writer, excludedNodes,
favoredNodes, null);
}
private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
DatanodeDescriptor writer, Set<Node> excludedNodes,
List<DatanodeDescriptor> favoredNodes, EnumSet<AddBlockFlag> flags) {
return replicator.chooseTarget(filename, numOfReplicas, writer,
excludedNodes, BLOCK_SIZE, favoredNodes,
TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, flags);
}
@Test
public void testAvoidLocalWrite() throws IOException {
DatanodeDescriptor writer = dataNodes[2];
EnumSet<AddBlockFlag> flags = EnumSet.of(AddBlockFlag.NO_LOCAL_WRITE);
DatanodeStorageInfo[] targets;
targets = chooseTarget(5, writer, null, null, flags);
for (DatanodeStorageInfo info : targets) {
assertNotEquals(info.getDatanodeDescriptor(), writer);
}
}
@Test
public void testAvoidLocalWriteNoEnoughNodes() throws IOException {
DatanodeDescriptor writer = dataNodes[2];
EnumSet<AddBlockFlag> flags = EnumSet.of(AddBlockFlag.NO_LOCAL_WRITE);
DatanodeStorageInfo[] targets;
targets = chooseTarget(6, writer, null, null, flags);
assertEquals(6, targets.length);
boolean found = false;
for (DatanodeStorageInfo info : targets) {
if (info.getDatanodeDescriptor().equals(writer)) {
found = true;
}
}
assertTrue(found);
}
}

View File

@ -112,7 +112,7 @@ public class TestReplicationPolicyConsiderLoad
DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager()
.getBlockPlacementPolicy().chooseTarget("testFile.txt", 3,
writerDn, new ArrayList<DatanodeStorageInfo>(), false, null,
1024, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
1024, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);
assertEquals(3, targets.length);
Set<DatanodeStorageInfo> targetSet = new HashSet<>(
@ -171,7 +171,7 @@ public class TestReplicationPolicyConsiderLoad
DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager()
.getBlockPlacementPolicy().chooseTarget("testFile.txt", 3, writerDn,
new ArrayList<DatanodeStorageInfo>(), false, null,
1024, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
1024, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);
for(DatanodeStorageInfo info : targets) {
assertTrue("The node "+info.getDatanodeDescriptor().getName()+
" has higher load and should not have been picked!",

View File

@ -269,7 +269,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
List<DatanodeDescriptor> favoredNodes) {
return replicator.chooseTarget(filename, numOfReplicas, writer,
excludedNodes, BLOCK_SIZE, favoredNodes,
TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);
}
/**
@ -351,7 +351,8 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
Set<Node> excludedNodes = new HashSet<>();
excludedNodes.add(dataNodes[1]);
targets = repl.chooseTarget(filename, 4, dataNodes[0], chosenNodes, false,
excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY,
null);
assertEquals(targets.length, 4);
assertEquals(storages[0], targets[0]);
@ -369,7 +370,8 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
excludedNodes.add(dataNodes[1]);
chosenNodes.add(storages[2]);
targets = repl.chooseTarget(filename, 1, dataNodes[0], chosenNodes, true,
excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY,
null);
System.out.println("targets=" + Arrays.asList(targets));
assertEquals(2, targets.length);
//make sure that the chosen node is in the target.

View File

@ -187,7 +187,7 @@ public class TestReplicationPolicyWithUpgradeDomain
chosenNodes.add(storages[2]);
targets = replicator.chooseTarget(filename, 1, dataNodes[0], chosenNodes,
true, excludedNodes, BLOCK_SIZE,
TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);
System.out.println("targets=" + Arrays.asList(targets));
assertEquals(2, targets.length);
}

View File

@ -1204,7 +1204,7 @@ public class NNThroughputBenchmark implements Tool {
for (int i = 0; i < 30; i++) {
try {
return clientProto.addBlock(src, clientName,
previous, excludeNodes, fileId, favoredNodes);
previous, excludeNodes, fileId, favoredNodes, null);
} catch (NotReplicatedYetException|RemoteException e) {
if (e instanceof RemoteException) {
String className = ((RemoteException) e).getClassName();

View File

@ -101,13 +101,13 @@ public class TestAddBlockRetry {
ns.readUnlock();;
}
DatanodeStorageInfo targets[] = FSDirWriteFileOp.chooseTargetForNewBlock(
ns.getBlockManager(), src, null, null, r);
ns.getBlockManager(), src, null, null, null, r);
assertNotNull("Targets must be generated", targets);
// run second addBlock()
LOG.info("Starting second addBlock for " + src);
nn.addBlock(src, "clientName", null, null,
HdfsConstants.GRANDFATHER_INODE_ID, null);
HdfsConstants.GRANDFATHER_INODE_ID, null, null);
assertTrue("Penultimate block must be complete",
checkFileProgress(src, false));
LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE);
@ -161,14 +161,14 @@ public class TestAddBlockRetry {
// start first addBlock()
LOG.info("Starting first addBlock for " + src);
LocatedBlock lb1 = nameNodeRpc.addBlock(src, "clientName", null, null,
HdfsConstants.GRANDFATHER_INODE_ID, null);
HdfsConstants.GRANDFATHER_INODE_ID, null, null);
assertTrue("Block locations should be present",
lb1.getLocations().length > 0);
cluster.restartNameNode();
nameNodeRpc = cluster.getNameNodeRpc();
LocatedBlock lb2 = nameNodeRpc.addBlock(src, "clientName", null, null,
HdfsConstants.GRANDFATHER_INODE_ID, null);
HdfsConstants.GRANDFATHER_INODE_ID, null, null);
assertEquals("Blocks are not equal", lb1.getBlock(), lb2.getBlock());
assertTrue("Wrong locations with retry", lb2.getLocations().length > 0);
}

View File

@ -285,7 +285,7 @@ public class TestAddStripedBlocks {
FSDirectory fsdir = cluster.getNamesystem().getFSDirectory();
INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile();
cluster.getNamesystem().getAdditionalBlock(file.toString(),
fileNode.getId(), dfs.getClient().getClientName(), null, null, null);
fileNode.getId(), dfs.getClient().getClientName(), null, null, null, null);
BlockInfo lastBlock = fileNode.getLastBlock();
DatanodeStorageInfo[] locs = lastBlock.getUnderConstructionFeature()

View File

@ -113,7 +113,7 @@ public class TestBlockPlacementPolicyRackFaultTolerant {
//test chooseTarget for new file
LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,
null, null, fileStatus.getFileId(), null);
null, null, fileStatus.getFileId(), null, null);
doTestLocatedBlock(replication, locatedBlock);
//test chooseTarget for existing file.
@ -143,7 +143,7 @@ public class TestBlockPlacementPolicyRackFaultTolerant {
//test chooseTarget for new file
LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,
null, null, fileStatus.getFileId(), null);
null, null, fileStatus.getFileId(), null, null);
doTestLocatedBlock(20, locatedBlock);
DatanodeInfo[] locs = locatedBlock.getLocations();

View File

@ -166,7 +166,7 @@ public class TestDeadDatanode {
// part of the cluster anymore
DatanodeStorageInfo[] results = bm.chooseTarget4NewBlock("/hello", 3,
clientNode, new HashSet<Node>(), 256 * 1024 * 1024L, null, (byte) 7,
false);
false, null);
for (DatanodeStorageInfo datanodeStorageInfo : results) {
assertFalse("Dead node should not be choosen", datanodeStorageInfo
.getDatanodeDescriptor().equals(clientNode));

View File

@ -140,7 +140,7 @@ public class TestDefaultBlockPlacementPolicy {
clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true,
REPLICATION_FACTOR, DEFAULT_BLOCK_SIZE, null, false);
LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,
null, null, fileStatus.getFileId(), null);
null, null, fileStatus.getFileId(), null, null);
assertEquals("Block should be allocated sufficient locations",
REPLICATION_FACTOR, locatedBlock.getLocations().length);

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
import java.io.FileNotFoundException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -28,6 +29,7 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -123,10 +125,11 @@ public class TestDeleteRace {
boolean returnChosenNodes,
Set<Node> excludedNodes,
long blocksize,
final BlockStoragePolicy storagePolicy) {
final BlockStoragePolicy storagePolicy,
EnumSet<AddBlockFlag> flags) {
DatanodeStorageInfo[] results = super.chooseTarget(srcPath,
numOfReplicas, writer, chosenNodes, returnChosenNodes, excludedNodes,
blocksize, storagePolicy);
blocksize, storagePolicy, flags);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {}

View File

@ -152,7 +152,7 @@ public class TestUpgradeDomainBlockPlacementPolicy {
clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true,
REPLICATION_FACTOR, DEFAULT_BLOCK_SIZE, null, false);
LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,
null, null, fileStatus.getFileId(), null);
null, null, fileStatus.getFileId(), null, null);
assertEquals("Block should be allocated sufficient locations",
REPLICATION_FACTOR, locatedBlock.getLocations().length);

View File

@ -837,7 +837,7 @@ public class TestHASafeMode {
new ExtendedBlock(previousBlock),
new DatanodeInfo[0],
DFSClientAdapter.getFileId((DFSOutputStream) create
.getWrappedStream()), null);
.getWrappedStream()), null, null);
cluster.restartNameNode(0, true);
cluster.restartDataNode(0);
cluster.transitionToActive(0);

View File

@ -198,7 +198,7 @@ public class TestOpenFilesWithSnapshot {
String clientName = fs.getClient().getClientName();
// create one empty block
nameNodeRpc.addBlock(fileWithEmptyBlock.toString(), clientName, null, null,
HdfsConstants.GRANDFATHER_INODE_ID, null);
HdfsConstants.GRANDFATHER_INODE_ID, null, null);
fs.createSnapshot(path, "s2");
fs.rename(new Path("/test/test"), new Path("/test/test-renamed"));

View File

@ -282,7 +282,7 @@ public class TestSnapshotBlocksMap {
ExtendedBlock previous = new ExtendedBlock(fsn.getBlockPoolId(), blks[0]);
cluster.getNameNodeRpc()
.addBlock(bar.toString(), hdfs.getClient().getClientName(), previous,
null, barNode.getId(), null);
null, barNode.getId(), null, null);
SnapshotTestHelper.createSnapshot(hdfs, foo, "s1");
@ -319,7 +319,7 @@ public class TestSnapshotBlocksMap {
ExtendedBlock previous = new ExtendedBlock(fsn.getBlockPoolId(), blks[0]);
cluster.getNameNodeRpc()
.addBlock(bar.toString(), hdfs.getClient().getClientName(), previous,
null, barNode.getId(), null);
null, barNode.getId(), null, null);
SnapshotTestHelper.createSnapshot(hdfs, foo, "s1");
@ -358,7 +358,7 @@ public class TestSnapshotBlocksMap {
ExtendedBlock previous = new ExtendedBlock(fsn.getBlockPoolId(), blks[0]);
cluster.getNameNodeRpc()
.addBlock(bar.toString(), hdfs.getClient().getClientName(), previous,
null, barNode.getId(), null);
null, barNode.getId(), null, null);
SnapshotTestHelper.createSnapshot(hdfs, foo, "s1");