HDFS-11771. Ozone: KSM: Add checkVolumeAccess. Contributed by Mukul Kumar Singh.

This commit is contained in:
Anu Engineer 2017-06-02 17:04:10 -07:00 committed by Owen O'Malley
parent 1b2d0b4fec
commit 42eaecae55
21 changed files with 419 additions and 60 deletions

View File

@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ksm.helpers;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.OzoneAclInfo;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.OzoneAclInfo.OzoneAclRights;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.OzoneAclInfo.OzoneAclType;
import java.util.List;
import java.util.LinkedList;
import java.util.Map;
import java.util.ArrayList;
import java.util.HashMap;
/**
* This helper class keeps a map of all user and their permissions.
*/
public class KsmOzoneAclMap {
// per Acl Type user:rights map
private ArrayList<Map<String, OzoneAclRights>> aclMaps;
KsmOzoneAclMap() {
aclMaps = new ArrayList<>();
for (OzoneAclType aclType : OzoneAclType.values()) {
aclMaps.add(aclType.ordinal(), new HashMap<>());
}
}
private Map<String, OzoneAclRights> getMap(OzoneAclType type) {
return aclMaps.get(type.ordinal());
}
// For a given acl type and user, get the stored acl
private OzoneAclRights getAcl(OzoneAclType type, String user) {
return getMap(type).get(user);
}
// Add a new acl to the map
public void addAcl(OzoneAclInfo acl) {
getMap(acl.getType()).put(acl.getName(), acl.getRights());
}
// for a given acl, check if the user has access rights
public boolean hasAccess(OzoneAclInfo acl) {
OzoneAclRights storedRights = getAcl(acl.getType(), acl.getName());
if (storedRights != null) {
switch (acl.getRights()) {
case READ:
return (storedRights == OzoneAclRights.READ)
|| (storedRights == OzoneAclRights.READ_WRITE);
case WRITE:
return (storedRights == OzoneAclRights.WRITE)
|| (storedRights == OzoneAclRights.READ_WRITE);
case READ_WRITE:
return (storedRights == OzoneAclRights.READ_WRITE);
default:
return false;
}
} else {
return false;
}
}
// Convert this map to OzoneAclInfo Protobuf List
public List<OzoneAclInfo> ozoneAclGetProtobuf() {
List<OzoneAclInfo> aclList = new LinkedList<>();
for (OzoneAclType type: OzoneAclType.values()) {
for (Map.Entry<String, OzoneAclRights> entry :
aclMaps.get(type.ordinal()).entrySet()) {
OzoneAclInfo aclInfo = OzoneAclInfo.newBuilder()
.setName(entry.getKey())
.setType(type)
.setRights(entry.getValue())
.build();
aclList.add(aclInfo);
}
}
return aclList;
}
// Create map from list of OzoneAclInfos
public static KsmOzoneAclMap ozoneAclGetFromProtobuf(
List<OzoneAclInfo> aclList) {
KsmOzoneAclMap aclMap = new KsmOzoneAclMap();
for (OzoneAclInfo acl : aclList) {
aclMap.addAcl(acl);
}
return aclMap;
}
}

View File

