YARN-643. Fixed ResourceManager to remove all tokens consistently on app finish. Contributed by Xuan Gong.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1515256 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-08-19 01:53:34 +00:00
parent b7fb6fd6c4
commit 79a184505d
4 changed files with 68 additions and 19 deletions

View File

@ -74,6 +74,9 @@ Release 2.1.1-beta - UNRELEASED
forceKillApplication on non-running and finished applications. (Xuan Gong forceKillApplication on non-running and finished applications. (Xuan Gong
via vinodkv) via vinodkv)
YARN-643. Fixed ResourceManager to remove all tokens consistently on app
finish. (Xuan Gong via vinodkv)
Release 2.1.0-beta - 2013-08-22 Release 2.1.0-beta - 2013-08-22
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -761,6 +761,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
rejectedEvent.getApplicationAttemptId().getApplicationId(), rejectedEvent.getApplicationAttemptId().getApplicationId(),
message) message)
); );
appAttempt.removeTokens(appAttempt);
} }
} }
@ -847,7 +849,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
@Override @Override
public void transition(RMAppAttemptImpl appAttempt, public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) { RMAppAttemptEvent event) {
ApplicationAttemptId appAttemptId = appAttempt.getAppAttemptId(); ApplicationAttemptId appAttemptId = appAttempt.getAppAttemptId();
// Tell the AMS. Unregister from the ApplicationMasterService // Tell the AMS. Unregister from the ApplicationMasterService
@ -894,9 +895,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
appAttempt.eventHandler.handle(new AppRemovedSchedulerEvent(appAttemptId, appAttempt.eventHandler.handle(new AppRemovedSchedulerEvent(appAttemptId,
finalAttemptState)); finalAttemptState));
// Remove the AppAttempt from the AMRMTokenSecretManager appAttempt.removeTokens(appAttempt);
appAttempt.rmContext.getAMRMTokenSecretManager()
.applicationMasterFinished(appAttemptId);
} }
} }
@ -1015,7 +1014,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
" exitCode: " + status.getExitStatus() + " exitCode: " + status.getExitStatus() +
" due to: " + status.getDiagnostics() + "." + " due to: " + status.getDiagnostics() + "." +
"Failing this attempt."); "Failing this attempt.");
// Tell the app, scheduler // Tell the app, scheduler
super.transition(appAttempt, finishEvent); super.transition(appAttempt, finishEvent);
} }
@ -1042,12 +1040,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
appAttempt.rmContext.getAMFinishingMonitor().unregister( appAttempt.rmContext.getAMFinishingMonitor().unregister(
appAttempt.getAppAttemptId()); appAttempt.getAppAttemptId());
// Unregister from the ClientToAMTokenSecretManager
if (UserGroupInformation.isSecurityEnabled()) {
appAttempt.rmContext.getClientToAMTokenSecretManager()
.unRegisterApplication(appAttempt.getAppAttemptId());
}
if(!appAttempt.submissionContext.getUnmanagedAM()) { if(!appAttempt.submissionContext.getUnmanagedAM()) {
// Tell the launcher to cleanup. // Tell the launcher to cleanup.
appAttempt.eventHandler.handle(new AMLauncherEvent( appAttempt.eventHandler.handle(new AMLauncherEvent(
@ -1116,10 +1108,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
appAttempt.rmContext.getAMLivelinessMonitor().unregister(appAttemptId); appAttempt.rmContext.getAMLivelinessMonitor().unregister(appAttemptId);
// Remove the AppAttempt from the AMRMTokenSecretManager
appAttempt.rmContext.getAMRMTokenSecretManager()
.applicationMasterFinished(appAttemptId);
appAttempt.progress = 1.0f; appAttempt.progress = 1.0f;
RMAppAttemptUnregistrationEvent unregisterEvent RMAppAttemptUnregistrationEvent unregisterEvent
@ -1267,4 +1255,16 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
+ " MasterContainer: " + masterContainer); + " MasterContainer: " + masterContainer);
store.storeApplicationAttempt(this); store.storeApplicationAttempt(this);
} }
private void removeTokens(RMAppAttemptImpl appAttempt) {
// Unregister from the ClientToAMTokenSecretManager
if (UserGroupInformation.isSecurityEnabled()) {
appAttempt.rmContext.getClientToAMTokenSecretManager()
.unRegisterApplication(appAttempt.getAppAttemptId());
}
// Remove the AppAttempt from the AMRMTokenSecretManager
appAttempt.rmContext.getAMRMTokenSecretManager()
.applicationMasterFinished(appAttempt.getAppAttemptId());
}
} }

