HDFS-11655. Ozone: CLI: Guarantees user runs SCM commands has appropriate permission. Contributed by Weiwei Yang.

This commit is contained in:
Anu Engineer 2017-05-30 11:08:51 -07:00 committed by Owen O'Malley
parent b71efcf1b0
commit 59d273b175
3 changed files with 144 additions and 3 deletions

View File

@ -64,6 +64,16 @@ public final class OzoneConfigKeys {
public static final String OZONE_KEY_CACHE = "ozone.key.cache.size";
public static final int OZONE_KEY_CACHE_DEFAULT = 1024;
/**
* Ozone administrator users delimited by comma.
* If not set, only the user who launches an ozone service will be the
* admin user. This property must be set if ozone services are started by
* different users. Otherwise the RPC layer will reject calls from
* other servers which are started by users not in the list.
* */
public static final String OZONE_ADMINISTRATORS =
"ozone.administrators";
public static final String OZONE_CONTAINER_TASK_WAIT =
"ozone.container.task.wait.seconds";
public static final long OZONE_CONTAINER_TASK_WAIT_DEFAULT = 5;

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.ozone.OzoneClientUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos;
import org.apache.hadoop.ozone.protocolPB
@ -85,6 +86,7 @@ import org.apache.hadoop.ozone.scm.container.Mapping;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.ozone.scm.node.SCMNodeManager;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -98,6 +100,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Collection;
import java.util.UUID;
import static org.apache.hadoop.ozone.protocol.proto
@ -159,6 +162,10 @@ public class StorageContainerManager
/** SCM mxbean. */
private ObjectName scmInfoBeanName;
/** SCM super user. */
private final String scmUsername;
private final Collection<String> scmAdminUsernames;
/**
* Creates a new StorageContainerManager. Configuration will be updated with
* information on the actual listening addresses used for RPC servers.
@ -179,6 +186,13 @@ public class StorageContainerManager
scmBlockManager = new BlockManagerImpl(conf, scmNodeManager,
scmContainerManager, cacheSize);
scmAdminUsernames = conf.getTrimmedStringCollection(
OzoneConfigKeys.OZONE_ADMINISTRATORS);
scmUsername = UserGroupInformation.getCurrentUser().getUserName();
if (!scmAdminUsernames.contains(scmUsername)) {
scmAdminUsernames.add(scmUsername);
}
RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class,
ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
@ -401,6 +415,7 @@ public class StorageContainerManager
*/
@Override
public Pipeline allocateContainer(String containerName) throws IOException {
checkAdminAccess();
return scmContainerManager.allocateContainer(containerName,
ScmClient.ReplicationFactor.ONE);
}
@ -410,6 +425,7 @@ public class StorageContainerManager
*/
@Override
public Pipeline getContainer(String containerName) throws IOException {
checkAdminAccess();
return scmContainerManager.getContainer(containerName);
}
@ -418,6 +434,7 @@ public class StorageContainerManager
*/
@Override
public void deleteContainer(String containerName) throws IOException {
checkAdminAccess();
scmContainerManager.deleteContainer(containerName);
}
@ -433,6 +450,7 @@ public class StorageContainerManager
@Override
public Pipeline allocateContainer(String containerName,
ScmClient.ReplicationFactor replicationFactor) throws IOException {
checkAdminAccess();
return scmContainerManager.allocateContainer(containerName,
replicationFactor);
}
@ -671,4 +689,21 @@ public class StorageContainerManager
}
return results;
}
@VisibleForTesting
public String getPpcRemoteUsername() {
UserGroupInformation user = ProtobufRpcEngine.Server.getRemoteUser();
return user == null ? null : user.getUserName();
}
private void checkAdminAccess() throws IOException {
String remoteUser = getPpcRemoteUsername();
if(remoteUser != null) {
if (!scmAdminUsernames.contains(remoteUser)) {
throw new IOException(
"Access denied for user " + remoteUser
+ ". Superuser privilege is required.");
}
}
}
}

View File