@ -18,10 +18,13 @@
package org.apache.hadoop.ksm.helpers;
import com.google.common.base.Preconditions;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.OzoneAclInfo;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.VolumeInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.KeyValue;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@ -38,6 +41,7 @@ public final class KsmVolumeArgs {
private final String volume;
private final long quotaInBytes;
private final Map<String, String> keyValueMap;
private final KsmOzoneAclMap aclMap;
/**
* Private constructor, constructed via builder.
@ -45,15 +49,18 @@ public final class KsmVolumeArgs {
* @param ownerName - Volume owner's name
* @param volume - volume name
* @param quotaInBytes - Volume Quota in bytes.
* @param keyValueMap - keyValue map.
* @param keyValueMap - keyValue map.
* @param aclMap - User to access rights map.
*/
private KsmVolumeArgs(String adminName, String ownerName, String volume,
long quotaInBytes, Map<String, String> keyValueMap) {
long quotaInBytes, Map<String, String> keyValueMap,
KsmOzoneAclMap aclMap) {
this.adminName = adminName;
this.ownerName = ownerName;
this.volume = volume;
this.quotaInBytes = quotaInBytes;
this.keyValueMap = keyValueMap;
this.aclMap = aclMap;
}
/**
@ -92,6 +99,9 @@ public final class KsmVolumeArgs {
return keyValueMap;
}
public KsmOzoneAclMap getAclMap() {
return aclMap;
}
/**
* Returns new builder class that builds a KsmVolumeArgs.
*
@ -110,12 +120,14 @@ public final class KsmVolumeArgs {
private String volume;
private long quotaInBytes;
private Map<String, String> keyValueMap;
private KsmOzoneAclMap aclMap;
/**
* Constructs a builder.
*/
Builder() {
keyValueMap = new HashMap<>();
aclMap = new KsmOzoneAclMap();
}
public Builder setAdminName(String adminName) {
@ -143,6 +155,11 @@ public final class KsmVolumeArgs {
return this;
}
public Builder addOzoneAcls(OzoneAclInfo acl) throws IOException {
aclMap.addAcl(acl);
return this;
}
/**
* Constructs a CreateVolumeArgument.
* @return CreateVolumeArgs.
@ -152,31 +169,36 @@ public final class KsmVolumeArgs {
Preconditions.checkNotNull(ownerName);
Preconditions.checkNotNull(volume);
return new KsmVolumeArgs(adminName, ownerName, volume, quotaInBytes,
keyValueMap);
keyValueMap, aclMap);
}
}
public VolumeInfo getProtobuf() {
List<KeyValue> list = new LinkedList<>();
List<KeyValue> metadataList = new LinkedList<>();
for (Map.Entry<String, String> entry : keyValueMap.entrySet()) {
list.add(KeyValue.newBuilder().setKey(entry.getKey()).
metadataList.add(KeyValue.newBuilder().setKey(entry.getKey()).
setValue(entry.getValue()).build());
}
List<OzoneAclInfo> aclList = aclMap.ozoneAclGetProtobuf();
return VolumeInfo.newBuilder()
.setAdminName(adminName)
.setOwnerName(ownerName)
.setVolume(volume)
.setQuotaInBytes(quotaInBytes)
.addAllMetadata(list)
.addAllMetadata(metadataList)
.addAllVolumeAcls(aclList)
.build();
}
public static KsmVolumeArgs getFromProtobuf(VolumeInfo volInfo) {
Map<String, String> kvMap = volInfo.getMetadataList().stream()
.collect(Collectors.toMap(KeyValue::getKey,
KeyValue::getValue));
KsmOzoneAclMap aclMap =
KsmOzoneAclMap.ozoneAclGetFromProtobuf(volInfo.getVolumeAclsList());
return new KsmVolumeArgs(volInfo.getAdminName(), volInfo.getOwnerName(),
volInfo.getVolume(), volInfo.getQuotaInBytes(),
volInfo.getMetadataList().stream()
.collect(Collectors.toMap(KeyValue::getKey,
KeyValue::getValue)));
volInfo.getVolume(), volInfo.getQuotaInBytes(), kvMap, aclMap);
}
}

View File

@ -22,6 +22,8 @@ import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.OzoneAclInfo;
import java.io.IOException;
import java.util.List;
@ -56,10 +58,13 @@ public interface KeySpaceManagerProtocol {
/**
* Checks if the specified user can access this volume.
* @param volume - volume
* @param userName - user name
* @param userAcl - user acls which needs to be checked for access
* @return true if the user has required access for the volume,
* false otherwise
* @throws IOException
*/
void checkVolumeAccess(String volume, String userName) throws IOException;
boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl)
throws IOException;
/**
* Gets the volume information.

View File

@ -66,10 +66,16 @@ import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.InfoVolumeRequest;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.InfoVolumeResponse;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.CheckVolumeAccessRequest;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.CheckVolumeAccessResponse;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.VolumeInfo;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.Status;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.OzoneAclInfo;
import java.io.Closeable;
import java.io.IOException;
@ -196,13 +202,32 @@ public final class KeySpaceManagerProtocolClientSideTranslatorPB
* Checks if the specified user can access this volume.
*
* @param volume - volume
* @param userName - user name
* @param userAcl - user acls which needs to be checked for access
* @return true if the user has required access for the volume,
* false otherwise
* @throws IOException
*/
@Override
public void checkVolumeAccess(String volume, String userName) throws
public boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl) throws
IOException {
CheckVolumeAccessRequest.Builder req =
CheckVolumeAccessRequest.newBuilder();
req.setVolumeName(volume).setUserAcl(userAcl);
final CheckVolumeAccessResponse resp;
try {
resp = rpcProxy.checkVolumeAccess(NULL_RPC_CONTROLLER, req.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
if (resp.getStatus() == Status.ACCESS_DENIED) {
return false;
} else if (resp.getStatus() == Status.OK) {
return true;
} else {
throw new
IOException("Check Volume Access failed, error:" + resp.getStatus());
}
}
/**

View File

@ -61,6 +61,7 @@ message VolumeInfo {
required string volume = 3;
optional uint64 quotaInBytes = 4;
repeated KeyValue metadata = 5;
repeated OzoneAclInfo volumeAcls = 6;
}
/**
@ -93,11 +94,11 @@ message SetVolumePropertyResponse {
}
/**
Checks if a specified user has access to the volume.
*/
* Checks if the user has specified permissions for the volume
*/
message CheckVolumeAccessRequest {
required string volumeName = 1;
required string userName = 2;
required OzoneAclInfo userAcl = 2;
}
message CheckVolumeAccessResponse {

View File

@ -17,6 +17,7 @@
package org.apache.hadoop.ozone.ksm;
import org.apache.hadoop.ozone.web.request.OzoneAcl;
/**
* KSM Constants.
*/
@ -46,4 +47,15 @@ public final class KSMConfigKeys {
public static final String OZONE_KSM_USER_MAX_VOLUME =
"ozone.ksm.user.max.volume";
public static final int OZONE_KSM_USER_MAX_VOLUME_DEFAULT = 1024;
// KSM Default user/group permissions
public static final String OZONE_KSM_USER_RIGHTS =
"ozone.ksm.user.rights";
public static final OzoneAcl.OzoneACLRights OZONE_KSM_USER_RIGHTS_DEFAULT =
OzoneAcl.OzoneACLRights.READ_WRITE;
public static final String OZONE_KSM_GROUP_RIGHTS =
"ozone.ksm.group.rights";
public static final OzoneAcl.OzoneACLRights OZONE_KSM_GROUP_RIGHTS_DEFAULT =
OzoneAcl.OzoneACLRights.READ_WRITE;
}

View File

@ -31,6 +31,7 @@ public class KSMMetrics {
private @Metric MutableCounterLong numVolumeCreates;
private @Metric MutableCounterLong numVolumeModifies;
private @Metric MutableCounterLong numVolumeInfos;
private @Metric MutableCounterLong numVolumeCheckAccesses;
private @Metric MutableCounterLong numBucketCreates;
private @Metric MutableCounterLong numVolumeDeletes;
private @Metric MutableCounterLong numBucketInfos;
@ -44,6 +45,7 @@ public class KSMMetrics {
private @Metric MutableCounterLong numVolumeInfoFails;
private @Metric MutableCounterLong numVolumeDeleteFails;
private @Metric MutableCounterLong numBucketCreateFails;
private @Metric MutableCounterLong numVolumeCheckAccessFails;
private @Metric MutableCounterLong numBucketInfoFails;
private @Metric MutableCounterLong numBucketModifyFails;
private @Metric MutableCounterLong numKeyAllocateFails;
@ -75,6 +77,10 @@ public class KSMMetrics {
numVolumeDeletes.incr();
}
public void incNumVolumeCheckAccesses() {
numVolumeCheckAccesses.incr();
}
public void incNumBucketCreates() {
numBucketCreates.incr();
}
@ -103,6 +109,10 @@ public class KSMMetrics {
numVolumeDeleteFails.incr();
}
public void incNumVolumeCheckAccessFails() {
numVolumeCheckAccessFails.incr();
}
public void incNumBucketCreateFails() {
numBucketCreateFails.incr();
}
@ -151,6 +161,11 @@ public class KSMMetrics {
return numVolumeDeletes.value();
}
@VisibleForTesting
public long getNumVolumeCheckAccesses() {
return numVolumeCheckAccesses.value();
}
@VisibleForTesting
public long getNumBucketCreates() {
return numBucketCreates.value();
@ -186,6 +201,11 @@ public class KSMMetrics {
return numVolumeDeleteFails.value();
}
@VisibleForTesting
public long getNumVolumeCheckAccessFails() {
return numVolumeCheckAccessFails.value();
}
@VisibleForTesting
public long getNumBucketCreateFails() {
return numBucketCreateFails.value();

View File

@ -33,6 +33,8 @@ import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OzoneClientUtils;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.OzoneAclInfo;
import org.apache.hadoop.ozone.protocolPB
.KeySpaceManagerProtocolServerSideTranslatorPB;
import org.apache.hadoop.ozone.scm.StorageContainerManager;
@ -300,13 +302,21 @@ public class KeySpaceManager implements KeySpaceManagerProtocol {
* Checks if the specified user can access this volume.
*
* @param volume - volume
* @param userName - user name
* @param userAcl - user acls which needs to be checked for access
* @return true if the user has required access for the volume,
* false otherwise
* @throws IOException
*/
@Override
public void checkVolumeAccess(String volume, String userName) throws
IOException {
public boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl)
throws IOException {
try {
metrics.incNumVolumeCheckAccesses();
return volumeManager.checkVolumeAccess(volume, userAcl);
} catch (Exception ex) {
metrics.incNumVolumeCheckAccessFails();
throw ex;
}
}
/**

View File

@ -17,6 +17,8 @@
package org.apache.hadoop.ozone.ksm;
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.OzoneAclInfo;
import java.io.IOException;
@ -64,4 +66,15 @@ public interface VolumeManager {
* @throws IOException
*/
void deleteVolume(String volume) throws IOException;
/**
* Checks if the specified user with a role can access this volume.
*
* @param volume - volume
* @param userAcl - user acl which needs to be checked for access
* @return true if the user has access for the volume, false otherwise
* @throws IOException
*/
boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl)
throws IOException;
}

View File

@ -20,6 +20,8 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.OzoneAclInfo;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.VolumeList;
import org.apache.hadoop.ozone.protocol.proto
@ -179,7 +181,7 @@ public class VolumeManagerImpl implements VolumeManager {
VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo);
KsmVolumeArgs volumeArgs = KsmVolumeArgs.getFromProtobuf(volumeInfo);
Preconditions.checkState(volume.equalsIgnoreCase(volumeInfo.getVolume()));
Preconditions.checkState(volume.equals(volumeInfo.getVolume()));
delVolumeFromOwnerList(volume, volumeArgs.getOwnerName(),
putBatch, deleteBatch);
@ -224,7 +226,7 @@ public class VolumeManagerImpl implements VolumeManager {
VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo);
KsmVolumeArgs volumeArgs = KsmVolumeArgs.getFromProtobuf(volumeInfo);
Preconditions.checkState(volume.equalsIgnoreCase(volumeInfo.getVolume()));
Preconditions.checkState(volume.equals(volumeInfo.getVolume()));
KsmVolumeArgs newVolumeArgs =
KsmVolumeArgs.newBuilder().setVolume(volumeArgs.getVolume())
@ -262,7 +264,7 @@ public class VolumeManagerImpl implements VolumeManager {
VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo);
KsmVolumeArgs volumeArgs = KsmVolumeArgs.getFromProtobuf(volumeInfo);
Preconditions.checkState(volume.equalsIgnoreCase(volumeInfo.getVolume()));
Preconditions.checkState(volume.equals(volumeInfo.getVolume()));
return volumeArgs;
} catch (IOException ex) {
LOG.error("Info volume failed for volume:{}", volume, ex);
@ -296,7 +298,7 @@ public class VolumeManagerImpl implements VolumeManager {
}
VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo);
Preconditions.checkState(volume.equalsIgnoreCase(volumeInfo.getVolume()));
Preconditions.checkState(volume.equals(volumeInfo.getVolume()));
// delete the volume from the owner list
// as well as delete the volume entry
delVolumeFromOwnerList(volume, volumeInfo.getOwnerName(),
@ -310,4 +312,37 @@ public class VolumeManagerImpl implements VolumeManager {
metadataManager.writeLock().unlock();
}
}
/**
* Checks if the specified user with a role can access this volume.
*
* @param volume - volume
* @param userAcl - user acl which needs to be checked for access
* @return true if the user has access for the volume, false otherwise
* @throws IOException
*/
public boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl)
throws IOException {
Preconditions.checkNotNull(volume);
Preconditions.checkNotNull(userAcl);
metadataManager.readLock().lock();
try {
byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
byte[] volInfo = metadataManager.get(dbVolumeKey);
if (volInfo == null) {
throw new KSMException(ResultCodes.FAILED_VOLUME_NOT_FOUND);
}
VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo);
KsmVolumeArgs volumeArgs = KsmVolumeArgs.getFromProtobuf(volumeInfo);
Preconditions.checkState(volume.equals(volumeInfo.getVolume()));
return volumeArgs.getAclMap().hasAccess(userAcl);
} catch (IOException ex) {
LOG.error("Check volume access failed for volume:{} user:{} rights:{}",
volume, userAcl.getName(), userAcl.getRights(), ex);
throw ex;
} finally {
metadataManager.readLock().unlock();
}
}
}

View File

@ -165,7 +165,21 @@ public class KeySpaceManagerProtocolServerSideTranslatorPB implements
public CheckVolumeAccessResponse checkVolumeAccess(
RpcController controller, CheckVolumeAccessRequest request)
throws ServiceException {
return null;
CheckVolumeAccessResponse.Builder resp =
CheckVolumeAccessResponse.newBuilder();
resp.setStatus(Status.OK);
try {
boolean access = impl.checkVolumeAccess(request.getVolumeName(),
request.getUserAcl());
// if no access, set the response status as access denied
if (!access) {
resp.setStatus(Status.ACCESS_DENIED);
}
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
@Override

View File

@ -183,9 +183,11 @@ public final class ErrorTable {
OzoneException err =
new OzoneException(e.getHttpCode(), e.getShortMessage(),
e.getMessage());
err.setRequestId(args.getRequestID());
err.setResource(args.getResourceName());
err.setHostID(args.getHostName());
if (args != null) {
err.setRequestId(args.getRequestID());
err.setResource(args.getResourceName());
err.setHostID(args.getHostName());
}
return err;
}

View File

@ -35,6 +35,7 @@ public class UserArgs {
private final UriInfo uri;
private final Request request;
private final HttpHeaders headers;
private String[] groups;
/**
@ -111,6 +112,24 @@ public class UserArgs {
this.userName = userName;
}
/**
* Returns list of groups.
*
* @return String[]
*/
public String[] getGroups() {
return this.groups;
}
/**
* Sets the group list.
*
* @param groups list of groups
*/
public void setGroups(String[] groups) {
this.groups = groups;
}
/**
* Returns the resource Name.
*

View File

@ -64,11 +64,13 @@ public class VolumeArgs extends UserArgs {
* @param request - Http Request
* @param info - URI info
* @param headers - http headers
* @param groups - list of groups allowed to access the volume
*/
public VolumeArgs(String userName, String volumeName, String requestID,
String hostName, Request request, UriInfo info,
HttpHeaders headers) {
HttpHeaders headers, String[] groups) {
super(userName, requestID, hostName, request, info, headers);
super.setGroups(groups);
this.volumeName = volumeName;
}
@ -81,7 +83,7 @@ public class VolumeArgs extends UserArgs {
public VolumeArgs(String volumeName, UserArgs userArgs) {
this(userArgs.getUserName(), volumeName, userArgs.getRequestID(),
userArgs.getHostName(), userArgs.getRequest(), userArgs.getUri(),
userArgs.getHeaders());
userArgs.getHeaders(), userArgs.getGroups());
}
/**

View File

@ -92,6 +92,7 @@ public class VolumeHandler implements Volume {
}
args.setUserName(volumeOwner);
args.setGroups(auth.getGroups(args));
if (!quota.equals(Header.OZONE_QUOTA_UNDEFINED)) {
setQuotaArgs(args, quota);
}

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.ozone.web.handlers.BucketArgs;
import org.apache.hadoop.ozone.web.handlers.KeyArgs;
import org.apache.hadoop.ozone.web.handlers.ListArgs;
import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
import org.apache.hadoop.ozone.web.request.OzoneAcl;
import org.apache.hadoop.ozone.web.response.BucketInfo;
import org.apache.hadoop.ozone.web.response.ListBuckets;
import org.apache.hadoop.ozone.web.response.ListKeys;
@ -80,18 +81,20 @@ public interface StorageHandler {
throws IOException, OzoneException;
/**
* Checks if a Volume exists and the user specified has access to the
* Volume.
* Checks if a Volume exists and the user with a role specified has access
* to the Volume.
*
* @param args - Volume Args
* @param volume - Volume Name whose access permissions needs to be checked
* @param acl - requested acls which needs to be checked for access
*
* @return - Boolean - True if the user can modify the volume.
* @return - Boolean - True if the user with a role can access the volume.
* This is possible for owners of the volume and admin users
*
* @throws IOException
* @throws OzoneException
*/
boolean checkVolumeAccess(VolumeArgs args) throws IOException, OzoneException;
boolean checkVolumeAccess(String volume, OzoneAcl acl)
throws IOException, OzoneException;
/**

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.ozone.web.handlers.KeyArgs;
import org.apache.hadoop.ozone.web.handlers.ListArgs;
import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
import org.apache.hadoop.ozone.web.request.OzoneAcl;
import org.apache.hadoop.ozone.web.request.OzoneQuota;
import org.apache.hadoop.ozone.web.response.BucketInfo;
import org.apache.hadoop.ozone.web.response.ListBuckets;
@ -106,17 +107,18 @@ public class LocalStorageHandler implements StorageHandler {
/**
* Checks if a Volume exists and the user specified has access to the volume.
*
* @param args - volumeArgs
* @param volume - Volume Name
* @param acl - Ozone acl which needs to be compared for access
* @return - Boolean - True if the user can modify the volume. This is
* possible for owners of the volume and admin users
* @throws IOException
*/
@Override
public boolean checkVolumeAccess(VolumeArgs args)
public boolean checkVolumeAccess(String volume, OzoneAcl acl)
throws IOException, OzoneException {
OzoneMetadataManager oz =
OzoneMetadataManager.getOzoneMetadataManager(conf);
return oz.checkVolumeAccess(args);
return oz.checkVolumeAccess(volume, acl);
}
/**

View File

@ -324,23 +324,25 @@ public final class OzoneMetadataManager {
/**
* Checks if you are the owner of a specific volume.
*
* @param args - VolumeArgs
* @param volume - Volume Name whose access permissions needs to be checked
* @param acl - requested acls which needs to be checked for access
* @return - True if you are the owner, false otherwise
* @throws OzoneException
*/
public boolean checkVolumeAccess(VolumeArgs args) throws OzoneException {
public boolean checkVolumeAccess(String volume, OzoneAcl acl)
throws OzoneException {
lock.readLock().lock();
try {
byte[] volumeInfo =
metadataDB.get(args.getVolumeName().getBytes(encoding));
metadataDB.get(volume.getBytes(encoding));
if (volumeInfo == null) {
throw ErrorTable.newError(ErrorTable.VOLUME_NOT_FOUND, args);
throw ErrorTable.newError(ErrorTable.VOLUME_NOT_FOUND, null);
}
VolumeInfo info = VolumeInfo.parse(new String(volumeInfo, encoding));
return info.getOwner().getName().equals(args.getUserName());
return info.getOwner().getName().equals(acl.getName());
} catch (IOException | DBException ex) {
throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex);
throw ErrorTable.newError(ErrorTable.SERVER_ERROR, null, ex);
} finally {
lock.readLock().unlock();
}

View File

@ -53,6 +53,13 @@ public class OzoneAcl {
this.name = name;
this.rights = rights;
this.type = type;
if (type == OzoneACLType.WORLD && name.length() != 0) {
throw new IllegalArgumentException("Unexpected name part in world type");
}
if (((type == OzoneACLType.USER) || (type == OzoneACLType.GROUP))
&& (name.length() == 0)) {
throw new IllegalArgumentException("User or group name is required");
}
}
/**
@ -74,14 +81,6 @@ public class OzoneAcl {
OzoneACLType aclType = OzoneACLType.valueOf(parts[0].toUpperCase());
OzoneACLRights rights = OzoneACLRights.getACLRight(parts[2].toLowerCase());
if (((aclType == OzoneACLType.USER) || (aclType == OzoneACLType.GROUP))
&& (parts[1].length() == 0)) {
throw new IllegalArgumentException("User or group name is required");
}
if ((aclType == OzoneACLType.WORLD) && (parts[1].length() != 0)) {
throw new IllegalArgumentException("Unexpected name part in world type");
}
// TODO : Support sanitation of these user names by calling into
// userAuth Interface.
return new OzoneAcl(aclType, parts[1], rights);

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.OzoneConsts.Versioning;
import org.apache.hadoop.ozone.protocolPB.KSMPBHelper;
import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
import org.apache.hadoop.ozone.web.request.OzoneAcl;
import org.apache.hadoop.ozone.web.request.OzoneQuota;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
@ -92,7 +93,8 @@ public final class DistributedStorageHandler implements StorageHandler {
private final KeySpaceManagerProtocolClientSideTranslatorPB
keySpaceManagerClient;
private final XceiverClientManager xceiverClientManager;
private final OzoneAcl.OzoneACLRights userRights;
private final OzoneAcl.OzoneACLRights groupRights;
private int chunkSize;
/**
@ -113,6 +115,10 @@ public final class DistributedStorageHandler implements StorageHandler {
chunkSize = conf.getInt(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_DEFAULT);
userRights = conf.getEnum(KSMConfigKeys.OZONE_KSM_USER_RIGHTS,
KSMConfigKeys.OZONE_KSM_USER_RIGHTS_DEFAULT);
groupRights = conf.getEnum(KSMConfigKeys.OZONE_KSM_GROUP_RIGHTS,
KSMConfigKeys.OZONE_KSM_GROUP_RIGHTS_DEFAULT);
if(chunkSize > ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE) {
LOG.warn("The chunk size ({}) is not allowed to be more than"
+ " the maximum size ({}),"
@ -126,13 +132,23 @@ public final class DistributedStorageHandler implements StorageHandler {
public void createVolume(VolumeArgs args) throws IOException, OzoneException {
long quota = args.getQuota() == null ?
OzoneConsts.MAX_QUOTA_IN_BYTES : args.getQuota().sizeInBytes();
KsmVolumeArgs volumeArgs = KsmVolumeArgs.newBuilder()
.setAdminName(args.getAdminName())
OzoneAcl userAcl =
new OzoneAcl(OzoneAcl.OzoneACLType.USER,
args.getUserName(), userRights);
KsmVolumeArgs.Builder builder = KsmVolumeArgs.newBuilder();
builder.setAdminName(args.getAdminName())
.setOwnerName(args.getUserName())
.setVolume(args.getVolumeName())
.setQuotaInBytes(quota)
.build();
keySpaceManagerClient.createVolume(volumeArgs);
.addOzoneAcls(KSMPBHelper.convertOzoneAcl(userAcl));
if (args.getGroups() != null) {
for (String group : args.getGroups()) {
OzoneAcl groupAcl =
new OzoneAcl(OzoneAcl.OzoneACLType.GROUP, group, groupRights);
builder.addOzoneAcls(KSMPBHelper.convertOzoneAcl(groupAcl));
}
}
keySpaceManagerClient.createVolume(builder.build());
}
@Override
@ -150,9 +166,10 @@ public final class DistributedStorageHandler implements StorageHandler {
}
@Override
public boolean checkVolumeAccess(VolumeArgs args)
public boolean checkVolumeAccess(String volume, OzoneAcl acl)
throws IOException, OzoneException {
throw new UnsupportedOperationException("checkVolumeAccessnot implemented");
return keySpaceManagerClient
.checkVolumeAccess(volume, KSMPBHelper.convertOzoneAcl(acl));
}
@Override

View File

@ -258,6 +258,51 @@ public class TestKeySpaceManager {
Assert.assertTrue(retVolumeInfo.getOwner().getName().equals(userName));
}
// Create a volume and test Volume access for a different user
@Test(timeout = 60000)
public void testAccessVolume() throws IOException, OzoneException {
String userName = "user" + RandomStringUtils.randomNumeric(5);
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
String[] groupName =
{"group" + RandomStringUtils.randomNumeric(5)};
VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
createVolumeArgs.setUserName(userName);
createVolumeArgs.setAdminName(adminName);
createVolumeArgs.setGroups(groupName);
storageHandler.createVolume(createVolumeArgs);
OzoneAcl userAcl = new OzoneAcl(OzoneAcl.OzoneACLType.USER, userName,
OzoneAcl.OzoneACLRights.READ_WRITE);
Assert.assertTrue(storageHandler.checkVolumeAccess(volumeName, userAcl));
OzoneAcl group = new OzoneAcl(OzoneAcl.OzoneACLType.GROUP, groupName[0],
OzoneAcl.OzoneACLRights.READ);
Assert.assertTrue(storageHandler.checkVolumeAccess(volumeName, group));
// Create a different user and access should fail
String falseUserName = "user" + RandomStringUtils.randomNumeric(5);
OzoneAcl falseUserAcl =
new OzoneAcl(OzoneAcl.OzoneACLType.USER, falseUserName,
OzoneAcl.OzoneACLRights.READ_WRITE);
Assert.assertFalse(storageHandler
.checkVolumeAccess(volumeName, falseUserAcl));
// Checking access with user name and Group Type should fail
OzoneAcl falseGroupAcl = new OzoneAcl(OzoneAcl.OzoneACLType.GROUP, userName,
OzoneAcl.OzoneACLRights.READ_WRITE);
Assert.assertFalse(storageHandler
.checkVolumeAccess(volumeName, falseGroupAcl));
// Access for acl type world should also fail
OzoneAcl worldAcl =
new OzoneAcl(OzoneAcl.OzoneACLType.WORLD, "",
OzoneAcl.OzoneACLRights.READ);
Assert.assertFalse(storageHandler.checkVolumeAccess(volumeName, worldAcl));
Assert.assertEquals(0, ksmMetrics.getNumVolumeCheckAccessFails());
Assert.assertEquals(0, ksmMetrics.getNumVolumeCreateFails());
}
@Test(timeout = 60000)
public void testCreateBucket() throws IOException, OzoneException {
String userName = "user" + RandomStringUtils.randomNumeric(5);