diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 923e8f83cde..7fe580e938f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -30,6 +30,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7720. Quota by Storage Type API, tools and ClientNameNode
     Protocol changes. (Xiaoyu Yao via Arpit Agarwal)
 
+    HDFS-6133. Add a feature for replica pinning so that a pinned replica
+    will not be moved by Balancer/Mover.  (zhaoyunjiong via szetszwo)
+
   IMPROVEMENTS
 
     HDFS-7055. Add tracing to DFSInputStream (cmccabe)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index d1a2b244ba7..9c87b738c57 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -781,4 +781,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   // 10 days
   public static final long DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT =
       TimeUnit.DAYS.toMillis(10);
+  public static final String DFS_DATANODE_BLOCK_PINNING_ENABLED =
+      "dfs.datanode.block-pinning.enabled";
+  public static final boolean DFS_DATANODE_BLOCK_PINNING_ENABLED_DEFAULT =
+      false;
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 601560cb638..243d2635934 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -1442,11 +1442,13 @@ public class DFSOutputStream extends FSOutputSummer
           ExtendedBlock blockCopy = new ExtendedBlock(block);
           blockCopy.setNumBytes(blockSize);
 
+          boolean[] targetPinnings = getPinnings(nodes);
           // send the request
           new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
               dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
               nodes.length, block.getNumBytes(), bytesSent, newGS,
-              checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile);
+              checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
+              (targetPinnings == null ? false : targetPinnings[0]), targetPinnings);
 
           // receive ack for connect
           BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
@@ -1534,6 +1536,24 @@ public class DFSOutputStream extends FSOutputSummer
       }
     }
 
+    private boolean[] getPinnings(DatanodeInfo[] nodes) {
+      if (favoredNodes == null) {
+        return null;
+      } else {
+        boolean[] pinnings = new boolean[nodes.length];
+        for (int i = 0; i < nodes.length; i++) {
+          pinnings[i] = false;
+          for (int j = 0; j < favoredNodes.length; j++) {
+            if (nodes[i].getXferAddrWithHostname().equals(favoredNodes[j])) {
+              pinnings[i] = true;
+              break;
+            }
+          }
+        }
+        return pinnings;
+      }
+    }
+
     private LocatedBlock locateFollowingBlock(long start,
         DatanodeInfo[] excludedNodes)  throws IOException {
       int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 67758965ff6..46933c8174d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -346,9 +346,10 @@ public class DistributedFileSystem extends FileSystem {
    * Progressable)} with the addition of favoredNodes that is a hint to
    * where the namenode should place the file blocks.
    * The favored nodes hint is not persisted in HDFS. Hence it may be honored
-   * at the creation time only. HDFS could move the blocks during balancing or
-   * replication, to move the blocks from favored nodes. A value of null means
-   * no favored nodes for this create
+   * at the creation time only. In addition, the blocks are pinned on the
+   * favored datanodes so that the Balancer will not move them. HDFS could
+   * still move the blocks away from the favored nodes during replication.
+   * A value of null means no favored nodes for this create
    */
   public HdfsDataOutputStream create(final Path f,
       final FsPermission permission, final boolean overwrite,
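As context for the client-side changes above, here is a minimal sketch of the create path that triggers pinning. The hostnames, port, path, and sizes are illustrative, not part of the patch; it assumes fs.defaultFS points at an HDFS cluster whose datanodes run with dfs.datanode.block-pinning.enabled=true. Note that getPinnings() compares the favored-node strings against DatanodeInfo.getXferAddrWithHostname(), so the addresses passed in must match what the datanodes report.

import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;

public class PinnedCreateSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Assumes fs.defaultFS points at the cluster.
    DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);

    // Hypothetical favored datanodes; replicas placed on them are pinned
    // when dfs.datanode.block-pinning.enabled=true on those datanodes.
    InetSocketAddress[] favored = {
        new InetSocketAddress("dn1.example.com", 50010),
        new InetSocketAddress("dn2.example.com", 50010) };

    try (HdfsDataOutputStream out = dfs.create(new Path("/tmp/pinned-file"),
        FsPermission.getFileDefault(), true /* overwrite */, 4096,
        (short) 2, 128L << 20, null /* progress */, favored)) {
      out.writeBytes("pinned payload");
    }
  }
}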
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
index f6b99e61601..5fc263ef317 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
@@ -92,6 +92,8 @@ public interface DataTransferProtocol {
    * @param minBytesRcvd minimum number of bytes received.
    * @param maxBytesRcvd maximum number of bytes received.
    * @param latestGenerationStamp the latest generation stamp of the block.
+   * @param pinning whether to pin the block, so the Balancer won't move it.
+   * @param targetPinnings whether to pin the block on the target datanodes.
    */
   public void writeBlock(final ExtendedBlock blk,
       final StorageType storageType,
@@ -107,7 +109,9 @@ public interface DataTransferProtocol {
       final long latestGenerationStamp,
       final DataChecksum requestedChecksum,
       final CachingStrategy cachingStrategy,
-      final boolean allowLazyPersist) throws IOException;
+      final boolean allowLazyPersist,
+      final boolean pinning,
+      final boolean[] targetPinnings) throws IOException;
 
   /**
    * Transfer a block to another datanode.
    * The block stage must be
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
index ffbfc21e341..7994027c6ec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
@@ -149,10 +149,12 @@ public abstract class Receiver implements DataTransferProtocol {
         (proto.hasCachingStrategy() ?
             getCachingStrategy(proto.getCachingStrategy()) :
             CachingStrategy.newDefaultStrategy()),
-        (proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false));
-    } finally {
-      if (traceScope != null) traceScope.close();
-    }
+        (proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false),
+        (proto.hasPinning() ? proto.getPinning() : false),
+        (PBHelper.convertBooleanList(proto.getTargetPinningsList())));
+    } finally {
+      if (traceScope != null) traceScope.close();
+    }
   }
 
   /** Receive {@link Op#TRANSFER_BLOCK} */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
index 844c2708144..eb30afbcf06 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
@@ -129,7 +129,9 @@ public class Sender implements DataTransferProtocol {
       final long latestGenerationStamp, DataChecksum requestedChecksum,
       final CachingStrategy cachingStrategy,
-      final boolean allowLazyPersist) throws IOException {
+      final boolean allowLazyPersist,
+      final boolean pinning,
+      final boolean[] targetPinnings) throws IOException {
     ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
         blk, clientName, blockToken);
@@ -148,7 +150,9 @@ public class Sender implements DataTransferProtocol {
       .setLatestGenerationStamp(latestGenerationStamp)
       .setRequestedChecksum(checksumProto)
       .setCachingStrategy(getCachingStrategy(cachingStrategy))
-      .setAllowLazyPersist(allowLazyPersist);
+      .setAllowLazyPersist(allowLazyPersist)
+      .setPinning(pinning)
+      .addAllTargetPinnings(PBHelper.convert(targetPinnings, 1));
 
     if (source != null) {
       proto.setSource(PBHelper.convertDatanodeInfo(source));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 7c705cffc40..547823a62c6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -2954,4 +2954,25 @@ public class PBHelper {
         ezKeyVersionName);
   }
 
+  public static List<Boolean> convert(boolean[] targetPinnings, int idx) {
+    List<Boolean> pinnings = new ArrayList<Boolean>();
+    if (targetPinnings == null) {
+      pinnings.add(Boolean.FALSE);
+    } else {
+      for (; idx < targetPinnings.length; ++idx) {
+        pinnings.add(Boolean.valueOf(targetPinnings[idx]));
+      }
+    }
+    return pinnings;
+  }
+
+  public static boolean[] convertBooleanList(
+      List<Boolean> targetPinningsList) {
+    final boolean[] targetPinnings = new boolean[targetPinningsList.size()];
+    for (int i = 0; i < targetPinningsList.size(); i++) {
+      targetPinnings[i] = targetPinningsList.get(i);
+    }
+    return targetPinnings;
+  }
+
 }
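The pinning flags travel the write pipeline with a deliberate shift: the pinning field carries the receiving datanode's own flag, while PBHelper.convert(targetPinnings, 1) serializes only the flags for the nodes downstream of it, so each hop re-applies the same rule. A standalone sketch of that hop-by-hop handoff follows; the class and method names are mine, not part of the patch, but toWire mirrors PBHelper.convert above.

import java.util.ArrayList;
import java.util.List;

public class PinningShiftSketch {
  /** Mirrors PBHelper.convert: flags for the downstream nodes only.
   *  A null array serializes as a single FALSE. */
  static List<Boolean> toWire(boolean[] targetPinnings, int idx) {
    List<Boolean> pinnings = new ArrayList<Boolean>();
    if (targetPinnings == null) {
      pinnings.add(Boolean.FALSE);
    } else {
      for (; idx < targetPinnings.length; ++idx) {
        pinnings.add(targetPinnings[idx]);
      }
    }
    return pinnings;
  }

  public static void main(String[] args) {
    // Client-computed flags for a 3-node pipeline: DN0 and DN2 are favored.
    boolean[] flags = { true, false, true };
    boolean pinning = flags[0];            // DN0's own flag
    List<Boolean> wire = toWire(flags, 1); // [false, true] for DN1, DN2

    // Each hop keeps wire[0] as its own flag and forwards the remainder.
    int hop = 0;
    while (true) {
      System.out.println("DN" + hop + " pinning=" + pinning);
      if (wire.isEmpty()) break;
      boolean[] rest = new boolean[wire.size()];
      for (int i = 0; i < rest.length; i++) rest[i] = wire.get(i);
      pinning = rest[0];
      wire = toWire(rest, 1);
      hop++;
    }
    // Prints: DN0 pinning=true, DN1 pinning=false, DN2 pinning=true
  }
}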
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index cfd2442732e..3d6c66e434f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -132,6 +132,8 @@ class BlockReceiver implements Closeable {
   private long lastResponseTime = 0;
   private boolean isReplaceBlock = false;
   private DataOutputStream replyOut = null;
+
+  private boolean pinning;
 
   BlockReceiver(final ExtendedBlock block, final StorageType storageType,
       final DataInputStream in,
@@ -141,7 +143,8 @@ class BlockReceiver implements Closeable {
       final String clientname, final DatanodeInfo srcDataNode,
       final DataNode datanode, DataChecksum requestedChecksum,
       CachingStrategy cachingStrategy,
-      final boolean allowLazyPersist) throws IOException {
+      final boolean allowLazyPersist,
+      final boolean pinning) throws IOException {
     try{
       this.block = block;
       this.in = in;
@@ -165,12 +168,14 @@ class BlockReceiver implements Closeable {
       this.isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
           || stage == BlockConstructionStage.TRANSFER_FINALIZED;
 
+      this.pinning = pinning;
       if (LOG.isDebugEnabled()) {
         LOG.debug(getClass().getSimpleName() + ": " + block
             + "\n  isClient  =" + isClient + ", clientname=" + clientname
             + "\n  isDatanode=" + isDatanode + ", srcDataNode=" + srcDataNode
             + "\n  inAddr=" + inAddr + ", myAddr=" + myAddr
             + "\n  cachingStrategy = " + cachingStrategy
+            + "\n  pinning=" + pinning
             );
       }
@@ -1287,6 +1292,11 @@ class BlockReceiver implements Closeable {
               : 0;
           block.setNumBytes(replicaInfo.getNumBytes());
           datanode.data.finalizeBlock(block);
+
+          if (pinning) {
+            datanode.data.setPinning(block);
+          }
+
           datanode.closeBlock(
               block, DataNode.EMPTY_DEL_HINT, replicaInfo.getStorageUuid());
           if (ClientTraceLog.isInfoEnabled() && isClient) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index b872bbb4729..cfd33cd752f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -2117,7 +2117,7 @@ public class DataNode extends ReconfigurableBase
         new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
             clientname, targets, targetStorageTypes, srcNode,
             stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy,
-            false);
+            false, false, null);
 
         // send data & checksum
         blockSender.sendBlock(out, unbufOut, null);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 3a2723f16bd..bb5323af3c3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -581,7 +581,9 @@ class DataXceiver extends Receiver implements Runnable {
       final long latestGenerationStamp,
       DataChecksum requestedChecksum,
       CachingStrategy cachingStrategy,
-      final boolean allowLazyPersist) throws IOException {
+      final boolean allowLazyPersist,
+      final boolean pinning,
+      final boolean[] targetPinnings) throws IOException {
     previousOpClientName = clientname;
     updateCurrentThreadName("Receiving block " + block);
     final boolean isDatanode = clientname.length() == 0;
@@ -594,14 +596,14 @@ class DataXceiver extends Receiver implements Runnable {
       throw new IOException(stage + " does not support multiple targets "
           + Arrays.asList(targets));
     }
-    
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("opWriteBlock: stage=" + stage + ", clientname=" + clientname
           + "\n  block  =" + block + ", newGs=" + latestGenerationStamp
           + ", bytesRcvd=[" + minBytesRcvd + ", " + maxBytesRcvd + "]"
           + "\n  targets=" + Arrays.asList(targets)
           + "; pipelineSize=" + pipelineSize + ", srcDataNode=" + srcDataNode
-          );
+          + ", pinning=" + pinning);
       LOG.debug("isDatanode=" + isDatanode
           + ", isClient=" + isClient
           + ", isTransfer=" + isTransfer);
@@ -643,7 +645,7 @@ class DataXceiver extends Receiver implements Runnable {
             peer.getLocalAddressString(),
             stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
             clientname, srcDataNode, datanode, requestedChecksum,
-            cachingStrategy, allowLazyPersist);
+            cachingStrategy, allowLazyPersist, pinning);
 
         storageUuid = blockReceiver.getStorageUuid();
       } else {
@@ -686,10 +688,19 @@ class DataXceiver extends Receiver implements Runnable {
           mirrorIn = new DataInputStream(unbufMirrorIn);
 
           // Do not propagate allowLazyPersist to downstream DataNodes.
-          new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
+          if (targetPinnings != null && targetPinnings.length > 0) {
+            new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
               blockToken, clientname, targets, targetStorageTypes, srcDataNode,
               stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
-              latestGenerationStamp, requestedChecksum, cachingStrategy, false);
+              latestGenerationStamp, requestedChecksum, cachingStrategy,
+              false, targetPinnings[0], targetPinnings);
+          } else {
+            new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
+              blockToken, clientname, targets, targetStorageTypes, srcDataNode,
+              stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
+              latestGenerationStamp, requestedChecksum, cachingStrategy,
+              false, false, targetPinnings);
+          }
 
           mirrorOut.flush();
@@ -949,7 +960,15 @@ class DataXceiver extends Receiver implements Runnable {
       }
 
     }
-    
+
+    if (datanode.data.getPinning(block)) {
+      String msg = "Not able to copy block " + block.getBlockId() + " " +
+          "to " + peer.getRemoteAddressString() + " because it's pinned ";
+      LOG.info(msg);
+      sendResponse(ERROR, msg);
+      return;
+    }
+
     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
       String msg = "Not able to copy block " + block.getBlockId() + " " +
           "to " + peer.getRemoteAddressString() + " because threads " +
@@ -1109,7 +1128,7 @@ class DataXceiver extends Receiver implements Runnable {
           proxyReply, proxySock.getRemoteSocketAddress().toString(),
           proxySock.getLocalSocketAddress().toString(),
           null, 0, 0, 0, "", null, datanode, remoteChecksum,
-          CachingStrategy.newDropBehind(), false);
+          CachingStrategy.newDropBehind(), false, false);
 
       // receive a block
       blockReceiver.receiveBlock(null, null, replyOut, null,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 25bc7a51ffc..cc7aec5ede1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -516,4 +516,17 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    */
   public ReplicaInfo moveBlockAcrossStorage(final ExtendedBlock block,
       StorageType targetStorageType) throws IOException;
+
+  /**
+   * Set a block to be pinned on this datanode so that it cannot be moved
+   * by Balancer/Mover.
+   *
+   * It is a no-op when dfs.datanode.block-pinning.enabled is set to false.
+   */
+  public void setPinning(ExtendedBlock block) throws IOException;
+
+  /**
+   * Check whether the block is pinned.
+   */
+  public boolean getPinning(ExtendedBlock block) throws IOException;
 }
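Every FsDatasetSpi implementation now has to provide these two methods. As a sketch of the minimal contract, here is a hypothetical in-memory tracker, similar in spirit to the SimulatedFSDataset change near the end of this patch; the class name and key scheme are mine. Unlike the sticky-bit approach FsDatasetImpl takes below, in-memory state would not survive a datanode restart.

import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.hdfs.protocol.ExtendedBlock;

/** Hypothetical pin tracker keyed by block pool id + block id. */
class InMemoryPinTracker {
  private final Set<String> pinned =
      Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

  private static String key(ExtendedBlock block) {
    return block.getBlockPoolId() + ":" + block.getBlockId();
  }

  // Signatures mirror the new FsDatasetSpi methods.
  public void setPinning(ExtendedBlock block) throws IOException {
    pinned.add(key(block));
  }

  public boolean getPinning(ExtendedBlock block) throws IOException {
    return pinned.contains(key(block));
  }
}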
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 2abc94d3a21..cefb206edb0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -51,6 +51,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
@@ -242,6 +246,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   // Used for synchronizing access to usage stats
   private final Object statsLock = new Object();
 
+  final LocalFileSystem localFS;
+
+  private boolean blockPinningEnabled;
+
   /**
    * An FSDataset has a directory where it loads its data files.
    */
@@ -301,6 +309,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     lazyWriter = new Daemon(new LazyWriter(conf));
     lazyWriter.start();
     registerMBean(datanode.getDatanodeUuid());
+    localFS = FileSystem.getLocal(conf);
+    blockPinningEnabled = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED,
+        DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED_DEFAULT);
   }
 
   private void addVolume(Collection<StorageLocation> dataLocations,
@@ -2870,5 +2882,33 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       shouldRun = false;
     }
   }
+
+  @Override
+  public void setPinning(ExtendedBlock block) throws IOException {
+    if (!blockPinningEnabled) {
+      return;
+    }
+
+    File f = getBlockFile(block);
+    Path p = new Path(f.getAbsolutePath());
+
+    FsPermission oldPermission = localFS.getFileStatus(p).getPermission();
+    // the sticky bit is used for pinning purposes
+    FsPermission permission = new FsPermission(oldPermission.getUserAction(),
+        oldPermission.getGroupAction(), oldPermission.getOtherAction(), true);
+    localFS.setPermission(p, permission);
+  }
+
+  @Override
+  public boolean getPinning(ExtendedBlock block) throws IOException {
+    if (!blockPinningEnabled) {
+      return false;
+    }
+    File f = getBlockFile(block);
+
+    FileStatus fss = localFS.getFileStatus(new Path(f.getAbsolutePath()));
+    return fss.getPermission().getStickyBit();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
index 95126882d66..d72bb5e150e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
@@ -123,6 +123,9 @@ message OpWriteBlockProto {
    * to ignore this hint.
    */
   optional bool allowLazyPersist = 13 [default = false];
+  // whether to pin the block, so the Balancer won't move it.
+  optional bool pinning = 14 [default = false];
+  repeated bool targetPinnings = 15;
 }
 
 message OpTransferBlockProto {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 8296414e4d4..bfaa33b046e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2235,4 +2235,10 @@
 </description>
 </property>
 
+<property>
+  <name>dfs.datanode.block-pinning.enabled</name>
+  <value>false</value>
+  <description>Whether to pin blocks on the favored DataNodes.</description>
+</property>
+
 </configuration>
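FsDatasetImpl above persists the pin by setting the sticky bit on the block file through the local file system, so the pin survives a datanode restart without any extra metadata. Here is a self-contained sketch of just that mechanism against an ordinary local path; the path is illustrative, and the sticky bit requires a local file system that supports it.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public class StickyBitPinSketch {
  public static void main(String[] args) throws Exception {
    LocalFileSystem localFS = FileSystem.getLocal(new Configuration());
    Path p = new Path("/tmp/blk_1234");          // stand-in for a block file
    localFS.create(p).close();

    // Pin: keep the user/group/other bits, turn the sticky bit on.
    FsPermission old = localFS.getFileStatus(p).getPermission();
    localFS.setPermission(p, new FsPermission(old.getUserAction(),
        old.getGroupAction(), old.getOtherAction(), true));

    // Check: getPinning() reduces to reading the sticky bit back.
    boolean pinned = localFS.getFileStatus(p).getPermission().getStickyBit();
    System.out.println("pinned=" + pinned);      // true
  }
}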
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 3929f461985..8e659c32ac3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -317,13 +317,21 @@ public class DFSTestUtil {
   public static void createFile(FileSystem fs, Path fileName, int bufferLen,
       long fileLen, long blockSize, short replFactor, long seed)
       throws IOException {
-    createFile(fs, fileName, false, bufferLen, fileLen, blockSize,
-        replFactor, seed, false);
+    createFile(fs, fileName, false, bufferLen, fileLen, blockSize, replFactor,
+        seed, false);
   }
 
   public static void createFile(FileSystem fs, Path fileName,
       boolean isLazyPersist, int bufferLen, long fileLen, long blockSize,
       short replFactor, long seed, boolean flush) throws IOException {
+    createFile(fs, fileName, isLazyPersist, bufferLen, fileLen, blockSize,
+        replFactor, seed, flush, null);
+  }
+
+  public static void createFile(FileSystem fs, Path fileName,
+      boolean isLazyPersist, int bufferLen, long fileLen, long blockSize,
+      short replFactor, long seed, boolean flush,
+      InetSocketAddress[] favoredNodes) throws IOException {
     assert bufferLen > 0;
     if (!fs.mkdirs(fileName.getParent())) {
       throw new IOException("Mkdirs failed to create " +
@@ -336,10 +344,19 @@ public class DFSTestUtil {
       createFlags.add(LAZY_PERSIST);
     }
     try {
-      out = fs.create(fileName, FsPermission.getFileDefault(), createFlags,
-          fs.getConf().getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
-          replFactor, blockSize, null);
-
+      if (favoredNodes == null) {
+        out = fs.create(
+            fileName,
+            FsPermission.getFileDefault(),
+            createFlags,
+            fs.getConf().getInt(
+                CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
+            replFactor, blockSize, null);
+      } else {
+        out = ((DistributedFileSystem) fs).create(fileName,
+            FsPermission.getDefault(), true, bufferLen, replFactor, blockSize,
+            null, favoredNodes);
+      }
       if (fileLen > 0) {
         byte[] toWrite = new byte[bufferLen];
         Random rb = new Random(seed);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
index fd4f1a52cc7..0dcdc9872e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
@@ -534,6 +534,6 @@ public class TestDataTransferProtocol {
         BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         new DatanodeInfo[1], new StorageType[1], null, stage,
         0, block.getNumBytes(), block.getNumBytes(), newGS,
-        checksum, CachingStrategy.newDefaultStrategy(), false);
+        checksum, CachingStrategy.newDefaultStrategy(), false, false, null);
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 16dbdfd5d0a..a5346b2c924 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -17,12 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.balancer;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
 import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
 import static org.junit.Assert.assertEquals;
@@ -33,6 +28,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.URI;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -59,12 +55,8 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.StorageType;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Result;
@@ -309,6 +301,63 @@ public class TestBalancer {
     waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, 0);
   }
 
+  /**
+   * Make sure that the balancer can't move pinned blocks.
+   * If favored nodes are specified when creating a file, its blocks will be
+   * pinned on those datanodes via the sticky bit.
+   * @throws Exception
+   */
+  @Test(timeout=100000)
+  public void testBalancerWithPinnedBlocks() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    conf.setBoolean(DFS_DATANODE_BLOCK_PINNING_ENABLED, true);
+
+    long[] capacities = new long[] { CAPACITY, CAPACITY };
+    String[] racks = { RACK0, RACK1 };
+    int numOfDatanodes = capacities.length;
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
+        .hosts(new String[]{"localhost", "localhost"})
+        .racks(racks).simulatedCapacities(capacities).build();
+
+    try {
+      cluster.waitActive();
+      client = NameNodeProxies.createProxy(conf,
+          cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
+
+      // fill up the cluster to be 80% full
+      long totalCapacity = sum(capacities);
+      long totalUsedSpace = totalCapacity * 8 / 10;
+      InetSocketAddress[] favoredNodes = new InetSocketAddress[numOfDatanodes];
+      for (int i = 0; i < favoredNodes.length; i++) {
+        favoredNodes[i] = cluster.getDataNodes().get(i).getXferAddress();
+      }
+
+      DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024,
+          totalUsedSpace / numOfDatanodes, DEFAULT_BLOCK_SIZE,
+          (short) numOfDatanodes, 0, false, favoredNodes);
+
+      // start up an empty node with the same capacity
+      cluster.startDataNodes(conf, 1, true, null, new String[] { RACK2 },
+          new long[] { CAPACITY });
+
+      totalCapacity += CAPACITY;
+
+      // run balancer and validate results
+      waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
+
+      // start rebalancing
+      Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+      int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
+      assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   /**
    * Wait until balanced: each datanode gives utilization within
    * BALANCE_ALLOWED_VARIANCE of average
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index ad6466ed72e..5c543a72887 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -128,6 +128,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     SimulatedOutputStream oStream = null;
     private long bytesAcked;
     private long bytesRcvd;
+    private boolean pinned = false;
     BInfo(String bpid, Block b, boolean forWriting) throws IOException {
       theBlock = new Block(b);
       if (theBlock.getNumBytes() < 0) {
@@ -1275,5 +1276,15 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     // TODO Auto-generated method stub
     return null;
   }
+
+  @Override
+  public void setPinning(ExtendedBlock b) throws IOException {
+    blockMap.get(b.getBlockPoolId()).get(b.getLocalBlock()).pinned = true;
+  }
+
+  @Override
+  public boolean getPinning(ExtendedBlock b) throws IOException {
+    return blockMap.get(b.getBlockPoolId()).get(b.getLocalBlock()).pinned;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
index f440bb6fe5f..fb219d7e8b2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
@@ -152,7 +152,7 @@ public class TestDiskError {
         BlockTokenSecretManager.DUMMY_TOKEN, "",
         new DatanodeInfo[0], new StorageType[0], null,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
-        checksum, CachingStrategy.newDefaultStrategy(), false);
+        checksum, CachingStrategy.newDefaultStrategy(), false, false, null);
     out.flush();
 
     // close the connection before sending the content of the block
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 87a0f100752..bc91625acd4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -406,4 +406,13 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   public long getNumBlocksFailedToUncache() {
     return 0;
   }
+
+  @Override
+  public void setPinning(ExtendedBlock block) throws IOException {
+  }
+
+  @Override
+  public boolean getPinning(ExtendedBlock block) throws IOException {
+    return false;
+  }
 }