View File

@ -28,6 +28,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.mockito.Mockito.spy;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -35,6 +36,7 @@ import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -103,6 +105,11 @@ public class TestRMAppAttemptTransitions {
private RMApp application; private RMApp application;
private RMAppAttempt applicationAttempt; private RMAppAttempt applicationAttempt;
private Configuration conf = new Configuration();
private AMRMTokenSecretManager amRMTokenManager = spy(new AMRMTokenSecretManager(conf));
private ClientToAMTokenSecretManagerInRM clientToAMTokenManager =
spy(new ClientToAMTokenSecretManagerInRM());
private final class TestApplicationAttemptEventDispatcher implements private final class TestApplicationAttemptEventDispatcher implements
EventHandler<RMAppAttemptEvent> { EventHandler<RMAppAttemptEvent> {
@ -163,14 +170,13 @@ public class TestRMAppAttemptTransitions {
mock(ContainerAllocationExpirer.class); mock(ContainerAllocationExpirer.class);
amLivelinessMonitor = mock(AMLivelinessMonitor.class); amLivelinessMonitor = mock(AMLivelinessMonitor.class);
amFinishingMonitor = mock(AMLivelinessMonitor.class); amFinishingMonitor = mock(AMLivelinessMonitor.class);
Configuration conf = new Configuration();
rmContext = rmContext =
new RMContextImpl(rmDispatcher, new RMContextImpl(rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
null, new AMRMTokenSecretManager(conf), null, amRMTokenManager,
new RMContainerTokenSecretManager(conf), new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf), new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM()); clientToAMTokenManager);
RMStateStore store = mock(RMStateStore.class); RMStateStore store = mock(RMStateStore.class);
((RMContextImpl) rmContext).setStateStore(store); ((RMContextImpl) rmContext).setStateStore(store);
@ -261,7 +267,11 @@ public class TestRMAppAttemptTransitions {
assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
assertEquals(0, applicationAttempt.getRanNodes().size()); assertEquals(0, applicationAttempt.getRanNodes().size());
assertNull(applicationAttempt.getFinalApplicationStatus()); assertNull(applicationAttempt.getFinalApplicationStatus());
if (UserGroupInformation.isSecurityEnabled()) {
verify(clientToAMTokenManager).registerApplication(
applicationAttempt.getAppAttemptId());
}
assertNotNull(applicationAttempt.getAMRMToken());
// Check events // Check events
verify(masterService). verify(masterService).
registerAppAttempt(applicationAttempt.getAppAttemptId()); registerAppAttempt(applicationAttempt.getAppAttemptId());
@ -288,6 +298,7 @@ public class TestRMAppAttemptTransitions {
// this works for unmanaged and managed AM's because this is actually doing // this works for unmanaged and managed AM's because this is actually doing
// verify(application).handle(anyObject()); // verify(application).handle(anyObject());
verify(application).handle(any(RMAppRejectedEvent.class)); verify(application).handle(any(RMAppRejectedEvent.class));
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
} }
/** /**
@ -303,6 +314,7 @@ public class TestRMAppAttemptTransitions {
assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
assertEquals(0, applicationAttempt.getRanNodes().size()); assertEquals(0, applicationAttempt.getRanNodes().size());
assertNull(applicationAttempt.getFinalApplicationStatus()); assertNull(applicationAttempt.getFinalApplicationStatus());
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
} }
/** /**
@ -377,6 +389,8 @@ public class TestRMAppAttemptTransitions {
// Check events // Check events
verify(application, times(2)).handle(any(RMAppFailedAttemptEvent.class)); verify(application, times(2)).handle(any(RMAppFailedAttemptEvent.class));
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
} }
/** /**
@ -422,6 +436,7 @@ public class TestRMAppAttemptTransitions {
applicationAttempt.getTrackingUrl()); applicationAttempt.getTrackingUrl());
assertEquals(container, applicationAttempt.getMasterContainer()); assertEquals(container, applicationAttempt.getMasterContainer());
assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus()); assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus());
verifyTokenCount(applicationAttempt.getAppAttemptId(), 0);
} }
/** /**
@ -442,6 +457,7 @@ public class TestRMAppAttemptTransitions {
.getJustFinishedContainers().size()); .getJustFinishedContainers().size());
assertEquals(container, applicationAttempt.getMasterContainer()); assertEquals(container, applicationAttempt.getMasterContainer());
assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus()); assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus());
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
} }
@ -592,6 +608,7 @@ public class TestRMAppAttemptTransitions {
applicationAttempt.getAppAttemptId(), applicationAttempt.getAppAttemptId(),
RMAppAttemptEventType.KILL)); RMAppAttemptEventType.KILL));
testAppAttemptKilledState(null, EMPTY_DIAGNOSTICS); testAppAttemptKilledState(null, EMPTY_DIAGNOSTICS);
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
} }
@Test @Test
@ -666,6 +683,7 @@ public class TestRMAppAttemptTransitions {
applicationAttempt.getAppAttemptId(), cs)); applicationAttempt.getAppAttemptId(), cs));
assertEquals(RMAppAttemptState.FAILED, assertEquals(RMAppAttemptState.FAILED,
applicationAttempt.getAppAttemptState()); applicationAttempt.getAppAttemptState());
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
} }
@Test @Test
@ -709,6 +727,7 @@ public class TestRMAppAttemptTransitions {
applicationAttempt.getAppAttemptId().getApplicationId()); applicationAttempt.getAppAttemptId().getApplicationId());
assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl()); assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl()); assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl());
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
} }
@Test(timeout=10000) @Test(timeout=10000)
@ -725,6 +744,7 @@ public class TestRMAppAttemptTransitions {
applicationAttempt.getAppAttemptId().getApplicationId()); applicationAttempt.getAppAttemptId().getApplicationId());
assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl()); assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl()); assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl());
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
} }
@Test(timeout=20000) @Test(timeout=20000)
@ -742,6 +762,7 @@ public class TestRMAppAttemptTransitions {
applicationAttempt.getAppAttemptId().getApplicationId()); applicationAttempt.getAppAttemptId().getApplicationId());
assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl()); assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl()); assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl());
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
} }
@Test @Test
@ -848,4 +869,10 @@ public class TestRMAppAttemptTransitions {
diagnostics, 0); diagnostics, 0);
} }
private void verifyTokenCount(ApplicationAttemptId appAttemptId, int count) {
verify(amRMTokenManager, times(count)).applicationMasterFinished(appAttemptId);
if (UserGroupInformation.isSecurityEnabled()) {
verify(clientToAMTokenManager, times(count)).unRegisterApplication(appAttemptId);
}
}
} }

View File

@ -38,6 +38,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@ -46,6 +48,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMW
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager; import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -80,6 +84,7 @@ public class TestAMRMTokens {
* *
* @throws Exception * @throws Exception
*/ */
@SuppressWarnings("unchecked")
@Test @Test
public void testTokenExpiry() throws Exception { public void testTokenExpiry() throws Exception {
@ -134,6 +139,20 @@ public class TestAMRMTokens {
finishAMRequest.setTrackingUrl("url"); finishAMRequest.setTrackingUrl("url");
rmClient.finishApplicationMaster(finishAMRequest); rmClient.finishApplicationMaster(finishAMRequest);
// Send RMAppAttemptEventType.CONTAINER_FINISHED to transit RMAppAttempt
// from Finishing state to Finished State. Both AMRMToken and
// ClientToAMToken will be removed.
ContainerStatus containerStatus =
BuilderUtils.newContainerStatus(attempt.getMasterContainer().getId(),
ContainerState.COMPLETE,
"AM Container Finished", 0);
rm.getRMContext()
.getDispatcher()
.getEventHandler()
.handle(
new RMAppAttemptContainerFinishedEvent(applicationAttemptId,
containerStatus));
// Now simulate trying to allocate. RPC call itself should throw auth // Now simulate trying to allocate. RPC call itself should throw auth
// exception. // exception.
rpc.stopProxy(rmClient, conf); // To avoid using cached client rpc.stopProxy(rmClient, conf); // To avoid using cached client