YARN-5029. RM needs to send update event with YarnApplicationState as Running to ATS/AHS. Contributed by Xuan Gong.

(cherry picked from commit 39f2bac38b)
(cherry picked from commit 9dc3683d87)
This commit is contained in:
Junping Du 2016-05-11 09:27:26 -07:00
parent ed2230ec7d
commit 6d1300880f
8 changed files with 165 additions and 40 deletions

View File

@ -368,6 +368,17 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
.toString()); .toString());
queue = eventInfo.get(ApplicationMetricsConstants.QUEUE_ENTITY_INFO) queue = eventInfo.get(ApplicationMetricsConstants.QUEUE_ENTITY_INFO)
.toString(); .toString();
} else if (event.getEventType().equals(
ApplicationMetricsConstants.STATE_UPDATED_EVENT_TYPE)) {
Map<String, Object> eventInfo = event.getEventInfo();
if (eventInfo == null) {
continue;
}
if (eventInfo.containsKey(
ApplicationMetricsConstants.STATE_EVENT_INFO)) {
state = YarnApplicationState.valueOf(eventInfo.get(
ApplicationMetricsConstants.STATE_EVENT_INFO).toString());
}
} else if (event.getEventType().equals( } else if (event.getEventType().equals(
ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) { ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
progress=1.0F; progress=1.0F;

View File

@ -274,12 +274,8 @@ public class AHSWebServices extends WebServices {
} }
} }
// TODO: YARN-5029. RM would send the update event. We could get
// the consistent YarnApplicationState.
// Will remove YarnApplicationState.ACCEPTED.
private boolean isRunningState(YarnApplicationState appState) { private boolean isRunningState(YarnApplicationState appState) {
return appState == YarnApplicationState.ACCEPTED return appState == YarnApplicationState.RUNNING;
|| appState == YarnApplicationState.RUNNING;
} }
private boolean isFinishedState(YarnApplicationState appState) { private boolean isFinishedState(YarnApplicationState appState) {

View File

@ -40,6 +40,9 @@ public class ApplicationMetricsConstants {
public static final String UPDATED_EVENT_TYPE = public static final String UPDATED_EVENT_TYPE =
"YARN_APPLICATION_UPDATED"; "YARN_APPLICATION_UPDATED";
public static final String STATE_UPDATED_EVENT_TYPE =
"YARN_APPLICATION_STATE_UPDATED";
public static final String NAME_ENTITY_INFO = public static final String NAME_ENTITY_INFO =
"YARN_APPLICATION_NAME"; "YARN_APPLICATION_NAME";

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
/**
* When the state of this application has been changed, RM would sent
* this event to inform Timeline Server for keeping the Application state
* consistent.
*/
public class ApplicaitonStateUpdatedEvent extends SystemMetricsEvent{
private ApplicationId appId;
private YarnApplicationState appState;
public ApplicaitonStateUpdatedEvent(ApplicationId appliocationId,
YarnApplicationState state, long updatedTime) {
super(SystemMetricsEventType.APP_STATE_UPDATED, updatedTime);
this.appId = appliocationId;
this.appState = state;
}
public ApplicationId getApplicationId() {
return appId;
}
public YarnApplicationState getAppState() {
return appState;
}
}

View File

@ -25,6 +25,7 @@ public enum SystemMetricsEventType {
APP_FINISHED, APP_FINISHED,
APP_ACLS_UPDATED, APP_ACLS_UPDATED,
APP_UPDATED, APP_UPDATED,
APP_STATE_UPDATED,
// app attempt events // app attempt events
APP_ATTEMPT_REGISTERED, APP_ATTEMPT_REGISTERED,

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
@ -54,6 +55,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import com.google.common.annotations.VisibleForTesting;
/** /**
* The class that helps RM publish metrics to the timeline server. RM will * The class that helps RM publish metrics to the timeline server. RM will
* always invoke the methods of this class regardless the service is enabled or * always invoke the methods of this class regardless the service is enabled or
@ -157,6 +160,18 @@ public class SystemMetricsPublisher extends CompositeService {
} }
} }
@SuppressWarnings("unchecked")
public void appStateUpdated(RMApp app, YarnApplicationState appState,
long updatedTime) {
if (publishSystemMetrics) {
dispatcher.getEventHandler().handle(
new ApplicaitonStateUpdatedEvent(
app.getApplicationId(),
appState,
updatedTime));
}
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void appAttemptRegistered(RMAppAttempt appAttempt, public void appAttemptRegistered(RMAppAttempt appAttempt,
long registeredTime) { long registeredTime) {
@ -235,32 +250,36 @@ public class SystemMetricsPublisher extends CompositeService {
protected void handleSystemMetricsEvent( protected void handleSystemMetricsEvent(
SystemMetricsEvent event) { SystemMetricsEvent event) {
switch (event.getType()) { switch (event.getType()) {
case APP_CREATED: case APP_CREATED:
publishApplicationCreatedEvent((ApplicationCreatedEvent) event); publishApplicationCreatedEvent((ApplicationCreatedEvent) event);
break; break;
case APP_FINISHED: case APP_FINISHED:
publishApplicationFinishedEvent((ApplicationFinishedEvent) event); publishApplicationFinishedEvent((ApplicationFinishedEvent) event);
break; break;
case APP_ACLS_UPDATED: case APP_ACLS_UPDATED:
publishApplicationACLsUpdatedEvent((ApplicationACLsUpdatedEvent) event); publishApplicationACLsUpdatedEvent((ApplicationACLsUpdatedEvent) event);
break; break;
case APP_UPDATED: case APP_UPDATED:
publishApplicationUpdatedEvent((ApplicationUpdatedEvent) event); publishApplicationUpdatedEvent((ApplicationUpdatedEvent) event);
break; break;
case APP_ATTEMPT_REGISTERED: case APP_STATE_UPDATED:
publishAppAttemptRegisteredEvent((AppAttemptRegisteredEvent) event); publishApplicationStateUpdatedEvent(
break; (ApplicaitonStateUpdatedEvent)event);
case APP_ATTEMPT_FINISHED: break;
publishAppAttemptFinishedEvent((AppAttemptFinishedEvent) event); case APP_ATTEMPT_REGISTERED:
break; publishAppAttemptRegisteredEvent((AppAttemptRegisteredEvent) event);
case CONTAINER_CREATED: break;
publishContainerCreatedEvent((ContainerCreatedEvent) event); case APP_ATTEMPT_FINISHED:
break; publishAppAttemptFinishedEvent((AppAttemptFinishedEvent) event);
case CONTAINER_FINISHED: break;
publishContainerFinishedEvent((ContainerFinishedEvent) event); case CONTAINER_CREATED:
break; publishContainerCreatedEvent((ContainerCreatedEvent) event);
default: break;
LOG.error("Unknown SystemMetricsEvent type: " + event.getType()); case CONTAINER_FINISHED:
publishContainerFinishedEvent((ContainerFinishedEvent) event);
break;
default:
LOG.error("Unknown SystemMetricsEvent type: " + event.getType());
} }
} }
@ -352,6 +371,20 @@ public class SystemMetricsPublisher extends CompositeService {
putEntity(entity); putEntity(entity);
} }
private void publishApplicationStateUpdatedEvent(
ApplicaitonStateUpdatedEvent event) {
TimelineEntity entity = createApplicationEntity(event.getApplicationId());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO,
event.getAppState());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(ApplicationMetricsConstants.STATE_UPDATED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity);
}
private void publishApplicationACLsUpdatedEvent( private void publishApplicationACLsUpdatedEvent(
ApplicationACLsUpdatedEvent event) { ApplicationACLsUpdatedEvent event) {
TimelineEntity entity = TimelineEntity entity =
@ -501,7 +534,9 @@ public class SystemMetricsPublisher extends CompositeService {
return entity; return entity;
} }
private void putEntity(TimelineEntity entity) { @Private
@VisibleForTesting
public void putEntity(TimelineEntity entity) {
try { try {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Publishing the entity " + entity.getEntityId() + LOG.debug("Publishing the entity " + entity.getEntityId() +

View File

@ -247,7 +247,8 @@ public class RMAppImpl implements RMApp, Recoverable {
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
RMAppEventType.MOVE, new RMAppMoveTransition()) RMAppEventType.MOVE, new RMAppMoveTransition())
.addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING, .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
RMAppEventType.ATTEMPT_REGISTERED) RMAppEventType.ATTEMPT_REGISTERED, new RMAppStateUpdateTransition(
YarnApplicationState.RUNNING))
.addTransition(RMAppState.ACCEPTED, .addTransition(RMAppState.ACCEPTED,
EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING), EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING),
// ACCEPTED state is possible to receive ATTEMPT_FAILED/ATTEMPT_FINISHED // ACCEPTED state is possible to receive ATTEMPT_FAILED/ATTEMPT_FINISHED
@ -905,7 +906,21 @@ public class RMAppImpl implements RMApp, Recoverable {
nodeUpdateEvent.getNode()); nodeUpdateEvent.getNode());
}; };
} }
private static final class RMAppStateUpdateTransition
extends RMAppTransition {
private YarnApplicationState stateToATS;
public RMAppStateUpdateTransition(YarnApplicationState state) {
stateToATS = state;
}
public void transition(RMAppImpl app, RMAppEvent event) {
app.rmContext.getSystemMetricsPublisher().appStateUpdated(
app, stateToATS, app.systemClock.getTime());
};
}
private static final class AppRunningOnNodeTransition extends RMAppTransition { private static final class AppRunningOnNodeTransition extends RMAppTransition {
public void transition(RMAppImpl app, RMAppEvent event) { public void transition(RMAppImpl app, RMAppEvent event) {
RMAppRunningOnNodeEvent nodeAddedEvent = (RMAppRunningOnNodeEvent) event; RMAppRunningOnNodeEvent nodeAddedEvent = (RMAppRunningOnNodeEvent) event;

View File

@ -106,13 +106,15 @@ public class TestSystemMetricsPublisher {
@Test(timeout = 10000) @Test(timeout = 10000)
public void testPublishApplicationMetrics() throws Exception { public void testPublishApplicationMetrics() throws Exception {
long stateUpdateTimeStamp = System.currentTimeMillis();
for (int i = 1; i <= 2; ++i) { for (int i = 1; i <= 2; ++i) {
ApplicationId appId = ApplicationId.newInstance(0, i); ApplicationId appId = ApplicationId.newInstance(0, i);
RMApp app = createRMApp(appId); RMApp app = createRMApp(appId);
metricsPublisher.appCreated(app, app.getStartTime()); metricsPublisher.appCreated(app, app.getStartTime());
if (i == 1) { if (i == 1) {
when(app.getQueue()).thenReturn("new test queue"); when(app.getQueue()).thenReturn("new test queue");
ApplicationSubmissionContext asc = mock(ApplicationSubmissionContext.class); ApplicationSubmissionContext asc = mock(
ApplicationSubmissionContext.class);
when(asc.getUnmanagedAM()).thenReturn(false); when(asc.getUnmanagedAM()).thenReturn(false);
when(asc.getPriority()).thenReturn(Priority.newInstance(1)); when(asc.getPriority()).thenReturn(Priority.newInstance(1));
when(asc.getNodeLabelExpression()).thenReturn("high-cpu"); when(asc.getNodeLabelExpression()).thenReturn("high-cpu");
@ -121,7 +123,10 @@ public class TestSystemMetricsPublisher {
} else { } else {
metricsPublisher.appUpdated(app, 4L); metricsPublisher.appUpdated(app, 4L);
} }
metricsPublisher.appFinished(app, RMAppState.FINISHED, app.getFinishTime()); metricsPublisher.appStateUpdated(app, YarnApplicationState.RUNNING,
stateUpdateTimeStamp);
metricsPublisher.appFinished(app, RMAppState.FINISHED,
app.getFinishTime());
if (i == 1) { if (i == 1) {
metricsPublisher.appACLsUpdated(app, "uers1,user2", 4L); metricsPublisher.appACLsUpdated(app, "uers1,user2", 4L);
} else { } else {
@ -134,8 +139,8 @@ public class TestSystemMetricsPublisher {
store.getEntity(appId.toString(), store.getEntity(appId.toString(),
ApplicationMetricsConstants.ENTITY_TYPE, ApplicationMetricsConstants.ENTITY_TYPE,
EnumSet.allOf(Field.class)); EnumSet.allOf(Field.class));
// ensure three events are both published before leaving the loop // ensure Five events are both published before leaving the loop
} while (entity == null || entity.getEvents().size() < 4); } while (entity == null || entity.getEvents().size() < 5);
// verify all the fields // verify all the fields
Assert.assertEquals(ApplicationMetricsConstants.ENTITY_TYPE, Assert.assertEquals(ApplicationMetricsConstants.ENTITY_TYPE,
entity.getEntityType()); entity.getEntityType());
@ -212,6 +217,7 @@ public class TestSystemMetricsPublisher {
boolean hasUpdatedEvent = false; boolean hasUpdatedEvent = false;
boolean hasFinishedEvent = false; boolean hasFinishedEvent = false;
boolean hasACLsUpdatedEvent = false; boolean hasACLsUpdatedEvent = false;
boolean hasStateUpdateEvent = false;
for (TimelineEvent event : entity.getEvents()) { for (TimelineEvent event : entity.getEvents()) {
if (event.getEventType().equals( if (event.getEventType().equals(
ApplicationMetricsConstants.CREATED_EVENT_TYPE)) { ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
@ -249,10 +255,21 @@ public class TestSystemMetricsPublisher {
ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE)) { ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE)) {
hasACLsUpdatedEvent = true; hasACLsUpdatedEvent = true;
Assert.assertEquals(4L, event.getTimestamp()); Assert.assertEquals(4L, event.getTimestamp());
} else if (event.getEventType().equals(
ApplicationMetricsConstants.STATE_UPDATED_EVENT_TYPE)) {
hasStateUpdateEvent = true;
Assert.assertEquals(event.getTimestamp(), stateUpdateTimeStamp);
Assert.assertEquals(YarnApplicationState.RUNNING.toString(), event
.getEventInfo().get(
ApplicationMetricsConstants.STATE_EVENT_INFO));
} }
} }
Assert.assertTrue(hasCreatedEvent && hasFinishedEvent // Do assertTrue verification separately for easier debug
&& hasACLsUpdatedEvent && hasUpdatedEvent); Assert.assertTrue(hasCreatedEvent);
Assert.assertTrue(hasFinishedEvent);
Assert.assertTrue(hasACLsUpdatedEvent);
Assert.assertTrue(hasUpdatedEvent);
Assert.assertTrue(hasStateUpdateEvent);
} }
} }