YARN-1152. Fixed a bug in ResourceManager that was causing clients to get invalid client token key errors when an appliation is about to finish. Contributed by Jason Lowe.

svn merge --ignore-ancestry -c 1521292 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1521293 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-09-09 21:19:38 +00:00
parent 1680bf7061
commit e5a6acb466
6 changed files with 138 additions and 17 deletions

View File

@ -153,6 +153,10 @@ Release 2.1.1-beta - UNRELEASED
YARN-1144. Unmanaged AMs registering a tracking URI should not be YARN-1144. Unmanaged AMs registering a tracking URI should not be
proxy-fied. (tucu) proxy-fied. (tucu)
YARN-1152. Fixed a bug in ResourceManager that was causing clients to get
invalid client token key errors when an appliation is about to finish.
(Jason Lowe via vinodkv)
Release 2.1.0-beta - 2013-08-22 Release 2.1.0-beta - 2013-08-22
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -432,18 +432,18 @@ public class RMAppImpl implements RMApp, Recoverable {
currentApplicationAttemptId = this.currentAttempt.getAppAttemptId(); currentApplicationAttemptId = this.currentAttempt.getAppAttemptId();
trackingUrl = this.currentAttempt.getTrackingUrl(); trackingUrl = this.currentAttempt.getTrackingUrl();
origTrackingUrl = this.currentAttempt.getOriginalTrackingUrl(); origTrackingUrl = this.currentAttempt.getOriginalTrackingUrl();
if (UserGroupInformation.isSecurityEnabled() if (UserGroupInformation.isSecurityEnabled()) {
&& clientUserName != null) { // get a token so the client can communicate with the app attempt
// NOTE: token may be unavailable if the attempt is not running
Token<ClientToAMTokenIdentifier> attemptClientToAMToken = Token<ClientToAMTokenIdentifier> attemptClientToAMToken =
new Token<ClientToAMTokenIdentifier>( this.currentAttempt.createClientToken(clientUserName);
new ClientToAMTokenIdentifier( if (attemptClientToAMToken != null) {
currentApplicationAttemptId, clientUserName), clientToAMToken = BuilderUtils.newClientToAMToken(
rmContext.getClientToAMTokenSecretManager()); attemptClientToAMToken.getIdentifier(),
clientToAMToken = BuilderUtils.newClientToAMToken( attemptClientToAMToken.getKind().toString(),
attemptClientToAMToken.getIdentifier(), attemptClientToAMToken.getPassword(),
attemptClientToAMToken.getKind().toString(), attemptClientToAMToken.getService().toString());
attemptClientToAMToken.getPassword(), }
attemptClientToAMToken.getService().toString());
} }
host = this.currentAttempt.getHost(); host = this.currentAttempt.getHost();
rpcPort = this.currentAttempt.getRpcPort(); rpcPort = this.currentAttempt.getRpcPort();

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
/** /**
@ -155,6 +156,13 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
*/ */
SecretKey getClientTokenMasterKey(); SecretKey getClientTokenMasterKey();
/**
* Create a token for authenticating a client connection to the app attempt
* @param clientName the name of the client requesting the token
* @return the token or null if the attempt is not running
*/
Token<ClientToAMTokenIdentifier> createClientToken(String clientName);
/** /**
* Get application container and resource usage information. * Get application container and resource usage information.
* @return an ApplicationResourceUsageReport object. * @return an ApplicationResourceUsageReport object.

View File

@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
@ -89,6 +90,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppRepor
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils; import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException; import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
@ -508,6 +510,26 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
return this.amrmToken; return this.amrmToken;
} }
@Override
public Token<ClientToAMTokenIdentifier> createClientToken(String client) {
this.readLock.lock();
try {
Token<ClientToAMTokenIdentifier> token = null;
ClientToAMTokenSecretManagerInRM secretMgr =
this.rmContext.getClientToAMTokenSecretManager();
if (client != null &&
secretMgr.getMasterKey(this.applicationAttemptId) != null) {
token = new Token<ClientToAMTokenIdentifier>(
new ClientToAMTokenIdentifier(this.applicationAttemptId, client),
secretMgr);
}
return token;
} finally {
this.readLock.unlock();
}
}
@Override @Override
public String getDiagnostics() { public String getDiagnostics() {
this.readLock.lock(); this.readLock.lock();

View File

@ -19,14 +19,20 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp; package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.junit.Assume.assumeTrue;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import junit.framework.Assert; import junit.framework.Assert;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
@ -57,11 +63,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSe
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(value = Parameterized.class)
public class TestRMAppTransitions { public class TestRMAppTransitions {
static final Log LOG = LogFactory.getLog(TestRMAppTransitions.class); static final Log LOG = LogFactory.getLog(TestRMAppTransitions.class);
private boolean isSecurityEnabled;
private Configuration conf;
private RMContext rmContext; private RMContext rmContext;
private static int maxAppAttempts = private static int maxAppAttempts =
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS; YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS;
@ -132,10 +143,29 @@ public class TestRMAppTransitions {
public void handle(SchedulerEvent event) { public void handle(SchedulerEvent event) {
} }
} }
@Parameterized.Parameters
public static Collection<Object[]> getTestParameters() {
return Arrays.asList(new Object[][] {
{ Boolean.FALSE },
{ Boolean.TRUE }
});
}
public TestRMAppTransitions(boolean isSecurityEnabled) {
this.isSecurityEnabled = isSecurityEnabled;
}
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
Configuration conf = new Configuration(); conf = new YarnConfiguration();
AuthenticationMethod authMethod = AuthenticationMethod.SIMPLE;
if (isSecurityEnabled) {
authMethod = AuthenticationMethod.KERBEROS;
}
SecurityUtil.setAuthenticationMethod(authMethod, conf);
UserGroupInformation.setConfiguration(conf);
rmDispatcher = new DrainDispatcher(); rmDispatcher = new DrainDispatcher();
ContainerAllocationExpirer containerAllocationExpirer = ContainerAllocationExpirer containerAllocationExpirer =
mock(ContainerAllocationExpirer.class); mock(ContainerAllocationExpirer.class);
@ -171,7 +201,6 @@ public class TestRMAppTransitions {
String user = MockApps.newUserName(); String user = MockApps.newUserName();
String name = MockApps.newAppName(); String name = MockApps.newAppName();
String queue = MockApps.newQueue(); String queue = MockApps.newQueue();
Configuration conf = new YarnConfiguration();
// ensure max application attempts set to known value // ensure max application attempts set to known value
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, maxAppAttempts); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, maxAppAttempts);
YarnScheduler scheduler = mock(YarnScheduler.class); YarnScheduler scheduler = mock(YarnScheduler.class);
@ -191,6 +220,8 @@ public class TestRMAppTransitions {
System.currentTimeMillis(), "YARN"); System.currentTimeMillis(), "YARN");
testAppStartState(applicationId, user, name, queue, application); testAppStartState(applicationId, user, name, queue, application);
this.rmContext.getRMApps().putIfAbsent(application.getApplicationId(),
application);
return application; return application;
} }
@ -488,8 +519,6 @@ public class TestRMAppTransitions {
// SUBMITTED => KILLED event RMAppEventType.KILL // SUBMITTED => KILLED event RMAppEventType.KILL
RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEvent event = new RMAppEvent(application.getApplicationId(),
RMAppEventType.KILL); RMAppEventType.KILL);
this.rmContext.getRMApps().putIfAbsent(application.getApplicationId(),
application);
application.handle(event); application.handle(event);
rmDispatcher.await(); rmDispatcher.await();
assertKilled(application); assertKilled(application);
@ -535,8 +564,6 @@ public class TestRMAppTransitions {
// ACCEPTED => KILLED event RMAppEventType.KILL // ACCEPTED => KILLED event RMAppEventType.KILL
RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEvent event = new RMAppEvent(application.getApplicationId(),
RMAppEventType.KILL); RMAppEventType.KILL);
this.rmContext.getRMApps().putIfAbsent(application.getApplicationId(),
application);
application.handle(event); application.handle(event);
rmDispatcher.await(); rmDispatcher.await();
assertKilled(application); assertKilled(application);
@ -731,4 +758,33 @@ public class TestRMAppTransitions {
report = app.createAndGetApplicationReport("clientuser", true); report = app.createAndGetApplicationReport("clientuser", true);
Assert.assertNotNull(report.getApplicationResourceUsageReport()); Assert.assertNotNull(report.getApplicationResourceUsageReport());
} }
@Test
public void testClientTokens() throws Exception {
assumeTrue(isSecurityEnabled);
RMApp app = createNewTestApp(null);
assertAppState(RMAppState.NEW, app);
ApplicationReport report = app.createAndGetApplicationReport(null, true);
Assert.assertNull(report.getClientToAMToken());
report = app.createAndGetApplicationReport("clientuser", true);
Assert.assertNull(report.getClientToAMToken());
app = testCreateAppRunning(null);
rmDispatcher.await();
assertAppState(RMAppState.RUNNING, app);
report = app.createAndGetApplicationReport(null, true);
Assert.assertNull(report.getClientToAMToken());
report = app.createAndGetApplicationReport("clientuser", true);
Assert.assertNotNull(report.getClientToAMToken());
// kill the app attempt and verify client token is unavailable
app.handle(new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL));
rmDispatcher.await();
assertAppAndAttemptKilled(app);
report = app.createAndGetApplicationReport(null, true);
Assert.assertNull(report.getClientToAMToken());
report = app.createAndGetApplicationReport("clientuser", true);
Assert.assertNull(report.getClientToAMToken());
}
} }

View File

@ -30,13 +30,17 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -85,7 +89,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManag
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(value = Parameterized.class)
public class TestRMAppAttemptTransitions { public class TestRMAppAttemptTransitions {
private static final Log LOG = private static final Log LOG =
@ -95,6 +102,7 @@ public class TestRMAppAttemptTransitions {
private static final String RM_WEBAPP_ADDR = private static final String RM_WEBAPP_ADDR =
YarnConfiguration.getRMWebAppHostAndPort(new Configuration()); YarnConfiguration.getRMWebAppHostAndPort(new Configuration());
private boolean isSecurityEnabled;
private RMContext rmContext; private RMContext rmContext;
private YarnScheduler scheduler; private YarnScheduler scheduler;
private ApplicationMasterService masterService; private ApplicationMasterService masterService;
@ -162,8 +170,26 @@ public class TestRMAppAttemptTransitions {
private ApplicationSubmissionContext submissionContext = null; private ApplicationSubmissionContext submissionContext = null;
private boolean unmanagedAM; private boolean unmanagedAM;
@Parameterized.Parameters
public static Collection<Object[]> getTestParameters() {
return Arrays.asList(new Object[][] {
{ Boolean.FALSE },
{ Boolean.TRUE }
});
}
public TestRMAppAttemptTransitions(Boolean isSecurityEnabled) {
this.isSecurityEnabled = isSecurityEnabled;
}
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
AuthenticationMethod authMethod = AuthenticationMethod.SIMPLE;
if (isSecurityEnabled) {
authMethod = AuthenticationMethod.KERBEROS;
}
SecurityUtil.setAuthenticationMethod(authMethod, conf);
UserGroupInformation.setConfiguration(conf);
InlineDispatcher rmDispatcher = new InlineDispatcher(); InlineDispatcher rmDispatcher = new InlineDispatcher();
ContainerAllocationExpirer containerAllocationExpirer = ContainerAllocationExpirer containerAllocationExpirer =
@ -270,7 +296,9 @@ public class TestRMAppAttemptTransitions {
if (UserGroupInformation.isSecurityEnabled()) { if (UserGroupInformation.isSecurityEnabled()) {
verify(clientToAMTokenManager).registerApplication( verify(clientToAMTokenManager).registerApplication(
applicationAttempt.getAppAttemptId()); applicationAttempt.getAppAttemptId());
assertNotNull(applicationAttempt.createClientToken("some client"));
} }
assertNull(applicationAttempt.createClientToken(null));
assertNotNull(applicationAttempt.getAMRMToken()); assertNotNull(applicationAttempt.getAMRMToken());
// Check events // Check events
verify(masterService). verify(masterService).
@ -883,6 +911,9 @@ public class TestRMAppAttemptTransitions {
verify(amRMTokenManager, times(count)).applicationMasterFinished(appAttemptId); verify(amRMTokenManager, times(count)).applicationMasterFinished(appAttemptId);
if (UserGroupInformation.isSecurityEnabled()) { if (UserGroupInformation.isSecurityEnabled()) {
verify(clientToAMTokenManager, times(count)).unRegisterApplication(appAttemptId); verify(clientToAMTokenManager, times(count)).unRegisterApplication(appAttemptId);
if (count > 0) {
assertNull(applicationAttempt.createClientToken("client"));
}
} }
} }
} }