HDFS-12945. Switch to ClientProtocol instead of NamenodeProtocols in NamenodeWebHdfsMethods. Contributed by Wei Yan.

(cherry picked from commit 2ee0d64ace)
This commit is contained in:
Wei Yan 2018-01-08 14:25:39 -08:00
parent a35267b47a
commit 5e8029eff9
1 changed files with 64 additions and 51 deletions

View File

@ -40,6 +40,7 @@ import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HEAD;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
@ -67,6 +68,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.XAttrHelper;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@ -161,6 +163,15 @@ public class NamenodeWebHdfsMethods {
return np;
}
protected ClientProtocol getRpcClientProtocol() throws IOException {
final NameNode namenode = (NameNode)context.getAttribute("name.node");
final ClientProtocol cp = namenode.getRpcServer();
if (cp == null) {
throw new RetriableException("Namenode is in startup mode");
}
return cp;
}
private <T> T doAs(final UserGroupInformation ugi,
final PrivilegedExceptionAction<T> action)
throws IOException, InterruptedException {
@ -541,7 +552,7 @@ public class NamenodeWebHdfsMethods {
final Configuration conf = (Configuration)context.getAttribute(JspHelper.CURRENT_CONF);
final NameNode namenode = (NameNode)context.getAttribute("name.node");
final NamenodeProtocols np = getRPCServer(namenode);
final ClientProtocol cp = getRpcClientProtocol();
switch(op.getValue()) {
case CREATE:
@ -560,13 +571,13 @@ public class NamenodeWebHdfsMethods {
}
case MKDIRS:
{
final boolean b = np.mkdirs(fullpath, permission.getFsPermission(), true);
final boolean b = cp.mkdirs(fullpath, permission.getFsPermission(), true);
final String js = JsonUtil.toJsonString("boolean", b);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case CREATESYMLINK:
{
np.createSymlink(destination.getValue(), fullpath,
cp.createSymlink(destination.getValue(), fullpath,
PermissionParam.getDefaultFsPermission(), createParent.getValue());
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
@ -574,18 +585,18 @@ public class NamenodeWebHdfsMethods {
{
final EnumSet<Options.Rename> s = renameOptions.getValue();
if (s.isEmpty()) {
final boolean b = np.rename(fullpath, destination.getValue());
final boolean b = cp.rename(fullpath, destination.getValue());
final String js = JsonUtil.toJsonString("boolean", b);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
} else {
np.rename2(fullpath, destination.getValue(),
cp.rename2(fullpath, destination.getValue(),
s.toArray(new Options.Rename[s.size()]));
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
}
case SETREPLICATION:
{
final boolean b = np.setReplication(fullpath, replication.getValue(conf));
final boolean b = cp.setReplication(fullpath, replication.getValue(conf));
final String js = JsonUtil.toJsonString("boolean", b);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
@ -595,24 +606,24 @@ public class NamenodeWebHdfsMethods {
throw new IllegalArgumentException("Both owner and group are empty.");
}
np.setOwner(fullpath, owner.getValue(), group.getValue());
cp.setOwner(fullpath, owner.getValue(), group.getValue());
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case SETPERMISSION:
{
np.setPermission(fullpath, permission.getFsPermission());
cp.setPermission(fullpath, permission.getFsPermission());
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case SETTIMES:
{
np.setTimes(fullpath, modificationTime.getValue(), accessTime.getValue());
cp.setTimes(fullpath, modificationTime.getValue(), accessTime.getValue());
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case RENEWDELEGATIONTOKEN:
{
final Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>();
token.decodeFromUrlString(delegationTokenArgument.getValue());
final long expiryTime = np.renewDelegationToken(token);
final long expiryTime = cp.renewDelegationToken(token);
final String js = JsonUtil.toJsonString("long", expiryTime);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
@ -620,64 +631,65 @@ public class NamenodeWebHdfsMethods {
{
final Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>();
token.decodeFromUrlString(delegationTokenArgument.getValue());
np.cancelDelegationToken(token);
cp.cancelDelegationToken(token);
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case MODIFYACLENTRIES: {
np.modifyAclEntries(fullpath, aclPermission.getAclPermission(true));
cp.modifyAclEntries(fullpath, aclPermission.getAclPermission(true));
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case REMOVEACLENTRIES: {
np.removeAclEntries(fullpath, aclPermission.getAclPermission(false));
cp.removeAclEntries(fullpath, aclPermission.getAclPermission(false));
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case REMOVEDEFAULTACL: {
np.removeDefaultAcl(fullpath);
cp.removeDefaultAcl(fullpath);
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case REMOVEACL: {
np.removeAcl(fullpath);
cp.removeAcl(fullpath);
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case SETACL: {
np.setAcl(fullpath, aclPermission.getAclPermission(true));
cp.setAcl(fullpath, aclPermission.getAclPermission(true));
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case SETXATTR: {
np.setXAttr(
cp.setXAttr(
fullpath,
XAttrHelper.buildXAttr(xattrName.getXAttrName(),
xattrValue.getXAttrValue()), xattrSetFlag.getFlag());
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case REMOVEXATTR: {
np.removeXAttr(fullpath, XAttrHelper.buildXAttr(xattrName.getXAttrName()));
cp.removeXAttr(fullpath, XAttrHelper.buildXAttr(xattrName.getXAttrName()));
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case ALLOWSNAPSHOT: {
np.allowSnapshot(fullpath);
cp.allowSnapshot(fullpath);
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case CREATESNAPSHOT: {
String snapshotPath = np.createSnapshot(fullpath, snapshotName.getValue());
String snapshotPath =
cp.createSnapshot(fullpath, snapshotName.getValue());
final String js = JsonUtil.toJsonString(
org.apache.hadoop.fs.Path.class.getSimpleName(), snapshotPath);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case RENAMESNAPSHOT: {
np.renameSnapshot(fullpath, oldSnapshotName.getValue(),
cp.renameSnapshot(fullpath, oldSnapshotName.getValue(),
snapshotName.getValue());
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case DISALLOWSNAPSHOT: {
np.disallowSnapshot(fullpath);
cp.disallowSnapshot(fullpath);
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case SETSTORAGEPOLICY: {
if (policyName.getValue() == null) {
throw new IllegalArgumentException("Storage policy name is empty.");
}
np.setStoragePolicy(fullpath, policyName.getValue());
cp.setStoragePolicy(fullpath, policyName.getValue());
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
default:
@ -769,12 +781,12 @@ public class NamenodeWebHdfsMethods {
final NewLengthParam newLength,
final NoRedirectParam noredirectParam
) throws IOException, URISyntaxException {
final NameNode namenode = (NameNode)context.getAttribute("name.node");
final NamenodeProtocols np = getRPCServer(namenode);
final ClientProtocol cp = getRpcClientProtocol();
switch(op.getValue()) {
case APPEND:
{
final NameNode namenode = (NameNode)context.getAttribute("name.node");
final URI uri = redirectURI(namenode, ugi, delegation, username,
doAsUser, fullpath, op.getValue(), -1L, -1L,
excludeDatanodes.getValue(), bufferSize);
@ -788,7 +800,7 @@ public class NamenodeWebHdfsMethods {
}
case CONCAT:
{
np.concat(fullpath, concatSrcs.getAbsolutePaths());
cp.concat(fullpath, concatSrcs.getAbsolutePaths());
return Response.ok().build();
}
case TRUNCATE:
@ -798,13 +810,13 @@ public class NamenodeWebHdfsMethods {
"newLength parameter is Missing");
}
// We treat each rest request as a separate client.
final boolean b = np.truncate(fullpath, newLength.getValue(),
final boolean b = cp.truncate(fullpath, newLength.getValue(),
"DFSClient_" + DFSUtil.getSecureRandom().nextLong());
final String js = JsonUtil.toJsonString("boolean", b);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case UNSETSTORAGEPOLICY: {
np.unsetStoragePolicy(fullpath);
cp.unsetStoragePolicy(fullpath);
return Response.ok().build();
}
default:
@ -932,14 +944,14 @@ public class NamenodeWebHdfsMethods {
final NoRedirectParam noredirectParam,
final StartAfterParam startAfter
) throws IOException, URISyntaxException {
final NameNode namenode = (NameNode)context.getAttribute("name.node");
final Configuration conf = (Configuration) context
.getAttribute(JspHelper.CURRENT_CONF);
final NamenodeProtocols np = getRPCServer(namenode);
final ClientProtocol cp = getRpcClientProtocol();
switch(op.getValue()) {
case OPEN:
{
final NameNode namenode = (NameNode)context.getAttribute("name.node");
final URI uri = redirectURI(namenode, ugi, delegation, username,
doAsUser, fullpath, op.getValue(), offset.getValue(), -1L,
excludeDatanodes.getValue(), offset, length, bufferSize);
@ -970,14 +982,14 @@ public class NamenodeWebHdfsMethods {
{
final long offsetValue = offset.getValue();
final Long lengthValue = length.getValue();
final LocatedBlocks locatedblocks = np.getBlockLocations(fullpath,
final LocatedBlocks locatedblocks = cp.getBlockLocations(fullpath,
offsetValue, lengthValue != null? lengthValue: Long.MAX_VALUE);
final String js = JsonUtil.toJsonString(locatedblocks);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case GETFILESTATUS:
{
final HdfsFileStatus status = np.getFileInfo(fullpath);
final HdfsFileStatus status = cp.getFileInfo(fullpath);
if (status == null) {
throw new FileNotFoundException("File does not exist: " + fullpath);
}
@ -987,17 +999,18 @@ public class NamenodeWebHdfsMethods {
}
case LISTSTATUS:
{
final StreamingOutput streaming = getListingStream(np, fullpath);
final StreamingOutput streaming = getListingStream(cp, fullpath);
return Response.ok(streaming).type(MediaType.APPLICATION_JSON).build();
}
case GETCONTENTSUMMARY:
{
final ContentSummary contentsummary = np.getContentSummary(fullpath);
final ContentSummary contentsummary = cp.getContentSummary(fullpath);
final String js = JsonUtil.toJsonString(contentsummary);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case GETFILECHECKSUM:
{
final NameNode namenode = (NameNode)context.getAttribute("name.node");
final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
fullpath, op.getValue(), -1L, -1L, null);
if(!noredirectParam.getValue()) {
@ -1014,6 +1027,7 @@ public class NamenodeWebHdfsMethods {
throw new IllegalArgumentException(delegation.getName()
+ " parameter is not null.");
}
final NameNode namenode = (NameNode)context.getAttribute("name.node");
final Token<? extends TokenIdentifier> token = generateDelegationToken(
namenode, ugi, renewer.getValue());
@ -1035,7 +1049,7 @@ public class NamenodeWebHdfsMethods {
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case GETACLSTATUS: {
AclStatus status = np.getAclStatus(fullpath);
AclStatus status = cp.getAclStatus(fullpath);
if (status == null) {
throw new FileNotFoundException("File does not exist: " + fullpath);
}
@ -1053,19 +1067,19 @@ public class NamenodeWebHdfsMethods {
}
}
}
List<XAttr> xAttrs = np.getXAttrs(fullpath, (names != null &&
List<XAttr> xAttrs = cp.getXAttrs(fullpath, (names != null &&
!names.isEmpty()) ? XAttrHelper.buildXAttrs(names) : null);
final String js = JsonUtil.toJsonString(xAttrs,
xattrEncoding.getEncoding());
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case LISTXATTRS: {
final List<XAttr> xAttrs = np.listXAttrs(fullpath);
final List<XAttr> xAttrs = cp.listXAttrs(fullpath);
final String js = JsonUtil.toJsonString(xAttrs);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case CHECKACCESS: {
np.checkAccess(fullpath, FsAction.getFsAction(fsAction.getValue()));
cp.checkAccess(fullpath, FsAction.getFsAction(fsAction.getValue()));
return Response.ok().build();
}
case GETTRASHROOT: {
@ -1079,17 +1093,17 @@ public class NamenodeWebHdfsMethods {
if (startAfter != null && startAfter.getValue() != null) {
start = startAfter.getValue().getBytes(Charsets.UTF_8);
}
final DirectoryListing listing = getDirectoryListing(np, fullpath, start);
final DirectoryListing listing = getDirectoryListing(cp, fullpath, start);
final String js = JsonUtil.toJsonString(listing);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case GETALLSTORAGEPOLICY: {
BlockStoragePolicy[] storagePolicies = np.getStoragePolicies();
BlockStoragePolicy[] storagePolicies = cp.getStoragePolicies();
final String js = JsonUtil.toJsonString(storagePolicies);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case GETSTORAGEPOLICY: {
BlockStoragePolicy storagePolicy = np.getStoragePolicy(fullpath);
BlockStoragePolicy storagePolicy = cp.getStoragePolicy(fullpath);
final String js = JsonUtil.toJsonString(storagePolicy);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
@ -1097,7 +1111,7 @@ public class NamenodeWebHdfsMethods {
// Since none of the server defaults values are hot reloaded, we can
// cache the output of serverDefaults.
if (serverDefaultsResponse == null) {
FsServerDefaults serverDefaults = np.getServerDefaults();
FsServerDefaults serverDefaults = cp.getServerDefaults();
serverDefaultsResponse = JsonUtil.toJsonString(serverDefaults);
}
return Response.ok(serverDefaultsResponse)
@ -1124,21 +1138,21 @@ public class NamenodeWebHdfsMethods {
new org.apache.hadoop.fs.Path(fullPath)).toUri().getPath();
}
private static DirectoryListing getDirectoryListing(final NamenodeProtocols np,
private static DirectoryListing getDirectoryListing(final ClientProtocol cp,
final String p, byte[] startAfter) throws IOException {
final DirectoryListing listing = np.getListing(p, startAfter, false);
final DirectoryListing listing = cp.getListing(p, startAfter, false);
if (listing == null) { // the directory does not exist
throw new FileNotFoundException("File " + p + " does not exist.");
}
return listing;
}
private static StreamingOutput getListingStream(final NamenodeProtocols np,
private static StreamingOutput getListingStream(final ClientProtocol cp,
final String p) throws IOException {
// allows exceptions like FNF or ACE to prevent http response of 200 for
// a failure since we can't (currently) return error responses in the
// middle of a streaming operation
final DirectoryListing firstDirList = getDirectoryListing(np, p,
final DirectoryListing firstDirList = getDirectoryListing(cp, p,
HdfsFileStatus.EMPTY_NAME);
// must save ugi because the streaming object will be executed outside
@ -1159,7 +1173,7 @@ public class NamenodeWebHdfsMethods {
public Void run() throws IOException {
long n = 0;
for (DirectoryListing dirList = firstDirList; ;
dirList = getDirectoryListing(np, p, dirList.getLastName())
dirList = getDirectoryListing(cp, p, dirList.getLastName())
) {
// send each segment of the directory listing
for (HdfsFileStatus s : dirList.getPartialListing()) {
@ -1252,17 +1266,16 @@ public class NamenodeWebHdfsMethods {
final RecursiveParam recursive,
final SnapshotNameParam snapshotName
) throws IOException {
final NameNode namenode = (NameNode)context.getAttribute("name.node");
final NamenodeProtocols np = getRPCServer(namenode);
final ClientProtocol cp = getRpcClientProtocol();
switch(op.getValue()) {
case DELETE: {
final boolean b = np.delete(fullpath, recursive.getValue());
final boolean b = cp.delete(fullpath, recursive.getValue());
final String js = JsonUtil.toJsonString("boolean", b);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case DELETESNAPSHOT: {
np.deleteSnapshot(fullpath, snapshotName.getValue());
cp.deleteSnapshot(fullpath, snapshotName.getValue());
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
default: