Merge r1617341 from trunk. YARN-2138. Cleaned up notifyDone* APIs in RMStateStore. Contributed by Varun Saxena
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1617342 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f805ac93a9
commit
285267b2dc
|
@ -92,6 +92,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
YARN-2337. ResourceManager sets ClientRMService in RMContext multiple times.
|
YARN-2337. ResourceManager sets ClientRMService in RMContext multiple times.
|
||||||
(Zhihai Xu via kasha)
|
(Zhihai Xu via kasha)
|
||||||
|
|
||||||
|
YARN-2138. Cleaned up notifyDone* APIs in RMStateStore. (Varun Saxena via
|
||||||
|
jianhe)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -52,13 +52,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenS
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppUpdateSavedEvent;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
|
|
||||||
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
|
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
|
||||||
import org.apache.hadoop.yarn.state.SingleArcTransition;
|
import org.apache.hadoop.yarn.state.SingleArcTransition;
|
||||||
import org.apache.hadoop.yarn.state.StateMachine;
|
import org.apache.hadoop.yarn.state.StateMachine;
|
||||||
|
@ -132,7 +132,8 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
LOG.info("Storing info for app: " + appId);
|
LOG.info("Storing info for app: " + appId);
|
||||||
try {
|
try {
|
||||||
store.storeApplicationStateInternal(appId, appStateData);
|
store.storeApplicationStateInternal(appId, appStateData);
|
||||||
store.notifyDoneStoringApplication(appId, null);
|
store.notifyApplication(new RMAppEvent(appId,
|
||||||
|
RMAppEventType.APP_NEW_SAVED));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Error storing app: " + appId, e);
|
LOG.error("Error storing app: " + appId, e);
|
||||||
store.notifyStoreOperationFailed(e);
|
store.notifyStoreOperationFailed(e);
|
||||||
|
@ -156,7 +157,8 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
LOG.info("Updating info for app: " + appId);
|
LOG.info("Updating info for app: " + appId);
|
||||||
try {
|
try {
|
||||||
store.updateApplicationStateInternal(appId, appStateData);
|
store.updateApplicationStateInternal(appId, appStateData);
|
||||||
store.notifyDoneUpdatingApplication(appId, null);
|
store.notifyApplication(new RMAppEvent(appId,
|
||||||
|
RMAppEventType.APP_UPDATE_SAVED));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Error updating app: " + appId, e);
|
LOG.error("Error updating app: " + appId, e);
|
||||||
store.notifyStoreOperationFailed(e);
|
store.notifyStoreOperationFailed(e);
|
||||||
|
@ -205,8 +207,9 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
}
|
}
|
||||||
store.storeApplicationAttemptStateInternal(attemptState.getAttemptId(),
|
store.storeApplicationAttemptStateInternal(attemptState.getAttemptId(),
|
||||||
attemptStateData);
|
attemptStateData);
|
||||||
store.notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
|
store.notifyApplicationAttempt(new RMAppAttemptEvent
|
||||||
null);
|
(attemptState.getAttemptId(),
|
||||||
|
RMAppAttemptEventType.ATTEMPT_NEW_SAVED));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Error storing appAttempt: " + attemptState.getAttemptId(), e);
|
LOG.error("Error storing appAttempt: " + attemptState.getAttemptId(), e);
|
||||||
store.notifyStoreOperationFailed(e);
|
store.notifyStoreOperationFailed(e);
|
||||||
|
@ -233,8 +236,9 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
}
|
}
|
||||||
store.updateApplicationAttemptStateInternal(attemptState.getAttemptId(),
|
store.updateApplicationAttemptStateInternal(attemptState.getAttemptId(),
|
||||||
attemptStateData);
|
attemptStateData);
|
||||||
store.notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(),
|
store.notifyApplicationAttempt(new RMAppAttemptEvent
|
||||||
null);
|
(attemptState.getAttemptId(),
|
||||||
|
RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Error updating appAttempt: " + attemptState.getAttemptId(), e);
|
LOG.error("Error updating appAttempt: " + attemptState.getAttemptId(), e);
|
||||||
store.notifyStoreOperationFailed(e);
|
store.notifyStoreOperationFailed(e);
|
||||||
|
@ -804,42 +808,23 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
/**
|
/**
|
||||||
* In (@link handleStoreEvent}, this method is called to notify the
|
* This method is called to notify the application that
|
||||||
* application that new application is stored in state store
|
* new application is stored or updated in state store
|
||||||
* @param appId id of the application that has been saved
|
* @param event App event containing the app id and event type
|
||||||
* @param storedException the exception that is thrown when storing the
|
|
||||||
* application
|
|
||||||
*/
|
*/
|
||||||
private void notifyDoneStoringApplication(ApplicationId appId,
|
private void notifyApplication(RMAppEvent event) {
|
||||||
Exception storedException) {
|
rmDispatcher.getEventHandler().handle(event);
|
||||||
rmDispatcher.getEventHandler().handle(
|
|
||||||
new RMAppNewSavedEvent(appId, storedException));
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
private void notifyDoneUpdatingApplication(ApplicationId appId,
|
|
||||||
Exception storedException) {
|
|
||||||
rmDispatcher.getEventHandler().handle(
|
|
||||||
new RMAppUpdateSavedEvent(appId, storedException));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
/**
|
/**
|
||||||
* In (@link handleStoreEvent}, this method is called to notify the
|
* This method is called to notify the application attempt
|
||||||
* application attempt that new attempt is stored in state store
|
* that new attempt is stored or updated in state store
|
||||||
* @param appAttempt attempt that has been saved
|
* @param event App attempt event containing the app attempt
|
||||||
|
* id and event type
|
||||||
*/
|
*/
|
||||||
private void notifyDoneStoringApplicationAttempt(ApplicationAttemptId attemptId,
|
private void notifyApplicationAttempt(RMAppAttemptEvent event) {
|
||||||
Exception storedException) {
|
rmDispatcher.getEventHandler().handle(event);
|
||||||
rmDispatcher.getEventHandler().handle(
|
|
||||||
new RMAppAttemptNewSavedEvent(attemptId, storedException));
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
private void notifyDoneUpdatingApplicationAttempt(ApplicationAttemptId attemptId,
|
|
||||||
Exception updatedException) {
|
|
||||||
rmDispatcher.getEventHandler().handle(
|
|
||||||
new RMAppAttemptUpdateSavedEvent(attemptId, updatedException));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -820,17 +820,6 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
RMAppTransition {
|
RMAppTransition {
|
||||||
@Override
|
@Override
|
||||||
public void transition(RMAppImpl app, RMAppEvent event) {
|
public void transition(RMAppImpl app, RMAppEvent event) {
|
||||||
if (event instanceof RMAppNewSavedEvent) {
|
|
||||||
RMAppNewSavedEvent storeEvent = (RMAppNewSavedEvent) event;
|
|
||||||
// For HA this exception needs to be handled by giving up
|
|
||||||
// master status if we got fenced
|
|
||||||
if (((RMAppNewSavedEvent) event).getStoredException() != null) {
|
|
||||||
LOG.error(
|
|
||||||
"Failed to store application: " + storeEvent.getApplicationId(),
|
|
||||||
storeEvent.getStoredException());
|
|
||||||
ExitUtil.terminate(1, storeEvent.getStoredException());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
app.handler.handle(new AppAddedSchedulerEvent(app.applicationId,
|
app.handler.handle(new AppAddedSchedulerEvent(app.applicationId,
|
||||||
app.submissionContext.getQueue(), app.user));
|
app.submissionContext.getQueue(), app.user));
|
||||||
}
|
}
|
||||||
|
@ -848,13 +837,6 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
|
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
|
||||||
RMAppUpdateSavedEvent storeEvent = (RMAppUpdateSavedEvent) event;
|
|
||||||
if (storeEvent.getUpdatedException() != null) {
|
|
||||||
LOG.error("Failed to update the final state of application"
|
|
||||||
+ storeEvent.getApplicationId(), storeEvent.getUpdatedException());
|
|
||||||
ExitUtil.terminate(1, storeEvent.getUpdatedException());
|
|
||||||
}
|
|
||||||
|
|
||||||
if (app.transitionTodo instanceof SingleArcTransition) {
|
if (app.transitionTodo instanceof SingleArcTransition) {
|
||||||
((SingleArcTransition) app.transitionTodo).transition(app,
|
((SingleArcTransition) app.transitionTodo).transition(app,
|
||||||
app.eventCausingFinalSaving);
|
app.eventCausingFinalSaving);
|
||||||
|
|
|
@ -1,36 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
|
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
||||||
|
|
||||||
public class RMAppNewSavedEvent extends RMAppEvent {
|
|
||||||
|
|
||||||
private final Exception storedException;
|
|
||||||
|
|
||||||
public RMAppNewSavedEvent(ApplicationId appId, Exception storedException) {
|
|
||||||
super(appId, RMAppEventType.APP_NEW_SAVED);
|
|
||||||
this.storedException = storedException;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Exception getStoredException() {
|
|
||||||
return storedException;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,36 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
|
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
||||||
|
|
||||||
public class RMAppUpdateSavedEvent extends RMAppEvent {
|
|
||||||
|
|
||||||
private final Exception updatedException;
|
|
||||||
|
|
||||||
public RMAppUpdateSavedEvent(ApplicationId appId, Exception updatedException) {
|
|
||||||
super(appId, RMAppEventType.APP_UPDATE_SAVED);
|
|
||||||
this.updatedException = updatedException;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Exception getUpdatedException() {
|
|
||||||
return updatedException;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -80,11 +80,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
|
@ -905,8 +903,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
@Override
|
@Override
|
||||||
public void transition(RMAppAttemptImpl appAttempt,
|
public void transition(RMAppAttemptImpl appAttempt,
|
||||||
RMAppAttemptEvent event) {
|
RMAppAttemptEvent event) {
|
||||||
appAttempt.checkAttemptStoreError(event);
|
|
||||||
|
|
||||||
appAttempt.launchAttempt();
|
appAttempt.launchAttempt();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1058,14 +1054,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
@Override
|
@Override
|
||||||
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
|
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
|
||||||
RMAppAttemptEvent event) {
|
RMAppAttemptEvent event) {
|
||||||
RMAppAttemptUpdateSavedEvent storeEvent = (RMAppAttemptUpdateSavedEvent) event;
|
|
||||||
if (storeEvent.getUpdatedException() != null) {
|
|
||||||
LOG.error("Failed to update the final state of application attempt: "
|
|
||||||
+ storeEvent.getApplicationAttemptId(),
|
|
||||||
storeEvent.getUpdatedException());
|
|
||||||
ExitUtil.terminate(1, storeEvent.getUpdatedException());
|
|
||||||
}
|
|
||||||
|
|
||||||
RMAppAttemptEvent causeEvent = appAttempt.eventCausingFinalSaving;
|
RMAppAttemptEvent causeEvent = appAttempt.eventCausingFinalSaving;
|
||||||
|
|
||||||
if (appAttempt.transitionTodo instanceof SingleArcTransition) {
|
if (appAttempt.transitionTodo instanceof SingleArcTransition) {
|
||||||
|
@ -1195,8 +1183,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
@Override
|
@Override
|
||||||
public void transition(RMAppAttemptImpl appAttempt,
|
public void transition(RMAppAttemptImpl appAttempt,
|
||||||
RMAppAttemptEvent event) {
|
RMAppAttemptEvent event) {
|
||||||
appAttempt.checkAttemptStoreError(event);
|
|
||||||
|
|
||||||
// create AMRMToken
|
// create AMRMToken
|
||||||
appAttempt.amrmToken =
|
appAttempt.amrmToken =
|
||||||
appAttempt.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
|
appAttempt.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
|
||||||
|
@ -1689,18 +1675,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
rmContext.getAMLivelinessMonitor().register(getAppAttemptId());
|
rmContext.getAMLivelinessMonitor().register(getAppAttemptId());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkAttemptStoreError(RMAppAttemptEvent event) {
|
|
||||||
RMAppAttemptNewSavedEvent storeEvent = (RMAppAttemptNewSavedEvent) event;
|
|
||||||
if(storeEvent.getStoredException() != null)
|
|
||||||
{
|
|
||||||
// This needs to be handled for HA and give up master status if we got
|
|
||||||
// fenced
|
|
||||||
LOG.error("Failed to store attempt: " + getAppAttemptId(),
|
|
||||||
storeEvent.getStoredException());
|
|
||||||
ExitUtil.terminate(1, storeEvent.getStoredException());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void storeAttempt() {
|
private void storeAttempt() {
|
||||||
// store attempt data in a non-blocking manner to prevent dispatcher
|
// store attempt data in a non-blocking manner to prevent dispatcher
|
||||||
// thread starvation and wait for state to be saved
|
// thread starvation and wait for state to be saved
|
||||||
|
|
|
@ -1,39 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event;
|
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
|
||||||
|
|
||||||
public class RMAppAttemptNewSavedEvent extends RMAppAttemptEvent {
|
|
||||||
|
|
||||||
final Exception storedException;
|
|
||||||
|
|
||||||
public RMAppAttemptNewSavedEvent(ApplicationAttemptId appAttemptId,
|
|
||||||
Exception storedException) {
|
|
||||||
super(appAttemptId, RMAppAttemptEventType.ATTEMPT_NEW_SAVED);
|
|
||||||
this.storedException = storedException;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Exception getStoredException() {
|
|
||||||
return storedException;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,38 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event;
|
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
|
||||||
|
|
||||||
public class RMAppAttemptUpdateSavedEvent extends RMAppAttemptEvent {
|
|
||||||
|
|
||||||
final Exception updatedException;
|
|
||||||
|
|
||||||
public RMAppAttemptUpdateSavedEvent(ApplicationAttemptId appAttemptId,
|
|
||||||
Exception updatedException) {
|
|
||||||
super(appAttemptId, RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED);
|
|
||||||
this.updatedException = updatedException;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Exception getUpdatedException() {
|
|
||||||
return updatedException;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -65,8 +65,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenS
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||||
import org.apache.hadoop.yarn.server.security.MasterKeyData;
|
import org.apache.hadoop.yarn.server.security.MasterKeyData;
|
||||||
|
@ -77,10 +77,9 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
|
||||||
public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class);
|
public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class);
|
||||||
|
|
||||||
static class TestDispatcher implements
|
static class TestDispatcher implements
|
||||||
Dispatcher, EventHandler<RMAppAttemptNewSavedEvent> {
|
Dispatcher, EventHandler<RMAppAttemptEvent> {
|
||||||
|
|
||||||
ApplicationAttemptId attemptId;
|
ApplicationAttemptId attemptId;
|
||||||
Exception storedException;
|
|
||||||
|
|
||||||
boolean notified = false;
|
boolean notified = false;
|
||||||
|
|
||||||
|
@ -91,9 +90,8 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handle(RMAppAttemptNewSavedEvent event) {
|
public void handle(RMAppAttemptEvent event) {
|
||||||
assertEquals(attemptId, event.getApplicationAttemptId());
|
assertEquals(attemptId, event.getApplicationAttemptId());
|
||||||
assertEquals(storedException, event.getStoredException());
|
|
||||||
notified = true;
|
notified = true;
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
notifyAll();
|
notifyAll();
|
||||||
|
@ -163,7 +161,6 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
|
||||||
when(mockAttempt.getClientTokenMasterKey())
|
when(mockAttempt.getClientTokenMasterKey())
|
||||||
.thenReturn(clientTokenMasterKey);
|
.thenReturn(clientTokenMasterKey);
|
||||||
dispatcher.attemptId = attemptId;
|
dispatcher.attemptId = attemptId;
|
||||||
dispatcher.storedException = null;
|
|
||||||
store.storeNewApplicationAttempt(mockAttempt);
|
store.storeNewApplicationAttempt(mockAttempt);
|
||||||
waitNotify(dispatcher);
|
waitNotify(dispatcher);
|
||||||
return container.getId();
|
return container.getId();
|
||||||
|
|
|
@ -60,7 +60,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessM
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
||||||
|
@ -328,15 +327,15 @@ public class TestRMAppTransitions {
|
||||||
|
|
||||||
private void sendAppUpdateSavedEvent(RMApp application) {
|
private void sendAppUpdateSavedEvent(RMApp application) {
|
||||||
RMAppEvent event =
|
RMAppEvent event =
|
||||||
new RMAppUpdateSavedEvent(application.getApplicationId(), null);
|
new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_UPDATE_SAVED);
|
||||||
application.handle(event);
|
application.handle(event);
|
||||||
rmDispatcher.await();
|
rmDispatcher.await();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void sendAttemptUpdateSavedEvent(RMApp application) {
|
private void sendAttemptUpdateSavedEvent(RMApp application) {
|
||||||
application.getCurrentAppAttempt().handle(
|
application.getCurrentAppAttempt().handle(
|
||||||
new RMAppAttemptUpdateSavedEvent(application.getCurrentAppAttempt()
|
new RMAppAttemptEvent(application.getCurrentAppAttempt().getAppAttemptId(),
|
||||||
.getAppAttemptId(), null));
|
RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED));
|
||||||
}
|
}
|
||||||
|
|
||||||
protected RMApp testCreateAppNewSaving(
|
protected RMApp testCreateAppNewSaving(
|
||||||
|
@ -357,7 +356,7 @@ public class TestRMAppTransitions {
|
||||||
RMApp application = testCreateAppNewSaving(submissionContext);
|
RMApp application = testCreateAppNewSaving(submissionContext);
|
||||||
// NEW_SAVING => SUBMITTED event RMAppEventType.APP_SAVED
|
// NEW_SAVING => SUBMITTED event RMAppEventType.APP_SAVED
|
||||||
RMAppEvent event =
|
RMAppEvent event =
|
||||||
new RMAppNewSavedEvent(application.getApplicationId(), null);
|
new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_NEW_SAVED);
|
||||||
application.handle(event);
|
application.handle(event);
|
||||||
assertStartTimeSet(application);
|
assertStartTimeSet(application);
|
||||||
assertAppState(RMAppState.SUBMITTED, application);
|
assertAppState(RMAppState.SUBMITTED, application);
|
||||||
|
@ -422,7 +421,7 @@ public class TestRMAppTransitions {
|
||||||
RMApp application = testCreateAppFinalSaving(submissionContext);
|
RMApp application = testCreateAppFinalSaving(submissionContext);
|
||||||
// FINAL_SAVING => FINISHING event RMAppEventType.APP_UPDATED
|
// FINAL_SAVING => FINISHING event RMAppEventType.APP_UPDATED
|
||||||
RMAppEvent appUpdated =
|
RMAppEvent appUpdated =
|
||||||
new RMAppUpdateSavedEvent(application.getApplicationId(), null);
|
new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_UPDATE_SAVED);
|
||||||
application.handle(appUpdated);
|
application.handle(appUpdated);
|
||||||
assertAppState(RMAppState.FINISHING, application);
|
assertAppState(RMAppState.FINISHING, application);
|
||||||
assertTimesAtFinish(application);
|
assertTimesAtFinish(application);
|
||||||
|
@ -763,7 +762,7 @@ public class TestRMAppTransitions {
|
||||||
application.handle(event);
|
application.handle(event);
|
||||||
assertAppState(RMAppState.FINAL_SAVING, application);
|
assertAppState(RMAppState.FINAL_SAVING, application);
|
||||||
RMAppEvent appUpdated =
|
RMAppEvent appUpdated =
|
||||||
new RMAppUpdateSavedEvent(application.getApplicationId(), null);
|
new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_UPDATE_SAVED);
|
||||||
application.handle(appUpdated);
|
application.handle(appUpdated);
|
||||||
assertAppState(RMAppState.FINISHED, application);
|
assertAppState(RMAppState.FINISHED, application);
|
||||||
|
|
||||||
|
|
|
@ -81,10 +81,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
|
||||||
|
@ -577,8 +575,8 @@ public class TestRMAppAttemptTransitions {
|
||||||
assertEquals(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
|
assertEquals(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
|
||||||
applicationAttempt.getAppAttemptState());
|
applicationAttempt.getAppAttemptState());
|
||||||
applicationAttempt.handle(
|
applicationAttempt.handle(
|
||||||
new RMAppAttemptNewSavedEvent(
|
new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(),
|
||||||
applicationAttempt.getAppAttemptId(), null));
|
RMAppAttemptEventType.ATTEMPT_NEW_SAVED));
|
||||||
}
|
}
|
||||||
|
|
||||||
testAppAttemptScheduledState();
|
testAppAttemptScheduledState();
|
||||||
|
@ -616,8 +614,8 @@ public class TestRMAppAttemptTransitions {
|
||||||
assertEquals(RMAppAttemptState.ALLOCATED_SAVING,
|
assertEquals(RMAppAttemptState.ALLOCATED_SAVING,
|
||||||
applicationAttempt.getAppAttemptState());
|
applicationAttempt.getAppAttemptState());
|
||||||
applicationAttempt.handle(
|
applicationAttempt.handle(
|
||||||
new RMAppAttemptNewSavedEvent(
|
new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(),
|
||||||
applicationAttempt.getAppAttemptId(), null));
|
RMAppAttemptEventType.ATTEMPT_NEW_SAVED));
|
||||||
|
|
||||||
testAppAttemptAllocatedState(container);
|
testAppAttemptAllocatedState(container);
|
||||||
|
|
||||||
|
@ -696,8 +694,8 @@ public class TestRMAppAttemptTransitions {
|
||||||
assertEquals(RMAppAttemptState.FINAL_SAVING,
|
assertEquals(RMAppAttemptState.FINAL_SAVING,
|
||||||
applicationAttempt.getAppAttemptState());
|
applicationAttempt.getAppAttemptState());
|
||||||
applicationAttempt.handle(
|
applicationAttempt.handle(
|
||||||
new RMAppAttemptUpdateSavedEvent(
|
new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(),
|
||||||
applicationAttempt.getAppAttemptId(), null));
|
RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue