diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index de170848667..ecd7c9cd674 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -79,6 +79,9 @@ Release 2.9.0 - UNRELEASED YARN-4417. Make RM and Timeline-server REST APIs more consistent. (wtan via jianhe) + YARN-3480. Remove attempts that are beyond max-attempt limit from state + store. (Jun Gong via jianhe) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index d3924106725..aada69f37ef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -817,6 +817,33 @@ public class ResourceManager extends CompositeService implements Recoverable { LOG.error("Error in handling event type " + event.getType() + " for applicationAttempt " + appAttemptId, t); } + } else if (rmApp.getApplicationSubmissionContext() != null + && rmApp.getApplicationSubmissionContext() + .getKeepContainersAcrossApplicationAttempts() + && event.getType() == RMAppAttemptEventType.CONTAINER_FINISHED) { + // For work-preserving AM restart, failed attempts are still + // capturing CONTAINER_FINISHED events and record the finished + // containers which will be used by current attempt. + // We just keep 'yarn.resourcemanager.am.max-attempts' in + // RMStateStore. If the finished container's attempt is deleted, we + // use the first attempt in app.attempts to deal with these events. + + RMAppAttempt previousFailedAttempt = + rmApp.getAppAttempts().values().iterator().next(); + if (previousFailedAttempt != null) { + try { + LOG.debug("Event " + event.getType() + " handled by " + + previousFailedAttempt); + previousFailedAttempt.handle(event); + } catch (Throwable t) { + LOG.error("Error in handling event type " + event.getType() + + " for applicationAttempt " + appAttemptId + + " with " + previousFailedAttempt, t); + } + } else { + LOG.error("Event " + event.getType() + + " not handled, because previousFailedAttempt is null"); + } } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index bd24b25067d..902244b3be2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -230,6 +230,11 @@ public class ResourceTrackerService extends AbstractService implements } RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptId); + if (rmAppAttempt == null) { + LOG.info("Ignoring not found attempt " + appAttemptId); + return; + } + Container masterContainer = rmAppAttempt.getMasterContainer(); if (masterContainer.getId().equals(containerStatus.getContainerId()) && containerStatus.getContainerState() == ContainerState.COMPLETE) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index a1cebf59e2e..021ca36af04 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -481,6 +481,18 @@ public class FileSystemRMStateStore extends RMStateStore { } } + @Override + public synchronized void removeApplicationAttemptInternal( + ApplicationAttemptId appAttemptId) + throws Exception { + Path appDirPath = + getAppDir(rmAppRoot, appAttemptId.getApplicationId()); + Path nodeRemovePath = getNodePath(appDirPath, appAttemptId.toString()); + LOG.info("Removing info for attempt: " + appAttemptId + " at: " + + nodeRemovePath); + deleteFileWithRetries(nodeRemovePath); + } + @Override public synchronized void removeApplicationStateInternal( ApplicationStateData appState) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java index afc6721352c..7ec94fc5ae3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java @@ -500,6 +500,22 @@ public class LeveldbRMStateStore extends RMStateStore { return createApplicationState(appId.toString(), data); } + @VisibleForTesting + ApplicationAttemptStateData loadRMAppAttemptState( + ApplicationAttemptId attemptId) throws IOException { + String attemptKey = getApplicationAttemptNodeKey(attemptId); + byte[] data = null; + try { + data = db.get(bytes(attemptKey)); + } catch (DBException e) { + throw new IOException(e); + } + if (data == null) { + return null; + } + return createAttemptState(attemptId.toString(), data); + } + private ApplicationAttemptStateData createAttemptState(String itemName, byte[] data) throws IOException { ApplicationAttemptId attemptId = @@ -574,6 +590,22 @@ public class LeveldbRMStateStore extends RMStateStore { storeApplicationAttemptStateInternal(attemptId, attemptStateData); } + @Override + public synchronized void removeApplicationAttemptInternal( + ApplicationAttemptId attemptId) + throws IOException { + String attemptKey = getApplicationAttemptNodeKey(attemptId); + if (LOG.isDebugEnabled()) { + LOG.debug("Removing state for attempt " + attemptId + " at " + + attemptKey); + } + try { + db.delete(bytes(attemptKey)); + } catch (DBException e) { + throw new IOException(e); + } + } + @Override protected void removeApplicationStateInternal(ApplicationStateData appState) throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index ce6addbe7b2..caaea7e047b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -141,6 +141,19 @@ public class MemoryRMStateStore extends RMStateStore { appState.attempts.put(attemptState.getAttemptId(), attemptState); } + @Override + public synchronized void removeApplicationAttemptInternal( + ApplicationAttemptId appAttemptId) throws Exception { + ApplicationStateData appState = + state.getApplicationState().get(appAttemptId.getApplicationId()); + ApplicationAttemptStateData attemptState = + appState.attempts.remove(appAttemptId); + LOG.info("Removing state for attempt: " + appAttemptId); + if (attemptState == null) { + throw new YarnRuntimeException("Application doesn't exist"); + } + } + @Override public synchronized void removeApplicationStateInternal( ApplicationStateData appState) throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java index 96f77f5f77d..f6fd6fe5058 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java @@ -131,6 +131,12 @@ public class NullRMStateStore extends RMStateStore { ApplicationAttemptStateData attemptStateData) throws Exception { } + @Override + public synchronized void removeApplicationAttemptInternal( + ApplicationAttemptId attemptId) throws Exception { + // Do nothing + } + @Override public void checkVersion() throws Exception { // Do nothing diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index ec42cbe3c60..ae17aaa4234 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -135,6 +135,10 @@ public abstract class RMStateStore extends AbstractService { EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), RMStateStoreEventType.UPDATE_APP_ATTEMPT, new UpdateAppAttemptTransition()) + .addTransition(RMStateStoreState.ACTIVE, + EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), + RMStateStoreEventType.REMOVE_APP_ATTEMPT, + new RemoveAppAttemptTransition()) .addTransition(RMStateStoreState.ACTIVE, EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), RMStateStoreEventType.STORE_MASTERKEY, @@ -552,6 +556,32 @@ public abstract class RMStateStore extends AbstractService { return isFenced ? RMStateStoreState.FENCED : RMStateStoreState.ACTIVE; } + private static class RemoveAppAttemptTransition implements + MultipleArcTransition { + @Override + public RMStateStoreState transition(RMStateStore store, + RMStateStoreEvent event) { + if (!(event instanceof RMStateStoreRemoveAppAttemptEvent)) { + // should never happen + LOG.error("Illegal event type: " + event.getClass()); + return RMStateStoreState.ACTIVE; + } + boolean isFenced = false; + ApplicationAttemptId attemptId = + ((RMStateStoreRemoveAppAttemptEvent) event).getApplicationAttemptId(); + ApplicationId appId = attemptId.getApplicationId(); + LOG.info("Removing attempt " + attemptId + " from app: " + appId); + try { + store.removeApplicationAttemptInternal(attemptId); + } catch (Exception e) { + LOG.error("Error removing attempt: " + attemptId, e); + isFenced = store.notifyStoreOperationFailedInternal(e); + } + return finalState(isFenced); + } + } + public RMStateStore() { super(RMStateStore.class.getName()); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); @@ -983,6 +1013,29 @@ public abstract class RMStateStore extends AbstractService { protected abstract void removeApplicationStateInternal( ApplicationStateData appState) throws Exception; + /** + * Non-blocking API + * ResourceManager services call this to remove an attempt from the state + * store + * This does not block the dispatcher threads + * There is no notification of completion for this operation. + */ + @SuppressWarnings("unchecked") + public synchronized void removeApplicationAttempt( + ApplicationAttemptId applicationAttemptId) { + dispatcher.getEventHandler().handle( + new RMStateStoreRemoveAppAttemptEvent(applicationAttemptId)); + } + + /** + * Blocking API + * Derived classes must implement this method to remove the state of specified + * attempt. + */ + protected abstract void removeApplicationAttemptInternal( + ApplicationAttemptId attemptId) throws Exception; + + // TODO: This should eventually become cluster-Id + "AM_RM_TOKEN_SERVICE". See // YARN-1779 public static final Text AM_RM_TOKEN_SERVICE = new Text( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java index 492826de98b..b34634d0659 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java @@ -24,6 +24,7 @@ public enum RMStateStoreEventType { UPDATE_APP, UPDATE_APP_ATTEMPT, REMOVE_APP, + REMOVE_APP_ATTEMPT, FENCED, // Below events should be called synchronously diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppAttemptEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppAttemptEvent.java new file mode 100644 index 00000000000..7455c39999a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppAttemptEvent.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; + +/** + * A event used to remove an attempt. + */ +public class RMStateStoreRemoveAppAttemptEvent extends RMStateStoreEvent { + private ApplicationAttemptId applicationAttemptId; + + RMStateStoreRemoveAppAttemptEvent(ApplicationAttemptId applicationAttemptId) { + super(RMStateStoreEventType.REMOVE_APP_ATTEMPT); + this.applicationAttemptId = applicationAttemptId; + } + + public ApplicationAttemptId getApplicationAttemptId() { + return applicationAttemptId; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index ca0f4ac03c3..ddb8a0bdc46 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -658,6 +658,22 @@ public class ZKRMStateStore extends RMStateStore { } } + @Override + public synchronized void removeApplicationAttemptInternal( + ApplicationAttemptId appAttemptId) + throws Exception { + String appId = appAttemptId.getApplicationId().toString(); + String appIdRemovePath = getNodePath(rmAppRoot, appId); + String attemptIdRemovePath = getNodePath(appIdRemovePath, + appAttemptId.toString()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Removing info for attempt: " + appAttemptId + " at: " + + attemptIdRemovePath); + } + safeDelete(attemptIdRemovePath); + } + @Override public synchronized void removeApplicationStateInternal( ApplicationStateData appState) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java index 1d199edccaf..234838084f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java @@ -80,6 +80,16 @@ public abstract class ApplicationStateData { return attempts.get(attemptId); } + public int getFirstAttemptId() { + int min = Integer.MAX_VALUE; + for(ApplicationAttemptId attemptId : attempts.keySet()) { + if (attemptId.getAttemptId() < min) { + min = attemptId.getAttemptId(); + } + } + return min == Integer.MAX_VALUE ? 1 : min; + } + public abstract ApplicationStateDataProto getProto(); /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index c4c8d2eb33d..f1d55a4f89d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -149,6 +149,8 @@ public class RMAppImpl implements RMApp, Recoverable { private long startTime; private long finishTime = 0; private long storedFinishTime = 0; + private int firstAttemptIdInStateStore = 1; + private int nextAttemptId = 1; // This field isn't protected by readlock now. private volatile RMAppAttempt currentAttempt; private String queue; @@ -809,6 +811,11 @@ public class RMAppImpl implements RMApp, Recoverable { this.storedFinishTime = appState.getFinishTime(); this.startTime = appState.getStartTime(); this.callerContext = appState.getCallerContext(); + // If interval > 0, some attempts might have been deleted. + if (submissionContext.getAttemptFailuresValidityInterval() > 0) { + this.firstAttemptIdInStateStore = appState.getFirstAttemptId(); + this.nextAttemptId = firstAttemptIdInStateStore; + } // send the ATS create Event sendATSCreateEvent(this, this.startTime); @@ -822,7 +829,7 @@ public class RMAppImpl implements RMApp, Recoverable { private void createNewAttempt() { ApplicationAttemptId appAttemptId = - ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1); + ApplicationAttemptId.newInstance(applicationId, nextAttemptId++); BlacklistManager currentAMBlacklist; if (currentAttempt != null) { @@ -1304,6 +1311,9 @@ public class RMAppImpl implements RMApp, Recoverable { + app.attemptFailuresValidityInterval + " milliseconds " : " ") + "is " + numberOfFailure + ". The max attempts is " + app.maxAppAttempts); + + removeExcessAttempts(app); + if (!app.submissionContext.getUnmanagedAM() && numberOfFailure < app.maxAppAttempts) { if (initialState.equals(RMAppState.KILLING)) { @@ -1340,6 +1350,19 @@ public class RMAppImpl implements RMApp, Recoverable { return RMAppState.FINAL_SAVING; } } + + private void removeExcessAttempts(RMAppImpl app) { + while (app.nextAttemptId - app.firstAttemptIdInStateStore + > app.maxAppAttempts) { + // attempts' first element is oldest attempt because it is a + // LinkedHashMap + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( + app.getApplicationId(), app.firstAttemptIdInStateStore); + app.firstAttemptIdInStateStore++; + LOG.info("Remove attempt from state store : " + attemptId); + app.rmContext.getStateStore().removeApplicationAttempt(attemptId); + } + } } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 67320391699..f1fe1eabb6d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -859,6 +859,9 @@ public class TestAMRestart { @SuppressWarnings("resource") MockRM rm2 = new MockRM(conf, memStore); rm2.start(); + ApplicationStateData app1State = + memStore.getState().getApplicationState().get(app1.getApplicationId()); + Assert.assertEquals(1, app1State.getFirstAttemptId()); // re-register the NM nm1.setResourceTrackerService(rm2.getResourceTrackerService()); @@ -871,6 +874,7 @@ public class TestAMRestart { nm1.registerNode(Collections.singletonList(status), null); rm2.waitForState(attempt3.getAppAttemptId(), RMAppAttemptState.FAILED); + Assert.assertEquals(2, app1State.getAttemptCount()); rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); @@ -884,6 +888,7 @@ public class TestAMRestart { nm1 .nodeHeartbeat(am4.getApplicationAttemptId(), 1, ContainerState.COMPLETE); am4.waitForState(RMAppAttemptState.FAILED); + Assert.assertEquals(2, app1State.getAttemptCount()); // can launch the 5th attempt successfully rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); @@ -897,6 +902,7 @@ public class TestAMRestart { nm1 .nodeHeartbeat(am5.getApplicationAttemptId(), 1, ContainerState.COMPLETE); am5.waitForState(RMAppAttemptState.FAILED); + Assert.assertEquals(2, app1State.getAttemptCount()); rm2.waitForState(app1.getApplicationId(), RMAppState.FAILED); rm1.stop(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 32824ef3a8f..2ddaec217f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; @@ -134,6 +135,7 @@ public class RMStateStoreTestBase { void writeVersion(Version version) throws Exception; Version getCurrentVersion() throws Exception; boolean appExists(RMApp app) throws Exception; + boolean attemptExists(RMAppAttempt attempt) throws Exception; } void waitNotify(TestDispatcher dispatcher) { @@ -172,7 +174,7 @@ public class RMStateStoreTestBase { return mockApp; } - protected ContainerId storeAttempt(RMStateStore store, + protected RMAppAttempt storeAttempt(RMStateStore store, ApplicationAttemptId attemptId, String containerIdStr, Token appToken, SecretKey clientTokenMasterKey, TestDispatcher dispatcher) @@ -195,7 +197,7 @@ public class RMStateStoreTestBase { dispatcher.attemptId = attemptId; store.storeNewApplicationAttempt(mockAttempt); waitNotify(dispatcher); - return container.getId(); + return mockAttempt; } void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) @@ -238,8 +240,9 @@ public class RMStateStoreTestBase { clientToAMTokenMgr.createMasterKey(attemptId1); ContainerId containerId1 = storeAttempt(store, attemptId1, - "container_1352994193343_0001_01_000001", - appAttemptToken1, clientTokenKey1, dispatcher); + "container_1352994193343_0001_01_000001", + appAttemptToken1, clientTokenKey1, dispatcher) + .getMasterContainer().getId(); String appAttemptIdStr2 = "appattempt_1352994193343_0001_000002"; ApplicationAttemptId attemptId2 = @@ -252,8 +255,9 @@ public class RMStateStoreTestBase { clientToAMTokenMgr.createMasterKey(attemptId2); ContainerId containerId2 = storeAttempt(store, attemptId2, - "container_1352994193343_0001_02_000001", - appAttemptToken2, clientTokenKey2, dispatcher); + "container_1352994193343_0001_02_000001", + appAttemptToken2, clientTokenKey2, dispatcher) + .getMasterContainer().getId(); ApplicationAttemptId attemptIdRemoved = ConverterUtils .toApplicationAttemptId("appattempt_1352994193343_0002_000001"); @@ -633,6 +637,47 @@ public class RMStateStoreTestBase { Assert.assertTrue(stateStoreHelper.appExists(rmApp2)); } + public void testRemoveAttempt(RMStateStoreHelper stateStoreHelper) + throws Exception { + RMStateStore store = stateStoreHelper.getRMStateStore(); + TestDispatcher dispatcher = new TestDispatcher(); + store.setRMDispatcher(dispatcher); + + ApplicationId appId = ApplicationId.newInstance(1383183339, 6); + storeApp(store, appId, 123456, 564321); + + ApplicationAttemptId attemptId1 = + ApplicationAttemptId.newInstance(appId, 1); + RMAppAttempt attempt1 = storeAttempt(store, attemptId1, + ContainerId.newContainerId(attemptId1, 1).toString(), + null, null, dispatcher); + ApplicationAttemptId attemptId2 = + ApplicationAttemptId.newInstance(appId, 2); + RMAppAttempt attempt2 = storeAttempt(store, attemptId2, + ContainerId.newContainerId(attemptId2, 1).toString(), + null, null, dispatcher); + store.removeApplicationAttemptInternal(attemptId1); + Assert.assertFalse(stateStoreHelper.attemptExists(attempt1)); + Assert.assertTrue(stateStoreHelper.attemptExists(attempt2)); + + // let things settle down + Thread.sleep(1000); + store.close(); + + // load state + store = stateStoreHelper.getRMStateStore(); + RMState state = store.loadState(); + Map rmAppState = + state.getApplicationState(); + + ApplicationStateData appState = rmAppState.get(appId); + // app is loaded + assertNotNull(appState); + assertEquals(2, appState.getFirstAttemptId()); + assertNull(appState.getAttempt(attemptId1)); + assertNotNull(appState.getAttempt(attemptId2)); + } + protected void modifyAppState() throws Exception { } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java index a2ff4b3c1cd..a51ccb53463 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.Assert; import org.junit.Test; @@ -88,6 +89,12 @@ public class TestFSRMStateStore extends RMStateStoreTestBase { Path appDir = new Path(appRootDir, appId); return appDir; } + + public Path getAttemptDir(String appId, String attemptId) { + Path appDir = getAppDir(appId); + Path attemptDir = new Path(appDir, attemptId); + return attemptDir; + } } public TestFSRMStateStoreTester(MiniDFSCluster cluster, boolean adminCheckEnable) throws Exception { @@ -151,6 +158,15 @@ public class TestFSRMStateStore extends RMStateStoreTestBase { store.getAppDir(app.getApplicationId().toString()); return fs.exists(nodePath); } + + public boolean attemptExists(RMAppAttempt attempt) throws IOException { + FileSystem fs = cluster.getFileSystem(); + ApplicationAttemptId attemptId = attempt.getAppAttemptId(); + Path nodePath = + store.getAttemptDir(attemptId.getApplicationId().toString(), + attemptId.toString()); + return fs.exists(nodePath); + } } @Test(timeout = 60000) @@ -185,6 +201,7 @@ public class TestFSRMStateStore extends RMStateStoreTestBase { testAppDeletion(fsTester); testDeleteStore(fsTester); testRemoveApplication(fsTester); + testRemoveAttempt(fsTester); testAMRMTokenSecretManagerStateStore(fsTester); testReservationStateStore(fsTester); } finally { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java index 46661426c8c..ce186e6bec8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -96,6 +97,12 @@ public class TestLeveldbRMStateStore extends RMStateStoreTestBase { testRemoveApplication(tester); } + @Test(timeout = 60000) + public void testRemoveAttempt() throws Exception { + LeveldbStateStoreTester tester = new LeveldbStateStoreTester(); + testRemoveAttempt(tester); + } + @Test(timeout = 60000) public void testAMTokens() throws Exception { LeveldbStateStoreTester tester = new LeveldbStateStoreTester(); @@ -147,5 +154,14 @@ public class TestLeveldbRMStateStore extends RMStateStoreTestBase { } return stateStore.loadRMAppState(app.getApplicationId()) != null; } + + @Override + public boolean attemptExists(RMAppAttempt attempt) throws Exception { + if (stateStore.isClosed()) { + getRMStateStore(); + } + return stateStore.loadRMAppAttemptState(attempt.getAppAttemptId()) + != null; + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index 66b023cc85f..406fcf69be3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -123,6 +123,10 @@ public class TestZKRMStateStore extends RMStateStoreTestBase { + appId; } + public String getAttemptNode(String appId, String attemptId) { + return getAppNode(appId) + "/" + attemptId; + } + /** * Emulating retrying createRootDir not to raise NodeExist exception * @throws Exception @@ -165,6 +169,13 @@ public class TestZKRMStateStore extends RMStateStoreTestBase { return null != curatorFramework.checkExists() .forPath(store.getAppNode(app.getApplicationId().toString())); } + + public boolean attemptExists(RMAppAttempt attempt) throws Exception { + ApplicationAttemptId attemptId = attempt.getAppAttemptId(); + return null != curatorFramework.checkExists() + .forPath(store.getAttemptNode( + attemptId.getApplicationId().toString(), attemptId.toString())); + } } @Test (timeout = 60000) @@ -177,6 +188,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase { testAppDeletion(zkTester); testDeleteStore(zkTester); testRemoveApplication(zkTester); + testRemoveAttempt(zkTester); testAMRMTokenSecretManagerStateStore(zkTester); testReservationStateStore(zkTester); ((TestZKRMStateStoreTester.TestZKRMStateStoreInternal)