diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index f8ab27fe4af..2f5a538ea8c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -1405,7 +1405,7 @@ public class RMAppImpl implements RMApp, Recoverable { app.rmContext.getSystemMetricsPublisher() .appFinished(app, finalState, app.finishTime); // set the memory free - app.submissionContext.getAMContainerSpec().setTokensConf(null); + app.clearUnusedFields(); }; } @@ -1960,4 +1960,13 @@ public class RMAppImpl implements RMApp, Recoverable { public void setApplicationPriority(Priority applicationPriority) { this.applicationPriority = applicationPriority; } + + /** + * Clear Unused fields to free memory. + * @param app + */ + private void clearUnusedFields() { + this.submissionContext.setAMContainerSpec(null); + this.submissionContext.setLogAggregationContext(null); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockMemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockMemoryRMStateStore.java new file mode 100644 index 00000000000..698f1c66780 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockMemoryRMStateStore.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager; + +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Test helper for MemoryRMStateStore will make sure the event. + */ +public class MockMemoryRMStateStore extends MemoryRMStateStore { + + private Map appSubCtxtCopy = + new HashMap(); + + @SuppressWarnings("rawtypes") + @Override + protected EventHandler getRMStateStoreEventHandler() { + return rmStateStoreEventHandler; + } + + @Override + public synchronized RMState loadState() throws Exception { + + RMState cloneState = super.loadState(); + + for(Entry state : + cloneState.getApplicationState().entrySet()) { + ApplicationStateData oldStateData = state.getValue(); + oldStateData.setApplicationSubmissionContext( + this.appSubCtxtCopy.get(state.getKey())); + cloneState.getApplicationState().put(state.getKey(), oldStateData); + } + return cloneState; + } + + @Override + public synchronized void storeApplicationStateInternal( + ApplicationId appId, ApplicationStateData appState) + throws Exception { + // Clone Application Submission Context + this.cloneAppSubmissionContext(appState); + super.storeApplicationStateInternal(appId, appState); + } + + @Override + public synchronized void updateApplicationStateInternal( + ApplicationId appId, ApplicationStateData appState) + throws Exception { + // Clone Application Submission Context + this.cloneAppSubmissionContext(appState); + super.updateApplicationStateInternal(appId, appState); + } + + /** + * Clone Application Submission Context and Store in Map for + * later use. + * + * @param appState + */ + private void cloneAppSubmissionContext(ApplicationStateData appState) { + ApplicationSubmissionContext oldAppSubCtxt = + appState.getApplicationSubmissionContext(); + ApplicationSubmissionContext context = + ApplicationSubmissionContext.newInstance( + oldAppSubCtxt.getApplicationId(), + oldAppSubCtxt.getApplicationName(), + oldAppSubCtxt.getQueue(), + oldAppSubCtxt.getPriority(), + oldAppSubCtxt.getAMContainerSpec(), + oldAppSubCtxt.getUnmanagedAM(), + oldAppSubCtxt.getCancelTokensWhenComplete(), + oldAppSubCtxt.getMaxAppAttempts(), + oldAppSubCtxt.getResource() + ); + context.setAttemptFailuresValidityInterval( + oldAppSubCtxt.getAttemptFailuresValidityInterval()); + context.setKeepContainersAcrossApplicationAttempts( + oldAppSubCtxt.getKeepContainersAcrossApplicationAttempts()); + context.setAMContainerResourceRequests( + oldAppSubCtxt.getAMContainerResourceRequests()); + context.setLogAggregationContext(oldAppSubCtxt.getLogAggregationContext()); + context.setApplicationType(oldAppSubCtxt.getApplicationType()); + this.appSubCtxtCopy.put(oldAppSubCtxt.getApplicationId(), context); + } + + /** + * Traverse each app state and replace cloned app sub context + * into the state. + * + * @param actualState + * @return actualState + */ + @VisibleForTesting + public RMState reloadStateWithClonedAppSubCtxt(RMState actualState) { + for(Entry state : + actualState.getApplicationState().entrySet()) { + ApplicationStateData oldStateData = state.getValue(); + oldStateData.setApplicationSubmissionContext( + this.appSubCtxtCopy.get(state.getKey())); + actualState.getApplicationState().put(state.getKey(), + oldStateData); + } + return actualState; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 90507f34686..acbcac0f9ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -158,7 +158,7 @@ public class MockRM extends ResourceManager { } else { Class storeClass = getRMContext().getStateStore().getClass(); if (storeClass.equals(MemoryRMStateStore.class)) { - MockRMMemoryStateStore mockStateStore = new MockRMMemoryStateStore(); + MockMemoryRMStateStore mockStateStore = new MockMemoryRMStateStore(); mockStateStore.init(conf); setRMStateStore(mockStateStore); } else if (storeClass.equals(NullRMStateStore.class)) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMMemoryStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMMemoryStateStore.java deleted file mode 100644 index d88ee1efc65..00000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMMemoryStateStore.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.resourcemanager; - -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; - -/** - * Test helper for MemoryRMStateStore will make sure the event. - */ -public class MockRMMemoryStateStore extends MemoryRMStateStore { - @SuppressWarnings("rawtypes") - @Override - protected EventHandler getRMStateStoreEventHandler() { - return rmStateStoreEventHandler; - } -} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java index ebca7a354fb..c12ae3385d0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java @@ -293,6 +293,8 @@ public class TestApplicationCleanup { // start RM MockRM rm1 = new MockRM(conf); rm1.start(); + MockMemoryRMStateStore memStore = + (MockMemoryRMStateStore) rm1.getRMStateStore(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); nm1.registerNode(); @@ -304,7 +306,7 @@ public class TestApplicationCleanup { rm1.waitForState(app0.getApplicationId(), RMAppState.FAILED); // start new RM - MockRM rm2 = new MockRM(conf, rm1.getRMStateStore()); + MockRM rm2 = new MockRM(conf, memStore); rm2.start(); // nm1 register to rm2, and do a heartbeat diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestContainerResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestContainerResourceUsage.java index ba9de6c8d36..11fe0561769 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestContainerResourceUsage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestContainerResourceUsage.java @@ -141,6 +141,8 @@ public class TestContainerResourceUsage { conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); MockRM rm0 = new MockRM(conf); rm0.start(); + MockMemoryRMStateStore memStore = + (MockMemoryRMStateStore) rm0.getRMStateStore(); MockNM nm = new MockNM("127.0.0.1:1234", 65536, rm0.getResourceTrackerService()); nm.registerNode(); @@ -227,7 +229,7 @@ public class TestContainerResourceUsage { vcoreSeconds, metricsBefore.getVcoreSeconds()); // create new RM to represent RM restart. Load up the state store. - MockRM rm1 = new MockRM(conf, rm0.getRMStateStore()); + MockRM rm1 = new MockRM(conf, memStore); rm1.start(); RMApp app0After = rm1.getRMContext().getRMApps().get(app0.getApplicationId()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java index bf15eba856f..a07f7d6f2e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java @@ -413,7 +413,7 @@ public class TestRMHA { configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); Configuration conf = new YarnConfiguration(configuration); - MemoryRMStateStore memStore = new MockRMMemoryStateStore() { + MemoryRMStateStore memStore = new MockMemoryRMStateStore() { int count = 0; @Override @@ -463,7 +463,7 @@ public class TestRMHA { configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); Configuration conf = new YarnConfiguration(configuration); - MemoryRMStateStore memStore = new MockRMMemoryStateStore() { + MemoryRMStateStore memStore = new MockMemoryRMStateStore() { @Override public void updateApplicationState(ApplicationStateData appState) { notifyStoreOperationFailed(new StoreFencedException()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 94265ac1ed8..e99bcde848c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -190,7 +190,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { // PHASE 1: create RM and get state MockRM rm1 = createMockRM(conf); - MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); + MockMemoryRMStateStore memStore = + (MockMemoryRMStateStore) rm1.getRMStateStore(); Map rmAppState = memStore.getState().getApplicationState(); @@ -669,7 +670,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { @Test (timeout = 60000) public void testRMRestartWaitForPreviousSucceededAttempt() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); - MemoryRMStateStore memStore = new MockRMMemoryStateStore() { + MemoryRMStateStore memStore = new MockMemoryRMStateStore() { int count = 0; @Override @@ -724,7 +725,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); // create RM MockRM rm1 = createMockRM(conf); - MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); + MockMemoryRMStateStore memStore = + (MockMemoryRMStateStore) rm1.getRMStateStore(); Map rmAppState = memStore.getState().getApplicationState(); // start RM @@ -770,7 +772,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); // create RM MockRM rm1 = createMockRM(conf); - MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); + MockMemoryRMStateStore memStore = + (MockMemoryRMStateStore) rm1.getRMStateStore(); Map rmAppState = memStore.getState().getApplicationState(); // start RM @@ -814,18 +817,18 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { @Test (timeout = 60000) public void testRMRestartKilledAppWithNoAttempts() throws Exception { - MemoryRMStateStore memStore = new MockRMMemoryStateStore() { + MockMemoryRMStateStore memStore = new MockMemoryRMStateStore() { @Override public synchronized void storeApplicationAttemptStateInternal( - ApplicationAttemptId attemptId, - ApplicationAttemptStateData attemptStateData) throws Exception { + ApplicationAttemptId appAttemptId, + ApplicationAttemptStateData attemptState) throws Exception { // ignore attempt saving request. } @Override public synchronized void updateApplicationAttemptStateInternal( - ApplicationAttemptId attemptId, - ApplicationAttemptStateData attemptStateData) throws Exception { + ApplicationAttemptId appAttemptId, + ApplicationAttemptStateData attemptState) throws Exception { // ignore attempt saving request. } }; @@ -858,7 +861,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); // PHASE 1: create RM and get state MockRM rm1 = createMockRM(conf); - MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); + MockMemoryRMStateStore memStore = + (MockMemoryRMStateStore) rm1.getRMStateStore(); Map rmAppState = memStore.getState().getApplicationState(); @@ -916,6 +920,9 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); nm1.registerNode(); + MockMemoryRMStateStore memStore = + (MockMemoryRMStateStore) rm1.getRMStateStore(); + // a succeeded app. RMApp app0 = rm1.submitApp(200, "name", "user", null, false, "default", 1, null, "myType"); @@ -943,7 +950,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { .appCreated(any(RMApp.class), anyLong()); // restart rm - MockRM rm2 = new MockRM(conf, rm1.getRMStateStore()) { + MockRM rm2 = new MockRM(conf, memStore) { @Override protected RMAppManager createRMAppManager() { return spy(super.createRMAppManager()); @@ -1554,7 +1561,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { // start RM MockRM rm1 = createMockRM(conf); rm1.start(); - MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); + MockMemoryRMStateStore memStore = + (MockMemoryRMStateStore) rm1.getRMStateStore(); RMState rmState = memStore.getState(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -1593,7 +1601,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { // This is to test RM does not get hang on shutdown. @Test (timeout = 10000) public void testRMShutdown() throws Exception { - MemoryRMStateStore memStore = new MockRMMemoryStateStore() { + MemoryRMStateStore memStore = new MockMemoryRMStateStore() { @Override public synchronized void checkVersion() throws Exception { @@ -1672,7 +1680,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { } }; rm1.start(); - MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); + MockMemoryRMStateStore memStore = + (MockMemoryRMStateStore) rm1.getRMStateStore(); RMApp app1 = null; try { app1 = rm1.submitApp(200, "name", "user", @@ -1696,7 +1705,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { @Test (timeout = 20000) public void testAppRecoveredInOrderOnRMRestart() throws Exception { - MemoryRMStateStore memStore = new MockRMMemoryStateStore(); + MemoryRMStateStore memStore = new MemoryRMStateStore(); memStore.init(conf); for (int i = 10; i > 0; i--) { @@ -2334,6 +2343,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { MockRM rm1 = new MockRM(conf); rm1.start(); CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + MockMemoryRMStateStore memStore = + (MockMemoryRMStateStore) rm1.getRMStateStore(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -2370,7 +2381,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { MockRM rm2 = null; // start RM2 try { - rm2 = new MockRM(conf, rm1.getRMStateStore()); + rm2 = new MockRM(conf, memStore); rm2.start(); Assert.assertTrue("RM start successfully", true); } catch (Exception e) { @@ -2471,6 +2482,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { } }; rm1.start(); + MockMemoryRMStateStore memStore = + (MockMemoryRMStateStore) rm1.getRMStateStore(); // add node label "x" and set node to label mapping Set clusterNodeLabels = new HashSet(); @@ -2497,7 +2510,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, false); MockRM rm2 = new MockRM( TestUtils.getConfigurationWithDefaultQueueLabels(conf), - rm1.getRMStateStore()) { + memStore) { @Override protected RMNodeLabelsManager createNodeLabelManager() { RMNodeLabelsManager mgr = new RMNodeLabelsManager(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 2c37f44e416..eb73db16efb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -568,6 +568,8 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase conf.set(YarnConfiguration.YARN_ADMIN_ACL, ""); rm1 = new MockRM(conf); rm1.start(); + MockMemoryRMStateStore memStore = + (MockMemoryRMStateStore) rm1.getRMStateStore(); MockNM nm1 = new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); nm1.registerNode(); @@ -579,7 +581,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{QUEUE_DOESNT_EXIST}); final String noQueue = CapacitySchedulerConfiguration.ROOT + "." + QUEUE_DOESNT_EXIST; csConf.setCapacity(noQueue, 100); - rm2 = new MockRM(csConf, rm1.getRMStateStore()); + rm2 = new MockRM(csConf, memStore); rm2.start(); UserGroupInformation user2 = UserGroupInformation.createRemoteUser("user2"); @@ -717,11 +719,15 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase private void verifyAppRecoveryWithWrongQueueConfig( CapacitySchedulerConfiguration csConf, RMApp app, String diagnostics, - MemoryRMStateStore memStore, RMState state) throws Exception { + MockMemoryRMStateStore memStore, RMState state) throws Exception { // Restart RM with fail-fast as false. App should be killed. csConf.setBoolean(YarnConfiguration.RM_FAIL_FAST, false); rm2 = new MockRM(csConf, memStore); rm2.start(); + + MockMemoryRMStateStore memStore2 = + (MockMemoryRMStateStore) rm2.getRMStateStore(); + // Wait for app to be killed. rm2.waitForState(app.getApplicationId(), RMAppState.KILLED); ApplicationReport report = rm2.getApplicationReport(app.getApplicationId()); @@ -730,24 +736,27 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase assertEquals(report.getYarnApplicationState(), YarnApplicationState.KILLED); assertEquals(report.getDiagnostics(), diagnostics); + //Reload previous state with cloned app sub context object + RMState newState = memStore2.reloadStateWithClonedAppSubCtxt(state); + // Remove updated app info(app being KILLED) from state store and reinstate // state store to previous state i.e. which indicates app is RUNNING. // This is to simulate app recovery with fail fast config as true. for(Map.Entry entry : - state.getApplicationState().entrySet()) { + newState.getApplicationState().entrySet()) { ApplicationStateData appState = mock(ApplicationStateData.class); ApplicationSubmissionContext ctxt = mock(ApplicationSubmissionContext.class); when(appState.getApplicationSubmissionContext()).thenReturn(ctxt); when(ctxt.getApplicationId()).thenReturn(entry.getKey()); - memStore.removeApplicationStateInternal(appState); - memStore.storeApplicationStateInternal( + memStore2.removeApplicationStateInternal(appState); + memStore2.storeApplicationStateInternal( entry.getKey(), entry.getValue()); } // Now restart RM with fail-fast as true. QueueException should be thrown. csConf.setBoolean(YarnConfiguration.RM_FAIL_FAST, true); - MockRM rm = new MockRM(csConf, memStore); + MockRM rm = new MockRM(csConf, memStore2); try { rm.start(); Assert.fail("QueueException must have been thrown"); @@ -777,6 +786,9 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase setupQueueConfiguration(csConf); rm1 = new MockRM(csConf); rm1.start(); + + MockMemoryRMStateStore memStore = + (MockMemoryRMStateStore) rm1.getRMStateStore(); MockNM nm = new MockNM("127.1.1.1:4321", 8192, rm1.getResourceTrackerService()); nm.registerNode(); @@ -797,7 +809,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase String diags = "Application killed on recovery as it was submitted to " + "queue QueueB which is no longer a leaf queue after restart."; verifyAppRecoveryWithWrongQueueConfig(csConf, app, diags, - (MemoryRMStateStore) rm1.getRMStateStore(), state); + memStore, state); } //Test behavior of an app if queue is removed during recovery. Test case does @@ -822,6 +834,8 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase setupQueueConfiguration(csConf); rm1 = new MockRM(csConf); rm1.start(); + MockMemoryRMStateStore memStore = + (MockMemoryRMStateStore) rm1.getRMStateStore(); MockNM nm1 = new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); MockNM nm2 = @@ -849,7 +863,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase rm1.clearQueueMetrics(app2); // Take a copy of state store so that it can be reset to this state. - RMState state = rm1.getRMStateStore().loadState(); + RMState state = memStore.loadState(); // Set new configuration with QueueB removed. csConf = new CapacitySchedulerConfiguration(conf); @@ -858,7 +872,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase String diags = "Application killed on recovery as it was submitted to " + "queue QueueB which no longer exists after restart."; verifyAppRecoveryWithWrongQueueConfig(csConf, app2, diags, - (MemoryRMStateStore) rm1.getRMStateStore(), state); + memStore, state); } private void checkParentQueue(ParentQueue parentQueue, int numContainers, @@ -927,6 +941,8 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase public void testContainersNotRecoveredForCompletedApps() throws Exception { rm1 = new MockRM(conf); rm1.start(); + MockMemoryRMStateStore memStore = + (MockMemoryRMStateStore) rm1.getRMStateStore(); MockNM nm1 = new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); nm1.registerNode(); @@ -934,7 +950,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); MockRM.finishAMAndVerifyAppState(app1, rm1, nm1, am1); - rm2 = new MockRM(conf, rm1.getRMStateStore()); + rm2 = new MockRM(conf, memStore); rm2.start(); nm1.setResourceTrackerService(rm2.getResourceTrackerService()); NMContainerStatus runningContainer = @@ -1208,6 +1224,9 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase // start RM rm1 = new MockRM(conf); rm1.start(); + + MockMemoryRMStateStore memStore = + (MockMemoryRMStateStore) rm1.getRMStateStore(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); nm1.registerNode(); @@ -1226,7 +1245,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase // start new RM - rm2 = new MockRM(conf, rm1.getRMStateStore()); + rm2 = new MockRM(conf, memStore); rm2.start(); am0.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext()); @@ -1366,7 +1385,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase // RM should start correctly. @Test (timeout = 20000) public void testAppStateSavedButAttemptStateNotSaved() throws Exception { - MemoryRMStateStore memStore = new MemoryRMStateStore() { + MockMemoryRMStateStore memStore = new MockMemoryRMStateStore() { @Override public synchronized void updateApplicationAttemptStateInternal( ApplicationAttemptId appAttemptId, ApplicationAttemptStateData attemptState) { @@ -1410,6 +1429,8 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase // start RM rm1 = new MockRM(conf); rm1.start(); + MockMemoryRMStateStore memStore = + (MockMemoryRMStateStore) rm1.getRMStateStore(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); nm1.registerNode(); @@ -1434,8 +1455,10 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase } // start new RM - rm2 = new MockRM(conf, rm1.getRMStateStore()); + rm2 = new MockRM(conf, memStore); rm2.start(); + MockMemoryRMStateStore memStore2 = + (MockMemoryRMStateStore) rm2.getRMStateStore(); rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED); rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED); @@ -1484,7 +1507,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase recoveredApp.getFinalApplicationStatus()); // Restart RM once more to check UAM is not re-run - MockRM rm3 = new MockRM(conf, rm1.getRMStateStore()); + MockRM rm3 = new MockRM(conf, memStore2); rm3.start(); recoveredApp = rm3.getRMContext().getRMApps().get(app0.getApplicationId()); Assert.assertEquals(RMAppState.FINISHED, recoveredApp.getState()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 528afacea4d..9d0d87979cb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.MockMemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -702,8 +703,11 @@ public class TestAMRestart { // explicitly set max-am-retry count as 2. conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); MockRM rm1 = new MockRM(conf); - MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); rm1.start(); + + MockMemoryRMStateStore memStore = + (MockMemoryRMStateStore) rm1.getRMStateStore(); + MockNM nm1 = new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); nm1.registerNode(); @@ -735,7 +739,6 @@ public class TestAMRestart { RMAppImpl app1 = (RMAppImpl)rm1.submitApp(200, 10000, false); app1.setSystemClock(clock); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - // Fail attempt1 normally nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE); @@ -771,8 +774,12 @@ public class TestAMRestart { @SuppressWarnings("resource") MockRM rm2 = new MockRM(conf, memStore); rm2.start(); + + MockMemoryRMStateStore memStore1 = + (MockMemoryRMStateStore) rm2.getRMStateStore(); ApplicationStateData app1State = - memStore.getState().getApplicationState().get(app1.getApplicationId()); + memStore1.getState().getApplicationState(). + get(app1.getApplicationId()); Assert.assertEquals(1, app1State.getFirstAttemptId()); // re-register the NM diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 36375d076dd..e9a74deedc5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp; +import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.eq; @@ -28,17 +29,23 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.security.Credentials; @@ -46,14 +53,22 @@ import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import org.apache.hadoop.yarn.MockApps; +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.DrainDispatcher; @@ -247,7 +262,113 @@ public class TestRMAppTransitions { rmDispatcher.start(); } - protected RMApp createNewTestApp(ApplicationSubmissionContext submissionContext) { + private ByteBuffer getTokens() throws IOException { + Credentials ts = new Credentials(); + DataOutputBuffer dob = new DataOutputBuffer(); + ts.writeTokenStorageToStream(dob); + ByteBuffer securityTokens = + ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + return securityTokens; + } + + private ByteBuffer getTokensConf() throws IOException { + + DataOutputBuffer dob = new DataOutputBuffer(); + Configuration appConf = new Configuration(false); + appConf.clear(); + appConf.set("dfs.nameservices", "mycluster1,mycluster2"); + appConf.set("dfs.namenode.rpc-address.mycluster2.nn1", + "123.0.0.1"); + appConf.set("dfs.namenode.rpc-address.mycluster3.nn2", + "123.0.0.2"); + appConf.write(dob); + ByteBuffer tokenConf = + ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + return tokenConf; + } + + private Map getLocalResources() + throws UnsupportedFileSystemException { + FileContext localFS = FileContext.getLocalFSFileContext(); + File tmpDir = new File("target"); + File scriptFile = new File(tmpDir, "scriptFile.sh"); + URL resourceURL = + URL.fromPath(localFS + .makeQualified(new Path(scriptFile.getAbsolutePath()))); + LocalResource localRes = + Records.newRecord(LocalResource.class); + localRes.setResource(resourceURL); + localRes.setSize(-1); + localRes.setVisibility(LocalResourceVisibility.APPLICATION); + localRes.setType(LocalResourceType.FILE); + localRes.setTimestamp(scriptFile.lastModified()); + String destinationFile = "dest_file"; + Map localResources = + new HashMap(); + localResources.put(destinationFile, localRes); + return localResources; + } + + private Map getEnvironment() { + Map userSetEnv = new HashMap(); + userSetEnv.put(Environment.CONTAINER_ID.name(), "user_set_container_id"); + userSetEnv.put(Environment.NM_HOST.name(), "user_set_NM_HOST"); + userSetEnv.put(Environment.NM_PORT.name(), "user_set_NM_PORT"); + userSetEnv.put(Environment.NM_HTTP_PORT.name(), "user_set_NM_HTTP_PORT"); + userSetEnv.put(Environment.LOCAL_DIRS.name(), "user_set_LOCAL_DIR"); + userSetEnv.put(Environment.USER.key(), "user_set_" + + Environment.USER.key()); + userSetEnv.put(Environment.LOGNAME.name(), "user_set_LOGNAME"); + userSetEnv.put(Environment.PWD.name(), "user_set_PWD"); + userSetEnv.put(Environment.HOME.name(), "user_set_HOME"); + return userSetEnv; + } + + private ContainerRetryContext getContainerRetryContext() { + ContainerRetryContext containerRetryContext = ContainerRetryContext + .newInstance( + ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES, + new HashSet<>(Arrays.asList(Integer.valueOf(111))), 0, 0); + return containerRetryContext; + } + + private Map getServiceData() { + Map serviceData = new HashMap(); + String serviceName = "non_exist_auxService"; + serviceData.put(serviceName, ByteBuffer.wrap(serviceName.getBytes())); + return serviceData; + } + + private ContainerLaunchContext prepareContainerLaunchContext() + throws IOException { + + ContainerLaunchContext clc = + Records.newRecord(ContainerLaunchContext.class); + clc.setCommands(Arrays.asList("/bin/sleep 5")); + if (UserGroupInformation.isSecurityEnabled()) { + clc.setTokens(getTokens()); + clc.setTokensConf(getTokensConf()); + } + clc.setLocalResources(getLocalResources()); + clc.setEnvironment(getEnvironment()); + clc.setContainerRetryContext(getContainerRetryContext()); + clc.setServiceData(getServiceData()); + return clc; + } + + private LogAggregationContext getLogAggregationContext() { + LogAggregationContext logAggregationContext = + LogAggregationContext.newInstance( + "includePattern", "excludePattern", + "rolledLogsIncludePattern", + "rolledLogsExcludePattern", + "policyClass", + "policyParameters"); + return logAggregationContext; + } + + protected RMApp createNewTestApp(ApplicationSubmissionContext + submissionContext) throws IOException { ApplicationId applicationId = MockApps.newAppID(appId++); String user = MockApps.newUserName(); String name = MockApps.newAppName(); @@ -266,7 +387,9 @@ public class TestRMAppTransitions { // but applicationId is still set for safety submissionContext.setApplicationId(applicationId); submissionContext.setPriority(Priority.newInstance(0)); - submissionContext.setAMContainerSpec(mock(ContainerLaunchContext.class)); + submissionContext.setAMContainerSpec(prepareContainerLaunchContext()); + submissionContext.setLogAggregationContext(getLogAggregationContext()); + RMApp application = new RMAppImpl(applicationId, rmContext, conf, name, user, queue, submissionContext, scheduler, masterService, System.currentTimeMillis(), "YARN", null, @@ -401,6 +524,7 @@ public class TestRMAppTransitions { // verify sendATSCreateEvent() is get called during // AddApplicationToSchedulerTransition. verify(publisher).appCreated(eq(application), anyLong()); + verifyRMAppFieldsForNonFinalTransitions(application); return application; } @@ -418,6 +542,7 @@ public class TestRMAppTransitions { application.handle(event); assertStartTimeSet(application); assertAppState(RMAppState.SUBMITTED, application); + verifyRMAppFieldsForNonFinalTransitions(application); return application; } @@ -526,6 +651,7 @@ public class TestRMAppTransitions { assertFailed(application, ".*Unmanaged application.*Failing the application.*"); assertAppFinalStateSaved(application); + verifyRMAppFieldsForFinalTransitions(application); } @Test @@ -535,6 +661,7 @@ public class TestRMAppTransitions { RMApp application = testCreateAppFinished(null, diagMsg); Assert.assertTrue("Finished application missing diagnostics", application.getDiagnostics().indexOf(diagMsg) != -1); + verifyRMAppFieldsForFinalTransitions(application); } @Test (timeout = 30000) @@ -542,15 +669,7 @@ public class TestRMAppTransitions { LOG.info("--- START: testAppRecoverPath ---"); ApplicationSubmissionContext sub = Records.newRecord(ApplicationSubmissionContext.class); - ContainerLaunchContext clc = - Records.newRecord(ContainerLaunchContext.class); - Credentials credentials = new Credentials(); - DataOutputBuffer dob = new DataOutputBuffer(); - credentials.writeTokenStorageToStream(dob); - ByteBuffer securityTokens = - ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); - clc.setTokens(securityTokens); - sub.setAMContainerSpec(clc); + sub.setAMContainerSpec(prepareContainerLaunchContext()); testCreateAppSubmittedRecovery(sub); } @@ -573,6 +692,7 @@ public class TestRMAppTransitions { assertAppFinalStateNotSaved(application); verifyApplicationFinished(RMAppState.KILLED); verifyAppRemovedSchedulerEvent(RMAppState.KILLED); + verifyRMAppFieldsForFinalTransitions(application); } @Test @@ -590,6 +710,7 @@ public class TestRMAppTransitions { assertFailed(application, rejectedText); assertAppFinalStateSaved(application); verifyApplicationFinished(RMAppState.FAILED); + verifyRMAppFieldsForFinalTransitions(application); } @Test (timeout = 30000) @@ -607,6 +728,7 @@ public class TestRMAppTransitions { assertFailed(application, rejectedText); assertAppFinalStateSaved(application); verifyApplicationFinished(RMAppState.FAILED); + verifyRMAppFieldsForFinalTransitions(application); rmContext.getStateStore().removeApplication(application); } @@ -629,6 +751,7 @@ public class TestRMAppTransitions { assertKilled(application); verifyApplicationFinished(RMAppState.KILLED); verifyAppRemovedSchedulerEvent(RMAppState.KILLED); + verifyRMAppFieldsForFinalTransitions(application); } @Test (timeout = 30000) @@ -646,6 +769,7 @@ public class TestRMAppTransitions { assertFailed(application, rejectedText); assertAppFinalStateSaved(application); verifyApplicationFinished(RMAppState.FAILED); + verifyRMAppFieldsForFinalTransitions(application); } @Test (timeout = 30000) @@ -680,6 +804,7 @@ public class TestRMAppTransitions { assertFailed(application, rejectedText); assertAppFinalStateSaved(application); verifyApplicationFinished(RMAppState.FAILED); + verifyRMAppFieldsForFinalTransitions(application); } @Test @@ -702,6 +827,7 @@ public class TestRMAppTransitions { assertAppFinalStateSaved(application); verifyApplicationFinished(RMAppState.KILLED); verifyAppRemovedSchedulerEvent(RMAppState.KILLED); + verifyRMAppFieldsForFinalTransitions(application); } @Test @@ -765,8 +891,9 @@ public class TestRMAppTransitions { assertAppFinalStateSaved(application); verifyApplicationFinished(RMAppState.KILLED); verifyAppRemovedSchedulerEvent(RMAppState.KILLED); + verifyRMAppFieldsForFinalTransitions(application); } - + @Test public void testAppAcceptedAttemptKilled() throws IOException, InterruptedException { @@ -812,6 +939,7 @@ public class TestRMAppTransitions { assertKilled(application); verifyApplicationFinished(RMAppState.KILLED); verifyAppRemovedSchedulerEvent(RMAppState.KILLED); + verifyRMAppFieldsForFinalTransitions(application); } @Test @@ -869,6 +997,7 @@ public class TestRMAppTransitions { assertFailed(application, ".*Failing the application.*"); assertAppFinalStateSaved(application); verifyApplicationFinished(RMAppState.FAILED); + verifyRMAppFieldsForFinalTransitions(application); } @Test @@ -910,6 +1039,7 @@ public class TestRMAppTransitions { assertFinalAppStatus(FinalApplicationStatus.FAILED, application); Assert.assertTrue("Finished app missing diagnostics", application .getDiagnostics().indexOf(diagMsg) != -1); + verifyRMAppFieldsForFinalTransitions(application); } @Test @@ -929,6 +1059,7 @@ public class TestRMAppTransitions { Assert.assertEquals("application diagnostics is not correct", "", diag.toString()); verifyApplicationFinished(RMAppState.FINISHED); + verifyRMAppFieldsForFinalTransitions(application); } @Test (timeout = 30000) @@ -958,6 +1089,7 @@ public class TestRMAppTransitions { assertTimesAtFinish(application); assertAppState(RMAppState.FAILED, application); + verifyRMAppFieldsForFinalTransitions(application); } @Test (timeout = 30000) @@ -1012,6 +1144,7 @@ public class TestRMAppTransitions { assertTimesAtFinish(application); assertAppState(RMAppState.KILLED, application); + verifyRMAppFieldsForFinalTransitions(application); } @Test(timeout = 30000) @@ -1057,11 +1190,12 @@ public class TestRMAppTransitions { RMAppState finalState = appState.getState(); Assert.assertEquals("Application is not in finalState.", finalState, application.getState()); + verifyRMAppFieldsForFinalTransitions(application); } public void createRMStateForApplications( Map applicationState, - RMAppState rmAppState) { + RMAppState rmAppState) throws IOException { RMApp app = createNewTestApp(null); ApplicationStateData appState = ApplicationStateData.newInstance(app.getSubmitTime(), app.getStartTime(), @@ -1071,7 +1205,7 @@ public class TestRMAppTransitions { } @Test - public void testGetAppReport() { + public void testGetAppReport() throws IOException { RMApp app = createNewTestApp(null); assertAppState(RMAppState.NEW, app); ApplicationReport report = app.createAndGetApplicationReport(null, true); @@ -1105,4 +1239,41 @@ public class TestRMAppTransitions { Assert.assertEquals(finalState, appRemovedEvent.getFinalState()); } } + + private void verifyRMAppFieldsForNonFinalTransitions(RMApp application) + throws IOException { + assertEquals(Arrays.asList("/bin/sleep 5"), + application.getApplicationSubmissionContext(). + getAMContainerSpec().getCommands()); + assertEquals(getLocalResources(), + application.getApplicationSubmissionContext(). + getAMContainerSpec().getLocalResources()); + if(UserGroupInformation.isSecurityEnabled()) { + assertEquals(getTokens(), + application.getApplicationSubmissionContext(). + getAMContainerSpec().getTokens()); + assertEquals(getTokensConf(), + application.getApplicationSubmissionContext(). + getAMContainerSpec().getTokensConf()); + } + assertEquals(getEnvironment(), + application.getApplicationSubmissionContext(). + getAMContainerSpec().getEnvironment()); + assertEquals(getContainerRetryContext(), + application.getApplicationSubmissionContext(). + getAMContainerSpec().getContainerRetryContext()); + assertEquals(getServiceData(), + application.getApplicationSubmissionContext(). + getAMContainerSpec().getServiceData()); + assertEquals(getLogAggregationContext(), + application.getApplicationSubmissionContext(). + getLogAggregationContext()); + } + + private void verifyRMAppFieldsForFinalTransitions(RMApp application) { + assertEquals(null, application.getApplicationSubmissionContext(). + getAMContainerSpec()); + assertEquals(null, application.getApplicationSubmissionContext(). + getLogAggregationContext()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java index 640293c3fc3..2c52377995a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java @@ -38,7 +38,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; -import org.apache.hadoop.yarn.server.resourcemanager.MockRMMemoryStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.MockMemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMSecretManagerService; import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM; @@ -78,7 +78,7 @@ public class TestRMDelegationTokens { UserGroupInformation.getLoginUser() .setAuthenticationMethod(AuthenticationMethod.KERBEROS); - MemoryRMStateStore memStore = new MockRMMemoryStateStore(); + MemoryRMStateStore memStore = new MockMemoryRMStateStore(); memStore.init(conf); RMState rmState = memStore.getState(); @@ -132,7 +132,7 @@ public class TestRMDelegationTokens { // Test all expired keys are removed from state-store. @Test(timeout = 15000) public void testRemoveExpiredMasterKeyInRMStateStore() throws Exception { - MemoryRMStateStore memStore = new MockRMMemoryStateStore(); + MemoryRMStateStore memStore = new MockMemoryRMStateStore(); memStore.init(testConf); RMState rmState = memStore.getState();