() {
+
+ @Override
+ public ContainerManagementProtocol run() {
+ return (ContainerManagementProtocol) rpc.getProxy(
+ ContainerManagementProtocol.class, cmAddr, conf);
+ }
+ });
+ return proxy;
+ }
+
+ public ContainerManagementProtocol getContainerManagementProtocol() {
+ return proxy;
+ }
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java
index bef03882497..02cfbfb953c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java
@@ -19,9 +19,7 @@
package org.apache.hadoop.yarn.client.api.impl;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
-import java.security.PrivilegedAction;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -29,31 +27,23 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
/**
*
@@ -91,14 +81,18 @@ public class NMClientImpl extends NMClient {
new ConcurrentHashMap();
//enabled by default
- private final AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true);
+ private final AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true);
+ private ContainerManagementProtocolProxy cmProxy;
+ private ConcurrentMap nmTokens;
- public NMClientImpl() {
+ public NMClientImpl(ConcurrentMap nmTokens) {
super(NMClientImpl.class.getName());
+ this.nmTokens = nmTokens;
}
- public NMClientImpl(String name) {
+ public NMClientImpl(String name, ConcurrentMap nmTokens) {
super(name);
+ this.nmTokens = nmTokens;
}
@Override
@@ -108,6 +102,7 @@ public class NMClientImpl extends NMClient {
if (getCleanupRunningContainers().get()) {
cleanupRunningContainers();
}
+ cmProxy.stopAllProxies();
super.serviceStop();
}
@@ -115,8 +110,7 @@ public class NMClientImpl extends NMClient {
for (StartedContainer startedContainer : startedContainers.values()) {
try {
stopContainer(startedContainer.getContainerId(),
- startedContainer.getNodeId(),
- startedContainer.getContainerToken());
+ startedContainer.getNodeId());
} catch (YarnException e) {
LOG.error("Failed to stop Container " +
startedContainer.getContainerId() +
@@ -129,23 +123,29 @@ public class NMClientImpl extends NMClient {
}
}
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+ cmProxy =
+ new ContainerManagementProtocolProxy(conf, nmTokens);
+ }
+
@Override
public void cleanupRunningContainersOnStop(boolean enabled) {
getCleanupRunningContainers().set(enabled);
}
-
+
protected static class StartedContainer {
private ContainerId containerId;
private NodeId nodeId;
- private Token containerToken;
- private boolean stopped;
-
+ private ContainerState state;
+
+
public StartedContainer(ContainerId containerId, NodeId nodeId,
Token containerToken) {
this.containerId = containerId;
this.nodeId = nodeId;
- this.containerToken = containerToken;
- stopped = false;
+ state = ContainerState.NEW;
}
public ContainerId getContainerId() {
@@ -155,137 +155,17 @@ public class NMClientImpl extends NMClient {
public NodeId getNodeId() {
return nodeId;
}
-
- public Token getContainerToken() {
- return containerToken;
- }
}
- protected static final class NMCommunicator extends AbstractService {
- private ContainerId containerId;
- private NodeId nodeId;
- private Token containerToken;
- private ContainerManagementProtocol containerManager;
-
- public NMCommunicator(ContainerId containerId, NodeId nodeId,
- Token containerToken) {
- super(NMCommunicator.class.getName());
- this.containerId = containerId;
- this.nodeId = nodeId;
- this.containerToken = containerToken;
- }
-
- @Override
- protected synchronized void serviceStart() throws Exception {
- final YarnRPC rpc = YarnRPC.create(getConfig());
-
- final InetSocketAddress containerAddress =
- NetUtils.createSocketAddr(nodeId.toString());
-
- // the user in createRemoteUser in this context has to be ContainerId
- UserGroupInformation currentUser =
- UserGroupInformation.createRemoteUser(containerId.toString());
-
- org.apache.hadoop.security.token.Token token =
- ConverterUtils.convertFromYarn(containerToken, containerAddress);
- currentUser.addToken(token);
-
- containerManager = currentUser
- .doAs(new PrivilegedAction() {
- @Override
- public ContainerManagementProtocol run() {
- return (ContainerManagementProtocol) rpc.getProxy(ContainerManagementProtocol.class,
- containerAddress, getConfig());
- }
- });
-
- LOG.debug("Connecting to ContainerManager at " + containerAddress);
- super.serviceStart();
- }
-
- @Override
- protected synchronized void serviceStop() throws Exception {
- if (this.containerManager != null) {
- RPC.stopProxy(this.containerManager);
-
- if (LOG.isDebugEnabled()) {
- InetSocketAddress containerAddress =
- NetUtils.createSocketAddr(nodeId.toString());
- LOG.debug("Disconnecting from ContainerManager at " +
- containerAddress);
- }
- }
- super.serviceStop();
- }
-
- public synchronized Map startContainer(
- Container container, ContainerLaunchContext containerLaunchContext)
- throws YarnException, IOException {
- if (!container.getId().equals(containerId)) {
- throw new IllegalArgumentException(
- "NMCommunicator's containerId mismatches the given Container's");
- }
- StartContainerResponse startResponse = null;
- try {
- StartContainerRequest startRequest =
- Records.newRecord(StartContainerRequest.class);
- startRequest.setContainerToken(container.getContainerToken());
- startRequest.setContainerLaunchContext(containerLaunchContext);
- startResponse = containerManager.startContainer(startRequest);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Started Container " + containerId);
- }
- } catch (YarnException e) {
- LOG.warn("Container " + containerId + " failed to start", e);
- throw e;
- } catch (IOException e) {
- LOG.warn("Container " + containerId + " failed to start", e);
- throw e;
- }
- return startResponse.getAllServicesMetaData();
- }
-
- public synchronized void stopContainer() throws YarnException,
- IOException {
- try {
- StopContainerRequest stopRequest =
- Records.newRecord(StopContainerRequest.class);
- stopRequest.setContainerId(containerId);
- containerManager.stopContainer(stopRequest);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Stopped Container " + containerId);
- }
- } catch (YarnException e) {
- LOG.warn("Container " + containerId + " failed to stop", e);
- throw e;
- } catch (IOException e) {
- LOG.warn("Container " + containerId + " failed to stop", e);
- throw e;
- }
- }
-
- public synchronized ContainerStatus getContainerStatus()
- throws YarnException, IOException {
- GetContainerStatusResponse statusResponse = null;
- try {
- GetContainerStatusRequest statusRequest =
- Records.newRecord(GetContainerStatusRequest.class);
- statusRequest.setContainerId(containerId);
- statusResponse = containerManager.getContainerStatus(statusRequest);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Got the status of Container " + containerId);
- }
- } catch (YarnException e) {
- LOG.warn(
- "Unable to get the status of Container " + containerId, e);
- throw e;
- } catch (IOException e) {
- LOG.warn(
- "Unable to get the status of Container " + containerId, e);
- throw e;
- }
- return statusResponse.getStatus();
+ private void addStartingContainer(StartedContainer startedContainer)
+ throws YarnException {
+ if (startedContainers.putIfAbsent(startedContainer.containerId,
+ startedContainer) != null) {
+ throw RPCUtil.getRemoteException("Container "
+ + startedContainer.containerId.toString() + " is already started");
}
+ startedContainers
+ .put(startedContainer.getContainerId(), startedContainer);
}
@Override
@@ -293,108 +173,112 @@ public class NMClientImpl extends NMClient {
Container container, ContainerLaunchContext containerLaunchContext)
throws YarnException, IOException {
// Do synchronization on StartedContainer to prevent race condition
- // between startContainer and stopContainer
- synchronized (addStartedContainer(container)) {
+ // between startContainer and stopContainer only when startContainer is
+ // in progress for a given container.
+ StartedContainer startingContainer = createStartedContainer(container);
+ synchronized (startingContainer) {
+ addStartingContainer(startingContainer);
+
Map allServiceResponse;
- NMCommunicator nmCommunicator = null;
+ ContainerManagementProtocolProxyData proxy = null;
try {
- nmCommunicator = new NMCommunicator(container.getId(),
- container.getNodeId(), container.getContainerToken());
- nmCommunicator.init(getConfig());
- nmCommunicator.start();
+ proxy =
+ cmProxy.getProxy(container.getNodeId().toString(),
+ container.getId());
allServiceResponse =
- nmCommunicator.startContainer(container, containerLaunchContext);
+ proxy
+ .getContainerManagementProtocol().startContainer(
+ StartContainerRequest.newInstance(containerLaunchContext,
+ container.getContainerToken())).getAllServicesMetaData();
+ startingContainer.state = ContainerState.RUNNING;
} catch (YarnException e) {
+ startingContainer.state = ContainerState.COMPLETE;
// Remove the started container if it failed to start
- removeStartedContainer(container.getId());
+ removeStartedContainer(startingContainer);
throw e;
} catch (IOException e) {
- removeStartedContainer(container.getId());
+ startingContainer.state = ContainerState.COMPLETE;
+ removeStartedContainer(startingContainer);
throw e;
} catch (Throwable t) {
- removeStartedContainer(container.getId());
+ startingContainer.state = ContainerState.COMPLETE;
+ removeStartedContainer(startingContainer);
throw RPCUtil.getRemoteException(t);
} finally {
- if (nmCommunicator != null) {
- nmCommunicator.stop();
+ if (proxy != null) {
+ cmProxy.mayBeCloseProxy(proxy);
}
}
return allServiceResponse;
}
-
- // Three choices:
- // 1. starting and releasing the proxy before and after each interaction
- // 2. starting the proxy when starting the container and releasing it when
- // stopping the container
- // 3. starting the proxy when starting the container and releasing it when
- // stopping the client
- // Adopt 1 currently
}
@Override
- public void stopContainer(ContainerId containerId, NodeId nodeId,
- Token containerToken) throws YarnException, IOException {
+ public void stopContainer(ContainerId containerId, NodeId nodeId)
+ throws YarnException, IOException {
StartedContainer startedContainer = getStartedContainer(containerId);
- if (startedContainer == null) {
- throw RPCUtil.getRemoteException("Container " + containerId +
- " is either not started yet or already stopped");
- }
+
// Only allow one request of stopping the container to move forward
// When entering the block, check whether the precursor has already stopped
// the container
- synchronized (startedContainer) {
- if (startedContainer.stopped) {
- return;
- }
- NMCommunicator nmCommunicator = null;
- try {
- nmCommunicator =
- new NMCommunicator(containerId, nodeId, containerToken);
- nmCommunicator.init(getConfig());
- nmCommunicator.start();
- nmCommunicator.stopContainer();
- } finally {
- if (nmCommunicator != null) {
- nmCommunicator.stop();
+ if (startedContainer != null) {
+ synchronized (startedContainer) {
+ if (startedContainer.state != ContainerState.RUNNING) {
+ return;
}
- startedContainer.stopped = true;
- removeStartedContainer(containerId);
+ stopContainerInternal(containerId, nodeId);
+ // Only after successful
+ startedContainer.state = ContainerState.COMPLETE;
+ removeStartedContainer(startedContainer);
}
+ } else {
+ stopContainerInternal(containerId, nodeId);
}
+
}
@Override
public ContainerStatus getContainerStatus(ContainerId containerId,
- NodeId nodeId, Token containerToken)
- throws YarnException, IOException {
- NMCommunicator nmCommunicator = null;
+ NodeId nodeId) throws YarnException, IOException {
+
+ ContainerManagementProtocolProxyData proxy = null;
try {
- nmCommunicator = new NMCommunicator(containerId, nodeId, containerToken);
- nmCommunicator.init(getConfig());
- nmCommunicator.start();
- ContainerStatus containerStatus = nmCommunicator.getContainerStatus();
+ proxy = cmProxy.getProxy(nodeId.toString(), containerId);
+ ContainerStatus containerStatus =
+ proxy.getContainerManagementProtocol().getContainerStatus(
+ GetContainerStatusRequest.newInstance(containerId)).getStatus();
return containerStatus;
} finally {
- if (nmCommunicator != null) {
- nmCommunicator.stop();
+ if (proxy != null) {
+ cmProxy.mayBeCloseProxy(proxy);
}
}
}
- protected synchronized StartedContainer addStartedContainer(
- Container container) throws YarnException, IOException {
- if (startedContainers.containsKey(container.getId())) {
- throw RPCUtil.getRemoteException("Container " + container.getId() +
- " is already started");
+ private void stopContainerInternal(ContainerId containerId, NodeId nodeId)
+ throws IOException, YarnException {
+ ContainerManagementProtocolProxyData proxy = null;
+ try {
+ proxy = cmProxy.getProxy(nodeId.toString(), containerId);
+ proxy.getContainerManagementProtocol().stopContainer(
+ StopContainerRequest.newInstance(containerId));
+ } finally {
+ if (proxy != null) {
+ cmProxy.mayBeCloseProxy(proxy);
+ }
}
+ }
+
+ protected synchronized StartedContainer createStartedContainer(
+ Container container) throws YarnException, IOException {
StartedContainer startedContainer = new StartedContainer(container.getId(),
container.getNodeId(), container.getContainerToken());
- startedContainers.put(startedContainer.getContainerId(), startedContainer);
return startedContainer;
}
- protected synchronized void removeStartedContainer(ContainerId containerId) {
- startedContainers.remove(containerId);
+ protected synchronized void
+ removeStartedContainer(StartedContainer container) {
+ startedContainers.remove(container.containerId);
}
protected synchronized StartedContainer getStartedContainer(
@@ -405,5 +289,4 @@ public class NMClientImpl extends NMClient {
public AtomicBoolean getCleanupRunningContainers() {
return cleanupRunningContainers;
}
-
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java
index 4f659482b5c..0815e310181 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java
@@ -232,10 +232,10 @@ public class TestNMClientAsync {
actualStartSuccessArray.set(containerId.getId(), 1);
// move on to the following success tests
- asyncClient.getContainerStatusAsync(containerId, nodeId, containerToken);
+ asyncClient.getContainerStatusAsync(containerId, nodeId);
} else {
// move on to the following failure tests
- asyncClient.stopContainerAsync(containerId, nodeId, containerToken);
+ asyncClient.stopContainerAsync(containerId, nodeId);
}
// Shouldn't crash the test thread
@@ -253,7 +253,7 @@ public class TestNMClientAsync {
actualQuerySuccess.addAndGet(1);
actualQuerySuccessArray.set(containerId.getId(), 1);
// move on to the following success tests
- asyncClient.stopContainerAsync(containerId, nodeId, containerToken);
+ asyncClient.stopContainerAsync(containerId, nodeId);
// Shouldn't crash the test thread
throw new RuntimeException("Ignorable Exception");
@@ -290,7 +290,7 @@ public class TestNMClientAsync {
actualStartFailure.addAndGet(1);
actualStartFailureArray.set(containerId.getId() - expectedSuccess, 1);
// move on to the following failure tests
- asyncClient.getContainerStatusAsync(containerId, nodeId, containerToken);
+ asyncClient.getContainerStatusAsync(containerId, nodeId);
// Shouldn't crash the test thread
throw new RuntimeException("Ignorable Exception");
@@ -383,33 +383,30 @@ public class TestNMClientAsync {
when(client.startContainer(any(Container.class),
any(ContainerLaunchContext.class))).thenReturn(
Collections.emptyMap());
- when(client.getContainerStatus(any(ContainerId.class), any(NodeId.class),
- any(Token.class))).thenReturn(
+ when(client.getContainerStatus(any(ContainerId.class),
+ any(NodeId.class))).thenReturn(
recordFactory.newRecordInstance(ContainerStatus.class));
doNothing().when(client).stopContainer(any(ContainerId.class),
- any(NodeId.class), any(Token.class));
+ any(NodeId.class));
break;
case 1:
doThrow(RPCUtil.getRemoteException("Start Exception")).when(client)
.startContainer(any(Container.class),
any(ContainerLaunchContext.class));
doThrow(RPCUtil.getRemoteException("Query Exception")).when(client)
- .getContainerStatus(any(ContainerId.class), any(NodeId.class),
- any(Token.class));
+ .getContainerStatus(any(ContainerId.class), any(NodeId.class));
doThrow(RPCUtil.getRemoteException("Stop Exception")).when(client)
- .stopContainer(any(ContainerId.class), any(NodeId.class),
- any(Token.class));
+ .stopContainer(any(ContainerId.class), any(NodeId.class));
break;
case 2:
when(client.startContainer(any(Container.class),
any(ContainerLaunchContext.class))).thenReturn(
Collections.emptyMap());
- when(client.getContainerStatus(any(ContainerId.class), any(NodeId.class),
- any(Token.class))).thenReturn(
+ when(client.getContainerStatus(any(ContainerId.class),
+ any(NodeId.class))).thenReturn(
recordFactory.newRecordInstance(ContainerStatus.class));
doThrow(RPCUtil.getRemoteException("Stop Exception")).when(client)
- .stopContainer(any(ContainerId.class), any(NodeId.class),
- any(Token.class));
+ .stopContainer(any(ContainerId.class), any(NodeId.class));
}
return client;
}
@@ -437,8 +434,7 @@ public class TestNMClientAsync {
t.start();
barrierA.await();
- asyncClient.stopContainerAsync(container.getId(), container.getNodeId(),
- container.getContainerToken());
+ asyncClient.stopContainerAsync(container.getId(), container.getNodeId());
barrierC.await();
Assert.assertFalse("Starting and stopping should be out of order",
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
index 232903b4eef..309b5afb882 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
@@ -29,6 +29,7 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -46,10 +47,12 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMClient;
@@ -75,7 +78,8 @@ public class TestNMClient {
List nodeReports = null;
ApplicationAttemptId attemptId = null;
int nodeCount = 3;
-
+ ConcurrentHashMap nmTokens;
+
@Before
public void setup() throws YarnException, IOException {
// start minicluster
@@ -140,6 +144,7 @@ public class TestNMClient {
if (iterationsLeft == 0) {
fail("Application hasn't bee started");
}
+ nmTokens = new ConcurrentHashMap();
// start am rm client
rmClient =
@@ -151,7 +156,7 @@ public class TestNMClient {
assertEquals(STATE.STARTED, rmClient.getServiceState());
// start am nm client
- nmClient = (NMClientImpl) NMClient.createNMClient();
+ nmClient = (NMClientImpl) NMClient.createNMClient(nmTokens);
nmClient.init(conf);
nmClient.start();
assertNotNull(nmClient);
@@ -194,14 +199,13 @@ public class TestNMClient {
assertEquals(0, nmClient.startedContainers.size());
}
- @Test (timeout = 60000)
+ @Test (timeout = 200000)
public void testNMClient()
throws YarnException, IOException {
-
rmClient.registerApplicationMaster("Host", 10000, "");
testContainerManagement(nmClient, allocateContainers(rmClient, 5));
-
+
rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
null, null);
// stop the running containers on close
@@ -243,6 +247,11 @@ public class TestNMClient {
for(Container container : allocResponse.getAllocatedContainers()) {
containers.add(container);
}
+ if (!allocResponse.getNMTokens().isEmpty()) {
+ for (NMToken token : allocResponse.getNMTokens()) {
+ nmTokens.put(token.getNodeId().toString(), token.getToken());
+ }
+ }
if(allocatedContainerCount < containersRequestedAny) {
// sleep to let NM's heartbeat to RM and trigger allocations
sleep(1000);
@@ -261,8 +270,7 @@ public class TestNMClient {
// getContainerStatus shouldn't be called before startContainer,
// otherwise, NodeManager cannot find the container
try {
- nmClient.getContainerStatus(container.getId(), container.getNodeId(),
- container.getContainerToken());
+ nmClient.getContainerStatus(container.getId(), container.getNodeId());
fail("Exception is expected");
} catch (YarnException e) {
assertTrue("The thrown exception is not expected",
@@ -272,12 +280,11 @@ public class TestNMClient {
// stopContainer shouldn't be called before startContainer,
// otherwise, an exception will be thrown
try {
- nmClient.stopContainer(container.getId(), container.getNodeId(),
- container.getContainerToken());
+ nmClient.stopContainer(container.getId(), container.getNodeId());
fail("Exception is expected");
} catch (YarnException e) {
if (!e.getMessage()
- .contains("is either not started yet or already stopped")) {
+ .contains("is not handled by this NodeManager")) {
throw (AssertionError)
(new AssertionError("Exception is not expected: " + e).initCause(
e));
@@ -306,8 +313,7 @@ public class TestNMClient {
-1000);
try {
- nmClient.stopContainer(container.getId(), container.getNodeId(),
- container.getContainerToken());
+ nmClient.stopContainer(container.getId(), container.getNodeId());
} catch (YarnException e) {
throw (AssertionError)
(new AssertionError("Exception is not expected: " + e)
@@ -335,8 +341,7 @@ public class TestNMClient {
while (true) {
try {
ContainerStatus status = nmClient.getContainerStatus(
- container.getId(), container.getNodeId(),
- container.getContainerToken());
+ container.getId(), container.getNodeId());
// NodeManager may still need some time to get the stable
// container status
if (status.getState() == state) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerManagerSecurityInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerManagerSecurityInfo.java
index 1f14b2ab07e..67dfddbc600 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerManagerSecurityInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerManagerSecurityInfo.java
@@ -55,7 +55,7 @@ public class ContainerManagerSecurityInfo extends SecurityInfo {
@Override
public Class extends TokenSelector extends TokenIdentifier>>
value() {
- return ContainerTokenSelector.class;
+ return NMTokenSelector.class;
}
};
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java
index 6071ba35382..55ddd0ac632 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java
@@ -21,21 +21,17 @@ package org.apache.hadoop.yarn.security;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Token;
@Public
@Evolving
@@ -48,14 +44,14 @@ public class NMTokenIdentifier extends TokenIdentifier {
private ApplicationAttemptId appAttemptId;
private NodeId nodeId;
private String appSubmitter;
- private int masterKeyId;
+ private int keyId;
public NMTokenIdentifier(ApplicationAttemptId appAttemptId, NodeId nodeId,
String applicationSubmitter, int masterKeyId) {
this.appAttemptId = appAttemptId;
this.nodeId = nodeId;
this.appSubmitter = applicationSubmitter;
- this.masterKeyId = masterKeyId;
+ this.keyId = masterKeyId;
}
/**
@@ -76,8 +72,8 @@ public class NMTokenIdentifier extends TokenIdentifier {
return appSubmitter;
}
- public int getMastKeyId() {
- return masterKeyId;
+ public int getKeyId() {
+ return keyId;
}
@Override
@@ -89,7 +85,7 @@ public class NMTokenIdentifier extends TokenIdentifier {
out.writeInt(appAttemptId.getAttemptId());
out.writeUTF(this.nodeId.toString());
out.writeUTF(this.appSubmitter);
- out.writeInt(this.masterKeyId);
+ out.writeInt(this.keyId);
}
@Override
@@ -101,7 +97,7 @@ public class NMTokenIdentifier extends TokenIdentifier {
String[] hostAddr = in.readUTF().split(":");
nodeId = NodeId.newInstance(hostAddr[0], Integer.parseInt(hostAddr[1]));
appSubmitter = in.readUTF();
- masterKeyId = in.readInt();
+ keyId = in.readInt();
}
@Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenSelector.java
new file mode 100644
index 00000000000..c57e4a2150e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenSelector.java
@@ -0,0 +1,56 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.security;
+
+import java.util.Collection;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+
+public class NMTokenSelector implements
+ TokenSelector {
+
+ private static final Log LOG = LogFactory
+ .getLog(NMTokenSelector.class);
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Token selectToken(Text service,
+ Collection> tokens) {
+ if (service == null) {
+ return null;
+ }
+ for (Token extends TokenIdentifier> token : tokens) {
+ if (LOG.isDebugEnabled()) {
+ LOG.info("Looking for service: " + service + ". Current token is "
+ + token);
+ }
+ if (NMTokenIdentifier.KIND.equals(token.getKind()) &&
+ service.equals(token.getService())) {
+ return (Token) token;
+ }
+ }
+ return null;
+ }
+
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
index 427f920776e..d860461467e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
@@ -15,3 +15,4 @@ org.apache.hadoop.yarn.security.ContainerTokenIdentifier
org.apache.hadoop.yarn.security.AMRMTokenIdentifier
org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier
org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier
+org.apache.hadoop.yarn.security.NMTokenIdentifier
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index e2457cb7264..72845384857 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -659,7 +659,25 @@
yarn.client.nodemanager-client-async.thread-pool-max-size
500
-
+
+
+
+ Maximum number of proxy connections for node manager. It should always be
+ more than 1. NMClient and MRAppMaster will use this to cache connection
+ with node manager. There will be at max one connection per node manager.
+ Ex. configuring it to a value of 5 will make sure that client will at
+ max have 5 connections cached with 5 different node managers. These
+ connections will be timed out if idle for more than system wide idle
+ timeout period. The token if used for authentication then it will be used
+ only at connection creation time. If new token is received then earlier
+ connection should be closed in order to use newer token. This and
+ (yarn.client.nodemanager-client-async.thread-pool-max-size) are related
+ and should be sync (no need for them to be equal).
+
+ yarn.client.max-nodemanagers-proxies
+ 500
+
+
yarn.nodemanager.aux-services.mapreduce.shuffle.class
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java
index 1094541187d..01da1af8aa3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java
@@ -114,4 +114,38 @@ public class BaseNMTokenSecretManager extends
public NMTokenIdentifier createIdentifier() {
return new NMTokenIdentifier();
}
+
+ /**
+ * Helper function for creating NMTokens.
+ */
+ public Token createNMToken(ApplicationAttemptId applicationAttemptId,
+ NodeId nodeId, String applicationSubmitter) {
+ byte[] password;
+ NMTokenIdentifier identifier;
+
+ this.readLock.lock();
+ try {
+ identifier =
+ new NMTokenIdentifier(applicationAttemptId, nodeId,
+ applicationSubmitter, this.currentMasterKey.getMasterKey()
+ .getKeyId());
+ password = this.createPassword(identifier);
+ } finally {
+ this.readLock.unlock();
+ }
+ return newInstance(password, identifier);
+ }
+
+ public static Token newInstance(byte[] password,
+ NMTokenIdentifier identifier) {
+ NodeId nodeId = identifier.getNodeId();
+ // RPC layer client expects ip:port as service for tokens
+ InetSocketAddress addr =
+ NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort());
+ Token nmToken =
+ Token.newInstance(identifier.getBytes(),
+ NMTokenIdentifier.KIND.toString(), password, SecurityUtil
+ .buildTokenService(addr).toString());
+ return nmToken;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
index f21a60f1769..b6c59b61d9f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
@@ -27,6 +27,7 @@ import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
@@ -63,6 +64,8 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* Builder utilities to construct various objects.
*
@@ -152,7 +155,8 @@ public class BuilderUtils {
int port, String user, Resource r, long expiryTime, int masterKeyId,
byte[] password, long rmIdentifier) throws IOException {
ContainerTokenIdentifier identifier =
- new ContainerTokenIdentifier(cId, host, user, r, expiryTime,
+ new ContainerTokenIdentifier(cId, host + ":" + port, user, r,
+ expiryTime,
masterKeyId, rmIdentifier);
return newContainerToken(BuilderUtils.newNodeId(host, port), password,
identifier);
@@ -228,6 +232,8 @@ public class BuilderUtils {
return newToken(Token.class, identifier, kind, password, service);
}
+ @Private
+ @VisibleForTesting
public static Token newContainerToken(NodeId nodeId,
byte[] password, ContainerTokenIdentifier tokenIdentifier) {
// RPC layer client expects ip:port as service for tokens
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index e5a64cd96b3..8ec3a5a70cf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -435,7 +435,7 @@ public class NodeManager extends CompositeService
}
@VisibleForTesting
- Context getNMContext() {
+ public Context getNMContext() {
return this.context;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 69a5006a0d7..c6ab545ebb4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -23,6 +23,7 @@ import static org.apache.hadoop.service.Service.STATE.STARTED;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -38,6 +39,7 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.service.CompositeService;
@@ -65,6 +67,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
@@ -234,7 +237,7 @@ public class ContainerManagerImpl extends CompositeService implements
server =
rpc.getServer(ContainerManagementProtocol.class, this, initialAddress, conf,
- this.context.getContainerTokenSecretManager(),
+ this.context.getNMTokenSecretManager(),
conf.getInt(YarnConfiguration.NM_CONTAINER_MGR_THREAD_COUNT,
YarnConfiguration.DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT));
@@ -253,6 +256,8 @@ public class ContainerManagerImpl extends CompositeService implements
NodeId nodeId = NodeId.newInstance(connectAddress.getHostName(),
connectAddress.getPort());
((NodeManager.NMContext)context).setNodeId(nodeId);
+ this.context.getNMTokenSecretManager().setNodeId(nodeId);
+ this.context.getContainerTokenSecretManager().setNodeId(nodeId);
LOG.info("ContainerManager started at " + connectAddress);
super.serviceStart();
}
@@ -274,7 +279,7 @@ public class ContainerManagerImpl extends CompositeService implements
}
// Get the remoteUGI corresponding to the api call.
- private UserGroupInformation getRemoteUgi()
+ protected UserGroupInformation getRemoteUgi()
throws YarnException {
UserGroupInformation remoteUgi;
try {
@@ -291,91 +296,67 @@ public class ContainerManagerImpl extends CompositeService implements
// Obtain the needed ContainerTokenIdentifier from the remote-UGI. RPC layer
// currently sets only the required id, but iterate through anyways just to
// be sure.
- private ContainerTokenIdentifier selectContainerTokenIdentifier(
+ @Private
+ @VisibleForTesting
+ protected NMTokenIdentifier selectNMTokenIdentifier(
UserGroupInformation remoteUgi) {
Set tokenIdentifiers = remoteUgi.getTokenIdentifiers();
- ContainerTokenIdentifier resultId = null;
+ NMTokenIdentifier resultId = null;
for (TokenIdentifier id : tokenIdentifiers) {
- if (id instanceof ContainerTokenIdentifier) {
- resultId = (ContainerTokenIdentifier) id;
+ if (id instanceof NMTokenIdentifier) {
+ resultId = (NMTokenIdentifier) id;
break;
}
}
return resultId;
}
- @Private
- @VisibleForTesting
- protected ContainerTokenIdentifier getContainerTokenIdentifier(
- UserGroupInformation remoteUgi,
- ContainerTokenIdentifier containerTokenIdentifier)
- throws YarnException {
- if (UserGroupInformation.isSecurityEnabled()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Number of TokenIdentifiers in the UGI from RPC: "
- + remoteUgi.getTokenIdentifiers().size());
- }
- // Get the tokenId from the remote user ugi
- return selectContainerTokenIdentifier(remoteUgi);
- } else {
- return containerTokenIdentifier;
- }
- }
-
/**
- * Authorize the request.
- *
- * @param containerIDStr
- * of the container
- * @param launchContext
- * passed if verifying the startContainer, null otherwise.
- * @param remoteUgi
+ * @param containerTokenIdentifier
+ * of the container to be started
+ * @param ugi
* ugi corresponding to the remote end making the api-call
* @throws YarnException
*/
@Private
@VisibleForTesting
- protected void authorizeRequest(String containerIDStr,
- ContainerLaunchContext launchContext,
- UserGroupInformation remoteUgi, ContainerTokenIdentifier tokenId)
- throws YarnException {
+ protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier,
+ ContainerTokenIdentifier containerTokenIdentifier,
+ UserGroupInformation ugi) throws YarnException {
+ ContainerId containerId = containerTokenIdentifier.getContainerID();
+ String containerIDStr = containerId.toString();
boolean unauthorized = false;
StringBuilder messageBuilder =
new StringBuilder("Unauthorized request to start container. ");
-
- if (!remoteUgi.getUserName().equals(containerIDStr)) {
+ if (!nmTokenIdentifier.getApplicationAttemptId().equals(
+ containerId.getApplicationAttemptId())) {
unauthorized = true;
- messageBuilder.append("\nExpected containerId: "
- + remoteUgi.getUserName() + " Found: " + containerIDStr);
- } else if (launchContext != null) {
- // Verify other things also for startContainer() request.
-
-
- if (tokenId == null) {
- unauthorized = true;
- messageBuilder
- .append("\nNo ContainerToken found for " + containerIDStr);
- } else {
-
- // Is the container being relaunched? Or RPC layer let startCall with
- // tokens generated off old-secret through?
- if (!this.context.getContainerTokenSecretManager()
- .isValidStartContainerRequest(tokenId.getContainerID())) {
- unauthorized = true;
- messageBuilder.append("\n Attempt to relaunch the same "
- + "container with id " + containerIDStr + ".");
- }
-
- // Ensure the token is not expired.
- // Token expiry is not checked for stopContainer/getContainerStatus
- if (tokenId.getExpiryTimeStamp() < System.currentTimeMillis()) {
- unauthorized = true;
- messageBuilder.append("\nThis token is expired. current time is "
- + System.currentTimeMillis() + " found "
- + tokenId.getExpiryTimeStamp());
- }
- }
+ messageBuilder.append("\nNMToken for application attempt : ")
+ .append(nmTokenIdentifier.getApplicationAttemptId())
+ .append(" was used for starting container with container token")
+ .append(" issued for application attempt : ")
+ .append(containerId.getApplicationAttemptId());
+ } else if (!ugi.getUserName().equals(
+ nmTokenIdentifier.getApplicationAttemptId().toString())) {
+ unauthorized = true;
+ messageBuilder.append("\nExpected applicationAttemptId: ")
+ .append(ugi.getUserName()).append(" Found: ")
+ .append(nmTokenIdentifier.getApplicationAttemptId().toString());
+ } else if (!this.context.getContainerTokenSecretManager()
+ .isValidStartContainerRequest(containerId)) {
+ // Is the container being relaunched? Or RPC layer let startCall with
+ // tokens generated off old-secret through?
+ unauthorized = true;
+ messageBuilder.append("\n Attempt to relaunch the same ")
+ .append("container with id ").append(containerIDStr).append(".");
+ } else if (containerTokenIdentifier.getExpiryTimeStamp() < System
+ .currentTimeMillis()) {
+ // Ensure the token is not expired.
+ unauthorized = true;
+ messageBuilder.append("\nThis token is expired. current time is ")
+ .append(System.currentTimeMillis()).append(" found ")
+ .append(containerTokenIdentifier.getExpiryTimeStamp());
}
if (unauthorized) {
@@ -384,7 +365,7 @@ public class ContainerManagerImpl extends CompositeService implements
throw RPCUtil.getRemoteException(msg);
}
}
-
+
/**
* Start a container on this NodeManager.
*/
@@ -395,44 +376,133 @@ public class ContainerManagerImpl extends CompositeService implements
if (blockNewContainerRequests.get()) {
throw new NMNotYetReadyException(
- "Rejecting new containers as NodeManager has not" +
- " yet connected with ResourceManager");
+ "Rejecting new containers as NodeManager has not"
+ + " yet connected with ResourceManager");
}
+ /*
+ * 1) It should save the NMToken into NMTokenSecretManager. This is done
+ * here instead of RPC layer because at the time of opening/authenticating
+ * the connection it doesn't know what all RPC calls user will make on it.
+ * Also new NMToken is issued only at startContainer (once it gets renewed).
+ *
+ * 2) It should validate containerToken. Need to check below things. a) It
+ * is signed by correct master key (part of retrieve password). b) It
+ * belongs to correct Node Manager (part of retrieve password). c) It has
+ * correct RMIdentifier. d) It is not expired.
+ */
+ // update NMToken
+
+ UserGroupInformation remoteUgi = getRemoteUgi();
+ NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi);
+
+ // Validate containerToken
+ ContainerTokenIdentifier containerTokenIdentifier =
+ verifyAndGetContainerTokenIdentifier(request.getContainerToken());
+
+ authorizeStartRequest(nmTokenIdentifier, containerTokenIdentifier,
+ remoteUgi);
+
+ if (containerTokenIdentifier.getRMIdentifer() != nodeStatusUpdater
+ .getRMIdentifier()) {
+ // Is the container coming from unknown RM
+ StringBuilder sb = new StringBuilder("\nContainer ");
+ sb.append(containerTokenIdentifier.getContainerID().toString())
+ .append(" rejected as it is allocated by a previous RM");
+ throw new InvalidContainerException(sb.toString());
+ }
+
+ updateNMTokenIdentifier(nmTokenIdentifier);
+
+ ContainerId containerId = containerTokenIdentifier.getContainerID();
+ String containerIdStr = containerId.toString();
+ String user = containerTokenIdentifier.getApplicationSubmitter();
+
+ LOG.info("Start request for " + containerIdStr + " by user " + user);
ContainerLaunchContext launchContext = request.getContainerLaunchContext();
- org.apache.hadoop.yarn.api.records.Token token = request.getContainerToken();
- ContainerTokenIdentifier tokenIdentifier = null;
+ Credentials credentials = parseCredentials(launchContext);
+
+ Container container =
+ new ContainerImpl(getConfig(), this.dispatcher, launchContext,
+ credentials, metrics, containerTokenIdentifier);
+ ApplicationId applicationID =
+ containerId.getApplicationAttemptId().getApplicationId();
+ if (context.getContainers().putIfAbsent(containerId, container) != null) {
+ NMAuditLogger.logFailure(user, AuditConstants.START_CONTAINER,
+ "ContainerManagerImpl", "Container already running on this node!",
+ applicationID, containerId);
+ throw RPCUtil.getRemoteException("Container " + containerIdStr
+ + " already is running on this node!!");
+ }
+
+ // Create the application
+ Application application =
+ new ApplicationImpl(dispatcher, this.aclsManager, user, applicationID,
+ credentials, context);
+ if (null == context.getApplications().putIfAbsent(applicationID,
+ application)) {
+ LOG.info("Creating a new application reference for app " + applicationID);
+
+ dispatcher.getEventHandler().handle(
+ new ApplicationInitEvent(applicationID, container.getLaunchContext()
+ .getApplicationACLs()));
+ }
+
+ dispatcher.getEventHandler().handle(
+ new ApplicationContainerInitEvent(container));
+
+ this.context.getContainerTokenSecretManager().startContainerSuccessful(
+ containerTokenIdentifier);
+ NMAuditLogger.logSuccess(user, AuditConstants.START_CONTAINER,
+ "ContainerManageImpl", applicationID, containerId);
+ StartContainerResponse response =
+ recordFactory.newRecordInstance(StartContainerResponse.class);
+ response.setAllServicesMetaData(auxiliaryServices.getMetaData());
+ // TODO launchedContainer misplaced -> doesn't necessarily mean a container
+ // launch. A finished Application will not launch containers.
+ metrics.launchedContainer();
+ metrics.allocateContainer(containerTokenIdentifier.getResource());
+ return response;
+ }
+
+ protected ContainerTokenIdentifier verifyAndGetContainerTokenIdentifier(
+ org.apache.hadoop.yarn.api.records.Token token) throws YarnException,
+ InvalidToken {
+ ContainerTokenIdentifier containerTokenIdentifier = null;
try {
- tokenIdentifier = BuilderUtils.newContainerTokenIdentifier(token);
+ containerTokenIdentifier =
+ BuilderUtils.newContainerTokenIdentifier(token);
} catch (IOException e) {
throw RPCUtil.getRemoteException(e);
}
-
- UserGroupInformation remoteUgi = getRemoteUgi();
- ContainerTokenIdentifier tokenId =
- getContainerTokenIdentifier(remoteUgi, tokenIdentifier);
-
- ContainerId containerID = tokenId.getContainerID();
- String containerIDStr = containerID.toString();
-
- authorizeRequest(containerIDStr, launchContext, remoteUgi, tokenId);
-
- // Is the container coming from unknown RM
- if (tokenId.getRMIdentifer() != nodeStatusUpdater
- .getRMIdentifier()) {
- String msg = "\nContainer "+ containerIDStr
- + " rejected as it is allocated by a previous RM";
- LOG.error(msg);
- throw new InvalidContainerException(msg);
+ byte[] password =
+ context.getContainerTokenSecretManager().retrievePassword(
+ containerTokenIdentifier);
+ byte[] tokenPass = token.getPassword().array();
+ if (password == null || tokenPass == null
+ || !Arrays.equals(password, tokenPass)) {
+ throw new InvalidToken(
+ "Invalid container token used for starting container on : "
+ + context.getNodeId().toString());
}
+ return containerTokenIdentifier;
+ }
- LOG.info("Start request for " + containerIDStr + " by user "
- + tokenId.getApplicationSubmitter());
+ @Private
+ @VisibleForTesting
+ protected void updateNMTokenIdentifier(NMTokenIdentifier nmTokenIdentifier)
+ throws InvalidToken {
+ context.getNMTokenSecretManager().appAttemptStartContainer(
+ nmTokenIdentifier);
+ }
+ private Credentials parseCredentials(ContainerLaunchContext launchContext)
+ throws YarnException {
+ Credentials credentials = new Credentials();
// //////////// Parse credentials
ByteBuffer tokens = launchContext.getTokens();
- Credentials credentials = new Credentials();
+
if (tokens != null) {
DataInputByteBuffer buf = new DataInputByteBuffer();
tokens.rewind();
@@ -440,8 +510,7 @@ public class ContainerManagerImpl extends CompositeService implements
try {
credentials.readTokenStorageStream(buf);
if (LOG.isDebugEnabled()) {
- for (Token extends TokenIdentifier> tk : credentials
- .getAllTokens()) {
+ for (Token extends TokenIdentifier> tk : credentials.getAllTokens()) {
LOG.debug(tk.getService() + " = " + tk.toString());
}
}
@@ -450,53 +519,7 @@ public class ContainerManagerImpl extends CompositeService implements
}
}
// //////////// End of parsing credentials
- String user = tokenId.getApplicationSubmitter();
-
- Container container =
- new ContainerImpl(getConfig(), this.dispatcher, launchContext,
- credentials, metrics, tokenId);
- ApplicationId applicationID =
- containerID.getApplicationAttemptId().getApplicationId();
- if (context.getContainers().putIfAbsent(containerID, container) != null) {
- NMAuditLogger.logFailure(user,
- AuditConstants.START_CONTAINER, "ContainerManagerImpl",
- "Container already running on this node!",
- applicationID, containerID);
- throw RPCUtil.getRemoteException("Container " + containerIDStr
- + " already is running on this node!!");
- }
-
- // Create the application
- Application application =
- new ApplicationImpl(dispatcher, this.aclsManager,
- user, applicationID, credentials,
- context);
- if (null ==
- context.getApplications().putIfAbsent(applicationID, application)) {
- LOG.info("Creating a new application reference for app "
- + applicationID);
- dispatcher.getEventHandler().handle(
- new ApplicationInitEvent(applicationID, container
- .getLaunchContext().getApplicationACLs()));
- }
-
- // TODO: Validate the request
- dispatcher.getEventHandler().handle(
- new ApplicationContainerInitEvent(container));
-
- this.context.getContainerTokenSecretManager().startContainerSuccessful(
- tokenId);
- NMAuditLogger.logSuccess(user,
- AuditConstants.START_CONTAINER, "ContainerManageImpl",
- applicationID, containerID);
-
- StartContainerResponse response =
- StartContainerResponse.newInstance(auxiliaryServices.getMetaData());
- // TODO launchedContainer misplaced -> doesn't necessarily mean a container
- // launch. A finished Application will not launch containers.
- metrics.launchedContainer();
- metrics.allocateContainer(tokenId.getResource());
- return response;
+ return credentials;
}
/**
@@ -509,34 +532,20 @@ public class ContainerManagerImpl extends CompositeService implements
ContainerId containerID = request.getContainerId();
String containerIDStr = containerID.toString();
-
- // TODO: Only the container's owner can kill containers today.
-
- UserGroupInformation remoteUgi = getRemoteUgi();
Container container = this.context.getContainers().get(containerID);
+ LOG.info("Getting container-status for " + containerIDStr);
+ authorizeGetAndStopContainerRequest(containerID, container, true);
+
StopContainerResponse response =
recordFactory.newRecordInstance(StopContainerResponse.class);
- if (container == null) {
- LOG.warn("Trying to stop unknown container " + containerID);
- NMAuditLogger.logFailure("UnknownUser",
- AuditConstants.STOP_CONTAINER, "ContainerManagerImpl",
- "Trying to stop unknown container!",
- containerID.getApplicationAttemptId().getApplicationId(),
- containerID);
- return response; // Return immediately.
- }
- authorizeRequest(containerIDStr, null, remoteUgi,
- getContainerTokenIdentifier(remoteUgi, container.getContainerTokenIdentifier()));
-
dispatcher.getEventHandler().handle(
- new ContainerKillEvent(containerID,
- "Container killed by the ApplicationMaster."));
-
- NMAuditLogger.logSuccess(container.getUser(),
- AuditConstants.STOP_CONTAINER, "ContainerManageImpl",
- containerID.getApplicationAttemptId().getApplicationId(),
- containerID);
+ new ContainerKillEvent(containerID,
+ "Container killed by the ApplicationMaster."));
+
+ NMAuditLogger.logSuccess(container.getUser(),
+ AuditConstants.STOP_CONTAINER, "ContainerManageImpl", containerID
+ .getApplicationAttemptId().getApplicationId(), containerID);
// TODO: Move this code to appropriate place once kill_container is
// implemented.
@@ -547,23 +556,14 @@ public class ContainerManagerImpl extends CompositeService implements
@Override
public GetContainerStatusResponse getContainerStatus(
- GetContainerStatusRequest request) throws YarnException,
- IOException {
+ GetContainerStatusRequest request) throws YarnException, IOException {
ContainerId containerID = request.getContainerId();
String containerIDStr = containerID.toString();
-
- // TODO: Only the container's owner can get containers' status today.
-
- UserGroupInformation remoteUgi = getRemoteUgi();
- LOG.info("Getting container-status for " + containerIDStr);
Container container = this.context.getContainers().get(containerID);
- if (container == null) {
- throw RPCUtil.getRemoteException("Container " + containerIDStr
- + " is not handled by this NodeManager");
- }
- authorizeRequest(containerIDStr, null, remoteUgi,
- getContainerTokenIdentifier(remoteUgi, container.getContainerTokenIdentifier()));
+
+ LOG.info("Getting container-status for " + containerIDStr);
+ authorizeGetAndStopContainerRequest(containerID, container, false);
ContainerStatus containerStatus = container.cloneAndGetContainerStatus();
LOG.info("Returning " + containerStatus);
@@ -573,6 +573,48 @@ public class ContainerManagerImpl extends CompositeService implements
return response;
}
+ @Private
+ @VisibleForTesting
+ protected void authorizeGetAndStopContainerRequest(ContainerId containerId,
+ Container container, boolean stopRequest) throws YarnException {
+
+ UserGroupInformation remoteUgi = getRemoteUgi();
+ NMTokenIdentifier identifier = selectNMTokenIdentifier(remoteUgi);
+
+ /*
+ * For get/stop container status; we need to verify that 1) User (NMToken)
+ * application attempt only has started container. 2) Requested containerId
+ * belongs to the same application attempt (NMToken) which was used. (Note:-
+ * This will prevent user in knowing another application's containers).
+ */
+
+ if ((!identifier.getApplicationAttemptId().equals(
+ containerId.getApplicationAttemptId()))
+ || (container != null && !identifier.getApplicationAttemptId().equals(
+ container.getContainerId().getApplicationAttemptId()))) {
+ if (stopRequest) {
+ LOG.warn(identifier.getApplicationAttemptId()
+ + " attempted to stop non-application container : "
+ + container.getContainerId().toString());
+ NMAuditLogger.logFailure("UnknownUser", AuditConstants.STOP_CONTAINER,
+ "ContainerManagerImpl", "Trying to stop unknown container!",
+ identifier.getApplicationAttemptId().getApplicationId(),
+ container.getContainerId());
+ } else {
+ LOG.warn(identifier.getApplicationAttemptId()
+ + " attempted to get get status for non-application container : "
+ + container.getContainerId().toString());
+ }
+ throw RPCUtil.getRemoteException("Container " + containerId.toString()
+ + " is not started by this application attempt.");
+ }
+
+ if (container == null) {
+ throw RPCUtil.getRemoteException("Container " + containerId.toString()
+ + " is not handled by this NodeManager");
+ }
+ }
+
class ContainerEventDispatcher implements EventHandler {
@Override
public void handle(ContainerEvent event) {
@@ -643,9 +685,19 @@ public class ContainerManagerImpl extends CompositeService implements
this.blockNewContainerRequests.set(blockNewContainerRequests);
}
+ @Private
+ @VisibleForTesting
+ public boolean getBlockNewContainerRequestsStatus() {
+ return this.blockNewContainerRequests.get();
+ }
+
@Override
public void stateChanged(Service service) {
// TODO Auto-generated method stub
}
+
+ public Context getContext() {
+ return this.context;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/security/LocalizerTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/security/LocalizerTokenSecretManager.java
index 3f77059e570..5eff43b936e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/security/LocalizerTokenSecretManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/security/LocalizerTokenSecretManager.java
@@ -18,26 +18,32 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security;
+import javax.crypto.SecretKey;
+
import org.apache.hadoop.security.token.SecretManager;
public class LocalizerTokenSecretManager extends
SecretManager {
+ private final SecretKey secretKey;
+
+ public LocalizerTokenSecretManager() {
+ this.secretKey = generateSecret();
+ }
+
@Override
protected byte[] createPassword(LocalizerTokenIdentifier identifier) {
- return "testing".getBytes();
+ return createPassword(identifier.getBytes(), secretKey);
}
@Override
public byte[] retrievePassword(LocalizerTokenIdentifier identifier)
throws org.apache.hadoop.security.token.SecretManager.InvalidToken {
- // TODO Auto-generated method stub
- return "testing".getBytes();
+ return createPassword(identifier.getBytes(), secretKey);
}
@Override
public LocalizerTokenIdentifier createIdentifier() {
- // TODO Auto-generated method stub
return new LocalizerTokenIdentifier();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java
index c5e00f25faa..bc349f4960f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java
@@ -30,13 +30,12 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
-import com.google.common.annotations.VisibleForTesting;
-
/**
* The NM maintains only two master-keys. The current key that RM knows and the
* key from the previous rolling-interval.
@@ -51,6 +50,7 @@ public class NMContainerTokenSecretManager extends
private MasterKeyData previousMasterKey;
private final Map> oldMasterKeys;
+ private String nodeHostAddr;
public NMContainerTokenSecretManager(Configuration conf) {
super(conf);
@@ -122,6 +122,15 @@ public class NMContainerTokenSecretManager extends
masterKeyToUse = this.oldMasterKeys.get(appId).get(containerId);
}
+ if (nodeHostAddr != null
+ && !identifier.getNmHostAddress().equals(nodeHostAddr)) {
+ // Valid container token used for incorrect node.
+ throw new SecretManager.InvalidToken("Given Container "
+ + identifier.getContainerID().toString()
+ + " identifier is not valid for current Node manager. Expected : "
+ + nodeHostAddr + " Found : " + identifier.getNmHostAddress());
+ }
+
if (masterKeyToUse != null) {
return retrievePasswordInternal(identifier, masterKeyToUse);
}
@@ -186,4 +195,9 @@ public class NMContainerTokenSecretManager extends
public synchronized void appFinished(ApplicationId appId) {
this.oldMasterKeys.remove(appId);
}
+
+ public synchronized void setNodeId(NodeId nodeId) {
+ nodeHostAddr = nodeId.toString();
+ LOG.info("Updating node address : " + nodeHostAddr);
+ }
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java
index 28c40e6d6fe..9569fdc08c5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java
@@ -18,18 +18,24 @@
package org.apache.hadoop.yarn.server.nodemanager.security;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
+import com.google.common.annotations.VisibleForTesting;
+
public class NMTokenSecretManagerInNM extends BaseNMTokenSecretManager {
private static final Log LOG = LogFactory
@@ -38,10 +44,15 @@ public class NMTokenSecretManagerInNM extends BaseNMTokenSecretManager {
private MasterKeyData previousMasterKey;
private final Map oldMasterKeys;
+ private final Map> appToAppAttemptMap;
+ private NodeId nodeId;
+
public NMTokenSecretManagerInNM() {
this.oldMasterKeys =
new HashMap();
+ appToAppAttemptMap =
+ new HashMap>();
}
/**
@@ -69,46 +80,117 @@ public class NMTokenSecretManagerInNM extends BaseNMTokenSecretManager {
}
/**
- * This method will be used to verify NMTokens generated by different
- * master keys.
+ * This method will be used to verify NMTokens generated by different master
+ * keys.
*/
@Override
- public synchronized byte[] retrievePassword(
- NMTokenIdentifier identifier) throws InvalidToken {
- int keyId = identifier.getMastKeyId();
+ public synchronized byte[] retrievePassword(NMTokenIdentifier identifier)
+ throws InvalidToken {
+ int keyId = identifier.getKeyId();
ApplicationAttemptId appAttemptId = identifier.getApplicationAttemptId();
-
+
/*
- * MasterKey used for retrieving password will be as follows.
- * 1) By default older saved master key will be used.
- * 2) If identifier's master key id matches that of previous master key
- * id then previous key will be used.
- * 3) If identifier's master key id matches that of current master key
- * id then current key will be used.
+ * MasterKey used for retrieving password will be as follows. 1) By default
+ * older saved master key will be used. 2) If identifier's master key id
+ * matches that of previous master key id then previous key will be used. 3)
+ * If identifier's master key id matches that of current master key id then
+ * current key will be used.
*/
MasterKeyData oldMasterKey = oldMasterKeys.get(appAttemptId);
MasterKeyData masterKeyToUse = oldMasterKey;
if (previousMasterKey != null
&& keyId == previousMasterKey.getMasterKey().getKeyId()) {
masterKeyToUse = previousMasterKey;
- } else if ( keyId == currentMasterKey.getMasterKey().getKeyId()) {
+ } else if (keyId == currentMasterKey.getMasterKey().getKeyId()) {
masterKeyToUse = currentMasterKey;
}
+ if (nodeId != null && !identifier.getNodeId().equals(nodeId)) {
+ throw new InvalidToken("Given NMToken for application : "
+ + appAttemptId.toString() + " is not valid for current node manager."
+ + "expected : " + nodeId.toString() + " found : "
+ + identifier.getNodeId().toString());
+ }
+
if (masterKeyToUse != null) {
byte[] password = retrivePasswordInternal(identifier, masterKeyToUse);
- if (masterKeyToUse.getMasterKey().getKeyId() != oldMasterKey
- .getMasterKey().getKeyId()) {
- oldMasterKeys.put(appAttemptId, masterKeyToUse);
- }
+ LOG.debug("NMToken password retrieved successfully!!");
return password;
}
-
+
throw new InvalidToken("Given NMToken for application : "
+ appAttemptId.toString() + " seems to have been generated illegally.");
}
+
+ public synchronized void appFinished(ApplicationId appId) {
+ List appAttemptList = appToAppAttemptMap.get(appId);
+ if (appAttemptList != null) {
+ LOG.debug("Removing application attempts NMToken keys for application "
+ + appId);
+ for (ApplicationAttemptId appAttemptId : appAttemptList) {
+ this.oldMasterKeys.remove(appAttemptId);
+ }
+ appToAppAttemptMap.remove(appId);
+ } else {
+ LOG.error("No application Attempt for application : " + appId
+ + " started on this NM.");
+ }
+ }
+
+ /**
+ * This will be called by startContainer. It will add the master key into
+ * the cache used for starting this container. This should be called before
+ * validating the startContainer request.
+ */
+ public synchronized void appAttemptStartContainer(
+ NMTokenIdentifier identifier)
+ throws org.apache.hadoop.security.token.SecretManager.InvalidToken {
+ ApplicationAttemptId appAttemptId = identifier.getApplicationAttemptId();
+ if (!appToAppAttemptMap.containsKey(appAttemptId.getApplicationId())) {
+ // First application attempt for the given application
+ appToAppAttemptMap.put(appAttemptId.getApplicationId(),
+ new ArrayList());
+ }
+ MasterKeyData oldKey = oldMasterKeys.get(appAttemptId);
+
+ if (oldKey == null) {
+ // This is a new application attempt.
+ appToAppAttemptMap.get(appAttemptId.getApplicationId()).add(appAttemptId);
+ }
+ if (oldKey == null
+ || oldKey.getMasterKey().getKeyId() != identifier.getKeyId()) {
+ // Update key only if it is modified.
+ LOG.debug("NMToken key updated for application attempt : "
+ + identifier.getApplicationAttemptId().toString());
+ if (identifier.getKeyId() == currentMasterKey.getMasterKey()
+ .getKeyId()) {
+ oldMasterKeys.put(appAttemptId, currentMasterKey);
+ } else if (previousMasterKey != null
+ && identifier.getKeyId() == previousMasterKey.getMasterKey()
+ .getKeyId()) {
+ oldMasterKeys.put(appAttemptId, previousMasterKey);
+ } else {
+ throw new InvalidToken(
+ "Older NMToken should not be used while starting the container.");
+ }
+ }
+ }
- public synchronized void appFinished(ApplicationAttemptId appAttemptId) {
- this.oldMasterKeys.remove(appAttemptId);
+ public synchronized void setNodeId(NodeId nodeId) {
+ LOG.debug("updating nodeId : " + nodeId);
+ this.nodeId = nodeId;
+ }
+
+ @Private
+ @VisibleForTesting
+ public synchronized boolean
+ isAppAttemptNMTokenKeyPresent(ApplicationAttemptId appAttemptId) {
+ return oldMasterKeys.containsKey(appAttemptId);
+ }
+
+ @Private
+ @VisibleForTesting
+ public synchronized NodeId getNodeId() {
+ return this.nodeId;
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
index d83f9b6ac62..76ef074778f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
@@ -27,10 +27,12 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
@@ -125,6 +127,19 @@ public class DummyContainerManager extends ContainerManagerImpl {
};
}
+ @Override
+ protected UserGroupInformation getRemoteUgi() throws YarnException {
+ ApplicationId appId = ApplicationId.newInstance(0, 0);
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, 1);
+ UserGroupInformation ugi =
+ UserGroupInformation.createRemoteUser(appAttemptId.toString());
+ ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, getContext()
+ .getNodeId(), "testuser", getContext().getNMTokenSecretManager().getCurrentKey()
+ .getKeyId()));
+ return ugi;
+ }
+
@Override
@SuppressWarnings("unchecked")
protected ContainersLauncher createContainersLauncher(Context context,
@@ -179,17 +194,16 @@ public class DummyContainerManager extends ContainerManagerImpl {
}
@Override
- protected void authorizeRequest(String containerIDStr,
- ContainerLaunchContext launchContext,
- UserGroupInformation remoteUgi, ContainerTokenIdentifier tokenId)
- throws YarnException {
- // do Nothing
+ protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier,
+ ContainerTokenIdentifier containerTokenIdentifier,
+ UserGroupInformation ugi) throws YarnException {
+ // do nothing
+ }
+
+ @Override
+ protected void authorizeGetAndStopContainerRequest(ContainerId containerId,
+ Container container, boolean stopRequest) throws YarnException {
+ // do nothing
}
- @Override
- protected ContainerTokenIdentifier
- getContainerTokenIdentifier(UserGroupInformation remoteUgi,
- ContainerTokenIdentifier containerTokenId) throws YarnException {
- return containerTokenId;
- }
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
index fe2655d9903..13dc0991322 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
@@ -74,7 +74,7 @@ public class TestContainerManagerWithLCE extends TestContainerManager {
}
@Override
- public void testContainerSetup() throws IOException, InterruptedException,
+ public void testContainerSetup() throws Exception, InterruptedException,
YarnException {
// Don't run the test if the binary is not available.
if (!shouldRunTest()) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
index 99467046be4..668b85b6511 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
@@ -133,18 +134,13 @@ public class TestEventFlow {
ApplicationAttemptId.newInstance(applicationId, 0);
ContainerId cID = ContainerId.newInstance(applicationAttemptId, 0);
- Resource r = BuilderUtils.newResource(1024, 1);
String user = "testing";
- String host = "127.0.0.1";
- int port = 1234;
- Token containerToken =
- BuilderUtils.newContainerToken(cID, host, port, user, r,
- System.currentTimeMillis() + 10000L, 123, "password".getBytes(),
- SIMULATED_RM_IDENTIFIER);
StartContainerRequest request =
recordFactory.newRecordInstance(StartContainerRequest.class);
request.setContainerLaunchContext(launchContext);
- request.setContainerToken(containerToken);
+ request.setContainerToken(TestContainerManager.createContainerToken(cID,
+ SIMULATED_RM_IDENTIFIER, context.getNodeId(), user,
+ context.getContainerTokenSecretManager()));
containerManager.startContainer(request);
BaseContainerManagerTest.waitForContainerState(containerManager, cID,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java
index cda47b22770..3b88b03c7e9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.server.nodemanager;
import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -50,17 +49,16 @@ import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
@@ -131,24 +129,23 @@ public class TestNodeManagerReboot {
containerLaunchContext.setLocalResources(localResources);
List commands = new ArrayList();
containerLaunchContext.setCommands(commands);
- Resource resource = Records.newRecord(Resource.class);
- resource.setMemory(1024);
- NodeId nodeId = BuilderUtils.newNodeId("127.0.0.1", 12345);
- Token containerToken =
- BuilderUtils.newContainerToken(cId, nodeId.getHost(), nodeId.getPort(),
- user, resource, System.currentTimeMillis() + 10000L, 123,
- "password".getBytes(), 0);
final StartContainerRequest startRequest =
Records.newRecord(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext);
- startRequest.setContainerToken(containerToken);
+ NodeId nodeId = nm.getNMContext().getNodeId();
+ startRequest.setContainerToken(TestContainerManager.createContainerToken(
+ cId, 0, nodeId, destinationFile, nm.getNMContext()
+ .getContainerTokenSecretManager()));
final UserGroupInformation currentUser = UserGroupInformation
- .createRemoteUser(cId.toString());
+ .createRemoteUser(cId.getApplicationAttemptId().toString());
+ NMTokenIdentifier nmIdentifier =
+ new NMTokenIdentifier(cId.getApplicationAttemptId(), nodeId, user, 123);
+ currentUser.addTokenIdentifier(nmIdentifier);
currentUser.doAs(new PrivilegedExceptionAction() {
@Override
public Void run() throws YarnException, IOException {
- containerManager.startContainer(startRequest);
+ nm.getContainerManager().startContainer(startRequest);
return null;
}
});
@@ -208,8 +205,6 @@ public class TestNodeManagerReboot {
ContainerLocalizer.FILECACHE) == 0 && numOfLocalDirs(nmLocalDir
.getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR)
== 0);
- verify(delService, times(1)).delete(eq(user),
- argThat(new PathInclude(user)));
verify(delService, times(1)).delete(
(String) isNull(),
argThat(new PathInclude(ResourceLocalizationService.NM_PRIVATE_DIR
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
index 51bae7cbe84..93978f25228 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
@@ -184,6 +184,7 @@ public class TestNodeManagerResync {
} catch (InterruptedException e) {
} catch (BrokenBarrierException e) {
} catch (AssertionError ae) {
+ ae.printStackTrace();
assertionFailedInThread.set(true);
}
}
@@ -228,6 +229,7 @@ public class TestNodeManagerResync {
.setStopThreadFlag(false);
super.setBlockNewContainerRequests(blockNewContainerRequests);
} catch (InterruptedException e) {
+ e.printStackTrace();
}
}
}
@@ -258,6 +260,7 @@ public class TestNodeManagerResync {
} catch (InterruptedException e) {
} catch (BrokenBarrierException e) {
} catch (AssertionError ae) {
+ ae.printStackTrace();
assertionFailedInThread.set(true);
}
}
@@ -296,6 +299,7 @@ public class TestNodeManagerResync {
Assert.assertEquals(NMNotYetReadyException.class.getName(), e
.getClass().getName());
} catch (IOException e) {
+ e.printStackTrace();
assertionFailedInThread.set(true);
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java
index 312f7cce1cf..20f7f77fffe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java
@@ -52,16 +52,17 @@ import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.After;
@@ -161,7 +162,7 @@ public class TestNodeManagerShutdown {
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
- NodeId nodeId = BuilderUtils.newNodeId("localhost", 1234);
+ NodeId nodeId = BuilderUtils.newNodeId("localhost", 12345);
URL localResourceUri =
ConverterUtils.getYarnUrlFromPath(localFS
@@ -180,17 +181,22 @@ public class TestNodeManagerShutdown {
containerLaunchContext.setLocalResources(localResources);
List commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
containerLaunchContext.setCommands(commands);
- Resource resource = BuilderUtils.newResource(1024, 1);
- Token containerToken =
- BuilderUtils.newContainerToken(cId, nodeId.getHost(), nodeId.getPort(),
- user, resource, System.currentTimeMillis() + 10000L, 123,
- "password".getBytes(), 0);
StartContainerRequest startRequest =
recordFactory.newRecordInstance(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext);
- startRequest.setContainerToken(containerToken);
+ startRequest
+ .setContainerToken(TestContainerManager.createContainerToken(cId, 0,
+ nodeId, user, nm.getNMContext().getContainerTokenSecretManager()));
+ final InetSocketAddress containerManagerBindAddress =
+ NetUtils.createSocketAddrForHost("127.0.0.1", 12345);
UserGroupInformation currentUser = UserGroupInformation
.createRemoteUser(cId.toString());
+ org.apache.hadoop.security.token.Token nmToken =
+ ConverterUtils.convertFromYarn(
+ nm.getNMContext().getNMTokenSecretManager()
+ .createNMToken(cId.getApplicationAttemptId(), nodeId, user),
+ containerManagerBindAddress);
+ currentUser.addToken(nmToken);
ContainerManagementProtocol containerManager =
currentUser.doAs(new PrivilegedAction() {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index 843fd3ca9e7..e17131fd3a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -201,7 +201,7 @@ public class TestNodeStatusUpdater {
String user = "testUser";
ContainerTokenIdentifier containerToken =
BuilderUtils.newContainerTokenIdentifier(BuilderUtils
- .newContainerToken(firstContainerID, "127.0.0.1", 1234, user,
+ .newContainerToken(firstContainerID, "localhost", 1234, user,
resource, currentTime + 10000, 123, "password".getBytes(),
currentTime));
Container container =
@@ -232,7 +232,7 @@ public class TestNodeStatusUpdater {
Resource resource = BuilderUtils.newResource(3, 1);
ContainerTokenIdentifier containerToken =
BuilderUtils.newContainerTokenIdentifier(BuilderUtils
- .newContainerToken(secondContainerID, "127.0.0.1", 1234, user,
+ .newContainerToken(secondContainerID, "localhost", 1234, user,
resource, currentTime + 10000, 123,
"password".getBytes(), currentTime));
Container container =
@@ -1168,8 +1168,8 @@ public class TestNodeStatusUpdater {
private YarnConfiguration createNMConfig() {
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.NM_PMEM_MB, 5*1024); // 5GB
- conf.set(YarnConfiguration.NM_ADDRESS, "127.0.0.1:12345");
- conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "127.0.0.1:12346");
+ conf.set(YarnConfiguration.NM_ADDRESS, "localhost:12345");
+ conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "localhost:12346");
conf.set(YarnConfiguration.NM_LOG_DIRS, new Path(basedir, "logs").toUri()
.getPath());
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, new Path(basedir,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
index 6c9198ce678..2a157d85b4f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
@@ -30,21 +30,20 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.service.Service.STATE;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
@@ -58,6 +57,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
@@ -150,7 +150,7 @@ public abstract class BaseContainerManagerTest {
LOG.info("Created localDir in " + localDir.getAbsolutePath());
LOG.info("Created tmpDir in " + tmpDir.getAbsolutePath());
- String bindAddress = "0.0.0.0:5555";
+ String bindAddress = "127.0.0.1:12345";
conf.set(YarnConfiguration.NM_ADDRESS, bindAddress);
conf.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
@@ -173,6 +173,7 @@ public abstract class BaseContainerManagerTest {
protected ContainerManagerImpl
createContainerManager(DeletionService delSrvc) {
+
return new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
metrics, new ApplicationACLsManager(conf), dirsHandler) {
@Override
@@ -182,11 +183,24 @@ public abstract class BaseContainerManagerTest {
}
@Override
- protected void authorizeRequest(String containerIDStr,
- ContainerLaunchContext launchContext, UserGroupInformation remoteUgi,
- ContainerTokenIdentifier tokenId) throws YarnException {
- // do nothing
- }
+ protected void authorizeGetAndStopContainerRequest(ContainerId containerId,
+ Container container, boolean stopRequest) throws YarnException {
+ // do nothing
+ }
+
+ @Override
+ protected void authorizeStartRequest(
+ NMTokenIdentifier nmTokenIdentifier,
+ ContainerTokenIdentifier containerTokenIdentifier,
+ UserGroupInformation ugi) throws YarnException {
+ // do nothing
+ }
+
+ @Override
+ protected void updateNMTokenIdentifier(
+ NMTokenIdentifier nmTokenIdentifier) throws InvalidToken {
+ // Do nothing
+ }
};
}
@@ -242,7 +256,7 @@ public abstract class BaseContainerManagerTest {
throws InterruptedException {
// Wait for app-finish
Application app =
- containerManager.context.getApplications().get(appID);
+ containerManager.getContext().getApplications().get(appID);
int timeout = 0;
while (!(app.getApplicationState().equals(finalState))
&& timeout++ < 15) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
index 5fb88121646..a66be90ad12 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
@@ -34,6 +34,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -47,10 +48,13 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
@@ -59,8 +63,11 @@ import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.junit.Before;
import org.junit.Test;
public class TestContainerManager extends BaseContainerManagerTest {
@@ -72,6 +79,12 @@ public class TestContainerManager extends BaseContainerManagerTest {
static {
LOG = LogFactory.getLog(TestContainerManager.class);
}
+
+ @Override
+ @Before
+ public void setup() throws IOException {
+ super.setup();
+ }
private ContainerId createContainerId() {
ApplicationId appId = ApplicationId.newInstance(0, 0);
@@ -81,6 +94,32 @@ public class TestContainerManager extends BaseContainerManagerTest {
return containerId;
}
+ @Override
+ protected ContainerManagerImpl
+ createContainerManager(DeletionService delSrvc) {
+ return new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
+ metrics, new ApplicationACLsManager(conf), dirsHandler) {
+ @Override
+ public void
+ setBlockNewContainerRequests(boolean blockNewContainerRequests) {
+ // do nothing
+ }
+
+ @Override
+ protected UserGroupInformation getRemoteUgi() throws YarnException {
+ ApplicationId appId = ApplicationId.newInstance(0, 0);
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, 1);
+ UserGroupInformation ugi =
+ UserGroupInformation.createRemoteUser(appAttemptId.toString());
+ ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context
+ .getNodeId(), user, context.getNMTokenSecretManager().getCurrentKey()
+ .getKeyId()));
+ return ugi;
+ }
+ };
+ }
+
@Test
public void testContainerManagerInitialization() throws IOException {
@@ -101,8 +140,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
}
@Test
- public void testContainerSetup() throws IOException, InterruptedException,
- YarnException {
+ public void testContainerSetup() throws Exception {
containerManager.start();
@@ -134,16 +172,12 @@ public class TestContainerManager extends BaseContainerManagerTest {
new HashMap();
localResources.put(destinationFile, rsrc_alpha);
containerLaunchContext.setLocalResources(localResources);
- Resource r = BuilderUtils.newResource(512, 1);
- int port = 12345;
- Token containerToken =
- BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
- port, user, r, System.currentTimeMillis() + 10000L, 123,
- "password".getBytes(), super.DUMMY_RM_IDENTIFIER);
StartContainerRequest startRequest =
recordFactory.newRecordInstance(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext);
- startRequest.setContainerToken(containerToken);
+ startRequest.setContainerToken(createContainerToken(cId,
+ DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
+ context.getContainerTokenSecretManager()));
containerManager.startContainer(startRequest);
@@ -227,16 +261,12 @@ public class TestContainerManager extends BaseContainerManagerTest {
containerLaunchContext.setLocalResources(localResources);
List commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
containerLaunchContext.setCommands(commands);
- Resource r = BuilderUtils.newResource(100, 1);
- int port = 12345;
- Token containerToken =
- BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
- port, user, r, System.currentTimeMillis() + 10000L, 123,
- "password".getBytes(), super.DUMMY_RM_IDENTIFIER);
StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext);
- startRequest.setContainerToken(containerToken);
+ startRequest.setContainerToken(createContainerToken(cId,
+ DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
+ context.getContainerTokenSecretManager()));
containerManager.startContainer(startRequest);
int timeoutSecs = 0;
@@ -335,15 +365,12 @@ public class TestContainerManager extends BaseContainerManagerTest {
containerLaunchContext.setLocalResources(localResources);
List commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
containerLaunchContext.setCommands(commands);
- Resource r = BuilderUtils.newResource(100, 1);
- int port = 12345;
- Token containerToken =
- BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
- port, user, r, System.currentTimeMillis() + 10000L, 123,
- "password".getBytes(), super.DUMMY_RM_IDENTIFIER);
+
StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext);
- startRequest.setContainerToken(containerToken);
+ startRequest.setContainerToken(createContainerToken(cId,
+ DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
+ context.getContainerTokenSecretManager()));
containerManager.startContainer(startRequest);
BaseContainerManagerTest.waitForContainerState(containerManager, cId,
@@ -423,16 +450,10 @@ public class TestContainerManager extends BaseContainerManagerTest {
new HashMap();
localResources.put(destinationFile, rsrc_alpha);
containerLaunchContext.setLocalResources(localResources);
- Resource r = BuilderUtils.newResource(100, 1);
- int port = 12345;
-
- Token containerToken =
- BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
- port, user, r, System.currentTimeMillis() + 10000L, 123,
- "password".getBytes(), super.DUMMY_RM_IDENTIFIER);
StartContainerRequest request = recordFactory.newRecordInstance(StartContainerRequest.class);
request.setContainerLaunchContext(containerLaunchContext);
- request.setContainerToken(containerToken);
+ request.setContainerToken(createContainerToken(cId, DUMMY_RM_IDENTIFIER,
+ context.getNodeId(), user, context.getContainerTokenSecretManager()));
containerManager.startContainer(request);
BaseContainerManagerTest.waitForContainerState(containerManager, cId,
@@ -503,24 +524,19 @@ public class TestContainerManager extends BaseContainerManagerTest {
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
- String host = "127.0.0.1";
- int port = 1234;
ContainerId cId1 = createContainerId();
ContainerId cId2 = createContainerId();
containerLaunchContext
.setLocalResources(new HashMap());
- Resource mockResource = BuilderUtils.newResource(1024, 1);
// Construct the Container with Invalid RMIdentifier
StartContainerRequest startRequest1 =
recordFactory.newRecordInstance(StartContainerRequest.class);
startRequest1.setContainerLaunchContext(containerLaunchContext);
- Token containerToken1 =
- BuilderUtils.newContainerToken(cId1, host, port, user, mockResource,
- System.currentTimeMillis() + 10000, 123, "password".getBytes(),
- (long) ResourceManagerConstants.RM_INVALID_IDENTIFIER);
- startRequest1.setContainerToken(containerToken1);
+ startRequest1.setContainerToken(createContainerToken(cId1,
+ ResourceManagerConstants.RM_INVALID_IDENTIFIER, context.getNodeId(),
+ user, context.getContainerTokenSecretManager()));
boolean catchException = false;
try {
containerManager.startContainer(startRequest1);
@@ -528,8 +544,8 @@ public class TestContainerManager extends BaseContainerManagerTest {
catchException = true;
Assert.assertTrue(e.getMessage().contains(
"Container " + cId1 + " rejected as it is allocated by a previous RM"));
- Assert.assertEquals(InvalidContainerException.class.getName(), e
- .getClass().getName());
+ Assert.assertTrue(e.getClass().getName()
+ .equalsIgnoreCase(InvalidContainerException.class.getName()));
}
// Verify that startContainer fail because of invalid container request
@@ -539,11 +555,9 @@ public class TestContainerManager extends BaseContainerManagerTest {
StartContainerRequest startRequest2 =
recordFactory.newRecordInstance(StartContainerRequest.class);
startRequest2.setContainerLaunchContext(containerLaunchContext);
- Token containerToken2 =
- BuilderUtils.newContainerToken(cId1, host, port, user, mockResource,
- System.currentTimeMillis() + 10000, 123, "password".getBytes(),
- super.DUMMY_RM_IDENTIFIER);
- startRequest2.setContainerToken(containerToken2);
+ startRequest2.setContainerToken(createContainerToken(cId2,
+ DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
+ context.getContainerTokenSecretManager()));
boolean noException = true;
try {
containerManager.startContainer(startRequest2);
@@ -553,4 +567,20 @@ public class TestContainerManager extends BaseContainerManagerTest {
// Verify that startContainer get no YarnException
Assert.assertTrue(noException);
}
+
+ public static Token createContainerToken(ContainerId cId, long rmIdentifier,
+ NodeId nodeId, String user,
+ NMContainerTokenSecretManager containerTokenSecretManager)
+ throws IOException {
+ Resource r = BuilderUtils.newResource(1024, 1);
+ ContainerTokenIdentifier containerTokenIdentifier =
+ new ContainerTokenIdentifier(cId, nodeId.toString(), user, r,
+ System.currentTimeMillis() + 100000L, 123, rmIdentifier);
+ Token containerToken =
+ BuilderUtils
+ .newContainerToken(nodeId, containerTokenSecretManager
+ .retrievePassword(containerTokenIdentifier),
+ containerTokenIdentifier);
+ return containerToken;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
index b08fe275ee8..231e2fa6eb6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
@@ -37,6 +37,7 @@ import junit.framework.Assert;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
@@ -56,6 +57,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
@@ -229,14 +231,9 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
// set up the rest of the container
List commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
containerLaunchContext.setCommands(commands);
- Resource r = BuilderUtils.newResource(1024, 1);
StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext);
- Token containerToken =
- BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
- port, user, r, System.currentTimeMillis() + 10000L, 1234,
- "password".getBytes(), super.DUMMY_RM_IDENTIFIER);
- startRequest.setContainerToken(containerToken);
+ startRequest.setContainerToken(createContainerToken(cId));
containerManager.startContainer(startRequest);
int timeoutSecs = 0;
@@ -378,12 +375,9 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
// set up the rest of the container
List commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
containerLaunchContext.setCommands(commands);
- Resource r = BuilderUtils.newResource(1024, 1);
- Token containerToken =
- BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
- port, user, r, System.currentTimeMillis() + 10000L, 123,
- "password".getBytes(), super.DUMMY_RM_IDENTIFIER);
- StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
+ Token containerToken = createContainerToken(cId);
+ StartContainerRequest startRequest =
+ recordFactory.newRecordInstance(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext);
startRequest.setContainerToken(containerToken);
containerManager.startContainer(startRequest);
@@ -441,4 +435,17 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
}
}
+ protected Token createContainerToken(ContainerId cId) throws InvalidToken {
+ Resource r = BuilderUtils.newResource(1024, 1);
+ ContainerTokenIdentifier containerTokenIdentifier =
+ new ContainerTokenIdentifier(cId, context.getNodeId().toString(), user,
+ r, System.currentTimeMillis() + 10000L, 123, DUMMY_RM_IDENTIFIER);
+ Token containerToken =
+ BuilderUtils.newContainerToken(
+ context.getNodeId(),
+ context.getContainerTokenSecretManager().retrievePassword(
+ containerTokenIdentifier), containerTokenIdentifier);
+ return containerToken;
+ }
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index 58a96c9e2b6..152b988cb50 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -23,9 +23,9 @@ import static junit.framework.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyMap;
import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
@@ -73,8 +73,6 @@ import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.DrainDispatcher;
@@ -94,6 +92,7 @@ import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
@@ -810,15 +809,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
commands.add("/bin/bash");
commands.add(scriptFile.getAbsolutePath());
containerLaunchContext.setCommands(commands);
- Resource r = BuilderUtils.newResource(100 * 1024 * 1024, 1);
- Token containerToken =
- BuilderUtils.newContainerToken(cId, "127.0.0.1", 1234, user, r,
- System.currentTimeMillis() + 10000L, 123, "password".getBytes(),
- super.DUMMY_RM_IDENTIFIER);
StartContainerRequest startRequest =
recordFactory.newRecordInstance(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext);
- startRequest.setContainerToken(containerToken);
+ startRequest.setContainerToken(TestContainerManager.createContainerToken(
+ cId, DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
+ context.getContainerTokenSecretManager()));
this.containerManager.startContainer(startRequest);
BaseContainerManagerTest.waitForContainerState(this.containerManager,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java
index 7a8c61af276..cd73fab4068 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
@@ -229,13 +230,16 @@ public class TestContainersMonitor extends BaseContainerManagerTest {
StartContainerRequest startRequest =
recordFactory.newRecordInstance(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext);
+ ContainerTokenIdentifier containerIdentifier =
+ new ContainerTokenIdentifier(cId, context.getNodeId().toString(), user,
+ r, System.currentTimeMillis() + 120000, 123, DUMMY_RM_IDENTIFIER);
Token containerToken =
- BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
- port, user, r, System.currentTimeMillis() + 10000L, 123,
- "password".getBytes(), super.DUMMY_RM_IDENTIFIER);
+ BuilderUtils.newContainerToken(context.getNodeId(),
+ containerManager.getContext().getContainerTokenSecretManager()
+ .createPassword(containerIdentifier), containerIdentifier);
startRequest.setContainerToken(containerToken);
containerManager.startContainer(startRequest);
-
+
int timeoutSecs = 0;
while (!processStartFile.exists() && timeoutSecs++ < 20) {
Thread.sleep(1000);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/resources/core-site.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/resources/core-site.xml
new file mode 100644
index 00000000000..f0d3085ef85
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/resources/core-site.xml
@@ -0,0 +1,25 @@
+
+
+
+
+
+
+
+
+ hadoop.security.token.service.use_ip
+ false
+
+
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index 7cbfc1c51bd..f6b45232f1c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -380,7 +380,7 @@ public class ApplicationMasterService extends AbstractService implements
// Adding NMTokens for allocated containers.
if (!allocation.getContainers().isEmpty()) {
allocateResponse.setNMTokens(rmContext.getNMTokenSecretManager()
- .getNMTokens(app.getUser(), appAttemptId,
+ .createAndGetNMTokens(app.getUser(), appAttemptId,
allocation.getContainers()));
}
return allocateResponse;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
index 673c7e47fe1..51a04003761 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
@@ -131,21 +131,30 @@ public class AMLauncher implements Runnable {
final YarnRPC rpc = YarnRPC.create(conf); // TODO: Don't create again and again.
- UserGroupInformation currentUser = UserGroupInformation
- .createRemoteUser(containerId.toString());
- if (UserGroupInformation.isSecurityEnabled()) {
- Token token =
- ConverterUtils.convertFromYarn(masterContainer
- .getContainerToken(), containerManagerBindAddress);
- currentUser.addToken(token);
- }
- return currentUser.doAs(new PrivilegedAction() {
- @Override
- public ContainerManagementProtocol run() {
- return (ContainerManagementProtocol) rpc.getProxy(ContainerManagementProtocol.class,
- containerManagerBindAddress, conf);
- }
- });
+ UserGroupInformation currentUser =
+ UserGroupInformation.createRemoteUser(containerId
+ .getApplicationAttemptId().toString());
+
+ String user =
+ rmContext.getRMApps()
+ .get(containerId.getApplicationAttemptId().getApplicationId())
+ .getUser();
+ org.apache.hadoop.yarn.api.records.Token token =
+ rmContext.getNMTokenSecretManager().createNMToken(
+ containerId.getApplicationAttemptId(), node, user);
+ currentUser.addToken(ConverterUtils.convertFromYarn(token,
+ containerManagerBindAddress));
+
+ return currentUser
+ .doAs(new PrivilegedAction() {
+
+ @Override
+ public ContainerManagementProtocol run() {
+ return (ContainerManagementProtocol) rpc.getProxy(
+ ContainerManagementProtocol.class,
+ containerManagerBindAddress, conf);
+ }
+ });
}
private ContainerLaunchContext createAMContainerLaunchContext(
@@ -234,7 +243,13 @@ public class AMLauncher implements Runnable {
} catch(IOException ie) {
LOG.info("Error cleaning master ", ie);
} catch (YarnException e) {
- LOG.info("Error cleaning master ", e);
+ StringBuilder sb = new StringBuilder("Container ");
+ sb.append(masterContainer.getId().toString());
+ sb.append(" is not handled by this NodeManager");
+ if (!e.getMessage().contains(sb.toString())) {
+ // Ignoring if container is already killed by Node Manager.
+ LOG.info("Error cleaning master ", e);
+ }
}
break;
default:
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java
index 3931c6ce109..ab31eaf3af1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.security;
-import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
@@ -31,16 +30,12 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
@@ -183,7 +178,7 @@ public class NMTokenSecretManagerInRM extends BaseNMTokenSecretManager {
}
}
- public List getNMTokens(String applicationSubmitter,
+ public List createAndGetNMTokens(String applicationSubmitter,
ApplicationAttemptId appAttemptId, List containers) {
try {
this.readLock.lock();
@@ -193,12 +188,14 @@ public class NMTokenSecretManagerInRM extends BaseNMTokenSecretManager {
for (Container container : containers) {
if (!nodeSet.contains(container.getNodeId())) {
LOG.debug("Sending NMToken for nodeId : "
- + container.getNodeId().toString());
+ + container.getNodeId().toString()
+ + " for application attempt : " + appAttemptId.toString());
Token token = createNMToken(appAttemptId, container.getNodeId(),
applicationSubmitter);
NMToken nmToken =
NMToken.newInstance(container.getNodeId(), token);
nmTokens.add(nmToken);
+ // This will update the nmToken set.
nodeSet.add(container.getNodeId());
}
}
@@ -273,38 +270,4 @@ public class NMTokenSecretManagerInRM extends BaseNMTokenSecretManager {
this.writeLock.unlock();
}
}
-
- public static Token newNMToken(byte[] password,
- NMTokenIdentifier identifier) {
- NodeId nodeId = identifier.getNodeId();
- // RPC layer client expects ip:port as service for tokens
- InetSocketAddress addr =
- NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort());
- Token nmToken =
- Token.newInstance(identifier.getBytes(),
- NMTokenIdentifier.KIND.toString(), password, SecurityUtil
- .buildTokenService(addr).toString());
- return nmToken;
- }
-
- /**
- * Helper function for creating NMTokens.
- */
- public Token createNMToken(ApplicationAttemptId applicationAttemptId,
- NodeId nodeId, String applicationSubmitter) {
- byte[] password;
- NMTokenIdentifier identifier;
-
- this.readLock.lock();
- try {
- identifier =
- new NMTokenIdentifier(applicationAttemptId, nodeId,
- applicationSubmitter, this.currentMasterKey.getMasterKey()
- .getKeyId());
- password = this.createPassword(identifier);
- } finally {
- this.readLock.unlock();
- }
- return newNMToken(password, identifier);
- }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index c1d3f926331..522debbb464 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -59,9 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
-import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.util.Records;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
@@ -310,6 +308,7 @@ public class MockRM extends ResourceManager {
Configuration conf = new Configuration();
containerTokenSecretManager.rollMasterKey();
+ nmTokenSecretManager.rollMasterKey();
return new ResourceTrackerService(getRMContext(), nodesListManager,
this.nmLivelinessMonitor, containerTokenSecretManager,
nmTokenSecretManager) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java
index a4ccb437ca8..b78a1e51e1d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java
@@ -22,14 +22,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
import java.security.PrivilegedAction;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
import junit.framework.Assert;
@@ -37,45 +30,30 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.UnsupportedFileSystemException;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.Shell;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Test;
@@ -111,15 +89,15 @@ public class TestContainerManagerSecurity {
yarnCluster.init(conf);
yarnCluster.start();
- // Testing for authenticated user
- testAuthenticatedUser();
+ // TestNMTokens.
+ testNMTokens(conf);
- // Testing for malicious user
- testMaliceUser();
-
- // Testing for usage of expired tokens
- testExpiredTokens();
+ // Testing for container token tampering
+ testContainerToken(conf);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
} finally {
if (yarnCluster != null) {
yarnCluster.stop();
@@ -128,57 +106,264 @@ public class TestContainerManagerSecurity {
}
}
- private void testAuthenticatedUser() throws IOException,
- InterruptedException, YarnException {
+ private void testNMTokens(Configuration conf) throws Exception {
+ NMTokenSecretManagerInRM nmTokenSecretManagerRM =
+ yarnCluster.getResourceManager().getRMContext()
+ .getNMTokenSecretManager();
+ NMTokenSecretManagerInNM nmTokenSecretManagerNM =
+ yarnCluster.getNodeManager(0).getNMContext().getNMTokenSecretManager();
+ RMContainerTokenSecretManager containerTokenSecretManager =
+ yarnCluster.getResourceManager().getRMContainerTokenSecretManager();
+
+ NodeManager nm = yarnCluster.getNodeManager(0);
+
+ waitForNMToReceiveNMTokenKey(nmTokenSecretManagerNM, nm);
+
+ // Both id should be equal.
+ Assert.assertEquals(nmTokenSecretManagerNM.getCurrentKey().getKeyId(),
+ nmTokenSecretManagerRM.getCurrentKey().getKeyId());
+
+ /*
+ * Below cases should be tested.
+ * 1) If Invalid NMToken is used then it should be rejected.
+ * 2) If valid NMToken but belonging to another Node is used then that
+ * too should be rejected.
+ * 3) NMToken for say appAttempt-1 is used for starting/stopping/retrieving
+ * status for container with containerId for say appAttempt-2 should
+ * be rejected.
+ * 4) After start container call is successful nmtoken should have been
+ * saved in NMTokenSecretManagerInNM.
+ * 5) If start container call was successful (no matter if container is
+ * still running or not), appAttempt->NMToken should be present in
+ * NMTokenSecretManagerInNM's cache. Any future getContainerStatus call
+ * for containerId belonging to that application attempt using
+ * applicationAttempt's older nmToken should not get any invalid
+ * nmToken error. (This can be best tested if we roll over NMToken
+ * master key twice).
+ */
+ YarnRPC rpc = YarnRPC.create(conf);
+ String user = "test";
+ Resource r = Resource.newInstance(1024, 1);
- LOG.info("Running test for authenticated user");
+ ApplicationId appId = ApplicationId.newInstance(1, 1);
+ ApplicationAttemptId validAppAttemptId =
+ ApplicationAttemptId.newInstance(appId, 1);
+ ApplicationAttemptId invalidAppAttemptId =
+ ApplicationAttemptId.newInstance(appId, 2);
+
+ ContainerId validContainerId =
+ ContainerId.newInstance(validAppAttemptId, 0);
+
+ NodeId validNode = yarnCluster.getNodeManager(0).getNMContext().getNodeId();
+ NodeId invalidNode = NodeId.newInstance("InvalidHost", 1234);
- ResourceManager resourceManager = yarnCluster.getResourceManager();
+
+ org.apache.hadoop.yarn.api.records.Token validNMToken =
+ nmTokenSecretManagerRM.createNMToken(validAppAttemptId, validNode, user);
+
+ org.apache.hadoop.yarn.api.records.Token validContainerToken =
+ containerTokenSecretManager.createContainerToken(validContainerId,
+ validNode, user, r);
+
+ StringBuilder sb;
+ // testInvalidNMToken ... creating NMToken using different secret manager.
+
+ NMTokenSecretManagerInRM tempManager = new NMTokenSecretManagerInRM(conf);
+ tempManager.rollMasterKey();
+ do {
+ tempManager.rollMasterKey();
+ tempManager.activateNextMasterKey();
+ // Making sure key id is different.
+ } while (tempManager.getCurrentKey().getKeyId() == nmTokenSecretManagerRM
+ .getCurrentKey().getKeyId());
+
+ org.apache.hadoop.yarn.api.records.Token invalidNMToken =
+ tempManager.createNMToken(validAppAttemptId, validNode, user);
+ sb = new StringBuilder("Given NMToken for application : ");
+ sb.append(validAppAttemptId.toString())
+ .append(" seems to have been generated illegally.");
+ Assert.assertTrue(sb.toString().contains(
+ testStartContainer(rpc, validAppAttemptId, validNode,
+ validContainerToken, invalidNMToken, true)));
+
+ // valid NMToken but belonging to other node
+ invalidNMToken =
+ nmTokenSecretManagerRM.createNMToken(validAppAttemptId, invalidNode,
+ user);
+ sb = new StringBuilder("Given NMToken for application : ");
+ sb.append(validAppAttemptId)
+ .append(" is not valid for current node manager.expected : ")
+ .append(validNode.toString())
+ .append(" found : ").append(invalidNode.toString());
+ Assert.assertTrue(sb.toString().contains(
+ testStartContainer(rpc, validAppAttemptId, validNode,
+ validContainerToken, invalidNMToken, true)));
+
+ // using appAttempt-2 token for launching container for appAttempt-1.
+ invalidNMToken =
+ nmTokenSecretManagerRM.createNMToken(invalidAppAttemptId, validNode,
+ user);
+ sb = new StringBuilder("\nNMToken for application attempt : ");
+ sb.append(invalidAppAttemptId.toString())
+ .append(" was used for starting container with container token")
+ .append(" issued for application attempt : ")
+ .append(validAppAttemptId.toString());
+ Assert.assertTrue(testStartContainer(rpc, validAppAttemptId, validNode,
+ validContainerToken, invalidNMToken, true).contains(sb.toString()));
+
+ // using correct tokens. nmtoken for appattempt should get saved.
+ testStartContainer(rpc, validAppAttemptId, validNode, validContainerToken,
+ validNMToken, false);
+ Assert.assertTrue(nmTokenSecretManagerNM
+ .isAppAttemptNMTokenKeyPresent(validAppAttemptId));
+
+ // Rolling over master key twice so that we can check whether older keys
+ // are used for authentication.
+ rollNMTokenMasterKey(nmTokenSecretManagerRM, nmTokenSecretManagerNM);
+ // Key rolled over once.. rolling over again
+ rollNMTokenMasterKey(nmTokenSecretManagerRM, nmTokenSecretManagerNM);
+
+ // trying get container status. Now saved nmToken should be used for
+ // authentication.
+ sb = new StringBuilder("Container ");
+ sb.append(validContainerId.toString());
+ sb.append(" is not handled by this NodeManager");
+ Assert.assertTrue(testGetContainer(rpc, validAppAttemptId, validNode,
+ validContainerId, validNMToken, false).contains(sb.toString()));
+
+ }
- final YarnRPC yarnRPC = YarnRPC.create(conf);
+ protected void waitForNMToReceiveNMTokenKey(
+ NMTokenSecretManagerInNM nmTokenSecretManagerNM, NodeManager nm)
+ throws InterruptedException {
+ int attempt = 60;
+ ContainerManagerImpl cm =
+ ((ContainerManagerImpl) nm.getNMContext().getContainerManager());
+ while ((cm.getBlockNewContainerRequestsStatus() || nmTokenSecretManagerNM
+ .getNodeId() == null) && attempt-- > 0) {
+ Thread.sleep(2000);
+ }
+ }
- // Submit an application
- ApplicationId appID = resourceManager.getClientRMService()
- .getNewApplication(Records.newRecord(GetNewApplicationRequest.class))
- .getApplicationId();
- ApplicationMasterProtocol scheduler = submitAndRegisterApplication(resourceManager,
- yarnRPC, appID);
+ protected void rollNMTokenMasterKey(
+ NMTokenSecretManagerInRM nmTokenSecretManagerRM,
+ NMTokenSecretManagerInNM nmTokenSecretManagerNM) throws Exception {
+ int oldKeyId = nmTokenSecretManagerRM.getCurrentKey().getKeyId();
+ nmTokenSecretManagerRM.rollMasterKey();
+ int interval = 40;
+ while (nmTokenSecretManagerNM.getCurrentKey().getKeyId() == oldKeyId
+ && interval-- > 0) {
+ Thread.sleep(1000);
+ }
+ nmTokenSecretManagerRM.activateNextMasterKey();
+ Assert.assertTrue((nmTokenSecretManagerNM.getCurrentKey().getKeyId()
+ == nmTokenSecretManagerRM.getCurrentKey().getKeyId()));
+ }
- // Now request a container.
- final Container allocatedContainer = requestAndGetContainer(scheduler,
- appID);
-
- // Now talk to the NM for launching the container.
- final ContainerId containerID = allocatedContainer.getId();
- UserGroupInformation authenticatedUser = UserGroupInformation
- .createRemoteUser(containerID.toString());
- org.apache.hadoop.yarn.api.records.Token containerToken =
- allocatedContainer.getContainerToken();
- Token token = new Token(
- containerToken.getIdentifier().array(), containerToken.getPassword()
- .array(), new Text(containerToken.getKind()), new Text(
- containerToken.getService()));
- authenticatedUser.addToken(token);
- authenticatedUser.doAs(new PrivilegedExceptionAction() {
- @Override
- public Void run() throws Exception {
- ContainerManagementProtocol client = (ContainerManagementProtocol) yarnRPC.getProxy(
- ContainerManagementProtocol.class, NetUtils
- .createSocketAddr(allocatedContainer.getNodeId().toString()),
- conf);
- LOG.info("Going to make a legal stopContainer() request");
- StopContainerRequest request = recordFactory
- .newRecordInstance(StopContainerRequest.class);
- request.setContainerId(containerID);
- client.stopContainer(request);
- return null;
+ private String testGetContainer(YarnRPC rpc,
+ ApplicationAttemptId appAttemptId, NodeId nodeId,
+ ContainerId containerId,
+ org.apache.hadoop.yarn.api.records.Token nmToken,
+ boolean isExceptionExpected) {
+ try {
+ getContainerStatus(rpc, nmToken, containerId, appAttemptId, nodeId,
+ isExceptionExpected);
+ if (isExceptionExpected) {
+ fail("Exception was expected!!");
}
- });
+ return "";
+ } catch (Exception e) {
+ e.printStackTrace();
+ return e.getMessage();
+ }
+ }
- KillApplicationRequest request = Records
- .newRecord(KillApplicationRequest.class);
- request.setApplicationId(appID);
- resourceManager.getClientRMService().forceKillApplication(request);
+ protected String testStartContainer(YarnRPC rpc,
+ ApplicationAttemptId appAttemptId, NodeId nodeId,
+ org.apache.hadoop.yarn.api.records.Token containerToken,
+ org.apache.hadoop.yarn.api.records.Token nmToken,
+ boolean isExceptionExpected) {
+ try {
+ startContainer(rpc, nmToken, containerToken, nodeId,
+ appAttemptId.toString());
+ if (isExceptionExpected){
+ fail("Exception was expected!!");
+ }
+ return "";
+ } catch (Exception e) {
+ e.printStackTrace();
+ return e.getMessage();
+ }
+ }
+
+ private void
+ getContainerStatus(YarnRPC rpc,
+ org.apache.hadoop.yarn.api.records.Token nmToken,
+ ContainerId containerId,
+ ApplicationAttemptId appAttemptId, NodeId nodeId,
+ boolean isExceptionExpected) throws Exception {
+ GetContainerStatusRequest request =
+ Records.newRecord(GetContainerStatusRequest.class);
+ request.setContainerId(containerId);
+
+ ContainerManagementProtocol proxy = null;
+
+ try {
+ proxy =
+ getContainerManagementProtocolProxy(rpc, nmToken, nodeId,
+ appAttemptId.toString());
+ proxy.getContainerStatus(request);
+
+ } finally {
+ if (proxy != null) {
+ rpc.stopProxy(proxy, conf);
+ }
+ }
+ }
+
+ private void startContainer(final YarnRPC rpc,
+ org.apache.hadoop.yarn.api.records.Token nmToken,
+ org.apache.hadoop.yarn.api.records.Token containerToken,
+ NodeId nodeId, String user) throws Exception {
+
+ StartContainerRequest request =
+ Records.newRecord(StartContainerRequest.class);
+ request.setContainerToken(containerToken);
+ ContainerLaunchContext context =
+ Records.newRecord(ContainerLaunchContext.class);
+ request.setContainerLaunchContext(context);
+
+ ContainerManagementProtocol proxy = null;
+ try {
+ proxy = getContainerManagementProtocolProxy(rpc, nmToken, nodeId, user);
+ proxy.startContainer(request);
+ } finally {
+ if (proxy != null) {
+ rpc.stopProxy(proxy, conf);
+ }
+ }
+ }
+
+ protected ContainerManagementProtocol getContainerManagementProtocolProxy(
+ final YarnRPC rpc, org.apache.hadoop.yarn.api.records.Token nmToken,
+ NodeId nodeId, String user) {
+ ContainerManagementProtocol proxy;
+ UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+ final InetSocketAddress addr =
+ NetUtils.createSocketAddr(nodeId.getHost(), nodeId.getPort());
+ ugi.addToken(ConverterUtils.convertFromYarn(nmToken, addr));
+
+ proxy = ugi
+ .doAs(new PrivilegedAction() {
+
+ @Override
+ public ContainerManagementProtocol run() {
+ return (ContainerManagementProtocol) rpc.getProxy(
+ ContainerManagementProtocol.class,
+ addr, conf);
+ }
+ });
+ return proxy;
}
/**
@@ -190,349 +375,61 @@ public class TestContainerManagerSecurity {
* @throws InterruptedException
* @throws YarnException
*/
- private void testMaliceUser() throws IOException, InterruptedException,
- YarnException {
+ private void testContainerToken(Configuration conf) throws IOException,
+ InterruptedException, YarnException {
LOG.info("Running test for malice user");
-
- ResourceManager resourceManager = yarnCluster.getResourceManager();
-
- final YarnRPC yarnRPC = YarnRPC.create(conf);
-
- // Submit an application
- ApplicationId appID = resourceManager.getClientRMService()
- .getNewApplication(Records.newRecord(GetNewApplicationRequest.class))
- .getApplicationId();
- ApplicationMasterProtocol scheduler = submitAndRegisterApplication(resourceManager,
- yarnRPC, appID);
-
- // Now request a container.
- final Container allocatedContainer = requestAndGetContainer(scheduler,
- appID);
-
- // Now talk to the NM for launching the container with modified resource
-
- org.apache.hadoop.yarn.api.records.Token containerToken =
- allocatedContainer.getContainerToken();
- ContainerTokenIdentifier originalContainerTokenId =
- BuilderUtils.newContainerTokenIdentifier(containerToken);
-
- // Malice user modifies the resource amount
- Resource modifiedResource = BuilderUtils.newResource(2048, 1);
- ContainerTokenIdentifier modifiedIdentifier =
- new ContainerTokenIdentifier(originalContainerTokenId.getContainerID(),
- originalContainerTokenId.getNmHostAddress(), "testUser",
- modifiedResource, Long.MAX_VALUE,
- originalContainerTokenId.getMasterKeyId(),
- ResourceManager.clusterTimeStamp);
- Token modifiedToken =
- new Token(modifiedIdentifier.getBytes(),
- containerToken.getPassword().array(), new Text(
- containerToken.getKind()), new Text(containerToken.getService()));
- makeTamperedStartContainerCall(yarnRPC, allocatedContainer,
- modifiedIdentifier, modifiedToken);
-
- // Malice user modifies the container-Id
- ContainerId newContainerId =
- BuilderUtils.newContainerId(
- BuilderUtils.newApplicationAttemptId(originalContainerTokenId
- .getContainerID().getApplicationAttemptId().getApplicationId(), 1),
- originalContainerTokenId.getContainerID().getId() + 42);
- modifiedIdentifier =
- new ContainerTokenIdentifier(newContainerId,
- originalContainerTokenId.getNmHostAddress(), "testUser",
- originalContainerTokenId.getResource(), Long.MAX_VALUE,
- originalContainerTokenId.getMasterKeyId(),
- ResourceManager.clusterTimeStamp);
- modifiedToken =
- new Token(modifiedIdentifier.getBytes(),
- containerToken.getPassword().array(), new Text(
- containerToken.getKind()), new Text(containerToken.getService()));
- makeTamperedStartContainerCall(yarnRPC, allocatedContainer,
- modifiedIdentifier, modifiedToken);
-
- // Similarly messing with anything else will fail.
-
- KillApplicationRequest request = Records
- .newRecord(KillApplicationRequest.class);
- request.setApplicationId(appID);
- resourceManager.getClientRMService().forceKillApplication(request);
- }
-
- private void makeTamperedStartContainerCall(final YarnRPC yarnRPC,
- final Container allocatedContainer,
- final ContainerTokenIdentifier modifiedIdentifier,
- Token modifiedToken) {
- final ContainerId containerID = allocatedContainer.getId();
- UserGroupInformation maliceUser = UserGroupInformation
- .createRemoteUser(containerID.toString());
- maliceUser.addToken(modifiedToken);
- maliceUser.doAs(new PrivilegedAction() {
- @Override
- public Void run() {
- ContainerManagementProtocol client = (ContainerManagementProtocol) yarnRPC.getProxy(
- ContainerManagementProtocol.class, NetUtils
- .createSocketAddr(allocatedContainer.getNodeId().toString()),
- conf);
-
- LOG.info("Going to contact NM: ilLegal request");
- StartContainerRequest request =
- Records.newRecord(StartContainerRequest.class);
- try {
- request.setContainerToken(allocatedContainer.getContainerToken());
- ContainerLaunchContext context =
- createContainerLaunchContextForTest(modifiedIdentifier);
- request.setContainerLaunchContext(context);
- client.startContainer(request);
- fail("Connection initiation with illegally modified "
- + "tokens is expected to fail.");
- } catch (YarnException e) {
- LOG.error("Got exception", e);
- fail("Cannot get a YARN remote exception as "
- + "it will indicate RPC success");
- } catch (Exception e) {
- Assert.assertEquals(
- javax.security.sasl.SaslException.class
- .getCanonicalName(), e.getClass().getCanonicalName());
- Assert.assertTrue(e
- .getMessage()
- .contains(
- "DIGEST-MD5: digest response format violation. "
- + "Mismatched response."));
- }
- return null;
- }
- });
- }
-
- private void testExpiredTokens() throws IOException, InterruptedException,
- YarnException {
-
- LOG.info("\n\nRunning test for malice user");
-
- ResourceManager resourceManager = yarnCluster.getResourceManager();
-
- final YarnRPC yarnRPC = YarnRPC.create(conf);
-
- // Submit an application
- final ApplicationId appID = resourceManager.getClientRMService()
- .getNewApplication(Records.newRecord(GetNewApplicationRequest.class))
- .getApplicationId();
- ApplicationMasterProtocol scheduler = submitAndRegisterApplication(resourceManager,
- yarnRPC, appID);
-
- // Now request a container.
- final Container allocatedContainer = requestAndGetContainer(scheduler,
- appID);
-
- // Now talk to the NM for launching the container with modified containerID
- final ContainerId containerID = allocatedContainer.getId();
-
- org.apache.hadoop.yarn.api.records.Token containerToken =
- allocatedContainer.getContainerToken();
- final ContainerTokenIdentifier tokenId =
- BuilderUtils.newContainerTokenIdentifier(containerToken);
-
- /////////// Test calls with expired tokens
- UserGroupInformation unauthorizedUser = UserGroupInformation
- .createRemoteUser(containerID.toString());
-
- RMContainerTokenSecretManager containerTokenSecreteManager =
- resourceManager.getRMContainerTokenSecretManager();
- final ContainerTokenIdentifier newTokenId =
- new ContainerTokenIdentifier(tokenId.getContainerID(),
- tokenId.getNmHostAddress(), tokenId.getApplicationSubmitter(),
- tokenId.getResource(), System.currentTimeMillis() - 1,
- containerTokenSecreteManager.getCurrentKey().getKeyId(),
- ResourceManager.clusterTimeStamp);
- final byte[] passowrd =
- containerTokenSecreteManager.createPassword(
- newTokenId);
- // Create a valid token by using the key from the RM.
- Token token =
- new Token(newTokenId.getBytes(), passowrd,
- new Text(containerToken.getKind()), new Text(
- containerToken.getService()));
-
- unauthorizedUser.addToken(token);
- unauthorizedUser.doAs(new PrivilegedAction() {
- @Override
- public Void run() {
- ContainerManagementProtocol client = (ContainerManagementProtocol) yarnRPC.getProxy(
- ContainerManagementProtocol.class, NetUtils
- .createSocketAddr(allocatedContainer.getNodeId().toString()),
- conf);
-
- LOG.info("Going to contact NM with expired token");
- ContainerLaunchContext context = createContainerLaunchContextForTest(newTokenId);
- StartContainerRequest request =
- Records.newRecord(StartContainerRequest.class);
- request.setContainerLaunchContext(context);
- allocatedContainer.setContainerToken(BuilderUtils.newContainerToken(
- allocatedContainer.getNodeId(), passowrd, newTokenId));
- request.setContainerToken(allocatedContainer.getContainerToken());
-
- //Calling startContainer with an expired token.
- try {
- client.startContainer(request);
- fail("Connection initiation with expired "
- + "token is expected to fail.");
- } catch (Throwable t) {
- LOG.info("Got exception : ", t);
- Assert.assertTrue(t.getMessage().contains(
- "This token is expired. current time is"));
- }
-
- // Try stopping a container - should not get an expiry error.
- StopContainerRequest stopRequest = Records.newRecord(StopContainerRequest.class);
- stopRequest.setContainerId(newTokenId.getContainerID());
- try {
- client.stopContainer(stopRequest);
- } catch (Throwable t) {
- fail("Stop Container call should have succeeded");
- }
-
- return null;
- }
- });
- /////////// End of testing calls with expired tokens
-
- KillApplicationRequest request = Records
- .newRecord(KillApplicationRequest.class);
- request.setApplicationId(appID);
- resourceManager.getClientRMService().forceKillApplication(request);
- }
-
- private ApplicationMasterProtocol submitAndRegisterApplication(
- ResourceManager resourceManager, final YarnRPC yarnRPC,
- ApplicationId appID) throws IOException,
- UnsupportedFileSystemException, YarnException,
- InterruptedException {
-
- // Use ping to simulate sleep on Windows.
- List cmd = Shell.WINDOWS ?
- Arrays.asList("ping", "-n", "100", "127.0.0.1", ">nul") :
- Arrays.asList("sleep", "100");
-
- ContainerLaunchContext amContainer =
- BuilderUtils.newContainerLaunchContext(
- Collections. emptyMap(),
- new HashMap(), cmd,
- new HashMap(), null,
- new HashMap());
-
- ApplicationSubmissionContext appSubmissionContext = recordFactory
- .newRecordInstance(ApplicationSubmissionContext.class);
- appSubmissionContext.setApplicationId(appID);
- appSubmissionContext.setAMContainerSpec(amContainer);
- appSubmissionContext.setResource(BuilderUtils.newResource(1024, 1));
-
- SubmitApplicationRequest submitRequest = recordFactory
- .newRecordInstance(SubmitApplicationRequest.class);
- submitRequest.setApplicationSubmissionContext(appSubmissionContext);
- resourceManager.getClientRMService().submitApplication(submitRequest);
-
- // Wait till container gets allocated for AM
- int waitCounter = 0;
- RMApp app = resourceManager.getRMContext().getRMApps().get(appID);
- RMAppAttempt appAttempt = app == null ? null : app.getCurrentAppAttempt();
- RMAppAttemptState state = appAttempt == null ? null : appAttempt
- .getAppAttemptState();
- while ((app == null || appAttempt == null || state == null || !state
- .equals(RMAppAttemptState.LAUNCHED))
- && waitCounter++ != 20) {
- LOG.info("Waiting for applicationAttempt to be created.. ");
- Thread.sleep(1000);
- app = resourceManager.getRMContext().getRMApps().get(appID);
- appAttempt = app == null ? null : app.getCurrentAppAttempt();
- state = appAttempt == null ? null : appAttempt.getAppAttemptState();
- }
- Assert.assertNotNull(app);
- Assert.assertNotNull(appAttempt);
- Assert.assertNotNull(state);
- Assert.assertEquals(RMAppAttemptState.LAUNCHED, state);
-
- UserGroupInformation currentUser = UserGroupInformation.createRemoteUser(
- appAttempt.getAppAttemptId().toString());
-
- // Ask for a container from the RM
- final InetSocketAddress schedulerAddr =
- resourceManager.getApplicationMasterService().getBindAddress();
- if (UserGroupInformation.isSecurityEnabled()) {
- AMRMTokenIdentifier appTokenIdentifier = new AMRMTokenIdentifier(
- appAttempt.getAppAttemptId());
- AMRMTokenSecretManager appTokenSecretManager =
- new AMRMTokenSecretManager(conf);
- appTokenSecretManager.setMasterKey(resourceManager
- .getAMRMTokenSecretManager().getMasterKey());
- Token appToken =
- new Token(appTokenIdentifier,
- appTokenSecretManager);
- SecurityUtil.setTokenService(appToken, schedulerAddr);
- currentUser.addToken(appToken);
- }
+ /*
+ * We need to check for containerToken (authorization).
+ * Here we will be assuming that we have valid NMToken
+ * 1) ContainerToken used is expired.
+ * 2) ContainerToken is tampered (resource is modified).
+ */
+ NMTokenSecretManagerInRM nmTokenSecretManagerInRM =
+ yarnCluster.getResourceManager().getRMContext()
+ .getNMTokenSecretManager();
+ ApplicationId appId = ApplicationId.newInstance(1, 1);
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, 0);
+ ContainerId cId = ContainerId.newInstance(appAttemptId, 0);
+ NodeManager nm = yarnCluster.getNodeManager(0);
+ NMTokenSecretManagerInNM nmTokenSecretManagerInNM =
+ nm.getNMContext().getNMTokenSecretManager();
+ String user = "test";
- ApplicationMasterProtocol scheduler = currentUser
- .doAs(new PrivilegedAction() {
- @Override
- public ApplicationMasterProtocol run() {
- return (ApplicationMasterProtocol) yarnRPC.getProxy(ApplicationMasterProtocol.class,
- schedulerAddr, conf);
- }
- });
+ waitForNMToReceiveNMTokenKey(nmTokenSecretManagerInNM, nm);
- // Register the appMaster
- RegisterApplicationMasterRequest request = recordFactory
- .newRecordInstance(RegisterApplicationMasterRequest.class);
- request.setApplicationAttemptId(resourceManager.getRMContext()
- .getRMApps().get(appID).getCurrentAppAttempt().getAppAttemptId());
- scheduler.registerApplicationMaster(request);
- return scheduler;
- }
-
- private Container requestAndGetContainer(ApplicationMasterProtocol scheduler,
- ApplicationId appID) throws YarnException, InterruptedException,
- IOException {
-
- // Request a container allocation.
- List ask = new ArrayList();
- ask.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
- ResourceRequest.ANY, BuilderUtils.newResource(1024, 1), 1));
-
- AllocateRequest allocateRequest = AllocateRequest.newInstance(
- BuilderUtils.newApplicationAttemptId(appID, 1), 0, 0F, ask,
- new ArrayList(), null);
- List allocatedContainers = scheduler.allocate(allocateRequest)
- .getAllocatedContainers();
-
- // Modify ask to request no more.
- allocateRequest.setAskList(new ArrayList());
-
- int waitCounter = 0;
- while ((allocatedContainers == null || allocatedContainers.size() == 0)
- && waitCounter++ != 20) {
- LOG.info("Waiting for container to be allocated..");
- Thread.sleep(1000);
- allocateRequest.setResponseId(allocateRequest.getResponseId() + 1);
- allocatedContainers = scheduler.allocate(allocateRequest)
- .getAllocatedContainers();
- }
-
- Assert.assertNotNull("Container is not allocted!", allocatedContainers);
- Assert.assertEquals("Didn't get one container!", 1, allocatedContainers
- .size());
-
- return allocatedContainers.get(0);
- }
-
- private ContainerLaunchContext createContainerLaunchContextForTest(
- ContainerTokenIdentifier tokenId) {
- ContainerLaunchContext context =
- BuilderUtils.newContainerLaunchContext(
- new HashMap(),
- new HashMap(), new ArrayList(),
- new HashMap(), null,
- new HashMap());
- return context;
+ NodeId nodeId = nm.getNMContext().getNodeId();
+
+ // Both id should be equal.
+ Assert.assertEquals(nmTokenSecretManagerInNM.getCurrentKey().getKeyId(),
+ nmTokenSecretManagerInRM.getCurrentKey().getKeyId());
+
+ // Creating a tampered Container Token
+ RMContainerTokenSecretManager containerTokenSecretManager =
+ yarnCluster.getResourceManager().getRMContainerTokenSecretManager();
+
+ RMContainerTokenSecretManager tamperedContainerTokenSecretManager =
+ new RMContainerTokenSecretManager(conf);
+ tamperedContainerTokenSecretManager.rollMasterKey();
+ do {
+ tamperedContainerTokenSecretManager.rollMasterKey();
+ tamperedContainerTokenSecretManager.activateNextMasterKey();
+ } while (containerTokenSecretManager.getCurrentKey().getKeyId()
+ == tamperedContainerTokenSecretManager.getCurrentKey().getKeyId());
+
+ Resource r = Resource.newInstance(1230, 2);
+ // Creating modified containerToken
+ Token containerToken =
+ tamperedContainerTokenSecretManager.createContainerToken(cId, nodeId,
+ user, r);
+ Token nmToken =
+ nmTokenSecretManagerInRM.createNMToken(appAttemptId, nodeId, user);
+ YarnRPC rpc = YarnRPC.create(conf);
+ StringBuilder sb = new StringBuilder("Given Container ");
+ sb.append(cId);
+ sb.append(" seems to have an illegally generated token.");
+ Assert.assertTrue(testStartContainer(rpc, appAttemptId, nodeId,
+ containerToken, nmToken, true).contains(sb.toString()));
}
}