YARN-4044. Running applications information changes such as movequeue is not published to TimeLine server. (Sunil G via rohithsharmaks)

This commit is contained in:
Rohith Sharma K S 2015-09-24 12:13:22 +05:30
parent 8ed0d4b744
commit a9aafad12b
10 changed files with 188 additions and 16 deletions

View File

@ -899,6 +899,9 @@ Release 2.8.0 - UNRELEASED
YARN-4152. NodeManager crash with NPE when LogAggregationService#stopContainer called for YARN-4152. NodeManager crash with NPE when LogAggregationService#stopContainer called for
absent container. (Bibin A Chundatt via rohithsharmaks) absent container. (Bibin A Chundatt via rohithsharmaks)
YARN-4044. Running applications information changes such as movequeue is not published to
TimeLine server. (Sunil G via rohithsharmaks)
Release 2.7.2 - UNRELEASED Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -355,6 +355,18 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
if (event.getEventType().equals( if (event.getEventType().equals(
ApplicationMetricsConstants.CREATED_EVENT_TYPE)) { ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
createdTime = event.getTimestamp(); createdTime = event.getTimestamp();
} else if (event.getEventType().equals(
ApplicationMetricsConstants.UPDATED_EVENT_TYPE)) {
Map<String, Object> eventInfo = event.getEventInfo();
if (eventInfo == null) {
continue;
}
applicationPriority = Integer
.parseInt(eventInfo.get(
ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO)
.toString());
queue = eventInfo.get(ApplicationMetricsConstants.QUEUE_ENTITY_INFO)
.toString();
} else if (event.getEventType().equals( } else if (event.getEventType().equals(
ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) { ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
progress=1.0F; progress=1.0F;

View File

@ -80,9 +80,9 @@ public class TestApplicationHistoryManagerOnTimelineStore {
store = createStore(SCALE); store = createStore(SCALE);
TimelineEntities entities = new TimelineEntities(); TimelineEntities entities = new TimelineEntities();
entities.addEntity(createApplicationTimelineEntity( entities.addEntity(createApplicationTimelineEntity(
ApplicationId.newInstance(0, SCALE + 1), true, true, false)); ApplicationId.newInstance(0, SCALE + 1), true, true, false, false));
entities.addEntity(createApplicationTimelineEntity( entities.addEntity(createApplicationTimelineEntity(
ApplicationId.newInstance(0, SCALE + 2), true, false, true)); ApplicationId.newInstance(0, SCALE + 2), true, false, true, false));
store.put(entities); store.put(entities);
} }
@ -139,10 +139,10 @@ public class TestApplicationHistoryManagerOnTimelineStore {
ApplicationId appId = ApplicationId.newInstance(0, i); ApplicationId appId = ApplicationId.newInstance(0, i);
if (i == 2) { if (i == 2) {
entities.addEntity(createApplicationTimelineEntity( entities.addEntity(createApplicationTimelineEntity(
appId, true, false, false)); appId, true, false, false, true));
} else { } else {
entities.addEntity(createApplicationTimelineEntity( entities.addEntity(createApplicationTimelineEntity(
appId, false, false, false)); appId, false, false, false, false));
} }
store.put(entities); store.put(entities);
for (int j = 1; j <= scale; ++j) { for (int j = 1; j <= scale; ++j) {
@ -182,7 +182,15 @@ public class TestApplicationHistoryManagerOnTimelineStore {
Assert.assertEquals("test app", app.getName()); Assert.assertEquals("test app", app.getName());
Assert.assertEquals("test app type", app.getApplicationType()); Assert.assertEquals("test app type", app.getApplicationType());
Assert.assertEquals("user1", app.getUser()); Assert.assertEquals("user1", app.getUser());
Assert.assertEquals("test queue", app.getQueue()); if (i == 2) {
// Change event is fired only in case of app with ID 2, hence verify
// with updated changes. And make sure last updated change is accepted.
Assert.assertEquals("changed queue1", app.getQueue());
Assert.assertEquals(Priority.newInstance(6), app.getPriority());
} else {
Assert.assertEquals("test queue", app.getQueue());
Assert.assertEquals(Priority.newInstance(0), app.getPriority());
}
Assert.assertEquals(Integer.MAX_VALUE + 2L Assert.assertEquals(Integer.MAX_VALUE + 2L
+ app.getApplicationId().getId(), app.getStartTime()); + app.getApplicationId().getId(), app.getStartTime());
Assert.assertEquals(Integer.MAX_VALUE + 3L Assert.assertEquals(Integer.MAX_VALUE + 3L
@ -458,7 +466,7 @@ public class TestApplicationHistoryManagerOnTimelineStore {
private static TimelineEntity createApplicationTimelineEntity( private static TimelineEntity createApplicationTimelineEntity(
ApplicationId appId, boolean emptyACLs, boolean noAttemptId, ApplicationId appId, boolean emptyACLs, boolean noAttemptId,
boolean wrongAppId) { boolean wrongAppId, boolean enableUpdateEvent) {
TimelineEntity entity = new TimelineEntity(); TimelineEntity entity = new TimelineEntity();
entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE); entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE);
if (wrongAppId) { if (wrongAppId) {
@ -515,9 +523,32 @@ public class TestApplicationHistoryManagerOnTimelineStore {
} }
tEvent.setEventInfo(eventInfo); tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent); entity.addEvent(tEvent);
if (enableUpdateEvent) {
tEvent = new TimelineEvent();
createAppModifiedEvent(appId, tEvent, "changed queue", 5);
entity.addEvent(tEvent);
// Change priority alone
tEvent = new TimelineEvent();
createAppModifiedEvent(appId, tEvent, "changed queue", 6);
// Now change queue
tEvent = new TimelineEvent();
createAppModifiedEvent(appId, tEvent, "changed queue1", 6);
entity.addEvent(tEvent);
}
return entity; return entity;
} }
private static void createAppModifiedEvent(ApplicationId appId,
TimelineEvent tEvent, String queue, int priority) {
tEvent.setEventType(ApplicationMetricsConstants.UPDATED_EVENT_TYPE);
tEvent.setTimestamp(Integer.MAX_VALUE + 4L + appId.getId());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, queue);
eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO,
priority);
tEvent.setEventInfo(eventInfo);
}
private static TimelineEntity createAppAttemptTimelineEntity( private static TimelineEntity createAppAttemptTimelineEntity(
ApplicationAttemptId appAttemptId) { ApplicationAttemptId appAttemptId) {
TimelineEntity entity = new TimelineEntity(); TimelineEntity entity = new TimelineEntity();

View File

@ -37,6 +37,9 @@ public class ApplicationMetricsConstants {
public static final String ACLS_UPDATED_EVENT_TYPE = public static final String ACLS_UPDATED_EVENT_TYPE =
"YARN_APPLICATION_ACLS_UPDATED"; "YARN_APPLICATION_ACLS_UPDATED";
public static final String UPDATED_EVENT_TYPE =
"YARN_APPLICATION_UPDATED";
public static final String NAME_ENTITY_INFO = public static final String NAME_ENTITY_INFO =
"YARN_APPLICATION_NAME"; "YARN_APPLICATION_NAME";

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
public class ApplicationUpdatedEvent extends SystemMetricsEvent {
private ApplicationId appId;
private String queue;
private Priority applicationPriority;
public ApplicationUpdatedEvent(ApplicationId appId, String queue,
long updatedTime, Priority applicationPriority) {
super(SystemMetricsEventType.APP_UPDATED, updatedTime);
this.appId = appId;
this.queue = queue;
this.applicationPriority = applicationPriority;
}
@Override
public int hashCode() {
return appId.hashCode();
}
public ApplicationId getApplicationId() {
return appId;
}
public String getQueue() {
return queue;
}
public Priority getApplicationPriority() {
return applicationPriority;
}
}

View File

@ -24,6 +24,7 @@ public enum SystemMetricsEventType {
APP_CREATED, APP_CREATED,
APP_FINISHED, APP_FINISHED,
APP_ACLS_UPDATED, APP_ACLS_UPDATED,
APP_UPDATED,
// app attempt events // app attempt events
APP_ATTEMPT_REGISTERED, APP_ATTEMPT_REGISTERED,

View File

@ -118,6 +118,17 @@ public class SystemMetricsPublisher extends CompositeService {
} }
} }
@SuppressWarnings("unchecked")
public void appUpdated(RMApp app, long updatedTime) {
if (publishSystemMetrics) {
dispatcher.getEventHandler()
.handle(
new ApplicationUpdatedEvent(app.getApplicationId(), app
.getQueue(), updatedTime, app
.getApplicationSubmissionContext().getPriority()));
}
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void appFinished(RMApp app, RMAppState state, long finishedTime) { public void appFinished(RMApp app, RMAppState state, long finishedTime) {
if (publishSystemMetrics) { if (publishSystemMetrics) {
@ -228,6 +239,9 @@ public class SystemMetricsPublisher extends CompositeService {
case APP_ACLS_UPDATED: case APP_ACLS_UPDATED:
publishApplicationACLsUpdatedEvent((ApplicationACLsUpdatedEvent) event); publishApplicationACLsUpdatedEvent((ApplicationACLsUpdatedEvent) event);
break; break;
case APP_UPDATED:
publishApplicationUpdatedEvent((ApplicationUpdatedEvent) event);
break;
case APP_ATTEMPT_REGISTERED: case APP_ATTEMPT_REGISTERED:
publishAppAttemptRegisteredEvent((AppAttemptRegisteredEvent) event); publishAppAttemptRegisteredEvent((AppAttemptRegisteredEvent) event);
break; break;
@ -308,6 +322,21 @@ public class SystemMetricsPublisher extends CompositeService {
putEntity(entity); putEntity(entity);
} }
private void publishApplicationUpdatedEvent(ApplicationUpdatedEvent event) {
TimelineEntity entity = createApplicationEntity(event.getApplicationId());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
event.getQueue());
eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, event
.getApplicationPriority().getPriority());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(ApplicationMetricsConstants.UPDATED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity);
}
private void publishApplicationACLsUpdatedEvent( private void publishApplicationACLsUpdatedEvent(
ApplicationACLsUpdatedEvent event) { ApplicationACLsUpdatedEvent event) {
TimelineEntity entity = TimelineEntity entity =

View File

@ -911,7 +911,10 @@ public class RMAppImpl implements RMApp, Recoverable {
moveEvent.getResult().setException(ex); moveEvent.getResult().setException(ex);
return; return;
} }
app.rmContext.getSystemMetricsPublisher().appUpdated(app,
System.currentTimeMillis());
// TODO: Write out change to state store (YARN-1558) // TODO: Write out change to state store (YARN-1558)
// Also take care of RM failover // Also take care of RM failover
moveEvent.getResult().set(null); moveEvent.getResult().set(null);

View File

@ -1946,6 +1946,10 @@ public class CapacityScheduler extends
application.getCurrentAppAttempt()); application.getCurrentAppAttempt());
} }
// Update the changed application state to timeline server
rmContext.getSystemMetricsPublisher().appUpdated(rmApp,
System.currentTimeMillis());
LOG.info("Priority '" + appPriority + "' is updated in queue :" LOG.info("Priority '" + appPriority + "' is updated in queue :"
+ rmApp.getQueue() + " for application: " + applicationId + rmApp.getQueue() + " for application: " + applicationId
+ " for the user: " + rmApp.getUser()); + " for the user: " + rmApp.getUser());

View File

@ -109,6 +109,17 @@ public class TestSystemMetricsPublisher {
ApplicationId appId = ApplicationId.newInstance(0, i); ApplicationId appId = ApplicationId.newInstance(0, i);
RMApp app = createRMApp(appId); RMApp app = createRMApp(appId);
metricsPublisher.appCreated(app, app.getStartTime()); metricsPublisher.appCreated(app, app.getStartTime());
if (i == 1) {
when(app.getQueue()).thenReturn("new test queue");
ApplicationSubmissionContext asc = mock(ApplicationSubmissionContext.class);
when(asc.getUnmanagedAM()).thenReturn(false);
when(asc.getPriority()).thenReturn(Priority.newInstance(1));
when(asc.getNodeLabelExpression()).thenReturn("high-cpu");
when(app.getApplicationSubmissionContext()).thenReturn(asc);
metricsPublisher.appUpdated(app, 4L);
} else {
metricsPublisher.appUpdated(app, 4L);
}
metricsPublisher.appFinished(app, RMAppState.FINISHED, app.getFinishTime()); metricsPublisher.appFinished(app, RMAppState.FINISHED, app.getFinishTime());
if (i == 1) { if (i == 1) {
metricsPublisher.appACLsUpdated(app, "uers1,user2", 4L); metricsPublisher.appACLsUpdated(app, "uers1,user2", 4L);
@ -123,7 +134,7 @@ public class TestSystemMetricsPublisher {
ApplicationMetricsConstants.ENTITY_TYPE, ApplicationMetricsConstants.ENTITY_TYPE,
EnumSet.allOf(Field.class)); EnumSet.allOf(Field.class));
// ensure three events are both published before leaving the loop // ensure three events are both published before leaving the loop
} while (entity == null || entity.getEvents().size() < 3); } while (entity == null || entity.getEvents().size() < 4);
// verify all the fields // verify all the fields
Assert.assertEquals(ApplicationMetricsConstants.ENTITY_TYPE, Assert.assertEquals(ApplicationMetricsConstants.ENTITY_TYPE,
entity.getEntityType()); entity.getEntityType());
@ -134,19 +145,24 @@ public class TestSystemMetricsPublisher {
app.getName(), app.getName(),
entity.getOtherInfo().get( entity.getOtherInfo().get(
ApplicationMetricsConstants.NAME_ENTITY_INFO)); ApplicationMetricsConstants.NAME_ENTITY_INFO));
Assert.assertEquals(app.getQueue(), if (i != 1) {
entity.getOtherInfo() Assert.assertEquals(
.get(ApplicationMetricsConstants.QUEUE_ENTITY_INFO)); app.getQueue(),
entity.getOtherInfo().get(
ApplicationMetricsConstants.QUEUE_ENTITY_INFO));
}
Assert.assertEquals( Assert.assertEquals(
app.getApplicationSubmissionContext().getUnmanagedAM(), app.getApplicationSubmissionContext().getUnmanagedAM(),
entity.getOtherInfo().get( entity.getOtherInfo().get(
ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO)); ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO));
Assert.assertEquals( if (i != 1) {
app.getApplicationSubmissionContext().getPriority().getPriority(), Assert.assertEquals(
entity.getOtherInfo().get( app.getApplicationSubmissionContext().getPriority().getPriority(),
ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO)); entity.getOtherInfo().get(
ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO));
}
Assert.assertEquals(app.getAmNodeLabelExpression(), entity.getOtherInfo() Assert.assertEquals(app.getAmNodeLabelExpression(), entity.getOtherInfo()
.get(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION)); .get(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION));
@ -190,6 +206,7 @@ public class TestSystemMetricsPublisher {
.get(ApplicationMetricsConstants.APP_CPU_METRICS).toString())); .get(ApplicationMetricsConstants.APP_CPU_METRICS).toString()));
} }
boolean hasCreatedEvent = false; boolean hasCreatedEvent = false;
boolean hasUpdatedEvent = false;
boolean hasFinishedEvent = false; boolean hasFinishedEvent = false;
boolean hasACLsUpdatedEvent = false; boolean hasACLsUpdatedEvent = false;
for (TimelineEvent event : entity.getEvents()) { for (TimelineEvent event : entity.getEvents()) {
@ -211,13 +228,28 @@ public class TestSystemMetricsPublisher {
ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO)); ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO));
Assert.assertEquals(YarnApplicationState.FINISHED.toString(), event Assert.assertEquals(YarnApplicationState.FINISHED.toString(), event
.getEventInfo().get(ApplicationMetricsConstants.STATE_EVENT_INFO)); .getEventInfo().get(ApplicationMetricsConstants.STATE_EVENT_INFO));
} else if (event.getEventType().equals(
ApplicationMetricsConstants.UPDATED_EVENT_TYPE)) {
hasUpdatedEvent = true;
Assert.assertEquals(4L, event.getTimestamp());
if (1 == i) {
Assert.assertEquals(
1,
event.getEventInfo().get(
ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO));
Assert.assertEquals(
"new test queue",
event.getEventInfo().get(
ApplicationMetricsConstants.QUEUE_ENTITY_INFO));
}
} else if (event.getEventType().equals( } else if (event.getEventType().equals(
ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE)) { ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE)) {
hasACLsUpdatedEvent = true; hasACLsUpdatedEvent = true;
Assert.assertEquals(4L, event.getTimestamp()); Assert.assertEquals(4L, event.getTimestamp());
} }
} }
Assert.assertTrue(hasCreatedEvent && hasFinishedEvent && hasACLsUpdatedEvent); Assert.assertTrue(hasCreatedEvent && hasFinishedEvent
&& hasACLsUpdatedEvent && hasUpdatedEvent);
} }
} }