HDDS-1834. parent directories not found in secure setup due to ACL check. Contributed by Doroszlai, Attila.

This closes #1171.
This commit is contained in:
Doroszlai, Attila 2019-07-30 22:41:16 +02:00 committed by Xiaoyu Yao
parent 42683aef1a
commit e68d8446c4
3 changed files with 91 additions and 11 deletions

View File

@ -17,6 +17,7 @@
package org.apache.hadoop.ozone.security.acl; package org.apache.hadoop.ozone.security.acl;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER; import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
@ -158,6 +159,14 @@ public final class OzoneObjInfo extends OzoneObj {
return new Builder(); return new Builder();
} }
public static Builder fromKeyArgs(OmKeyArgs args) {
return new Builder()
.setVolumeName(args.getVolumeName())
.setBucketName(args.getBucketName())
.setKeyName(args.getKeyName())
.setResType(ResourceType.KEY);
}
public Builder setResType(OzoneObj.ResourceType res) { public Builder setResType(OzoneObj.ResourceType res) {
this.resType = res; this.resType = res;
return this; return this;

View File

@ -57,6 +57,7 @@ import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.scm.server.SCMConfigurator; import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.OzoneAcl; import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.OzoneTestUtils;
import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
@ -73,6 +74,7 @@ import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType; import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
import org.apache.hadoop.ozone.security.acl.OzoneObj; import org.apache.hadoop.ozone.security.acl.OzoneObj;
import org.apache.hadoop.ozone.security.acl.OzoneObjInfo; import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
import org.apache.hadoop.ozone.security.acl.RequestContext;
import org.apache.hadoop.ozone.web.utils.OzoneUtils; import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
@ -395,6 +397,53 @@ public class TestKeyManagerImpl {
} }
} }
@Test
public void testCheckAccessForFileKey() throws Exception {
OmKeyArgs keyArgs = createBuilder()
.setKeyName("testdir/deep/NOTICE.txt")
.build();
OpenKeySession keySession = keyManager.createFile(keyArgs, false, true);
keyArgs.setLocationInfoList(
keySession.getKeyInfo().getLatestVersionLocations().getLocationList());
keyManager.commitKey(keyArgs, keySession.getId());
OzoneObj fileKey = OzoneObjInfo.Builder.fromKeyArgs(keyArgs)
.setStoreType(OzoneObj.StoreType.OZONE)
.build();
RequestContext context = currentUserReads();
Assert.assertTrue(keyManager.checkAccess(fileKey, context));
OzoneObj parentDirKey = OzoneObjInfo.Builder.fromKeyArgs(keyArgs)
.setStoreType(OzoneObj.StoreType.OZONE)
.setKeyName("testdir")
.build();
Assert.assertTrue(keyManager.checkAccess(parentDirKey, context));
}
@Test
public void testCheckAccessForNonExistentKey() throws Exception {
OmKeyArgs keyArgs = createBuilder()
.setKeyName("testdir/deep/NO_SUCH_FILE.txt")
.build();
OzoneObj nonExistentKey = OzoneObjInfo.Builder.fromKeyArgs(keyArgs)
.setStoreType(OzoneObj.StoreType.OZONE)
.build();
OzoneTestUtils.expectOmException(OMException.ResultCodes.KEY_NOT_FOUND,
() -> keyManager.checkAccess(nonExistentKey, currentUserReads()));
}
@Test
public void testCheckAccessForDirectoryKey() throws Exception {
OmKeyArgs keyArgs = createBuilder()
.setKeyName("some/dir")
.build();
keyManager.createDirectory(keyArgs);
OzoneObj dirKey = OzoneObjInfo.Builder.fromKeyArgs(keyArgs)
.setStoreType(OzoneObj.StoreType.OZONE)
.build();
Assert.assertTrue(keyManager.checkAccess(dirKey, currentUserReads()));
}
@Test @Test
public void testPrefixAclOps() throws IOException { public void testPrefixAclOps() throws IOException {
@ -914,4 +963,12 @@ public class TestKeyManagerImpl {
ALL, ALL)) ALL, ALL))
.setVolumeName(VOLUME_NAME); .setVolumeName(VOLUME_NAME);
} }
private RequestContext currentUserReads() throws IOException {
return RequestContext.newBuilder()
.setClientUgi(UserGroupInformation.getCurrentUser())
.setAclRights(ACLType.READ_ACL)
.setAclType(ACLIdentityType.USER)
.build();
}
} }

View File

@ -1638,24 +1638,38 @@ public class KeyManagerImpl implements KeyManager {
String volume = ozObject.getVolumeName(); String volume = ozObject.getVolumeName();
String bucket = ozObject.getBucketName(); String bucket = ozObject.getBucketName();
String keyName = ozObject.getKeyName(); String keyName = ozObject.getKeyName();
String objectKey = metadataManager.getOzoneKey(volume, bucket, keyName);
OmKeyArgs args = new OmKeyArgs.Builder()
.setVolumeName(volume)
.setBucketName(bucket)
.setKeyName(keyName)
.build();
metadataManager.getLock().acquireLock(BUCKET_LOCK, volume, bucket); metadataManager.getLock().acquireLock(BUCKET_LOCK, volume, bucket);
try { try {
validateBucket(volume, bucket); validateBucket(volume, bucket);
String objectKey = metadataManager.getOzoneKey(volume, bucket, keyName); OmKeyInfo keyInfo = null;
OmKeyInfo keyInfo = metadataManager.getKeyTable().get(objectKey); try {
if (keyInfo == null) { OzoneFileStatus fileStatus = getFileStatus(args);
objectKey = OzoneFSUtils.addTrailingSlashIfNeeded(objectKey); keyInfo = fileStatus.getKeyInfo();
keyInfo = metadataManager.getKeyTable().get(objectKey); if (keyInfo == null) {
// the key does not exist, but it is a parent "dir" of some key
if(keyInfo == null) { // let access be determined based on volume/bucket/prefix ACL
LOG.debug("key:{} is non-existent parent, permit access to user:{}",
keyName, context.getClientUgi());
return true;
}
} catch (OMException e) {
if (e.getResult() == FILE_NOT_FOUND) {
keyInfo = metadataManager.getOpenKeyTable().get(objectKey); keyInfo = metadataManager.getOpenKeyTable().get(objectKey);
if (keyInfo == null) {
throw new OMException("Key not found, checkAccess failed. Key:" +
objectKey, KEY_NOT_FOUND);
}
} }
} }
if (keyInfo == null) {
throw new OMException("Key not found, checkAccess failed. Key:" +
objectKey, KEY_NOT_FOUND);
}
boolean hasAccess = OzoneUtils.checkAclRight(keyInfo.getAcls(), context); boolean hasAccess = OzoneUtils.checkAclRight(keyInfo.getAcls(), context);
LOG.debug("user:{} has access rights for key:{} :{} ", LOG.debug("user:{} has access rights for key:{} :{} ",
context.getClientUgi(), ozObject.getKeyName(), hasAccess); context.getClientUgi(), ozObject.getKeyName(), hasAccess);