HDFS-12982 : [SPS]: Reduce the locking and cleanup the Namesystem access. Contributed by Rakesh R.

This commit is contained in:
Surendra Singh Lilhore 2018-01-08 15:13:11 +05:30 committed by Uma Maheswara Rao Gangumalla
parent 78420719eb
commit 05d4daf6ba
14 changed files with 743 additions and 291 deletions

View File

@ -89,11 +89,12 @@
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
import org.apache.hadoop.hdfs.server.namenode.IntraNNSPSContext;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.sps.Context;
import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeContext;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
@ -433,6 +434,7 @@ public long getTotalECBlockGroups() {
private final StoragePolicySatisfier sps;
private final boolean storagePolicyEnabled;
private boolean spsEnabled;
private Context spsctxt = null;
/** Minimum live replicas needed for the datanode to be transitioned
* from ENTERING_MAINTENANCE to IN_MAINTENANCE.
*/
@ -479,8 +481,8 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled,
conf.getBoolean(
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_DEFAULT);
StoragePolicySatisfier.Context spsctxt = new IntraNNSPSContext(namesystem);
sps = new StoragePolicySatisfier(namesystem, this, conf, spsctxt);
spsctxt = new IntraSPSNameNodeContext(namesystem, this, conf);
sps = new StoragePolicySatisfier(spsctxt);
blockTokenSecretManager = createBlockTokenSecretManager(conf);
providedStorageMap = new ProvidedStorageMap(namesystem, this, conf);
@ -5039,8 +5041,8 @@ public void startSPS() {
LOG.info("Storage policy satisfier is already running.");
return;
}
sps.start(false);
// TODO: FSDirectory will get removed via HDFS-12911 modularization work
sps.start(false, namesystem.getFSDirectory());
}
/**
@ -5076,8 +5078,8 @@ public void enableSPS() {
LOG.info("Storage policy satisfier is already running.");
return;
}
sps.start(true);
// TODO: FSDirectory will get removed via HDFS-12911 modularization work
sps.start(true, namesystem.getFSDirectory());
}
/**

View File

@ -802,7 +802,7 @@ public int getBlocksScheduled() {
}
/** Increment the number of blocks scheduled. */
void incrementBlocksScheduled(StorageType t) {
public void incrementBlocksScheduled(StorageType t) {
currApproxBlocksScheduled.add(t, 1);
}

View File

@ -35,6 +35,7 @@
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.net.DFSNetworkTopology;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
@ -2045,5 +2046,26 @@ public void addDropSPSWorkCommandsToAllDNs() {
}
}
}
/**
* Generates datanode reports for the given report type.
*
* @param type
* type of the datanode report
* @return array of DatanodeStorageReports
*/
public DatanodeStorageReport[] getDatanodeStorageReport(
DatanodeReportType type) {
final List<DatanodeDescriptor> datanodes = getDatanodeListForReport(type);
DatanodeStorageReport[] reports = new DatanodeStorageReport[datanodes
.size()];
for (int i = 0; i < reports.length; i++) {
final DatanodeDescriptor d = datanodes.get(i);
reports[i] = new DatanodeStorageReport(
new DatanodeInfoBuilder().setFrom(d).build(), d.getStorageReports());
}
return reports;
}
}

View File

