YARN-3480. Remove attempts that are beyond max-attempt limit from state store. Contributed by Jun Gong
This commit is contained in:
parent
84a8147791
commit
5273413411
|
@ -79,6 +79,9 @@ Release 2.9.0 - UNRELEASED
|
||||||
YARN-4417. Make RM and Timeline-server REST APIs more consistent.
|
YARN-4417. Make RM and Timeline-server REST APIs more consistent.
|
||||||
(wtan via jianhe)
|
(wtan via jianhe)
|
||||||
|
|
||||||
|
YARN-3480. Remove attempts that are beyond max-attempt limit from state
|
||||||
|
store. (Jun Gong via jianhe)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -817,6 +817,33 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
LOG.error("Error in handling event type " + event.getType()
|
LOG.error("Error in handling event type " + event.getType()
|
||||||
+ " for applicationAttempt " + appAttemptId, t);
|
+ " for applicationAttempt " + appAttemptId, t);
|
||||||
}
|
}
|
||||||
|
} else if (rmApp.getApplicationSubmissionContext() != null
|
||||||
|
&& rmApp.getApplicationSubmissionContext()
|
||||||
|
.getKeepContainersAcrossApplicationAttempts()
|
||||||
|
&& event.getType() == RMAppAttemptEventType.CONTAINER_FINISHED) {
|
||||||
|
// For work-preserving AM restart, failed attempts are still
|
||||||
|
// capturing CONTAINER_FINISHED events and recording the finished
|
||||||
|
// containers which will be used by current attempt.
|
||||||
|
// We keep at most 'yarn.resourcemanager.am.max-attempts' attempts in
|
||||||
|
// RMStateStore. If the finished container's attempt is deleted, we
|
||||||
|
// use the first attempt in app.attempts to deal with these events.
|
||||||
|
|
||||||
|
RMAppAttempt previousFailedAttempt =
|
||||||
|
rmApp.getAppAttempts().values().iterator().next();
|
||||||
|
if (previousFailedAttempt != null) {
|
||||||
|
try {
|
||||||
|
LOG.debug("Event " + event.getType() + " handled by "
|
||||||
|
+ previousFailedAttempt);
|
||||||
|
previousFailedAttempt.handle(event);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
LOG.error("Error in handling event type " + event.getType()
|
||||||
|
+ " for applicationAttempt " + appAttemptId
|
||||||
|
+ " with " + previousFailedAttempt, t);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
LOG.error("Event " + event.getType()
|
||||||
|
+ " not handled, because previousFailedAttempt is null");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -230,6 +230,11 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
}
|
}
|
||||||
|
|
||||||
RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptId);
|
RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptId);
|
||||||
|
if (rmAppAttempt == null) {
|
||||||
|
LOG.info("Ignoring not found attempt " + appAttemptId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
Container masterContainer = rmAppAttempt.getMasterContainer();
|
Container masterContainer = rmAppAttempt.getMasterContainer();
|
||||||
if (masterContainer.getId().equals(containerStatus.getContainerId())
|
if (masterContainer.getId().equals(containerStatus.getContainerId())
|
||||||
&& containerStatus.getContainerState() == ContainerState.COMPLETE) {
|
&& containerStatus.getContainerState() == ContainerState.COMPLETE) {
|
||||||
|
|
|
@ -481,6 +481,18 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void removeApplicationAttemptInternal(
|
||||||
|
ApplicationAttemptId appAttemptId)
|
||||||
|
throws Exception {
|
||||||
|
Path appDirPath =
|
||||||
|
getAppDir(rmAppRoot, appAttemptId.getApplicationId());
|
||||||
|
Path nodeRemovePath = getNodePath(appDirPath, appAttemptId.toString());
|
||||||
|
LOG.info("Removing info for attempt: " + appAttemptId + " at: "
|
||||||
|
+ nodeRemovePath);
|
||||||
|
deleteFileWithRetries(nodeRemovePath);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void removeApplicationStateInternal(
|
public synchronized void removeApplicationStateInternal(
|
||||||
ApplicationStateData appState)
|
ApplicationStateData appState)
|
||||||
|
|
|
@ -500,6 +500,22 @@ public class LeveldbRMStateStore extends RMStateStore {
|
||||||
return createApplicationState(appId.toString(), data);
|
return createApplicationState(appId.toString(), data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
ApplicationAttemptStateData loadRMAppAttemptState(
|
||||||
|
ApplicationAttemptId attemptId) throws IOException {
|
||||||
|
String attemptKey = getApplicationAttemptNodeKey(attemptId);
|
||||||
|
byte[] data = null;
|
||||||
|
try {
|
||||||
|
data = db.get(bytes(attemptKey));
|
||||||
|
} catch (DBException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
if (data == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return createAttemptState(attemptId.toString(), data);
|
||||||
|
}
|
||||||
|
|
||||||
private ApplicationAttemptStateData createAttemptState(String itemName,
|
private ApplicationAttemptStateData createAttemptState(String itemName,
|
||||||
byte[] data) throws IOException {
|
byte[] data) throws IOException {
|
||||||
ApplicationAttemptId attemptId =
|
ApplicationAttemptId attemptId =
|
||||||
|
@ -574,6 +590,22 @@ public class LeveldbRMStateStore extends RMStateStore {
|
||||||
storeApplicationAttemptStateInternal(attemptId, attemptStateData);
|
storeApplicationAttemptStateInternal(attemptId, attemptStateData);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void removeApplicationAttemptInternal(
|
||||||
|
ApplicationAttemptId attemptId)
|
||||||
|
throws IOException {
|
||||||
|
String attemptKey = getApplicationAttemptNodeKey(attemptId);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Removing state for attempt " + attemptId + " at "
|
||||||
|
+ attemptKey);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
db.delete(bytes(attemptKey));
|
||||||
|
} catch (DBException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void removeApplicationStateInternal(ApplicationStateData appState)
|
protected void removeApplicationStateInternal(ApplicationStateData appState)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
|
@ -141,6 +141,19 @@ public class MemoryRMStateStore extends RMStateStore {
|
||||||
appState.attempts.put(attemptState.getAttemptId(), attemptState);
|
appState.attempts.put(attemptState.getAttemptId(), attemptState);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void removeApplicationAttemptInternal(
|
||||||
|
ApplicationAttemptId appAttemptId) throws Exception {
|
||||||
|
ApplicationStateData appState =
|
||||||
|
state.getApplicationState().get(appAttemptId.getApplicationId());
|
||||||
|
ApplicationAttemptStateData attemptState =
|
||||||
|
appState.attempts.remove(appAttemptId);
|
||||||
|
LOG.info("Removing state for attempt: " + appAttemptId);
|
||||||
|
if (attemptState == null) {
|
||||||
|
throw new YarnRuntimeException("Application doesn't exist");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void removeApplicationStateInternal(
|
public synchronized void removeApplicationStateInternal(
|
||||||
ApplicationStateData appState) throws Exception {
|
ApplicationStateData appState) throws Exception {
|
||||||
|
|
|
@ -131,6 +131,12 @@ public class NullRMStateStore extends RMStateStore {
|
||||||
ApplicationAttemptStateData attemptStateData) throws Exception {
|
ApplicationAttemptStateData attemptStateData) throws Exception {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void removeApplicationAttemptInternal(
|
||||||
|
ApplicationAttemptId attemptId) throws Exception {
|
||||||
|
// Do nothing
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void checkVersion() throws Exception {
|
public void checkVersion() throws Exception {
|
||||||
// Do nothing
|
// Do nothing
|
||||||
|
|
|
@ -135,6 +135,10 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
|
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
|
||||||
RMStateStoreEventType.UPDATE_APP_ATTEMPT,
|
RMStateStoreEventType.UPDATE_APP_ATTEMPT,
|
||||||
new UpdateAppAttemptTransition())
|
new UpdateAppAttemptTransition())
|
||||||
|
.addTransition(RMStateStoreState.ACTIVE,
|
||||||
|
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
|
||||||
|
RMStateStoreEventType.REMOVE_APP_ATTEMPT,
|
||||||
|
new RemoveAppAttemptTransition())
|
||||||
.addTransition(RMStateStoreState.ACTIVE,
|
.addTransition(RMStateStoreState.ACTIVE,
|
||||||
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
|
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
|
||||||
RMStateStoreEventType.STORE_MASTERKEY,
|
RMStateStoreEventType.STORE_MASTERKEY,
|
||||||
|
@ -552,6 +556,32 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
return isFenced ? RMStateStoreState.FENCED : RMStateStoreState.ACTIVE;
|
return isFenced ? RMStateStoreState.FENCED : RMStateStoreState.ACTIVE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class RemoveAppAttemptTransition implements
|
||||||
|
MultipleArcTransition<RMStateStore, RMStateStoreEvent,
|
||||||
|
RMStateStoreState> {
|
||||||
|
@Override
|
||||||
|
public RMStateStoreState transition(RMStateStore store,
|
||||||
|
RMStateStoreEvent event) {
|
||||||
|
if (!(event instanceof RMStateStoreRemoveAppAttemptEvent)) {
|
||||||
|
// should never happen
|
||||||
|
LOG.error("Illegal event type: " + event.getClass());
|
||||||
|
return RMStateStoreState.ACTIVE;
|
||||||
|
}
|
||||||
|
boolean isFenced = false;
|
||||||
|
ApplicationAttemptId attemptId =
|
||||||
|
((RMStateStoreRemoveAppAttemptEvent) event).getApplicationAttemptId();
|
||||||
|
ApplicationId appId = attemptId.getApplicationId();
|
||||||
|
LOG.info("Removing attempt " + attemptId + " from app: " + appId);
|
||||||
|
try {
|
||||||
|
store.removeApplicationAttemptInternal(attemptId);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Error removing attempt: " + attemptId, e);
|
||||||
|
isFenced = store.notifyStoreOperationFailedInternal(e);
|
||||||
|
}
|
||||||
|
return finalState(isFenced);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public RMStateStore() {
|
public RMStateStore() {
|
||||||
super(RMStateStore.class.getName());
|
super(RMStateStore.class.getName());
|
||||||
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||||
|
@ -983,6 +1013,29 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
protected abstract void removeApplicationStateInternal(
|
protected abstract void removeApplicationStateInternal(
|
||||||
ApplicationStateData appState) throws Exception;
|
ApplicationStateData appState) throws Exception;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Non-blocking API
|
||||||
|
* ResourceManager services call this to remove an attempt from the state
|
||||||
|
* store
|
||||||
|
* This does not block the dispatcher threads
|
||||||
|
* There is no notification of completion for this operation.
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public synchronized void removeApplicationAttempt(
|
||||||
|
ApplicationAttemptId applicationAttemptId) {
|
||||||
|
dispatcher.getEventHandler().handle(
|
||||||
|
new RMStateStoreRemoveAppAttemptEvent(applicationAttemptId));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Blocking API
|
||||||
|
* Derived classes must implement this method to remove the state of specified
|
||||||
|
* attempt.
|
||||||
|
*/
|
||||||
|
protected abstract void removeApplicationAttemptInternal(
|
||||||
|
ApplicationAttemptId attemptId) throws Exception;
|
||||||
|
|
||||||
|
|
||||||
// TODO: This should eventually become cluster-Id + "AM_RM_TOKEN_SERVICE". See
|
// TODO: This should eventually become cluster-Id + "AM_RM_TOKEN_SERVICE". See
|
||||||
// YARN-1779
|
// YARN-1779
|
||||||
public static final Text AM_RM_TOKEN_SERVICE = new Text(
|
public static final Text AM_RM_TOKEN_SERVICE = new Text(
|
||||||
|
|
|
@ -24,6 +24,7 @@ public enum RMStateStoreEventType {
|
||||||
UPDATE_APP,
|
UPDATE_APP,
|
||||||
UPDATE_APP_ATTEMPT,
|
UPDATE_APP_ATTEMPT,
|
||||||
REMOVE_APP,
|
REMOVE_APP,
|
||||||
|
REMOVE_APP_ATTEMPT,
|
||||||
FENCED,
|
FENCED,
|
||||||
|
|
||||||
// Below events should be called synchronously
|
// Below events should be called synchronously
|
||||||
|
|
|
@ -0,0 +1,37 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An event used to remove an attempt.
|
||||||
|
*/
|
||||||
|
public class RMStateStoreRemoveAppAttemptEvent extends RMStateStoreEvent {
|
||||||
|
private ApplicationAttemptId applicationAttemptId;
|
||||||
|
|
||||||
|
RMStateStoreRemoveAppAttemptEvent(ApplicationAttemptId applicationAttemptId) {
|
||||||
|
super(RMStateStoreEventType.REMOVE_APP_ATTEMPT);
|
||||||
|
this.applicationAttemptId = applicationAttemptId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ApplicationAttemptId getApplicationAttemptId() {
|
||||||
|
return applicationAttemptId;
|
||||||
|
}
|
||||||
|
}
|
|
@ -658,6 +658,22 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void removeApplicationAttemptInternal(
|
||||||
|
ApplicationAttemptId appAttemptId)
|
||||||
|
throws Exception {
|
||||||
|
String appId = appAttemptId.getApplicationId().toString();
|
||||||
|
String appIdRemovePath = getNodePath(rmAppRoot, appId);
|
||||||
|
String attemptIdRemovePath = getNodePath(appIdRemovePath,
|
||||||
|
appAttemptId.toString());
|
||||||
|
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Removing info for attempt: " + appAttemptId + " at: "
|
||||||
|
+ attemptIdRemovePath);
|
||||||
|
}
|
||||||
|
safeDelete(attemptIdRemovePath);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void removeApplicationStateInternal(
|
public synchronized void removeApplicationStateInternal(
|
||||||
ApplicationStateData appState)
|
ApplicationStateData appState)
|
||||||
|
|
|
@ -80,6 +80,16 @@ public abstract class ApplicationStateData {
|
||||||
return attempts.get(attemptId);
|
return attempts.get(attemptId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getFirstAttemptId() {
|
||||||
|
int min = Integer.MAX_VALUE;
|
||||||
|
for(ApplicationAttemptId attemptId : attempts.keySet()) {
|
||||||
|
if (attemptId.getAttemptId() < min) {
|
||||||
|
min = attemptId.getAttemptId();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return min == Integer.MAX_VALUE ? 1 : min;
|
||||||
|
}
|
||||||
|
|
||||||
public abstract ApplicationStateDataProto getProto();
|
public abstract ApplicationStateDataProto getProto();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -149,6 +149,8 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
private long startTime;
|
private long startTime;
|
||||||
private long finishTime = 0;
|
private long finishTime = 0;
|
||||||
private long storedFinishTime = 0;
|
private long storedFinishTime = 0;
|
||||||
|
private int firstAttemptIdInStateStore = 1;
|
||||||
|
private int nextAttemptId = 1;
|
||||||
// This field isn't protected by readlock now.
|
// This field isn't protected by readlock now.
|
||||||
private volatile RMAppAttempt currentAttempt;
|
private volatile RMAppAttempt currentAttempt;
|
||||||
private String queue;
|
private String queue;
|
||||||
|
@ -809,6 +811,11 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
this.storedFinishTime = appState.getFinishTime();
|
this.storedFinishTime = appState.getFinishTime();
|
||||||
this.startTime = appState.getStartTime();
|
this.startTime = appState.getStartTime();
|
||||||
this.callerContext = appState.getCallerContext();
|
this.callerContext = appState.getCallerContext();
|
||||||
|
// If interval > 0, some attempts might have been deleted.
|
||||||
|
if (submissionContext.getAttemptFailuresValidityInterval() > 0) {
|
||||||
|
this.firstAttemptIdInStateStore = appState.getFirstAttemptId();
|
||||||
|
this.nextAttemptId = firstAttemptIdInStateStore;
|
||||||
|
}
|
||||||
|
|
||||||
// send the ATS create Event
|
// send the ATS create Event
|
||||||
sendATSCreateEvent(this, this.startTime);
|
sendATSCreateEvent(this, this.startTime);
|
||||||
|
@ -822,7 +829,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
|
|
||||||
private void createNewAttempt() {
|
private void createNewAttempt() {
|
||||||
ApplicationAttemptId appAttemptId =
|
ApplicationAttemptId appAttemptId =
|
||||||
ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1);
|
ApplicationAttemptId.newInstance(applicationId, nextAttemptId++);
|
||||||
|
|
||||||
BlacklistManager currentAMBlacklist;
|
BlacklistManager currentAMBlacklist;
|
||||||
if (currentAttempt != null) {
|
if (currentAttempt != null) {
|
||||||
|
@ -1304,6 +1311,9 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
+ app.attemptFailuresValidityInterval + " milliseconds " : " ")
|
+ app.attemptFailuresValidityInterval + " milliseconds " : " ")
|
||||||
+ "is " + numberOfFailure + ". The max attempts is "
|
+ "is " + numberOfFailure + ". The max attempts is "
|
||||||
+ app.maxAppAttempts);
|
+ app.maxAppAttempts);
|
||||||
|
|
||||||
|
removeExcessAttempts(app);
|
||||||
|
|
||||||
if (!app.submissionContext.getUnmanagedAM()
|
if (!app.submissionContext.getUnmanagedAM()
|
||||||
&& numberOfFailure < app.maxAppAttempts) {
|
&& numberOfFailure < app.maxAppAttempts) {
|
||||||
if (initialState.equals(RMAppState.KILLING)) {
|
if (initialState.equals(RMAppState.KILLING)) {
|
||||||
|
@ -1340,6 +1350,19 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
return RMAppState.FINAL_SAVING;
|
return RMAppState.FINAL_SAVING;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void removeExcessAttempts(RMAppImpl app) {
|
||||||
|
while (app.nextAttemptId - app.firstAttemptIdInStateStore
|
||||||
|
> app.maxAppAttempts) {
|
||||||
|
// attempts' first element is oldest attempt because it is a
|
||||||
|
// LinkedHashMap
|
||||||
|
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
|
||||||
|
app.getApplicationId(), app.firstAttemptIdInStateStore);
|
||||||
|
app.firstAttemptIdInStateStore++;
|
||||||
|
LOG.info("Remove attempt from state store : " + attemptId);
|
||||||
|
app.rmContext.getStateStore().removeApplicationAttempt(attemptId);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -859,6 +859,9 @@ public class TestAMRestart {
|
||||||
@SuppressWarnings("resource")
|
@SuppressWarnings("resource")
|
||||||
MockRM rm2 = new MockRM(conf, memStore);
|
MockRM rm2 = new MockRM(conf, memStore);
|
||||||
rm2.start();
|
rm2.start();
|
||||||
|
ApplicationStateData app1State =
|
||||||
|
memStore.getState().getApplicationState().get(app1.getApplicationId());
|
||||||
|
Assert.assertEquals(1, app1State.getFirstAttemptId());
|
||||||
|
|
||||||
// re-register the NM
|
// re-register the NM
|
||||||
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||||
|
@ -871,6 +874,7 @@ public class TestAMRestart {
|
||||||
nm1.registerNode(Collections.singletonList(status), null);
|
nm1.registerNode(Collections.singletonList(status), null);
|
||||||
|
|
||||||
rm2.waitForState(attempt3.getAppAttemptId(), RMAppAttemptState.FAILED);
|
rm2.waitForState(attempt3.getAppAttemptId(), RMAppAttemptState.FAILED);
|
||||||
|
Assert.assertEquals(2, app1State.getAttemptCount());
|
||||||
|
|
||||||
rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||||
|
|
||||||
|
@ -884,6 +888,7 @@ public class TestAMRestart {
|
||||||
nm1
|
nm1
|
||||||
.nodeHeartbeat(am4.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
.nodeHeartbeat(am4.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||||
am4.waitForState(RMAppAttemptState.FAILED);
|
am4.waitForState(RMAppAttemptState.FAILED);
|
||||||
|
Assert.assertEquals(2, app1State.getAttemptCount());
|
||||||
|
|
||||||
// can launch the 5th attempt successfully
|
// can launch the 5th attempt successfully
|
||||||
rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||||
|
@ -897,6 +902,7 @@ public class TestAMRestart {
|
||||||
nm1
|
nm1
|
||||||
.nodeHeartbeat(am5.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
.nodeHeartbeat(am5.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||||
am5.waitForState(RMAppAttemptState.FAILED);
|
am5.waitForState(RMAppAttemptState.FAILED);
|
||||||
|
Assert.assertEquals(2, app1State.getAttemptCount());
|
||||||
|
|
||||||
rm2.waitForState(app1.getApplicationId(), RMAppState.FAILED);
|
rm2.waitForState(app1.getApplicationId(), RMAppState.FAILED);
|
||||||
rm1.stop();
|
rm1.stop();
|
||||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery;
|
||||||
import static org.junit.Assert.assertArrayEquals;
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
@ -134,6 +135,7 @@ public class RMStateStoreTestBase {
|
||||||
void writeVersion(Version version) throws Exception;
|
void writeVersion(Version version) throws Exception;
|
||||||
Version getCurrentVersion() throws Exception;
|
Version getCurrentVersion() throws Exception;
|
||||||
boolean appExists(RMApp app) throws Exception;
|
boolean appExists(RMApp app) throws Exception;
|
||||||
|
boolean attemptExists(RMAppAttempt attempt) throws Exception;
|
||||||
}
|
}
|
||||||
|
|
||||||
void waitNotify(TestDispatcher dispatcher) {
|
void waitNotify(TestDispatcher dispatcher) {
|
||||||
|
@ -172,7 +174,7 @@ public class RMStateStoreTestBase {
|
||||||
return mockApp;
|
return mockApp;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected ContainerId storeAttempt(RMStateStore store,
|
protected RMAppAttempt storeAttempt(RMStateStore store,
|
||||||
ApplicationAttemptId attemptId,
|
ApplicationAttemptId attemptId,
|
||||||
String containerIdStr, Token<AMRMTokenIdentifier> appToken,
|
String containerIdStr, Token<AMRMTokenIdentifier> appToken,
|
||||||
SecretKey clientTokenMasterKey, TestDispatcher dispatcher)
|
SecretKey clientTokenMasterKey, TestDispatcher dispatcher)
|
||||||
|
@ -195,7 +197,7 @@ public class RMStateStoreTestBase {
|
||||||
dispatcher.attemptId = attemptId;
|
dispatcher.attemptId = attemptId;
|
||||||
store.storeNewApplicationAttempt(mockAttempt);
|
store.storeNewApplicationAttempt(mockAttempt);
|
||||||
waitNotify(dispatcher);
|
waitNotify(dispatcher);
|
||||||
return container.getId();
|
return mockAttempt;
|
||||||
}
|
}
|
||||||
|
|
||||||
void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
|
void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
|
||||||
|
@ -238,8 +240,9 @@ public class RMStateStoreTestBase {
|
||||||
clientToAMTokenMgr.createMasterKey(attemptId1);
|
clientToAMTokenMgr.createMasterKey(attemptId1);
|
||||||
|
|
||||||
ContainerId containerId1 = storeAttempt(store, attemptId1,
|
ContainerId containerId1 = storeAttempt(store, attemptId1,
|
||||||
"container_1352994193343_0001_01_000001",
|
"container_1352994193343_0001_01_000001",
|
||||||
appAttemptToken1, clientTokenKey1, dispatcher);
|
appAttemptToken1, clientTokenKey1, dispatcher)
|
||||||
|
.getMasterContainer().getId();
|
||||||
|
|
||||||
String appAttemptIdStr2 = "appattempt_1352994193343_0001_000002";
|
String appAttemptIdStr2 = "appattempt_1352994193343_0001_000002";
|
||||||
ApplicationAttemptId attemptId2 =
|
ApplicationAttemptId attemptId2 =
|
||||||
|
@ -252,8 +255,9 @@ public class RMStateStoreTestBase {
|
||||||
clientToAMTokenMgr.createMasterKey(attemptId2);
|
clientToAMTokenMgr.createMasterKey(attemptId2);
|
||||||
|
|
||||||
ContainerId containerId2 = storeAttempt(store, attemptId2,
|
ContainerId containerId2 = storeAttempt(store, attemptId2,
|
||||||
"container_1352994193343_0001_02_000001",
|
"container_1352994193343_0001_02_000001",
|
||||||
appAttemptToken2, clientTokenKey2, dispatcher);
|
appAttemptToken2, clientTokenKey2, dispatcher)
|
||||||
|
.getMasterContainer().getId();
|
||||||
|
|
||||||
ApplicationAttemptId attemptIdRemoved = ConverterUtils
|
ApplicationAttemptId attemptIdRemoved = ConverterUtils
|
||||||
.toApplicationAttemptId("appattempt_1352994193343_0002_000001");
|
.toApplicationAttemptId("appattempt_1352994193343_0002_000001");
|
||||||
|
@ -633,6 +637,47 @@ public class RMStateStoreTestBase {
|
||||||
Assert.assertTrue(stateStoreHelper.appExists(rmApp2));
|
Assert.assertTrue(stateStoreHelper.appExists(rmApp2));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testRemoveAttempt(RMStateStoreHelper stateStoreHelper)
|
||||||
|
throws Exception {
|
||||||
|
RMStateStore store = stateStoreHelper.getRMStateStore();
|
||||||
|
TestDispatcher dispatcher = new TestDispatcher();
|
||||||
|
store.setRMDispatcher(dispatcher);
|
||||||
|
|
||||||
|
ApplicationId appId = ApplicationId.newInstance(1383183339, 6);
|
||||||
|
storeApp(store, appId, 123456, 564321);
|
||||||
|
|
||||||
|
ApplicationAttemptId attemptId1 =
|
||||||
|
ApplicationAttemptId.newInstance(appId, 1);
|
||||||
|
RMAppAttempt attempt1 = storeAttempt(store, attemptId1,
|
||||||
|
ContainerId.newContainerId(attemptId1, 1).toString(),
|
||||||
|
null, null, dispatcher);
|
||||||
|
ApplicationAttemptId attemptId2 =
|
||||||
|
ApplicationAttemptId.newInstance(appId, 2);
|
||||||
|
RMAppAttempt attempt2 = storeAttempt(store, attemptId2,
|
||||||
|
ContainerId.newContainerId(attemptId2, 1).toString(),
|
||||||
|
null, null, dispatcher);
|
||||||
|
store.removeApplicationAttemptInternal(attemptId1);
|
||||||
|
Assert.assertFalse(stateStoreHelper.attemptExists(attempt1));
|
||||||
|
Assert.assertTrue(stateStoreHelper.attemptExists(attempt2));
|
||||||
|
|
||||||
|
// let things settle down
|
||||||
|
Thread.sleep(1000);
|
||||||
|
store.close();
|
||||||
|
|
||||||
|
// load state
|
||||||
|
store = stateStoreHelper.getRMStateStore();
|
||||||
|
RMState state = store.loadState();
|
||||||
|
Map<ApplicationId, ApplicationStateData> rmAppState =
|
||||||
|
state.getApplicationState();
|
||||||
|
|
||||||
|
ApplicationStateData appState = rmAppState.get(appId);
|
||||||
|
// app is loaded
|
||||||
|
assertNotNull(appState);
|
||||||
|
assertEquals(2, appState.getFirstAttemptId());
|
||||||
|
assertNull(appState.getAttempt(attemptId1));
|
||||||
|
assertNotNull(appState.getAttempt(attemptId2));
|
||||||
|
}
|
||||||
|
|
||||||
protected void modifyAppState() throws Exception {
|
protected void modifyAppState() throws Exception {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -88,6 +89,12 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
|
||||||
Path appDir = new Path(appRootDir, appId);
|
Path appDir = new Path(appRootDir, appId);
|
||||||
return appDir;
|
return appDir;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Path getAttemptDir(String appId, String attemptId) {
|
||||||
|
Path appDir = getAppDir(appId);
|
||||||
|
Path attemptDir = new Path(appDir, attemptId);
|
||||||
|
return attemptDir;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public TestFSRMStateStoreTester(MiniDFSCluster cluster, boolean adminCheckEnable) throws Exception {
|
public TestFSRMStateStoreTester(MiniDFSCluster cluster, boolean adminCheckEnable) throws Exception {
|
||||||
|
@ -151,6 +158,15 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
|
||||||
store.getAppDir(app.getApplicationId().toString());
|
store.getAppDir(app.getApplicationId().toString());
|
||||||
return fs.exists(nodePath);
|
return fs.exists(nodePath);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean attemptExists(RMAppAttempt attempt) throws IOException {
|
||||||
|
FileSystem fs = cluster.getFileSystem();
|
||||||
|
ApplicationAttemptId attemptId = attempt.getAppAttemptId();
|
||||||
|
Path nodePath =
|
||||||
|
store.getAttemptDir(attemptId.getApplicationId().toString(),
|
||||||
|
attemptId.toString());
|
||||||
|
return fs.exists(nodePath);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
|
@ -185,6 +201,7 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
|
||||||
testAppDeletion(fsTester);
|
testAppDeletion(fsTester);
|
||||||
testDeleteStore(fsTester);
|
testDeleteStore(fsTester);
|
||||||
testRemoveApplication(fsTester);
|
testRemoveApplication(fsTester);
|
||||||
|
testRemoveAttempt(fsTester);
|
||||||
testAMRMTokenSecretManagerStateStore(fsTester);
|
testAMRMTokenSecretManagerStateStore(fsTester);
|
||||||
testReservationStateStore(fsTester);
|
testReservationStateStore(fsTester);
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.hadoop.fs.FileUtil;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.records.Version;
|
import org.apache.hadoop.yarn.server.records.Version;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -96,6 +97,12 @@ public class TestLeveldbRMStateStore extends RMStateStoreTestBase {
|
||||||
testRemoveApplication(tester);
|
testRemoveApplication(tester);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testRemoveAttempt() throws Exception {
|
||||||
|
LeveldbStateStoreTester tester = new LeveldbStateStoreTester();
|
||||||
|
testRemoveAttempt(tester);
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
public void testAMTokens() throws Exception {
|
public void testAMTokens() throws Exception {
|
||||||
LeveldbStateStoreTester tester = new LeveldbStateStoreTester();
|
LeveldbStateStoreTester tester = new LeveldbStateStoreTester();
|
||||||
|
@ -147,5 +154,14 @@ public class TestLeveldbRMStateStore extends RMStateStoreTestBase {
|
||||||
}
|
}
|
||||||
return stateStore.loadRMAppState(app.getApplicationId()) != null;
|
return stateStore.loadRMAppState(app.getApplicationId()) != null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean attemptExists(RMAppAttempt attempt) throws Exception {
|
||||||
|
if (stateStore.isClosed()) {
|
||||||
|
getRMStateStore();
|
||||||
|
}
|
||||||
|
return stateStore.loadRMAppAttemptState(attempt.getAppAttemptId())
|
||||||
|
!= null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -123,6 +123,10 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
||||||
+ appId;
|
+ appId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getAttemptNode(String appId, String attemptId) {
|
||||||
|
return getAppNode(appId) + "/" + attemptId;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Emulating retrying createRootDir not to raise NodeExist exception
|
* Emulating retrying createRootDir not to raise NodeExist exception
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
|
@ -165,6 +169,13 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
||||||
return null != curatorFramework.checkExists()
|
return null != curatorFramework.checkExists()
|
||||||
.forPath(store.getAppNode(app.getApplicationId().toString()));
|
.forPath(store.getAppNode(app.getApplicationId().toString()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean attemptExists(RMAppAttempt attempt) throws Exception {
|
||||||
|
ApplicationAttemptId attemptId = attempt.getAppAttemptId();
|
||||||
|
return null != curatorFramework.checkExists()
|
||||||
|
.forPath(store.getAttemptNode(
|
||||||
|
attemptId.getApplicationId().toString(), attemptId.toString()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout = 60000)
|
@Test (timeout = 60000)
|
||||||
|
@ -177,6 +188,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
||||||
testAppDeletion(zkTester);
|
testAppDeletion(zkTester);
|
||||||
testDeleteStore(zkTester);
|
testDeleteStore(zkTester);
|
||||||
testRemoveApplication(zkTester);
|
testRemoveApplication(zkTester);
|
||||||
|
testRemoveAttempt(zkTester);
|
||||||
testAMRMTokenSecretManagerStateStore(zkTester);
|
testAMRMTokenSecretManagerStateStore(zkTester);
|
||||||
testReservationStateStore(zkTester);
|
testReservationStateStore(zkTester);
|
||||||
((TestZKRMStateStoreTester.TestZKRMStateStoreInternal)
|
((TestZKRMStateStoreTester.TestZKRMStateStoreInternal)
|
||||||
|
|
Loading…
Reference in New Issue