YARN-5699. Retrospect yarn entity fields which are publishing in events info fields. Contributed by Rohith Sharma K S.

This commit is contained in:
Sangjin Lee 2016-10-15 13:54:40 -07:00
parent 5f4ae85bd8
commit 1f304b0c7f
8 changed files with 190 additions and 181 deletions

View File

@ -463,21 +463,21 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
if (eventInfo == null) {
continue;
}
if (eventInfo.containsKey(AppAttemptMetricsConstants.HOST_EVENT_INFO)) {
if (eventInfo.containsKey(AppAttemptMetricsConstants.HOST_INFO)) {
host =
eventInfo.get(AppAttemptMetricsConstants.HOST_EVENT_INFO)
eventInfo.get(AppAttemptMetricsConstants.HOST_INFO)
.toString();
}
if (eventInfo
.containsKey(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO)) {
.containsKey(AppAttemptMetricsConstants.RPC_PORT_INFO)) {
rpcPort = (Integer) eventInfo.get(
AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO);
AppAttemptMetricsConstants.RPC_PORT_INFO);
}
if (eventInfo
.containsKey(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO)) {
.containsKey(AppAttemptMetricsConstants.MASTER_CONTAINER_INFO)) {
amContainerId =
ContainerId.fromString(eventInfo.get(
AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO)
AppAttemptMetricsConstants.MASTER_CONTAINER_INFO)
.toString());
}
} else if (event.getEventType().equals(
@ -487,39 +487,40 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
continue;
}
if (eventInfo
.containsKey(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO)) {
.containsKey(AppAttemptMetricsConstants.TRACKING_URL_INFO)) {
trackingUrl =
eventInfo.get(
AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO)
AppAttemptMetricsConstants.TRACKING_URL_INFO)
.toString();
}
if (eventInfo
.containsKey(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO)) {
.containsKey(
AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_INFO)) {
originalTrackingUrl =
eventInfo
.get(
AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO)
AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_INFO)
.toString();
}
if (eventInfo
.containsKey(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO)) {
.containsKey(AppAttemptMetricsConstants.DIAGNOSTICS_INFO)) {
diagnosticsInfo =
eventInfo.get(
AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO)
AppAttemptMetricsConstants.DIAGNOSTICS_INFO)
.toString();
}
if (eventInfo
.containsKey(AppAttemptMetricsConstants.STATE_EVENT_INFO)) {
.containsKey(AppAttemptMetricsConstants.STATE_INFO)) {
state =
YarnApplicationAttemptState.valueOf(eventInfo.get(
AppAttemptMetricsConstants.STATE_EVENT_INFO)
AppAttemptMetricsConstants.STATE_INFO)
.toString());
}
if (eventInfo
.containsKey(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO)) {
.containsKey(AppAttemptMetricsConstants.MASTER_CONTAINER_INFO)) {
amContainerId =
ContainerId.fromString(eventInfo.get(
AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO)
AppAttemptMetricsConstants.MASTER_CONTAINER_INFO)
.toString());
}
}
@ -547,37 +548,37 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
Map<String, Object> entityInfo = entity.getOtherInfo();
if (entityInfo != null) {
if (entityInfo
.containsKey(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO)) {
.containsKey(ContainerMetricsConstants.ALLOCATED_MEMORY_INFO)) {
allocatedMem = (Integer) entityInfo.get(
ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO);
ContainerMetricsConstants.ALLOCATED_MEMORY_INFO);
}
if (entityInfo
.containsKey(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO)) {
.containsKey(ContainerMetricsConstants.ALLOCATED_VCORE_INFO)) {
allocatedVcore = (Integer) entityInfo.get(
ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO);
ContainerMetricsConstants.ALLOCATED_VCORE_INFO);
}
if (entityInfo
.containsKey(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO)) {
.containsKey(ContainerMetricsConstants.ALLOCATED_HOST_INFO)) {
allocatedHost =
entityInfo
.get(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO)
.get(ContainerMetricsConstants.ALLOCATED_HOST_INFO)
.toString();
}
if (entityInfo
.containsKey(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO)) {
.containsKey(ContainerMetricsConstants.ALLOCATED_PORT_INFO)) {
allocatedPort = (Integer) entityInfo.get(
ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO);
ContainerMetricsConstants.ALLOCATED_PORT_INFO);
}
if (entityInfo
.containsKey(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO)) {
.containsKey(ContainerMetricsConstants.ALLOCATED_PRIORITY_INFO)) {
allocatedPriority = (Integer) entityInfo.get(
ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO);
ContainerMetricsConstants.ALLOCATED_PRIORITY_INFO);
}
if (entityInfo.containsKey(
ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO)) {
ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_INFO)) {
nodeHttpAddress =
(String) entityInfo
.get(ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO);
.get(ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_INFO);
}
}
List<TimelineEvent> events = entity.getEvents();
@ -594,22 +595,22 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
continue;
}
if (eventInfo
.containsKey(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO)) {
.containsKey(ContainerMetricsConstants.DIAGNOSTICS_INFO)) {
diagnosticsInfo =
eventInfo.get(
ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO)
ContainerMetricsConstants.DIAGNOSTICS_INFO)
.toString();
}
if (eventInfo
.containsKey(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO)) {
.containsKey(ContainerMetricsConstants.EXIT_STATUS_INFO)) {
exitStatus = (Integer) eventInfo.get(
ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO);
ContainerMetricsConstants.EXIT_STATUS_INFO);
}
if (eventInfo
.containsKey(ContainerMetricsConstants.STATE_EVENT_INFO)) {
.containsKey(ContainerMetricsConstants.STATE_INFO)) {
state =
ContainerState.valueOf(eventInfo.get(
ContainerMetricsConstants.STATE_EVENT_INFO).toString());
ContainerMetricsConstants.STATE_INFO).toString());
}
}
}

