YARN-5029. RM needs to send update event with YarnApplicationState as Running to ATS/AHS. Contributed by Xuan Gong.

This commit is contained in:
Junping Du 2016-05-11 09:27:26 -07:00
parent 3553261400
commit 39f2bac38b
8 changed files with 165 additions and 40 deletions

View File

@ -368,6 +368,17 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
.toString());
queue = eventInfo.get(ApplicationMetricsConstants.QUEUE_ENTITY_INFO)
.toString();
} else if (event.getEventType().equals(
ApplicationMetricsConstants.STATE_UPDATED_EVENT_TYPE)) {
Map<String, Object> eventInfo = event.getEventInfo();
if (eventInfo == null) {
continue;
}
if (eventInfo.containsKey(
ApplicationMetricsConstants.STATE_EVENT_INFO)) {
state = YarnApplicationState.valueOf(eventInfo.get(
ApplicationMetricsConstants.STATE_EVENT_INFO).toString());
}
} else if (event.getEventType().equals(
ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
progress=1.0F;

View File

@ -274,12 +274,8 @@ public class AHSWebServices extends WebServices {
}
}
// TODO: YARN-5029. RM would send the update event. We could get
// the consistent YarnApplicationState.
// Will remove YarnApplicationState.ACCEPTED.
private boolean isRunningState(YarnApplicationState appState) {
return appState == YarnApplicationState.ACCEPTED
|| appState == YarnApplicationState.RUNNING;
return appState == YarnApplicationState.RUNNING;
}
private boolean isFinishedState(YarnApplicationState appState) {

View File

@ -40,6 +40,9 @@ public class ApplicationMetricsConstants {
public static final String UPDATED_EVENT_TYPE =
"YARN_APPLICATION_UPDATED";
public static final String STATE_UPDATED_EVENT_TYPE =
"YARN_APPLICATION_STATE_UPDATED";
public static final String NAME_ENTITY_INFO =
"YARN_APPLICATION_NAME";

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
/**
* When the state of this application has been changed, RM would sent
* this event to inform Timeline Server for keeping the Application state
* consistent.
*/
public class ApplicaitonStateUpdatedEvent extends SystemMetricsEvent{
private ApplicationId appId;
private YarnApplicationState appState;
public ApplicaitonStateUpdatedEvent(ApplicationId appliocationId,
YarnApplicationState state, long updatedTime) {
super(SystemMetricsEventType.APP_STATE_UPDATED, updatedTime);
this.appId = appliocationId;
this.appState = state;
}
public ApplicationId getApplicationId() {
return appId;
}
public YarnApplicationState getAppState() {
return appState;
}
}

View File

@ -25,6 +25,7 @@ public enum SystemMetricsEventType {
APP_FINISHED,
APP_ACLS_UPDATED,
APP_UPDATED,
APP_STATE_UPDATED,
// app attempt events
APP_ATTEMPT_REGISTERED,

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
@ -54,6 +55,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import com.google.common.annotations.VisibleForTesting;
/**
* The class that helps RM publish metrics to the timeline server. RM will
* always invoke the methods of this class regardless the service is enabled or
@ -157,6 +160,18 @@ public class SystemMetricsPublisher extends CompositeService {
}
}
@SuppressWarnings("unchecked")
public void appStateUpdated(RMApp app, YarnApplicationState appState,
long updatedTime) {
if (publishSystemMetrics) {
dispatcher.getEventHandler().handle(
new ApplicaitonStateUpdatedEvent(
app.getApplicationId(),
appState,
updatedTime));
}
}
@SuppressWarnings("unchecked")
public void appAttemptRegistered(RMAppAttempt appAttempt,
long registeredTime) {
@ -235,32 +250,36 @@ public class SystemMetricsPublisher extends CompositeService {
protected void handleSystemMetricsEvent(
SystemMetricsEvent event) {
switch (event.getType()) {
case APP_CREATED:
publishApplicationCreatedEvent((ApplicationCreatedEvent) event);
break;
case APP_FINISHED:
publishApplicationFinishedEvent((ApplicationFinishedEvent) event);
break;
case APP_ACLS_UPDATED:
publishApplicationACLsUpdatedEvent((ApplicationACLsUpdatedEvent) event);
break;
case APP_UPDATED:
publishApplicationUpdatedEvent((ApplicationUpdatedEvent) event);
break;
case APP_ATTEMPT_REGISTERED:
publishAppAttemptRegisteredEvent((AppAttemptRegisteredEvent) event);
break;
case APP_ATTEMPT_FINISHED:
publishAppAttemptFinishedEvent((AppAttemptFinishedEvent) event);
break;
case CONTAINER_CREATED:
publishContainerCreatedEvent((ContainerCreatedEvent) event);
break;
case CONTAINER_FINISHED:
publishContainerFinishedEvent((ContainerFinishedEvent) event);
break;
default:
LOG.error("Unknown SystemMetricsEvent type: " + event.getType());
case APP_CREATED:
publishApplicationCreatedEvent((ApplicationCreatedEvent) event);
break;
case APP_FINISHED:
publishApplicationFinishedEvent((ApplicationFinishedEvent) event);
break;
case APP_ACLS_UPDATED:
publishApplicationACLsUpdatedEvent((ApplicationACLsUpdatedEvent) event);
break;
case APP_UPDATED:
publishApplicationUpdatedEvent((ApplicationUpdatedEvent) event);
break;
case APP_STATE_UPDATED:
publishApplicationStateUpdatedEvent(
(ApplicaitonStateUpdatedEvent)event);
break;
case APP_ATTEMPT_REGISTERED:
publishAppAttemptRegisteredEvent((AppAttemptRegisteredEvent) event);
break;
case APP_ATTEMPT_FINISHED:
publishAppAttemptFinishedEvent((AppAttemptFinishedEvent) event);
break;
case CONTAINER_CREATED:
publishContainerCreatedEvent((ContainerCreatedEvent) event);
break;
case CONTAINER_FINISHED:
publishContainerFinishedEvent((ContainerFinishedEvent) event);
break;
default:
LOG.error("Unknown SystemMetricsEvent type: " + event.getType());
}
}
@ -352,6 +371,20 @@ public class SystemMetricsPublisher extends CompositeService {
putEntity(entity);
}
private void publishApplicationStateUpdatedEvent(
ApplicaitonStateUpdatedEvent event) {
TimelineEntity entity = createApplicationEntity(event.getApplicationId());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO,
event.getAppState());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(ApplicationMetricsConstants.STATE_UPDATED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity);
}
private void publishApplicationACLsUpdatedEvent(
ApplicationACLsUpdatedEvent event) {
TimelineEntity entity =
@ -501,7 +534,9 @@ public class SystemMetricsPublisher extends CompositeService {
return entity;
}
private void putEntity(TimelineEntity entity) {
@Private
@VisibleForTesting
public void putEntity(TimelineEntity entity) {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Publishing the entity " + entity.getEntityId() +

View File

@ -250,7 +250,8 @@ public class RMAppImpl implements RMApp, Recoverable {
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
RMAppEventType.MOVE, new RMAppMoveTransition())
.addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
RMAppEventType.ATTEMPT_REGISTERED)
RMAppEventType.ATTEMPT_REGISTERED, new RMAppStateUpdateTransition(
YarnApplicationState.RUNNING))
.addTransition(RMAppState.ACCEPTED,
EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING),
// ACCEPTED state is possible to receive ATTEMPT_FAILED/ATTEMPT_FINISHED
@ -929,6 +930,20 @@ public class RMAppImpl implements RMApp, Recoverable {
};
}
private static final class RMAppStateUpdateTransition
extends RMAppTransition {
private YarnApplicationState stateToATS;
public RMAppStateUpdateTransition(YarnApplicationState state) {
stateToATS = state;
}
public void transition(RMAppImpl app, RMAppEvent event) {
app.rmContext.getSystemMetricsPublisher().appStateUpdated(
app, stateToATS, app.systemClock.getTime());
};
}
private static final class AppRunningOnNodeTransition extends RMAppTransition {
public void transition(RMAppImpl app, RMAppEvent event) {
RMAppRunningOnNodeEvent nodeAddedEvent = (RMAppRunningOnNodeEvent) event;

View File

@ -106,13 +106,15 @@ public class TestSystemMetricsPublisher {
@Test(timeout = 10000)
public void testPublishApplicationMetrics() throws Exception {
long stateUpdateTimeStamp = System.currentTimeMillis();
for (int i = 1; i <= 2; ++i) {
ApplicationId appId = ApplicationId.newInstance(0, i);
RMApp app = createRMApp(appId);
metricsPublisher.appCreated(app, app.getStartTime());
if (i == 1) {
when(app.getQueue()).thenReturn("new test queue");
ApplicationSubmissionContext asc = mock(ApplicationSubmissionContext.class);
ApplicationSubmissionContext asc = mock(
ApplicationSubmissionContext.class);
when(asc.getUnmanagedAM()).thenReturn(false);
when(asc.getPriority()).thenReturn(Priority.newInstance(1));
when(asc.getNodeLabelExpression()).thenReturn("high-cpu");
@ -121,7 +123,10 @@ public class TestSystemMetricsPublisher {
} else {
metricsPublisher.appUpdated(app, 4L);
}
metricsPublisher.appFinished(app, RMAppState.FINISHED, app.getFinishTime());
metricsPublisher.appStateUpdated(app, YarnApplicationState.RUNNING,
stateUpdateTimeStamp);
metricsPublisher.appFinished(app, RMAppState.FINISHED,
app.getFinishTime());
if (i == 1) {
metricsPublisher.appACLsUpdated(app, "uers1,user2", 4L);
} else {
@ -134,8 +139,8 @@ public class TestSystemMetricsPublisher {
store.getEntity(appId.toString(),
ApplicationMetricsConstants.ENTITY_TYPE,
EnumSet.allOf(Field.class));
// ensure three events are both published before leaving the loop
} while (entity == null || entity.getEvents().size() < 4);
// ensure Five events are both published before leaving the loop
} while (entity == null || entity.getEvents().size() < 5);
// verify all the fields
Assert.assertEquals(ApplicationMetricsConstants.ENTITY_TYPE,
entity.getEntityType());
@ -212,6 +217,7 @@ public class TestSystemMetricsPublisher {
boolean hasUpdatedEvent = false;
boolean hasFinishedEvent = false;
boolean hasACLsUpdatedEvent = false;
boolean hasStateUpdateEvent = false;
for (TimelineEvent event : entity.getEvents()) {
if (event.getEventType().equals(
ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
@ -249,10 +255,21 @@ public class TestSystemMetricsPublisher {
ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE)) {
hasACLsUpdatedEvent = true;
Assert.assertEquals(4L, event.getTimestamp());
} else if (event.getEventType().equals(
ApplicationMetricsConstants.STATE_UPDATED_EVENT_TYPE)) {
hasStateUpdateEvent = true;
Assert.assertEquals(event.getTimestamp(), stateUpdateTimeStamp);
Assert.assertEquals(YarnApplicationState.RUNNING.toString(), event
.getEventInfo().get(
ApplicationMetricsConstants.STATE_EVENT_INFO));
}
}
Assert.assertTrue(hasCreatedEvent && hasFinishedEvent
&& hasACLsUpdatedEvent && hasUpdatedEvent);
// Do assertTrue verification separately for easier debug
Assert.assertTrue(hasCreatedEvent);
Assert.assertTrue(hasFinishedEvent);
Assert.assertTrue(hasACLsUpdatedEvent);
Assert.assertTrue(hasUpdatedEvent);
Assert.assertTrue(hasStateUpdateEvent);
}
}