diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index f0ab324ace8..bbe208dfe20 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerLaunchContextPBImpl; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -65,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Applicatio import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; @@ -257,6 +259,9 @@ public abstract class RMStateStore extends AbstractService { appState.getApplicationSubmissionContext().getApplicationId(); LOG.info("Updating info for app: " + appId); try { + if (isAppStateFinal(appState)) { + pruneAppState(appState); + } store.updateApplicationStateInternal(appId, appState); if (((RMStateUpdateAppEvent) event).isNotifyApplication()) { store.notifyApplication(new RMAppEvent(appId, @@ -276,7 +281,34 @@ public abstract class RMStateStore extends AbstractService { } } return finalState(isFenced); - }; + } + + private boolean isAppStateFinal(ApplicationStateData appState) { + RMAppState state = appState.getState(); + return state == RMAppState.FINISHED || state == RMAppState.FAILED || + state == RMAppState.KILLED; + } + + private void pruneAppState(ApplicationStateData appState) { + ApplicationSubmissionContext srcCtx = + appState.getApplicationSubmissionContext(); + ApplicationSubmissionContextPBImpl context = + new ApplicationSubmissionContextPBImpl(); + // most fields in the ApplicationSubmissionContext are not needed, + // but the following few need to be present for recovery to succeed + context.setApplicationId(srcCtx.getApplicationId()); + context.setResource(srcCtx.getResource()); + context.setQueue(srcCtx.getQueue()); + context.setAMContainerResourceRequests( + srcCtx.getAMContainerResourceRequests()); + context.setApplicationType(srcCtx.getApplicationType()); + ContainerLaunchContextPBImpl amContainerSpec = + new ContainerLaunchContextPBImpl(); + amContainerSpec.setApplicationACLs( + srcCtx.getAMContainerSpec().getApplicationACLs()); + context.setAMContainerSpec(amContainerSpec); + appState.setApplicationSubmissionContext(context); + } } private static class RemoveAppTransition implements diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 453d805a843..dbb214897a6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerLaunchContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; @@ -162,6 +163,7 @@ public class RMStateStoreTestBase { ApplicationSubmissionContext context = new ApplicationSubmissionContextPBImpl(); context.setApplicationId(appId); + context.setAMContainerSpec(new ContainerLaunchContextPBImpl()); RMApp mockApp = mock(RMApp.class); when(mockApp.getApplicationId()).thenReturn(appId); @@ -378,6 +380,7 @@ public class RMStateStoreTestBase { ApplicationSubmissionContext dummyContext = new ApplicationSubmissionContextPBImpl(); dummyContext.setApplicationId(dummyAppId); + dummyContext.setAMContainerSpec(new ContainerLaunchContextPBImpl()); ApplicationStateData dummyApp = ApplicationStateData.newInstance(appState.getSubmitTime(), appState.getStartTime(), appState.getUser(), dummyContext, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index 6a8f47d085c..0a1b15275cf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -35,7 +35,9 @@ import org.apache.hadoop.service.Service; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerLaunchContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Event; @@ -49,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; @@ -83,6 +86,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -845,6 +849,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase { ApplicationSubmissionContext context = new ApplicationSubmissionContextPBImpl(); context.setApplicationId(appId); + context.setAMContainerSpec(new ContainerLaunchContextPBImpl()); appStateNew = createAppState(context, submitTime, startTime, finishTime, true); } else { @@ -1488,4 +1493,65 @@ public class TestZKRMStateStore extends RMStateStoreTestBase { tokensWithIndex, sequenceNumber, 3); store.close(); } + + @Test + public void testAppSubmissionContextIsPrunedInFinalApplicationState() + throws Exception { + TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester(); + ApplicationId appId = ApplicationId.fromString("application_1234_0010"); + + Configuration conf = createConfForDelegationTokenNodeSplit(1); + RMStateStore store = zkTester.getRMStateStore(conf); + ApplicationSubmissionContext ctx = + new ApplicationSubmissionContextPBImpl(); + ctx.setApplicationId(appId); + ctx.setQueue("a_queue"); + ContainerLaunchContextPBImpl containerLaunchCtx = + new ContainerLaunchContextPBImpl(); + containerLaunchCtx.setCommands(Collections.singletonList("a_command")); + ctx.setAMContainerSpec(containerLaunchCtx); + Resource resource = new ResourcePBImpl(); + resource.setMemorySize(17L); + ctx.setResource(resource); + Map schedulingPropertiesMap = + Collections.singletonMap("a_key", "a_value"); + ctx.setApplicationSchedulingPropertiesMap(schedulingPropertiesMap); + ApplicationStateDataPBImpl appState = new ApplicationStateDataPBImpl(); + appState.setState(RMAppState.RUNNING); + appState.setApplicationSubmissionContext(ctx); + store.storeApplicationStateInternal(appId, appState); + + RMState rmState = store.loadState(); + assertEquals(1, rmState.getApplicationState().size()); + ctx = rmState.getApplicationState().get(appId) + .getApplicationSubmissionContext(); + + appState.setState(RMAppState.RUNNING); + store.handleStoreEvent(new RMStateUpdateAppEvent(appState, false, null)); + + rmState = store.loadState(); + ctx = rmState.getApplicationState().get(appId) + .getApplicationSubmissionContext(); + + assertEquals("ApplicationSchedulingPropertiesMap should not have been " + + "pruned from the application submission context before the " + + "FINISHED state", + schedulingPropertiesMap, ctx.getApplicationSchedulingPropertiesMap()); + + appState.setState(RMAppState.FINISHED); + store.handleStoreEvent(new RMStateUpdateAppEvent(appState, false, null)); + + rmState = store.loadState(); + ctx = rmState.getApplicationState().get(appId) + .getApplicationSubmissionContext(); + + assertEquals(appId, ctx.getApplicationId()); + assertEquals("a_queue", ctx.getQueue()); + assertNotNull(ctx.getAMContainerSpec()); + assertEquals(17L, ctx.getResource().getMemorySize()); + assertEquals("ApplicationSchedulingPropertiesMap should have been pruned" + + " from the application submission context when in FINISHED STATE", + Collections.emptyMap(), ctx.getApplicationSchedulingPropertiesMap()); + store.close(); + } }