HDFS-12945. Switch to ClientProtocol instead of NamenodeProtocols in NamenodeWebHdfsMethods. Contributed by Wei Yan.
This commit is contained in:
parent
dc54747d70
commit
2ee0d64ace
|
@ -68,6 +68,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.XAttrHelper;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
|
@ -163,6 +164,15 @@ public class NamenodeWebHdfsMethods {
|
|||
return np;
|
||||
}
|
||||
|
||||
protected ClientProtocol getRpcClientProtocol() throws IOException {
|
||||
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
||||
final ClientProtocol cp = namenode.getRpcServer();
|
||||
if (cp == null) {
|
||||
throw new RetriableException("Namenode is in startup mode");
|
||||
}
|
||||
return cp;
|
||||
}
|
||||
|
||||
private <T> T doAs(final UserGroupInformation ugi,
|
||||
final PrivilegedExceptionAction<T> action)
|
||||
throws IOException, InterruptedException {
|
||||
|
@ -566,7 +576,7 @@ public class NamenodeWebHdfsMethods {
|
|||
|
||||
final Configuration conf = (Configuration)context.getAttribute(JspHelper.CURRENT_CONF);
|
||||
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
||||
final NamenodeProtocols np = getRPCServer(namenode);
|
||||
final ClientProtocol cp = getRpcClientProtocol();
|
||||
|
||||
switch(op.getValue()) {
|
||||
case CREATE:
|
||||
|
@ -590,14 +600,14 @@ public class NamenodeWebHdfsMethods {
|
|||
permission.getDirFsPermission() :
|
||||
FsCreateModes.create(permission.getDirFsPermission(),
|
||||
unmaskedPermission.getDirFsPermission());
|
||||
final boolean b = np.mkdirs(fullpath, masked, true);
|
||||
final boolean b = cp.mkdirs(fullpath, masked, true);
|
||||
final String js = JsonUtil.toJsonString("boolean", b);
|
||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||
}
|
||||
case CREATESYMLINK:
|
||||
{
|
||||
validateOpParams(op, destination);
|
||||
np.createSymlink(destination.getValue(), fullpath,
|
||||
cp.createSymlink(destination.getValue(), fullpath,
|
||||
PermissionParam.getDefaultSymLinkFsPermission(),
|
||||
createParent.getValue());
|
||||
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
|
@ -607,18 +617,18 @@ public class NamenodeWebHdfsMethods {
|
|||
validateOpParams(op, destination);
|
||||
final EnumSet<Options.Rename> s = renameOptions.getValue();
|
||||
if (s.isEmpty()) {
|
||||
final boolean b = np.rename(fullpath, destination.getValue());
|
||||
final boolean b = cp.rename(fullpath, destination.getValue());
|
||||
final String js = JsonUtil.toJsonString("boolean", b);
|
||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||
} else {
|
||||
np.rename2(fullpath, destination.getValue(),
|
||||
cp.rename2(fullpath, destination.getValue(),
|
||||
s.toArray(new Options.Rename[s.size()]));
|
||||
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
}
|
||||
}
|
||||
case SETREPLICATION:
|
||||
{
|
||||
final boolean b = np.setReplication(fullpath, replication.getValue(conf));
|
||||
final boolean b = cp.setReplication(fullpath, replication.getValue(conf));
|
||||
final String js = JsonUtil.toJsonString("boolean", b);
|
||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||
}
|
||||
|
@ -628,17 +638,17 @@ public class NamenodeWebHdfsMethods {
|
|||
throw new IllegalArgumentException("Both owner and group are empty.");
|
||||
}
|
||||
|
||||
np.setOwner(fullpath, owner.getValue(), group.getValue());
|
||||
cp.setOwner(fullpath, owner.getValue(), group.getValue());
|
||||
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
}
|
||||
case SETPERMISSION:
|
||||
{
|
||||
np.setPermission(fullpath, permission.getDirFsPermission());
|
||||
cp.setPermission(fullpath, permission.getDirFsPermission());
|
||||
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
}
|
||||
case SETTIMES:
|
||||
{
|
||||
np.setTimes(fullpath, modificationTime.getValue(), accessTime.getValue());
|
||||
cp.setTimes(fullpath, modificationTime.getValue(), accessTime.getValue());
|
||||
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
}
|
||||
case RENEWDELEGATIONTOKEN:
|
||||
|
@ -646,7 +656,7 @@ public class NamenodeWebHdfsMethods {
|
|||
validateOpParams(op, delegationTokenArgument);
|
||||
final Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>();
|
||||
token.decodeFromUrlString(delegationTokenArgument.getValue());
|
||||
final long expiryTime = np.renewDelegationToken(token);
|
||||
final long expiryTime = cp.renewDelegationToken(token);
|
||||
final String js = JsonUtil.toJsonString("long", expiryTime);
|
||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||
}
|
||||
|
@ -655,35 +665,35 @@ public class NamenodeWebHdfsMethods {
|
|||
validateOpParams(op, delegationTokenArgument);
|
||||
final Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>();
|
||||
token.decodeFromUrlString(delegationTokenArgument.getValue());
|
||||
np.cancelDelegationToken(token);
|
||||
cp.cancelDelegationToken(token);
|
||||
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
}
|
||||
case MODIFYACLENTRIES: {
|
||||
validateOpParams(op, aclPermission);
|
||||
np.modifyAclEntries(fullpath, aclPermission.getAclPermission(true));
|
||||
cp.modifyAclEntries(fullpath, aclPermission.getAclPermission(true));
|
||||
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
}
|
||||
case REMOVEACLENTRIES: {
|
||||
validateOpParams(op, aclPermission);
|
||||
np.removeAclEntries(fullpath, aclPermission.getAclPermission(false));
|
||||
cp.removeAclEntries(fullpath, aclPermission.getAclPermission(false));
|
||||
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
}
|
||||
case REMOVEDEFAULTACL: {
|
||||
np.removeDefaultAcl(fullpath);
|
||||
cp.removeDefaultAcl(fullpath);
|
||||
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
}
|
||||
case REMOVEACL: {
|
||||
np.removeAcl(fullpath);
|
||||
cp.removeAcl(fullpath);
|
||||
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
}
|
||||
case SETACL: {
|
||||
validateOpParams(op, aclPermission);
|
||||
np.setAcl(fullpath, aclPermission.getAclPermission(true));
|
||||
cp.setAcl(fullpath, aclPermission.getAclPermission(true));
|
||||
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
}
|
||||
case SETXATTR: {
|
||||
validateOpParams(op, xattrName, xattrSetFlag);
|
||||
np.setXAttr(
|
||||
cp.setXAttr(
|
||||
fullpath,
|
||||
XAttrHelper.buildXAttr(xattrName.getXAttrName(),
|
||||
xattrValue.getXAttrValue()), xattrSetFlag.getFlag());
|
||||
|
@ -691,34 +701,36 @@ public class NamenodeWebHdfsMethods {
|
|||
}
|
||||
case REMOVEXATTR: {
|
||||
validateOpParams(op, xattrName);
|
||||
np.removeXAttr(fullpath, XAttrHelper.buildXAttr(xattrName.getXAttrName()));
|
||||
cp.removeXAttr(fullpath,
|
||||
XAttrHelper.buildXAttr(xattrName.getXAttrName()));
|
||||
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
}
|
||||
case ALLOWSNAPSHOT: {
|
||||
np.allowSnapshot(fullpath);
|
||||
cp.allowSnapshot(fullpath);
|
||||
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
}
|
||||
case CREATESNAPSHOT: {
|
||||
String snapshotPath = np.createSnapshot(fullpath, snapshotName.getValue());
|
||||
String snapshotPath =
|
||||
cp.createSnapshot(fullpath, snapshotName.getValue());
|
||||
final String js = JsonUtil.toJsonString(
|
||||
org.apache.hadoop.fs.Path.class.getSimpleName(), snapshotPath);
|
||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||
}
|
||||
case RENAMESNAPSHOT: {
|
||||
validateOpParams(op, oldSnapshotName, snapshotName);
|
||||
np.renameSnapshot(fullpath, oldSnapshotName.getValue(),
|
||||
cp.renameSnapshot(fullpath, oldSnapshotName.getValue(),
|
||||
snapshotName.getValue());
|
||||
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
}
|
||||
case DISALLOWSNAPSHOT: {
|
||||
np.disallowSnapshot(fullpath);
|
||||
cp.disallowSnapshot(fullpath);
|
||||
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
}
|
||||
case SETSTORAGEPOLICY: {
|
||||
if (policyName.getValue() == null) {
|
||||
throw new IllegalArgumentException("Storage policy name is empty.");
|
||||
}
|
||||
np.setStoragePolicy(fullpath, policyName.getValue());
|
||||
cp.setStoragePolicy(fullpath, policyName.getValue());
|
||||
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
}
|
||||
default:
|
||||
|
@ -812,12 +824,12 @@ public class NamenodeWebHdfsMethods {
|
|||
final NewLengthParam newLength,
|
||||
final NoRedirectParam noredirectParam
|
||||
) throws IOException, URISyntaxException {
|
||||
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
||||
final NamenodeProtocols np = getRPCServer(namenode);
|
||||
final ClientProtocol cp = getRpcClientProtocol();
|
||||
|
||||
switch(op.getValue()) {
|
||||
case APPEND:
|
||||
{
|
||||
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
||||
final URI uri = redirectURI(namenode, ugi, delegation, username,
|
||||
doAsUser, fullpath, op.getValue(), -1L, -1L,
|
||||
excludeDatanodes.getValue(), bufferSize);
|
||||
|
@ -832,20 +844,20 @@ public class NamenodeWebHdfsMethods {
|
|||
case CONCAT:
|
||||
{
|
||||
validateOpParams(op, concatSrcs);
|
||||
np.concat(fullpath, concatSrcs.getAbsolutePaths());
|
||||
cp.concat(fullpath, concatSrcs.getAbsolutePaths());
|
||||
return Response.ok().build();
|
||||
}
|
||||
case TRUNCATE:
|
||||
{
|
||||
validateOpParams(op, newLength);
|
||||
// We treat each rest request as a separate client.
|
||||
final boolean b = np.truncate(fullpath, newLength.getValue(),
|
||||
final boolean b = cp.truncate(fullpath, newLength.getValue(),
|
||||
"DFSClient_" + DFSUtil.getSecureRandom().nextLong());
|
||||
final String js = JsonUtil.toJsonString("boolean", b);
|
||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||
}
|
||||
case UNSETSTORAGEPOLICY: {
|
||||
np.unsetStoragePolicy(fullpath);
|
||||
cp.unsetStoragePolicy(fullpath);
|
||||
return Response.ok().build();
|
||||
}
|
||||
default:
|
||||
|
@ -975,14 +987,14 @@ public class NamenodeWebHdfsMethods {
|
|||
final NoRedirectParam noredirectParam,
|
||||
final StartAfterParam startAfter
|
||||
) throws IOException, URISyntaxException {
|
||||
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
||||
final Configuration conf = (Configuration) context
|
||||
.getAttribute(JspHelper.CURRENT_CONF);
|
||||
final NamenodeProtocols np = getRPCServer(namenode);
|
||||
final ClientProtocol cp = getRpcClientProtocol();
|
||||
|
||||
switch(op.getValue()) {
|
||||
case OPEN:
|
||||
{
|
||||
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
||||
final URI uri = redirectURI(namenode, ugi, delegation, username,
|
||||
doAsUser, fullpath, op.getValue(), offset.getValue(), -1L,
|
||||
excludeDatanodes.getValue(), offset, length, bufferSize);
|
||||
|
@ -998,14 +1010,14 @@ public class NamenodeWebHdfsMethods {
|
|||
{
|
||||
final long offsetValue = offset.getValue();
|
||||
final Long lengthValue = length.getValue();
|
||||
final LocatedBlocks locatedblocks = np.getBlockLocations(fullpath,
|
||||
final LocatedBlocks locatedblocks = cp.getBlockLocations(fullpath,
|
||||
offsetValue, lengthValue != null? lengthValue: Long.MAX_VALUE);
|
||||
final String js = JsonUtil.toJsonString(locatedblocks);
|
||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||
}
|
||||
case GETFILESTATUS:
|
||||
{
|
||||
final HdfsFileStatus status = np.getFileInfo(fullpath);
|
||||
final HdfsFileStatus status = cp.getFileInfo(fullpath);
|
||||
if (status == null) {
|
||||
throw new FileNotFoundException("File does not exist: " + fullpath);
|
||||
}
|
||||
|
@ -1015,17 +1027,18 @@ public class NamenodeWebHdfsMethods {
|
|||
}
|
||||
case LISTSTATUS:
|
||||
{
|
||||
final StreamingOutput streaming = getListingStream(np, fullpath);
|
||||
final StreamingOutput streaming = getListingStream(cp, fullpath);
|
||||
return Response.ok(streaming).type(MediaType.APPLICATION_JSON).build();
|
||||
}
|
||||
case GETCONTENTSUMMARY:
|
||||
{
|
||||
final ContentSummary contentsummary = np.getContentSummary(fullpath);
|
||||
final ContentSummary contentsummary = cp.getContentSummary(fullpath);
|
||||
final String js = JsonUtil.toJsonString(contentsummary);
|
||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||
}
|
||||
case GETFILECHECKSUM:
|
||||
{
|
||||
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
||||
final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
|
||||
fullpath, op.getValue(), -1L, -1L, null);
|
||||
if(!noredirectParam.getValue()) {
|
||||
|
@ -1042,6 +1055,7 @@ public class NamenodeWebHdfsMethods {
|
|||
throw new IllegalArgumentException(delegation.getName()
|
||||
+ " parameter is not null.");
|
||||
}
|
||||
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
||||
final Token<? extends TokenIdentifier> token = generateDelegationToken(
|
||||
namenode, ugi, renewer.getValue());
|
||||
|
||||
|
@ -1063,7 +1077,7 @@ public class NamenodeWebHdfsMethods {
|
|||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||
}
|
||||
case GETACLSTATUS: {
|
||||
AclStatus status = np.getAclStatus(fullpath);
|
||||
AclStatus status = cp.getAclStatus(fullpath);
|
||||
if (status == null) {
|
||||
throw new FileNotFoundException("File does not exist: " + fullpath);
|
||||
}
|
||||
|
@ -1082,20 +1096,20 @@ public class NamenodeWebHdfsMethods {
|
|||
}
|
||||
}
|
||||
}
|
||||
List<XAttr> xAttrs = np.getXAttrs(fullpath, (names != null &&
|
||||
List<XAttr> xAttrs = cp.getXAttrs(fullpath, (names != null &&
|
||||
!names.isEmpty()) ? XAttrHelper.buildXAttrs(names) : null);
|
||||
final String js = JsonUtil.toJsonString(xAttrs,
|
||||
xattrEncoding.getEncoding());
|
||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||
}
|
||||
case LISTXATTRS: {
|
||||
final List<XAttr> xAttrs = np.listXAttrs(fullpath);
|
||||
final List<XAttr> xAttrs = cp.listXAttrs(fullpath);
|
||||
final String js = JsonUtil.toJsonString(xAttrs);
|
||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||
}
|
||||
case CHECKACCESS: {
|
||||
validateOpParams(op, fsAction);
|
||||
np.checkAccess(fullpath, FsAction.getFsAction(fsAction.getValue()));
|
||||
cp.checkAccess(fullpath, FsAction.getFsAction(fsAction.getValue()));
|
||||
return Response.ok().build();
|
||||
}
|
||||
case GETTRASHROOT: {
|
||||
|
@ -1109,17 +1123,17 @@ public class NamenodeWebHdfsMethods {
|
|||
if (startAfter != null && startAfter.getValue() != null) {
|
||||
start = startAfter.getValue().getBytes(Charsets.UTF_8);
|
||||
}
|
||||
final DirectoryListing listing = getDirectoryListing(np, fullpath, start);
|
||||
final DirectoryListing listing = getDirectoryListing(cp, fullpath, start);
|
||||
final String js = JsonUtil.toJsonString(listing);
|
||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||
}
|
||||
case GETALLSTORAGEPOLICY: {
|
||||
BlockStoragePolicy[] storagePolicies = np.getStoragePolicies();
|
||||
BlockStoragePolicy[] storagePolicies = cp.getStoragePolicies();
|
||||
final String js = JsonUtil.toJsonString(storagePolicies);
|
||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||
}
|
||||
case GETSTORAGEPOLICY: {
|
||||
BlockStoragePolicy storagePolicy = np.getStoragePolicy(fullpath);
|
||||
BlockStoragePolicy storagePolicy = cp.getStoragePolicy(fullpath);
|
||||
final String js = JsonUtil.toJsonString(storagePolicy);
|
||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||
}
|
||||
|
@ -1127,7 +1141,7 @@ public class NamenodeWebHdfsMethods {
|
|||
// Since none of the server defaults values are hot reloaded, we can
|
||||
// cache the output of serverDefaults.
|
||||
if (serverDefaultsResponse == null) {
|
||||
FsServerDefaults serverDefaults = np.getServerDefaults();
|
||||
FsServerDefaults serverDefaults = cp.getServerDefaults();
|
||||
serverDefaultsResponse = JsonUtil.toJsonString(serverDefaults);
|
||||
}
|
||||
return Response.ok(serverDefaultsResponse)
|
||||
|
@ -1154,21 +1168,21 @@ public class NamenodeWebHdfsMethods {
|
|||
new org.apache.hadoop.fs.Path(fullPath)).toUri().getPath();
|
||||
}
|
||||
|
||||
private static DirectoryListing getDirectoryListing(final NamenodeProtocols np,
|
||||
private static DirectoryListing getDirectoryListing(final ClientProtocol cp,
|
||||
final String p, byte[] startAfter) throws IOException {
|
||||
final DirectoryListing listing = np.getListing(p, startAfter, false);
|
||||
final DirectoryListing listing = cp.getListing(p, startAfter, false);
|
||||
if (listing == null) { // the directory does not exist
|
||||
throw new FileNotFoundException("File " + p + " does not exist.");
|
||||
}
|
||||
return listing;
|
||||
}
|
||||
|
||||
private static StreamingOutput getListingStream(final NamenodeProtocols np,
|
||||
private static StreamingOutput getListingStream(final ClientProtocol cp,
|
||||
final String p) throws IOException {
|
||||
// allows exceptions like FNF or ACE to prevent http response of 200 for
|
||||
// a failure since we can't (currently) return error responses in the
|
||||
// middle of a streaming operation
|
||||
final DirectoryListing firstDirList = getDirectoryListing(np, p,
|
||||
final DirectoryListing firstDirList = getDirectoryListing(cp, p,
|
||||
HdfsFileStatus.EMPTY_NAME);
|
||||
|
||||
// must save ugi because the streaming object will be executed outside
|
||||
|
@ -1189,7 +1203,7 @@ public class NamenodeWebHdfsMethods {
|
|||
public Void run() throws IOException {
|
||||
long n = 0;
|
||||
for (DirectoryListing dirList = firstDirList; ;
|
||||
dirList = getDirectoryListing(np, p, dirList.getLastName())
|
||||
dirList = getDirectoryListing(cp, p, dirList.getLastName())
|
||||
) {
|
||||
// send each segment of the directory listing
|
||||
for (HdfsFileStatus s : dirList.getPartialListing()) {
|
||||
|
@ -1282,18 +1296,17 @@ public class NamenodeWebHdfsMethods {
|
|||
final RecursiveParam recursive,
|
||||
final SnapshotNameParam snapshotName
|
||||
) throws IOException {
|
||||
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
||||
final NamenodeProtocols np = getRPCServer(namenode);
|
||||
final ClientProtocol cp = getRpcClientProtocol();
|
||||
|
||||
switch(op.getValue()) {
|
||||
case DELETE: {
|
||||
final boolean b = np.delete(fullpath, recursive.getValue());
|
||||
final boolean b = cp.delete(fullpath, recursive.getValue());
|
||||
final String js = JsonUtil.toJsonString("boolean", b);
|
||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||
}
|
||||
case DELETESNAPSHOT: {
|
||||
validateOpParams(op, snapshotName);
|
||||
np.deleteSnapshot(fullpath, snapshotName.getValue());
|
||||
cp.deleteSnapshot(fullpath, snapshotName.getValue());
|
||||
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
}
|
||||
default:
|
||||
|
|
Loading…
Reference in New Issue