HDFS-10800: [SPS]: Daemon thread in Namenode to find blocks placed in other storage than what the policy specifies. Contributed by Uma Maheswara Rao G

This commit is contained in:
Uma Maheswara Rao G 2016-09-23 13:41:29 -07:00 committed by Uma Maheswara Rao Gangumalla
parent 5692887395
commit 1438da4944
10 changed files with 791 additions and 32 deletions

View File

@ -53,6 +53,7 @@ import java.util.Collection;
import java.util.Comparator;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -73,6 +74,7 @@ import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@ -1716,4 +1718,43 @@ public class DFSUtil {
}
return id;
}
/**
* Remove the overlap between the expected types and the existing types.
*
* @param expected
* - Expected storage types list.
* @param existing
* - Existing storage types list.
* @param ignoreNonMovable
* ignore non-movable storage types by removing them from both
* expected and existing storage type list to prevent non-movable
* storage from being moved.
* @returns if the existing types or the expected types is empty after
* removing the overlap.
*/
public static boolean removeOverlapBetweenStorageTypes(
List<StorageType> expected,
List<StorageType> existing, boolean ignoreNonMovable) {
for (Iterator<StorageType> i = existing.iterator(); i.hasNext();) {
final StorageType t = i.next();
if (expected.remove(t)) {
i.remove();
}
}
if (ignoreNonMovable) {
removeNonMovable(existing);
removeNonMovable(expected);
}
return expected.isEmpty() || existing.isEmpty();
}
private static void removeNonMovable(List<StorageType> types) {
for (Iterator<StorageType> i = types.iterator(); i.hasNext();) {
final StorageType t = i.next();
if (!t.isMovable()) {
i.remove();
}
}
}
}

View File

