HDFS-10802. [SPS]: Add satisfyStoragePolicy API in HdfsAdmin. Contributed by Yuanbo Liu

Authored by Rakesh Radhakrishnan on 2016-11-17 14:07:45 +05:30; committed by Uma Maheswara Rao Gangumalla
parent 19b5aee3e4
commit b67ae6d9d7
12 changed files with 323 additions and 1 deletion

DFSClient.java

@ -3079,7 +3079,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
* has REPLICATION policy.
* @throws IOException
*/
public ErasureCodingPolicy getErasureCodingPolicy(String src)
throws IOException {
checkOpen();
@ -3092,6 +3091,24 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
}
}
/**
* Satisfy storage policy for an existing file/directory.
* @param src file/directory name
* @throws IOException
*/
public void satisfyStoragePolicy(String src) throws IOException {
checkOpen();
try (TraceScope ignored =
newPathTraceScope("satisfyStoragePolicy", src)) {
namenode.satisfyStoragePolicy(src);
} catch (RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
FileNotFoundException.class,
SafeModeException.class,
UnresolvedPathException.class);
}
}
Tracer getTracer() {
return tracer;
}

DistributedFileSystem.java

@ -2862,6 +2862,40 @@ public class DistributedFileSystem extends FileSystem
}.resolve(this, absF);
}
/**
* Satisfy the storage policy that is already set on the given path by
* scheduling movement of its blocks to appropriate storage types. This API is
* non-recursive: if the source path is a directory, only the files immediately
* under it are considered, and any sub-directories are skipped.
*
* @param path The source path referring to either a directory or a file.
* @throws IOException
*/
public void satisfyStoragePolicy(final Path path) throws IOException {
Path absF = fixRelativePart(path);
new FileSystemLinkResolver<Void>() {
@Override
public Void doCall(Path p) throws IOException {
dfs.satisfyStoragePolicy(getPathName(p));
return null;
}
@Override
public Void next(FileSystem fs, Path p) throws IOException {
// DFS only
if (fs instanceof DistributedFileSystem) {
DistributedFileSystem myDfs = (DistributedFileSystem) fs;
myDfs.satisfyStoragePolicy(p);
return null;
}
throw new UnsupportedOperationException(
"Cannot satisfyStoragePolicy through a symlink to a "
+ "non-DistributedFileSystem: " + path + " -> " + p);
}
}.resolve(this, absF);
}
/**
* Get erasure coding policy information for the specified path
*

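A minimal caller-side sketch (not part of this change) of the non-recursive behaviour described in the DistributedFileSystem#satisfyStoragePolicy javadoc above; the /archive/logs path and the COLD policy are illustrative assumptions.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class SatisfyDirectorySketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    if (!(fs instanceof DistributedFileSystem)) {
      throw new IllegalStateException("Default filesystem is not HDFS");
    }
    DistributedFileSystem dfs = (DistributedFileSystem) fs;
    Path dir = new Path("/archive/logs");  // assumed existing directory
    dfs.setStoragePolicy(dir, "COLD");     // the policy must already be set on the path
    // Schedules movement only for files directly under /archive/logs;
    // files inside sub-directories of /archive/logs are skipped.
    dfs.satisfyStoragePolicy(dir);
  }
}
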
HdfsAdmin.java

@ -532,6 +532,16 @@ public class HdfsAdmin {
return dfs.getErasureCodingPolicy(path);
}
/**
* Satisfy the storage policy that is already set on the given path by
* scheduling movement of its blocks to appropriate storage types.
*
* @param path The source path referring to either a directory or a file.
* @throws IOException
*/
public void satisfyStoragePolicy(final Path path) throws IOException {
dfs.satisfyStoragePolicy(path);
}
/**
* Get the Erasure coding policies supported.
*

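A hedged usage sketch of the new admin API, along the lines of the tests added below; the file path and the COLD policy are assumptions for illustration only.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsAdmin;

public class SatisfyWithHdfsAdminSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    URI nnUri = FileSystem.getDefaultUri(conf);
    HdfsAdmin admin = new HdfsAdmin(nnUri, conf);
    Path file = new Path("/data/reports/part-00000");  // assumed existing file
    admin.setStoragePolicy(file, "COLD");  // choose the target policy first
    admin.satisfyStoragePolicy(file);      // ask the NameNode to schedule block movement
  }
}
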
ClientProtocol.java

@ -1742,4 +1742,18 @@ public interface ClientProtocol {
@Idempotent
BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
EnumSet<OpenFilesType> openFilesTypes, String path) throws IOException;
/**
* Satisfy the storage policy for a file/directory.
* @param path Path of an existing file/directory.
* @throws AccessControlException If access is denied.
* @throws org.apache.hadoop.fs.UnresolvedLinkException if <code>path</code>
* contains a symlink.
* @throws java.io.FileNotFoundException If file/dir <code>path</code> is not
* found.
* @throws org.apache.hadoop.hdfs.server.namenode.SafeModeException if the
* operation is not allowed in safemode.
*/
@Idempotent
void satisfyStoragePolicy(String path) throws IOException;
}

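A caller-side sketch of how the failure modes listed in this contract surface through HdfsAdmin; the helper method and the path are hypothetical, not part of this change.

import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.security.AccessControlException;

class SatisfyErrorHandlingSketch {
  static void satisfyQuietly(HdfsAdmin admin, Path path) {
    try {
      admin.satisfyStoragePolicy(path);
    } catch (FileNotFoundException e) {
      System.err.println("No such file/directory: " + path);
    } catch (AccessControlException e) {
      System.err.println("Permission denied for " + path);
    } catch (IOException e) {
      // e.g. NameNode in safe mode or the storage policy feature disabled
      System.err.println("Could not satisfy storage policy for " + path + ": " + e);
    }
  }
}
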
ClientNamenodeProtocolTranslatorPB.java

@ -188,6 +188,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Update
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpgradeStatusRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpgradeStatusResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SatisfyStoragePolicyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.*;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.EncryptionZoneProto;
@ -1944,4 +1945,14 @@ public class ClientNamenodeProtocolTranslatorPB implements
}
}
@Override
public void satisfyStoragePolicy(String src) throws IOException {
SatisfyStoragePolicyRequestProto req =
SatisfyStoragePolicyRequestProto.newBuilder().setSrc(src).build();
try {
rpcProxy.satisfyStoragePolicy(null, req);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
}

ClientNamenodeProtocol.proto

@ -831,6 +831,14 @@ message ListOpenFilesResponseProto {
repeated OpenFilesTypeProto types = 3;
}
message SatisfyStoragePolicyRequestProto {
required string src = 1;
}
message SatisfyStoragePolicyResponseProto {
}
service ClientNamenodeProtocol {
rpc getBlockLocations(GetBlockLocationsRequestProto)
returns(GetBlockLocationsResponseProto);
@ -1017,4 +1025,6 @@ service ClientNamenodeProtocol {
returns(GetQuotaUsageResponseProto);
rpc listOpenFiles(ListOpenFilesRequestProto)
returns(ListOpenFilesResponseProto);
rpc satisfyStoragePolicy(SatisfyStoragePolicyRequestProto)
returns(SatisfyStoragePolicyResponseProto);
}

ClientNamenodeProtocolServerSideTranslatorPB.java

@ -225,6 +225,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Update
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SatisfyStoragePolicyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SatisfyStoragePolicyResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathResponseProto;
@ -407,6 +409,10 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
private static final CheckAccessResponseProto
VOID_CHECKACCESS_RESPONSE = CheckAccessResponseProto.getDefaultInstance();
private static final SatisfyStoragePolicyResponseProto
VOID_SATISFYSTORAGEPOLICY_RESPONSE = SatisfyStoragePolicyResponseProto
.getDefaultInstance();
/**
* Constructor
*
@ -1886,4 +1892,16 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
throw new ServiceException(e);
}
}
@Override
public SatisfyStoragePolicyResponseProto satisfyStoragePolicy(
RpcController controller,
SatisfyStoragePolicyRequestProto request) throws ServiceException {
try {
server.satisfyStoragePolicy(request.getSrc());
} catch (IOException e) {
throw new ServiceException(e);
}
return VOID_SATISFYSTORAGEPOLICY_RESPONSE;
}
}

BlockManager.java

@ -4999,6 +4999,10 @@ public class BlockManager implements BlockStatsMXBean {
*/
public void satisfyStoragePolicy(long id) {
storageMovementNeeded.add(id);
if (LOG.isDebugEnabled()) {
LOG.debug("Added block collection id {} to block "
+ "storageMovementNeeded queue", id);
}
}
public StoragePolicySatisfier getStoragePolicySatisfier() {

FSDirAttrOp.java

@ -190,6 +190,33 @@ public class FSDirAttrOp {
return fsd.getAuditFileInfo(iip);
}
static void satisfyStoragePolicy(FSDirectory fsd, BlockManager bm,
String src) throws IOException {
// Make sure the storage policy feature is enabled; otherwise the
// request cannot be satisfied and is rejected.
if (!fsd.isStoragePolicyEnabled()) {
throw new IOException(String.format(
"Failed to satisfy storage policy since %s is set to false.",
DFS_STORAGE_POLICY_ENABLED_KEY));
}
FSPermissionChecker pc = fsd.getPermissionChecker();
INodesInPath iip;
fsd.writeLock();
try {
// check operation permission.
iip = fsd.resolvePath(pc, src, DirOp.WRITE);
if (fsd.isPermissionEnabled()) {
fsd.checkPathAccess(pc, iip, FsAction.WRITE);
}
unprotectedSatisfyStoragePolicy(bm, iip);
} finally {
fsd.writeUnlock();
}
}
static BlockStoragePolicy[] getStoragePolicies(BlockManager bm)
throws IOException {
return bm.getStoragePolicies();
@ -451,6 +478,35 @@ public class FSDirAttrOp {
}
}
static void unprotectedSatisfyStoragePolicy(BlockManager bm,
INodesInPath iip) throws IOException {
// check whether file exists.
INode inode = iip.getLastINode();
if (inode == null) {
throw new FileNotFoundException("File/Directory does not exist: "
+ iip.getPath());
}
// TODO: need to check whether inode's storage policy
// has been satisfied or inode exists in the satisfier
// list before calling satisfyStoragePolicy in BlockManager.
if (inode.isDirectory()) {
final int snapshotId = iip.getLatestSnapshotId();
for (INode node : inode.asDirectory().getChildrenList(snapshotId)) {
if (node.isFile()) {
bm.satisfyStoragePolicy(node.getId());
}
}
} else if (inode.isFile()) {
bm.satisfyStoragePolicy(inode.getId());
} else {
throw new FileNotFoundException("File/Directory does not exist: "
+ iip.getPath());
}
}
private static void setDirStoragePolicy(
FSDirectory fsd, INodesInPath iip, byte policyId) throws IOException {
INode inode = FSDirectory.resolveLastINode(iip);

FSNamesystem.java

@ -2226,6 +2226,24 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
logAuditEvent(true, operationName, src, null, auditStat);
}
/**
* Satisfy the storage policy for a file or a directory.
*
* @param src file/directory path
*/
void satisfyStoragePolicy(String src) throws IOException {
checkOperation(OperationCategory.WRITE);
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot satisfy storage policy for " + src);
// TODO: need to update editlog for persistence.
FSDirAttrOp.satisfyStoragePolicy(dir, blockManager, src);
} finally {
writeUnlock();
}
}
/**
* unset storage policy set for a given file or a directory.
*

NameNodeRpcServer.java

@ -1405,6 +1405,12 @@ public class NameNodeRpcServer implements NamenodeProtocols {
return namesystem.getQuotaUsage(path);
}
@Override // ClientProtocol
public void satisfyStoragePolicy(String src) throws IOException {
checkNNStartup();
namesystem.satisfyStoragePolicy(src);
}
@Override // ClientProtocol
public void setQuota(String path, long namespaceQuota, long storagespaceQuota,
StorageType type)

TestStoragePolicySatisfier.java

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@ -24,14 +25,18 @@ import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
@ -39,6 +44,8 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Supplier;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
/**
* Tests that the StoragePolicySatisfier daemon is able to check the blocks to
* be moved and find suggested target locations to move them to.
@ -232,6 +239,123 @@ public class TestStoragePolicySatisfier {
}
}
/**
* Tests to verify that hdfsAdmin.satisfyStoragePolicy works for a file.
* @throws Exception
*/
@Test(timeout = 300000)
public void testSatisfyFileWithHdfsAdmin() throws Exception {
HdfsAdmin hdfsAdmin =
new HdfsAdmin(FileSystem.getDefaultUri(config), config);
try {
// Change policy to COLD
dfs.setStoragePolicy(new Path(file), "COLD");
StorageType[][] newtypes =
new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE},
{StorageType.ARCHIVE, StorageType.ARCHIVE},
{StorageType.ARCHIVE, StorageType.ARCHIVE}};
startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
storagesPerDatanode, capacity, hdfsCluster);
hdfsAdmin.satisfyStoragePolicy(new Path(file));
hdfsCluster.triggerHeartbeats();
// Wait till the namenode is notified about the block location details
waitExpectedStorageType(file, StorageType.ARCHIVE, 3, 30000);
} finally {
hdfsCluster.shutdown();
}
}
/**
* Tests to verify that hdfsAdmin.satisfyStoragePolicy works for a directory.
* @throws Exception
*/
@Test(timeout = 300000)
public void testSatisfyDirWithHdfsAdmin() throws Exception {
HdfsAdmin hdfsAdmin =
new HdfsAdmin(FileSystem.getDefaultUri(config), config);
try {
final String subDir = "/subDir";
final String subFile1 = subDir + "/subFile1";
final String subDir2 = subDir + "/subDir2";
final String subFile2 = subDir2 + "/subFile2";
dfs.mkdirs(new Path(subDir));
writeContent(subFile1);
dfs.mkdirs(new Path(subDir2));
writeContent(subFile2);
// Change policy to ONE_SSD
dfs.setStoragePolicy(new Path(subDir), "ONE_SSD");
StorageType[][] newtypes =
new StorageType[][]{{StorageType.SSD, StorageType.DISK}};
startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
storagesPerDatanode, capacity, hdfsCluster);
hdfsAdmin.satisfyStoragePolicy(new Path(subDir));
hdfsCluster.triggerHeartbeats();
// The policy should take effect for the file directly under the directory.
waitExpectedStorageType(subFile1, StorageType.SSD, 1, 30000);
waitExpectedStorageType(subFile1, StorageType.DISK, 2, 30000);
// The policy should not take effect for the file under the sub-directory.
waitExpectedStorageType(subFile2, StorageType.DEFAULT, 3, 30000);
} finally {
hdfsCluster.shutdown();
}
}
/**
* Tests to verify the exceptions thrown by hdfsAdmin.satisfyStoragePolicy.
* @throws Exception
*/
@Test(timeout = 300000)
public void testSatisfyWithExceptions() throws Exception {
try {
final String nonExistingFile = "/nonExistingFile";
hdfsCluster.getConfiguration(0).
setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, false);
hdfsCluster.restartNameNodes();
hdfsCluster.waitActive();
HdfsAdmin hdfsAdmin =
new HdfsAdmin(FileSystem.getDefaultUri(config), config);
try {
hdfsAdmin.satisfyStoragePolicy(new Path(file));
Assert.fail(String.format(
"Should failed to satisfy storage policy "
+ "for %s since %s is set to false.",
file, DFS_STORAGE_POLICY_ENABLED_KEY));
} catch (IOException e) {
Assert.assertTrue(e.getMessage().contains(String.format(
"Failed to satisfy storage policy since %s is set to false.",
DFS_STORAGE_POLICY_ENABLED_KEY)));
}
hdfsCluster.getConfiguration(0).
setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, true);
hdfsCluster.restartNameNodes();
hdfsCluster.waitActive();
hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(config), config);
try {
hdfsAdmin.satisfyStoragePolicy(new Path(nonExistingFile));
Assert.fail("Should throw FileNotFoundException for " +
nonExistingFile);
} catch (FileNotFoundException e) {
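// Expected: the path does not exist, so satisfyStoragePolicy must fail.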
}
} finally {
hdfsCluster.shutdown();
}
}
private void waitForBlocksMovementResult(long expectedBlkMovResultsCount,
int timeout) throws TimeoutException, InterruptedException {
BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();