@ -24,17 +24,22 @@ import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
import org.junit.After;
import org.junit.BeforeClass;
import org.apache.hadoop.ozone.scm.StorageContainerManager;
import org.apache.hadoop.scm.client.ScmClient;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.junit.Rule;
import org.junit.Assert;
// TODO : We need this when we enable these tests back.
//import org.junit.Test;
import org.junit.Test;
import org.junit.BeforeClass;
import org.junit.After;
import org.junit.rules.ExpectedException;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.scm.protocol.LocatedContainer;
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.junit.rules.Timeout;
import org.mockito.Mockito;
/**
* Test class that exercises the StorageContainerManager.
@ -46,6 +51,9 @@ public class TestStorageContainerManager {
@Rule
public Timeout testTimeout = new Timeout(300000);
@Rule
public ExpectedException thrown = ExpectedException.none();
private static MiniOzoneCluster cluster;
private static OzoneConfiguration conf;
private static StorageContainerLocationProtocolClientSideTranslatorPB
@ -64,6 +72,94 @@ public class TestStorageContainerManager {
IOUtils.cleanup(null, storageContainerLocationClient, cluster);
}
@Test
public void testRpcPermission() throws IOException {
// Test with default configuration
OzoneConfiguration defaultConf = new OzoneConfiguration();
testRpcPermissionWithConf(defaultConf, "unknownUser", true);
// Test with ozone.administrators defined in configuration
OzoneConfiguration ozoneConf = new OzoneConfiguration();
ozoneConf.setStrings(OzoneConfigKeys.OZONE_ADMINISTRATORS,
"adminUser1, adminUser2");
// Non-admin user will get permission denied.
testRpcPermissionWithConf(ozoneConf, "unknownUser", true);
// Admin user will pass the permission check.
testRpcPermissionWithConf(ozoneConf, "adminUser2", false);
}
private void testRpcPermissionWithConf(
OzoneConfiguration ozoneConf, String fakeRemoteUsername,
boolean expectPermissionDenied) throws IOException {
cluster = new MiniOzoneCluster.Builder(ozoneConf).numDataNodes(1)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
String fakeUser = fakeRemoteUsername;
StorageContainerManager mockScm = Mockito.spy(
cluster.getStorageContainerManager());
Mockito.when(mockScm.getPpcRemoteUsername())
.thenReturn(fakeUser);
try {
mockScm.deleteContainer("container1");
fail("Operation should fail, expecting an IOException here.");
} catch (Exception e) {
if (expectPermissionDenied) {
verifyPermissionDeniedException(e, fakeUser);
} else {
// If passes permission check, it should fail with
// container not exist exception.
Assert.assertTrue(e.getMessage()
.contains("container doesn't exist"));
}
}
try {
Pipeline pipeLine2 = mockScm.allocateContainer("container2");
if(expectPermissionDenied) {
fail("Operation should fail, expecting an IOException here.");
} else {
Assert.assertEquals("container2", pipeLine2.getContainerName());
}
} catch (Exception e) {
verifyPermissionDeniedException(e, fakeUser);
}
try {
Pipeline pipeLine3 = mockScm.allocateContainer("container3",
ScmClient.ReplicationFactor.ONE);
if(expectPermissionDenied) {
fail("Operation should fail, expecting an IOException here.");
} else {
Assert.assertEquals("container3", pipeLine3.getContainerName());
Assert.assertEquals(1, pipeLine3.getMachines().size());
}
} catch (Exception e) {
verifyPermissionDeniedException(e, fakeUser);
}
try {
mockScm.getContainer("container4");
fail("Operation should fail, expecting an IOException here.");
} catch (Exception e) {
if (expectPermissionDenied) {
verifyPermissionDeniedException(e, fakeUser);
} else {
// If passes permission check, it should fail with
// key not exist exception.
Assert.assertTrue(e.getMessage()
.contains("Specified key does not exist"));
}
}
}
private void verifyPermissionDeniedException(Exception e, String userName) {
String expectedErrorMessage = "Access denied for user "
+ userName + ". " + "Superuser privilege is required.";
Assert.assertTrue(e instanceof IOException);
Assert.assertEquals(expectedErrorMessage, e.getMessage());
}
// TODO : Disabling this test after verifying that failure is due
// Not Implemented exception. Will turn on this test in next patch
//@Test