@ -89,6 +89,8 @@ import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.namenode.BlockStorageMovementNeeded;
import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
@ -425,6 +427,11 @@ public class BlockManager implements BlockStatsMXBean {
private final BlockIdManager blockIdManager;
/** For satisfying block storage policies */
private final StoragePolicySatisfier sps;
private final BlockStorageMovementNeeded storageMovementNeeded =
new BlockStorageMovementNeeded();
/** Minimum live replicas needed for the datanode to be transitioned
* from ENTERING_MAINTENANCE to IN_MAINTENANCE.
*/
@ -464,6 +471,7 @@ public class BlockManager implements BlockStatsMXBean {
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT)
* 1000L);
sps = new StoragePolicySatisfier(namesystem, storageMovementNeeded, this);
blockTokenSecretManager = createBlockTokenSecretManager(conf);
providedStorageMap = new ProvidedStorageMap(namesystem, this, conf);
@ -688,9 +696,11 @@ public class BlockManager implements BlockStatsMXBean {
this.blockReportThread.start();
mxBeanName = MBeans.register("NameNode", "BlockStats", this);
bmSafeMode.activate(blockTotal);
sps.start();
}
public void close() {
sps.stop();
bmSafeMode.close();
try {
redundancyThread.interrupt();
@ -4980,4 +4990,14 @@ public class BlockManager implements BlockStatsMXBean {
public ProvidedStorageMap getProvidedStorageMap() {
return providedStorageMap;
}
/**
* Set file block collection for which storage movement needed for its blocks.
*
* @param id
* - file block collection id.
*/
public void satisfyStoragePolicy(long id) {
storageMovementNeeded.add(id);
}
}

View File

@ -43,6 +43,8 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@ -206,6 +208,10 @@ public class DatanodeDescriptor extends DatanodeInfo {
private final LightWeightHashSet<Block> invalidateBlocks =
new LightWeightHashSet<>();
/** A queue of blocks for moving its storage placements by this datanode. */
private final Queue<List<BlockMovingInfo>> storageMovementBlocks =
new LinkedList<>();
/* Variables for maintaining number of blocks scheduled to be written to
* this storage. This count is approximate and might be slightly bigger
* in case of errors (e.g. datanode does not report if an error occurs
@ -1065,5 +1071,37 @@ public class DatanodeDescriptor extends DatanodeInfo {
}
return false;
}
/**
* Add the block infos which needs to move its storage locations.
*
* @param storageMismatchedBlocks
* - storage mismatched block infos
*/
public void addBlocksToMoveStorage(
List<BlockMovingInfo> storageMismatchedBlocks) {
storageMovementBlocks.offer(storageMismatchedBlocks);
}
/**
* @return block infos which needs to move its storage locations.
*/
public List<BlockMovingInfo> getBlocksToMoveStorages() {
return storageMovementBlocks.poll();
}
// TODO: we will remove this method once DN side handling integrated. We can
// convert the test to check real block movements instead of this ds.
@VisibleForTesting
public List<BlockMovingInfo> getStorageMovementPendingItems() {
List<BlockMovingInfo> flatList = new ArrayList<>();
Iterator<List<BlockMovingInfo>> iterator = storageMovementBlocks
.iterator();
while (iterator.hasNext()) {
List<BlockMovingInfo> next = iterator.next();
flatList.addAll(next);
}
return flatList;
}
}

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.server.protocol.*;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.*;
import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
@ -125,7 +126,7 @@ public class StoragePolicySatisfyWorker {
return moverThreadPool;
}
public void processBlockMovingTasks(long trackID,
public void processBlockMovingTasks(long trackID, String blockPoolID,
List<BlockMovingInfo> blockMovingInfos) {
Future<Void> moveCallable = null;
for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
@ -133,13 +134,11 @@ public class StoragePolicySatisfyWorker {
.getSources().length == blkMovingInfo.getTargets().length;
for (int i = 0; i < blkMovingInfo.getSources().length; i++) {
BlockMovingTask blockMovingTask =
new BlockMovingTask(blkMovingInfo.getBlock(),
blkMovingInfo.getSources()[i],
blkMovingInfo.getTargets()[i],
BlockMovingTask blockMovingTask = new BlockMovingTask(
blkMovingInfo.getBlock(), blockPoolID,
blkMovingInfo.getSources()[i], blkMovingInfo.getTargets()[i],
blkMovingInfo.getTargetStorageTypes()[i]);
moveCallable = moverExecutorCompletionService
.submit(blockMovingTask);
moveCallable = moverExecutorCompletionService.submit(blockMovingTask);
moverTaskFutures.add(moveCallable);
}
}
@ -163,14 +162,16 @@ public class StoragePolicySatisfyWorker {
* given target.
*/
private class BlockMovingTask implements Callable<Void> {
private final ExtendedBlock block;
private final Block block;
private final DatanodeInfo source;
private final DatanodeInfo target;
private final StorageType targetStorageType;
private String blockPoolID;
BlockMovingTask(ExtendedBlock block, DatanodeInfo source,
BlockMovingTask(Block block, String blockPoolID, DatanodeInfo source,
DatanodeInfo target, StorageType targetStorageType) {
this.block = block;
this.blockPoolID = blockPoolID;
this.source = source;
this.target = target;
this.targetStorageType = targetStorageType;
@ -201,12 +202,12 @@ public class StoragePolicySatisfyWorker {
OutputStream unbufOut = sock.getOutputStream();
InputStream unbufIn = sock.getInputStream();
ExtendedBlock extendedBlock = new ExtendedBlock(blockPoolID, block);
Token<BlockTokenIdentifier> accessToken = datanode.getBlockAccessToken(
block, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
extendedBlock, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
DataEncryptionKeyFactory keyFactory = datanode
.getDataEncryptionKeyFactoryForBlock(block);
.getDataEncryptionKeyFactoryForBlock(extendedBlock);
IOStreamPair saslStreams = datanode.getSaslClient().socketSend(sock,
unbufOut, unbufIn, keyFactory, accessToken, target);
unbufOut = saslStreams.out;
@ -215,10 +216,10 @@ public class StoragePolicySatisfyWorker {
new BufferedOutputStream(unbufOut, ioFileBufferSize));
in = new DataInputStream(
new BufferedInputStream(unbufIn, ioFileBufferSize));
sendRequest(out, block, accessToken, source, targetStorageType);
sendRequest(out, extendedBlock, accessToken, source, targetStorageType);
receiveResponse(in);
LOG.debug(
LOG.info(
"Successfully moved block:{} from src:{} to destin:{} for"
+ " satisfying storageType:{}",
block, source, target, targetStorageType);

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.util.LinkedList;
import java.util.Queue;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* A Class to track the block collection IDs for which physical storage movement
* needed as per the Namespace and StorageReports from DN.
*/
@InterfaceAudience.Private
public class BlockStorageMovementNeeded {
private final Queue<Long> storageMovementNeeded = new LinkedList<Long>();
/**
* Add the block collection id to tracking list for which storage movement
* expected if necessary.
*
* @param blockCollectionID
* - block collection id, which is nothing but inode id.
*/
public synchronized void add(Long blockCollectionID) {
storageMovementNeeded.add(blockCollectionID);
}
/**
* Gets the block collection id for which storage movements check necessary
* and make the movement if required.
*
* @return block collection ID
*/
public synchronized Long get() {
return storageMovementNeeded.poll();
}
}

View File

@ -0,0 +1,397 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.balancer.Matcher;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Setting storagePolicy on a file after the file write will only update the new
* storage policy type in Namespace, but physical block storage movement will
* not happen until user runs "Mover Tool" explicitly for such files. The
* StoragePolicySatisfier Daemon thread implemented for addressing the case
* where users may want to physically move the blocks by HDFS itself instead of
* running mover tool explicitly. Just calling client API to
* satisfyStoragePolicy on a file/dir will automatically trigger to move its
* physical storage locations as expected in asynchronous manner. Here Namenode
* will pick the file blocks which are expecting to change its storages, then it
* will build the mapping of source block location and expected storage type and
* location to move. After that this class will also prepare commands to send to
* Datanode for processing the physical block movements.
*/
@InterfaceAudience.Private
public class StoragePolicySatisfier implements Runnable {
public static final Logger LOG =
LoggerFactory.getLogger(StoragePolicySatisfier.class);
private Daemon storagePolicySatisfierThread;
private final Namesystem namesystem;
private final BlockManager blockManager;
private final BlockStorageMovementNeeded storageMovementNeeded;
public StoragePolicySatisfier(final Namesystem namesystem,
final BlockStorageMovementNeeded storageMovementNeeded,
final BlockManager blkManager) {
this.namesystem = namesystem;
this.storageMovementNeeded = storageMovementNeeded;
this.blockManager = blkManager;
}
/**
* Start storage policy satisfier demon thread.
*/
public void start() {
storagePolicySatisfierThread = new Daemon(this);
storagePolicySatisfierThread.setName("StoragePolicySatisfier");
storagePolicySatisfierThread.start();
}
/**
* Stop storage policy satisfier demon thread.
*/
public void stop() {
if (storagePolicySatisfierThread == null) {
return;
}
storagePolicySatisfierThread.interrupt();
try {
storagePolicySatisfierThread.join(3000);
} catch (InterruptedException ie) {
}
}
@Override
public void run() {
while (namesystem.isRunning()) {
try {
Long blockCollectionID = storageMovementNeeded.get();
if (blockCollectionID != null) {
computeAndAssignStorageMismatchedBlocksToDNs(blockCollectionID);
}
// TODO: We can think to make this as configurable later, how frequently
// we want to check block movements.
Thread.sleep(3000);
} catch (Throwable t) {
if (!namesystem.isRunning()) {
LOG.info("Stopping StoragePolicySatisfier.");
if (!(t instanceof InterruptedException)) {
LOG.info("StoragePolicySatisfier received an exception"
+ " while shutting down.", t);
}
break;
}
LOG.error("StoragePolicySatisfier thread received runtime exception. "
+ "Stopping Storage policy satisfier work", t);
// TODO: Just break for now. Once we implement dynamic start/stop
// option, we can add conditions here when to break/terminate.
break;
}
}
}
private void computeAndAssignStorageMismatchedBlocksToDNs(
long blockCollectionID) {
BlockCollection blockCollection =
namesystem.getBlockCollection(blockCollectionID);
if (blockCollection == null) {
return;
}
byte existingStoragePolicyID = blockCollection.getStoragePolicyID();
BlockStoragePolicy existingStoragePolicy =
blockManager.getStoragePolicy(existingStoragePolicyID);
if (!blockCollection.getLastBlock().isComplete()) {
// Postpone, currently file is under construction
// So, should we add back? or leave it to user
return;
}
// First datanode will be chosen as the co-ordinator node for storage
// movements. Later this can be optimized if needed.
DatanodeDescriptor coordinatorNode = null;
BlockInfo[] blocks = blockCollection.getBlocks();
List<BlockMovingInfo> blockMovingInfos = new ArrayList<BlockMovingInfo>();
for (int i = 0; i < blocks.length; i++) {
BlockInfo blockInfo = blocks[i];
List<StorageType> expectedStorageTypes =
existingStoragePolicy.chooseStorageTypes(blockInfo.getReplication());
DatanodeStorageInfo[] storages = blockManager.getStorages(blockInfo);
StorageType[] storageTypes = new StorageType[storages.length];
for (int j = 0; j < storages.length; j++) {
DatanodeStorageInfo datanodeStorageInfo = storages[j];
StorageType storageType = datanodeStorageInfo.getStorageType();
storageTypes[j] = storageType;
}
List<StorageType> existing =
new LinkedList<StorageType>(Arrays.asList(storageTypes));
if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes,
existing, true)) {
List<StorageTypeNodePair> sourceWithStorageMap =
new ArrayList<StorageTypeNodePair>();
List<DatanodeStorageInfo> existingBlockStorages =
new ArrayList<DatanodeStorageInfo>(Arrays.asList(storages));
for (StorageType existingType : existing) {
Iterator<DatanodeStorageInfo> iterator =
existingBlockStorages.iterator();
while (iterator.hasNext()) {
DatanodeStorageInfo datanodeStorageInfo = iterator.next();
StorageType storageType = datanodeStorageInfo.getStorageType();
if (storageType == existingType) {
iterator.remove();
sourceWithStorageMap.add(new StorageTypeNodePair(storageType,
datanodeStorageInfo.getDatanodeDescriptor()));
break;
}
}
}
StorageTypeNodeMap locsForExpectedStorageTypes =
findTargetsForExpectedStorageTypes(expectedStorageTypes);
BlockMovingInfo blockMovingInfo =
findSourceAndTargetToMove(blockInfo, existing, sourceWithStorageMap,
expectedStorageTypes, locsForExpectedStorageTypes);
if (coordinatorNode == null) {
// For now, first datanode will be chosen as the co-ordinator. Later
// this can be optimized if needed.
coordinatorNode =
(DatanodeDescriptor) blockMovingInfo.getSources()[0];
}
blockMovingInfos.add(blockMovingInfo);
}
}
if (blockMovingInfos.size() < 1) {
// TODO: Major: handle this case. I think we need retry cases to
// be implemented. Idea is, if some files are not getting storage movement
// chances, then we can just retry limited number of times and exit.
return;
}
coordinatorNode.addBlocksToMoveStorage(blockMovingInfos);
}
/**
* Find the good target node for each source node for which block storages was
* misplaced.
*
* @param blockInfo
* - Block
* @param existing
* - Existing storage types of block
* @param sourceWithStorageList
* - Source Datanode with storages list
* @param expected
* - Expecting storages to move
* @param locsForExpectedStorageTypes
* - Available DNs for expected storage types
* @return list of block source and target node pair
*/
private BlockMovingInfo findSourceAndTargetToMove(BlockInfo blockInfo,
List<StorageType> existing,
List<StorageTypeNodePair> sourceWithStorageList,
List<StorageType> expected,
StorageTypeNodeMap locsForExpectedStorageTypes) {
List<DatanodeInfo> sourceNodes = new ArrayList<>();
List<StorageType> sourceStorageTypes = new ArrayList<>();
List<DatanodeInfo> targetNodes = new ArrayList<>();
List<StorageType> targetStorageTypes = new ArrayList<>();
List<DatanodeDescriptor> chosenNodes = new ArrayList<>();
for (int i = 0; i < sourceWithStorageList.size(); i++) {
StorageTypeNodePair existingTypeNodePair = sourceWithStorageList.get(i);
StorageTypeNodePair chosenTarget =
chooseTargetTypeInSameNode(existingTypeNodePair.dn, expected,
locsForExpectedStorageTypes, chosenNodes);
if (chosenTarget == null && blockManager.getDatanodeManager()
.getNetworkTopology().isNodeGroupAware()) {
chosenTarget = chooseTarget(blockInfo, existingTypeNodePair.dn,
expected, Matcher.SAME_NODE_GROUP, locsForExpectedStorageTypes,
chosenNodes);
}
// Then, match nodes on the same rack
if (chosenTarget == null) {
chosenTarget =
chooseTarget(blockInfo, existingTypeNodePair.dn, expected,
Matcher.SAME_RACK, locsForExpectedStorageTypes, chosenNodes);
}
if (chosenTarget == null) {
chosenTarget =
chooseTarget(blockInfo, existingTypeNodePair.dn, expected,
Matcher.ANY_OTHER, locsForExpectedStorageTypes, chosenNodes);
}
if (null != chosenTarget) {
sourceNodes.add(existingTypeNodePair.dn);
sourceStorageTypes.add(existingTypeNodePair.storageType);
targetNodes.add(chosenTarget.dn);
targetStorageTypes.add(chosenTarget.storageType);
chosenNodes.add(chosenTarget.dn);
// TODO: We can increment scheduled block count for this node?
} else {
// TODO: Failed to ChooseTargetNodes...So let just retry. Shall we
// proceed without this targets? Then what should be final result?
// How about pack empty target, means target node could not be chosen ,
// so result should be RETRY_REQUIRED from DN always.
// Log..unable to choose target node for source datanodeDescriptor
sourceNodes.add(existingTypeNodePair.dn);
sourceStorageTypes.add(existingTypeNodePair.storageType);
targetNodes.add(null);
targetStorageTypes.add(null);
}
}
BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blockInfo,
sourceNodes.toArray(new DatanodeInfo[sourceNodes.size()]),
targetNodes.toArray(new DatanodeInfo[targetNodes.size()]),
sourceStorageTypes.toArray(new StorageType[sourceStorageTypes.size()]),
targetStorageTypes.toArray(new StorageType[targetStorageTypes.size()]));
return blkMovingInfo;
}
/**
* Choose the target storage within same Datanode if possible.
*
* @param locsForExpectedStorageTypes
* @param chosenNodes
*/
private StorageTypeNodePair chooseTargetTypeInSameNode(
DatanodeDescriptor source, List<StorageType> targetTypes,
StorageTypeNodeMap locsForExpectedStorageTypes,
List<DatanodeDescriptor> chosenNodes) {
for (StorageType t : targetTypes) {
DatanodeStorageInfo chooseStorage4Block =
source.chooseStorage4Block(t, 0);
if (chooseStorage4Block != null) {
return new StorageTypeNodePair(t, source);
}
}
return null;
}
private StorageTypeNodePair chooseTarget(Block block,
DatanodeDescriptor source, List<StorageType> targetTypes, Matcher matcher,
StorageTypeNodeMap locsForExpectedStorageTypes,
List<DatanodeDescriptor> chosenNodes) {
for (StorageType t : targetTypes) {
List<DatanodeDescriptor> nodesWithStorages =
locsForExpectedStorageTypes.getNodesWithStorages(t);
Collections.shuffle(nodesWithStorages);
for (DatanodeDescriptor target : nodesWithStorages) {
if (!chosenNodes.contains(target) && matcher.match(
blockManager.getDatanodeManager().getNetworkTopology(), source,
target)) {
if (null != target.chooseStorage4Block(t, block.getNumBytes())) {
return new StorageTypeNodePair(t, target);
}
}
}
}
return null;
}
private static class StorageTypeNodePair {
public StorageType storageType = null;
public DatanodeDescriptor dn = null;
public StorageTypeNodePair(StorageType storageType, DatanodeDescriptor dn) {
this.storageType = storageType;
this.dn = dn;
}
}
private StorageTypeNodeMap findTargetsForExpectedStorageTypes(
List<StorageType> expected) {
StorageTypeNodeMap targetMap = new StorageTypeNodeMap();
List<DatanodeDescriptor> reports = blockManager.getDatanodeManager()
.getDatanodeListForReport(DatanodeReportType.LIVE);
for (DatanodeDescriptor dn : reports) {
StorageReport[] storageReports = dn.getStorageReports();
for (StorageReport storageReport : storageReports) {
StorageType t = storageReport.getStorage().getStorageType();
if (expected.contains(t)) {
final long maxRemaining = getMaxRemaining(dn.getStorageReports(), t);
if (maxRemaining > 0L) {
targetMap.add(t, dn);
}
}
}
}
return targetMap;
}
private static long getMaxRemaining(StorageReport[] storageReports,
StorageType t) {
long max = 0L;
for (StorageReport r : storageReports) {
if (r.getStorage().getStorageType() == t) {
if (r.getRemaining() > max) {
max = r.getRemaining();
}
}
}
return max;
}
private static class StorageTypeNodeMap {
private final EnumMap<StorageType, List<DatanodeDescriptor>> typeNodeMap =
new EnumMap<StorageType, List<DatanodeDescriptor>>(StorageType.class);
private void add(StorageType t, DatanodeDescriptor dn) {
List<DatanodeDescriptor> nodesWithStorages = getNodesWithStorages(t);
LinkedList<DatanodeDescriptor> value = null;
if (nodesWithStorages == null) {
value = new LinkedList<DatanodeDescriptor>();
value.add(dn);
typeNodeMap.put(t, value);
} else {
nodesWithStorages.add(dn);
}
}
/**
* @param type
* - Storage type
* @return datanodes which has the given storage type
*/
private List<DatanodeDescriptor> getNodesWithStorages(StorageType type) {
return typeNodeMap.get(type);
}
}
}

View File

@ -20,8 +20,8 @@ package org.apache.hadoop.hdfs.server.protocol;
import java.util.Arrays;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
/**
* A BlockStorageMovementCommand is an instruction to a DataNode to move the
@ -35,7 +35,6 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
* NameNode about the movement status.
*/
public class BlockStorageMovementCommand extends DatanodeCommand {
// TODO: constructor needs to be refined based on the block movement data
// structure.
BlockStorageMovementCommand(int action) {
@ -46,13 +45,13 @@ public class BlockStorageMovementCommand extends DatanodeCommand {
* Stores block to storage info that can be used for block movement.
*/
public static class BlockMovingInfo {
private ExtendedBlock blk;
private Block blk;
private DatanodeInfo[] sourceNodes;
private StorageType[] sourceStorageTypes;
private DatanodeInfo[] targetNodes;
private StorageType[] targetStorageTypes;
public BlockMovingInfo(ExtendedBlock block,
public BlockMovingInfo(Block block,
DatanodeInfo[] sourceDnInfos, DatanodeInfo[] targetDnInfos,
StorageType[] srcStorageTypes, StorageType[] targetStorageTypes) {
this.blk = block;
@ -62,11 +61,11 @@ public class BlockStorageMovementCommand extends DatanodeCommand {
this.targetStorageTypes = targetStorageTypes;
}
public void addBlock(ExtendedBlock block) {
public void addBlock(Block block) {
this.blk = block;
}
public ExtendedBlock getBlock() {
public Block getBlock() {
return this.blk;
}

View File

@ -30,8 +30,8 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
@ -71,14 +71,14 @@ public class TestStoragePolicySatisfyWorker {
public void testMoveSingleBlockToAnotherDatanode() throws Exception {
final Configuration conf = new HdfsConfiguration();
initConf(conf);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(4)
.storageTypes(
new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.DISK, StorageType.ARCHIVE}})
.build();
final MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(4)
.storageTypes(
new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.ARCHIVE, StorageType.ARCHIVE},
{StorageType.ARCHIVE, StorageType.ARCHIVE}})
.build();
try {
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
@ -108,12 +108,12 @@ public class TestStoragePolicySatisfyWorker {
src);
List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
BlockMovingInfo blockMovingInfo = prepareBlockMovingInfo(
lb.getBlock(), lb.getLocations()[0], targetDnInfo,
lb.getBlock().getLocalBlock(), lb.getLocations()[0], targetDnInfo,
lb.getStorageTypes()[0], StorageType.ARCHIVE);
blockMovingInfos.add(blockMovingInfo);
INode inode = cluster.getNamesystem().getFSDirectory().getINode(file);
worker.processBlockMovingTasks(inode.getId(),
blockMovingInfos);
cluster.getNamesystem().getBlockPoolId(), blockMovingInfos);
cluster.triggerHeartbeats();
// Wait till NameNode notified about the block location details
@ -150,7 +150,7 @@ public class TestStoragePolicySatisfyWorker {
}, 100, timeout);
}
BlockMovingInfo prepareBlockMovingInfo(ExtendedBlock block,
BlockMovingInfo prepareBlockMovingInfo(Block block,
DatanodeInfo src, DatanodeInfo destin, StorageType storageType,
StorageType targetStorageType) {
return new BlockMovingInfo(block, new DatanodeInfo[] {src},

View File

@ -0,0 +1,209 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Supplier;
/**
* Tests that StoragePolicySatisfier daemon is able to check the blocks to be
* moved and finding its suggested target locations to move.
*/
public class TestStoragePolicySatisfier {
private static final Logger LOG =
LoggerFactory.getLogger(TestStoragePolicySatisfier.class);
private final Configuration config = new HdfsConfiguration();
private StorageType[][] allDiskTypes =
new StorageType[][]{{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK}};
private MiniDFSCluster hdfsCluster = null;
final private int numOfDatanodes = 3;
final private int storagesPerDatanode = 2;
final private long capacity = 2 * 256 * 1024 * 1024;
final private String file = "/testMoveWhenStoragePolicyNotSatisfying";
private DistributedFileSystem distributedFS = null;
@Before
public void setUp() throws IOException {
config.setLong("dfs.block.size", 1024);
hdfsCluster = startCluster(config, allDiskTypes, numOfDatanodes,
storagesPerDatanode, capacity);
distributedFS = hdfsCluster.getFileSystem();
writeContent(distributedFS, file);
}
@Test(timeout = 300000)
public void testWhenStoragePolicySetToCOLD()
throws Exception {
try {
// Change policy to ALL_SSD
distributedFS.setStoragePolicy(new Path(file), "COLD");
Set<DatanodeDescriptor> previousNodes =
hdfsCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().getDatanodes();
FSNamesystem namesystem = hdfsCluster.getNamesystem();
INode inode = namesystem.getFSDirectory().getINode(file);
StorageType[][] newtypes =
new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE},
{StorageType.ARCHIVE, StorageType.ARCHIVE},
{StorageType.ARCHIVE, StorageType.ARCHIVE}};
startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
storagesPerDatanode, capacity, hdfsCluster);
namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
hdfsCluster.triggerHeartbeats();
// Wait till namenode notified about the block location details
waitExpectedStorageType(StorageType.ARCHIVE, distributedFS, previousNodes,
6, 30000);
} finally {
hdfsCluster.shutdown();
}
}
@Test(timeout = 300000)
public void testWhenStoragePolicySetToALLSSD()
throws Exception {
try {
// Change policy to ALL_SSD
distributedFS.setStoragePolicy(new Path(file), "ALL_SSD");
Set<DatanodeDescriptor> previousNodes =
hdfsCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().getDatanodes();
FSNamesystem namesystem = hdfsCluster.getNamesystem();
INode inode = namesystem.getFSDirectory().getINode(file);
StorageType[][] newtypes =
new StorageType[][]{{StorageType.SSD, StorageType.DISK},
{StorageType.SSD, StorageType.DISK},
{StorageType.SSD, StorageType.DISK}};
// Making sure SDD based nodes added to cluster. Adding SSD based
// datanodes.
startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
storagesPerDatanode, capacity, hdfsCluster);
namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
hdfsCluster.triggerHeartbeats();
// Wait till StorgePolicySatisfier Identified that block to move to SSD
// areas
waitExpectedStorageType(StorageType.SSD, distributedFS, previousNodes, 6,
30000);
} finally {
hdfsCluster.shutdown();
}
}
private void writeContent(final DistributedFileSystem dfs,
final String fileName) throws IOException {
// write to DISK
final FSDataOutputStream out = dfs.create(new Path(fileName));
for (int i = 0; i < 1000; i++) {
out.writeChars("t");
}
out.close();
}
private void startAdditionalDNs(final Configuration conf,
int newNodesRequired, int existingNodesNum, StorageType[][] newTypes,
int storagesPerDatanode, long capacity, final MiniDFSCluster cluster)
throws IOException {
long[][] capacities;
existingNodesNum += newNodesRequired;
capacities = new long[newNodesRequired][storagesPerDatanode];
for (int i = 0; i < newNodesRequired; i++) {
for (int j = 0; j < storagesPerDatanode; j++) {
capacities[i][j] = capacity;
}
}
cluster.startDataNodes(conf, newNodesRequired, newTypes, true, null, null,
null, capacities, null, false, false, false, null);
cluster.triggerHeartbeats();
}
private MiniDFSCluster startCluster(final Configuration conf,
StorageType[][] storageTypes, int numberOfDatanodes, int storagesPerDn,
long nodeCapacity) throws IOException {
long[][] capacities = new long[numberOfDatanodes][storagesPerDn];
for (int i = 0; i < numberOfDatanodes; i++) {
for (int j = 0; j < storagesPerDn; j++) {
capacities[i][j] = nodeCapacity;
}
}
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numberOfDatanodes).storagesPerDatanode(storagesPerDn)
.storageTypes(storageTypes).storageCapacities(capacities).build();
cluster.waitActive();
return cluster;
}
// TODO: this assertion can be changed to end to end based assertion later
// when DN side processing work integrated to this work.
private void waitExpectedStorageType(final StorageType expectedStorageType,
final DistributedFileSystem dfs,
final Set<DatanodeDescriptor> previousNodes, int expectedArchiveCount,
int timeout) throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
Iterator<DatanodeDescriptor> iterator = previousNodes.iterator();
int archiveCount = 0;
while (iterator.hasNext()) {
DatanodeDescriptor dn = iterator.next();
List<BlockMovingInfo> pendingItemsToMove =
dn.getStorageMovementPendingItems();
for (BlockMovingInfo blkInfoToMoveStorage : pendingItemsToMove) {
StorageType[] targetStorageTypes =
blkInfoToMoveStorage.getTargetStorageTypes();
for (StorageType storageType : targetStorageTypes) {
if (storageType == expectedStorageType) {
archiveCount++;
}
}
}
}
LOG.info(
expectedStorageType + " replica count, expected={} and actual={}",
expectedArchiveCount, archiveCount);
return expectedArchiveCount == archiveCount;
}
}, 100, timeout);
}
}