YARN-4044. Running applications information changes such as movequeue is not published to TimeLine server. (Sunil G via rohithsharmaks)

This commit is contained in:
Rohith Sharma K S 2015-09-24 12:17:55 +05:30
parent ace4d26936
commit fbb1d923f4
10 changed files with 188 additions and 16 deletions

View File

@ -822,6 +822,9 @@ Release 2.8.0 - UNRELEASED
YARN-4152. NodeManager crash with NPE when LogAggregationService#stopContainer called for
absent container. (Bibin A Chundatt via rohithsharmaks)
YARN-4044. Running applications information changes such as movequeue is not published to
TimeLine server. (Sunil G via rohithsharmaks)
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -355,6 +355,18 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
if (event.getEventType().equals(
ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
createdTime = event.getTimestamp();
} else if (event.getEventType().equals(
ApplicationMetricsConstants.UPDATED_EVENT_TYPE)) {
Map<String, Object> eventInfo = event.getEventInfo();
if (eventInfo == null) {
continue;
}
applicationPriority = Integer
.parseInt(eventInfo.get(
ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO)
.toString());
queue = eventInfo.get(ApplicationMetricsConstants.QUEUE_ENTITY_INFO)
.toString();
} else if (event.getEventType().equals(
ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
progress=1.0F;

View File

@ -80,9 +80,9 @@ public class TestApplicationHistoryManagerOnTimelineStore {
store = createStore(SCALE);
TimelineEntities entities = new TimelineEntities();
entities.addEntity(createApplicationTimelineEntity(
ApplicationId.newInstance(0, SCALE + 1), true, true, false));
ApplicationId.newInstance(0, SCALE + 1), true, true, false, false));
entities.addEntity(createApplicationTimelineEntity(
ApplicationId.newInstance(0, SCALE + 2), true, false, true));
ApplicationId.newInstance(0, SCALE + 2), true, false, true, false));
store.put(entities);
}
@ -139,10 +139,10 @@ public class TestApplicationHistoryManagerOnTimelineStore {
ApplicationId appId = ApplicationId.newInstance(0, i);
if (i == 2) {
entities.addEntity(createApplicationTimelineEntity(
appId, true, false, false));
appId, true, false, false, true));
} else {
entities.addEntity(createApplicationTimelineEntity(
appId, false, false, false));
appId, false, false, false, false));
}
store.put(entities);
for (int j = 1; j <= scale; ++j) {
@ -182,7 +182,15 @@ public class TestApplicationHistoryManagerOnTimelineStore {
Assert.assertEquals("test app", app.getName());
Assert.assertEquals("test app type", app.getApplicationType());
Assert.assertEquals("user1", app.getUser());
Assert.assertEquals("test queue", app.getQueue());
if (i == 2) {
// Change event is fired only in case of app with ID 2, hence verify
// with updated changes. And make sure last updated change is accepted.
Assert.assertEquals("changed queue1", app.getQueue());
Assert.assertEquals(Priority.newInstance(6), app.getPriority());
} else {
Assert.assertEquals("test queue", app.getQueue());
Assert.assertEquals(Priority.newInstance(0), app.getPriority());
}
Assert.assertEquals(Integer.MAX_VALUE + 2L
+ app.getApplicationId().getId(), app.getStartTime());
Assert.assertEquals(Integer.MAX_VALUE + 3L
@ -458,7 +466,7 @@ public class TestApplicationHistoryManagerOnTimelineStore {
private static TimelineEntity createApplicationTimelineEntity(
ApplicationId appId, boolean emptyACLs, boolean noAttemptId,
boolean wrongAppId) {
boolean wrongAppId, boolean enableUpdateEvent) {
TimelineEntity entity = new TimelineEntity();
entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE);
if (wrongAppId) {
@ -515,9 +523,32 @@ public class TestApplicationHistoryManagerOnTimelineStore {
}
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);
if (enableUpdateEvent) {
tEvent = new TimelineEvent();
createAppModifiedEvent(appId, tEvent, "changed queue", 5);
entity.addEvent(tEvent);
// Change priority alone
tEvent = new TimelineEvent();
createAppModifiedEvent(appId, tEvent, "changed queue", 6);
// Now change queue
tEvent = new TimelineEvent();
createAppModifiedEvent(appId, tEvent, "changed queue1", 6);
entity.addEvent(tEvent);
}
return entity;
}
private static void createAppModifiedEvent(ApplicationId appId,
TimelineEvent tEvent, String queue, int priority) {
tEvent.setEventType(ApplicationMetricsConstants.UPDATED_EVENT_TYPE);
tEvent.setTimestamp(Integer.MAX_VALUE + 4L + appId.getId());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, queue);
eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO,
priority);
tEvent.setEventInfo(eventInfo);
}
private static TimelineEntity createAppAttemptTimelineEntity(
ApplicationAttemptId appAttemptId) {
TimelineEntity entity = new TimelineEntity();

View File

@ -37,6 +37,9 @@ public class ApplicationMetricsConstants {
public static final String ACLS_UPDATED_EVENT_TYPE =
"YARN_APPLICATION_ACLS_UPDATED";
public static final String UPDATED_EVENT_TYPE =
"YARN_APPLICATION_UPDATED";
public static final String NAME_ENTITY_INFO =
"YARN_APPLICATION_NAME";

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
public class ApplicationUpdatedEvent extends SystemMetricsEvent {
private ApplicationId appId;
private String queue;
private Priority applicationPriority;
public ApplicationUpdatedEvent(ApplicationId appId, String queue,
long updatedTime, Priority applicationPriority) {
super(SystemMetricsEventType.APP_UPDATED, updatedTime);
this.appId = appId;
this.queue = queue;
this.applicationPriority = applicationPriority;
}
@Override
public int hashCode() {
return appId.hashCode();
}
public ApplicationId getApplicationId() {
return appId;
}
public String getQueue() {
return queue;
}
public Priority getApplicationPriority() {
return applicationPriority;
}
}

View File

@ -24,6 +24,7 @@ public enum SystemMetricsEventType {
APP_CREATED,
APP_FINISHED,
APP_ACLS_UPDATED,
APP_UPDATED,
// app attempt events
APP_ATTEMPT_REGISTERED,

View File

@ -118,6 +118,17 @@ public class SystemMetricsPublisher extends CompositeService {
}
}
@SuppressWarnings("unchecked")
public void appUpdated(RMApp app, long updatedTime) {
if (publishSystemMetrics) {
dispatcher.getEventHandler()
.handle(
new ApplicationUpdatedEvent(app.getApplicationId(), app
.getQueue(), updatedTime, app
.getApplicationSubmissionContext().getPriority()));
}
}
@SuppressWarnings("unchecked")
public void appFinished(RMApp app, RMAppState state, long finishedTime) {
if (publishSystemMetrics) {
@ -228,6 +239,9 @@ public class SystemMetricsPublisher extends CompositeService {
case APP_ACLS_UPDATED:
publishApplicationACLsUpdatedEvent((ApplicationACLsUpdatedEvent) event);
break;
case APP_UPDATED:
publishApplicationUpdatedEvent((ApplicationUpdatedEvent) event);
break;
case APP_ATTEMPT_REGISTERED:
publishAppAttemptRegisteredEvent((AppAttemptRegisteredEvent) event);
break;
@ -308,6 +322,21 @@ public class SystemMetricsPublisher extends CompositeService {
putEntity(entity);
}
private void publishApplicationUpdatedEvent(ApplicationUpdatedEvent event) {
TimelineEntity entity = createApplicationEntity(event.getApplicationId());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
event.getQueue());
eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, event
.getApplicationPriority().getPriority());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(ApplicationMetricsConstants.UPDATED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity);
}
private void publishApplicationACLsUpdatedEvent(
ApplicationACLsUpdatedEvent event) {
TimelineEntity entity =

View File

@ -911,7 +911,10 @@ public class RMAppImpl implements RMApp, Recoverable {
moveEvent.getResult().setException(ex);
return;
}
app.rmContext.getSystemMetricsPublisher().appUpdated(app,
System.currentTimeMillis());
// TODO: Write out change to state store (YARN-1558)
// Also take care of RM failover
moveEvent.getResult().set(null);

View File

@ -1869,6 +1869,10 @@ public class CapacityScheduler extends
application.getCurrentAppAttempt());
}
// Update the changed application state to timeline server
rmContext.getSystemMetricsPublisher().appUpdated(rmApp,
System.currentTimeMillis());
LOG.info("Priority '" + appPriority + "' is updated in queue :"
+ rmApp.getQueue() + " for application: " + applicationId
+ " for the user: " + rmApp.getUser());

View File

@ -109,6 +109,17 @@ public class TestSystemMetricsPublisher {
ApplicationId appId = ApplicationId.newInstance(0, i);
RMApp app = createRMApp(appId);
metricsPublisher.appCreated(app, app.getStartTime());
if (i == 1) {
when(app.getQueue()).thenReturn("new test queue");
ApplicationSubmissionContext asc = mock(ApplicationSubmissionContext.class);
when(asc.getUnmanagedAM()).thenReturn(false);
when(asc.getPriority()).thenReturn(Priority.newInstance(1));
when(asc.getNodeLabelExpression()).thenReturn("high-cpu");
when(app.getApplicationSubmissionContext()).thenReturn(asc);
metricsPublisher.appUpdated(app, 4L);
} else {
metricsPublisher.appUpdated(app, 4L);
}
metricsPublisher.appFinished(app, RMAppState.FINISHED, app.getFinishTime());
if (i == 1) {
metricsPublisher.appACLsUpdated(app, "uers1,user2", 4L);
@ -123,7 +134,7 @@ public class TestSystemMetricsPublisher {
ApplicationMetricsConstants.ENTITY_TYPE,
EnumSet.allOf(Field.class));
// ensure three events are both published before leaving the loop
} while (entity == null || entity.getEvents().size() < 3);
} while (entity == null || entity.getEvents().size() < 4);
// verify all the fields
Assert.assertEquals(ApplicationMetricsConstants.ENTITY_TYPE,
entity.getEntityType());
@ -134,19 +145,24 @@ public class TestSystemMetricsPublisher {
app.getName(),
entity.getOtherInfo().get(
ApplicationMetricsConstants.NAME_ENTITY_INFO));
Assert.assertEquals(app.getQueue(),
entity.getOtherInfo()
.get(ApplicationMetricsConstants.QUEUE_ENTITY_INFO));
if (i != 1) {
Assert.assertEquals(
app.getQueue(),
entity.getOtherInfo().get(
ApplicationMetricsConstants.QUEUE_ENTITY_INFO));
}
Assert.assertEquals(
app.getApplicationSubmissionContext().getUnmanagedAM(),
entity.getOtherInfo().get(
ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO));
Assert.assertEquals(
app.getApplicationSubmissionContext().getPriority().getPriority(),
entity.getOtherInfo().get(
ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO));
if (i != 1) {
Assert.assertEquals(
app.getApplicationSubmissionContext().getPriority().getPriority(),
entity.getOtherInfo().get(
ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO));
}
Assert.assertEquals(app.getAmNodeLabelExpression(), entity.getOtherInfo()
.get(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION));
@ -190,6 +206,7 @@ public class TestSystemMetricsPublisher {
.get(ApplicationMetricsConstants.APP_CPU_METRICS).toString()));
}
boolean hasCreatedEvent = false;
boolean hasUpdatedEvent = false;
boolean hasFinishedEvent = false;
boolean hasACLsUpdatedEvent = false;
for (TimelineEvent event : entity.getEvents()) {
@ -211,13 +228,28 @@ public class TestSystemMetricsPublisher {
ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO));
Assert.assertEquals(YarnApplicationState.FINISHED.toString(), event
.getEventInfo().get(ApplicationMetricsConstants.STATE_EVENT_INFO));
} else if (event.getEventType().equals(
ApplicationMetricsConstants.UPDATED_EVENT_TYPE)) {
hasUpdatedEvent = true;
Assert.assertEquals(4L, event.getTimestamp());
if (1 == i) {
Assert.assertEquals(
1,
event.getEventInfo().get(
ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO));
Assert.assertEquals(
"new test queue",
event.getEventInfo().get(
ApplicationMetricsConstants.QUEUE_ENTITY_INFO));
}
} else if (event.getEventType().equals(
ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE)) {
hasACLsUpdatedEvent = true;
Assert.assertEquals(4L, event.getTimestamp());
}
}
Assert.assertTrue(hasCreatedEvent && hasFinishedEvent && hasACLsUpdatedEvent);
Assert.assertTrue(hasCreatedEvent && hasFinishedEvent
&& hasACLsUpdatedEvent && hasUpdatedEvent);
}
}