HDFS-3702. Add an option for NOT writing the blocks locally if there is a datanode on the same box as the client. (Contributed by Lei (Eddy) Xu)
(cherry picked from commit 0a152103f1
)
This commit is contained in:
parent
50b7a35d56
commit
4289cb8b36
|
@ -103,7 +103,14 @@ public enum CreateFlag {
|
|||
* Append data to a new block instead of the end of the last partial block.
|
||||
* This is only useful for APPEND.
|
||||
*/
|
||||
NEW_BLOCK((short) 0x20);
|
||||
NEW_BLOCK((short) 0x20),
|
||||
|
||||
/**
|
||||
* Advise that a block replica NOT be written to the local DataNode where
|
||||
* 'local' means the same host as the client is being run on.
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate({"HBase"})
|
||||
NO_LOCAL_WRITE((short) 0x40);
|
||||
|
||||
private final short mode;
|
||||
|
||||
|
|
|
@ -0,0 +1,59 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
|
||||
/**
|
||||
* AddBlockFlag provides hints for new block allocation and placement.
|
||||
* Users can use this flag to control <em>per DFSOutputStream</em>
|
||||
* {@see ClientProtocol#addBlock()} behavior.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public enum AddBlockFlag {
|
||||
|
||||
/**
|
||||
* Advise that a block replica NOT be written to the local DataNode where
|
||||
* 'local' means the same host as the client is being run on.
|
||||
*
|
||||
* @see CreateFlag#NO_LOCAL_WRITE
|
||||
*/
|
||||
NO_LOCAL_WRITE((short) 0x01);
|
||||
|
||||
private final short mode;
|
||||
|
||||
AddBlockFlag(short mode) {
|
||||
this.mode = mode;
|
||||
}
|
||||
|
||||
public static AddBlockFlag valueOf(short mode) {
|
||||
for (AddBlockFlag flag : AddBlockFlag.values()) {
|
||||
if (flag.getMode() == mode) {
|
||||
return flag;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public short getMode() {
|
||||
return mode;
|
||||
}
|
||||
}
|
|
@ -116,6 +116,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
private long initialFileSize = 0; // at time of file open
|
||||
private final short blockReplication; // replication factor of file
|
||||
protected boolean shouldSyncBlock = false; // force blocks to disk upon close
|
||||
private final EnumSet<AddBlockFlag> addBlockFlags;
|
||||
protected final AtomicReference<CachingStrategy> cachingStrategy;
|
||||
private FileEncryptionInfo fileEncryptionInfo;
|
||||
|
||||
|
@ -178,6 +179,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
}
|
||||
|
||||
private DFSOutputStream(DFSClient dfsClient, String src,
|
||||
EnumSet<CreateFlag> flag,
|
||||
Progressable progress, HdfsFileStatus stat, DataChecksum checksum) {
|
||||
super(getChecksum4Compute(checksum, stat));
|
||||
this.dfsClient = dfsClient;
|
||||
|
@ -188,6 +190,10 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
this.fileEncryptionInfo = stat.getFileEncryptionInfo();
|
||||
this.cachingStrategy = new AtomicReference<>(
|
||||
dfsClient.getDefaultWriteCachingStrategy());
|
||||
this.addBlockFlags = EnumSet.noneOf(AddBlockFlag.class);
|
||||
if (flag.contains(CreateFlag.NO_LOCAL_WRITE)) {
|
||||
this.addBlockFlags.add(AddBlockFlag.NO_LOCAL_WRITE);
|
||||
}
|
||||
if (progress != null) {
|
||||
DFSClient.LOG.debug("Set non-null progress callback on DFSOutputStream "
|
||||
+"{}", src);
|
||||
|
@ -211,14 +217,14 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
protected DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
|
||||
EnumSet<CreateFlag> flag, Progressable progress,
|
||||
DataChecksum checksum, String[] favoredNodes) throws IOException {
|
||||
this(dfsClient, src, progress, stat, checksum);
|
||||
this(dfsClient, src, flag, progress, stat, checksum);
|
||||
this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
|
||||
|
||||
computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
|
||||
bytesPerChecksum);
|
||||
|
||||
streamer = new DataStreamer(stat, null, dfsClient, src, progress, checksum,
|
||||
cachingStrategy, byteArrayManager, favoredNodes);
|
||||
cachingStrategy, byteArrayManager, favoredNodes, addBlockFlags);
|
||||
}
|
||||
|
||||
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
|
||||
|
@ -280,7 +286,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock,
|
||||
HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes)
|
||||
throws IOException {
|
||||
this(dfsClient, src, progress, stat, checksum);
|
||||
this(dfsClient, src, flags, progress, stat, checksum);
|
||||
initialFileSize = stat.getLen(); // length of file when opened
|
||||
this.shouldSyncBlock = flags.contains(CreateFlag.SYNC_BLOCK);
|
||||
|
||||
|
@ -301,7 +307,8 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
bytesPerChecksum);
|
||||
streamer = new DataStreamer(stat,
|
||||
lastBlock != null ? lastBlock.getBlock() : null, dfsClient, src,
|
||||
progress, checksum, cachingStrategy, byteArrayManager, favoredNodes);
|
||||
progress, checksum, cachingStrategy, byteArrayManager, favoredNodes,
|
||||
addBlockFlags);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -848,6 +855,10 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
return initialFileSize;
|
||||
}
|
||||
|
||||
protected EnumSet<AddBlockFlag> getAddBlockFlags() {
|
||||
return addBlockFlags;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the FileEncryptionInfo for this stream, or null if not encrypted.
|
||||
*/
|
||||
|
|
|
@ -33,6 +33,7 @@ import java.net.Socket;
|
|||
import java.nio.channels.ClosedChannelException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
@ -392,12 +393,15 @@ class DataStreamer extends Daemon {
|
|||
|
||||
private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes;
|
||||
private final String[] favoredNodes;
|
||||
private final EnumSet<AddBlockFlag> addBlockFlags;
|
||||
|
||||
private DataStreamer(HdfsFileStatus stat, DFSClient dfsClient, String src,
|
||||
Progressable progress, DataChecksum checksum,
|
||||
AtomicReference<CachingStrategy> cachingStrategy,
|
||||
ByteArrayManager byteArrayManage,
|
||||
boolean isAppend, String[] favoredNodes) {
|
||||
boolean isAppend, String[] favoredNodes,
|
||||
EnumSet<AddBlockFlag> flags) {
|
||||
this.block = block;
|
||||
this.dfsClient = dfsClient;
|
||||
this.src = src;
|
||||
this.progress = progress;
|
||||
|
@ -408,11 +412,11 @@ class DataStreamer extends Daemon {
|
|||
this.isLazyPersistFile = isLazyPersist(stat);
|
||||
this.isAppend = isAppend;
|
||||
this.favoredNodes = favoredNodes;
|
||||
|
||||
final DfsClientConf conf = dfsClient.getConf();
|
||||
this.dfsclientSlowLogThresholdMs = conf.getSlowIoWarningThresholdMs();
|
||||
this.excludedNodes = initExcludedNodes(conf.getExcludedNodesCacheExpiry());
|
||||
this.errorState = new ErrorState(conf.getDatanodeRestartTimeout());
|
||||
this.addBlockFlags = flags;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -421,9 +425,10 @@ class DataStreamer extends Daemon {
|
|||
DataStreamer(HdfsFileStatus stat, ExtendedBlock block, DFSClient dfsClient,
|
||||
String src, Progressable progress, DataChecksum checksum,
|
||||
AtomicReference<CachingStrategy> cachingStrategy,
|
||||
ByteArrayManager byteArrayManage, String[] favoredNodes) {
|
||||
ByteArrayManager byteArrayManage, String[] favoredNodes,
|
||||
EnumSet<AddBlockFlag> flags) {
|
||||
this(stat, dfsClient, src, progress, checksum, cachingStrategy,
|
||||
byteArrayManage, false, favoredNodes);
|
||||
byteArrayManage, false, favoredNodes, flags);
|
||||
this.block = block;
|
||||
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
|
||||
}
|
||||
|
@ -438,7 +443,7 @@ class DataStreamer extends Daemon {
|
|||
AtomicReference<CachingStrategy> cachingStrategy,
|
||||
ByteArrayManager byteArrayManage) throws IOException {
|
||||
this(stat, dfsClient, src, progress, checksum, cachingStrategy,
|
||||
byteArrayManage, true, null);
|
||||
byteArrayManage, true, null, null);
|
||||
stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
|
||||
block = lastBlock.getBlock();
|
||||
bytesSent = block.getNumBytes();
|
||||
|
@ -1643,7 +1648,8 @@ class DataStreamer extends Daemon {
|
|||
while (true) {
|
||||
try {
|
||||
return dfsClient.namenode.addBlock(src, dfsClient.clientName,
|
||||
block, excludedNodes, stat.getFileId(), favoredNodes);
|
||||
block, excludedNodes, stat.getFileId(), favoredNodes,
|
||||
addBlockFlags);
|
||||
} catch (RemoteException e) {
|
||||
IOException ue =
|
||||
e.unwrapRemoteException(FileNotFoundException.class,
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.crypto.CryptoProtocolVersion;
|
||||
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
|
||||
import org.apache.hadoop.hdfs.AddBlockFlag;
|
||||
import org.apache.hadoop.fs.CacheFlag;
|
||||
import org.apache.hadoop.fs.ContentSummary;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
|
@ -390,6 +391,8 @@ public interface ClientProtocol {
|
|||
* @param fileId the id uniquely identifying a file
|
||||
* @param favoredNodes the list of nodes where the client wants the blocks.
|
||||
* Nodes are identified by either host name or address.
|
||||
* @param addBlockFlags flags to advise the behavior of allocating and placing
|
||||
* a new block.
|
||||
*
|
||||
* @return LocatedBlock allocated block information.
|
||||
*
|
||||
|
@ -408,7 +411,7 @@ public interface ClientProtocol {
|
|||
@Idempotent
|
||||
LocatedBlock addBlock(String src, String clientName,
|
||||
ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
|
||||
String[] favoredNodes)
|
||||
String[] favoredNodes, EnumSet<AddBlockFlag> addBlockFlags)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
|
|
|
@ -45,6 +45,7 @@ import org.apache.hadoop.fs.permission.AclEntry;
|
|||
import org.apache.hadoop.fs.permission.AclStatus;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.AddBlockFlag;
|
||||
import org.apache.hadoop.hdfs.inotify.EventBatchList;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
|
||||
|
@ -396,7 +397,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
|
|||
@Override
|
||||
public LocatedBlock addBlock(String src, String clientName,
|
||||
ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
|
||||
String[] favoredNodes) throws IOException {
|
||||
String[] favoredNodes, EnumSet<AddBlockFlag> addBlockFlags)
|
||||
throws IOException {
|
||||
AddBlockRequestProto.Builder req = AddBlockRequestProto.newBuilder()
|
||||
.setSrc(src).setClientName(clientName).setFileId(fileId);
|
||||
if (previous != null)
|
||||
|
@ -406,6 +408,10 @@ public class ClientNamenodeProtocolTranslatorPB implements
|
|||
if (favoredNodes != null) {
|
||||
req.addAllFavoredNodes(Arrays.asList(favoredNodes));
|
||||
}
|
||||
if (addBlockFlags != null) {
|
||||
req.addAllFlags(PBHelperClient.convertAddBlockFlags(
|
||||
addBlockFlags));
|
||||
}
|
||||
try {
|
||||
return PBHelperClient.convert(rpcProxy.addBlock(null, req.build()).getBlock());
|
||||
} catch (ServiceException e) {
|
||||
|
|
|
@ -36,6 +36,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
|
|||
import org.apache.hadoop.crypto.CipherOption;
|
||||
import org.apache.hadoop.crypto.CipherSuite;
|
||||
import org.apache.hadoop.crypto.CryptoProtocolVersion;
|
||||
import org.apache.hadoop.hdfs.AddBlockFlag;
|
||||
import org.apache.hadoop.fs.CacheFlag;
|
||||
import org.apache.hadoop.fs.ContentSummary;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
|
@ -94,6 +95,7 @@ import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEntryProto.AclEntryTyp
|
|||
import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEntryProto.FsActionProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclStatusProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockFlagProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveEntryProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoExpirationProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto;
|
||||
|
@ -2394,4 +2396,29 @@ public class PBHelperClient {
|
|||
}
|
||||
return Arrays.asList(ret);
|
||||
}
|
||||
|
||||
public static EnumSet<AddBlockFlag> convertAddBlockFlags(
|
||||
List<AddBlockFlagProto> addBlockFlags) {
|
||||
EnumSet<AddBlockFlag> flags =
|
||||
EnumSet.noneOf(AddBlockFlag.class);
|
||||
for (AddBlockFlagProto af : addBlockFlags) {
|
||||
AddBlockFlag flag = AddBlockFlag.valueOf((short)af.getNumber());
|
||||
if (flag != null) {
|
||||
flags.add(flag);
|
||||
}
|
||||
}
|
||||
return flags;
|
||||
}
|
||||
|
||||
public static List<AddBlockFlagProto> convertAddBlockFlags(
|
||||
EnumSet<AddBlockFlag> flags) {
|
||||
List<AddBlockFlagProto> ret = new ArrayList<>();
|
||||
for (AddBlockFlag flag : flags) {
|
||||
AddBlockFlagProto abfp = AddBlockFlagProto.valueOf(flag.getMode());
|
||||
if (abfp != null) {
|
||||
ret.add(abfp);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -161,6 +161,10 @@ message AbandonBlockRequestProto {
|
|||
message AbandonBlockResponseProto { // void response
|
||||
}
|
||||
|
||||
enum AddBlockFlagProto {
|
||||
NO_LOCAL_WRITE = 1; // avoid writing to local node.
|
||||
}
|
||||
|
||||
message AddBlockRequestProto {
|
||||
required string src = 1;
|
||||
required string clientName = 2;
|
||||
|
@ -168,6 +172,7 @@ message AddBlockRequestProto {
|
|||
repeated DatanodeInfoProto excludeNodes = 4;
|
||||
optional uint64 fileId = 5 [default = 0]; // default as a bogus id
|
||||
repeated string favoredNodes = 6; //the set of datanodes to use for the block
|
||||
repeated AddBlockFlagProto flags = 7; // default to empty.
|
||||
}
|
||||
|
||||
message AddBlockResponseProto {
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.util.List;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
|
||||
import org.apache.hadoop.hdfs.AddBlockFlag;
|
||||
import org.apache.hadoop.fs.ContentSummary;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FsServerDefaults;
|
||||
|
@ -500,6 +501,8 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
|
|||
try {
|
||||
List<DatanodeInfoProto> excl = req.getExcludeNodesList();
|
||||
List<String> favor = req.getFavoredNodesList();
|
||||
EnumSet<AddBlockFlag> flags =
|
||||
PBHelperClient.convertAddBlockFlags(req.getFlagsList());
|
||||
LocatedBlock result = server.addBlock(
|
||||
req.getSrc(),
|
||||
req.getClientName(),
|
||||
|
@ -507,7 +510,8 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
|
|||
(excl == null || excl.size() == 0) ? null : PBHelperClient.convert(excl
|
||||
.toArray(new DatanodeInfoProto[excl.size()])), req.getFileId(),
|
||||
(favor == null || favor.size() == 0) ? null : favor
|
||||
.toArray(new String[favor.size()]));
|
||||
.toArray(new String[favor.size()]),
|
||||
flags);
|
||||
return AddBlockResponseProto.newBuilder()
|
||||
.setBlock(PBHelperClient.convert(result)).build();
|
||||
} catch (IOException e) {
|
||||
|
|
|
@ -47,6 +47,8 @@ import javax.management.ObjectName;
|
|||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.AddBlockFlag;
|
||||
import org.apache.hadoop.fs.FileEncryptionInfo;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
|
@ -1591,7 +1593,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
DatanodeDescriptor clientnode, Set<Node> excludes, long blocksize) {
|
||||
return blockplacement.chooseTarget(src, 1, clientnode,
|
||||
Collections.<DatanodeStorageInfo>emptyList(), false, excludes,
|
||||
blocksize, storagePolicySuite.getDefaultPolicy());
|
||||
blocksize, storagePolicySuite.getDefaultPolicy(), null);
|
||||
}
|
||||
|
||||
/** Choose target for getting additional datanodes for an existing pipeline. */
|
||||
|
@ -1605,7 +1607,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
|
||||
final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(storagePolicyID);
|
||||
return blockplacement.chooseTarget(src, numAdditionalNodes, clientnode,
|
||||
chosen, true, excludes, blocksize, storagePolicy);
|
||||
chosen, true, excludes, blocksize, storagePolicy, null);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1621,13 +1623,14 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
final Set<Node> excludedNodes,
|
||||
final long blocksize,
|
||||
final List<String> favoredNodes,
|
||||
final byte storagePolicyID) throws IOException {
|
||||
List<DatanodeDescriptor> favoredDatanodeDescriptors =
|
||||
final byte storagePolicyID,
|
||||
final EnumSet<AddBlockFlag> flags) throws IOException {
|
||||
List<DatanodeDescriptor> favoredDatanodeDescriptors =
|
||||
getDatanodeDescriptors(favoredNodes);
|
||||
final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(storagePolicyID);
|
||||
final DatanodeStorageInfo[] targets = blockplacement.chooseTarget(src,
|
||||
numOfReplicas, client, excludedNodes, blocksize,
|
||||
favoredDatanodeDescriptors, storagePolicy);
|
||||
favoredDatanodeDescriptors, storagePolicy, flags);
|
||||
if (targets.length < minReplication) {
|
||||
throw new IOException("File " + src + " could only be replicated to "
|
||||
+ targets.length + " nodes instead of minReplication (="
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -26,6 +27,7 @@ import java.util.Set;
|
|||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.AddBlockFlag;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
|
@ -66,6 +68,7 @@ public abstract class BlockPlacementPolicy {
|
|||
* @param returnChosenNodes decide if the chosenNodes are returned.
|
||||
* @param excludedNodes datanodes that should not be considered as targets.
|
||||
* @param blocksize size of the data to be written.
|
||||
* @param flags Block placement flags.
|
||||
* @return array of DatanodeDescriptor instances chosen as target
|
||||
* and sorted as a pipeline.
|
||||
*/
|
||||
|
@ -76,7 +79,8 @@ public abstract class BlockPlacementPolicy {
|
|||
boolean returnChosenNodes,
|
||||
Set<Node> excludedNodes,
|
||||
long blocksize,
|
||||
BlockStoragePolicy storagePolicy);
|
||||
BlockStoragePolicy storagePolicy,
|
||||
EnumSet<AddBlockFlag> flags);
|
||||
|
||||
/**
|
||||
* Same as {@link #chooseTarget(String, int, Node, Set, long, List, StorageType)}
|
||||
|
@ -90,14 +94,15 @@ public abstract class BlockPlacementPolicy {
|
|||
Set<Node> excludedNodes,
|
||||
long blocksize,
|
||||
List<DatanodeDescriptor> favoredNodes,
|
||||
BlockStoragePolicy storagePolicy) {
|
||||
BlockStoragePolicy storagePolicy,
|
||||
EnumSet<AddBlockFlag> flags) {
|
||||
// This class does not provide the functionality of placing
|
||||
// a block in favored datanodes. The implementations of this class
|
||||
// are expected to provide this functionality
|
||||
|
||||
return chooseTarget(src, numOfReplicas, writer,
|
||||
new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
|
||||
excludedNodes, blocksize, storagePolicy);
|
||||
excludedNodes, blocksize, storagePolicy, flags);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.*;
|
|||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.AddBlockFlag;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
|
@ -111,9 +112,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
boolean returnChosenNodes,
|
||||
Set<Node> excludedNodes,
|
||||
long blocksize,
|
||||
final BlockStoragePolicy storagePolicy) {
|
||||
final BlockStoragePolicy storagePolicy,
|
||||
EnumSet<AddBlockFlag> flags) {
|
||||
return chooseTarget(numOfReplicas, writer, chosenNodes, returnChosenNodes,
|
||||
excludedNodes, blocksize, storagePolicy);
|
||||
excludedNodes, blocksize, storagePolicy, flags);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -123,13 +125,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
Set<Node> excludedNodes,
|
||||
long blocksize,
|
||||
List<DatanodeDescriptor> favoredNodes,
|
||||
BlockStoragePolicy storagePolicy) {
|
||||
BlockStoragePolicy storagePolicy,
|
||||
EnumSet<AddBlockFlag> flags) {
|
||||
try {
|
||||
if (favoredNodes == null || favoredNodes.size() == 0) {
|
||||
// Favored nodes not specified, fall back to regular block placement.
|
||||
return chooseTarget(src, numOfReplicas, writer,
|
||||
new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
|
||||
excludedNodes, blocksize, storagePolicy);
|
||||
excludedNodes, blocksize, storagePolicy, flags);
|
||||
}
|
||||
|
||||
Set<Node> favoriteAndExcludedNodes = excludedNodes == null ?
|
||||
|
@ -164,7 +167,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
DatanodeStorageInfo[] remainingTargets =
|
||||
chooseTarget(src, numOfReplicas, writer,
|
||||
new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
|
||||
favoriteAndExcludedNodes, blocksize, storagePolicy);
|
||||
favoriteAndExcludedNodes, blocksize, storagePolicy, flags);
|
||||
for (int i = 0; i < remainingTargets.length; i++) {
|
||||
results.add(remainingTargets[i]);
|
||||
}
|
||||
|
@ -179,7 +182,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
// Fall back to regular block placement disregarding favored nodes hint
|
||||
return chooseTarget(src, numOfReplicas, writer,
|
||||
new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
|
||||
excludedNodes, blocksize, storagePolicy);
|
||||
excludedNodes, blocksize, storagePolicy, flags);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -213,7 +216,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
boolean returnChosenNodes,
|
||||
Set<Node> excludedNodes,
|
||||
long blocksize,
|
||||
final BlockStoragePolicy storagePolicy) {
|
||||
final BlockStoragePolicy storagePolicy,
|
||||
EnumSet<AddBlockFlag> addBlockFlags) {
|
||||
if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
|
||||
return DatanodeStorageInfo.EMPTY_ARRAY;
|
||||
}
|
||||
|
@ -226,17 +230,42 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
numOfReplicas = result[0];
|
||||
int maxNodesPerRack = result[1];
|
||||
|
||||
final List<DatanodeStorageInfo> results = new ArrayList<>(chosenStorage);
|
||||
for (DatanodeStorageInfo storage : chosenStorage) {
|
||||
// add localMachine and related nodes to excludedNodes
|
||||
addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes);
|
||||
}
|
||||
|
||||
List<DatanodeStorageInfo> results = null;
|
||||
Node localNode = null;
|
||||
boolean avoidStaleNodes = (stats != null
|
||||
&& stats.isAvoidingStaleDataNodesForWrite());
|
||||
final Node localNode = chooseTarget(numOfReplicas, writer, excludedNodes,
|
||||
blocksize, maxNodesPerRack, results, avoidStaleNodes, storagePolicy,
|
||||
EnumSet.noneOf(StorageType.class), results.isEmpty());
|
||||
boolean avoidLocalNode = (addBlockFlags != null
|
||||
&& addBlockFlags.contains(AddBlockFlag.NO_LOCAL_WRITE)
|
||||
&& writer != null
|
||||
&& !excludedNodes.contains(writer));
|
||||
// Attempt to exclude local node if the client suggests so. If no enough
|
||||
// nodes can be obtained, it falls back to the default block placement
|
||||
// policy.
|
||||
if (avoidLocalNode) {
|
||||
results = new ArrayList<>(chosenStorage);
|
||||
Set<Node> excludedNodeCopy = new HashSet<>(excludedNodes);
|
||||
excludedNodeCopy.add(writer);
|
||||
localNode = chooseTarget(numOfReplicas, writer,
|
||||
excludedNodeCopy, blocksize, maxNodesPerRack, results,
|
||||
avoidStaleNodes, storagePolicy,
|
||||
EnumSet.noneOf(StorageType.class), results.isEmpty());
|
||||
if (results.size() < numOfReplicas) {
|
||||
// not enough nodes; discard results and fall back
|
||||
results = null;
|
||||
}
|
||||
}
|
||||
if (results == null) {
|
||||
results = new ArrayList<>(chosenStorage);
|
||||
localNode = chooseTarget(numOfReplicas, writer, excludedNodes,
|
||||
blocksize, maxNodesPerRack, results, avoidStaleNodes,
|
||||
storagePolicy, EnumSet.noneOf(StorageType.class), results.isEmpty());
|
||||
}
|
||||
|
||||
if (!returnChosenNodes) {
|
||||
results.removeAll(chosenStorage);
|
||||
}
|
||||
|
|
|
@ -55,7 +55,7 @@ class ReplicationWork {
|
|||
targets = blockplacement.chooseTarget(bc.getName(),
|
||||
additionalReplRequired, srcNode, liveReplicaStorages, false,
|
||||
excludedNodes, block.getNumBytes(),
|
||||
storagePolicySuite.getPolicy(bc.getStoragePolicyID()));
|
||||
storagePolicySuite.getPolicy(bc.getStoragePolicyID()), null);
|
||||
} finally {
|
||||
srcNode.decrementPendingReplicationWithoutTargets();
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.hadoop.HadoopIllegalArgumentException;
|
|||
import org.apache.hadoop.crypto.CipherSuite;
|
||||
import org.apache.hadoop.crypto.CryptoProtocolVersion;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
|
||||
import org.apache.hadoop.hdfs.AddBlockFlag;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||
import org.apache.hadoop.fs.FileEncryptionInfo;
|
||||
|
@ -265,8 +266,9 @@ class FSDirWriteFileOp {
|
|||
}
|
||||
|
||||
static DatanodeStorageInfo[] chooseTargetForNewBlock(
|
||||
BlockManager bm, String src, DatanodeInfo[] excludedNodes, String[]
|
||||
favoredNodes, ValidateAddBlockResult r) throws IOException {
|
||||
BlockManager bm, String src, DatanodeInfo[] excludedNodes,
|
||||
String[] favoredNodes, EnumSet<AddBlockFlag> flags,
|
||||
ValidateAddBlockResult r) throws IOException {
|
||||
Node clientNode = bm.getDatanodeManager()
|
||||
.getDatanodeByHost(r.clientMachine);
|
||||
if (clientNode == null) {
|
||||
|
@ -280,11 +282,10 @@ class FSDirWriteFileOp {
|
|||
}
|
||||
List<String> favoredNodesList = (favoredNodes == null) ? null
|
||||
: Arrays.asList(favoredNodes);
|
||||
|
||||
// choose targets for the new block to be allocated.
|
||||
return bm.chooseTarget4NewBlock(src, r.replication, clientNode,
|
||||
excludedNodesSet, r.blockSize,
|
||||
favoredNodesList, r.storagePolicyID);
|
||||
favoredNodesList, r.storagePolicyID, flags);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -141,6 +141,7 @@ import org.apache.hadoop.crypto.key.KeyProvider;
|
|||
import org.apache.hadoop.crypto.CryptoCodec;
|
||||
import org.apache.hadoop.crypto.key.KeyProvider.Metadata;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
|
||||
import org.apache.hadoop.hdfs.AddBlockFlag;
|
||||
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
|
||||
import org.apache.hadoop.fs.CacheFlag;
|
||||
import org.apache.hadoop.fs.ContentSummary;
|
||||
|
@ -2442,7 +2443,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
*/
|
||||
LocatedBlock getAdditionalBlock(
|
||||
String src, long fileId, String clientName, ExtendedBlock previous,
|
||||
DatanodeInfo[] excludedNodes, String[] favoredNodes) throws IOException {
|
||||
DatanodeInfo[] excludedNodes, String[] favoredNodes,
|
||||
EnumSet<AddBlockFlag> flags) throws IOException {
|
||||
NameNode.stateChangeLog.debug("BLOCK* getAdditionalBlock: {} inodeId {}" +
|
||||
" for {}", src, fileId, clientName);
|
||||
|
||||
|
@ -2466,7 +2468,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
}
|
||||
|
||||
DatanodeStorageInfo[] targets = FSDirWriteFileOp.chooseTargetForNewBlock(
|
||||
blockManager, src, excludedNodes, favoredNodes, r);
|
||||
blockManager, src, excludedNodes, favoredNodes, flags, r);
|
||||
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
writeLock();
|
||||
|
|
|
@ -48,6 +48,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.conf.ReconfigurationTaskStatus;
|
||||
import org.apache.hadoop.crypto.CryptoProtocolVersion;
|
||||
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
|
||||
import org.apache.hadoop.hdfs.AddBlockFlag;
|
||||
import org.apache.hadoop.fs.CacheFlag;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.fs.ContentSummary;
|
||||
|
@ -832,11 +833,11 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
@Override
|
||||
public LocatedBlock addBlock(String src, String clientName,
|
||||
ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId,
|
||||
String[] favoredNodes)
|
||||
String[] favoredNodes, EnumSet<AddBlockFlag> addBlockFlags)
|
||||
throws IOException {
|
||||
checkNNStartup();
|
||||
LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId,
|
||||
clientName, previous, excludedNodes, favoredNodes);
|
||||
clientName, previous, excludedNodes, favoredNodes, addBlockFlags);
|
||||
if (locatedBlock != null) {
|
||||
metrics.incrAddBlockOps();
|
||||
}
|
||||
|
@ -1143,7 +1144,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
DatanodeInfo results[] = namesystem.datanodeReport(type);
|
||||
return results;
|
||||
}
|
||||
|
||||
|
||||
@Override // ClientProtocol
|
||||
public DatanodeStorageReport[] getDatanodeStorageReport(
|
||||
DatanodeReportType type) throws IOException {
|
||||
|
|
|
@ -1932,7 +1932,7 @@ public class DFSTestUtil {
|
|||
String clientName, ExtendedBlock previous, int len)
|
||||
throws Exception {
|
||||
fs.getClient().namenode.addBlock(file, clientName, previous, null,
|
||||
fileNode.getId(), null);
|
||||
fileNode.getId(), null, null);
|
||||
|
||||
final BlockInfo lastBlock = fileNode.getLastBlock();
|
||||
final int groupSize = fileNode.getPreferredBlockReplication();
|
||||
|
|
|
@ -1237,12 +1237,12 @@ public class TestBlockStoragePolicy {
|
|||
|
||||
DatanodeStorageInfo[] targets = replicator.chooseTarget("/foo", 3,
|
||||
dataNodes[0], Collections.<DatanodeStorageInfo>emptyList(), false,
|
||||
new HashSet<Node>(), 0, policy1);
|
||||
new HashSet<Node>(), 0, policy1, null);
|
||||
System.out.println(Arrays.asList(targets));
|
||||
Assert.assertEquals(3, targets.length);
|
||||
targets = replicator.chooseTarget("/foo", 3,
|
||||
dataNodes[0], Collections.<DatanodeStorageInfo>emptyList(), false,
|
||||
new HashSet<Node>(), 0, policy2);
|
||||
new HashSet<Node>(), 0, policy2, null);
|
||||
System.out.println(Arrays.asList(targets));
|
||||
Assert.assertEquals(3, targets.length);
|
||||
}
|
||||
|
@ -1308,7 +1308,7 @@ public class TestBlockStoragePolicy {
|
|||
|
||||
DatanodeStorageInfo[] targets = replicator.chooseTarget("/foo", 3,
|
||||
dataNodes[0], Collections.<DatanodeStorageInfo>emptyList(), false,
|
||||
new HashSet<Node>(), 0, policy);
|
||||
new HashSet<Node>(), 0, policy, null);
|
||||
System.out.println(policy.getName() + ": " + Arrays.asList(targets));
|
||||
Assert.assertEquals(2, targets.length);
|
||||
Assert.assertEquals(StorageType.SSD, targets[0].getStorageType());
|
||||
|
|
|
@ -44,6 +44,7 @@ import java.net.URI;
|
|||
import java.security.MessageDigest;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -96,6 +97,7 @@ import org.apache.log4j.Level;
|
|||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Matchers;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.internal.stubbing.answers.ThrowsException;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
|
@ -252,7 +254,9 @@ public class TestDFSClientRetries {
|
|||
anyString(),
|
||||
any(ExtendedBlock.class),
|
||||
any(DatanodeInfo[].class),
|
||||
anyLong(), any(String[].class))).thenAnswer(answer);
|
||||
anyLong(), any(String[].class),
|
||||
Matchers.<EnumSet<AddBlockFlag>>any()))
|
||||
.thenAnswer(answer);
|
||||
|
||||
Mockito.doReturn(
|
||||
new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
|
||||
|
@ -471,7 +475,8 @@ public class TestDFSClientRetries {
|
|||
}
|
||||
}).when(spyNN).addBlock(Mockito.anyString(), Mockito.anyString(),
|
||||
Mockito.<ExtendedBlock> any(), Mockito.<DatanodeInfo[]> any(),
|
||||
Mockito.anyLong(), Mockito.<String[]> any());
|
||||
Mockito.anyLong(), Mockito.<String[]> any(),
|
||||
Mockito.<EnumSet<AddBlockFlag>> any());
|
||||
|
||||
doAnswer(new Answer<Boolean>() {
|
||||
|
||||
|
@ -513,7 +518,8 @@ public class TestDFSClientRetries {
|
|||
Mockito.verify(spyNN, Mockito.atLeastOnce()).addBlock(
|
||||
Mockito.anyString(), Mockito.anyString(),
|
||||
Mockito.<ExtendedBlock> any(), Mockito.<DatanodeInfo[]> any(),
|
||||
Mockito.anyLong(), Mockito.<String[]> any());
|
||||
Mockito.anyLong(), Mockito.<String[]> any(),
|
||||
Mockito.<EnumSet<AddBlockFlag>> any());
|
||||
Mockito.verify(spyNN, Mockito.atLeastOnce()).complete(
|
||||
Mockito.anyString(), Mockito.anyString(),
|
||||
Mockito.<ExtendedBlock>any(), anyLong());
|
||||
|
|
|
@ -22,19 +22,29 @@ import java.io.IOException;
|
|||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.ArrayList;
|
||||
import java.util.EnumSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FsTracer;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.DataStreamer.LastExceptionInStreamer;
|
||||
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||
import org.apache.htrace.core.SpanId;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
|
@ -42,8 +52,11 @@ import org.junit.BeforeClass;
|
|||
import org.junit.Test;
|
||||
import org.mockito.internal.util.reflection.Whitebox;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class TestDFSOutputStream {
|
||||
|
@ -52,7 +65,7 @@ public class TestDFSOutputStream {
|
|||
@BeforeClass
|
||||
public static void setup() throws IOException {
|
||||
Configuration conf = new Configuration();
|
||||
cluster = new MiniDFSCluster.Builder(conf).build();
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -80,7 +93,7 @@ public class TestDFSOutputStream {
|
|||
try {
|
||||
dos.close();
|
||||
} catch (IOException e) {
|
||||
Assert.assertEquals(e, dummy);
|
||||
assertEquals(e, dummy);
|
||||
}
|
||||
thrown = (Throwable) Whitebox.getInternalState(ex, "thrown");
|
||||
Assert.assertNull(thrown);
|
||||
|
@ -127,7 +140,7 @@ public class TestDFSOutputStream {
|
|||
mock(HdfsFileStatus.class),
|
||||
mock(ExtendedBlock.class),
|
||||
client,
|
||||
"foo", null, null, null, null, null);
|
||||
"foo", null, null, null, null, null, null);
|
||||
|
||||
DataOutputStream blockStream = mock(DataOutputStream.class);
|
||||
doThrow(new IOException()).when(blockStream).flush();
|
||||
|
@ -148,6 +161,47 @@ public class TestDFSOutputStream {
|
|||
Assert.assertTrue(congestedNodes.isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoLocalWriteFlag() throws IOException {
|
||||
DistributedFileSystem fs = cluster.getFileSystem();
|
||||
EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.NO_LOCAL_WRITE,
|
||||
CreateFlag.CREATE);
|
||||
BlockManager bm = cluster.getNameNode().getNamesystem().getBlockManager();
|
||||
DatanodeManager dm = bm.getDatanodeManager();
|
||||
try(FSDataOutputStream os = fs.create(new Path("/test-no-local"),
|
||||
FsPermission.getDefault(),
|
||||
flags, 512, (short)2, 512, null)) {
|
||||
// Inject a DatanodeManager that returns one DataNode as local node for
|
||||
// the client.
|
||||
DatanodeManager spyDm = spy(dm);
|
||||
DatanodeDescriptor dn1 = dm.getDatanodeListForReport
|
||||
(HdfsConstants.DatanodeReportType.LIVE).get(0);
|
||||
doReturn(dn1).when(spyDm).getDatanodeByHost("127.0.0.1");
|
||||
Whitebox.setInternalState(bm, "datanodeManager", spyDm);
|
||||
byte[] buf = new byte[512 * 16];
|
||||
new Random().nextBytes(buf);
|
||||
os.write(buf);
|
||||
} finally {
|
||||
Whitebox.setInternalState(bm, "datanodeManager", dm);
|
||||
}
|
||||
cluster.triggerBlockReports();
|
||||
final String bpid = cluster.getNamesystem().getBlockPoolId();
|
||||
// Total number of DataNodes is 3.
|
||||
assertEquals(3, cluster.getAllBlockReports(bpid).size());
|
||||
int numDataNodesWithData = 0;
|
||||
for (Map<DatanodeStorage, BlockListAsLongs> dnBlocks :
|
||||
cluster.getAllBlockReports(bpid)) {
|
||||
for (BlockListAsLongs blocks : dnBlocks.values()) {
|
||||
if (blocks.getNumberOfBlocks() > 0) {
|
||||
numDataNodesWithData++;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Verify that only one DN has no data.
|
||||
assertEquals(1, 3 - numDataNodesWithData);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
if (cluster != null) {
|
||||
|
|
|
@ -531,7 +531,7 @@ public class TestFileCreation {
|
|||
|
||||
// add one block to the file
|
||||
LocatedBlock location = client.getNamenode().addBlock(file1.toString(),
|
||||
client.clientName, null, null, HdfsConstants.GRANDFATHER_INODE_ID, null);
|
||||
client.clientName, null, null, HdfsConstants.GRANDFATHER_INODE_ID, null, null);
|
||||
System.out.println("testFileCreationError2: "
|
||||
+ "Added block " + location.getBlock());
|
||||
|
||||
|
@ -582,7 +582,7 @@ public class TestFileCreation {
|
|||
createFile(dfs, f, 3);
|
||||
try {
|
||||
cluster.getNameNodeRpc().addBlock(f.toString(), client.clientName,
|
||||
null, null, HdfsConstants.GRANDFATHER_INODE_ID, null);
|
||||
null, null, HdfsConstants.GRANDFATHER_INODE_ID, null, null);
|
||||
fail();
|
||||
} catch(IOException ioe) {
|
||||
FileSystem.LOG.info("GOOD!", ioe);
|
||||
|
|
|
@ -156,6 +156,6 @@ abstract public class BaseReplicationPolicyTest {
|
|||
Set<Node> excludedNodes) {
|
||||
return replicator.chooseTarget(filename, numOfReplicas, writer,
|
||||
chosenNodes, false, excludedNodes, BLOCK_SIZE,
|
||||
TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
|
||||
TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);
|
||||
}
|
||||
}
|
|
@ -142,7 +142,7 @@ public class TestAvailableSpaceBlockPlacementPolicy {
|
|||
.getBlockManager()
|
||||
.getBlockPlacementPolicy()
|
||||
.chooseTarget(file, replica, null, new ArrayList<DatanodeStorageInfo>(), false, null,
|
||||
blockSize, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
|
||||
blockSize, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);
|
||||
|
||||
Assert.assertTrue(targets.length == replica);
|
||||
for (int j = 0; j < replica; j++) {
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
|
|||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
@ -30,6 +31,7 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
|
@ -38,6 +40,7 @@ import java.util.Set;
|
|||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.AddBlockFlag;
|
||||
import org.apache.hadoop.fs.ContentSummary;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
|
@ -290,7 +293,8 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
|
|||
excludedNodes.add(dataNodes[1]);
|
||||
chosenNodes.add(storages[2]);
|
||||
targets = replicator.chooseTarget(filename, 1, dataNodes[0], chosenNodes, true,
|
||||
excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
|
||||
excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY,
|
||||
null);
|
||||
System.out.println("targets=" + Arrays.asList(targets));
|
||||
assertEquals(2, targets.length);
|
||||
//make sure that the chosen node is in the target.
|
||||
|
@ -667,7 +671,8 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
|
|||
.getNamesystem().getBlockManager().getBlockPlacementPolicy();
|
||||
DatanodeStorageInfo[] targets = replicator.chooseTarget(filename, 3,
|
||||
staleNodeInfo, new ArrayList<DatanodeStorageInfo>(), false, null,
|
||||
BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
|
||||
BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY,
|
||||
null);
|
||||
|
||||
assertEquals(targets.length, 3);
|
||||
assertFalse(isOnSameRack(targets[0], staleNodeInfo));
|
||||
|
@ -693,7 +698,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
|
|||
// Call chooseTarget
|
||||
targets = replicator.chooseTarget(filename, 3, staleNodeInfo,
|
||||
new ArrayList<DatanodeStorageInfo>(), false, null, BLOCK_SIZE,
|
||||
TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
|
||||
TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);
|
||||
assertEquals(targets.length, 3);
|
||||
assertTrue(isOnSameRack(targets[0], staleNodeInfo));
|
||||
|
||||
|
@ -1490,8 +1495,42 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
|
|||
private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
|
||||
DatanodeDescriptor writer, Set<Node> excludedNodes,
|
||||
List<DatanodeDescriptor> favoredNodes) {
|
||||
return chooseTarget(numOfReplicas, writer, excludedNodes,
|
||||
favoredNodes, null);
|
||||
}
|
||||
|
||||
private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
|
||||
DatanodeDescriptor writer, Set<Node> excludedNodes,
|
||||
List<DatanodeDescriptor> favoredNodes, EnumSet<AddBlockFlag> flags) {
|
||||
return replicator.chooseTarget(filename, numOfReplicas, writer,
|
||||
excludedNodes, BLOCK_SIZE, favoredNodes,
|
||||
TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
|
||||
TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, flags);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAvoidLocalWrite() throws IOException {
|
||||
DatanodeDescriptor writer = dataNodes[2];
|
||||
EnumSet<AddBlockFlag> flags = EnumSet.of(AddBlockFlag.NO_LOCAL_WRITE);
|
||||
DatanodeStorageInfo[] targets;
|
||||
targets = chooseTarget(5, writer, null, null, flags);
|
||||
for (DatanodeStorageInfo info : targets) {
|
||||
assertNotEquals(info.getDatanodeDescriptor(), writer);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAvoidLocalWriteNoEnoughNodes() throws IOException {
|
||||
DatanodeDescriptor writer = dataNodes[2];
|
||||
EnumSet<AddBlockFlag> flags = EnumSet.of(AddBlockFlag.NO_LOCAL_WRITE);
|
||||
DatanodeStorageInfo[] targets;
|
||||
targets = chooseTarget(6, writer, null, null, flags);
|
||||
assertEquals(6, targets.length);
|
||||
boolean found = false;
|
||||
for (DatanodeStorageInfo info : targets) {
|
||||
if (info.getDatanodeDescriptor().equals(writer)) {
|
||||
found = true;
|
||||
}
|
||||
}
|
||||
assertTrue(found);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -112,7 +112,7 @@ public class TestReplicationPolicyConsiderLoad
|
|||
DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager()
|
||||
.getBlockPlacementPolicy().chooseTarget("testFile.txt", 3,
|
||||
writerDn, new ArrayList<DatanodeStorageInfo>(), false, null,
|
||||
1024, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
|
||||
1024, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);
|
||||
|
||||
assertEquals(3, targets.length);
|
||||
Set<DatanodeStorageInfo> targetSet = new HashSet<>(
|
||||
|
@ -170,7 +170,7 @@ public class TestReplicationPolicyConsiderLoad
|
|||
DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager()
|
||||
.getBlockPlacementPolicy().chooseTarget("testFile.txt", 3, writerDn,
|
||||
new ArrayList<DatanodeStorageInfo>(), false, null,
|
||||
1024, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
|
||||
1024, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);
|
||||
for(DatanodeStorageInfo info : targets) {
|
||||
assertTrue("The node "+info.getDatanodeDescriptor().getName()+
|
||||
" has higher load and should not have been picked!",
|
||||
|
|
|
@ -269,7 +269,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
|
|||
List<DatanodeDescriptor> favoredNodes) {
|
||||
return replicator.chooseTarget(filename, numOfReplicas, writer,
|
||||
excludedNodes, BLOCK_SIZE, favoredNodes,
|
||||
TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
|
||||
TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -351,7 +351,8 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
|
|||
Set<Node> excludedNodes = new HashSet<>();
|
||||
excludedNodes.add(dataNodes[1]);
|
||||
targets = repl.chooseTarget(filename, 4, dataNodes[0], chosenNodes, false,
|
||||
excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
|
||||
excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY,
|
||||
null);
|
||||
assertEquals(targets.length, 4);
|
||||
assertEquals(storages[0], targets[0]);
|
||||
|
||||
|
@ -369,7 +370,8 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
|
|||
excludedNodes.add(dataNodes[1]);
|
||||
chosenNodes.add(storages[2]);
|
||||
targets = repl.chooseTarget(filename, 1, dataNodes[0], chosenNodes, true,
|
||||
excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
|
||||
excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY,
|
||||
null);
|
||||
System.out.println("targets=" + Arrays.asList(targets));
|
||||
assertEquals(2, targets.length);
|
||||
//make sure that the chosen node is in the target.
|
||||
|
|
|
@ -187,7 +187,7 @@ public class TestReplicationPolicyWithUpgradeDomain
|
|||
chosenNodes.add(storages[2]);
|
||||
targets = replicator.chooseTarget(filename, 1, dataNodes[0], chosenNodes,
|
||||
true, excludedNodes, BLOCK_SIZE,
|
||||
TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
|
||||
TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);
|
||||
System.out.println("targets=" + Arrays.asList(targets));
|
||||
assertEquals(2, targets.length);
|
||||
}
|
||||
|
|
|
@ -1204,7 +1204,7 @@ public class NNThroughputBenchmark implements Tool {
|
|||
for (int i = 0; i < 30; i++) {
|
||||
try {
|
||||
return clientProto.addBlock(src, clientName,
|
||||
previous, excludeNodes, fileId, favoredNodes);
|
||||
previous, excludeNodes, fileId, favoredNodes, null);
|
||||
} catch (NotReplicatedYetException|RemoteException e) {
|
||||
if (e instanceof RemoteException) {
|
||||
String className = ((RemoteException) e).getClassName();
|
||||
|
|
|
@ -101,13 +101,13 @@ public class TestAddBlockRetry {
|
|||
ns.readUnlock();;
|
||||
}
|
||||
DatanodeStorageInfo targets[] = FSDirWriteFileOp.chooseTargetForNewBlock(
|
||||
ns.getBlockManager(), src, null, null, r);
|
||||
ns.getBlockManager(), src, null, null, null, r);
|
||||
assertNotNull("Targets must be generated", targets);
|
||||
|
||||
// run second addBlock()
|
||||
LOG.info("Starting second addBlock for " + src);
|
||||
nn.addBlock(src, "clientName", null, null,
|
||||
HdfsConstants.GRANDFATHER_INODE_ID, null);
|
||||
HdfsConstants.GRANDFATHER_INODE_ID, null, null);
|
||||
assertTrue("Penultimate block must be complete",
|
||||
checkFileProgress(src, false));
|
||||
LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE);
|
||||
|
@ -161,14 +161,14 @@ public class TestAddBlockRetry {
|
|||
// start first addBlock()
|
||||
LOG.info("Starting first addBlock for " + src);
|
||||
LocatedBlock lb1 = nameNodeRpc.addBlock(src, "clientName", null, null,
|
||||
HdfsConstants.GRANDFATHER_INODE_ID, null);
|
||||
HdfsConstants.GRANDFATHER_INODE_ID, null, null);
|
||||
assertTrue("Block locations should be present",
|
||||
lb1.getLocations().length > 0);
|
||||
|
||||
cluster.restartNameNode();
|
||||
nameNodeRpc = cluster.getNameNodeRpc();
|
||||
LocatedBlock lb2 = nameNodeRpc.addBlock(src, "clientName", null, null,
|
||||
HdfsConstants.GRANDFATHER_INODE_ID, null);
|
||||
HdfsConstants.GRANDFATHER_INODE_ID, null, null);
|
||||
assertEquals("Blocks are not equal", lb1.getBlock(), lb2.getBlock());
|
||||
assertTrue("Wrong locations with retry", lb2.getLocations().length > 0);
|
||||
}
|
||||
|
|
|
@ -113,7 +113,7 @@ public class TestBlockPlacementPolicyRackFaultTolerant {
|
|||
|
||||
//test chooseTarget for new file
|
||||
LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,
|
||||
null, null, fileStatus.getFileId(), null);
|
||||
null, null, fileStatus.getFileId(), null, null);
|
||||
doTestLocatedBlock(replication, locatedBlock);
|
||||
|
||||
//test chooseTarget for existing file.
|
||||
|
@ -143,7 +143,7 @@ public class TestBlockPlacementPolicyRackFaultTolerant {
|
|||
|
||||
//test chooseTarget for new file
|
||||
LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,
|
||||
null, null, fileStatus.getFileId(), null);
|
||||
null, null, fileStatus.getFileId(), null, null);
|
||||
doTestLocatedBlock(20, locatedBlock);
|
||||
|
||||
DatanodeInfo[] locs = locatedBlock.getLocations();
|
||||
|
|
|
@ -165,7 +165,8 @@ public class TestDeadDatanode {
|
|||
// choose the targets, but local node should not get selected as this is not
|
||||
// part of the cluster anymore
|
||||
DatanodeStorageInfo[] results = bm.chooseTarget4NewBlock("/hello", 3,
|
||||
clientNode, new HashSet<Node>(), 256 * 1024 * 1024L, null, (byte) 7);
|
||||
clientNode, new HashSet<Node>(), 256 * 1024 * 1024L, null, (byte) 7,
|
||||
null);
|
||||
for (DatanodeStorageInfo datanodeStorageInfo : results) {
|
||||
assertFalse("Dead node should not be choosen", datanodeStorageInfo
|
||||
.getDatanodeDescriptor().equals(clientNode));
|
||||
|
|
|
@ -140,7 +140,7 @@ public class TestDefaultBlockPlacementPolicy {
|
|||
clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true,
|
||||
REPLICATION_FACTOR, DEFAULT_BLOCK_SIZE, null, false);
|
||||
LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,
|
||||
null, null, fileStatus.getFileId(), null);
|
||||
null, null, fileStatus.getFileId(), null, null);
|
||||
|
||||
assertEquals("Block should be allocated sufficient locations",
|
||||
REPLICATION_FACTOR, locatedBlock.getLocations().length);
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
|
|||
import java.io.FileNotFoundException;
|
||||
import java.util.AbstractMap;
|
||||
import java.util.ArrayList;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -28,6 +29,7 @@ import java.util.Set;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.AddBlockFlag;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -123,10 +125,11 @@ public class TestDeleteRace {
|
|||
boolean returnChosenNodes,
|
||||
Set<Node> excludedNodes,
|
||||
long blocksize,
|
||||
final BlockStoragePolicy storagePolicy) {
|
||||
final BlockStoragePolicy storagePolicy,
|
||||
EnumSet<AddBlockFlag> flags) {
|
||||
DatanodeStorageInfo[] results = super.chooseTarget(srcPath,
|
||||
numOfReplicas, writer, chosenNodes, returnChosenNodes, excludedNodes,
|
||||
blocksize, storagePolicy);
|
||||
blocksize, storagePolicy, flags);
|
||||
try {
|
||||
Thread.sleep(3000);
|
||||
} catch (InterruptedException e) {}
|
||||
|
|
|
@ -152,7 +152,7 @@ public class TestUpgradeDomainBlockPlacementPolicy {
|
|||
clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true,
|
||||
REPLICATION_FACTOR, DEFAULT_BLOCK_SIZE, null, false);
|
||||
LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,
|
||||
null, null, fileStatus.getFileId(), null);
|
||||
null, null, fileStatus.getFileId(), null, null);
|
||||
|
||||
assertEquals("Block should be allocated sufficient locations",
|
||||
REPLICATION_FACTOR, locatedBlock.getLocations().length);
|
||||
|
|
|
@ -837,7 +837,7 @@ public class TestHASafeMode {
|
|||
new ExtendedBlock(previousBlock),
|
||||
new DatanodeInfo[0],
|
||||
DFSClientAdapter.getFileId((DFSOutputStream) create
|
||||
.getWrappedStream()), null);
|
||||
.getWrappedStream()), null, null);
|
||||
cluster.restartNameNode(0, true);
|
||||
cluster.restartDataNode(0);
|
||||
cluster.transitionToActive(0);
|
||||
|
|
|
@ -198,7 +198,7 @@ public class TestOpenFilesWithSnapshot {
|
|||
String clientName = fs.getClient().getClientName();
|
||||
// create one empty block
|
||||
nameNodeRpc.addBlock(fileWithEmptyBlock.toString(), clientName, null, null,
|
||||
HdfsConstants.GRANDFATHER_INODE_ID, null);
|
||||
HdfsConstants.GRANDFATHER_INODE_ID, null, null);
|
||||
fs.createSnapshot(path, "s2");
|
||||
|
||||
fs.rename(new Path("/test/test"), new Path("/test/test-renamed"));
|
||||
|
|
|
@ -282,7 +282,7 @@ public class TestSnapshotBlocksMap {
|
|||
ExtendedBlock previous = new ExtendedBlock(fsn.getBlockPoolId(), blks[0]);
|
||||
cluster.getNameNodeRpc()
|
||||
.addBlock(bar.toString(), hdfs.getClient().getClientName(), previous,
|
||||
null, barNode.getId(), null);
|
||||
null, barNode.getId(), null, null);
|
||||
|
||||
SnapshotTestHelper.createSnapshot(hdfs, foo, "s1");
|
||||
|
||||
|
@ -319,7 +319,7 @@ public class TestSnapshotBlocksMap {
|
|||
ExtendedBlock previous = new ExtendedBlock(fsn.getBlockPoolId(), blks[0]);
|
||||
cluster.getNameNodeRpc()
|
||||
.addBlock(bar.toString(), hdfs.getClient().getClientName(), previous,
|
||||
null, barNode.getId(), null);
|
||||
null, barNode.getId(), null, null);
|
||||
|
||||
SnapshotTestHelper.createSnapshot(hdfs, foo, "s1");
|
||||
|
||||
|
@ -358,7 +358,7 @@ public class TestSnapshotBlocksMap {
|
|||
ExtendedBlock previous = new ExtendedBlock(fsn.getBlockPoolId(), blks[0]);
|
||||
cluster.getNameNodeRpc()
|
||||
.addBlock(bar.toString(), hdfs.getClient().getClientName(), previous,
|
||||
null, barNode.getId(), null);
|
||||
null, barNode.getId(), null, null);
|
||||
|
||||
SnapshotTestHelper.createSnapshot(hdfs, foo, "s1");
|
||||
|
||||
|
|
Loading…
Reference in New Issue