HDFS-8815. DFS getStoragePolicy implementation using single RPC call (Contributed by Surendra Singh Lilhore)

(cherry picked from commit cc71ad80e1)
This commit is contained in:
Vinayakumar B 2015-08-06 11:10:48 +05:30
parent dc76c4b035
commit f8932d478f
10 changed files with 140 additions and 11 deletions

View File

@ -279,6 +279,20 @@ public interface ClientProtocol {
void setStoragePolicy(String src, String policyName) void setStoragePolicy(String src, String policyName)
throws IOException; throws IOException;
/**
* Get the storage policy for a file/directory.
* @param path
* Path of an existing file/directory.
* @throws AccessControlException
* If access is denied
* @throws org.apache.hadoop.fs.UnresolvedLinkException
* if <code>src</code> contains a symlink
* @throws java.io.FileNotFoundException
* If file/dir <code>src</code> is not found
*/
@Idempotent
BlockStoragePolicy getStoragePolicy(String path) throws IOException;
/** /**
* Set permissions for an existing file/directory. * Set permissions for an existing file/directory.
* *

View File

@ -112,6 +112,14 @@ message SetStoragePolicyRequestProto {
message SetStoragePolicyResponseProto { // void response message SetStoragePolicyResponseProto { // void response
} }
message GetStoragePolicyRequestProto {
required string path = 1;
}
message GetStoragePolicyResponseProto {
required BlockStoragePolicyProto storagePolicy = 1;
}
message GetStoragePoliciesRequestProto { // void request message GetStoragePoliciesRequestProto { // void request
} }
@ -722,6 +730,8 @@ service ClientNamenodeProtocol {
returns(SetReplicationResponseProto); returns(SetReplicationResponseProto);
rpc setStoragePolicy(SetStoragePolicyRequestProto) rpc setStoragePolicy(SetStoragePolicyRequestProto)
returns(SetStoragePolicyResponseProto); returns(SetStoragePolicyResponseProto);
rpc getStoragePolicy(GetStoragePolicyRequestProto)
returns(GetStoragePolicyResponseProto);
rpc getStoragePolicies(GetStoragePoliciesRequestProto) rpc getStoragePolicies(GetStoragePoliciesRequestProto)
returns(GetStoragePoliciesResponseProto); returns(GetStoragePoliciesResponseProto);
rpc setPermission(SetPermissionRequestProto) rpc setPermission(SetPermissionRequestProto)

View File

@ -427,6 +427,9 @@ Release 2.8.0 - UNRELEASED
HDFS-6860. BlockStateChange logs are too noisy. (Chang Li and xyao via xyao) HDFS-6860. BlockStateChange logs are too noisy. (Chang Li and xyao via xyao)
HDFS-8815. DFS getStoragePolicy implementation using single RPC call
(Surendra Singh Lilhore via vinayakumarb)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -1568,21 +1568,22 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
} }
/** /**
* @param path file/directory name
* @return Get the storage policy for specified path * @return Get the storage policy for specified path
*/ */
public BlockStoragePolicy getStoragePolicy(String path) throws IOException { public BlockStoragePolicy getStoragePolicy(String path) throws IOException {
HdfsFileStatus status = getFileInfo(path); checkOpen();
if (status == null) { TraceScope scope = getPathTraceScope("getStoragePolicy", path);
throw new FileNotFoundException("File does not exist: " + path); try {
return namenode.getStoragePolicy(path);
} catch (RemoteException e) {
throw e.unwrapRemoteException(AccessControlException.class,
FileNotFoundException.class,
SafeModeException.class,
UnresolvedPathException.class);
} finally {
scope.close();
} }
byte storagePolicyId = status.getStoragePolicy();
BlockStoragePolicy[] policies = getStoragePolicies();
for (BlockStoragePolicy policy : policies) {
if (policy.getId() == storagePolicyId) {
return policy;
}
}
return null;
} }
/** /**

View File

@ -128,6 +128,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSna
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicyResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
@ -198,6 +200,7 @@ import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathR
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto; import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesResponseProto; import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
@ -1459,6 +1462,20 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
return VOID_SET_STORAGE_POLICY_RESPONSE; return VOID_SET_STORAGE_POLICY_RESPONSE;
} }
@Override
public GetStoragePolicyResponseProto getStoragePolicy(
RpcController controller, GetStoragePolicyRequestProto request)
throws ServiceException {
try {
BlockStoragePolicyProto policy = PBHelper.convert(server
.getStoragePolicy(request.getPath()));
return GetStoragePolicyResponseProto.newBuilder()
.setStoragePolicy(policy).build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override @Override
public GetStoragePoliciesResponseProto getStoragePolicies( public GetStoragePoliciesResponseProto getStoragePolicies(
RpcController controller, GetStoragePoliciesRequestProto request) RpcController controller, GetStoragePoliciesRequestProto request)

View File

@ -124,6 +124,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSna
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesResponseProto;
@ -1482,6 +1483,18 @@ public class ClientNamenodeProtocolTranslatorPB implements
} }
} }
@Override
public BlockStoragePolicy getStoragePolicy(String path) throws IOException {
GetStoragePolicyRequestProto request = GetStoragePolicyRequestProto
.newBuilder().setPath(path).build();
try {
return PBHelper.convert(rpcProxy.getStoragePolicy(null, request)
.getStoragePolicy());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override @Override
public BlockStoragePolicy[] getStoragePolicies() throws IOException { public BlockStoragePolicy[] getStoragePolicies() throws IOException {
try { try {

View File

@ -200,6 +200,29 @@ public class FSDirAttrOp {
return bm.getStoragePolicies(); return bm.getStoragePolicies();
} }
static BlockStoragePolicy getStoragePolicy(FSDirectory fsd, BlockManager bm,
String path) throws IOException {
FSPermissionChecker pc = fsd.getPermissionChecker();
byte[][] pathComponents = FSDirectory
.getPathComponentsForReservedPath(path);
fsd.readLock();
try {
path = fsd.resolvePath(pc, path, pathComponents);
final INodesInPath iip = fsd.getINodesInPath(path, false);
if (fsd.isPermissionEnabled()) {
fsd.checkPathAccess(pc, iip, FsAction.READ);
}
INode inode = iip.getLastINode();
if (inode == null) {
throw new FileNotFoundException("File/Directory does not exist: "
+ iip.getPath());
}
return bm.getStoragePolicy(inode.getStoragePolicyID());
} finally {
fsd.readUnlock();
}
}
static long getPreferredBlockSize(FSDirectory fsd, String src) static long getPreferredBlockSize(FSDirectory fsd, String src)
throws IOException { throws IOException {
FSPermissionChecker pc = fsd.getPermissionChecker(); FSPermissionChecker pc = fsd.getPermissionChecker();

View File

@ -1948,6 +1948,25 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
logAuditEvent(true, "setStoragePolicy", src, null, auditStat); logAuditEvent(true, "setStoragePolicy", src, null, auditStat);
} }
/**
* Get the storage policy for a file or a directory.
*
* @param src
* file/directory path
* @return storage policy object
*/
BlockStoragePolicy getStoragePolicy(String src) throws IOException {
checkOperation(OperationCategory.READ);
waitForLoadingFSImage();
readLock();
try {
checkOperation(OperationCategory.READ);
return FSDirAttrOp.getStoragePolicy(dir, blockManager, src);
} finally {
readUnlock();
}
}
/** /**
* @return All the existing block storage policies * @return All the existing block storage policies
*/ */

View File

@ -685,6 +685,12 @@ class NameNodeRpcServer implements NamenodeProtocols {
namesystem.setStoragePolicy(src, policyName); namesystem.setStoragePolicy(src, policyName);
} }
@Override
public BlockStoragePolicy getStoragePolicy(String path) throws IOException {
checkNNStartup();
return namesystem.getStoragePolicy(path);
}
@Override @Override
public BlockStoragePolicy[] getStoragePolicies() throws IOException { public BlockStoragePolicy[] getStoragePolicies() throws IOException {
checkNNStartup(); checkNNStartup();

View File

@ -979,6 +979,29 @@ public class TestBlockStoragePolicy {
} }
} }
@Test
public void testGetStoragePolicy() throws Exception {
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(REPLICATION).build();
cluster.waitActive();
final DistributedFileSystem fs = cluster.getFileSystem();
try {
final Path dir = new Path("/testGetStoragePolicy");
final Path fooFile = new Path(dir, "foo");
DFSTestUtil.createFile(fs, fooFile, FILE_LEN, REPLICATION, 0L);
DFSClient client = new DFSClient(cluster.getNameNode(0)
.getNameNodeAddress(), conf);
client.setStoragePolicy("/testGetStoragePolicy/foo",
HdfsConstants.COLD_STORAGE_POLICY_NAME);
String policyName = client.getStoragePolicy("/testGetStoragePolicy/foo")
.getName();
Assert.assertEquals("File storage policy should be COLD",
HdfsConstants.COLD_STORAGE_POLICY_NAME, policyName);
} finally {
cluster.shutdown();
}
}
@Test @Test
public void testSetStoragePolicyWithSnapshot() throws Exception { public void testSetStoragePolicyWithSnapshot() throws Exception {
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)