@ -90,6 +90,7 @@ static DirectoryListing getListingInt(FSDirectory fsd, FSPermissionChecker pc,
* @param srcArg The string representation of the path to the file
* @param resolveLink whether to throw UnresolvedLinkException
* if src refers to a symlink
* @param needLocation if blockLocations need to be returned
*
* @param needLocation Include {@link LocatedBlocks} in result.
* @param needBlockToken Include block tokens in {@link LocatedBlocks}.

View File

@ -3127,6 +3127,29 @@ void removeLeasesAndINodes(List<Long> removedUCFiles,
}
}
/**
* Get the file info for a specific file.
*
* @param src The string representation of the path to the file
* @param resolveLink whether to throw UnresolvedLinkException
* if src refers to a symlink
* @param needLocation if blockLocations need to be returned
*
* @throws AccessControlException
* if access is denied
* @throws UnresolvedLinkException
* if a symlink is encountered.
*
* @return object containing information regarding the file or null if file
* not found
* @throws StandbyException
*/
@Override
public HdfsFileStatus getFileInfo(final String src, boolean resolveLink,
boolean needLocation) throws IOException {
return getFileInfo(src, resolveLink, needLocation, false);
}
/**
* Get the file info for a specific file.
*
@ -3167,6 +3190,17 @@ HdfsFileStatus getFileInfo(final String src, boolean resolveLink,
return stat;
}
@Override
public String getFilePath(Long inodeId) {
readLock();
try {
INode inode = getFSDirectory().getInode(inodeId);
return inode == null ? null : inode.getFullPathName();
} finally {
readUnlock();
}
}
/**
* Returns true if the file is closed
*/
@ -4461,15 +4495,7 @@ DatanodeStorageReport[] getDatanodeStorageReport(final DatanodeReportType type
try {
checkOperation(OperationCategory.UNCHECKED);
final DatanodeManager dm = getBlockManager().getDatanodeManager();
final List<DatanodeDescriptor> datanodes = dm.getDatanodeListForReport(type);
reports = new DatanodeStorageReport[datanodes.size()];
for (int i = 0; i < reports.length; i++) {
final DatanodeDescriptor d = datanodes.get(i);
reports[i] = new DatanodeStorageReport(
new DatanodeInfoBuilder().setFrom(d).build(),
d.getStorageReports());
}
reports = dm.getDatanodeStorageReport(type);
} finally {
readUnlock("getDatanodeStorageReport");
}

View File

@ -1,41 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
/**
* This class is the Namenode implementation for analyzing the file blocks which
* are expecting to change its storages and assigning the block storage
* movements to satisfy the storage policy.
*/
// TODO: Now, added one API which is required for sps package. Will refine
// this interface via HDFS-12911.
public class IntraNNSPSContext implements StoragePolicySatisfier.Context {
private final Namesystem namesystem;
public IntraNNSPSContext(Namesystem namesystem) {
this.namesystem = namesystem;
}
@Override
public int getNumLiveDataNodes() {
return namesystem.getFSDirectory().getBlockManager().getDatanodeManager()
.getNumLiveDataNodes();
}
}

View File

@ -20,6 +20,7 @@
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.util.RwLock;
@ -62,4 +63,27 @@ public interface Namesystem extends RwLock, SafeMode {
* @throws IOException
*/
void removeXattr(long id, String xattrName) throws IOException;
/**
* Gets the fileInfo of the given file path.
*
* @param filePath string representation of the path to the file
* @param resolveLink whether to throw UnresolvedLinkException
* if src refers to a symlink
* @param needLocation if blockLocations need to be returned
*
* @return hdfs file status details
* @throws IOException
*/
HdfsFileStatus getFileInfo(String filePath, boolean resolveLink,
boolean needLocation) throws IOException;
/**
* Gets the file path corresponds to the given file id.
*
* @param inodeId
* file id
* @return string file path
*/
String getFilePath(Long inodeId);
}

View File

@ -25,6 +25,11 @@
import java.util.Iterator;
import java.util.List;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.AttemptedItemInfo;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.ItemInfo;
@ -66,15 +71,21 @@ public class BlockStorageMovementAttemptedItems {
//
private long minCheckTimeout = 1 * 60 * 1000; // minimum value
private BlockStorageMovementNeeded blockStorageMovementNeeded;
private final Context ctxt;
public BlockStorageMovementAttemptedItems(long recheckTimeout,
long selfRetryTimeout,
public BlockStorageMovementAttemptedItems(Context context,
BlockStorageMovementNeeded unsatisfiedStorageMovementFiles) {
this.ctxt = context;
long recheckTimeout = ctxt.getConf().getLong(
DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT);
if (recheckTimeout > 0) {
this.minCheckTimeout = Math.min(minCheckTimeout, recheckTimeout);
}
this.selfRetryTimeout = selfRetryTimeout;
this.selfRetryTimeout = ctxt.getConf().getLong(
DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT);
this.blockStorageMovementNeeded = unsatisfiedStorageMovementFiles;
storageMovementAttemptedItems = new ArrayList<>();
movementFinishedBlocks = new ArrayList<>();

View File

@ -17,7 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.namenode.sps;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY;
import java.io.IOException;
import java.util.ArrayList;
@ -35,10 +36,9 @@
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.ItemInfo;
import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser.TraverseInfo;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.ItemInfo;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
@ -73,13 +73,11 @@ public class BlockStorageMovementNeeded {
private final Map<Long, StoragePolicySatisfyPathStatusInfo> spsStatus =
new ConcurrentHashMap<>();
private final Namesystem namesystem;
private final Context ctxt;
// List of pending dir to satisfy the policy
private final Queue<Long> spsDirsToBeTraveresed = new LinkedList<Long>();
private final StoragePolicySatisfier sps;
private Daemon inodeIdCollector;
private final int maxQueuedItem;
@ -88,11 +86,11 @@ public class BlockStorageMovementNeeded {
// NOT_AVAILABLE.
private static long statusClearanceElapsedTimeMs = 300000;
public BlockStorageMovementNeeded(Namesystem namesystem,
StoragePolicySatisfier sps, int queueLimit) {
this.namesystem = namesystem;
this.sps = sps;
this.maxQueuedItem = queueLimit;
public BlockStorageMovementNeeded(Context context) {
this.ctxt = context;
this.maxQueuedItem = ctxt.getConf().getInt(
DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT);
}
/**
@ -188,8 +186,7 @@ public synchronized void removeItemTrackInfo(ItemInfo trackInfo,
// If track is part of some start inode then reduce the pending
// directory work count.
long startId = trackInfo.getStartId();
INode inode = namesystem.getFSDirectory().getInode(startId);
if (inode == null) {
if (!ctxt.isFileExist(startId)) {
// directory deleted just remove it.
this.pendingWorkForDirectory.remove(startId);
updateStatus(startId, isSuccess);
@ -198,7 +195,7 @@ public synchronized void removeItemTrackInfo(ItemInfo trackInfo,
if (pendingWork != null) {
pendingWork.decrementPendingWorkCount();
if (pendingWork.isDirWorkDone()) {
namesystem.removeXattr(startId, XATTR_SATISFY_STORAGE_POLICY);
ctxt.removeSPSHint(startId);
pendingWorkForDirectory.remove(startId);
pendingWork.setFailure(!isSuccess);
updateStatus(startId, pendingWork.isPolicySatisfied());
@ -209,8 +206,7 @@ public synchronized void removeItemTrackInfo(ItemInfo trackInfo,
} else {
// Remove xAttr if trackID doesn't exist in
// storageMovementAttemptedItems or file policy satisfied.
namesystem.removeXattr(trackInfo.getTrackId(),
XATTR_SATISFY_STORAGE_POLICY);
ctxt.removeSPSHint(trackInfo.getTrackId());
updateStatus(trackInfo.getStartId(), isSuccess);
}
}
@ -256,7 +252,7 @@ public synchronized void clearQueuesWithNotification() {
while ((trackId = spsDirsToBeTraveresed.poll()) != null) {
try {
// Remove xAttr for file
namesystem.removeXattr(trackId, XATTR_SATISFY_STORAGE_POLICY);
ctxt.removeSPSHint(trackId);
} catch (IOException ie) {
LOG.warn("Failed to remove SPS xattr for track id " + trackId, ie);
}
@ -269,8 +265,7 @@ public synchronized void clearQueuesWithNotification() {
try {
// Remove xAttr for file
if (!itemInfo.isDir()) {
namesystem.removeXattr(itemInfo.getTrackId(),
XATTR_SATISFY_STORAGE_POLICY);
ctxt.removeSPSHint(itemInfo.getTrackId());
}
} catch (IOException ie) {
LOG.warn(
@ -300,10 +295,9 @@ private class StorageMovementPendingInodeIdCollector extends FSTreeTraverser
public void run() {
LOG.info("Starting FileInodeIdCollector!.");
long lastStatusCleanTime = 0;
while (namesystem.isRunning() && sps.isRunning()) {
while (ctxt.isRunning()) {
try {
if (!namesystem.isInSafeMode()) {
FSDirectory fsd = namesystem.getFSDirectory();
if (!ctxt.isInSafeMode()) {
Long startINodeId = spsDirsToBeTraveresed.poll();
if (startINodeId == null) {
// Waiting for SPS path
@ -311,7 +305,7 @@ public void run() {
spsDirsToBeTraveresed.wait(5000);
}
} else {
INode startInode = fsd.getInode(startINodeId);
INode startInode = getFSDirectory().getInode(startINodeId);
if (startInode != null) {
try {
remainingCapacity = remainingCapacity();
@ -333,8 +327,7 @@ public void run() {
DirPendingWorkInfo dirPendingWorkInfo =
pendingWorkForDirectory.get(startInode.getId());
if (dirPendingWorkInfo.isDirWorkDone()) {
namesystem.removeXattr(startInode.getId(),
XATTR_SATISFY_STORAGE_POLICY);
ctxt.removeSPSHint(startInode.getId());
pendingWorkForDirectory.remove(startInode.getId());
updateStatus(startInode.getId(), true);
}
@ -483,9 +476,10 @@ public void setFailure(boolean failure) {
}
}
public void init() {
// TODO: FSDirectory will get removed via HDFS-12911 modularization work
public void init(FSDirectory fsd) {
inodeIdCollector = new Daemon(new StorageMovementPendingInodeIdCollector(
namesystem.getFSDirectory()));
fsd));
inodeIdCollector.setName("FileInodeIdCollector");
inodeIdCollector.start();
}

View File

@ -0,0 +1,181 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.sps;
import java.io.IOException;
import java.util.function.Supplier;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.AccessControlException;
/**
* An interface for the communication between NameNode and SPS module.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface Context {
/**
* Returns configuration object.
*/
Configuration getConf();
/**
* Returns true if the SPS is running, false otherwise.
*/
boolean isRunning();
/**
* Update the SPS running status.
*
* @param isSpsRunning
* true represents running, false otherwise
*/
void setSPSRunning(Supplier<Boolean> isSpsRunning);
/**
* Returns true if the Namenode in safe mode, false otherwise.
*/
boolean isInSafeMode();
/**
* Returns true if Mover tool is already running, false otherwise.
*/
boolean isMoverRunning();
/**
* Gets the Inode ID number for the given path.
*
* @param path
* - file/dir path
* @return Inode id number
*/
long getFileID(String path) throws UnresolvedLinkException,
AccessControlException, ParentNotDirectoryException;
/**
* Gets the network topology.
*
* @return network topology
*/
NetworkTopology getNetworkTopology();
/**
* Returns true if the give Inode exists in the Namespace.
*
* @param inodeId
* - Inode ID
* @return true if Inode exists, false otherwise.
*/
boolean isFileExist(long inodeId);
/**
* Gets the storage policy details for the given policy ID.
*
* @param policyId
* - Storage policy ID
* @return the detailed policy object
*/
BlockStoragePolicy getStoragePolicy(byte policyId);
/**
* Drop the SPS work in case if any previous work queued up.
*/
void addDropPreviousSPSWorkAtDNs();
/**
* Remove the hint which was added to track SPS call.
*
* @param inodeId
* - Inode ID
* @throws IOException
*/
void removeSPSHint(long inodeId) throws IOException;
/**
* Gets the number of live datanodes in the cluster.
*
* @return number of live datanodes
*/
int getNumLiveDataNodes();
/**
* Get the file info for a specific file.
*
* @param inodeID
* inode identifier
* @return file status metadata information
*/
HdfsFileStatus getFileInfo(long inodeID) throws IOException;
/**
* Returns all the live datanodes and its storage details.
*
* @throws IOException
*/
DatanodeStorageReport[] getLiveDatanodeStorageReport()
throws IOException;
/**
* Returns true if the given inode file has low redundancy blocks.
*
* @param inodeID
* inode identifier
* @return true if block collection has low redundancy blocks
*/
boolean hasLowRedundancyBlocks(long inodeID);
/**
* Assign the given block movement task to the target node present in
* {@link BlockMovingInfo}.
*
* @param blkMovingInfo
* block to storage info
* @throws IOException
*/
void assignBlockMoveTaskToTargetNode(BlockMovingInfo blkMovingInfo)
throws IOException;
/**
* Checks whether the given datanode has sufficient space to occupy the given
* blockSize data.
*
* @param dn
* datanode info
* @param type
* storage type
* @param blockSize
* blockSize to be scheduled
* @return true if the given datanode has sufficient space to occupy blockSize
* data, false otherwise.
*/
boolean verifyTargetDatanodeHasSpaceForScheduling(DatanodeInfo dn,
StorageType type, long blockSize);
}

View File

@ -0,0 +1,220 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.sps;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
import java.io.IOException;
import java.util.function.Supplier;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.AccessControlException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class is the Namenode implementation for analyzing the file blocks which
* are expecting to change its storages and assigning the block storage
* movements to satisfy the storage policy.
*/
public class IntraSPSNameNodeContext implements Context {
private static final Logger LOG = LoggerFactory
.getLogger(IntraSPSNameNodeContext.class);
private final Namesystem namesystem;
private final BlockManager blockManager;
private final Configuration conf;
private Supplier<Boolean> isSpsRunning;
public IntraSPSNameNodeContext(Namesystem namesystem,
BlockManager blockManager, Configuration conf) {
this.namesystem = namesystem;
this.blockManager = blockManager;
this.conf = conf;
isSpsRunning = () -> false;
}
@Override
public int getNumLiveDataNodes() {
return blockManager.getDatanodeManager().getNumLiveDataNodes();
}
@Override
public HdfsFileStatus getFileInfo(long inodeID) throws IOException {
String filePath = namesystem.getFilePath(inodeID);
if (StringUtils.isBlank(filePath)) {
LOG.debug("File with inodeID:{} doesn't exists!", inodeID);
return null;
}
HdfsFileStatus fileInfo = null;
try {
fileInfo = namesystem.getFileInfo(filePath, true, true);
} catch (IOException e) {
LOG.debug("File path:{} doesn't exists!", filePath);
}
return fileInfo;
}
@Override
public DatanodeStorageReport[] getLiveDatanodeStorageReport()
throws IOException {
namesystem.readLock();
try {
return blockManager.getDatanodeManager()
.getDatanodeStorageReport(DatanodeReportType.LIVE);
} finally {
namesystem.readUnlock();
}
}
@Override
public boolean hasLowRedundancyBlocks(long inodeID) {
namesystem.readLock();
try {
BlockCollection bc = namesystem.getBlockCollection(inodeID);
return blockManager.hasLowRedundancyBlocks(bc);
} finally {
namesystem.readUnlock();
}
}
@Override
public Configuration getConf() {
return conf;
}
@Override
public boolean isFileExist(long inodeId) {
return namesystem.getFSDirectory().getInode(inodeId) != null;
}
@Override
public void removeSPSHint(long inodeId) throws IOException {
this.namesystem.removeXattr(inodeId, XATTR_SATISFY_STORAGE_POLICY);
}
@Override
public boolean isRunning() {
// TODO : 'isSpsRunning' flag has been added to avoid the NN lock inside
// SPS. Context interface will be further refined as part of HDFS-12911
// modularization task. One idea is to introduce a cleaner interface similar
// to Namesystem for better abstraction.
return namesystem.isRunning() && isSpsRunning.get();
}
@Override
public void setSPSRunning(Supplier<Boolean> spsRunningFlag) {
this.isSpsRunning = spsRunningFlag;
}
@Override
public boolean isInSafeMode() {
return namesystem.isInSafeMode();
}
@Override
public boolean isMoverRunning() {
String moverId = HdfsServerConstants.MOVER_ID_PATH.toString();
return namesystem.isFileOpenedForWrite(moverId);
}
@Override
public void addDropPreviousSPSWorkAtDNs() {
namesystem.readLock();
try {
blockManager.getDatanodeManager().addDropSPSWorkCommandsToAllDNs();
} finally {
namesystem.readUnlock();
}
}
@Override
public BlockStoragePolicy getStoragePolicy(byte policyID) {
return blockManager.getStoragePolicy(policyID);
}
@Override
public NetworkTopology getNetworkTopology() {
return blockManager.getDatanodeManager().getNetworkTopology();
}
@Override
public long getFileID(String path) throws UnresolvedLinkException,
AccessControlException, ParentNotDirectoryException {
namesystem.readLock();
try {
INode inode = namesystem.getFSDirectory().getINode(path);
return inode == null ? -1 : inode.getId();
} finally {
namesystem.readUnlock();
}
}
@Override
public void assignBlockMoveTaskToTargetNode(BlockMovingInfo blkMovingInfo)
throws IOException {
namesystem.readLock();
try {
DatanodeDescriptor dn = blockManager.getDatanodeManager()
.getDatanode(blkMovingInfo.getTarget().getDatanodeUuid());
if (dn == null) {
throw new IOException("Failed to schedule block movement task:"
+ blkMovingInfo + " as target datanode: "
+ blkMovingInfo.getTarget() + " doesn't exists");
}
dn.addBlocksToMoveStorage(blkMovingInfo);
dn.incrementBlocksScheduled(blkMovingInfo.getTargetStorageType());
} finally {
namesystem.readUnlock();
}
}
@Override
public boolean verifyTargetDatanodeHasSpaceForScheduling(DatanodeInfo dn,
StorageType type, long blockSize) {
namesystem.readLock();
try {
DatanodeDescriptor datanode = blockManager.getDatanodeManager()
.getDatanode(dn.getDatanodeUuid());
if (datanode == null) {
LOG.debug("Target datanode: " + dn + " doesn't exists");
return false;
}
return null != datanode.chooseStorage4Block(type, blockSize);
} finally {
namesystem.readUnlock();
}
}
}

View File

@ -29,29 +29,28 @@
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.balancer.Matcher;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped.StorageAndBlockIndex;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.util.Daemon;
@ -79,8 +78,6 @@ public class StoragePolicySatisfier implements Runnable {
public static final Logger LOG =
LoggerFactory.getLogger(StoragePolicySatisfier.class);
private Daemon storagePolicySatisfierThread;
private final Namesystem namesystem;
private final BlockManager blockManager;
private final BlockStorageMovementNeeded storageMovementNeeded;
private final BlockStorageMovementAttemptedItems storageMovementsMonitor;
private volatile boolean isRunning = false;
@ -89,16 +86,6 @@ public class StoragePolicySatisfier implements Runnable {
private int blockMovementMaxRetry;
private final Context ctxt;
/**
* An interface for analyzing and assigning the block storage movements to
* worker nodes.
*/
// TODO: Now, added one API which is required for sps package. Will refine
// this interface via HDFS-12911.
public interface Context {
int getNumLiveDataNodes();
}
/**
* Represents the collective analysis status for all blocks.
*/
@ -124,7 +111,9 @@ enum Status {
BLOCKS_TARGET_PAIRING_SKIPPED,
// Represents that, All the reported blocks are satisfied the policy but
// some of the blocks are low redundant.
FEW_LOW_REDUNDANCY_BLOCKS
FEW_LOW_REDUNDANCY_BLOCKS,
// Represents that, movement failures due to unexpected errors.
BLOCKS_FAILED_TO_MOVE
}
private Status status = null;
@ -136,36 +125,27 @@ enum Status {
}
}
public StoragePolicySatisfier(final Namesystem namesystem,
final BlockManager blkManager, Configuration conf, Context ctxt) {
this.namesystem = namesystem;
this.storageMovementNeeded = new BlockStorageMovementNeeded(namesystem,
this, conf.getInt(
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT));
this.blockManager = blkManager;
this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems(
conf.getLong(
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT),
conf.getLong(
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT),
public StoragePolicySatisfier(Context ctxt) {
this.ctxt = ctxt;
this.storageMovementNeeded = new BlockStorageMovementNeeded(ctxt);
this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems(ctxt,
storageMovementNeeded);
this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(conf);
this.blockMovementMaxRetry = conf.getInt(
this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(ctxt.getConf());
this.blockMovementMaxRetry = ctxt.getConf().getInt(
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_KEY,
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT);
this.ctxt = ctxt;
}
/**
* Start storage policy satisfier demon thread. Also start block storage
* movements monitor for retry the attempts if needed.
*
* // TODO: FSDirectory will get removed via HDFS-12911 modularization work.
*/
public synchronized void start(boolean reconfigStart) {
public synchronized void start(boolean reconfigStart, FSDirectory fsd) {
isRunning = true;
if (checkIfMoverRunning()) {
ctxt.setSPSRunning(this::isRunning);
if (ctxt.isMoverRunning()) {
isRunning = false;
LOG.error(
"Stopping StoragePolicySatisfier thread " + "as Mover ID file "
@ -183,7 +163,7 @@ public synchronized void start(boolean reconfigStart) {
// Ensure that all the previously submitted block movements(if any) have to
// be stopped in all datanodes.
addDropSPSWorkCommandsToAllDNs();
storageMovementNeeded.init();
storageMovementNeeded.init(fsd);
storagePolicySatisfierThread = new Daemon(this);
storagePolicySatisfierThread.setName("StoragePolicySatisfier");
storagePolicySatisfierThread.start();
@ -199,7 +179,6 @@ public synchronized void start(boolean reconfigStart) {
*/
public synchronized void disable(boolean forceStop) {
isRunning = false;
if (storagePolicySatisfierThread == null) {
return;
}
@ -242,25 +221,19 @@ public boolean isRunning() {
return isRunning;
}
// Return true if a Mover instance is running
private boolean checkIfMoverRunning() {
String moverId = HdfsServerConstants.MOVER_ID_PATH.toString();
return namesystem.isFileOpenedForWrite(moverId);
}
/**
* Adding drop commands to all datanodes to stop performing the satisfier
* block movements, if any.
*/
private void addDropSPSWorkCommandsToAllDNs() {
this.blockManager.getDatanodeManager().addDropSPSWorkCommandsToAllDNs();
ctxt.addDropPreviousSPSWorkAtDNs();
}
@Override
public void run() {
while (namesystem.isRunning() && isRunning) {
while (ctxt.isRunning()) {
try {
if (!namesystem.isInSafeMode()) {
if (!ctxt.isInSafeMode()) {
ItemInfo itemInfo = storageMovementNeeded.get();
if (itemInfo != null) {
if(itemInfo.getRetryCount() >= blockMovementMaxRetry){
@ -271,25 +244,28 @@ public void run() {
continue;
}
long trackId = itemInfo.getTrackId();
BlockCollection blockCollection;
BlocksMovingAnalysis status = null;
try {
namesystem.readLock();
blockCollection = namesystem.getBlockCollection(trackId);
// Check blockCollectionId existence.
if (blockCollection == null) {
// File doesn't exists (maybe got deleted), remove trackId from
// the queue
storageMovementNeeded.removeItemTrackInfo(itemInfo, true);
} else {
status =
analyseBlocksStorageMovementsAndAssignToDN(
blockCollection);
}
} finally {
namesystem.readUnlock();
}
if (blockCollection != null) {
DatanodeStorageReport[] liveDnReports;
BlockStoragePolicy existingStoragePolicy;
// TODO: presently, context internally acquire the lock
// and returns the result. Need to discuss to move the lock outside?
boolean hasLowRedundancyBlocks = ctxt
.hasLowRedundancyBlocks(trackId);
HdfsFileStatus fileStatus = ctxt.getFileInfo(trackId);
// Check path existence.
if (fileStatus == null || fileStatus.isDir()) {
// File doesn't exists (maybe got deleted) or its a directory,
// just remove trackId from the queue
storageMovementNeeded.removeItemTrackInfo(itemInfo, true);
} else {
liveDnReports = ctxt.getLiveDatanodeStorageReport();
byte existingStoragePolicyID = fileStatus.getStoragePolicy();
existingStoragePolicy = ctxt
.getStoragePolicy(existingStoragePolicyID);
HdfsLocatedFileStatus file = (HdfsLocatedFileStatus) fileStatus;
status = analyseBlocksStorageMovementsAndAssignToDN(file,
hasLowRedundancyBlocks, existingStoragePolicy, liveDnReports);
switch (status.status) {
// Just add to monitor, so it will be retried after timeout
case ANALYSIS_SKIPPED_FOR_RETRY:
@ -317,6 +293,14 @@ public void run() {
}
this.storageMovementNeeded.add(itemInfo);
break;
case BLOCKS_FAILED_TO_MOVE:
if (LOG.isDebugEnabled()) {
LOG.debug("Adding trackID " + trackId
+ " back to retry queue as some of the blocks"
+ " movement failed.");
}
this.storageMovementNeeded.add(itemInfo);
break;
// Just clean Xattrs
case BLOCKS_TARGET_PAIRING_SKIPPED:
case BLOCKS_ALREADY_SATISFIED:
@ -350,14 +334,11 @@ private void handleException(Throwable t) {
// Stopping monitor thread and clearing queues as well
this.clearQueues();
this.storageMovementsMonitor.stopGracefully();
if (!namesystem.isRunning()) {
LOG.info("Stopping StoragePolicySatisfier.");
if (!(t instanceof InterruptedException)) {
LOG.info("StoragePolicySatisfier received an exception"
+ " while shutting down.", t);
}
return;
if (!(t instanceof InterruptedException)) {
LOG.info("StoragePolicySatisfier received an exception"
+ " while shutting down.", t);
}
LOG.info("Stopping StoragePolicySatisfier.");
}
}
}
@ -367,41 +348,43 @@ private void handleException(Throwable t) {
}
private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN(
BlockCollection blockCollection) {
HdfsLocatedFileStatus fileInfo, boolean hasLowRedundancyBlocks,
BlockStoragePolicy existingStoragePolicy,
DatanodeStorageReport[] liveDns) {
BlocksMovingAnalysis.Status status =
BlocksMovingAnalysis.Status.BLOCKS_ALREADY_SATISFIED;
byte existingStoragePolicyID = blockCollection.getStoragePolicyID();
BlockStoragePolicy existingStoragePolicy =
blockManager.getStoragePolicy(existingStoragePolicyID);
if (!blockCollection.getLastBlock().isComplete()) {
final ErasureCodingPolicy ecPolicy = fileInfo.getErasureCodingPolicy();
final LocatedBlocks locatedBlocks = fileInfo.getLocatedBlocks();
final boolean lastBlkComplete = locatedBlocks.isLastBlockComplete();
if (!lastBlkComplete) {
// Postpone, currently file is under construction
// So, should we add back? or leave it to user
LOG.info("BlockCollectionID: {} file is under construction. So, postpone"
+ " this to the next retry iteration", blockCollection.getId());
+ " this to the next retry iteration", fileInfo.getFileId());
return new BlocksMovingAnalysis(
BlocksMovingAnalysis.Status.ANALYSIS_SKIPPED_FOR_RETRY,
new ArrayList<>());
}
BlockInfo[] blocks = blockCollection.getBlocks();
if (blocks.length == 0) {
List<LocatedBlock> blocks = locatedBlocks.getLocatedBlocks();
if (blocks.size() == 0) {
LOG.info("BlockCollectionID: {} file is not having any blocks."
+ " So, skipping the analysis.", blockCollection.getId());
+ " So, skipping the analysis.", fileInfo.getFileId());
return new BlocksMovingAnalysis(
BlocksMovingAnalysis.Status.BLOCKS_TARGET_PAIRING_SKIPPED,
new ArrayList<>());
}
List<BlockMovingInfo> blockMovingInfos = new ArrayList<BlockMovingInfo>();
for (int i = 0; i < blocks.length; i++) {
BlockInfo blockInfo = blocks[i];
for (int i = 0; i < blocks.size(); i++) {
LocatedBlock blockInfo = blocks.get(i);
List<StorageType> expectedStorageTypes;
if (blockInfo.isStriped()) {
if (ErasureCodingPolicyManager
.checkStoragePolicySuitableForECStripedMode(
existingStoragePolicyID)) {
existingStoragePolicy.getId())) {
expectedStorageTypes = existingStoragePolicy
.chooseStorageTypes((short) blockInfo.getCapacity());
.chooseStorageTypes((short) blockInfo.getLocations().length);
} else {
// Currently we support only limited policies (HOT, COLD, ALLSSD)
// for EC striped mode files. SPS will ignore to move the blocks if
@ -415,22 +398,16 @@ private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN(
}
} else {
expectedStorageTypes = existingStoragePolicy
.chooseStorageTypes(blockInfo.getReplication());
.chooseStorageTypes(fileInfo.getReplication());
}
DatanodeStorageInfo[] storages = blockManager.getStorages(blockInfo);
StorageType[] storageTypes = new StorageType[storages.length];
for (int j = 0; j < storages.length; j++) {
DatanodeStorageInfo datanodeStorageInfo = storages[j];
StorageType storageType = datanodeStorageInfo.getStorageType();
storageTypes[j] = storageType;
}
List<StorageType> existing =
new LinkedList<StorageType>(Arrays.asList(storageTypes));
List<StorageType> existing = new LinkedList<StorageType>(
Arrays.asList(blockInfo.getStorageTypes()));
if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes,
existing, true)) {
boolean blocksPaired = computeBlockMovingInfos(blockMovingInfos,
blockInfo, expectedStorageTypes, existing, storages);
blockInfo, expectedStorageTypes, existing, blockInfo.getLocations(),
liveDns, ecPolicy);
if (blocksPaired) {
status = BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED;
} else {
@ -439,7 +416,7 @@ private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN(
status = BlocksMovingAnalysis.Status.NO_BLOCKS_TARGETS_PAIRED;
}
} else {
if (blockManager.hasLowRedundancyBlocks(blockCollection)) {
if (hasLowRedundancyBlocks) {
status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS;
}
}
@ -448,13 +425,15 @@ private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN(
List<Block> assignedBlockIds = new ArrayList<Block>();
for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
// Check for at least one block storage movement has been chosen
if (blkMovingInfo.getTarget() != null) {
// assign block storage movement task to the target node
((DatanodeDescriptor) blkMovingInfo.getTarget())
.addBlocksToMoveStorage(blkMovingInfo);
try {
ctxt.assignBlockMoveTaskToTargetNode(blkMovingInfo);
LOG.debug("BlockMovingInfo: {}", blkMovingInfo);
assignedBlockIds.add(blkMovingInfo.getBlock());
blockCount++;
} catch (IOException e) {
LOG.warn("Exception while scheduling movement task", e);
// failed to move the block.
status = BlocksMovingAnalysis.Status.BLOCKS_FAILED_TO_MOVE;
}
}
return new BlocksMovingAnalysis(status, assignedBlockIds);
@ -481,29 +460,29 @@ private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN(
* satisfy the storage policy, true otherwise
*/
private boolean computeBlockMovingInfos(
List<BlockMovingInfo> blockMovingInfos, BlockInfo blockInfo,
List<BlockMovingInfo> blockMovingInfos, LocatedBlock blockInfo,
List<StorageType> expectedStorageTypes, List<StorageType> existing,
DatanodeStorageInfo[] storages) {
DatanodeInfo[] storages, DatanodeStorageReport[] liveDns,
ErasureCodingPolicy ecPolicy) {
boolean foundMatchingTargetNodesForBlock = true;
if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes,
existing, true)) {
List<StorageTypeNodePair> sourceWithStorageMap =
new ArrayList<StorageTypeNodePair>();
List<DatanodeStorageInfo> existingBlockStorages =
new ArrayList<DatanodeStorageInfo>(Arrays.asList(storages));
List<DatanodeInfo> existingBlockStorages = new ArrayList<DatanodeInfo>(
Arrays.asList(storages));
// if expected type exists in source node already, local movement would be
// possible, so lets find such sources first.
Iterator<DatanodeStorageInfo> iterator = existingBlockStorages.iterator();
Iterator<DatanodeInfo> iterator = existingBlockStorages.iterator();
while (iterator.hasNext()) {
DatanodeStorageInfo datanodeStorageInfo = iterator.next();
if (checkSourceAndTargetTypeExists(
datanodeStorageInfo.getDatanodeDescriptor(), existing,
expectedStorageTypes)) {
DatanodeInfoWithStorage dnInfo = (DatanodeInfoWithStorage) iterator
.next();
if (checkSourceAndTargetTypeExists(dnInfo, existing,
expectedStorageTypes, liveDns)) {
sourceWithStorageMap
.add(new StorageTypeNodePair(datanodeStorageInfo.getStorageType(),
datanodeStorageInfo.getDatanodeDescriptor()));
.add(new StorageTypeNodePair(dnInfo.getStorageType(), dnInfo));
iterator.remove();
existing.remove(datanodeStorageInfo.getStorageType());
existing.remove(dnInfo.getStorageType());
}
}
@ -511,23 +490,25 @@ private boolean computeBlockMovingInfos(
for (StorageType existingType : existing) {
iterator = existingBlockStorages.iterator();
while (iterator.hasNext()) {
DatanodeStorageInfo datanodeStorageInfo = iterator.next();
StorageType storageType = datanodeStorageInfo.getStorageType();
DatanodeInfoWithStorage dnStorageInfo =
(DatanodeInfoWithStorage) iterator.next();
StorageType storageType = dnStorageInfo.getStorageType();
if (storageType == existingType) {
iterator.remove();
sourceWithStorageMap.add(new StorageTypeNodePair(storageType,
datanodeStorageInfo.getDatanodeDescriptor()));
dnStorageInfo));
break;
}
}
}
StorageTypeNodeMap locsForExpectedStorageTypes =
findTargetsForExpectedStorageTypes(expectedStorageTypes);
findTargetsForExpectedStorageTypes(expectedStorageTypes, liveDns);
foundMatchingTargetNodesForBlock |= findSourceAndTargetToMove(
blockMovingInfos, blockInfo, sourceWithStorageMap,
expectedStorageTypes, locsForExpectedStorageTypes);
expectedStorageTypes, locsForExpectedStorageTypes,
ecPolicy);
}
return foundMatchingTargetNodesForBlock;
}
@ -550,12 +531,13 @@ private boolean computeBlockMovingInfos(
* satisfy the storage policy
*/
private boolean findSourceAndTargetToMove(
List<BlockMovingInfo> blockMovingInfos, BlockInfo blockInfo,
List<BlockMovingInfo> blockMovingInfos, LocatedBlock blockInfo,
List<StorageTypeNodePair> sourceWithStorageList,
List<StorageType> expected,
StorageTypeNodeMap locsForExpectedStorageTypes) {
StorageTypeNodeMap locsForExpectedStorageTypes,
ErasureCodingPolicy ecPolicy) {
boolean foundMatchingTargetNodesForBlock = true;
List<DatanodeDescriptor> excludeNodes = new ArrayList<>();
List<DatanodeInfo> excludeNodes = new ArrayList<>();
// Looping over all the source node locations and choose the target
// storage within same node if possible. This is done separately to
@ -566,13 +548,14 @@ private boolean findSourceAndTargetToMove(
// Check whether the block replica is already placed in the expected
// storage type in this source datanode.
if (!expected.contains(existingTypeNodePair.storageType)) {
StorageTypeNodePair chosenTarget = chooseTargetTypeInSameNode(
blockInfo, existingTypeNodePair.dn, expected);
StorageTypeNodePair chosenTarget = chooseTargetTypeInSameNode(blockInfo,
existingTypeNodePair.dn, expected);
if (chosenTarget != null) {
if (blockInfo.isStriped()) {
buildStripedBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
existingTypeNodePair.storageType, chosenTarget.dn,
chosenTarget.storageType, blockMovingInfos);
chosenTarget.storageType, blockMovingInfos,
ecPolicy);
} else {
buildContinuousBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
existingTypeNodePair.storageType, chosenTarget.dn,
@ -596,7 +579,7 @@ private boolean findSourceAndTargetToMove(
if (checkIfAlreadyChosen(blockMovingInfos, existingTypeNodePair.dn)) {
continue;
}
if (chosenTarget == null && blockManager.getDatanodeManager()
if (chosenTarget == null && ctxt
.getNetworkTopology().isNodeGroupAware()) {
chosenTarget = chooseTarget(blockInfo, existingTypeNodePair.dn,
expected, Matcher.SAME_NODE_GROUP, locsForExpectedStorageTypes,
@ -619,7 +602,7 @@ private boolean findSourceAndTargetToMove(
if (blockInfo.isStriped()) {
buildStripedBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
existingTypeNodePair.storageType, chosenTarget.dn,
chosenTarget.storageType, blockMovingInfos);
chosenTarget.storageType, blockMovingInfos, ecPolicy);
} else {
buildContinuousBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
existingTypeNodePair.storageType, chosenTarget.dn,
@ -645,7 +628,7 @@ private boolean findSourceAndTargetToMove(
}
private boolean checkIfAlreadyChosen(List<BlockMovingInfo> blockMovingInfos,
DatanodeDescriptor dn) {
DatanodeInfo dn) {
for (BlockMovingInfo blockMovingInfo : blockMovingInfos) {
if (blockMovingInfo.getSource().equals(dn)) {
return true;
@ -654,37 +637,40 @@ private boolean checkIfAlreadyChosen(List<BlockMovingInfo> blockMovingInfos,
return false;
}
private void buildContinuousBlockMovingInfos(BlockInfo blockInfo,
private void buildContinuousBlockMovingInfos(LocatedBlock blockInfo,
DatanodeInfo sourceNode, StorageType sourceStorageType,
DatanodeInfo targetNode, StorageType targetStorageType,
List<BlockMovingInfo> blkMovingInfos) {
Block blk = new Block(blockInfo.getBlockId(), blockInfo.getNumBytes(),
blockInfo.getGenerationStamp());
Block blk = ExtendedBlock.getLocalBlock(blockInfo.getBlock());
BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk, sourceNode,
targetNode, sourceStorageType, targetStorageType);
blkMovingInfos.add(blkMovingInfo);
}
private void buildStripedBlockMovingInfos(BlockInfo blockInfo,
private void buildStripedBlockMovingInfos(LocatedBlock blockInfo,
DatanodeInfo sourceNode, StorageType sourceStorageType,
DatanodeInfo targetNode, StorageType targetStorageType,
List<BlockMovingInfo> blkMovingInfos) {
List<BlockMovingInfo> blkMovingInfos, ErasureCodingPolicy ecPolicy) {
// For a striped block, it needs to construct internal block at the given
// index of a block group. Here it is iterating over all the block indices
// and construct internal blocks which can be then considered for block
// movement.
BlockInfoStriped sBlockInfo = (BlockInfoStriped) blockInfo;
for (StorageAndBlockIndex si : sBlockInfo.getStorageAndIndexInfos()) {
if (si.getBlockIndex() >= 0) {
DatanodeDescriptor dn = si.getStorage().getDatanodeDescriptor();
if (sourceNode.equals(dn)) {
LocatedStripedBlock sBlockInfo = (LocatedStripedBlock) blockInfo;
byte[] indices = sBlockInfo.getBlockIndices();
DatanodeInfo[] locations = sBlockInfo.getLocations();
for (int i = 0; i < indices.length; i++) {
byte blkIndex = indices[i];
if (blkIndex >= 0) {
// pick block movement only for the given source node.
if (sourceNode.equals(locations[i])) {
// construct internal block
long blockId = blockInfo.getBlockId() + si.getBlockIndex();
ExtendedBlock extBlock = sBlockInfo.getBlock();
long numBytes = StripedBlockUtil.getInternalBlockLength(
sBlockInfo.getNumBytes(), sBlockInfo.getCellSize(),
sBlockInfo.getDataBlockNum(), si.getBlockIndex());
Block blk = new Block(blockId, numBytes,
blockInfo.getGenerationStamp());
extBlock.getNumBytes(), ecPolicy, blkIndex);
Block blk = new Block(ExtendedBlock.getLocalBlock(extBlock));
long blkId = blk.getBlockId() + blkIndex;
blk.setBlockId(blkId);
blk.setNumBytes(numBytes);
BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk, sourceNode,
targetNode, sourceStorageType, targetStorageType);
blkMovingInfos.add(blkMovingInfo);
@ -703,34 +689,35 @@ private void buildStripedBlockMovingInfos(BlockInfo blockInfo,
* @param targetTypes
* - list of target storage types
*/
private StorageTypeNodePair chooseTargetTypeInSameNode(Block block,
DatanodeDescriptor source, List<StorageType> targetTypes) {
private StorageTypeNodePair chooseTargetTypeInSameNode(LocatedBlock blockInfo,
DatanodeInfo source, List<StorageType> targetTypes) {
for (StorageType t : targetTypes) {
DatanodeStorageInfo chooseStorage4Block =
source.chooseStorage4Block(t, block.getNumBytes());
if (chooseStorage4Block != null) {
boolean goodTargetDn = ctxt.verifyTargetDatanodeHasSpaceForScheduling(
source, t, blockInfo.getBlockSize());
if (goodTargetDn) {
return new StorageTypeNodePair(t, source);
}
}
return null;
}
private StorageTypeNodePair chooseTarget(Block block,
DatanodeDescriptor source, List<StorageType> targetTypes, Matcher matcher,
private StorageTypeNodePair chooseTarget(LocatedBlock block,
DatanodeInfo source, List<StorageType> targetTypes, Matcher matcher,
StorageTypeNodeMap locsForExpectedStorageTypes,
List<DatanodeDescriptor> excludeNodes) {
List<DatanodeInfo> excludeNodes) {
for (StorageType t : targetTypes) {
List<DatanodeDescriptor> nodesWithStorages =
locsForExpectedStorageTypes.getNodesWithStorages(t);
List<DatanodeInfo> nodesWithStorages = locsForExpectedStorageTypes
.getNodesWithStorages(t);
if (nodesWithStorages == null || nodesWithStorages.isEmpty()) {
continue; // no target nodes with the required storage type.
}
Collections.shuffle(nodesWithStorages);
for (DatanodeDescriptor target : nodesWithStorages) {
if (!excludeNodes.contains(target) && matcher.match(
blockManager.getDatanodeManager().getNetworkTopology(), source,
target)) {
if (null != target.chooseStorage4Block(t, block.getNumBytes())) {
for (DatanodeInfo target : nodesWithStorages) {
if (!excludeNodes.contains(target)
&& matcher.match(ctxt.getNetworkTopology(), source, target)) {
boolean goodTargetDn = ctxt.verifyTargetDatanodeHasSpaceForScheduling(
target, t, block.getBlockSize());
if (goodTargetDn) {
return new StorageTypeNodePair(t, target);
}
}
@ -741,27 +728,25 @@ private StorageTypeNodePair chooseTarget(Block block,
private static class StorageTypeNodePair {
private StorageType storageType = null;
private DatanodeDescriptor dn = null;
private DatanodeInfo dn = null;
StorageTypeNodePair(StorageType storageType, DatanodeDescriptor dn) {
StorageTypeNodePair(StorageType storageType, DatanodeInfo dn) {
this.storageType = storageType;
this.dn = dn;
}
}
private StorageTypeNodeMap findTargetsForExpectedStorageTypes(
List<StorageType> expected) {
List<StorageType> expected, DatanodeStorageReport[] liveDns) {
StorageTypeNodeMap targetMap = new StorageTypeNodeMap();
List<DatanodeDescriptor> reports = blockManager.getDatanodeManager()
.getDatanodeListForReport(DatanodeReportType.LIVE);
for (DatanodeDescriptor dn : reports) {
for (DatanodeStorageReport dn : liveDns) {
StorageReport[] storageReports = dn.getStorageReports();
for (StorageReport storageReport : storageReports) {
StorageType t = storageReport.getStorage().getStorageType();
if (expected.contains(t)) {
final long maxRemaining = getMaxRemaining(dn.getStorageReports(), t);
if (maxRemaining > 0L) {
targetMap.add(t, dn);
targetMap.add(t, dn.getDatanodeInfo());
}
}
}
@ -782,32 +767,40 @@ private static long getMaxRemaining(StorageReport[] storageReports,
return max;
}
private boolean checkSourceAndTargetTypeExists(DatanodeDescriptor dn,
List<StorageType> existing, List<StorageType> expectedStorageTypes) {
DatanodeStorageInfo[] allDNStorageInfos = dn.getStorageInfos();
private boolean checkSourceAndTargetTypeExists(DatanodeInfo dn,
List<StorageType> existing, List<StorageType> expectedStorageTypes,
DatanodeStorageReport[] liveDns) {
boolean isExpectedTypeAvailable = false;
boolean isExistingTypeAvailable = false;
for (DatanodeStorageInfo dnInfo : allDNStorageInfos) {
StorageType storageType = dnInfo.getStorageType();
if (existing.contains(storageType)) {
isExistingTypeAvailable = true;
}
if (expectedStorageTypes.contains(storageType)) {
isExpectedTypeAvailable = true;
for (DatanodeStorageReport liveDn : liveDns) {
if (dn.equals(liveDn.getDatanodeInfo())) {
StorageReport[] storageReports = liveDn.getStorageReports();
for (StorageReport eachStorage : storageReports) {
StorageType storageType = eachStorage.getStorage().getStorageType();
if (existing.contains(storageType)) {
isExistingTypeAvailable = true;
}
if (expectedStorageTypes.contains(storageType)) {
isExpectedTypeAvailable = true;
}
if (isExistingTypeAvailable && isExpectedTypeAvailable) {
return true;
}
}
}
}
return isExistingTypeAvailable && isExpectedTypeAvailable;
}
private static class StorageTypeNodeMap {
private final EnumMap<StorageType, List<DatanodeDescriptor>> typeNodeMap =
new EnumMap<StorageType, List<DatanodeDescriptor>>(StorageType.class);
private final EnumMap<StorageType, List<DatanodeInfo>> typeNodeMap =
new EnumMap<StorageType, List<DatanodeInfo>>(StorageType.class);
private void add(StorageType t, DatanodeDescriptor dn) {
List<DatanodeDescriptor> nodesWithStorages = getNodesWithStorages(t);
LinkedList<DatanodeDescriptor> value = null;
private void add(StorageType t, DatanodeInfo dn) {
List<DatanodeInfo> nodesWithStorages = getNodesWithStorages(t);
LinkedList<DatanodeInfo> value = null;
if (nodesWithStorages == null) {
value = new LinkedList<DatanodeDescriptor>();
value = new LinkedList<DatanodeInfo>();
value.add(dn);
typeNodeMap.put(t, value);
} else {
@ -820,7 +813,7 @@ private void add(StorageType t, DatanodeDescriptor dn) {
* - Storage type
* @return datanodes which has the given storage type
*/
private List<DatanodeDescriptor> getNodesWithStorages(StorageType type) {
private List<DatanodeInfo> getNodesWithStorages(StorageType type) {
return typeNodeMap.get(type);
}
}
@ -982,7 +975,6 @@ List<Block> getBlocks() {
public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
String path) throws IOException {
INode inode = namesystem.getFSDirectory().getINode(path);
return storageMovementNeeded.getStatus(inode.getId());
return storageMovementNeeded.getStatus(ctxt.getFileID(path));
}
}

View File

@ -25,8 +25,9 @@
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.AttemptedItemInfo;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.ItemInfo;
import org.junit.After;
@ -46,11 +47,15 @@ public class TestBlockStorageMovementAttemptedItems {
@Before
public void setup() throws Exception {
unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded(
Mockito.mock(Namesystem.class),
Mockito.mock(StoragePolicySatisfier.class), 100);
bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100,
selfRetryTimeout, unsatisfiedStorageMovementFiles);
Configuration config = new HdfsConfiguration();
Context ctxt = Mockito.mock(Context.class);
Mockito.when(ctxt.getConf()).thenReturn(config);
Mockito.when(ctxt.isRunning()).thenReturn(true);
Mockito.when(ctxt.isInSafeMode()).thenReturn(false);
Mockito.when(ctxt.isFileExist(Mockito.anyLong())).thenReturn(true);
unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded(ctxt);
bsmAttemptedItems = new BlockStorageMovementAttemptedItems(ctxt,
unsatisfiedStorageMovementFiles);
}
@After

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode.sps;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@ -68,6 +69,7 @@
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
@ -105,7 +107,8 @@ public class TestStoragePolicySatisfier {
private DistributedFileSystem dfs = null;
private static final int DEFAULT_BLOCK_SIZE = 1024;
private void shutdownCluster() {
@After
public void shutdownCluster() {
if (hdfsCluster != null) {
hdfsCluster.shutdown();
}
@ -1298,11 +1301,17 @@ public void testTraverseWhenParentDeleted() throws Exception {
//entry in queue. After 10 files, traverse control will be on U.
StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class);
Mockito.when(sps.isRunning()).thenReturn(true);
Context ctxt = Mockito.mock(Context.class);
config.setInt(DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 10);
Mockito.when(ctxt.getConf()).thenReturn(config);
Mockito.when(ctxt.isRunning()).thenReturn(true);
Mockito.when(ctxt.isInSafeMode()).thenReturn(false);
Mockito.when(ctxt.isFileExist(Mockito.anyLong())).thenReturn(true);
BlockStorageMovementNeeded movmentNeededQueue =
new BlockStorageMovementNeeded(hdfsCluster.getNamesystem(), sps, 10);
new BlockStorageMovementNeeded(ctxt);
INode rootINode = fsDir.getINode("/root");
movmentNeededQueue.addToPendingDirQueue(rootINode.getId());
movmentNeededQueue.init();
movmentNeededQueue.init(fsDir);
//Wait for thread to reach U.
Thread.sleep(1000);
@ -1361,9 +1370,15 @@ public void testTraverseWhenRootParentDeleted() throws Exception {
Mockito.when(sps.isRunning()).thenReturn(true);
// Queue limit can control the traverse logic to wait for some free
// entry in queue. After 10 files, traverse control will be on U.
Context ctxt = Mockito.mock(Context.class);
config.setInt(DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 10);
Mockito.when(ctxt.getConf()).thenReturn(config);
Mockito.when(ctxt.isRunning()).thenReturn(true);
Mockito.when(ctxt.isInSafeMode()).thenReturn(false);
Mockito.when(ctxt.isFileExist(Mockito.anyLong())).thenReturn(true);
BlockStorageMovementNeeded movmentNeededQueue =
new BlockStorageMovementNeeded(hdfsCluster.getNamesystem(), sps, 10);
movmentNeededQueue.init();
new BlockStorageMovementNeeded(ctxt);
movmentNeededQueue.init(fsDir);
INode rootINode = fsDir.getINode("/root");
movmentNeededQueue.addToPendingDirQueue(rootINode.getId());
// Wait for thread to reach U.