View File

@ -593,13 +593,13 @@ public class TestApplicationHistoryManagerOnTimelineStore {
tEvent.setEventType(AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
tEvent.setTimestamp(Integer.MAX_VALUE + 1L);
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_INFO,
"test tracking url");
eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_INFO,
"test original tracking url");
eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO, "test host");
eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO, 100);
eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
eventInfo.put(AppAttemptMetricsConstants.HOST_INFO, "test host");
eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_INFO, 100);
eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_INFO,
ContainerId.newContainerId(appAttemptId, 1));
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);
@ -607,15 +607,15 @@ public class TestApplicationHistoryManagerOnTimelineStore {
tEvent.setEventType(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE);
tEvent.setTimestamp(Integer.MAX_VALUE + 2L);
eventInfo = new HashMap<String, Object>();
eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_INFO,
"test tracking url");
eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_INFO,
"test original tracking url");
eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO,
"test diagnostics info");
eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO,
eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_INFO,
FinalApplicationStatus.UNDEFINED.toString());
eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO,
eventInfo.put(AppAttemptMetricsConstants.STATE_INFO,
YarnApplicationAttemptState.FINISHED.toString());
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);
@ -632,15 +632,15 @@ public class TestApplicationHistoryManagerOnTimelineStore {
entity.addPrimaryFilter(
TimelineStore.SystemFilter.ENTITY_OWNER.toString(), "yarn");
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, -1);
entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, -1);
entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO,
entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_INFO, -1);
entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_INFO, -1);
entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_INFO,
"test host");
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, 100);
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_INFO, 100);
entityInfo
.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO, -1);
.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_INFO, -1);
entityInfo.put(ContainerMetricsConstants
.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO, "http://test:1234");
.ALLOCATED_HOST_HTTP_ADDRESS_INFO, "http://test:1234");
entity.setOtherInfo(entityInfo);
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE);
@ -651,10 +651,10 @@ public class TestApplicationHistoryManagerOnTimelineStore {
tEvent.setEventType(ContainerMetricsConstants.FINISHED_EVENT_TYPE);
tEvent.setTimestamp(Integer.MAX_VALUE + 2L);
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO,
"test diagnostics info");
eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO, -1);
eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO,
eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_INFO, -1);
eventInfo.put(ContainerMetricsConstants.STATE_INFO,
ContainerState.COMPLETE.toString());
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);

View File

@ -37,28 +37,28 @@ public class AppAttemptMetricsConstants {
public static final String PARENT_PRIMARY_FILTER =
"YARN_APPLICATION_ATTEMPT_PARENT";
public static final String TRACKING_URL_EVENT_INFO =
public static final String TRACKING_URL_INFO =
"YARN_APPLICATION_ATTEMPT_TRACKING_URL";
public static final String ORIGINAL_TRACKING_URL_EVENT_INFO =
public static final String ORIGINAL_TRACKING_URL_INFO =
"YARN_APPLICATION_ATTEMPT_ORIGINAL_TRACKING_URL";
public static final String HOST_EVENT_INFO =
public static final String HOST_INFO =
"YARN_APPLICATION_ATTEMPT_HOST";
public static final String RPC_PORT_EVENT_INFO =
public static final String RPC_PORT_INFO =
"YARN_APPLICATION_ATTEMPT_RPC_PORT";
public static final String MASTER_CONTAINER_EVENT_INFO =
public static final String MASTER_CONTAINER_INFO =
"YARN_APPLICATION_ATTEMPT_MASTER_CONTAINER";
public static final String DIAGNOSTICS_INFO_EVENT_INFO =
public static final String DIAGNOSTICS_INFO =
"YARN_APPLICATION_ATTEMPT_DIAGNOSTICS_INFO";
public static final String FINAL_STATUS_EVENT_INFO =
public static final String FINAL_STATUS_INFO =
"YARN_APPLICATION_ATTEMPT_FINAL_STATUS";
public static final String STATE_EVENT_INFO =
public static final String STATE_INFO =
"YARN_APPLICATION_ATTEMPT_STATE";
}

View File

@ -41,33 +41,36 @@ public class ContainerMetricsConstants {
public static final String FINISHED_IN_RM_EVENT_TYPE =
"YARN_RM_CONTAINER_FINISHED";
public static final String CONTAINER_FINISHED_TIME =
"YARN_CONTAINER_FINISHED_TIME";
public static final String PARENT_PRIMARIY_FILTER = "YARN_CONTAINER_PARENT";
public static final String ALLOCATED_MEMORY_ENTITY_INFO =
public static final String ALLOCATED_MEMORY_INFO =
"YARN_CONTAINER_ALLOCATED_MEMORY";
public static final String ALLOCATED_VCORE_ENTITY_INFO =
public static final String ALLOCATED_VCORE_INFO =
"YARN_CONTAINER_ALLOCATED_VCORE";
public static final String ALLOCATED_HOST_ENTITY_INFO =
public static final String ALLOCATED_HOST_INFO =
"YARN_CONTAINER_ALLOCATED_HOST";
public static final String ALLOCATED_PORT_ENTITY_INFO =
public static final String ALLOCATED_PORT_INFO =
"YARN_CONTAINER_ALLOCATED_PORT";
public static final String ALLOCATED_PRIORITY_ENTITY_INFO =
public static final String ALLOCATED_PRIORITY_INFO =
"YARN_CONTAINER_ALLOCATED_PRIORITY";
public static final String DIAGNOSTICS_INFO_EVENT_INFO =
public static final String DIAGNOSTICS_INFO =
"YARN_CONTAINER_DIAGNOSTICS_INFO";
public static final String EXIT_STATUS_EVENT_INFO =
public static final String EXIT_STATUS_INFO =
"YARN_CONTAINER_EXIT_STATUS";
public static final String STATE_EVENT_INFO =
public static final String STATE_INFO =
"YARN_CONTAINER_STATE";
public static final String ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO =
public static final String ALLOCATED_HOST_HTTP_ADDRESS_INFO =
"YARN_CONTAINER_ALLOCATED_HOST_HTTP_ADDRESS";
// Event of this type will be emitted by NM.

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
@ -98,7 +99,6 @@ public class NMTimelinePublisher extends CompositeService {
// context will be updated after containerManagerImpl is started
// hence NMMetricsPublisher is added subservice of containerManagerImpl
this.nodeId = context.getNodeId();
this.httpAddress = nodeId.getHost() + ":" + context.getHttpPort();
}
@VisibleForTesting
@ -167,18 +167,18 @@ public class NMTimelinePublisher extends CompositeService {
Resource resource = container.getResource();
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO,
entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_INFO,
resource.getMemorySize());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO,
entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_INFO,
resource.getVirtualCores());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO,
entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_INFO,
nodeId.getHost());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO,
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_INFO,
nodeId.getPort());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_INFO,
container.getPriority().toString());
entityInfo.put(
ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_INFO,
httpAddress);
entity.setInfo(entityInfo);
@ -198,19 +198,20 @@ public class NMTimelinePublisher extends CompositeService {
ContainerId containerId = containerStatus.getContainerId();
TimelineEntity entity = createContainerEntity(containerId);
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO,
containerStatus.getDiagnostics());
eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO,
entityInfo.put(ContainerMetricsConstants.EXIT_STATUS_INFO,
containerStatus.getExitStatus());
eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, containerStatus
.getState().toString());
entityInfo.put(ContainerMetricsConstants.STATE_INFO,
ContainerState.COMPLETE.toString());
entityInfo.put(ContainerMetricsConstants.CONTAINER_FINISHED_TIME,
timeStamp);
entity.setInfo(entityInfo);
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(ContainerMetricsConstants.FINISHED_EVENT_TYPE);
tEvent.setTimestamp(timeStamp);
tEvent.setInfo(eventInfo);
entity.addEvent(tEvent);
dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
@ -304,6 +305,11 @@ public class NMTimelinePublisher extends CompositeService {
public void publishContainerEvent(ContainerEvent event) {
// publish only when the desired event is received
if (this.httpAddress == null) {
// update httpAddress for first time. When this service started,
// web server will not be started.
this.httpAddress = nodeId.getHost() + ":" + context.getHttpPort();
}
switch (event.getType()) {
case INIT_CONTAINER:
publishContainerCreatedEvent(event);

View File

@ -215,16 +215,16 @@ public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher {
tEvent.setEventType(AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
tEvent.setTimestamp(registeredTime);
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_INFO,
appAttempt.getTrackingUrl());
eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_INFO,
appAttempt.getOriginalTrackingUrl());
eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO,
eventInfo.put(AppAttemptMetricsConstants.HOST_INFO,
appAttempt.getHost());
eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO,
eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_INFO,
appAttempt.getRpcPort());
if (appAttempt.getMasterContainer() != null) {
eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_INFO,
appAttempt.getMasterContainer().getId().toString());
}
tEvent.setEventInfo(eventInfo);
@ -246,18 +246,18 @@ public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher {
tEvent.setEventType(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE);
tEvent.setTimestamp(finishedTime);
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_INFO,
appAttempt.getTrackingUrl());
eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_INFO,
appAttempt.getOriginalTrackingUrl());
eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO,
appAttempt.getDiagnostics());
eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO,
eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_INFO,
app.getFinalApplicationStatus().toString());
eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, RMServerUtils
eventInfo.put(AppAttemptMetricsConstants.STATE_INFO, RMServerUtils
.createApplicationAttemptState(appAttemtpState).toString());
if (appAttempt.getMasterContainer() != null) {
eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_INFO,
appAttempt.getMasterContainer().getId().toString());
}
tEvent.setEventInfo(eventInfo);
@ -273,18 +273,18 @@ public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher {
public void containerCreated(RMContainer container, long createdTime) {
TimelineEntity entity = createContainerEntity(container.getContainerId());
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO,
entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_INFO,
container.getAllocatedResource().getMemorySize());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO,
entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_INFO,
container.getAllocatedResource().getVirtualCores());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO,
entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_INFO,
container.getAllocatedNode().getHost());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO,
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_INFO,
container.getAllocatedNode().getPort());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_INFO,
container.getAllocatedPriority().getPriority());
entityInfo.put(
ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_INFO,
container.getNodeHttpAddress());
entity.setOtherInfo(entityInfo);
@ -307,16 +307,16 @@ public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher {
tEvent.setEventType(ContainerMetricsConstants.FINISHED_EVENT_TYPE);
tEvent.setTimestamp(finishedTime);
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO,
container.getDiagnosticsInfo());
eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO,
eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_INFO,
container.getContainerExitStatus());
eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO,
eventInfo.put(ContainerMetricsConstants.STATE_INFO,
container.getContainerState().toString());
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO,
entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_INFO,
container.getAllocatedNode().getHost());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO,
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_INFO,
container.getAllocatedNode().getPort());
entity.setOtherInfo(entityInfo);
tEvent.setEventInfo(eventInfo);

View File

@ -157,22 +157,22 @@ public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher {
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
tEvent.setTimestamp(finishedTime);
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
entity.addEvent(tEvent);
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
app.getDiagnostics().toString());
eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO,
entityInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO,
app.getFinalApplicationStatus().toString());
eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO,
entityInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO,
RMServerUtils.createApplicationState(state).toString());
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() == null
? null : app.getCurrentAppAttempt().getAppAttemptId();
if (appAttemptId != null) {
eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO,
entityInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO,
appAttemptId.toString());
}
tEvent.setInfo(eventInfo);
entity.addEvent(tEvent);
entity.setInfo(entityInfo);
getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
@ -193,6 +193,11 @@ public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher {
tEvent.setInfo(eventInfo);
entity.addEvent(tEvent);
// publish in entity info also to query using filters
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, appState);
entity.setInfo(entityInfo);
getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
}
@ -250,21 +255,23 @@ public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher {
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
tEvent.setTimestamp(registeredTime);
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
entity.addEvent(tEvent);
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(AppAttemptMetricsConstants.TRACKING_URL_INFO,
appAttempt.getTrackingUrl());
eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
entityInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_INFO,
appAttempt.getOriginalTrackingUrl());
eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO,
entityInfo.put(AppAttemptMetricsConstants.HOST_INFO,
appAttempt.getHost());
eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO,
entityInfo.put(AppAttemptMetricsConstants.RPC_PORT_INFO,
appAttempt.getRpcPort());
if (appAttempt.getMasterContainer() != null) {
eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
entityInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_INFO,
appAttempt.getMasterContainer().getId().toString());
}
tEvent.setInfo(eventInfo);
entity.addEvent(tEvent);
entity.setInfo(entityInfo);
getDispatcher().getEventHandler().handle(
new TimelineV2PublishEvent(SystemMetricsEventType.PUBLISH_ENTITY,
entity, appAttempt.getAppAttemptId().getApplicationId()));
@ -281,26 +288,20 @@ public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher {
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE);
tEvent.setTimestamp(finishedTime);
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
appAttempt.getTrackingUrl());
eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
appAttempt.getOriginalTrackingUrl());
eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
entity.addEvent(tEvent);
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO,
appAttempt.getDiagnostics());
// app will get the final status from app attempt, or create one
// based on app state if it doesn't exist
eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO,
entityInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_INFO,
app.getFinalApplicationStatus().toString());
eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, RMServerUtils
entityInfo.put(AppAttemptMetricsConstants.STATE_INFO, RMServerUtils
.createApplicationAttemptState(appAttemtpState).toString());
if (appAttempt.getMasterContainer() != null) {
eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
appAttempt.getMasterContainer().getId().toString());
}
tEvent.setInfo(eventInfo);
entity.setInfo(entityInfo);
entity.addEvent(tEvent);
getDispatcher().getEventHandler().handle(
new TimelineV2PublishEvent(SystemMetricsEventType.PUBLISH_ENTITY,
entity, appAttempt.getAppAttemptId().getApplicationId()));
@ -325,25 +326,26 @@ public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher {
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE);
tEvent.setTimestamp(createdTime);
entity.addEvent(tEvent);
// updated as event info instead of entity info, as entity info is updated
// by NM
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO,
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_INFO,
container.getAllocatedResource().getMemorySize());
eventInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO,
entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_INFO,
container.getAllocatedResource().getVirtualCores());
eventInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO,
entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_INFO,
container.getAllocatedNode().getHost());
eventInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO,
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_INFO,
container.getAllocatedNode().getPort());
eventInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_INFO,
container.getAllocatedPriority().getPriority());
eventInfo.put(
ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
entityInfo.put(
ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_INFO,
container.getNodeHttpAddress());
tEvent.setInfo(eventInfo);
entity.setInfo(entityInfo);
entity.addEvent(tEvent);
getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
SystemMetricsEventType.PUBLISH_ENTITY, entity, container
.getContainerId().getApplicationAttemptId().getApplicationId()));
@ -359,22 +361,19 @@ public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher {
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(ContainerMetricsConstants.FINISHED_IN_RM_EVENT_TYPE);
tEvent.setTimestamp(finishedTime);
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
container.getDiagnosticsInfo());
eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO,
container.getContainerExitStatus());
eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO,
container.getContainerState().toString());
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO,
container.getAllocatedNode().getHost());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO,
container.getAllocatedNode().getPort());
entity.setInfo(entityInfo);
tEvent.setInfo(eventInfo);
entity.addEvent(tEvent);
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO,
container.getDiagnosticsInfo());
entityInfo.put(ContainerMetricsConstants.EXIT_STATUS_INFO,
container.getContainerExitStatus());
entityInfo.put(ContainerMetricsConstants.STATE_INFO,
container.getContainerState().toString());
entityInfo.put(ContainerMetricsConstants.CONTAINER_FINISHED_TIME,
finishedTime);
entity.setInfo(entityInfo);
getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
SystemMetricsEventType.PUBLISH_ENTITY, entity, container
.getContainerId().getApplicationAttemptId().getApplicationId()));

