YARN-617. Made ContainerTokens to be used for validation at NodeManager also in unsecure mode to prevent AMs from faking resource requirements in unsecure mode. Contributed by Omkar Vinit Joshi.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1483667 13f79535-47bb-0310-9956-ffa450edef68
parent 9fdb117476
commit ca80246731
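In brief: ContainerManagerImpl now always recovers a ContainerTokenIdentifier for an incoming request and runs authorizeRequest() against it, instead of skipping validation when security is off. In secure mode the identifier is taken from the RPC-layer UGI (selectContainerTokenIdentifier), whose password the RPC layer has already verified; in unsecure mode it is decoded directly from the ContainerToken record shipped with the request. A minimal sketch of that unsecure-mode decoding path, distilled from the getContainerTokenIdentifier() hunk below (Token, Text, ContainerToken and ContainerTokenIdentifier are the real Hadoop classes of this era; the standalone wrapper class is illustrative only, not part of the commit):

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;

class UnsecureContainerTokenCheck {

  // Mirrors the new else-branch of getContainerTokenIdentifier(): rebuild a
  // Token from the wire-level ContainerToken record, then deserialize the
  // identifier the RM originally minted (container id, submitter, resource,
  // expiry time, master-key id).
  static ContainerTokenIdentifier decode(ContainerToken containerToken)
      throws IOException {
    Token<ContainerTokenIdentifier> token =
        new Token<ContainerTokenIdentifier>(
            containerToken.getIdentifier().array(),
            containerToken.getPassword().array(),
            new Text(containerToken.getKind()),
            new Text(containerToken.getService()));
    // decodeIdentifier() parses the identifier bytes; authorizeRequest() can
    // then compare tokenId.getResource() against what the AM claims, so an AM
    // cannot fake resource requirements even with security disabled.
    return token.decodeIdentifier();
  }
}

Nothing cryptographic is checked on this path — with security off the password bytes go unverified — but the RM-minted identifier still pins the submitter, resource and expiry that authorizeRequest() enforces, and isValidStartContainerRequest() (now keyed by ContainerId rather than the full identifier) rejects attempts to relaunch the same container.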
@@ -350,15 +350,14 @@ public class ContainerLauncherImpl extends AbstractService implements

      final InetSocketAddress cmAddr =
          NetUtils.createSocketAddr(containerManagerBindAddr);
      UserGroupInformation user = UserGroupInformation.getCurrentUser();

      if (UserGroupInformation.isSecurityEnabled()) {
        // the user in createRemoteUser in this context has to be ContainerID
        UserGroupInformation user =
            UserGroupInformation.createRemoteUser(containerID.toString());

        Token<ContainerTokenIdentifier> token =
            ProtoUtils.convertFromProtoFormat(containerToken, cmAddr);
        // the user in createRemoteUser in this context has to be ContainerID
        user = UserGroupInformation.createRemoteUser(containerID.toString());
        user.addToken(token);
      }

      ContainerManager proxy = user
          .doAs(new PrivilegedAction<ContainerManager>() {
@@ -256,6 +256,10 @@ Release 2.0.5-beta - UNRELEASED
    asks an RM to shutdown/resync etc so that NMs can log this message locally
    for better debuggability. (Mayank Bansal via vinodkv)

    YARN-617. Made ContainerTokens to be used for validation at NodeManager
    also in unsecure mode to prevent AMs from faking resource requirements in
    unsecure mode. (Omkar Vinit Joshi via vinodkv)

  OPTIMIZATIONS

  BUG FIXES
@@ -135,13 +135,8 @@ public class NodeManager extends CompositeService

    conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);

    // Create the secretManager if need be.
    NMContainerTokenSecretManager containerTokenSecretManager = null;
    if (UserGroupInformation.isSecurityEnabled()) {
      LOG.info("Security is enabled on NodeManager. "
          + "Creating ContainerTokenSecretManager");
      containerTokenSecretManager = new NMContainerTokenSecretManager(conf);
    }
    NMContainerTokenSecretManager containerTokenSecretManager =
        new NMContainerTokenSecretManager(conf);

    this.context = createNMContext(containerTokenSecretManager);

@@ -195,15 +195,11 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
    }
  }

  private boolean isSecurityEnabled() {
    return UserGroupInformation.isSecurityEnabled();
  }

  @Private
  protected boolean isTokenKeepAliveEnabled(Configuration conf) {
    return conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
        YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)
        && isSecurityEnabled();
        && UserGroupInformation.isSecurityEnabled();
  }

  protected ResourceTracker getRMClient() {
@@ -303,17 +299,14 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
          + message);
    }

    if (UserGroupInformation.isSecurityEnabled()) {
      MasterKey masterKey = regNMResponse.getMasterKey();
      // do this now so that its set before we start heartbeating to RM
      LOG.info("Security enabled - updating secret keys now");
      // It is expected that status updater is started by this point and
      // RM gives the shared secret in registration during
      // StatusUpdater#start().
      if (masterKey != null) {
        this.context.getContainerTokenSecretManager().setMasterKey(masterKey);
      }
    }

    LOG.info("Registered with ResourceManager as " + this.nodeId
        + " with total resource of " + this.totalResource);
@@ -443,10 +436,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
        NodeHeartbeatRequest request = recordFactory
            .newRecordInstance(NodeHeartbeatRequest.class);
        request.setNodeStatus(nodeStatus);
        if (isSecurityEnabled()) {
          request.setLastKnownMasterKey(NodeStatusUpdaterImpl.this.context
              .getContainerTokenSecretManager().getCurrentKey());
        }
        while (!isStopped) {
          try {
            rmRetryCount++;
@@ -475,14 +466,12 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
            //get next heartbeat interval from response
            nextHeartBeatInterval = response.getNextHeartBeatInterval();
            // See if the master-key has rolled over
            if (isSecurityEnabled()) {
              MasterKey updatedMasterKey = response.getMasterKey();
              if (updatedMasterKey != null) {
                // Will be non-null only on roll-over on RM side
                context.getContainerTokenSecretManager().setMasterKey(
                    updatedMasterKey);
              }
            }

            if (response.getNodeAction() == NodeAction.SHUTDOWN) {
              LOG
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager;

import static org.apache.hadoop.yarn.service.Service.STATE.STARTED;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
@@ -29,9 +31,11 @@ import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
@@ -40,7 +44,6 @@ import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
@@ -52,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -102,6 +106,9 @@ import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.service.ServiceStateChangeListener;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.RpcUtil;

public class ContainerManagerImpl extends CompositeService implements
    ServiceStateChangeListener, ContainerManager,
    EventHandler<ContainerManagerEvent> {
@@ -300,6 +307,33 @@ public class ContainerManagerImpl extends CompositeService implements
    return resultId;
  }

  @Private
  @VisibleForTesting
  protected ContainerTokenIdentifier getContainerTokenIdentifier(
      UserGroupInformation remoteUgi,
      org.apache.hadoop.yarn.api.records.Container container)
      throws YarnRemoteException {
    if (UserGroupInformation.isSecurityEnabled()) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Number of TokenIdentifiers in the UGI from RPC: "
            + remoteUgi.getTokenIdentifiers().size());
      }
      // Get the tokenId from the remote user ugi
      return selectContainerTokenIdentifier(remoteUgi);
    } else {
      ContainerToken containerToken = container.getContainerToken();
      Token<ContainerTokenIdentifier> token =
          new Token<ContainerTokenIdentifier>(containerToken.getIdentifier()
            .array(), containerToken.getPassword().array(), new Text(
            containerToken.getKind()), new Text(containerToken.getService()));
      try {
        return token.decodeIdentifier();
      } catch (IOException e) {
        throw RPCUtil.getRemoteException(e);
      }
    }
  }

  /**
   * Authorize the request.
   *
@@ -311,16 +345,14 @@ public class ContainerManagerImpl extends CompositeService implements
   * ugi corresponding to the remote end making the api-call
   * @throws YarnRemoteException
   */
  private void authorizeRequest(String containerIDStr,
  @Private
  @VisibleForTesting
  protected void authorizeRequest(String containerIDStr,
      ContainerLaunchContext launchContext,
      org.apache.hadoop.yarn.api.records.Container container,
      UserGroupInformation remoteUgi)
      UserGroupInformation remoteUgi, ContainerTokenIdentifier tokenId)
      throws YarnRemoteException {

    if (!UserGroupInformation.isSecurityEnabled()) {
      return;
    }

    boolean unauthorized = false;
    StringBuilder messageBuilder =
        new StringBuilder("Unauthorized request to start container. ");
@@ -332,37 +364,29 @@ public class ContainerManagerImpl extends CompositeService implements
    } else if (launchContext != null) {
      // Verify other things also for startContainer() request.

      if (LOG.isDebugEnabled()) {
        LOG.debug("Number of TokenIdentifiers in the UGI from RPC: "
            + remoteUgi.getTokenIdentifiers().size());
      }

      // Get the tokenId from the remote user ugi
      ContainerTokenIdentifier tokenId =
          selectContainerTokenIdentifier(remoteUgi);

      if (tokenId == null) {
        unauthorized = true;
        messageBuilder
          .append("\nContainerTokenIdentifier cannot be null! Null found for "
              + containerIDStr);
          .append("\nNo ContainerToken found for " + containerIDStr);
      } else {

        // Is the container coming in with correct user-name?
        if (!tokenId.getApplicationSubmitter().equals(launchContext.getUser())) {
        if (!launchContext.getUser().equals(tokenId.getApplicationSubmitter())) {
          unauthorized = true;
          messageBuilder.append("\n Expected user-name "
              + tokenId.getApplicationSubmitter() + " but found "
              + launchContext.getUser());
        }


        // Is the container being relaunched? Or RPC layer let startCall with
        // tokens generated off old-secret through?
        if (!this.context.getContainerTokenSecretManager()
          .isValidStartContainerRequest(tokenId)) {
          .isValidStartContainerRequest(tokenId.getContainerID())) {
          unauthorized = true;
          messageBuilder.append("\n Attempt to relaunch the same " +
              "container with id " + containerIDStr + ".");
          messageBuilder.append("\n Attempt to relaunch the same "
              + "container with id " + containerIDStr + ".");
        }

        // Ensure the token is not expired.
@@ -375,7 +399,7 @@ public class ContainerManagerImpl extends CompositeService implements
        }

        Resource resource = tokenId.getResource();
        if (!resource.equals(container.getResource())) {
        if (resource == null || !resource.equals(container.getResource())) {
          unauthorized = true;
          messageBuilder.append("\nExpected resource " + resource
              + " but found " + container.getResource());
@@ -411,7 +435,10 @@ public class ContainerManagerImpl extends CompositeService implements
    String containerIDStr = containerID.toString();

    UserGroupInformation remoteUgi = getRemoteUgi(containerIDStr);
    authorizeRequest(containerIDStr, launchContext, lauchContainer, remoteUgi);
    ContainerTokenIdentifier tokenId =
        getContainerTokenIdentifier(remoteUgi, lauchContainer);
    authorizeRequest(containerIDStr, launchContext, lauchContainer, remoteUgi,
        tokenId);

    // Is the container coming from unknown RM
    if (lauchContainer.getRMIdentifer() != nodeStatusUpdater
@@ -476,13 +503,9 @@ public class ContainerManagerImpl extends CompositeService implements
    // TODO: Validate the request
    dispatcher.getEventHandler().handle(
        new ApplicationContainerInitEvent(container));
    if (UserGroupInformation.isSecurityEnabled()) {
      ContainerTokenIdentifier tokenId =
          selectContainerTokenIdentifier(remoteUgi);

      this.context.getContainerTokenSecretManager().startContainerSuccessful(
          tokenId);
    }

    NMAuditLogger.logSuccess(launchContext.getUser(),
        AuditConstants.START_CONTAINER, "ContainerManageImpl",
        applicationID, containerID);
@@ -511,12 +534,10 @@ public class ContainerManagerImpl extends CompositeService implements
    // TODO: Only the container's owner can kill containers today.

    UserGroupInformation remoteUgi = getRemoteUgi(containerIDStr);
    authorizeRequest(containerIDStr, null, null, remoteUgi);

    Container container = this.context.getContainers().get(containerID);
    StopContainerResponse response =
        recordFactory.newRecordInstance(StopContainerResponse.class);

    Container container = this.context.getContainers().get(containerID);
    if (container == null) {
      LOG.warn("Trying to stop unknown container " + containerID);
      NMAuditLogger.logFailure("UnknownUser",
@@ -526,6 +547,8 @@ public class ContainerManagerImpl extends CompositeService implements
          containerID);
      return response; // Return immediately.
    }
    authorizeRequest(containerIDStr, null, null, remoteUgi,
        getContainerTokenIdentifier(remoteUgi, container.getContainer()));

    dispatcher.getEventHandler().handle(
        new ContainerKillEvent(containerID,
@@ -554,22 +577,22 @@ public class ContainerManagerImpl extends CompositeService implements
    // TODO: Only the container's owner can get containers' status today.

    UserGroupInformation remoteUgi = getRemoteUgi(containerIDStr);
    authorizeRequest(containerIDStr, null, null, remoteUgi);

    LOG.info("Getting container-status for " + containerIDStr);
    Container container = this.context.getContainers().get(containerID);
    if (container != null) {
      ContainerStatus containerStatus = container.cloneAndGetContainerStatus();
      LOG.info("Returning " + containerStatus);
      GetContainerStatusResponse response = recordFactory
          .newRecordInstance(GetContainerStatusResponse.class);
      response.setStatus(containerStatus);
      return response;
    }

    if (container == null) {
      throw RPCUtil.getRemoteException("Container " + containerIDStr
          + " is not handled by this NodeManager");
    }
    authorizeRequest(containerIDStr, null, null, remoteUgi,
        getContainerTokenIdentifier(remoteUgi, container.getContainer()));

    ContainerStatus containerStatus = container.cloneAndGetContainerStatus();
    LOG.info("Returning " + containerStatus);
    GetContainerStatusResponse response =
        recordFactory.newRecordInstance(GetContainerStatusResponse.class);
    response.setStatus(containerStatus);
    return response;
  }

  class ContainerEventDispatcher implements EventHandler<ContainerEvent> {
    @Override
@@ -28,7 +28,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -394,9 +393,8 @@ public class ApplicationImpl implements Application {
    public void transition(ApplicationImpl app, ApplicationEvent event) {

      // Inform the ContainerTokenSecretManager
      if (UserGroupInformation.isSecurityEnabled()) {
        app.context.getContainerTokenSecretManager().appFinished(app.appId);
      }

      // Inform the logService
      app.dispatcher.getEventHandler().handle(
          new LogHandlerAppFinishedEvent(app.appId));
@@ -27,7 +27,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -35,6 +34,8 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;

import com.google.common.annotations.VisibleForTesting;

/**
 * The NM maintains only two master-keys. The current key that RM knows and the
 * key from the previous rolling-interval.
@@ -134,10 +135,6 @@ public class NMContainerTokenSecretManager extends
   */
  public synchronized void startContainerSuccessful(
      ContainerTokenIdentifier tokenId) {
    if (!UserGroupInformation.isSecurityEnabled()) {
      return;
    }

    int keyId = tokenId.getMasterKeyId();
    if (currentMasterKey.getMasterKey().getKeyId() == keyId) {
      addKeyForContainerId(tokenId.getContainerID(), currentMasterKey);
@@ -154,8 +151,7 @@
   * via retrievePassword.
   */
  public synchronized boolean isValidStartContainerRequest(
      ContainerTokenIdentifier tokenId) {
    ContainerId containerID = tokenId.getContainerID();
      ContainerId containerID) {
    ApplicationId applicationId =
        containerID.getApplicationAttemptId().getApplicationId();
    return !this.oldMasterKeys.containsKey(applicationId)
@@ -26,7 +26,11 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
@@ -173,4 +177,23 @@ public class DummyContainerManager extends ContainerManagerImpl {
  public void setBlockNewContainerRequests(boolean blockNewContainerRequests) {
    // do nothing
  }

  @Override
  protected void authorizeRequest(String containerIDStr,
      ContainerLaunchContext launchContext,
      org.apache.hadoop.yarn.api.records.Container container,
      UserGroupInformation remoteUgi, ContainerTokenIdentifier tokenId)
      throws YarnRemoteException {
    // do Nothing
  }

  @Override
  protected ContainerTokenIdentifier getContainerTokenIdentifier(
      UserGroupInformation remoteUgi,
      org.apache.hadoop.yarn.api.records.Container container)
      throws YarnRemoteException {
    return new ContainerTokenIdentifier(container.getId(),
        container.getNodeHttpAddress(), remoteUgi.getUserName(),
        container.getResource(), System.currentTimeMillis(), 123);
  }
}
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.nodemanager;

import java.io.IOException;

import java.nio.ByteBuffer;

import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -28,6 +30,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;

public class LocalRMInterface implements ResourceTracker {

@@ -38,6 +42,11 @@ public class LocalRMInterface implements ResourceTracker {
      RegisterNodeManagerRequest request) throws YarnRemoteException,
      IOException {
    RegisterNodeManagerResponse response = recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
    MasterKey masterKey = new MasterKeyPBImpl();
    masterKey.setKeyId(123);
    masterKey.setBytes(ByteBuffer.wrap(new byte[] { new Integer(123)
      .byteValue() }));
    response.setMasterKey(masterKey);
    return response;
  }

@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.nodemanager;

import java.io.IOException;

import java.nio.ByteBuffer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -31,7 +33,9 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;

@@ -67,6 +71,11 @@ public class MockNodeStatusUpdater extends NodeStatusUpdaterImpl {
        IOException {
      RegisterNodeManagerResponse response = recordFactory
          .newRecordInstance(RegisterNodeManagerResponse.class);
      MasterKey masterKey = new MasterKeyPBImpl();
      masterKey.setKeyId(123);
      masterKey.setBytes(ByteBuffer.wrap(new byte[] { new Integer(123)
        .byteValue() }));
      response.setMasterKey(masterKey);
      return response;
    }

@@ -18,6 +18,9 @@

package org.apache.hadoop.yarn.server.nodemanager;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.File;
import java.io.IOException;

@@ -45,7 +48,6 @@ import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.junit.Test;
import static org.mockito.Mockito.*;


public class TestEventFlow {
@@ -75,6 +77,7 @@ public class TestEventFlow {
    remoteLogDir.mkdir();

    YarnConfiguration conf = new YarnConfiguration();

    Context context = new NMContext(new NMContainerTokenSecretManager(conf));

    conf.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath());
@@ -112,6 +115,9 @@ public class TestEventFlow {
    DummyContainerManager containerManager =
        new DummyContainerManager(context, exec, del, nodeStatusUpdater,
            metrics, new ApplicationACLsManager(conf), dirsHandler);
    nodeStatusUpdater.init(conf);
    ((NMContext)context).setContainerManager(containerManager);
    nodeStatusUpdater.start();
    containerManager.init(conf);
    containerManager.start();

@@ -132,7 +138,6 @@ public class TestEventFlow {
    when(mockContainer.getResource()).thenReturn(recordFactory
        .newRecordInstance(Resource.class));
    when(mockContainer.getRMIdentifer()).thenReturn(SIMULATED_RM_IDENTIFIER);

    launchContext.setUser("testing");
    StartContainerRequest request =
        recordFactory.newRecordInstance(StartContainerRequest.class);
@@ -18,10 +18,16 @@

package org.apache.hadoop.yarn.server.nodemanager;

import static org.mockito.Mockito.*;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.io.File;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -32,6 +38,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -41,16 +49,17 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
@@ -69,7 +78,6 @@ public class TestNodeManagerReboot {

  static final String user = System.getProperty("user.name");
  private FileContext localFS;

  private MyNodeManager nm;
  private DeletionService delService;
  static final Log LOG = LogFactory.getLog(TestNodeManagerReboot.class);
@@ -87,23 +95,25 @@ public class TestNodeManagerReboot {
    }
  }

  @Test(timeout = 20000)
  @Test(timeout = 2000000)
  public void testClearLocalDirWhenNodeReboot() throws IOException,
      YarnRemoteException {
      YarnRemoteException, InterruptedException {
    nm = new MyNodeManager();
    nm.start();

    final ContainerManager containerManager = nm.getContainerManager();

    // create files under fileCache
    createFiles(nmLocalDir.getAbsolutePath(), ContainerLocalizer.FILECACHE, 100);
    localResourceDir.mkdirs();
    ContainerManagerImpl containerManager = nm.getContainerManager();

    ContainerLaunchContext containerLaunchContext =
        Records.newRecord(ContainerLaunchContext.class);
    // Construct the Container-id
    ContainerId cId = createContainerId();
    org.apache.hadoop.yarn.api.records.Container mockContainer =
        mock(org.apache.hadoop.yarn.api.records.Container.class);
    when(mockContainer.getId()).thenReturn(cId);
        Records.newRecord(org.apache.hadoop.yarn.api.records.Container.class);
    mockContainer.setId(cId);

    containerLaunchContext.setUser(user);

@@ -123,17 +133,31 @@ public class TestNodeManagerReboot {
        new HashMap<String, LocalResource>();
    localResources.put(destinationFile, localResource);
    containerLaunchContext.setLocalResources(localResources);
    containerLaunchContext.setUser(containerLaunchContext.getUser());
    List<String> commands = new ArrayList<String>();
    containerLaunchContext.setCommands(commands);
    Resource resource = Records.newRecord(Resource.class);
    resource.setMemory(1024);
    when(mockContainer.getResource()).thenReturn(resource);
    StartContainerRequest startRequest =
    mockContainer.setResource(resource);
    NodeId nodeId = BuilderUtils.newNodeId("127.0.0.1", 12345);
    mockContainer.setContainerToken(nm.getNMContext()
        .getContainerTokenSecretManager()
        .createContainerToken(cId, nodeId, user, resource));
    mockContainer.setNodeHttpAddress("127.0.0.1");
    mockContainer.setNodeId(nodeId);

    final StartContainerRequest startRequest =
        Records.newRecord(StartContainerRequest.class);
    startRequest.setContainerLaunchContext(containerLaunchContext);
    startRequest.setContainer(mockContainer);
    final UserGroupInformation currentUser = UserGroupInformation
        .createRemoteUser(cId.toString());
    currentUser.doAs(new PrivilegedExceptionAction<Void>() {
      @Override
      public Void run() throws YarnRemoteException, IOException {
        containerManager.startContainer(startRequest);
        return null;
      }
    });

    GetContainerStatusRequest request =
        Records.newRecord(GetContainerStatusRequest.class);
@@ -18,14 +18,13 @@

package org.apache.hadoop.yarn.server.nodemanager;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -33,10 +32,14 @@ import java.util.Map;

import junit.framework.Assert;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -46,18 +49,21 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.After;
@@ -100,7 +106,7 @@ public class TestNodeManagerShutdown {
  @Test
  public void testKillContainersOnShutdown() throws IOException,
      YarnRemoteException {
    NodeManager nm = getNodeManager();
    NodeManager nm = new TestNodeManager();
    nm.init(createNMConfig());
    nm.start();
    startContainer(nm, cId, localFS, tmpDir, processStartFile);
@@ -147,19 +153,20 @@ public class TestNodeManagerShutdown {
  public static void startContainer(NodeManager nm, ContainerId cId,
      FileContext localFS, File scriptFileDir, File processStartFile)
      throws IOException, YarnRemoteException {
    ContainerManagerImpl containerManager = nm.getContainerManager();
    File scriptFile =
        createUnhaltingScriptFile(cId, scriptFileDir, processStartFile);

    ContainerLaunchContext containerLaunchContext =
        recordFactory.newRecordInstance(ContainerLaunchContext.class);
    Container mockContainer = mock(Container.class);
    when(mockContainer.getId()).thenReturn(cId);
    Container mockContainer = new ContainerPBImpl();

    mockContainer.setId(cId);

    NodeId nodeId = BuilderUtils.newNodeId("localhost", 1234);
    when(mockContainer.getNodeId()).thenReturn(nodeId);
    when(mockContainer.getNodeHttpAddress()).thenReturn("localhost:12345");
    containerLaunchContext.setUser(user);
    mockContainer.setNodeId(nodeId);
    mockContainer.setNodeHttpAddress("localhost:12345");

    containerLaunchContext.setUser(cId.toString());

    URL localResourceUri =
        ConverterUtils.getYarnUrlFromPath(localFS
@@ -180,11 +187,28 @@ public class TestNodeManagerShutdown {
    List<String> commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
    containerLaunchContext.setCommands(commands);
    Resource resource = BuilderUtils.newResource(1024, 1);
    when(mockContainer.getResource()).thenReturn(resource);
    mockContainer.setResource(resource);
    mockContainer.setContainerToken(getContainerToken(nm, cId, nodeId,
        cId.toString(), resource));
    StartContainerRequest startRequest =
        recordFactory.newRecordInstance(StartContainerRequest.class);
    startRequest.setContainerLaunchContext(containerLaunchContext);
    startRequest.setContainer(mockContainer);
    UserGroupInformation currentUser = UserGroupInformation
        .createRemoteUser(cId.toString());

    ContainerManager containerManager =
        currentUser.doAs(new PrivilegedAction<ContainerManager>() {
          @Override
          public ContainerManager run() {
            Configuration conf = new Configuration();
            YarnRPC rpc = YarnRPC.create(conf);
            InetSocketAddress containerManagerBindAddress =
                NetUtils.createSocketAddrForHost("127.0.0.1", 12345);
            return (ContainerManager) rpc.getProxy(ContainerManager.class,
                containerManagerBindAddress, conf);
          }
        });
    containerManager.startContainer(startRequest);

    GetContainerStatusRequest request =
@@ -249,15 +273,24 @@ public class TestNodeManagerShutdown {
    return scriptFile;
  }

  private NodeManager getNodeManager() {
    return new NodeManager() {
  public static ContainerToken getContainerToken(NodeManager nm,
      ContainerId containerId, NodeId nodeId, String user, Resource resource) {
    return nm.getNMContext().getContainerTokenSecretManager()
        .createContainerToken(containerId, nodeId, user, resource);
  }

  class TestNodeManager extends NodeManager {

    @Override
    protected NodeStatusUpdater createNodeStatusUpdater(Context context,
        Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
      MockNodeStatusUpdater myNodeStatusUpdater = new MockNodeStatusUpdater(
          context, dispatcher, healthChecker, metrics);
      MockNodeStatusUpdater myNodeStatusUpdater =
          new MockNodeStatusUpdater(context, dispatcher, healthChecker, metrics);
      return myNodeStatusUpdater;
    }
    };

    public void setMasterKey(MasterKey masterKey) {
      getNMContext().getContainerTokenSecretManager().setMasterKey(masterKey);
    }
  }
}
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.when;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -42,7 +43,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -63,8 +63,10 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
@@ -95,7 +97,7 @@ public class TestNodeStatusUpdater {
  private static final RecordFactory recordFactory = RecordFactoryProvider
      .getRecordFactory(null);

  int heartBeatID = 0;
  volatile int heartBeatID = 0;
  volatile Throwable nmStartError = null;
  private final List<NodeId> registeredNodes = new ArrayList<NodeId>();
  private final Configuration conf = createNMConfig();
@@ -113,6 +115,14 @@ public class TestNodeStatusUpdater {
    DefaultMetricsSystem.shutdown();
  }

  public static MasterKey createMasterKey() {
    MasterKey masterKey = new MasterKeyPBImpl();
    masterKey.setKeyId(123);
    masterKey.setBytes(ByteBuffer.wrap(new byte[] { new Integer(123)
      .byteValue() }));
    return masterKey;
  }

  private class MyResourceTracker implements ResourceTracker {

    private final Context context;
@@ -137,6 +147,7 @@ public class TestNodeStatusUpdater {

      RegisterNodeManagerResponse response = recordFactory
          .newRecordInstance(RegisterNodeManagerResponse.class);
      response.setMasterKey(createMasterKey());
      return response;
    }

@@ -398,6 +409,7 @@ public class TestNodeStatusUpdater {
      RegisterNodeManagerResponse response = recordFactory
          .newRecordInstance(RegisterNodeManagerResponse.class);
      response.setNodeAction(registerNodeAction );
      response.setMasterKey(createMasterKey());
      response.setDiagnosticsMessage(shutDownMessage);
      return response;
    }
@@ -435,6 +447,7 @@ public class TestNodeStatusUpdater {
      RegisterNodeManagerResponse response =
          recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
      response.setNodeAction(registerNodeAction);
      response.setMasterKey(createMasterKey());
      return response;
    }

@@ -485,6 +498,7 @@ public class TestNodeStatusUpdater {
      RegisterNodeManagerResponse response = recordFactory
          .newRecordInstance(RegisterNodeManagerResponse.class);
      response.setNodeAction(registerNodeAction);
      response.setMasterKey(createMasterKey());
      return response;
    }

@@ -577,6 +591,8 @@ public class TestNodeStatusUpdater {
      RegisterNodeManagerResponse response = recordFactory
          .newRecordInstance(RegisterNodeManagerResponse.class);
      response.setNodeAction(registerNodeAction );
      response.setMasterKey(createMasterKey());

      return response;
    }

@@ -635,13 +651,13 @@ public class TestNodeStatusUpdater {
        + nm.getServiceState());

    int waitCount = 0;
    while (nm.getServiceState() == STATE.INITED && waitCount++ != 20) {
    while (nm.getServiceState() == STATE.INITED && waitCount++ != 50) {
      LOG.info("Waiting for NM to start..");
      if (nmStartError != null) {
        LOG.error("Error during startup. ", nmStartError);
        Assert.fail(nmStartError.getCause().getMessage());
      }
      Thread.sleep(1000);
      Thread.sleep(2000);
    }
    if (nm.getServiceState() != STATE.STARTED) {
      // NM could have failed.
@@ -686,7 +702,7 @@ public class TestNodeStatusUpdater {
    nm.start();

    int waitCount = 0;
    while (heartBeatID < 1 && waitCount++ != 20) {
    while (heartBeatID < 1 && waitCount++ != 200) {
      Thread.sleep(500);
    }
    Assert.assertFalse(heartBeatID < 1);
@@ -714,7 +730,7 @@ public class TestNodeStatusUpdater {
    nm.start();

    int waitCount = 0;
    while (heartBeatID < 1 && waitCount++ != 20) {
    while (heartBeatID < 1 && waitCount++ != 200) {
      Thread.sleep(500);
    }
    Assert.assertFalse(heartBeatID < 1);
@@ -751,9 +767,9 @@ public class TestNodeStatusUpdater {
        + "Message from ResourceManager: RM Shutting Down Node");
  }

  @Test (timeout = 15000)
  @Test (timeout = 150000)
  public void testNMConnectionToRM() {
    final long delta = 1500;
    final long delta = 50000;
    final long connectionWaitSecs = 5;
    final long connectionRetryIntervalSecs = 1;
    //Waiting for rmStartIntervalMS, RM will be started
@@ -891,7 +907,7 @@ public class TestNodeStatusUpdater {
  /**
   * Test completed containerStatus get back up when heart beat lost
   */
  @Test(timeout = 20000)
  @Test(timeout = 200000)
  public void testCompletedContainerStatusBackup() throws Exception {
    nm = new NodeManager() {
      @Override
@@ -925,7 +941,7 @@ public class TestNodeStatusUpdater {
    nm.stop();
  }

  @Test(timeout = 20000)
  @Test(timeout = 200000)
  public void testNodeStatusUpdaterRetryAndNMShutdown()
      throws InterruptedException {
    final long connectionWaitSecs = 1;
@@ -29,10 +29,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -40,6 +43,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
@@ -147,7 +151,52 @@ public abstract class BaseContainerManagerTest {
    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogDir.getAbsolutePath());

    // Default delSrvc
    delSrvc = new DeletionService(exec) {
    delSrvc = createDeletionService();
    delSrvc.init(conf);

    exec = createContainerExecutor();
    nodeHealthChecker = new NodeHealthCheckerService();
    nodeHealthChecker.init(conf);
    dirsHandler = nodeHealthChecker.getDiskHandler();
    containerManager = createContainerManager(delSrvc);
    ((NMContext)context).setContainerManager(containerManager);
    nodeStatusUpdater.init(conf);
    containerManager.init(conf);
    nodeStatusUpdater.start();
  }

  protected ContainerManagerImpl
      createContainerManager(DeletionService delSrvc) {
    return new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
        metrics, new ApplicationACLsManager(conf), dirsHandler) {
      @Override
      public void
          setBlockNewContainerRequests(boolean blockNewContainerRequests) {
        // do nothing
      }

      @Override
      protected void authorizeRequest(String containerIDStr,
          ContainerLaunchContext launchContext, Container container,
          UserGroupInformation remoteUgi, ContainerTokenIdentifier tokenId)
          throws YarnRemoteException {
        // do nothing
      }

      @Override
      protected ContainerTokenIdentifier getContainerTokenIdentifier(
          UserGroupInformation remoteUgi,
          org.apache.hadoop.yarn.api.records.Container container)
          throws YarnRemoteException {
        return new ContainerTokenIdentifier(container.getId(),
            container.getNodeHttpAddress(), remoteUgi.getUserName(),
            container.getResource(), System.currentTimeMillis(), 123);
      }
    };
  }

  protected DeletionService createDeletionService() {
    return new DeletionService(exec) {
      @Override
      public void delete(String user, Path subDir, Path[] baseDirs) {
        // Don't do any deletions.
@@ -155,22 +204,6 @@ public abstract class BaseContainerManagerTest {
            + ", baseDirs - " + baseDirs);
      };
    };
    delSrvc.init(conf);

    exec = createContainerExecutor();
    nodeHealthChecker = new NodeHealthCheckerService();
    nodeHealthChecker.init(conf);
    dirsHandler = nodeHealthChecker.getDiskHandler();
    containerManager =
        new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
            metrics, new ApplicationACLsManager(conf), dirsHandler) {
          @Override
          public void setBlockNewContainerRequests(
              boolean blockNewContainerRequests) {
            // do nothing
          }
        };
    containerManager.init(conf);
  }

  @After
@@ -62,7 +62,6 @@ import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Test;
@@ -419,15 +418,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
    delSrvc = new DeletionService(exec);
    delSrvc.init(conf);

    containerManager =
        new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
            metrics, new ApplicationACLsManager(conf), dirsHandler) {
          @Override
          public void setBlockNewContainerRequests(
              boolean blockNewContainerRequests) {
            // do nothing
          }
        };
    containerManager = createContainerManager(delSrvc);
    containerManager.init(conf);
    containerManager.start();

@@ -25,10 +25,13 @@ import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import junit.framework.Assert;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -37,6 +40,9 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
@@ -54,11 +60,13 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
import org.mockito.ArgumentMatcher;


public class TestApplication {

  /**
@@ -257,6 +265,10 @@ public class TestApplication {
          AuxServicesEventType.APPLICATION_STOP, wa.appId)));

      wa.appResourcesCleanedup();
      for ( Container container : wa.containers) {
        Assert.assertTrue(wa.context.getContainerTokenSecretManager()
          .isValidStartContainerRequest(container.getContainer().getId()));
      }
      assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());

    } finally {
@@ -293,6 +305,10 @@ public class TestApplication {
          LocalizationEventType.DESTROY_APPLICATION_RESOURCES, wa.app)));

      wa.appResourcesCleanedup();
      for ( Container container : wa.containers) {
        Assert.assertTrue(wa.context.getContainerTokenSecretManager()
          .isValidStartContainerRequest(container.getContainer().getId()));
      }
      assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());
    } finally {
      if (wa != null)
@@ -429,8 +445,10 @@ public class TestApplication {
    final Application app;

    WrappedApplication(int id, long timestamp, String user, int numContainers) {
      Configuration conf = new Configuration();

      dispatcher = new DrainDispatcher();
      dispatcher.init(new Configuration());
      dispatcher.init(conf);

      localizerBus = mock(EventHandler.class);
      launcherBus = mock(EventHandler.class);
@@ -448,6 +466,16 @@ public class TestApplication {

      context = mock(Context.class);

      when(context.getContainerTokenSecretManager()).thenReturn(
          new NMContainerTokenSecretManager(conf));

      // Setting master key
      MasterKey masterKey = new MasterKeyPBImpl();
      masterKey.setKeyId(123);
      masterKey.setBytes(ByteBuffer.wrap(new byte[] { (new Integer(123)
        .byteValue()) }));
      context.getContainerTokenSecretManager().setMasterKey(masterKey);

      this.user = user;
      this.appId = BuilderUtils.newApplicationId(timestamp, id);

@@ -455,7 +483,13 @@ public class TestApplication {
          new Configuration()), this.user, appId, null, context);
      containers = new ArrayList<Container>();
      for (int i = 0; i < numContainers; i++) {
        containers.add(createMockedContainer(this.appId, i));
        Container container = createMockedContainer(this.appId, i);
        containers.add(container);
        context.getContainerTokenSecretManager().startContainerSuccessful(
            new ContainerTokenIdentifier(container.getContainer().getId(), "",
              "", null, System.currentTimeMillis() + 1000, masterKey.getKeyId()));
        Assert.assertFalse(context.getContainerTokenSecretManager()
          .isValidStartContainerRequest(container.getContainer().getId()));
      }

      dispatcher.start();
@@ -0,0 +1,28 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
[libdefaults]
	default_realm = APACHE.ORG
	udp_preference_limit = 1
	extra_addresses = 127.0.0.1
[realms]
	APACHE.ORG = {
		admin_server = localhost:88
		kdc = localhost:88
	}
[domain_realm]
	localhost = APACHE.ORG

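The krb5.conf above is a new test resource backing the security-enabled path. A test opts into it by switching Hadoop authentication to Kerberos before anything starts, as TestContainerManagerSecurity does at the end of this patch; a minimal sketch of that switch:

    Configuration conf = new Configuration();
    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
        "kerberos");
    UserGroupInformation.setConfiguration(conf);
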
@@ -44,7 +44,6 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResp
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;

@@ -198,11 +197,9 @@ public class ResourceTrackerService extends AbstractService implements
      return response;
    }

    if (isSecurityEnabled()) {
    MasterKey nextMasterKeyForNode =
        this.containerTokenSecretManager.getCurrentKey();
    response.setMasterKey(nextMasterKeyForNode);
    }

    RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,
        resolve(host), capability);

@@ -298,10 +295,9 @@ public class ResourceTrackerService extends AbstractService implements
        getResponseId() + 1, NodeAction.NORMAL, null, null, null,
        nextHeartBeatInterval);
    rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse);

    // Check if the node's masterKey needs to be updated and if the currentKey
    // has rolled over, send it across
    if (isSecurityEnabled()) {

    boolean shouldSendMasterKey = false;

    MasterKey nextMasterKeyForNode =

@@ -317,7 +313,6 @@ public class ResourceTrackerService extends AbstractService implements
    if (shouldSendMasterKey) {
      nodeHeartBeatResponse.setMasterKey(nextMasterKeyForNode);
    }
    }

    // 4. Send status to RMNode, saving the latest response.
    this.rmContext.getDispatcher().getEventHandler().handle(

@@ -341,8 +336,4 @@ public class ResourceTrackerService extends AbstractService implements
      PolicyProvider policyProvider) {
    this.server.refreshServiceAcl(configuration, policyProvider);
  }

  protected boolean isSecurityEnabled() {
    return UserGroupInformation.isSecurityEnabled();
  }
}

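The net effect on ResourceTrackerService is that the container-token master key now flows to every NodeManager, secure mode or not. As a rough sketch of the heartbeat-side decision the hunks above modify (the getLastKnownMasterKey accessor is an assumption for illustration; the real accessor sits in the elided context lines):

    // Assumed-shape sketch: resend the key only when the node's copy is stale.
    MasterKey current = this.containerTokenSecretManager.getCurrentKey();
    MasterKey known = request.getLastKnownMasterKey(); // assumed accessor
    boolean shouldSendMasterKey =
        known == null || known.getKeyId() != current.getKeyId();
    if (shouldSendMasterKey) {
      nodeHeartBeatResponse.setMasterKey(current);
    }
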
@@ -1296,8 +1296,6 @@ public class LeafQueue implements CSQueue {
        unreserve(application, priority, node, rmContainer);
      }

      // Create container tokens in secure-mode
      if (UserGroupInformation.isSecurityEnabled()) {
      ContainerToken containerToken =
          createContainerToken(application, container);
      if (containerToken == null) {

@@ -1305,7 +1303,6 @@ public class LeafQueue implements CSQueue {
        return Resources.none();
      }
      container.setContainerToken(containerToken);
      }

      // Inform the application
      RMContainer allocatedContainer =

@@ -25,7 +25,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerToken;

@@ -35,8 +34,8 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;

@@ -159,17 +158,12 @@ public class AppSchedulable extends Schedulable {
    NodeId nodeId = node.getRMNode().getNodeID();
    ContainerId containerId = BuilderUtils.newContainerId(application
        .getApplicationAttemptId(), application.getNewContainerId());
    ContainerToken containerToken = null;

    // If security is enabled, send the container-tokens too.
    if (UserGroupInformation.isSecurityEnabled()) {
    containerToken =
    ContainerToken containerToken =
        containerTokenSecretManager.createContainerToken(containerId, nodeId,
            application.getUser(), capability);
    if (containerToken == null) {
      return null; // Try again later.
    }
    }

    // Create the container
    Container container = BuilderUtils.newContainer(containerId, nodeId,

@@ -552,8 +552,6 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
        .getApplicationAttemptId(), application.getNewContainerId());
    ContainerToken containerToken = null;

    // If security is enabled, send the container-tokens too.
    if (UserGroupInformation.isSecurityEnabled()) {
    containerToken =
        this.rmContext.getContainerTokenSecretManager()
            .createContainerToken(containerId, nodeId,

@@ -561,7 +559,6 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
    if (containerToken == null) {
      return i; // Try again later.
    }
    }

    // Create the container
    Container container = BuilderUtils.newContainer(containerId, nodeId,

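LeafQueue, AppSchedulable, and FifoScheduler all land on the same shape: token creation is now unconditional, and a null token means the master key has not been rolled yet, so the allocation is simply retried on a later scheduling pulse. A consolidated sketch, with variable names taken from the hunks above (the trailing BuilderUtils.newContainer call is truncated in the diff, so it is only referenced in a comment):

    ContainerId containerId = BuilderUtils.newContainerId(
        application.getApplicationAttemptId(),
        application.getNewContainerId());
    ContainerToken containerToken =
        containerTokenSecretManager.createContainerToken(containerId, nodeId,
            application.getUser(), capability);
    if (containerToken == null) {
      return null; // Try again later -- no current master key to sign with.
    }
    // ...then build the Container via BuilderUtils.newContainer(...) as before.
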
@@ -202,15 +202,18 @@ public class MockNodes {
  };

  private static RMNode buildRMNode(int rack, final Resource perNode, NodeState state, String httpAddr) {
    return buildRMNode(rack, perNode, state, httpAddr, NODE_ID++);
    return buildRMNode(rack, perNode, state, httpAddr, NODE_ID++, null);
  }

  private static RMNode buildRMNode(int rack, final Resource perNode, NodeState state, String httpAddr, int hostnum) {
  private static RMNode buildRMNode(int rack, final Resource perNode,
      NodeState state, String httpAddr, int hostnum, String hostName) {
    final String rackName = "rack"+ rack;
    final int nid = hostnum;
    final String hostName = "host"+ nid;
    final String nodeAddr = hostName + ":" + nid;
    final int port = 123;
    if (hostName == null) {
      hostName = "host"+ nid;
    }
    final NodeId nodeID = newNodeID(hostName, port);
    final String httpAddress = httpAddr;
    final NodeHealthStatus nodeHealthStatus =

@@ -233,6 +236,12 @@ public class MockNodes {
  }

  public static RMNode newNodeInfo(int rack, final Resource perNode, int hostnum) {
    return buildRMNode(rack, perNode, null, "localhost:0", hostnum);
    return buildRMNode(rack, perNode, null, "localhost:0", hostnum, null);
  }

  public static RMNode newNodeInfo(int rack, final Resource perNode,
      int hostnum, String hostName) {
    return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName);
  }

}

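The new MockNodes overload exists so tests can pin a resolvable hostname instead of the generated "host<N>" name, which matters once container tokens bind to real node addresses. Usage, exactly as the scheduler tests later in this patch adopt it:

    // Pin a loopback hostname; passing null keeps the old "host1" behaviour.
    RMNode n1 =
        MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, "127.0.0.2");
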
@@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.util.Records;
import org.apache.log4j.Level;

@@ -298,8 +299,12 @@ public class MockRM extends ResourceManager {

  @Override
  protected ResourceTrackerService createResourceTrackerService() {
    RMContainerTokenSecretManager containerTokenSecretManager =
        new RMContainerTokenSecretManager(new Configuration());
    containerTokenSecretManager.rollMasterKey();
    return new ResourceTrackerService(getRMContext(), nodesListManager,
        this.nmLivelinessMonitor, this.containerTokenSecretManager) {
        this.nmLivelinessMonitor, containerTokenSecretManager) {

      @Override
      public void start() {
        // override to not start rpc handler

@@ -59,7 +59,7 @@ public class TestApplicationCleanup {
    MockRM rm = new MockRM();
    rm.start();

    MockNM nm1 = rm.registerNode("h1:1234", 5000);
    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 5000);

    RMApp app = rm.submitApp(2000);

@@ -72,7 +72,7 @@ public class TestApplicationCleanup {

    //request for containers
    int request = 2;
    am.allocate("h1" , 1000, request,
    am.allocate("127.0.0.1" , 1000, request,
        new ArrayList<ContainerId>());

    //kick the scheduler

@@ -147,7 +147,7 @@ public class TestApplicationCleanup {
    };
    rm.start();

    MockNM nm1 = rm.registerNode("h1:1234", 5000);
    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 5000);

    RMApp app = rm.submitApp(2000);

@@ -160,7 +160,7 @@ public class TestApplicationCleanup {

    //request for containers
    int request = 2;
    am.allocate("h1" , 1000, request,
    am.allocate("127.0.0.1" , 1000, request,
        new ArrayList<ContainerId>());
    dispatcher.await();

@@ -112,7 +112,7 @@ public class TestApplicationMasterLauncher {
    MockRMWithCustomAMLauncher rm = new MockRMWithCustomAMLauncher(
        containerManager);
    rm.start();
    MockNM nm1 = rm.registerNode("h1:1234", 5120);
    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 5120);

    RMApp app = rm.submitApp(2000);

@@ -74,8 +74,8 @@ public class TestFifoScheduler {
    rootLogger.setLevel(Level.DEBUG);
    MockRM rm = new MockRM(conf);
    rm.start();
    MockNM nm1 = rm.registerNode("h1:1234", 6 * GB);
    MockNM nm2 = rm.registerNode("h2:5678", 4 * GB);
    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
    MockNM nm2 = rm.registerNode("127.0.0.2:5678", 4 * GB);

    RMApp app1 = rm.submitApp(2048);
    // kick the scheduling, 2 GB given to AM1, remaining 4GB on nm1

@@ -98,10 +98,10 @@ public class TestFifoScheduler {
    Assert.assertEquals(2 * GB, report_nm2.getUsedResource().getMemory());

    // add request for containers
    am1.addRequests(new String[] { "h1", "h2" }, GB, 1, 1);
    am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, GB, 1, 1);
    AllocateResponse alloc1Response = am1.schedule(); // send the request
    // add request for containers
    am2.addRequests(new String[] { "h1", "h2" }, 3 * GB, 0, 1);
    am2.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 3 * GB, 0, 1);
    AllocateResponse alloc2Response = am2.schedule(); // send the request

    // kick the scheduler, 1 GB and 3 GB given to AM1 and AM2, remaining 0

@@ -163,7 +163,7 @@ public class TestFifoScheduler {
    rm.start();

    // Register node1
    MockNM nm1 = rm.registerNode("h1:1234", 6 * GB);
    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);

    // Submit an application
    RMApp app1 = rm.submitApp(testAlloc);

@@ -212,8 +212,10 @@ public class TestFifoScheduler {
    FifoScheduler fs = new FifoScheduler();
    fs.reinitialize(conf, null);

    RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1);
    RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2);
    RMNode n1 =
        MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, "127.0.0.2");
    RMNode n2 =
        MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2, "127.0.0.3");

    fs.handle(new NodeAddedSchedulerEvent(n1));
    fs.handle(new NodeAddedSchedulerEvent(n2));

@@ -222,7 +224,8 @@ public class TestFifoScheduler {
    Assert.assertEquals(6 * GB, fs.getRootQueueMetrics().getAvailableMB());

    // reconnect n1 with downgraded memory
    n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 1);
    n1 =
        MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 1, "127.0.0.2");
    fs.handle(new NodeRemovedSchedulerEvent(n1));
    fs.handle(new NodeAddedSchedulerEvent(n1));
    fs.handle(new NodeUpdateSchedulerEvent(n1));

@@ -241,7 +244,8 @@ public class TestFifoScheduler {
    FifoScheduler fs = (FifoScheduler) rm.getResourceScheduler();

    // Add a node
    RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1);
    RMNode n1 =
        MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, "127.0.0.2");
    fs.handle(new NodeAddedSchedulerEvent(n1));

    // Add two applications

@@ -95,8 +95,10 @@ public class TestRMRestart {
    // start like normal because state is empty
    rm1.start();

    MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
    MockNM nm2 = new MockNM("h2:5678", 15120, rm1.getResourceTrackerService());
    MockNM nm1 =
        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
    MockNM nm2 =
        new MockNM("127.0.0.2:5678", 15120, rm1.getResourceTrackerService());
    nm1.registerNode();
    nm2.registerNode(); // nm2 will not heartbeat with RM1

@@ -145,7 +147,7 @@ public class TestRMRestart {
    am1.registerAppAttempt();

    // AM request for containers
    am1.allocate("h1" , 1000, 1, new ArrayList<ContainerId>());
    am1.allocate("127.0.0.1" , 1000, 1, new ArrayList<ContainerId>());
    // kick the scheduler
    nm1.nodeHeartbeat(true);
    List<Container> conts = am1.allocate(new ArrayList<ResourceRequest>(),

@@ -244,8 +246,8 @@ public class TestRMRestart {
    Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction());

    // new NM to represent NM re-register
    nm1 = rm2.registerNode("h1:1234", 15120);
    nm2 = rm2.registerNode("h2:5678", 15120);
    nm1 = rm2.registerNode("127.0.0.1:1234", 15120);
    nm2 = rm2.registerNode("127.0.0.2:5678", 15120);

    // verify no more reboot response sent
    hbResponse = nm1.nodeHeartbeat(true);

@@ -265,7 +267,8 @@ public class TestRMRestart {

    // Nodes on which the AMs run
    MockNM am1Node = nm1;
    if(attemptState.getMasterContainer().getNodeId().toString().contains("h2")){
    if (attemptState.getMasterContainer().getNodeId().toString()
        .contains("127.0.0.2")) {
      am1Node = nm2;
    }

@@ -280,7 +283,8 @@ public class TestRMRestart {
        attemptState.getMasterContainer().getId());

    MockNM am2Node = nm1;
    if(attemptState.getMasterContainer().getNodeId().toString().contains("h2")){
    if (attemptState.getMasterContainer().getNodeId().toString()
        .contains("127.0.0.2")) {
      am2Node = nm2;
    }

@@ -292,8 +296,8 @@ public class TestRMRestart {
    am2.registerAppAttempt();

    //request for containers
    am1.allocate("h1" , 1000, 3, new ArrayList<ContainerId>());
    am2.allocate("h2" , 1000, 1, new ArrayList<ContainerId>());
    am1.allocate("127.0.0.1" , 1000, 3, new ArrayList<ContainerId>());
    am2.allocate("127.0.0.2" , 1000, 1, new ArrayList<ContainerId>());

    // verify container allocate continues to work
    nm1.nodeHeartbeat(true);

@@ -346,7 +350,8 @@ public class TestRMRestart {
        rmState.getApplicationState();
    MockRM rm1 = new MockRM(conf, memStore);
    rm1.start();
    MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
    MockNM nm1 =
        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
    nm1.registerNode();

    // submit an app with maxAppAttempts equals to 1

@@ -52,6 +52,7 @@ public class TestResourceManager {
    Configuration conf = new YarnConfiguration();
    resourceManager = new ResourceManager();
    resourceManager.init(conf);
    resourceManager.getRMContainerTokenSecretManager().rollMasterKey();
  }

  @After

@@ -48,13 +48,13 @@ public class TestApplicationMasterService {
        ResourceScheduler.class);
  }

  @Test(timeout = 30000)
  @Test(timeout = 3000000)
  public void testRMIdentifierOnContainerAllocation() throws Exception {
    MockRM rm = new MockRM(conf);
    rm.start();

    // Register node1
    MockNM nm1 = rm.registerNode("h1:1234", 6 * GB);
    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);

    // Submit an application
    RMApp app1 = rm.submitApp(2048);

@@ -65,7 +65,7 @@ public class TestApplicationMasterService {
    MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
    am1.registerAppAttempt();

    am1.addRequests(new String[] { "h1" }, GB, 1, 1);
    am1.addRequests(new String[] { "127.0.0.1" }, GB, 1, 1);
    AllocateResponse alloc1Response = am1.schedule(); // send the request

    // kick the scheduler

@@ -90,10 +90,10 @@ public class TestAMRMRPCNodeUpdates {
  @Test
  public void testAMRMUnusableNodes() throws Exception {

    MockNM nm1 = rm.registerNode("h1:1234", 10000);
    MockNM nm2 = rm.registerNode("h2:1234", 10000);
    MockNM nm3 = rm.registerNode("h3:1234", 10000);
    MockNM nm4 = rm.registerNode("h4:1234", 10000);
    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10000);
    MockNM nm2 = rm.registerNode("127.0.0.2:1234", 10000);
    MockNM nm3 = rm.registerNode("127.0.0.3:1234", 10000);
    MockNM nm4 = rm.registerNode("127.0.0.4:1234", 10000);

    RMApp app1 = rm.submitApp(2000);

@@ -59,6 +59,7 @@ public class TestRMNMRPCResponseId {

  @Before
  public void setUp() {
    Configuration conf = new Configuration();
    // Dispatcher that processes events inline
    Dispatcher dispatcher = new InlineDispatcher();
    dispatcher.register(SchedulerEventType.class, new EventHandler<Event>() {

@@ -69,17 +70,16 @@ public class TestRMNMRPCResponseId {
    });
    RMContext context =
        new RMContextImpl(dispatcher, null, null, null, null,
            null, null, null);
            null, new RMContainerTokenSecretManager(conf), null);
    dispatcher.register(RMNodeEventType.class,
        new ResourceManager.NodeEventDispatcher(context));
    NodesListManager nodesListManager = new NodesListManager(context);
    Configuration conf = new Configuration();
    nodesListManager.init(conf);
    RMContainerTokenSecretManager containerTokenSecretManager =
        new RMContainerTokenSecretManager(conf);

    context.getContainerTokenSecretManager().rollMasterKey();
    resourceTrackerService = new ResourceTrackerService(context,
        nodesListManager, new NMLivelinessMonitor(dispatcher),
        containerTokenSecretManager);
        context.getContainerTokenSecretManager());
    resourceTrackerService.init(conf);
  }

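Tests that hand-build an RMContext now pass the secret manager through the RMContextImpl constructor and roll the key up front; anything downstream can then mint tokens unconditionally. A minimal wiring sketch, assuming a dispatcher, containerId, nodeId, and capability from the surrounding test:

    Configuration conf = new Configuration();
    RMContext context =
        new RMContextImpl(dispatcher, null, null, null, null,
            null, new RMContainerTokenSecretManager(conf), null);
    context.getContainerTokenSecretManager().rollMasterKey();

    // A scheduler using this context can now always create tokens:
    ContainerToken token = context.getContainerTokenSecretManager()
        .createContainerToken(containerId, nodeId, "user", capability);
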
@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

@@ -88,6 +89,11 @@ public class TestApplicationLimits {
        thenReturn(CapacityScheduler.queueComparator);
    when(csContext.getResourceCalculator()).
        thenReturn(resourceCalculator);
    RMContainerTokenSecretManager containerTokenSecretManager =
        new RMContainerTokenSecretManager(conf);
    containerTokenSecretManager.rollMasterKey();
    when(csContext.getContainerTokenSecretManager()).thenReturn(
        containerTokenSecretManager);

    Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
    CSQueue root =

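Both capacity-scheduler test suites (this one and TestLeafQueue below) stub the same dependency. The recurring Mockito pattern, per the hunks above: hand the mocked scheduler context a real secret manager with a freshly rolled key, so allocation paths that now always mint tokens never bail out with "try again later":

    RMContainerTokenSecretManager containerTokenSecretManager =
        new RMContainerTokenSecretManager(conf);
    containerTokenSecretManager.rollMasterKey();
    when(csContext.getContainerTokenSecretManager()).thenReturn(
        containerTokenSecretManager);
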
@@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

@@ -122,6 +123,11 @@ public class TestLeafQueue {
        thenReturn(CapacityScheduler.queueComparator);
    when(csContext.getResourceCalculator()).
        thenReturn(resourceCalculator);
    RMContainerTokenSecretManager containerTokenSecretManager =
        new RMContainerTokenSecretManager(conf);
    containerTokenSecretManager.rollMasterKey();
    when(csContext.getContainerTokenSecretManager()).thenReturn(
        containerTokenSecretManager);

    root =
        CapacityScheduler.parseQueue(csContext, csConf, null,

@@ -275,7 +281,7 @@ public class TestLeafQueue {

    // Setup some nodes
    String host_0 = "host_0";
    String host_0 = "127.0.0.1";
    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);

    final int numNodes = 1;

@@ -397,7 +403,7 @@ public class TestLeafQueue {

    // Setup some nodes
    String host_0 = "host_0";
    String host_0 = "127.0.0.1";
    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);

    final int numNodes = 1;

@@ -528,9 +534,9 @@ public class TestLeafQueue {
    a.submitApplication(app_2, user_1, A);

    // Setup some nodes
    String host_0 = "host_0";
    String host_0 = "127.0.0.1";
    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
    String host_1 = "host_1";
    String host_1 = "127.0.0.2";
    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB);

    final int numNodes = 2;

@@ -622,9 +628,9 @@ public class TestLeafQueue {
    a.submitApplication(app_2, user_1, A);

    // Setup some nodes
    String host_0 = "host_0";
    String host_0 = "127.0.0.1";
    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
    String host_1 = "host_1";
    String host_1 = "127.0.0.2";
    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB);

    final int numNodes = 2;

@@ -740,7 +746,7 @@ public class TestLeafQueue {
    a.submitApplication(app_3, user_2, A);

    // Setup some nodes
    String host_0 = "host_0";
    String host_0 = "127.0.0.1";
    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);

    final int numNodes = 1;

@@ -902,7 +908,7 @@ public class TestLeafQueue {
    a.submitApplication(app_1, user_1, A);

    // Setup some nodes
    String host_0 = "host_0";
    String host_0 = "127.0.0.1";
    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB);

    final int numNodes = 2;

@@ -1002,9 +1008,9 @@ public class TestLeafQueue {
    a.submitApplication(app_1, user_1, A);

    // Setup some nodes
    String host_0 = "host_0";
    String host_0 = "127.0.0.1";
    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB);
    String host_1 = "host_1";
    String host_1 = "127.0.0.2";
    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 4*GB);

    final int numNodes = 3;

@@ -1102,10 +1108,10 @@ public class TestLeafQueue {
    a.submitApplication(app_1, user_1, A);

    // Setup some nodes
    String host_0 = "host_0";
    String host_0 = "127.0.0.1";
    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB);

    String host_1 = "host_1";
    String host_1 = "127.0.0.2";
    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 4*GB);

    final int numNodes = 3;

@@ -1214,15 +1220,15 @@ public class TestLeafQueue {
    a.submitApplication(app_0, user_0, A);

    // Setup some nodes and racks
    String host_0 = "host_0";
    String host_0 = "127.0.0.1";
    String rack_0 = "rack_0";
    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB);

    String host_1 = "host_1";
    String host_1 = "127.0.0.2";
    String rack_1 = "rack_1";
    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB);

    String host_2 = "host_2";
    String host_2 = "127.0.0.3";
    String rack_2 = "rack_2";
    FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB);

@@ -1317,7 +1323,7 @@ public class TestLeafQueue {
    app_0.updateResourceRequests(app_0_requests_0);
    assertEquals(2, app_0.getTotalRequiredResources(priority));

    String host_3 = "host_3"; // on rack_1
    String host_3 = "127.0.0.4"; // on rack_1
    FiCaSchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB);

    // Rack-delay

@@ -1355,15 +1361,15 @@ public class TestLeafQueue {
    a.submitApplication(app_0, user_0, A);

    // Setup some nodes and racks
    String host_0 = "host_0";
    String host_0 = "127.0.0.1";
    String rack_0 = "rack_0";
    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB);

    String host_1 = "host_1";
    String host_1 = "127.0.0.2";
    String rack_1 = "rack_1";
    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB);

    String host_2 = "host_2";
    String host_2 = "127.0.0.3";
    String rack_2 = "rack_2";
    FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB);

@@ -1486,14 +1492,14 @@ public class TestLeafQueue {
    a.submitApplication(app_0, user_0, A);

    // Setup some nodes and racks
    String host_0_0 = "host_0_0";
    String host_0_0 = "127.0.0.1";
    String rack_0 = "rack_0";
    FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 8*GB);
    String host_0_1 = "host_0_1";
    String host_0_1 = "127.0.0.2";
    FiCaSchedulerNode node_0_1 = TestUtils.getMockNode(host_0_1, rack_0, 0, 8*GB);

    String host_1_0 = "host_1_0";
    String host_1_0 = "127.0.0.3";
    String rack_1 = "rack_1";
    FiCaSchedulerNode node_1_0 = TestUtils.getMockNode(host_1_0, rack_1, 0, 8*GB);

@@ -123,6 +123,8 @@ public class TestFairScheduler {
    resourceManager.init(conf);
    ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
    scheduler.reinitialize(conf, resourceManager.getRMContext());
    // to initialize the master key
    resourceManager.getRMContainerTokenSecretManager().rollMasterKey();
  }

  @After

@@ -221,13 +223,16 @@ public class TestFairScheduler {
  @Test
  public void testAggregateCapacityTracking() throws Exception {
    // Add a node
    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
    RMNode node1 =
        MockNodes
            .newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
    scheduler.handle(nodeEvent1);
    assertEquals(1024, scheduler.getClusterCapacity().getMemory());

    // Add another node
    RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(512));
    RMNode node2 =
        MockNodes.newNodeInfo(1, Resources.createResource(512), 2, "127.0.0.2");
    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
    scheduler.handle(nodeEvent2);
    assertEquals(1536, scheduler.getClusterCapacity().getMemory());

@@ -241,7 +246,9 @@ public class TestFairScheduler {
  @Test
  public void testSimpleFairShareCalculation() {
    // Add one big node (only care about aggregate capacity)
    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024));
    RMNode node1 =
        MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024), 1,
            "127.0.0.1");
    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
    scheduler.handle(nodeEvent1);

@@ -265,7 +272,9 @@ public class TestFairScheduler {
  public void testSimpleHierarchicalFairShareCalculation() {
    // Add one big node (only care about aggregate capacity)
    int capacity = 10 * 24;
    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(capacity));
    RMNode node1 =
        MockNodes.newNodeInfo(1, Resources.createResource(capacity), 1,
            "127.0.0.1");
    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
    scheduler.handle(nodeEvent1);

@@ -313,12 +322,15 @@ public class TestFairScheduler {
  @Test (timeout = 5000)
  public void testSimpleContainerAllocation() {
    // Add a node
    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
    RMNode node1 =
        MockNodes
            .newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
    scheduler.handle(nodeEvent1);

    // Add another node
    RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(512));
    RMNode node2 =
        MockNodes.newNodeInfo(1, Resources.createResource(512), 2, "127.0.0.2");
    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
    scheduler.handle(nodeEvent2);

@@ -351,7 +363,9 @@ public class TestFairScheduler {
  @Test (timeout = 5000)
  public void testSimpleContainerReservation() throws InterruptedException {
    // Add a node
    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
    RMNode node1 =
        MockNodes
            .newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
    scheduler.handle(nodeEvent1);

@@ -359,6 +373,7 @@ public class TestFairScheduler {
    createSchedulingRequest(1024, "queue1", "user1", 1);
    scheduler.update();
    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);

    scheduler.handle(updateEvent);

    // Make sure queue 1 is allocated app capacity

@@ -376,7 +391,9 @@ public class TestFairScheduler {
    assertEquals(1024, scheduler.applications.get(attId).getCurrentReservation().getMemory());

    // Now another node checks in with capacity
    RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
    RMNode node2 =
        MockNodes
            .newNodeInfo(1, Resources.createResource(1024), 2, "127.0.0.2");
    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
    NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2);
    scheduler.handle(nodeEvent2);

@@ -442,7 +459,9 @@ public class TestFairScheduler {
    queueManager.initialize();

    // Add one big node (only care about aggregate capacity)
    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024));
    RMNode node1 =
        MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024), 1,
            "127.0.0.1");
    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
    scheduler.handle(nodeEvent1);

@@ -799,7 +818,9 @@ public class TestFairScheduler {
    queueManager.initialize();

    // Add one big node (only care about aggregate capacity)
    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024));
    RMNode node1 =
        MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024), 1,
            "127.0.0.1");
    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
    scheduler.handle(nodeEvent1);

@@ -857,7 +878,9 @@ public class TestFairScheduler {
    queueManager.initialize();

    // Add one big node (only care about aggregate capacity)
    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024));
    RMNode node1 =
        MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024), 1,
            "127.0.0.1");
    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
    scheduler.handle(nodeEvent1);

@@ -933,15 +956,21 @@ public class TestFairScheduler {
    queueManager.initialize();

    // Create four nodes
    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024));
    RMNode node1 =
        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 1,
            "127.0.0.1");
    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
    scheduler.handle(nodeEvent1);

    RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024));
    RMNode node2 =
        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 2,
            "127.0.0.2");
    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
    scheduler.handle(nodeEvent2);

    RMNode node3 = MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024));
    RMNode node3 =
        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 3,
            "127.0.0.3");
    NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
    scheduler.handle(nodeEvent3);

@@ -1094,15 +1123,21 @@ public class TestFairScheduler {
    queueManager.initialize();

    // Create four nodes
    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024));
    RMNode node1 =
        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 1,
            "127.0.0.1");
    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
    scheduler.handle(nodeEvent1);

    RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024));
    RMNode node2 =
        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 2,
            "127.0.0.2");
    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
    scheduler.handle(nodeEvent2);

    RMNode node3 = MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024));
    RMNode node3 =
        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 3,
            "127.0.0.3");
    NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
    scheduler.handle(nodeEvent3);

@@ -1183,7 +1218,9 @@ public class TestFairScheduler {
  @Test (timeout = 5000)
  public void testMultipleContainersWaitingForReservation() {
    // Add a node
    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
    RMNode node1 =
        MockNodes
            .newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
    scheduler.handle(nodeEvent1);

@@ -1226,7 +1263,9 @@ public class TestFairScheduler {
    queueManager.initialize();

    // Add a node
    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(8192));
    RMNode node1 =
        MockNodes
            .newNodeInfo(1, Resources.createResource(8192), 1, "127.0.0.1");
    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
    scheduler.handle(nodeEvent1);

@@ -1263,7 +1302,9 @@ public class TestFairScheduler {
  @Test (timeout = 5000)
  public void testReservationWhileMultiplePriorities() {
    // Add a node
    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
    RMNode node1 =
        MockNodes
            .newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
    scheduler.handle(nodeEvent1);

@@ -1348,9 +1389,15 @@ public class TestFairScheduler {

  @Test (timeout = 5000)
  public void testMultipleNodesSingleRackRequest() throws Exception {
    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
    RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
    RMNode node3 = MockNodes.newNodeInfo(2, Resources.createResource(1024));
    RMNode node1 =
        MockNodes
            .newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
    RMNode node2 =
        MockNodes
            .newNodeInfo(1, Resources.createResource(1024), 2, "127.0.0.2");
    RMNode node3 =
        MockNodes
            .newNodeInfo(2, Resources.createResource(1024), 3, "127.0.0.3");
    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
    scheduler.handle(nodeEvent1);
    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);

@@ -1388,7 +1435,9 @@ public class TestFairScheduler {

  @Test (timeout = 5000)
  public void testFifoWithinQueue() throws Exception {
    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(3072));
    RMNode node1 =
        MockNodes
            .newNodeInfo(1, Resources.createResource(3072), 1, "127.0.0.1");
    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
    scheduler.handle(nodeEvent1);

@@ -1432,7 +1481,9 @@ public class TestFairScheduler {
    scheduler.getQueueManager().getLeafQueue("root.default")
        .setPolicy(SchedulingPolicy.getDefault());

    RMNode node = MockNodes.newNodeInfo(1, Resources.createResource(16384));
    RMNode node =
        MockNodes.newNodeInfo(1, Resources.createResource(16384), 0,
            "127.0.0.1");
    NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
    scheduler.handle(nodeEvent);

@@ -1477,8 +1528,12 @@ public class TestFairScheduler {
    final String fairChild1 = fairParent + ".fairChild1";
    final String fairChild2 = fairParent + ".fairChild2";

    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(8192));
    RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(8192));
    RMNode node1 =
        MockNodes
            .newNodeInfo(1, Resources.createResource(8192), 1, "127.0.0.1");
    RMNode node2 =
        MockNodes
            .newNodeInfo(1, Resources.createResource(8192), 2, "127.0.0.2");

    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);

@@ -1597,7 +1652,9 @@ public class TestFairScheduler {

  @Test
  public void testReservationThatDoesntFit() {
    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
    RMNode node1 =
        MockNodes
            .newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
    scheduler.handle(nodeEvent1);

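The "// to initialize the master key" line in the @Before hunk above carries the weight for every fair-scheduler case that follows: with tokens now minted even in unsecure mode, a never-rolled RMContainerTokenSecretManager would leave createContainerToken returning null and every allocation deferred. Hence the one-time setup:

    // Without this, createContainerToken(...) has no current key to sign
    // with, and the schedulers' new unconditional token path defers every
    // allocation as "try again later".
    resourceManager.getRMContainerTokenSecretManager().rollMasterKey();
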
@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSch
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.After;
import org.junit.Before;

@@ -153,14 +154,17 @@ public class TestFifoScheduler {
  @Test(timeout=2000)
  public void testNodeLocalAssignment() throws Exception {
    AsyncDispatcher dispatcher = new InlineDispatcher();
    RMContainerTokenSecretManager containerTokenSecretManager =
        new RMContainerTokenSecretManager(new Configuration());
    containerTokenSecretManager.rollMasterKey();
    RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
        null, null, null);
        null, containerTokenSecretManager, null);

    FifoScheduler scheduler = new FifoScheduler();
    scheduler.reinitialize(new Configuration(), rmContext);

    RMNode node0 = MockNodes.newNodeInfo(1,
        Resources.createResource(1024 * 64), 1234);
        Resources.createResource(1024 * 64), 1, "127.0.0.1");
    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node0);
    scheduler.handle(nodeEvent1);

@ -121,7 +121,7 @@ public class TestRMWebServicesApps extends JerseyTest {
|
|||
@Test
|
||||
public void testApps() throws JSONException, Exception {
|
||||
rm.start();
|
||||
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
|
||||
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
|
||||
RMApp app1 = rm.submitApp(1024);
|
||||
amNodeManager.nodeHeartbeat(true);
|
||||
testAppsHelper("apps", app1, MediaType.APPLICATION_JSON);
|
||||
|
@ -131,7 +131,7 @@ public class TestRMWebServicesApps extends JerseyTest {
|
|||
@Test
|
||||
public void testAppsSlash() throws JSONException, Exception {
|
||||
rm.start();
|
||||
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
|
||||
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
|
||||
RMApp app1 = rm.submitApp(1024);
|
||||
amNodeManager.nodeHeartbeat(true);
|
||||
testAppsHelper("apps/", app1, MediaType.APPLICATION_JSON);
|
||||
|
@ -141,7 +141,7 @@ public class TestRMWebServicesApps extends JerseyTest {
|
|||
@Test
|
||||
public void testAppsDefault() throws JSONException, Exception {
|
||||
rm.start();
|
||||
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
|
||||
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
|
||||
RMApp app1 = rm.submitApp(1024);
|
||||
amNodeManager.nodeHeartbeat(true);
|
||||
testAppsHelper("apps/", app1, "");
|
||||
|
@ -151,7 +151,7 @@ public class TestRMWebServicesApps extends JerseyTest {
|
|||
@Test
|
||||
public void testAppsXML() throws JSONException, Exception {
|
||||
rm.start();
|
||||
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
|
||||
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
|
||||
RMApp app1 = rm.submitApp(1024, "testwordcount", "user1");
|
||||
amNodeManager.nodeHeartbeat(true);
|
||||
WebResource r = resource();
|
||||
|
@ -176,7 +176,7 @@ public class TestRMWebServicesApps extends JerseyTest {
|
|||
@Test
|
||||
public void testAppsXMLMulti() throws JSONException, Exception {
|
||||
rm.start();
|
||||
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
|
||||
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
|
||||
rm.submitApp(1024, "testwordcount", "user1");
|
||||
rm.submitApp(2048, "testwordcount2", "user1");
|
||||
|
||||
|
@ -220,7 +220,7 @@ public class TestRMWebServicesApps extends JerseyTest {
|
|||
@Test
|
||||
public void testAppsQueryState() throws JSONException, Exception {
|
||||
rm.start();
|
||||
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
|
||||
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
|
||||
RMApp app1 = rm.submitApp(1024);
|
||||
amNodeManager.nodeHeartbeat(true);
|
||||
WebResource r = resource();
|
||||
|
@ -242,7 +242,7 @@ public class TestRMWebServicesApps extends JerseyTest {
|
|||
@Test
|
||||
public void testAppsQueryStateNone() throws JSONException, Exception {
|
||||
rm.start();
|
||||
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
|
||||
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
|
||||
rm.submitApp(1024);
|
||||
amNodeManager.nodeHeartbeat(true);
|
||||
WebResource r = resource();
|
||||
|
@ -260,7 +260,7 @@ public class TestRMWebServicesApps extends JerseyTest {
|
|||
@Test
|
||||
public void testAppsQueryStateInvalid() throws JSONException, Exception {
|
||||
rm.start();
|
||||
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
|
||||
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
|
||||
rm.submitApp(1024);
|
||||
amNodeManager.nodeHeartbeat(true);
|
||||
WebResource r = resource();
|
||||
|
@ -298,7 +298,7 @@ public class TestRMWebServicesApps extends JerseyTest {
|
|||
@Test
|
||||
public void testAppsQueryFinalStatus() throws JSONException, Exception {
|
||||
rm.start();
|
||||
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
|
||||
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
|
||||
RMApp app1 = rm.submitApp(1024);
|
||||
amNodeManager.nodeHeartbeat(true);
|
||||
WebResource r = resource();
|
||||
|
@ -321,7 +321,7 @@ public class TestRMWebServicesApps extends JerseyTest {
|
|||
@Test
|
||||
public void testAppsQueryFinalStatusNone() throws JSONException, Exception {
|
||||
rm.start();
|
||||
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
|
||||
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
|
||||
rm.submitApp(1024);
|
||||
amNodeManager.nodeHeartbeat(true);
|
||||
WebResource r = resource();
|
||||
|
@ -339,7 +339,7 @@ public class TestRMWebServicesApps extends JerseyTest {
|
|||
@Test
|
||||
public void testAppsQueryFinalStatusInvalid() throws JSONException, Exception {
|
||||
rm.start();
|
||||
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
|
||||
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
|
||||
rm.submitApp(1024);
|
||||
amNodeManager.nodeHeartbeat(true);
|
||||
WebResource r = resource();
|
||||
|
@ -377,7 +377,7 @@ public class TestRMWebServicesApps extends JerseyTest {
|
|||
@Test
|
||||
public void testAppsQueryUser() throws JSONException, Exception {
|
||||
rm.start();
|
||||
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
|
||||
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
|
||||
rm.submitApp(1024);
|
||||
rm.submitApp(1024);
|
||||
|
||||
|
@ -405,7 +405,7 @@ public class TestRMWebServicesApps extends JerseyTest {
|
|||
@Test
|
||||
public void testAppsQueryQueue() throws JSONException, Exception {
|
||||
rm.start();
|
||||
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
|
||||
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
|
||||
rm.submitApp(1024);
|
||||
rm.submitApp(1024);
|
||||
|
||||
|
@ -428,7 +428,7 @@ public class TestRMWebServicesApps extends JerseyTest {
|
|||
@Test
|
||||
public void testAppsQueryLimit() throws JSONException, Exception {
|
||||
rm.start();
|
||||
rm.registerNode("amNM:1234", 2048);
|
||||
rm.registerNode("127.0.0.1:1234", 2048);
|
||||
rm.submitApp(1024);
|
||||
rm.submitApp(1024);
|
||||
rm.submitApp(1024);
|
||||
|
@ -451,7 +451,7 @@ public class TestRMWebServicesApps extends JerseyTest {
|
|||
rm.start();
|
||||
long start = System.currentTimeMillis();
|
||||
Thread.sleep(1);
|
||||
rm.registerNode("amNM:1234", 2048);
|
||||
rm.registerNode("127.0.0.1:1234", 2048);
|
||||
rm.submitApp(1024);
|
||||
rm.submitApp(1024);
|
||||
rm.submitApp(1024);
|
||||
|
@ -472,7 +472,7 @@ public class TestRMWebServicesApps extends JerseyTest {
|
|||
@Test
|
||||
public void testAppsQueryStartBeginSome() throws JSONException, Exception {
|
||||
rm.start();
|
||||
rm.registerNode("amNM:1234", 2048);
|
||||
rm.registerNode("127.0.0.1:1234", 2048);
|
||||
rm.submitApp(1024);
|
||||
rm.submitApp(1024);
|
||||
long start = System.currentTimeMillis();
|
||||
|
@ -495,7 +495,7 @@ public class TestRMWebServicesApps extends JerseyTest {
|
|||
@Test
|
||||
public void testAppsQueryStartEnd() throws JSONException, Exception {
|
||||
rm.start();
|
||||
rm.registerNode("amNM:1234", 2048);
|
||||
rm.registerNode("127.0.0.1:1234", 2048);
|
||||
long end = System.currentTimeMillis();
|
||||
Thread.sleep(1);
|
||||
rm.submitApp(1024);
|
||||
|
@ -515,7 +515,7 @@ public class TestRMWebServicesApps extends JerseyTest {
|
|||
@Test
|
||||
public void testAppsQueryStartBeginEnd() throws JSONException, Exception {
|
||||
rm.start();
|
||||
rm.registerNode("amNM:1234", 2048);
|
||||
rm.registerNode("127.0.0.1:1234", 2048);
|
||||
long start = System.currentTimeMillis();
|
||||
Thread.sleep(1);
|
||||
rm.submitApp(1024);
|
||||
|
@ -541,7 +541,7 @@ public class TestRMWebServicesApps extends JerseyTest {
|
|||
@Test
|
||||
public void testAppsQueryFinishBegin() throws JSONException, Exception {
|
||||
rm.start();
|
||||
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
|
||||
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
|
||||
long start = System.currentTimeMillis();
|
||||
Thread.sleep(1);
|
||||
RMApp app1 = rm.submitApp(1024);
|
||||
|
@ -573,7 +573,7 @@ public class TestRMWebServicesApps extends JerseyTest {
|
|||
@Test
|
||||
public void testAppsQueryFinishEnd() throws JSONException, Exception {
|
||||
rm.start();
|
||||
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
|
||||
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
|
||||
RMApp app1 = rm.submitApp(1024);
|
||||
amNodeManager.nodeHeartbeat(true);
|
||||
// finish App
|
||||
|
@ -605,7 +605,7 @@ public class TestRMWebServicesApps extends JerseyTest {
|
|||
@Test
|
||||
public void testAppsQueryFinishBeginEnd() throws JSONException, Exception {
|
||||
rm.start();
|
||||
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
|
||||
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
|
||||
long start = System.currentTimeMillis();
|
||||
Thread.sleep(1);
|
||||
RMApp app1 = rm.submitApp(1024);
|
||||
|
@ -640,7 +640,7 @@ public class TestRMWebServicesApps extends JerseyTest {
|
|||
@Test
|
||||
public void testSingleApp() throws JSONException, Exception {
|
||||
rm.start();
|
||||
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
|
||||
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
|
||||
RMApp app1 = rm.submitApp(1024, "testwordcount", "user1");
|
||||
amNodeManager.nodeHeartbeat(true);
|
||||
testSingleAppsHelper(app1.getApplicationId().toString(), app1,
|
||||
|
@ -651,7 +651,7 @@ public class TestRMWebServicesApps extends JerseyTest {
|
|||
@Test
|
||||
public void testSingleAppsSlash() throws JSONException, Exception {
|
||||
rm.start();
|
||||
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
|
||||
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
|
||||
RMApp app1 = rm.submitApp(1024);
|
||||
amNodeManager.nodeHeartbeat(true);
|
||||
testSingleAppsHelper(app1.getApplicationId().toString() + "/", app1,
|
||||
|
@ -662,7 +662,7 @@ public class TestRMWebServicesApps extends JerseyTest {
|
|||
@Test
|
||||
public void testSingleAppsDefault() throws JSONException, Exception {
|
||||
rm.start();
|
||||
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
|
||||
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
|
||||
RMApp app1 = rm.submitApp(1024);
|
||||
amNodeManager.nodeHeartbeat(true);
|
||||
testSingleAppsHelper(app1.getApplicationId().toString() + "/", app1, "");
|
||||
|
@ -672,7 +672,7 @@ public class TestRMWebServicesApps extends JerseyTest {
|
|||
@Test
|
||||
public void testInvalidApp() throws JSONException, Exception {
|
||||
rm.start();
|
||||
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
|
||||
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
|
||||
rm.submitApp(1024);
|
||||
amNodeManager.nodeHeartbeat(true);
|
||||
WebResource r = resource();
|
||||
|
@ -708,7 +708,7 @@ public class TestRMWebServicesApps extends JerseyTest {
@Test
public void testNonexistApp() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
rm.submitApp(1024, "testwordcount", "user1");
amNodeManager.nodeHeartbeat(true);
WebResource r = resource();

@ -757,7 +757,7 @@ public class TestRMWebServicesApps extends JerseyTest {
@Test
public void testSingleAppsXML() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
RMApp app1 = rm.submitApp(1024, "testwordcount", "user1");
amNodeManager.nodeHeartbeat(true);
WebResource r = resource();

@ -858,7 +858,7 @@ public class TestRMWebServicesApps extends JerseyTest {
@Test
public void testAppAttempts() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
RMApp app1 = rm.submitApp(1024, "testwordcount", "user1");
amNodeManager.nodeHeartbeat(true);
testAppAttemptsHelper(app1.getApplicationId().toString(), app1,

@ -869,7 +869,7 @@ public class TestRMWebServicesApps extends JerseyTest {
@Test
public void testMultipleAppAttempts() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
RMApp app1 = rm.submitApp(1024, "testwordcount", "user1");
amNodeManager.nodeHeartbeat(true);
int maxAppAttempts = rm.getConfig().getInt(

@ -895,7 +895,7 @@ public class TestRMWebServicesApps extends JerseyTest {
@Test
public void testAppAttemptsSlash() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
RMApp app1 = rm.submitApp(1024);
amNodeManager.nodeHeartbeat(true);
testAppAttemptsHelper(app1.getApplicationId().toString() + "/", app1,

@ -906,7 +906,7 @@ public class TestRMWebServicesApps extends JerseyTest {
@Test
public void testAppAttemtpsDefault() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
RMApp app1 = rm.submitApp(1024);
amNodeManager.nodeHeartbeat(true);
testAppAttemptsHelper(app1.getApplicationId().toString() + "/", app1, "");

@ -916,7 +916,7 @@ public class TestRMWebServicesApps extends JerseyTest {
@Test
public void testInvalidAppAttempts() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
rm.submitApp(1024);
amNodeManager.nodeHeartbeat(true);
WebResource r = resource();

@ -952,7 +952,7 @@ public class TestRMWebServicesApps extends JerseyTest {
@Test
public void testNonexistAppAttempts() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
rm.submitApp(1024, "testwordcount", "user1");
amNodeManager.nodeHeartbeat(true);
WebResource r = resource();

@ -1014,7 +1014,7 @@ public class TestRMWebServicesApps extends JerseyTest {
@Test
public void testAppAttemptsXML() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
RMApp app1 = rm.submitApp(1024, "testwordcount", "user1");
amNodeManager.nodeHeartbeat(true);
WebResource r = resource();
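Every hunk above makes the same one-line change: the MockNM is registered as "127.0.0.1:1234" instead of "amNM:1234", presumably so the node address carried in container tokens resolves now that tokens are validated even in unsecure mode. A condensed sketch of the setup these tests share, assuming the MockRM/MockNM test-harness classes used above (the class and main() wrapper are illustrative, not part of the commit):

import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;

public class AppsSetupSketch {
  public static void main(String[] args) throws Exception {
    MockRM rm = new MockRM();
    rm.start();
    // Register the NM under a resolvable loopback address (the change above).
    MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
    RMApp app1 = rm.submitApp(1024, "testwordcount", "user1");
    amNodeManager.nodeHeartbeat(true);
    rm.stop();
  }
}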
@ -20,8 +20,6 @@ package org.apache.hadoop.yarn.server;

import static org.junit.Assert.fail;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;

@ -39,14 +37,11 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;

@ -86,8 +81,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSe
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

public class TestContainerManagerSecurity {

@ -95,39 +88,51 @@ public class TestContainerManagerSecurity {
static Log LOG = LogFactory.getLog(TestContainerManagerSecurity.class);
static final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
private static FileContext localFS = null;
private static final File localDir = new File("target",
TestContainerManagerSecurity.class.getName() + "-localDir")
.getAbsoluteFile();
private static MiniYARNCluster yarnCluster;

static final Configuration conf = new Configuration();

@BeforeClass
public static void setup() throws AccessControlException,
FileNotFoundException, UnsupportedFileSystemException, IOException {
localFS = FileContext.getLocalFSFileContext();
localFS.delete(new Path(localDir.getAbsolutePath()), true);
localDir.mkdir();

@Test (timeout = 1000000)
public void testContainerManagerWithSecurityEnabled() throws Exception {
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
// Set AM expiry interval to be very long.
conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 100000L);
UserGroupInformation.setConfiguration(conf);
testContainerManager();
}

@Test (timeout=1000000)
public void testContainerManagerWithSecurityDisabled() throws Exception {
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"simple");
testContainerManager();
}

private void testContainerManager() throws Exception {
try {
yarnCluster = new MiniYARNCluster(TestContainerManagerSecurity.class
.getName(), 1, 1, 1);
conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 100000L);
UserGroupInformation.setConfiguration(conf);
yarnCluster.init(conf);
yarnCluster.start();
}

@AfterClass
public static void teardown() {
// Testing for authenticated user
testAuthenticatedUser();

// Testing for malicious user
testMaliceUser();

// Testing for unauthorized user
testUnauthorizedUser();

} finally {
if (yarnCluster != null) {
yarnCluster.stop();
yarnCluster = null;
}
}
}

@Test
public void testAuthenticatedUser() throws IOException,
private void testAuthenticatedUser() throws IOException,
InterruptedException, YarnRemoteException {

LOG.info("Running test for authenticated user");
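The two @Test methods above differ only in which authentication mode they set before delegating to the shared testContainerManager() driver. A minimal sketch of that toggle, using only APIs already imported in this file (the wrapper class and println are illustrative, not from the commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.security.UserGroupInformation;

public class AuthToggleSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // "kerberos" turns Hadoop security on; "simple" is the unsecure default.
    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
        "kerberos");
    UserGroupInformation.setConfiguration(conf);
    // Reports true only for the "kerberos" setting.
    System.out.println("security enabled: "
        + UserGroupInformation.isSecurityEnabled());
  }
}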
@ -179,8 +184,7 @@ public class TestContainerManagerSecurity {
resourceManager.getClientRMService().forceKillApplication(request);
}

@Test
public void testMaliceUser() throws IOException, InterruptedException,
private void testMaliceUser() throws IOException, InterruptedException,
YarnRemoteException {

LOG.info("Running test for malice user");

@ -265,8 +269,7 @@ public class TestContainerManagerSecurity {
resourceManager.getClientRMService().forceKillApplication(request);
}

@Test
public void testUnauthorizedUser() throws IOException, InterruptedException,
private void testUnauthorizedUser() throws IOException, InterruptedException,
YarnRemoteException {

LOG.info("\n\nRunning test for malice user");

@ -316,9 +319,9 @@ public class TestContainerManagerSecurity {

LOG.info("Going to contact NM: unauthorized request");

callWithIllegalContainerID(client, tokenId);
callWithIllegalResource(client, tokenId);
callWithIllegalUserName(client, tokenId);
callWithIllegalContainerID(client, tokenId, allocatedContainer);
callWithIllegalResource(client, tokenId, allocatedContainer);
callWithIllegalUserName(client, tokenId, allocatedContainer);

return client;
}

@ -336,10 +339,11 @@ public class TestContainerManagerSecurity {
resourceManager.getRMContainerTokenSecretManager();
final ContainerTokenIdentifier newTokenId =
new ContainerTokenIdentifier(tokenId.getContainerID(),
tokenId.getNmHostAddress(), "testUser", tokenId.getResource(),
tokenId.getNmHostAddress(), tokenId.getApplicationSubmitter(),
tokenId.getResource(),
System.currentTimeMillis() - 1,
containerTokenSecreteManager.getCurrentKey().getKeyId());
byte[] passowrd =
final byte[] passowrd =
containerTokenSecreteManager.createPassword(
newTokenId);
// Create a valid token by using the key from the RM.

@ -358,13 +362,12 @@ public class TestContainerManagerSecurity {

LOG.info("Going to contact NM with expired token");
ContainerLaunchContext context = createContainerLaunchContextForTest(newTokenId);
Container container =
BuilderUtils.newContainer(newTokenId.getContainerID(), null, null,
BuilderUtils.newResource(newTokenId.getResource().getMemory(),
newTokenId.getResource().getVirtualCores()), null, null, 0);
StartContainerRequest request = Records.newRecord(StartContainerRequest.class);
StartContainerRequest request =
Records.newRecord(StartContainerRequest.class);
request.setContainerLaunchContext(context);
request.setContainer(container);
allocatedContainer.setContainerToken(BuilderUtils.newContainerToken(
allocatedContainer.getNodeId(), passowrd, newTokenId));
request.setContainer(allocatedContainer);

//Calling startContainer with an expired token.
try {
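The hunks above rebuild the expired-token probe around the genuinely allocated container: an identifier is minted with the token's real submitter, an expiry already in the past, and the RM's current key, then packaged into a token addressed at the NM. A sketch of that construction, reusing the constructor and helper signatures exactly as they appear in the diff (the wrapper class and method name are illustrative):

import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.BuilderUtils;

public class ExpiredTokenSketch {
  // tokenId, allocatedContainer and the RM's secret manager are assumed to
  // come from the surrounding test, as in the diff.
  static ContainerToken mintExpiredToken(ContainerTokenIdentifier tokenId,
      Container allocatedContainer,
      RMContainerTokenSecretManager secretManager) {
    // Expiry in the past (currentTimeMillis() - 1), signed with the live key.
    ContainerTokenIdentifier expiredId = new ContainerTokenIdentifier(
        tokenId.getContainerID(), tokenId.getNmHostAddress(),
        tokenId.getApplicationSubmitter(), tokenId.getResource(),
        System.currentTimeMillis() - 1,
        secretManager.getCurrentKey().getKeyId());
    byte[] password = secretManager.createPassword(expiredId);
    // Package identifier + password into a token bound to the NM's address.
    return BuilderUtils.newContainerToken(
        allocatedContainer.getNodeId(), password, expiredId);
  }
}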
@ -453,6 +456,7 @@ public class TestContainerManagerSecurity {
// Ask for a container from the RM
final InetSocketAddress schedulerAddr =
resourceManager.getApplicationMasterService().getBindAddress();
if (UserGroupInformation.isSecurityEnabled()) {
ApplicationTokenIdentifier appTokenIdentifier = new ApplicationTokenIdentifier(
appAttempt.getAppAttemptId());
ApplicationTokenSecretManager appTokenSecretManager =

@ -464,6 +468,7 @@ public class TestContainerManagerSecurity {
appTokenSecretManager);
SecurityUtil.setTokenService(appToken, schedulerAddr);
currentUser.addToken(appToken);
}

AMRMProtocol scheduler = currentUser
.doAs(new PrivilegedAction<AMRMProtocol>() {
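These two hunks wrap the application-token wiring in an isSecurityEnabled() guard: the token is bound to the scheduler address and attached to the UGI only under Kerberos, while the doAs proxy creation runs in both modes. A compact sketch of the resulting flow, assuming a pre-built appToken and the pre-2.1 AMRMProtocol/YarnRPC APIs (the class and method names here are illustrative):

import java.net.InetSocketAddress;
import java.security.PrivilegedAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;

public class ConditionalAmTokenSketch {
  static AMRMProtocol connect(final Configuration conf,
      UserGroupInformation currentUser,
      Token<ApplicationTokenIdentifier> appToken,
      final InetSocketAddress schedulerAddr) {
    if (UserGroupInformation.isSecurityEnabled()) {
      // Only under Kerberos does the AM token get attached to the UGI.
      SecurityUtil.setTokenService(appToken, schedulerAddr);
      currentUser.addToken(appToken);
    }
    // The scheduler proxy is created inside doAs either way.
    return currentUser.doAs(new PrivilegedAction<AMRMProtocol>() {
      @Override
      public AMRMProtocol run() {
        return (AMRMProtocol) YarnRPC.create(conf).getProxy(
            AMRMProtocol.class, schedulerAddr, conf);
      }
    });
  }
}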
@ -519,16 +524,20 @@ public class TestContainerManagerSecurity {
}

void callWithIllegalContainerID(ContainerManager client,
ContainerTokenIdentifier tokenId) {
GetContainerStatusRequest request = recordFactory
.newRecordInstance(GetContainerStatusRequest.class);
ContainerTokenIdentifier tokenId, Container container) {
StartContainerRequest request = recordFactory
.newRecordInstance(StartContainerRequest.class);
ContainerLaunchContext context =
createContainerLaunchContextForTest(tokenId);
ContainerId newContainerId = BuilderUtils.newContainerId(BuilderUtils
.newApplicationAttemptId(tokenId.getContainerID()
.getApplicationAttemptId().getApplicationId(), 1), 42);
request.setContainerId(newContainerId); // Authenticated but
// unauthorized.
ContainerId oldContainerId = container.getId();
try {
client.getContainerStatus(request);
container.setId(newContainerId);
request.setContainer(container);
request.setContainerLaunchContext(context);
client.startContainer(request);
fail("Connection initiation with unauthorized "
+ "access is expected to fail.");
} catch (YarnRemoteException e) {

@ -540,19 +549,20 @@ public class TestContainerManagerSecurity {
} catch (IOException e) {
LOG.info("Got IOException: ",e);
fail("IOException is not expected.");
} finally {
container.setId(oldContainerId);
}
}

void callWithIllegalResource(ContainerManager client,
ContainerTokenIdentifier tokenId) {
ContainerTokenIdentifier tokenId, Container container) {
StartContainerRequest request = recordFactory
.newRecordInstance(StartContainerRequest.class);
// Authenticated but unauthorized, due to wrong resource
ContainerLaunchContext context =
createContainerLaunchContextForTest(tokenId);
Container container =
BuilderUtils.newContainer(tokenId.getContainerID(), null, null,
BuilderUtils.newResource(2048, 1), null, null, 0);
Resource rsrc = container.getResource();
container.setResource(BuilderUtils.newResource(2048, 1));
request.setContainerLaunchContext(context);
request.setContainer(container);
try {

@ -570,20 +580,17 @@ public class TestContainerManagerSecurity {
LOG.info("Got IOException: ",e);
fail("IOException is not expected.");
}
container.setResource(rsrc);
}

void callWithIllegalUserName(ContainerManager client,
ContainerTokenIdentifier tokenId) {
ContainerTokenIdentifier tokenId, Container container) {
StartContainerRequest request = recordFactory
.newRecordInstance(StartContainerRequest.class);
// Authenticated but unauthorized, due to wrong resource
ContainerLaunchContext context =
createContainerLaunchContextForTest(tokenId);
context.setUser("Saruman"); // Set a different user-name.
Container container =
BuilderUtils.newContainer(tokenId.getContainerID(), null, null,
BuilderUtils.newResource(tokenId.getResource().getMemory(), tokenId
.getResource().getVirtualCores()), null, null, 0);
request.setContainerLaunchContext(context);
request.setContainer(container);
try {
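All three helpers now follow the same shape: take the genuinely allocated Container, tamper with exactly one field that the NM cross-checks against the container token (container ID, resource, or user), expect startContainer to be rejected, and restore the field so later probes see the original container. A sketch of that tamper-and-restore pattern for the resource case, with the types and values taken from the hunks above (the wrapper class and method name are illustrative):

import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.BuilderUtils;

public class TamperRestoreSketch {
  static void probeIllegalResource(ContainerManager client,
      StartContainerRequest request, Container container) {
    Resource granted = container.getResource();   // remember the real grant
    container.setResource(BuilderUtils.newResource(2048, 1)); // ask for more
    request.setContainer(container);
    try {
      client.startContainer(request);             // NM should reject this
    } catch (Exception expected) {
      // The unauthorized request failing is the success path here.
    } finally {
      container.setResource(granted);             // restore for later probes
    }
  }
}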
@ -607,7 +614,8 @@ public class TestContainerManagerSecurity {
ContainerTokenIdentifier tokenId) {
ContainerLaunchContext context =
BuilderUtils.newContainerLaunchContext(
"testUser", new HashMap<String, LocalResource>(),
tokenId.getApplicationSubmitter(),
new HashMap<String, LocalResource>(),
new HashMap<String, String>(), new ArrayList<String>(),
new HashMap<String, ByteBuffer>(), null,
new HashMap<ApplicationAccessType, String>());
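The final hunk drops the hard-coded "testUser" and seeds the launch context with the submitter recorded in the container token, so the user the NM validates is the one the token actually authorizes. A sketch of the consistency this buys; the check shown is illustrative (the NM performs its own version of it), and getUser() mirrors the context.setUser("Saruman") call in the diff:

import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;

public class SubmitterConsistencySketch {
  // True only when the launch-context user matches the submitter baked into
  // the token; a mismatch is what the validation added by YARN-617 rejects.
  static boolean userMatchesToken(ContainerLaunchContext context,
      ContainerTokenIdentifier tokenId) {
    return tokenId.getApplicationSubmitter().equals(context.getUser());
  }
}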
@ -37,17 +37,25 @@ import org.junit.Test;

public class TestRMNMSecretKeys {

@Test
@Test(timeout = 1000000)
public void testNMUpdation() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
// validating RM NM keys for Unsecured environment
validateRMNMKeyExchange(conf);

// validating RM NM keys for secured environment
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
UserGroupInformation.setConfiguration(conf);
validateRMNMKeyExchange(conf);
}

private void validateRMNMKeyExchange(YarnConfiguration conf) throws Exception {
// Default rolling and activation intervals are large enough, no need to
// intervene

final DrainDispatcher dispatcher = new DrainDispatcher();
ResourceManager rm = new ResourceManager() {

@Override
protected void doSecureLogin() throws IOException {
// Do nothing.

@ -110,7 +118,8 @@ public class TestRMNMSecretKeys {
dispatcher.await();

response = nm.nodeHeartbeat(true);
Assert.assertNull(
Assert
.assertNull(
"Even second heartbeat after activation shouldn't get any key updates!",
response.getMasterKey());
dispatcher.await();