HDFS-6133. Add a feature for replica pinning so that a pinned replica will not be moved by Balancer/Mover. Contributed by zhaoyunjiong

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
Tsz-Wo Nicholas Sze 2015-02-11 15:09:29 -08:00
parent 2ca76df21a
commit 65a6cf47ec
21 changed files with 274 additions and 39 deletions

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -30,6 +30,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7720. Quota by Storage Type API, tools and ClientNameNode Protocol
     changes. (Xiaoyu Yao via Arpit Agarwal)
 
+    HDFS-6133. Add a feature for replica pinning so that a pinned replica
+    will not be moved by Balancer/Mover. (zhaoyunjiong via szetszwo)
+
   IMPROVEMENTS
 
     HDFS-7055. Add tracing to DFSInputStream (cmccabe)

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -781,4 +781,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   // 10 days
   public static final long DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT =
       TimeUnit.DAYS.toMillis(10);
+  public static final String DFS_DATANODE_BLOCK_PINNING_ENABLED =
+      "dfs.datanode.block-pinning.enabled";
+  public static final boolean DFS_DATANODE_BLOCK_PINNING_ENABLED_DEFAULT =
+      false;
 }
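These keys are read once by each DataNode when its dataset is initialized (see FsDatasetImpl below), so the flag must be set in the DataNode's configuration rather than only on the client. A minimal sketch of setting it programmatically; the wrapper class is illustrative and not part of the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class EnablePinning {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Off by default; a DataNode started with this conf will honor pinning.
    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED, true);
    System.out.println(conf.getBoolean(
        DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED,
        DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED_DEFAULT));
  }
}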

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

@@ -1442,11 +1442,13 @@ public class DFSOutputStream extends FSOutputSummer
           ExtendedBlock blockCopy = new ExtendedBlock(block);
           blockCopy.setNumBytes(blockSize);
 
+          boolean[] targetPinnings = getPinnings(nodes);
           // send the request
           new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
               dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
               nodes.length, block.getNumBytes(), bytesSent, newGS,
-              checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile);
+              checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
+              (targetPinnings == null ? false : targetPinnings[0]), targetPinnings);
 
           // receive ack for connect
           BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
@@ -1534,6 +1536,24 @@ public class DFSOutputStream extends FSOutputSummer
       }
     }
 
+    private boolean[] getPinnings(DatanodeInfo[] nodes) {
+      if (favoredNodes == null) {
+        return null;
+      } else {
+        boolean[] pinnings = new boolean[nodes.length];
+        for (int i = 0; i < nodes.length; i++) {
+          pinnings[i] = false;
+          for (int j = 0; j < favoredNodes.length; j++) {
+            if (nodes[i].getXferAddrWithHostname().equals(favoredNodes[j])) {
+              pinnings[i] = true;
+              break;
+            }
+          }
+        }
+        return pinnings;
+      }
+    }
+
     private LocatedBlock locateFollowingBlock(long start,
         DatanodeInfo[] excludedNodes) throws IOException {
       int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
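Note that getPinnings() pins pipeline position i only when that node's "hostname:port" transfer address matches a favored-node hint verbatim. A hypothetical standalone rendering of that rule, with made-up addresses:

import java.util.Arrays;

public class PinningsSketch {
  // Position i is pinned iff node i's transfer address appears verbatim
  // among the favored-node hints (same matching rule as getPinnings()).
  static boolean[] pinnings(String[] pipelineAddrs, String[] favoredNodes) {
    boolean[] p = new boolean[pipelineAddrs.length];
    for (int i = 0; i < pipelineAddrs.length; i++) {
      for (String favored : favoredNodes) {
        if (pipelineAddrs[i].equals(favored)) {
          p[i] = true;
          break;
        }
      }
    }
    return p;
  }

  public static void main(String[] args) {
    String[] pipeline = {"dn1.example.com:50010", "dn2.example.com:50010"};
    String[] favored = {"dn1.example.com:50010"};
    System.out.println(Arrays.toString(pinnings(pipeline, favored)));
    // prints [true, false]
  }
}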

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -346,9 +346,10 @@ public class DistributedFileSystem extends FileSystem {
    * Progressable)} with the addition of favoredNodes that is a hint to
    * where the namenode should place the file blocks.
    * The favored nodes hint is not persisted in HDFS. Hence it may be honored
-   * at the creation time only. HDFS could move the blocks during balancing or
-   * replication, to move the blocks from favored nodes. A value of null means
-   * no favored nodes for this create
+   * at the creation time only. And with favored nodes, blocks will be pinned
+   * on the datanodes to prevent balancing move the block. HDFS could move the
+   * blocks during replication, to move the blocks from favored nodes. A value
+   * of null means no favored nodes for this create
    */
   public HdfsDataOutputStream create(final Path f,
       final FsPermission permission, final boolean overwrite,
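A usage sketch of this create overload (hostnames and sizes are made up; assumes fs.defaultFS points at an HDFS cluster whose DataNodes have dfs.datanode.block-pinning.enabled set). Pinning is triggered purely by passing favoredNodes here; there is no separate pin API:

import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class CreatePinnedFile {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
    // Hypothetical favored DataNodes; replicas placed on them get pinned
    // when block pinning is enabled on those DataNodes.
    InetSocketAddress[] favoredNodes = {
        new InetSocketAddress("dn1.example.com", 50010),
        new InetSocketAddress("dn2.example.com", 50010)};
    try (java.io.OutputStream out = dfs.create(new Path("/tmp/pinned"),
        FsPermission.getFileDefault(), true, 4096, (short) 2,
        128 * 1024 * 1024L, null, favoredNodes)) {
      out.write(new byte[1024]);
    }
  }
}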

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java

@@ -92,6 +92,8 @@ public interface DataTransferProtocol {
    * @param minBytesRcvd minimum number of bytes received.
    * @param maxBytesRcvd maximum number of bytes received.
    * @param latestGenerationStamp the latest generation stamp of the block.
+   * @param pinning whether to pin the block, so Balancer won't move it.
+   * @param targetPinnings whether to pin the block on target datanode
    */
   public void writeBlock(final ExtendedBlock blk,
       final StorageType storageType,
@@ -107,7 +109,9 @@ public interface DataTransferProtocol {
       final long latestGenerationStamp,
       final DataChecksum requestedChecksum,
       final CachingStrategy cachingStrategy,
-      final boolean allowLazyPersist) throws IOException;
+      final boolean allowLazyPersist,
+      final boolean pinning,
+      final boolean[] targetPinnings) throws IOException;
 
   /**
    * Transfer a block to another datanode.
    * The block stage must be

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java

@@ -149,7 +149,9 @@ public abstract class Receiver implements DataTransferProtocol {
           (proto.hasCachingStrategy() ?
               getCachingStrategy(proto.getCachingStrategy()) :
               CachingStrategy.newDefaultStrategy()),
-          (proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false));
+          (proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false),
+          (proto.hasPinning() ? proto.getPinning(): false),
+          (PBHelper.convertBooleanList(proto.getTargetPinningsList())));
     } finally {
       if (traceScope != null) traceScope.close();
     }

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java

@@ -129,7 +129,9 @@ public class Sender implements DataTransferProtocol {
       final long latestGenerationStamp,
       DataChecksum requestedChecksum,
       final CachingStrategy cachingStrategy,
-      final boolean allowLazyPersist) throws IOException {
+      final boolean allowLazyPersist,
+      final boolean pinning,
+      final boolean[] targetPinnings) throws IOException {
     ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
         blk, clientName, blockToken);
@@ -148,7 +150,9 @@ public class Sender implements DataTransferProtocol {
       .setLatestGenerationStamp(latestGenerationStamp)
       .setRequestedChecksum(checksumProto)
       .setCachingStrategy(getCachingStrategy(cachingStrategy))
-      .setAllowLazyPersist(allowLazyPersist);
+      .setAllowLazyPersist(allowLazyPersist)
+      .setPinning(pinning)
+      .addAllTargetPinnings(PBHelper.convert(targetPinnings, 1));
 
     if (source != null) {
       proto.setSource(PBHelper.convertDatanodeInfo(source));

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java

@@ -2954,4 +2954,25 @@ public class PBHelper {
         ezKeyVersionName);
   }
 
+  public static List<Boolean> convert(boolean[] targetPinnings, int idx) {
+    List<Boolean> pinnings = new ArrayList<Boolean>();
+    if (targetPinnings == null) {
+      pinnings.add(Boolean.FALSE);
+    } else {
+      for (; idx < targetPinnings.length; ++idx) {
+        pinnings.add(Boolean.valueOf(targetPinnings[idx]));
+      }
+    }
+    return pinnings;
+  }
+
+  public static boolean[] convertBooleanList(
+    List<Boolean> targetPinningsList) {
+    final boolean[] targetPinnings = new boolean[targetPinningsList.size()];
+    for (int i = 0; i < targetPinningsList.size(); i++) {
+      targetPinnings[i] = targetPinningsList.get(i);
+    }
+    return targetPinnings;
+  }
 }
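The idx parameter reflects how the pinning bits travel: the sender carries the first target's bit in the scalar pinning field and serializes only the tail via convert(targetPinnings, 1), while the receiving DataNode rebuilds an array with convertBooleanList. A worked sketch with assumed values:

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;

public class PinningsWireDemo {
  public static void main(String[] args) {
    // Assumed 3-node pipeline where only the first node is favored.
    boolean[] targetPinnings = {true, false, false};
    // Sender puts targetPinnings[0] in the scalar pinning field and ships
    // only the tail: convert(..., 1) -> [false, false].
    List<Boolean> onWire = PBHelper.convert(targetPinnings, 1);
    // Receiver on the next DataNode rebuilds the array for its own hop.
    boolean[] downstream = PBHelper.convertBooleanList(onWire);
    System.out.println(onWire + " / " + Arrays.toString(downstream));
  }
}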

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -133,6 +133,8 @@ class BlockReceiver implements Closeable {
   private boolean isReplaceBlock = false;
   private DataOutputStream replyOut = null;
 
+  private boolean pinning;
+
   BlockReceiver(final ExtendedBlock block, final StorageType storageType,
       final DataInputStream in,
       final String inAddr, final String myAddr,
@@ -141,7 +143,8 @@ class BlockReceiver implements Closeable {
       final String clientname, final DatanodeInfo srcDataNode,
       final DataNode datanode, DataChecksum requestedChecksum,
       CachingStrategy cachingStrategy,
-      final boolean allowLazyPersist) throws IOException {
+      final boolean allowLazyPersist,
+      final boolean pinning) throws IOException {
     try{
       this.block = block;
       this.in = in;
@@ -165,12 +168,14 @@ class BlockReceiver implements Closeable {
       this.isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
           || stage == BlockConstructionStage.TRANSFER_FINALIZED;
 
+      this.pinning = pinning;
       if (LOG.isDebugEnabled()) {
         LOG.debug(getClass().getSimpleName() + ": " + block
             + "\n isClient =" + isClient + ", clientname=" + clientname
             + "\n isDatanode=" + isDatanode + ", srcDataNode=" + srcDataNode
             + "\n inAddr=" + inAddr + ", myAddr=" + myAddr
             + "\n cachingStrategy = " + cachingStrategy
+            + "\n pinning=" + pinning
             );
       }
@@ -1287,6 +1292,11 @@ class BlockReceiver implements Closeable {
               : 0;
           block.setNumBytes(replicaInfo.getNumBytes());
           datanode.data.finalizeBlock(block);
+
+          if (pinning) {
+            datanode.data.setPinning(block);
+          }
+
           datanode.closeBlock(
               block, DataNode.EMPTY_DEL_HINT, replicaInfo.getStorageUuid());
           if (ClientTraceLog.isInfoEnabled() && isClient) {

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -2117,7 +2117,7 @@ public class DataNode extends ReconfigurableBase
         new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
             clientname, targets, targetStorageTypes, srcNode,
             stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy,
-            false);
+            false, false, null);
 
         // send data & checksum
         blockSender.sendBlock(out, unbufOut, null);

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -581,7 +581,9 @@ class DataXceiver extends Receiver implements Runnable {
       final long latestGenerationStamp,
       DataChecksum requestedChecksum,
       CachingStrategy cachingStrategy,
-      final boolean allowLazyPersist) throws IOException {
+      final boolean allowLazyPersist,
+      final boolean pinning,
+      final boolean[] targetPinnings) throws IOException {
     previousOpClientName = clientname;
     updateCurrentThreadName("Receiving block " + block);
     final boolean isDatanode = clientname.length() == 0;
@@ -601,7 +603,7 @@ class DataXceiver extends Receiver implements Runnable {
           + ", bytesRcvd=[" + minBytesRcvd + ", " + maxBytesRcvd + "]"
          + "\n targets=" + Arrays.asList(targets)
          + "; pipelineSize=" + pipelineSize + ", srcDataNode=" + srcDataNode
-          );
+          + ", pinning=" + pinning);
       LOG.debug("isDatanode=" + isDatanode
           + ", isClient=" + isClient
           + ", isTransfer=" + isTransfer);
@@ -643,7 +645,7 @@ class DataXceiver extends Receiver implements Runnable {
           peer.getLocalAddressString(),
           stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
           clientname, srcDataNode, datanode, requestedChecksum,
-          cachingStrategy, allowLazyPersist);
+          cachingStrategy, allowLazyPersist, pinning);
 
       storageUuid = blockReceiver.getStorageUuid();
     } else {
@@ -686,10 +688,19 @@ class DataXceiver extends Receiver implements Runnable {
          mirrorIn = new DataInputStream(unbufMirrorIn);
 
          // Do not propagate allowLazyPersist to downstream DataNodes.
-          new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
-              blockToken, clientname, targets, targetStorageTypes, srcDataNode,
-              stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
-              latestGenerationStamp, requestedChecksum, cachingStrategy, false);
+          if (targetPinnings != null && targetPinnings.length > 0) {
+            new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
+                blockToken, clientname, targets, targetStorageTypes, srcDataNode,
+                stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
+                latestGenerationStamp, requestedChecksum, cachingStrategy,
+                false, targetPinnings[0], targetPinnings);
+          } else {
+            new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
+                blockToken, clientname, targets, targetStorageTypes, srcDataNode,
+                stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
+                latestGenerationStamp, requestedChecksum, cachingStrategy,
+                false, false, targetPinnings);
+          }
 
           mirrorOut.flush();
@@ -950,6 +961,13 @@ class DataXceiver extends Receiver implements Runnable {
       }
     }
 
+    if (datanode.data.getPinning(block)) {
+      String msg = "Not able to copy block " + block.getBlockId() + " " +
+          "to " + peer.getRemoteAddressString() + " because it's pinned ";
+      LOG.info(msg);
+      sendResponse(ERROR, msg);
+    }
+
     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
       String msg = "Not able to copy block " + block.getBlockId() + " " +
           "to " + peer.getRemoteAddressString() + " because threads " +
@@ -1109,7 +1127,7 @@ class DataXceiver extends Receiver implements Runnable {
           proxyReply, proxySock.getRemoteSocketAddress().toString(),
           proxySock.getLocalSocketAddress().toString(),
           null, 0, 0, 0, "", null, datanode, remoteChecksum,
-          CachingStrategy.newDropBehind(), false);
+          CachingStrategy.newDropBehind(), false, false);
 
       // receive a block
       blockReceiver.receiveBlock(null, null, replyOut, null,
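To make the mirroring branch above concrete: each DataNode receives its own flag in the scalar pinning field, forwards the head of its targetPinnings list as the next hop's flag, and Sender strips that head before serializing, so the list shrinks by one per hop. A toy trace of that peeling under an assumed three-node pipeline (not HDFS code):

import java.util.Arrays;

public class PipelinePinningTrace {
  public static void main(String[] args) {
    // Client-computed pinning bits for an assumed 3-node pipeline.
    boolean[] bits = {true, false, false};
    for (int hop = 0; hop < bits.length; hop++) {
      // Each hop consumes the head as its own flag and forwards the tail.
      boolean pinning = bits[hop];
      boolean[] tail = Arrays.copyOfRange(bits, hop + 1, bits.length);
      System.out.println("DN" + hop + ": pinning=" + pinning
          + ", forwards " + Arrays.toString(tail));
    }
  }
}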

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java

@@ -516,4 +516,17 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    */
   public ReplicaInfo moveBlockAcrossStorage(final ExtendedBlock block,
       StorageType targetStorageType) throws IOException;
+
+  /**
+   * Set a block to be pinned on this datanode so that it cannot be moved
+   * by Balancer/Mover.
+   *
+   * It is a no-op when dfs.datanode.block-pinning.enabled is set to false.
+   */
+  public void setPinning(ExtendedBlock block) throws IOException;
+
+  /**
+   * Check whether the block was pinned
+   */
+  public boolean getPinning(ExtendedBlock block) throws IOException;
 }

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -51,6 +51,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
@@ -242,6 +246,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   // Used for synchronizing access to usage stats
   private final Object statsLock = new Object();
 
+  final LocalFileSystem localFS;
+
+  private boolean blockPinningEnabled;
+
   /**
    * An FSDataset has a directory where it loads its data files.
    */
@@ -301,6 +309,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     lazyWriter = new Daemon(new LazyWriter(conf));
     lazyWriter.start();
     registerMBean(datanode.getDatanodeUuid());
+    localFS = FileSystem.getLocal(conf);
+    blockPinningEnabled = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED,
+        DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED_DEFAULT);
   }
 
   private void addVolume(Collection<StorageLocation> dataLocations,
@@ -2870,5 +2882,33 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         shouldRun = false;
       }
     }
 
+  @Override
+  public void setPinning(ExtendedBlock block) throws IOException {
+    if (!blockPinningEnabled) {
+      return;
+    }
+
+    File f = getBlockFile(block);
+    Path p = new Path(f.getAbsolutePath());
+
+    FsPermission oldPermission = localFS.getFileStatus(
+        new Path(f.getAbsolutePath())).getPermission();
+    //sticky bit is used for pinning purpose
+    FsPermission permission = new FsPermission(oldPermission.getUserAction(),
+        oldPermission.getGroupAction(), oldPermission.getOtherAction(), true);
+    localFS.setPermission(p, permission);
+  }
+
+  @Override
+  public boolean getPinning(ExtendedBlock block) throws IOException {
+    if (!blockPinningEnabled) {
+      return false;
+    }
+    File f = getBlockFile(block);
+
+    FileStatus fss = localFS.getFileStatus(new Path(f.getAbsolutePath()));
+    return fss.getPermission().getStickyBit();
+  }
 }
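Recording the pin as the sticky bit on the block file's local-filesystem permissions means it needs no extra metadata and survives DataNode restarts. A minimal sketch of inspecting it from outside the DataNode; the block-file path here is hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;

public class StickyBitCheck {
  public static void main(String[] args) throws Exception {
    LocalFileSystem localFS = FileSystem.getLocal(new Configuration());
    // Hypothetical block file under a DataNode data directory; a pinned
    // block's mode ends in 't'/'T' in ls -l output.
    Path blockFile = new Path(
        "/data/dfs/dn/current/BP-1/current/finalized/blk_1073741825");
    boolean pinned =
        localFS.getFileStatus(blockFile).getPermission().getStickyBit();
    System.out.println("pinned=" + pinned);
  }
}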

hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto

@@ -123,6 +123,9 @@ message OpWriteBlockProto {
    * to ignore this hint.
    */
   optional bool allowLazyPersist = 13 [default = false];
+  //whether to pin the block, so Balancer won't move it.
+  optional bool pinning = 14 [default = false];
+  repeated bool targetPinnings = 15;
 }
 
 message OpTransferBlockProto {

hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -2235,4 +2235,10 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.datanode.block-pinning.enabled</name>
+  <value>false</value>
+  <description>Whether pin blocks on favored DataNode.</description>
+</property>
+
 </configuration>

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java

@@ -317,13 +317,21 @@ public class DFSTestUtil {
   public static void createFile(FileSystem fs, Path fileName, int bufferLen,
       long fileLen, long blockSize, short replFactor, long seed)
       throws IOException {
-    createFile(fs, fileName, false, bufferLen, fileLen, blockSize,
-        replFactor, seed, false);
+    createFile(fs, fileName, false, bufferLen, fileLen, blockSize, replFactor,
+      seed, false);
   }
 
   public static void createFile(FileSystem fs, Path fileName,
       boolean isLazyPersist, int bufferLen, long fileLen, long blockSize,
       short replFactor, long seed, boolean flush) throws IOException {
+    createFile(fs, fileName, isLazyPersist, bufferLen, fileLen, blockSize,
+      replFactor, seed, flush, null);
+  }
+
+  public static void createFile(FileSystem fs, Path fileName,
+      boolean isLazyPersist, int bufferLen, long fileLen, long blockSize,
+      short replFactor, long seed, boolean flush,
+      InetSocketAddress[] favoredNodes) throws IOException {
     assert bufferLen > 0;
     if (!fs.mkdirs(fileName.getParent())) {
       throw new IOException("Mkdirs failed to create " +
@@ -336,10 +344,19 @@ public class DFSTestUtil {
       createFlags.add(LAZY_PERSIST);
     }
     try {
-      out = fs.create(fileName, FsPermission.getFileDefault(), createFlags,
-          fs.getConf().getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
-          replFactor, blockSize, null);
+      if (favoredNodes == null) {
+        out = fs.create(
+          fileName,
+          FsPermission.getFileDefault(),
+          createFlags,
+          fs.getConf().getInt(
+            CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
+          replFactor, blockSize, null);
+      } else {
+        out = ((DistributedFileSystem) fs).create(fileName,
+          FsPermission.getDefault(), true, bufferLen, replFactor, blockSize,
+          null, favoredNodes);
+      }
       if (fileLen > 0) {
         byte[] toWrite = new byte[bufferLen];
         Random rb = new Random(seed);

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java

@@ -534,6 +534,6 @@ public class TestDataTransferProtocol {
         BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         new DatanodeInfo[1], new StorageType[1], null, stage,
         0, block.getNumBytes(), block.getNumBytes(), newGS,
-        checksum, CachingStrategy.newDefaultStrategy(), false);
+        checksum, CachingStrategy.newDefaultStrategy(), false, false, null);
   }
 }

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java

@@ -17,12 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.balancer;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
 import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
 import static org.junit.Assert.assertEquals;
@@ -33,6 +28,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.URI;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -59,12 +55,8 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.StorageType;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Result;
@@ -309,6 +301,63 @@ public class TestBalancer {
     waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, 0);
   }
 
+  /**
+   * Make sure that balancer can't move pinned blocks.
+   * If specified favoredNodes when create file, blocks will be pinned use
+   * sticky bit.
+   * @throws Exception
+   */
+  @Test(timeout=100000)
+  public void testBalancerWithPinnedBlocks() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    conf.setBoolean(DFS_DATANODE_BLOCK_PINNING_ENABLED, true);
+
+    long[] capacities = new long[] { CAPACITY, CAPACITY };
+    String[] racks = { RACK0, RACK1 };
+    int numOfDatanodes = capacities.length;
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
+        .hosts(new String[]{"localhost", "localhost"})
+        .racks(racks).simulatedCapacities(capacities).build();
+
+    try {
+      cluster.waitActive();
+      client = NameNodeProxies.createProxy(conf,
+          cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
+
+      // fill up the cluster to be 80% full
+      long totalCapacity = sum(capacities);
+      long totalUsedSpace = totalCapacity * 8 / 10;
+      InetSocketAddress[] favoredNodes = new InetSocketAddress[numOfDatanodes];
+      for (int i = 0; i < favoredNodes.length; i++) {
+        favoredNodes[i] = cluster.getDataNodes().get(i).getXferAddress();
+      }
+
+      DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024,
+          totalUsedSpace / numOfDatanodes, DEFAULT_BLOCK_SIZE,
+          (short) numOfDatanodes, 0, false, favoredNodes);
+
+      // start up an empty node with the same capacity
+      cluster.startDataNodes(conf, 1, true, null, new String[] { RACK2 },
+          new long[] { CAPACITY });
+
+      totalCapacity += CAPACITY;
+
+      // run balancer and validate results
+      waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
+
+      // start rebalancing
+      Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+      int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
+      assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   /**
    * Wait until balanced: each datanode gives utilization within
   * BALANCE_ALLOWED_VARIANCE of average

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

@@ -128,6 +128,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     SimulatedOutputStream oStream = null;
     private long bytesAcked;
     private long bytesRcvd;
+    private boolean pinned = false;
     BInfo(String bpid, Block b, boolean forWriting) throws IOException {
       theBlock = new Block(b);
       if (theBlock.getNumBytes() < 0) {
@@ -1275,5 +1276,15 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     // TODO Auto-generated method stub
     return null;
   }
+
+  @Override
+  public void setPinning(ExtendedBlock b) throws IOException {
+    blockMap.get(b.getBlockPoolId()).get(b.getLocalBlock()).pinned = true;
+  }
+
+  @Override
+  public boolean getPinning(ExtendedBlock b) throws IOException {
+    return blockMap.get(b.getBlockPoolId()).get(b.getLocalBlock()).pinned;
+  }
 }

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java

@@ -152,7 +152,7 @@ public class TestDiskError {
         BlockTokenSecretManager.DUMMY_TOKEN, "",
         new DatanodeInfo[0], new StorageType[0], null,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
-        checksum, CachingStrategy.newDefaultStrategy(), false);
+        checksum, CachingStrategy.newDefaultStrategy(), false, false, null);
     out.flush();
 
     // close the connection before sending the content of the block

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java

@@ -406,4 +406,13 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   public long getNumBlocksFailedToUncache() {
     return 0;
   }
+
+  @Override
+  public void setPinning(ExtendedBlock block) throws IOException {
+  }
+
+  @Override
+  public boolean getPinning(ExtendedBlock block) throws IOException {
+    return false;
+  }
 }