View File

@ -341,34 +341,34 @@ public class TestSystemMetricsPublisher {
hasRegisteredEvent = true;
Assert.assertEquals(appAttempt.getHost(),
event.getEventInfo()
.get(AppAttemptMetricsConstants.HOST_EVENT_INFO));
.get(AppAttemptMetricsConstants.HOST_INFO));
Assert
.assertEquals(appAttempt.getRpcPort(),
event.getEventInfo().get(
AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO));
AppAttemptMetricsConstants.RPC_PORT_INFO));
Assert.assertEquals(
appAttempt.getMasterContainer().getId().toString(),
event.getEventInfo().get(
AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO));
AppAttemptMetricsConstants.MASTER_CONTAINER_INFO));
} else if (event.getEventType().equals(
AppAttemptMetricsConstants.FINISHED_EVENT_TYPE)) {
hasFinishedEvent = true;
Assert.assertEquals(appAttempt.getDiagnostics(), event.getEventInfo()
.get(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO));
.get(AppAttemptMetricsConstants.DIAGNOSTICS_INFO));
Assert.assertEquals(appAttempt.getTrackingUrl(), event.getEventInfo()
.get(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO));
.get(AppAttemptMetricsConstants.TRACKING_URL_INFO));
Assert.assertEquals(
appAttempt.getOriginalTrackingUrl(),
event.getEventInfo().get(
AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO));
AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_INFO));
Assert.assertEquals(
FinalApplicationStatus.UNDEFINED.toString(),
event.getEventInfo().get(
AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO));
AppAttemptMetricsConstants.FINAL_STATUS_INFO));
Assert.assertEquals(
YarnApplicationAttemptState.FINISHED.toString(),
event.getEventInfo().get(
AppAttemptMetricsConstants.STATE_EVENT_INFO));
AppAttemptMetricsConstants.STATE_INFO));
}
}
Assert.assertTrue(hasRegisteredEvent && hasFinishedEvent);
@ -391,17 +391,17 @@ public class TestSystemMetricsPublisher {
Assert.assertNotNull(entity.getOtherInfo());
Assert.assertEquals(2, entity.getOtherInfo().size());
Assert.assertNotNull(entity.getOtherInfo().get(
ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO));
ContainerMetricsConstants.ALLOCATED_HOST_INFO));
Assert.assertNotNull(entity.getOtherInfo().get(
ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO));
ContainerMetricsConstants.ALLOCATED_PORT_INFO));
Assert.assertEquals(
container.getAllocatedNode().getHost(),
entity.getOtherInfo().get(
ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO));
ContainerMetricsConstants.ALLOCATED_HOST_INFO));
Assert.assertEquals(
container.getAllocatedNode().getPort(),
entity.getOtherInfo().get(
ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO));
ContainerMetricsConstants.ALLOCATED_PORT_INFO));
}
@Test(timeout = 10000)
@ -432,25 +432,25 @@ public class TestSystemMetricsPublisher {
Assert.assertEquals(
container.getAllocatedNode().getHost(),
entity.getOtherInfo().get(
ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO));
ContainerMetricsConstants.ALLOCATED_HOST_INFO));
Assert.assertEquals(
container.getAllocatedNode().getPort(),
entity.getOtherInfo().get(
ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO));
ContainerMetricsConstants.ALLOCATED_PORT_INFO));
Assert.assertEquals(container.getAllocatedResource().getMemorySize(),
// KeyValueBasedTimelineStore could cast long to integer, need make sure
// variables for compare have same type.
((Integer) entity.getOtherInfo().get(
ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO))
ContainerMetricsConstants.ALLOCATED_MEMORY_INFO))
.longValue());
Assert.assertEquals(
container.getAllocatedResource().getVirtualCores(),
entity.getOtherInfo().get(
ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO));
ContainerMetricsConstants.ALLOCATED_VCORE_INFO));
Assert.assertEquals(
container.getAllocatedPriority().getPriority(),
entity.getOtherInfo().get(
ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO));
ContainerMetricsConstants.ALLOCATED_PRIORITY_INFO));
boolean hasCreatedEvent = false;
boolean hasFinishedEvent = false;
for (TimelineEvent event : entity.getEvents()) {
@ -465,13 +465,13 @@ public class TestSystemMetricsPublisher {
Assert.assertEquals(
container.getDiagnosticsInfo(),
event.getEventInfo().get(
ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO));
ContainerMetricsConstants.DIAGNOSTICS_INFO));
Assert.assertEquals(
container.getContainerExitStatus(),
event.getEventInfo().get(
ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO));
ContainerMetricsConstants.EXIT_STATUS_INFO));
Assert.assertEquals(container.getContainerState().toString(), event
.getEventInfo().get(ContainerMetricsConstants.STATE_EVENT_INFO));
.getEventInfo().get(ContainerMetricsConstants.STATE_INFO));
}
}
Assert.assertTrue(hasCreatedEvent && hasFinishedEvent);