HDFS-8815. DFS getStoragePolicy implementation using single RPC call (Contributed by Surendra Singh Lilhore)
This commit is contained in:
parent
df9e7280db
commit
cc71ad80e1
|
@ -279,6 +279,20 @@ public interface ClientProtocol {
|
||||||
void setStoragePolicy(String src, String policyName)
|
void setStoragePolicy(String src, String policyName)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the storage policy for a file/directory.
|
||||||
|
* @param path
|
||||||
|
* Path of an existing file/directory.
|
||||||
|
* @throws AccessControlException
|
||||||
|
* If access is denied
|
||||||
|
* @throws org.apache.hadoop.fs.UnresolvedLinkException
|
||||||
|
* if <code>src</code> contains a symlink
|
||||||
|
* @throws java.io.FileNotFoundException
|
||||||
|
* If file/dir <code>src</code> is not found
|
||||||
|
*/
|
||||||
|
@Idempotent
|
||||||
|
BlockStoragePolicy getStoragePolicy(String path) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set permissions for an existing file/directory.
|
* Set permissions for an existing file/directory.
|
||||||
*
|
*
|
||||||
|
|
|
@ -112,6 +112,14 @@ message SetStoragePolicyRequestProto {
|
||||||
message SetStoragePolicyResponseProto { // void response
|
message SetStoragePolicyResponseProto { // void response
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message GetStoragePolicyRequestProto {
|
||||||
|
required string path = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message GetStoragePolicyResponseProto {
|
||||||
|
required BlockStoragePolicyProto storagePolicy = 1;
|
||||||
|
}
|
||||||
|
|
||||||
message GetStoragePoliciesRequestProto { // void request
|
message GetStoragePoliciesRequestProto { // void request
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -725,6 +733,8 @@ service ClientNamenodeProtocol {
|
||||||
returns(SetReplicationResponseProto);
|
returns(SetReplicationResponseProto);
|
||||||
rpc setStoragePolicy(SetStoragePolicyRequestProto)
|
rpc setStoragePolicy(SetStoragePolicyRequestProto)
|
||||||
returns(SetStoragePolicyResponseProto);
|
returns(SetStoragePolicyResponseProto);
|
||||||
|
rpc getStoragePolicy(GetStoragePolicyRequestProto)
|
||||||
|
returns(GetStoragePolicyResponseProto);
|
||||||
rpc getStoragePolicies(GetStoragePoliciesRequestProto)
|
rpc getStoragePolicies(GetStoragePoliciesRequestProto)
|
||||||
returns(GetStoragePoliciesResponseProto);
|
returns(GetStoragePoliciesResponseProto);
|
||||||
rpc setPermission(SetPermissionRequestProto)
|
rpc setPermission(SetPermissionRequestProto)
|
||||||
|
|
|
@ -770,6 +770,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
|
|
||||||
HDFS-6860. BlockStateChange logs are too noisy. (Chang Li and xyao via xyao)
|
HDFS-6860. BlockStateChange logs are too noisy. (Chang Li and xyao via xyao)
|
||||||
|
|
||||||
|
HDFS-8815. DFS getStoragePolicy implementation using single RPC call
|
||||||
|
(Surendra Singh Lilhore via vinayakumarb)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||||
|
|
|
@ -1574,21 +1574,22 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* @param path file/directory name
|
||||||
* @return Get the storage policy for specified path
|
* @return Get the storage policy for specified path
|
||||||
*/
|
*/
|
||||||
public BlockStoragePolicy getStoragePolicy(String path) throws IOException {
|
public BlockStoragePolicy getStoragePolicy(String path) throws IOException {
|
||||||
HdfsFileStatus status = getFileInfo(path);
|
checkOpen();
|
||||||
if (status == null) {
|
TraceScope scope = getPathTraceScope("getStoragePolicy", path);
|
||||||
throw new FileNotFoundException("File does not exist: " + path);
|
try {
|
||||||
|
return namenode.getStoragePolicy(path);
|
||||||
|
} catch (RemoteException e) {
|
||||||
|
throw e.unwrapRemoteException(AccessControlException.class,
|
||||||
|
FileNotFoundException.class,
|
||||||
|
SafeModeException.class,
|
||||||
|
UnresolvedPathException.class);
|
||||||
|
} finally {
|
||||||
|
scope.close();
|
||||||
}
|
}
|
||||||
byte storagePolicyId = status.getStoragePolicy();
|
|
||||||
BlockStoragePolicy[] policies = getStoragePolicies();
|
|
||||||
for (BlockStoragePolicy policy : policies) {
|
|
||||||
if (policy.getId() == storagePolicyId) {
|
|
||||||
return policy;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -128,6 +128,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSna
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingResponseProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesResponseProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesResponseProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicyRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicyResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedResponseProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
|
||||||
|
@ -198,6 +200,7 @@ import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathR
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesResponseProto;
|
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
|
||||||
|
@ -1457,6 +1460,20 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
|
||||||
return VOID_SET_STORAGE_POLICY_RESPONSE;
|
return VOID_SET_STORAGE_POLICY_RESPONSE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GetStoragePolicyResponseProto getStoragePolicy(
|
||||||
|
RpcController controller, GetStoragePolicyRequestProto request)
|
||||||
|
throws ServiceException {
|
||||||
|
try {
|
||||||
|
BlockStoragePolicyProto policy = PBHelper.convert(server
|
||||||
|
.getStoragePolicy(request.getPath()));
|
||||||
|
return GetStoragePolicyResponseProto.newBuilder()
|
||||||
|
.setStoragePolicy(policy).build();
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new ServiceException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public GetStoragePoliciesResponseProto getStoragePolicies(
|
public GetStoragePoliciesResponseProto getStoragePolicies(
|
||||||
RpcController controller, GetStoragePoliciesRequestProto request)
|
RpcController controller, GetStoragePoliciesRequestProto request)
|
||||||
|
|
|
@ -124,6 +124,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSna
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingResponseProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesResponseProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesResponseProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicyRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesResponseProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesResponseProto;
|
||||||
|
@ -1484,6 +1485,18 @@ public class ClientNamenodeProtocolTranslatorPB implements
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public BlockStoragePolicy getStoragePolicy(String path) throws IOException {
|
||||||
|
GetStoragePolicyRequestProto request = GetStoragePolicyRequestProto
|
||||||
|
.newBuilder().setPath(path).build();
|
||||||
|
try {
|
||||||
|
return PBHelper.convert(rpcProxy.getStoragePolicy(null, request)
|
||||||
|
.getStoragePolicy());
|
||||||
|
} catch (ServiceException e) {
|
||||||
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public BlockStoragePolicy[] getStoragePolicies() throws IOException {
|
public BlockStoragePolicy[] getStoragePolicies() throws IOException {
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -200,6 +200,29 @@ public class FSDirAttrOp {
|
||||||
return bm.getStoragePolicies();
|
return bm.getStoragePolicies();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static BlockStoragePolicy getStoragePolicy(FSDirectory fsd, BlockManager bm,
|
||||||
|
String path) throws IOException {
|
||||||
|
FSPermissionChecker pc = fsd.getPermissionChecker();
|
||||||
|
byte[][] pathComponents = FSDirectory
|
||||||
|
.getPathComponentsForReservedPath(path);
|
||||||
|
fsd.readLock();
|
||||||
|
try {
|
||||||
|
path = fsd.resolvePath(pc, path, pathComponents);
|
||||||
|
final INodesInPath iip = fsd.getINodesInPath(path, false);
|
||||||
|
if (fsd.isPermissionEnabled()) {
|
||||||
|
fsd.checkPathAccess(pc, iip, FsAction.READ);
|
||||||
|
}
|
||||||
|
INode inode = iip.getLastINode();
|
||||||
|
if (inode == null) {
|
||||||
|
throw new FileNotFoundException("File/Directory does not exist: "
|
||||||
|
+ iip.getPath());
|
||||||
|
}
|
||||||
|
return bm.getStoragePolicy(inode.getStoragePolicyID());
|
||||||
|
} finally {
|
||||||
|
fsd.readUnlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static long getPreferredBlockSize(FSDirectory fsd, String src)
|
static long getPreferredBlockSize(FSDirectory fsd, String src)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
FSPermissionChecker pc = fsd.getPermissionChecker();
|
FSPermissionChecker pc = fsd.getPermissionChecker();
|
||||||
|
|
|
@ -1956,6 +1956,25 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
logAuditEvent(true, "setStoragePolicy", src, null, auditStat);
|
logAuditEvent(true, "setStoragePolicy", src, null, auditStat);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the storage policy for a file or a directory.
|
||||||
|
*
|
||||||
|
* @param src
|
||||||
|
* file/directory path
|
||||||
|
* @return storage policy object
|
||||||
|
*/
|
||||||
|
BlockStoragePolicy getStoragePolicy(String src) throws IOException {
|
||||||
|
checkOperation(OperationCategory.READ);
|
||||||
|
waitForLoadingFSImage();
|
||||||
|
readLock();
|
||||||
|
try {
|
||||||
|
checkOperation(OperationCategory.READ);
|
||||||
|
return FSDirAttrOp.getStoragePolicy(dir, blockManager, src);
|
||||||
|
} finally {
|
||||||
|
readUnlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return All the existing block storage policies
|
* @return All the existing block storage policies
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -690,6 +690,12 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
namesystem.setStoragePolicy(src, policyName);
|
namesystem.setStoragePolicy(src, policyName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public BlockStoragePolicy getStoragePolicy(String path) throws IOException {
|
||||||
|
checkNNStartup();
|
||||||
|
return namesystem.getStoragePolicy(path);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public BlockStoragePolicy[] getStoragePolicies() throws IOException {
|
public BlockStoragePolicy[] getStoragePolicies() throws IOException {
|
||||||
checkNNStartup();
|
checkNNStartup();
|
||||||
|
|
|
@ -979,6 +979,29 @@ public class TestBlockStoragePolicy {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetStoragePolicy() throws Exception {
|
||||||
|
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||||
|
.numDataNodes(REPLICATION).build();
|
||||||
|
cluster.waitActive();
|
||||||
|
final DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
|
try {
|
||||||
|
final Path dir = new Path("/testGetStoragePolicy");
|
||||||
|
final Path fooFile = new Path(dir, "foo");
|
||||||
|
DFSTestUtil.createFile(fs, fooFile, FILE_LEN, REPLICATION, 0L);
|
||||||
|
DFSClient client = new DFSClient(cluster.getNameNode(0)
|
||||||
|
.getNameNodeAddress(), conf);
|
||||||
|
client.setStoragePolicy("/testGetStoragePolicy/foo",
|
||||||
|
HdfsConstants.COLD_STORAGE_POLICY_NAME);
|
||||||
|
String policyName = client.getStoragePolicy("/testGetStoragePolicy/foo")
|
||||||
|
.getName();
|
||||||
|
Assert.assertEquals("File storage policy should be COLD",
|
||||||
|
HdfsConstants.COLD_STORAGE_POLICY_NAME, policyName);
|
||||||
|
} finally {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSetStoragePolicyWithSnapshot() throws Exception {
|
public void testSetStoragePolicyWithSnapshot() throws Exception {
|
||||||
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||||
|
|
Loading…
Reference in New Issue