diff --git a/hadoop-common-project/hadoop-common/src/main/conf/hadoop-policy.xml b/hadoop-common-project/hadoop-common/src/main/conf/hadoop-policy.xml
index 2bf5c02e110..d282c5841c1 100644
--- a/hadoop-common-project/hadoop-common/src/main/conf/hadoop-policy.xml
+++ b/hadoop-common-project/hadoop-common/src/main/conf/hadoop-policy.xml
@@ -223,4 +223,15 @@
    group list is separated by a blank. For e.g. "alice,bob users,wheel".
    A special value of "*" means all users are allowed.</description>
  </property>
+
+  <property>
+    <name>security.collector-nodemanager.protocol.acl</name>
+    <value>*</value>
+    <description>ACL for CollectorNodemanagerProtocol, used by nodemanager
+    if timeline service v2 is enabled, for the timeline collector and nodemanager
+    to communicate with each other.
+    The ACL is a comma-separated list of user and group names. The user and
+    group list is separated by a blank. For e.g. "alice,bob users,wheel".
+    A special value of "*" means all users are allowed.</description>
+  </property>
</configuration>
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
index 53fe055f75e..cfa91f530a3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
@@ -78,6 +78,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.util.TimelineServiceHelper;
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.ClientHandlerException;
@@ -1124,7 +1125,7 @@ public class JobHistoryEventHandler extends AbstractService
private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
createTaskEntity(HistoryEvent event, long timestamp, String taskId,
String entityType, String relatedJobEntity, JobId jobId,
- boolean setCreatedTime) {
+ boolean setCreatedTime, long taskIdPrefix) {
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
createBaseEntity(event, timestamp, entityType, setCreatedTime);
entity.setId(taskId);
@@ -1133,6 +1134,7 @@ public class JobHistoryEventHandler extends AbstractService
((TaskStartedEvent)event).getTaskType().toString());
}
entity.addIsRelatedToEntity(relatedJobEntity, jobId.toString());
+ entity.setIdPrefix(taskIdPrefix);
return entity;
}
@@ -1141,11 +1143,12 @@ public class JobHistoryEventHandler extends AbstractService
private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
createTaskAttemptEntity(HistoryEvent event, long timestamp,
String taskAttemptId, String entityType, String relatedTaskEntity,
- String taskId, boolean setCreatedTime) {
+ String taskId, boolean setCreatedTime, long taskAttemptIdPrefix) {
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
createBaseEntity(event, timestamp, entityType, setCreatedTime);
entity.setId(taskAttemptId);
entity.addIsRelatedToEntity(relatedTaskEntity, taskId);
+ entity.setIdPrefix(taskAttemptIdPrefix);
return entity;
}
@@ -1196,6 +1199,8 @@ public class JobHistoryEventHandler extends AbstractService
String taskId = null;
String taskAttemptId = null;
boolean setCreatedTime = false;
+ long taskIdPrefix = 0;
+ long taskAttemptIdPrefix = 0;
switch (event.getEventType()) {
// Handle job events
@@ -1218,15 +1223,21 @@ public class JobHistoryEventHandler extends AbstractService
case TASK_STARTED:
setCreatedTime = true;
taskId = ((TaskStartedEvent)event).getTaskId().toString();
+ taskIdPrefix = TimelineServiceHelper.
+ invertLong(((TaskStartedEvent)event).getStartTime());
break;
case TASK_FAILED:
taskId = ((TaskFailedEvent)event).getTaskId().toString();
+ taskIdPrefix = TimelineServiceHelper.
+ invertLong(((TaskFailedEvent)event).getStartTime());
break;
case TASK_UPDATED:
taskId = ((TaskUpdatedEvent)event).getTaskId().toString();
break;
case TASK_FINISHED:
taskId = ((TaskFinishedEvent)event).getTaskId().toString();
+ taskIdPrefix = TimelineServiceHelper.
+ invertLong(((TaskFinishedEvent)event).getStartTime());
break;
case MAP_ATTEMPT_STARTED:
case REDUCE_ATTEMPT_STARTED:
@@ -1234,6 +1245,8 @@ public class JobHistoryEventHandler extends AbstractService
taskId = ((TaskAttemptStartedEvent)event).getTaskId().toString();
taskAttemptId = ((TaskAttemptStartedEvent)event).
getTaskAttemptId().toString();
+ taskAttemptIdPrefix = TimelineServiceHelper.
+ invertLong(((TaskAttemptStartedEvent)event).getStartTime());
break;
case CLEANUP_ATTEMPT_STARTED:
case SETUP_ATTEMPT_STARTED:
@@ -1253,16 +1266,22 @@ public class JobHistoryEventHandler extends AbstractService
getTaskId().toString();
taskAttemptId = ((TaskAttemptUnsuccessfulCompletionEvent)event).
getTaskAttemptId().toString();
+ taskAttemptIdPrefix = TimelineServiceHelper.invertLong(
+ ((TaskAttemptUnsuccessfulCompletionEvent)event).getStartTime());
break;
case MAP_ATTEMPT_FINISHED:
taskId = ((MapAttemptFinishedEvent)event).getTaskId().toString();
taskAttemptId = ((MapAttemptFinishedEvent)event).
getAttemptId().toString();
+ taskAttemptIdPrefix = TimelineServiceHelper.
+ invertLong(((MapAttemptFinishedEvent)event).getStartTime());
break;
case REDUCE_ATTEMPT_FINISHED:
taskId = ((ReduceAttemptFinishedEvent)event).getTaskId().toString();
taskAttemptId = ((ReduceAttemptFinishedEvent)event).
getAttemptId().toString();
+ taskAttemptIdPrefix = TimelineServiceHelper.
+ invertLong(((ReduceAttemptFinishedEvent)event).getStartTime());
break;
case SETUP_ATTEMPT_FINISHED:
case CLEANUP_ATTEMPT_FINISHED:
@@ -1291,12 +1310,12 @@ public class JobHistoryEventHandler extends AbstractService
// TaskEntity
tEntity = createTaskEntity(event, timestamp, taskId,
MAPREDUCE_TASK_ENTITY_TYPE, MAPREDUCE_JOB_ENTITY_TYPE,
- jobId, setCreatedTime);
+ jobId, setCreatedTime, taskIdPrefix);
} else {
// TaskAttemptEntity
tEntity = createTaskAttemptEntity(event, timestamp, taskAttemptId,
MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE, MAPREDUCE_TASK_ENTITY_TYPE,
- taskId, setCreatedTime);
+ taskId, setCreatedTime, taskAttemptIdPrefix);
}
}
try {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
index 9ea1b9aa922..3faad480b9d 100755
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
@@ -1530,7 +1530,7 @@ public abstract class TaskAttemptImpl implements
StringUtils.join(
LINE_SEPARATOR, taskAttempt.getDiagnostics()),
taskAttempt.getCounters(), taskAttempt
- .getProgressSplitBlock().burst());
+ .getProgressSplitBlock().burst(), taskAttempt.launchTime);
return tauce;
}
@@ -1943,35 +1943,35 @@ public abstract class TaskAttemptImpl implements
this.container == null ? -1 : this.container.getNodeId().getPort();
if (attemptId.getTaskId().getTaskType() == TaskType.MAP) {
MapAttemptFinishedEvent mfe =
- new MapAttemptFinishedEvent(TypeConverter.fromYarn(attemptId),
- TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
- state.toString(),
- this.reportedStatus.mapFinishTime,
- finishTime,
- containerHostName,
- containerNodePort,
- this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
- this.reportedStatus.stateString,
- getCounters(),
- getProgressSplitBlock().burst());
- eventHandler.handle(
- new JobHistoryEvent(attemptId.getTaskId().getJobId(), mfe));
+ new MapAttemptFinishedEvent(TypeConverter.fromYarn(attemptId),
+ TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
+ state.toString(),
+ this.reportedStatus.mapFinishTime,
+ finishTime,
+ containerHostName,
+ containerNodePort,
+ this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
+ this.reportedStatus.stateString,
+ getCounters(),
+ getProgressSplitBlock().burst(), launchTime);
+ eventHandler.handle(
+ new JobHistoryEvent(attemptId.getTaskId().getJobId(), mfe));
} else {
- ReduceAttemptFinishedEvent rfe =
- new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(attemptId),
- TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
- state.toString(),
- this.reportedStatus.shuffleFinishTime,
- this.reportedStatus.sortFinishTime,
- finishTime,
- containerHostName,
- containerNodePort,
- this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
- this.reportedStatus.stateString,
- getCounters(),
- getProgressSplitBlock().burst());
- eventHandler.handle(
- new JobHistoryEvent(attemptId.getTaskId().getJobId(), rfe));
+ ReduceAttemptFinishedEvent rfe =
+ new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(attemptId),
+ TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
+ state.toString(),
+ this.reportedStatus.shuffleFinishTime,
+ this.reportedStatus.sortFinishTime,
+ finishTime,
+ containerHostName,
+ containerNodePort,
+ this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
+ this.reportedStatus.stateString,
+ getCounters(),
+ getProgressSplitBlock().burst(), launchTime);
+ eventHandler.handle(
+ new JobHistoryEvent(attemptId.getTaskId().getJobId(), rfe));
}
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
index 8a6fa304d4e..228ae24a955 100755
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
@@ -139,6 +139,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
  private final Set<TaskAttemptId> inProgressAttempts;
private boolean historyTaskStartGenerated = false;
+ // Launch time reported in history events.
+ private long launchTime;
  private static final SingleArcTransition<TaskImpl, TaskEvent>
     ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
@@ -705,8 +707,9 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
}
private void sendTaskStartedEvent() {
+ launchTime = getLaunchTime();
TaskStartedEvent tse = new TaskStartedEvent(
- TypeConverter.fromYarn(taskId), getLaunchTime(),
+ TypeConverter.fromYarn(taskId), launchTime,
TypeConverter.fromYarn(taskId.getTaskType()),
getSplitsAsString());
eventHandler
@@ -714,18 +717,19 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
historyTaskStartGenerated = true;
}
- private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskStateInternal taskState) {
+ private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task,
+ TaskStateInternal taskState) {
TaskFinishedEvent tfe =
new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
TypeConverter.fromYarn(task.successfulAttempt),
task.getFinishTime(task.successfulAttempt),
TypeConverter.fromYarn(task.taskId.getTaskType()),
- taskState.toString(),
- task.getCounters());
+ taskState.toString(), task.getCounters(), task.launchTime);
return tfe;
}
-  private static TaskFailedEvent createTaskFailedEvent(TaskImpl task, List<String> diag, TaskStateInternal taskState, TaskAttemptId taId) {
+  private static TaskFailedEvent createTaskFailedEvent(TaskImpl task,
+      List<String> diag, TaskStateInternal taskState, TaskAttemptId taId) {
StringBuilder errorSb = new StringBuilder();
if (diag != null) {
for (String d : diag) {
@@ -740,7 +744,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
errorSb.toString(),
taskState.toString(),
taId == null ? null : TypeConverter.fromYarn(taId),
- task.getCounters());
+ task.getCounters(), task.launchTime);
return taskFailedEvent;
}
@@ -861,7 +865,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
TaskFailedEvent tfe = new TaskFailedEvent(taskInfo.getTaskId(),
taskInfo.getFinishTime(), taskInfo.getTaskType(),
taskInfo.getError(), taskInfo.getTaskStatus(),
- taskInfo.getFailedDueToAttemptId(), taskInfo.getCounters());
+ taskInfo.getFailedDueToAttemptId(), taskInfo.getCounters(),
+ launchTime);
eventHandler.handle(new JobHistoryEvent(taskId.getJobId(), tfe));
eventHandler.handle(
new JobTaskEvent(taskId, getExternalState(taskState)));
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 09527977def..0dc7642418c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -848,7 +848,8 @@ public class RMContainerAllocator extends RMContainerRequestor
updateAMRMToken(response.getAMRMToken());
}
-    List<ContainerStatus> finishedContainers = response.getCompletedContainersStatuses();
+    List<ContainerStatus> finishedContainers =
+        response.getCompletedContainersStatuses();
// propagate preemption requests
final PreemptionMessage preemptReq = response.getPreemptionMessage();
@@ -877,16 +878,13 @@ public class RMContainerAllocator extends RMContainerRequestor
handleUpdatedNodes(response);
handleJobPriorityChange(response);
- // handle receiving the timeline collector address for this app
- String collectorAddr = response.getCollectorAddr();
+ // Handle receiving the timeline collector address and token for this app.
MRAppMaster.RunningAppContext appContext =
(MRAppMaster.RunningAppContext)this.getContext();
- if (collectorAddr != null && !collectorAddr.isEmpty()
- && appContext.getTimelineV2Client() != null) {
- appContext.getTimelineV2Client().setTimelineServiceAddress(
- response.getCollectorAddr());
+ if (appContext.getTimelineV2Client() != null) {
+ appContext.getTimelineV2Client().
+ setTimelineCollectorInfo(response.getCollectorInfo());
}
-
for (ContainerStatus cont : finishedContainers) {
processFinishedContainer(cont);
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java
index ac510b39434..e2713191ac7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java
@@ -58,7 +58,7 @@ public class TestEvents {
Counters counters = new Counters();
TaskAttemptFinishedEvent test = new TaskAttemptFinishedEvent(taskAttemptId,
TaskType.REDUCE, "TEST", 123L, "RAKNAME", "HOSTNAME", "STATUS",
- counters);
+ counters, 234);
assertEquals(test.getAttemptId().toString(), taskAttemptId.toString());
assertEquals(test.getCounters(), counters);
@@ -69,7 +69,7 @@ public class TestEvents {
assertEquals(test.getTaskId(), tid);
assertEquals(test.getTaskStatus(), "TEST");
assertEquals(test.getTaskType(), TaskType.REDUCE);
-
+ assertEquals(234, test.getStartTime());
}
/**
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
index caf8c6718a9..e35a84d537e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
@@ -148,7 +148,7 @@ public class TestJobHistoryEventHandler {
// First completion event, but min-queue-size for batching flushes is 10
handleEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
- t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null)));
+ t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null, 0)));
verify(mockWriter).flush();
} finally {
@@ -184,7 +184,7 @@ public class TestJobHistoryEventHandler {
for (int i = 0 ; i < 100 ; i++) {
queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
- t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null)));
+ t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null, 0)));
}
handleNextNEvents(jheh, 9);
@@ -229,7 +229,7 @@ public class TestJobHistoryEventHandler {
for (int i = 0 ; i < 100 ; i++) {
queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
- t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null)));
+ t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null, 0)));
}
handleNextNEvents(jheh, 9);
@@ -272,7 +272,7 @@ public class TestJobHistoryEventHandler {
for (int i = 0 ; i < 100 ; i++) {
queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
- t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null)));
+ t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null, 0)));
}
queueEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
TypeConverter.fromYarn(t.jobId), 0, 10, 10, 0, 0, null, null, new Counters())));
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index 6c51626af9d..e4a8a1a90b9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce.v2.app.rm;
+import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyFloat;
import static org.mockito.Matchers.anyInt;
@@ -27,6 +28,7 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -99,6 +101,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -110,6 +113,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
@@ -121,6 +125,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@@ -140,6 +145,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretMan
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ControlledClock;
+import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.After;
import org.junit.Assert;
@@ -748,6 +754,96 @@ public class TestRMContainerAllocator {
}
}
+ @Test
+ public void testUpdateCollectorInfo() throws Exception {
+ LOG.info("Running testUpdateCollectorInfo");
+ Configuration conf = new Configuration();
+ conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+ conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
+ ApplicationId appId = ApplicationId.newInstance(1, 1);
+ ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
+ JobId jobId = MRBuilderUtils.newJobId(appId, 0);
+ Job mockJob = mock(Job.class);
+ when(mockJob.getReport()).thenReturn(
+ MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+ 0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+ String localAddr = "localhost:1234";
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+ // Generate a timeline delegation token.
+ TimelineDelegationTokenIdentifier ident =
+ new TimelineDelegationTokenIdentifier(new Text(ugi.getUserName()),
+ new Text("renewer"), null);
+ ident.setSequenceNumber(1);
+    Token<TimelineDelegationTokenIdentifier> collectorToken =
+        new Token<TimelineDelegationTokenIdentifier>(ident.getBytes(),
+            new byte[0], TimelineDelegationTokenIdentifier.KIND_NAME,
+            new Text(localAddr));
+ org.apache.hadoop.yarn.api.records.Token token =
+ org.apache.hadoop.yarn.api.records.Token.newInstance(
+ collectorToken.getIdentifier(), collectorToken.getKind().toString(),
+ collectorToken.getPassword(),
+ collectorToken.getService().toString());
+ CollectorInfo collectorInfo = CollectorInfo.newInstance(localAddr, token);
+    // Mock scheduler to serve Allocate request.
+ final MockSchedulerForTimelineCollector mockScheduler =
+ new MockSchedulerForTimelineCollector(collectorInfo);
+ MyContainerAllocator allocator =
+ new MyContainerAllocator(null, conf, attemptId, mockJob,
+ SystemClock.getInstance()) {
+ @Override
+ protected void register() {
+ }
+
+ @Override
+ protected ApplicationMasterProtocol createSchedulerProxy() {
+ return mockScheduler;
+ }
+ };
+ // Initially UGI should have no tokens.
+    ArrayList<Token<? extends TokenIdentifier>> tokens =
+        new ArrayList<>(ugi.getTokens());
+ assertEquals(0, tokens.size());
+ TimelineV2Client client = spy(TimelineV2Client.createTimelineClient(appId));
+ client.init(conf);
+ when(((RunningAppContext)allocator.getContext()).getTimelineV2Client()).
+ thenReturn(client);
+
+ // Send allocate request to RM and fetch collector address and token.
+ allocator.schedule();
+ verify(client).setTimelineCollectorInfo(collectorInfo);
+ // Verify if token has been updated in UGI.
+ tokens = new ArrayList<>(ugi.getTokens());
+ assertEquals(1, tokens.size());
+ assertEquals(TimelineDelegationTokenIdentifier.KIND_NAME,
+ tokens.get(0).getKind());
+ assertEquals(collectorToken.decodeIdentifier(),
+ tokens.get(0).decodeIdentifier());
+
+ // Generate new collector token, send allocate request to RM and fetch the
+ // new token.
+ ident.setSequenceNumber(100);
+    Token<TimelineDelegationTokenIdentifier> collectorToken1 =
+        new Token<TimelineDelegationTokenIdentifier>(ident.getBytes(),
+            new byte[0], TimelineDelegationTokenIdentifier.KIND_NAME,
+            new Text(localAddr));
+ token = org.apache.hadoop.yarn.api.records.Token.newInstance(
+ collectorToken1.getIdentifier(), collectorToken1.getKind().toString(),
+ collectorToken1.getPassword(), collectorToken1.getService().toString());
+ collectorInfo = CollectorInfo.newInstance(localAddr, token);
+ mockScheduler.updateCollectorInfo(collectorInfo);
+ allocator.schedule();
+ verify(client).setTimelineCollectorInfo(collectorInfo);
+ // Verify if new token has been updated in UGI.
+ tokens = new ArrayList<>(ugi.getTokens());
+ assertEquals(1, tokens.size());
+ assertEquals(TimelineDelegationTokenIdentifier.KIND_NAME,
+ tokens.get(0).getKind());
+ assertEquals(collectorToken1.decodeIdentifier(),
+ tokens.get(0).decodeIdentifier());
+ allocator.close();
+ }
+
@Test
public void testMapReduceScheduling() throws Exception {
@@ -3488,6 +3584,46 @@ public class TestRMContainerAllocator {
}
}
+ private final static class MockSchedulerForTimelineCollector
+ implements ApplicationMasterProtocol {
+ private CollectorInfo collectorInfo;
+
+ private MockSchedulerForTimelineCollector(CollectorInfo info) {
+ this.collectorInfo = info;
+ }
+
+ private void updateCollectorInfo(CollectorInfo info) {
+ collectorInfo = info;
+ }
+
+ @Override
+ public RegisterApplicationMasterResponse registerApplicationMaster(
+ RegisterApplicationMasterRequest request) throws YarnException,
+ IOException {
+ return Records.newRecord(RegisterApplicationMasterResponse.class);
+ }
+
+ @Override
+ public FinishApplicationMasterResponse finishApplicationMaster(
+ FinishApplicationMasterRequest request) throws YarnException,
+ IOException {
+ return FinishApplicationMasterResponse.newInstance(false);
+ }
+
+ @Override
+ public AllocateResponse allocate(AllocateRequest request)
+ throws YarnException, IOException {
+ AllocateResponse response = AllocateResponse.newInstance(
+          request.getResponseId(), Collections.<ContainerStatus>emptyList(),
+          Collections.<Container>emptyList(),
+          Collections.<NodeReport>emptyList(),
+          Resource.newInstance(512000, 1024), null, 10, null,
+          Collections.<NMToken>emptyList());
+ response.setCollectorInfo(collectorInfo);
+ return response;
+ }
+ }
+
public static void main(String[] args) throws Exception {
TestRMContainerAllocator t = new TestRMContainerAllocator();
t.testSimple();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
index 3121c4e0016..2b1357ea859 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
@@ -32,9 +32,10 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.util.SystemClock;
/**
- * Event to record successful completion of a map attempt
+ * Event to record successful completion of a map attempt.
*
*/
@InterfaceAudience.Private
@@ -58,9 +59,10 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
int[] cpuUsages;
int[] vMemKbytes;
int[] physMemKbytes;
+ private long startTime;
/**
- * Create an event for successful completion of map attempts
+ * Create an event for successful completion of map attempts.
* @param id Task Attempt ID
* @param taskType Type of the task
* @param taskStatus Status of the task
@@ -77,12 +79,13 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
* virtual memory and physical memory.
*
* If you have no splits data, code {@code null} for this
- * parameter.
+ * parameter.
+ * @param startTs Task start time to be used for writing entity to ATSv2.
*/
- public MapAttemptFinishedEvent
- (TaskAttemptID id, TaskType taskType, String taskStatus,
- long mapFinishTime, long finishTime, String hostname, int port,
- String rackName, String state, Counters counters, int[][] allSplits) {
+ public MapAttemptFinishedEvent(TaskAttemptID id, TaskType taskType,
+ String taskStatus, long mapFinishTime, long finishTime, String hostname,
+ int port, String rackName, String state, Counters counters,
+ int[][] allSplits, long startTs) {
this.attemptId = id;
this.taskType = taskType;
this.taskStatus = taskStatus;
@@ -98,6 +101,16 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
this.cpuUsages = ProgressSplitsBlock.arrayGetCPUTime(allSplits);
this.vMemKbytes = ProgressSplitsBlock.arrayGetVMemKbytes(allSplits);
this.physMemKbytes = ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits);
+ this.startTime = startTs;
+ }
+
+ public MapAttemptFinishedEvent(TaskAttemptID id, TaskType taskType,
+ String taskStatus, long mapFinishTime, long finishTime, String hostname,
+ int port, String rackName, String state, Counters counters,
+ int[][] allSplits) {
+ this(id, taskType, taskStatus, mapFinishTime, finishTime, hostname, port,
+ rackName, state, counters, allSplits,
+ SystemClock.getInstance().getTime());
}
/**
@@ -117,15 +130,13 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
* @param counters Counters for the attempt
*/
@Deprecated
- public MapAttemptFinishedEvent
- (TaskAttemptID id, TaskType taskType, String taskStatus,
- long mapFinishTime, long finishTime, String hostname,
- String state, Counters counters) {
+ public MapAttemptFinishedEvent(TaskAttemptID id, TaskType taskType,
+ String taskStatus, long mapFinishTime, long finishTime, String hostname,
+ String state, Counters counters) {
this(id, taskType, taskStatus, mapFinishTime, finishTime, hostname, -1, "",
state, counters, null);
}
-
-
+
MapAttemptFinishedEvent() {}
public Object getDatum() {
@@ -175,38 +186,56 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
this.physMemKbytes = AvroArrayUtils.fromAvro(datum.getPhysMemKbytes());
}
- /** Get the task ID */
- public TaskID getTaskId() { return attemptId.getTaskID(); }
- /** Get the attempt id */
+ /** Gets the task ID. */
+ public TaskID getTaskId() {
+ return attemptId.getTaskID();
+ }
+ /** Gets the attempt id. */
public TaskAttemptID getAttemptId() {
return attemptId;
}
- /** Get the task type */
+ /** Gets the task type. */
public TaskType getTaskType() {
return taskType;
}
- /** Get the task status */
+ /** Gets the task status. */
public String getTaskStatus() { return taskStatus.toString(); }
- /** Get the map phase finish time */
+ /** Gets the map phase finish time. */
public long getMapFinishTime() { return mapFinishTime; }
- /** Get the attempt finish time */
+ /** Gets the attempt finish time. */
public long getFinishTime() { return finishTime; }
- /** Get the host name */
+ /**
+ * Gets the task attempt start time.
+ * @return task attempt start time.
+ */
+ public long getStartTime() {
+ return startTime;
+ }
+ /** Gets the host name. */
public String getHostname() { return hostname.toString(); }
- /** Get the tracker rpc port */
+ /** Gets the tracker rpc port. */
public int getPort() { return port; }
- /** Get the rack name */
+ /** Gets the rack name. */
public String getRackName() {
return rackName == null ? null : rackName.toString();
}
-
- /** Get the state string */
- public String getState() { return state.toString(); }
- /** Get the counters */
- Counters getCounters() { return counters; }
- /** Get the event type */
+ /**
+ * Gets the attempt state string.
+ * @return map attempt state
+ */
+ public String getState() {
+ return state.toString();
+ }
+ /**
+ * Gets the counters.
+ * @return counters
+ */
+ Counters getCounters() {
+ return counters;
+ }
+ /** Gets the event type. */
public EventType getEventType() {
return EventType.MAP_ATTEMPT_FINISHED;
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
index 9c0f09b017d..5a16f834acb 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.util.SystemClock;
/**
* Event to record successful completion of a reduce attempt
@@ -59,6 +60,7 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
int[] cpuUsages;
int[] vMemKbytes;
int[] physMemKbytes;
+ private long startTime;
/**
* Create an event to record completion of a reduce attempt
@@ -76,13 +78,13 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
* @param allSplits the "splits", or a pixelated graph of various
* measurable worker node state variables against progress.
* Currently there are four; wallclock time, CPU time,
- * virtual memory and physical memory.
+ * virtual memory and physical memory.
+ * @param startTs Task start time to be used for writing entity to ATSv2.
*/
- public ReduceAttemptFinishedEvent
- (TaskAttemptID id, TaskType taskType, String taskStatus,
- long shuffleFinishTime, long sortFinishTime, long finishTime,
- String hostname, int port, String rackName, String state,
- Counters counters, int[][] allSplits) {
+ public ReduceAttemptFinishedEvent(TaskAttemptID id, TaskType taskType,
+ String taskStatus, long shuffleFinishTime, long sortFinishTime,
+ long finishTime, String hostname, int port, String rackName,
+ String state, Counters counters, int[][] allSplits, long startTs) {
this.attemptId = id;
this.taskType = taskType;
this.taskStatus = taskStatus;
@@ -99,6 +101,16 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
this.cpuUsages = ProgressSplitsBlock.arrayGetCPUTime(allSplits);
this.vMemKbytes = ProgressSplitsBlock.arrayGetVMemKbytes(allSplits);
this.physMemKbytes = ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits);
+ this.startTime = startTs;
+ }
+
+ public ReduceAttemptFinishedEvent(TaskAttemptID id, TaskType taskType,
+ String taskStatus, long shuffleFinishTime, long sortFinishTime,
+ long finishTime, String hostname, int port, String rackName,
+ String state, Counters counters, int[][] allSplits) {
+ this(id, taskType, taskStatus, shuffleFinishTime, sortFinishTime,
+ finishTime, hostname, port, rackName, state, counters, allSplits,
+ SystemClock.getInstance().getTime());
}
/**
@@ -118,13 +130,12 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
* @param state State of the attempt
* @param counters Counters for the attempt
*/
- public ReduceAttemptFinishedEvent
- (TaskAttemptID id, TaskType taskType, String taskStatus,
- long shuffleFinishTime, long sortFinishTime, long finishTime,
- String hostname, String state, Counters counters) {
+ public ReduceAttemptFinishedEvent(TaskAttemptID id, TaskType taskType,
+ String taskStatus, long shuffleFinishTime, long sortFinishTime,
+ long finishTime, String hostname, String state, Counters counters) {
this(id, taskType, taskStatus,
- shuffleFinishTime, sortFinishTime, finishTime,
- hostname, -1, "", state, counters, null);
+ shuffleFinishTime, sortFinishTime, finishTime,
+ hostname, -1, "", state, counters, null);
}
ReduceAttemptFinishedEvent() {}
@@ -178,39 +189,55 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
this.physMemKbytes = AvroArrayUtils.fromAvro(datum.getPhysMemKbytes());
}
- /** Get the Task ID */
+ /** Gets the Task ID. */
public TaskID getTaskId() { return attemptId.getTaskID(); }
- /** Get the attempt id */
+ /** Gets the attempt id. */
public TaskAttemptID getAttemptId() {
return attemptId;
}
- /** Get the task type */
+ /** Gets the task type. */
public TaskType getTaskType() {
return taskType;
}
- /** Get the task status */
+ /** Gets the task status. */
public String getTaskStatus() { return taskStatus.toString(); }
- /** Get the finish time of the sort phase */
+ /** Gets the finish time of the sort phase. */
public long getSortFinishTime() { return sortFinishTime; }
- /** Get the finish time of the shuffle phase */
+ /** Gets the finish time of the shuffle phase. */
public long getShuffleFinishTime() { return shuffleFinishTime; }
- /** Get the finish time of the attempt */
+ /** Gets the finish time of the attempt. */
public long getFinishTime() { return finishTime; }
- /** Get the name of the host where the attempt ran */
+ /**
+ * Gets the start time.
+ * @return task attempt start time.
+ */
+ public long getStartTime() {
+ return startTime;
+ }
+ /** Gets the name of the host where the attempt ran. */
public String getHostname() { return hostname.toString(); }
- /** Get the tracker rpc port */
+ /** Gets the tracker rpc port. */
public int getPort() { return port; }
- /** Get the rack name of the node where the attempt ran */
+ /** Gets the rack name of the node where the attempt ran. */
public String getRackName() {
return rackName == null ? null : rackName.toString();
}
-
- /** Get the state string */
- public String getState() { return state.toString(); }
- /** Get the counters for the attempt */
- Counters getCounters() { return counters; }
- /** Get the event type */
+ /**
+ * Gets the state string.
+ * @return reduce attempt state
+ */
+ public String getState() {
+ return state.toString();
+ }
+ /**
+ * Gets the counters.
+ * @return counters
+ */
+ Counters getCounters() {
+ return counters;
+ }
+ /** Gets the event type. */
public EventType getEventType() {
return EventType.REDUCE_ATTEMPT_FINISHED;
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
index a931ca24033..c28c21605df 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.util.SystemClock;
/**
* Event to record successful task completion
@@ -50,10 +51,11 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
private String hostname;
private String state;
private Counters counters;
+ private long startTime;
/**
- * Create an event to record successful finishes for setup and cleanup
- * attempts
+ * Create an event to record successful finishes for setup and cleanup
+ * attempts.
* @param id Attempt ID
* @param taskType Type of task
* @param taskStatus Status of task
@@ -61,11 +63,12 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
* @param hostname Host where the attempt executed
* @param state State string
* @param counters Counters for the attempt
+ * @param startTs Task start time to be used for writing entity to ATSv2.
*/
public TaskAttemptFinishedEvent(TaskAttemptID id,
TaskType taskType, String taskStatus,
long finishTime, String rackName,
- String hostname, String state, Counters counters) {
+ String hostname, String state, Counters counters, long startTs) {
this.attemptId = id;
this.taskType = taskType;
this.taskStatus = taskStatus;
@@ -74,6 +77,14 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
this.hostname = hostname;
this.state = state;
this.counters = counters;
+ this.startTime = startTs;
+ }
+
+ public TaskAttemptFinishedEvent(TaskAttemptID id, TaskType taskType,
+ String taskStatus, long finishTime, String rackName, String hostname,
+ String state, Counters counters) {
+ this(id, taskType, taskStatus, finishTime, rackName, hostname, state,
+ counters, SystemClock.getInstance().getTime());
}
TaskAttemptFinishedEvent() {}
@@ -107,33 +118,43 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
this.counters = EventReader.fromAvro(datum.getCounters());
}
- /** Get the task ID */
+ /** Gets the task ID. */
public TaskID getTaskId() { return attemptId.getTaskID(); }
- /** Get the task attempt id */
+ /** Gets the task attempt id. */
public TaskAttemptID getAttemptId() {
return attemptId;
}
- /** Get the task type */
+ /** Gets the task type. */
public TaskType getTaskType() {
return taskType;
}
- /** Get the task status */
+ /** Gets the task status. */
public String getTaskStatus() { return taskStatus.toString(); }
- /** Get the attempt finish time */
+ /** Gets the attempt finish time. */
public long getFinishTime() { return finishTime; }
- /** Get the host where the attempt executed */
+ /**
+ * Gets the task attempt start time to be used while publishing to ATSv2.
+ * @return task attempt start time.
+ */
+ public long getStartTime() {
+ return startTime;
+ }
+ /** Gets the host where the attempt executed. */
public String getHostname() { return hostname.toString(); }
- /** Get the rackname where the attempt executed */
+ /** Gets the rackname where the attempt executed. */
public String getRackName() {
return rackName == null ? null : rackName.toString();
}
- /** Get the state string */
+ /**
+ * Gets the state string.
+ * @return task attempt state.
+ */
public String getState() { return state.toString(); }
- /** Get the counters for the attempt */
+ /** Gets the counters for the attempt. */
Counters getCounters() { return counters; }
- /** Get the event type */
+ /** Gets the event type. */
public EventType getEventType() {
// Note that the task type can be setup/map/reduce/cleanup but the
// attempt-type can only be map/reduce.
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
index 175296725ff..9afa09384cc 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.util.SystemClock;
/**
* Event to record unsuccessful (Killed/Failed) completion of task attempts
@@ -58,10 +59,11 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
int[] cpuUsages;
int[] vMemKbytes;
int[] physMemKbytes;
+ private long startTime;
private static final Counters EMPTY_COUNTERS = new Counters();
/**
- * Create an event to record the unsuccessful completion of attempts
+ * Create an event to record the unsuccessful completion of attempts.
* @param id Attempt ID
* @param taskType Type of the task
* @param status Status of the attempt
@@ -75,12 +77,13 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
* measurable worker node state variables against progress.
* Currently there are four; wallclock time, CPU time,
* virtual memory and physical memory.
+ * @param startTs Task start time to be used for writing entity to ATSv2.
*/
public TaskAttemptUnsuccessfulCompletionEvent
(TaskAttemptID id, TaskType taskType,
String status, long finishTime,
String hostname, int port, String rackName,
- String error, Counters counters, int[][] allSplits) {
+ String error, Counters counters, int[][] allSplits, long startTs) {
this.attemptId = id;
this.taskType = taskType;
this.status = status;
@@ -99,6 +102,15 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
ProgressSplitsBlock.arrayGetVMemKbytes(allSplits);
this.physMemKbytes =
ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits);
+ this.startTime = startTs;
+ }
+
+ public TaskAttemptUnsuccessfulCompletionEvent(TaskAttemptID id,
+ TaskType taskType, String status, long finishTime, String hostname,
+ int port, String rackName, String error, Counters counters,
+ int[][] allSplits) {
+ this(id, taskType, status, finishTime, hostname, port, rackName, error,
+ counters, allSplits, SystemClock.getInstance().getTime());
}
/**
@@ -190,39 +202,49 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
AvroArrayUtils.fromAvro(datum.getPhysMemKbytes());
}
- /** Get the task id */
+ /** Gets the task id. */
public TaskID getTaskId() {
return attemptId.getTaskID();
}
- /** Get the task type */
+ /** Gets the task type. */
public TaskType getTaskType() {
return TaskType.valueOf(taskType.toString());
}
- /** Get the attempt id */
+ /** Gets the attempt id. */
public TaskAttemptID getTaskAttemptId() {
return attemptId;
}
- /** Get the finish time */
+ /** Gets the finish time. */
public long getFinishTime() { return finishTime; }
- /** Get the name of the host where the attempt executed */
+ /**
+ * Gets the task attempt start time to be used while publishing to ATSv2.
+ * @return task attempt start time.
+ */
+ public long getStartTime() {
+ return startTime;
+ }
+ /** Gets the name of the host where the attempt executed. */
public String getHostname() { return hostname; }
- /** Get the rpc port for the host where the attempt executed */
+ /** Gets the rpc port for the host where the attempt executed. */
public int getPort() { return port; }
- /** Get the rack name of the node where the attempt ran */
+ /** Gets the rack name of the node where the attempt ran. */
public String getRackName() {
return rackName == null ? null : rackName.toString();
}
- /** Get the error string */
+ /** Gets the error string. */
public String getError() { return error.toString(); }
- /** Get the task status */
+ /**
+ * Gets the task attempt status.
+ * @return task attempt status.
+ */
public String getTaskStatus() {
return status.toString();
}
- /** Get the counters */
+ /** Gets the counters. */
Counters getCounters() { return counters; }
- /** Get the event type */
+ /** Gets the event type. */
public EventType getEventType() {
// Note that the task type can be setup/map/reduce/cleanup but the
// attempt-type can only be map/reduce.
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java
index d14350df54f..b4d9e410da2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.util.SystemClock;
/**
* Event to record the failure of a task
@@ -49,11 +50,12 @@ public class TaskFailedEvent implements HistoryEvent {
private String status;
private String error;
private Counters counters;
+ private long startTime;
private static final Counters EMPTY_COUNTERS = new Counters();
/**
- * Create an event to record task failure
+ * Create an event to record task failure.
* @param id Task ID
* @param finishTime Finish time of the task
* @param taskType Type of the task
@@ -61,10 +63,11 @@ public class TaskFailedEvent implements HistoryEvent {
* @param status Status
* @param failedDueToAttempt The attempt id due to which the task failed
* @param counters Counters for the task
+ * @param startTs task start time.
*/
public TaskFailedEvent(TaskID id, long finishTime,
TaskType taskType, String error, String status,
- TaskAttemptID failedDueToAttempt, Counters counters) {
+ TaskAttemptID failedDueToAttempt, Counters counters, long startTs) {
this.id = id;
this.finishTime = finishTime;
this.taskType = taskType;
@@ -72,15 +75,23 @@ public class TaskFailedEvent implements HistoryEvent {
this.status = status;
this.failedDueToAttempt = failedDueToAttempt;
this.counters = counters;
+ this.startTime = startTs;
+ }
+
+ public TaskFailedEvent(TaskID id, long finishTime, TaskType taskType,
+ String error, String status, TaskAttemptID failedDueToAttempt,
+ Counters counters) {
+ this(id, finishTime, taskType, error, status, failedDueToAttempt, counters,
+ SystemClock.getInstance().getTime());
}
public TaskFailedEvent(TaskID id, long finishTime,
- TaskType taskType, String error, String status,
- TaskAttemptID failedDueToAttempt) {
- this(id, finishTime, taskType, error, status,
- failedDueToAttempt, EMPTY_COUNTERS);
+ TaskType taskType, String error, String status,
+ TaskAttemptID failedDueToAttempt) {
+ this(id, finishTime, taskType, error, status, failedDueToAttempt,
+ EMPTY_COUNTERS);
}
-
+
TaskFailedEvent() {}
public Object getDatum() {
@@ -118,27 +129,37 @@ public class TaskFailedEvent implements HistoryEvent {
EventReader.fromAvro(datum.getCounters());
}
- /** Get the task id */
+ /** Gets the task id. */
public TaskID getTaskId() { return id; }
- /** Get the error string */
+ /** Gets the error string. */
public String getError() { return error; }
- /** Get the finish time of the attempt */
+ /** Gets the finish time of the attempt. */
public long getFinishTime() {
return finishTime;
}
- /** Get the task type */
+ /**
+ * Gets the task start time to be reported to ATSv2.
+ * @return task start time.
+ */
+ public long getStartTime() {
+ return startTime;
+ }
+ /** Gets the task type. */
public TaskType getTaskType() {
return taskType;
}
- /** Get the attempt id due to which the task failed */
+ /** Gets the attempt id due to which the task failed. */
public TaskAttemptID getFailedAttemptID() {
return failedDueToAttempt;
}
- /** Get the task status */
+ /**
+ * Gets the task status.
+ * @return task status
+ */
public String getTaskStatus() { return status; }
- /** Get task counters */
+ /** Gets task counters. */
public Counters getCounters() { return counters; }
- /** Get the event type */
+ /** Gets the event type. */
public EventType getEventType() {
return EventType.TASK_FAILED;
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
index 0bc43832636..97557c7e0b4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.util.SystemClock;
/**
* Event to record the successful completion of a task
@@ -49,27 +50,36 @@ public class TaskFinishedEvent implements HistoryEvent {
private TaskType taskType;
private String status;
private Counters counters;
-
+ private long startTime;
+
/**
- * Create an event to record the successful completion of a task
+ * Create an event to record the successful completion of a task.
* @param id Task ID
* @param attemptId Task Attempt ID of the successful attempt for this task
* @param finishTime Finish time of the task
* @param taskType Type of the task
* @param status Status string
* @param counters Counters for the task
+ * @param startTs task start time
*/
public TaskFinishedEvent(TaskID id, TaskAttemptID attemptId, long finishTime,
TaskType taskType,
- String status, Counters counters) {
+ String status, Counters counters, long startTs) {
this.taskid = id;
this.successfulAttemptId = attemptId;
this.finishTime = finishTime;
this.taskType = taskType;
this.status = status;
this.counters = counters;
+ this.startTime = startTs;
}
-
+
+ public TaskFinishedEvent(TaskID id, TaskAttemptID attemptId, long finishTime,
+ TaskType taskType, String status, Counters counters) {
+ this(id, attemptId, finishTime, taskType, status, counters,
+ SystemClock.getInstance().getTime());
+ }
+
TaskFinishedEvent() {}
public Object getDatum() {
@@ -101,23 +111,33 @@ public class TaskFinishedEvent implements HistoryEvent {
this.counters = EventReader.fromAvro(datum.getCounters());
}
- /** Get task id */
+ /** Gets task id. */
public TaskID getTaskId() { return taskid; }
- /** Get successful task attempt id */
+ /** Gets successful task attempt id. */
public TaskAttemptID getSuccessfulTaskAttemptId() {
return successfulAttemptId;
}
- /** Get the task finish time */
+ /** Gets the task finish time. */
public long getFinishTime() { return finishTime; }
- /** Get task counters */
+ /**
+ * Gets the task start time to be reported to ATSv2.
+ * @return task start time
+ */
+ public long getStartTime() {
+ return startTime;
+ }
+ /** Gets task counters. */
public Counters getCounters() { return counters; }
- /** Get task type */
+ /** Gets task type. */
public TaskType getTaskType() {
return taskType;
}
- /** Get task status */
+ /**
+ * Gets task status.
+ * @return task status
+ */
public String getTaskStatus() { return status.toString(); }
- /** Get event type */
+ /** Gets event type. */
public EventType getEventType() {
return EventType.TASK_FINISHED;
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
index cbca3c8cdb1..19313d3c8d5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
@@ -298,10 +298,10 @@ public class TestMRTimelineEventHandling {
" does not exist.",
jobEventFile.exists());
verifyEntity(jobEventFile, EventType.JOB_FINISHED.name(),
- true, false, null);
+ true, false, null, false);
    Set<String> cfgsToCheck = Sets.newHashSet("dummy_conf1", "dummy_conf2",
"huge_dummy_conf1", "huge_dummy_conf2");
- verifyEntity(jobEventFile, null, false, true, cfgsToCheck);
+ verifyEntity(jobEventFile, null, false, true, cfgsToCheck, false);
// for this test, we expect MR job metrics are published in YARN_APPLICATION
String outputAppDir =
@@ -322,8 +322,8 @@ public class TestMRTimelineEventHandling {
"appEventFilePath: " + appEventFilePath +
" does not exist.",
appEventFile.exists());
- verifyEntity(appEventFile, null, true, false, null);
- verifyEntity(appEventFile, null, false, true, cfgsToCheck);
+ verifyEntity(appEventFile, null, true, false, null, false);
+ verifyEntity(appEventFile, null, false, true, cfgsToCheck, false);
// check for task event file
String outputDirTask =
@@ -344,7 +344,7 @@ public class TestMRTimelineEventHandling {
" does not exist.",
taskEventFile.exists());
verifyEntity(taskEventFile, EventType.TASK_FINISHED.name(),
- true, false, null);
+ true, false, null, true);
// check for task attempt event file
String outputDirTaskAttempt =
@@ -363,7 +363,7 @@ public class TestMRTimelineEventHandling {
Assert.assertTrue("taskAttemptEventFileName: " + taskAttemptEventFilePath +
" does not exist.", taskAttemptEventFile.exists());
verifyEntity(taskAttemptEventFile, EventType.MAP_ATTEMPT_FINISHED.name(),
- true, false, null);
+ true, false, null, true);
}
/**
@@ -380,12 +380,13 @@ public class TestMRTimelineEventHandling {
* @throws IOException
*/
private void verifyEntity(File entityFile, String eventId,
- boolean chkMetrics, boolean chkCfg, Set<String> cfgsToVerify)
- throws IOException {
+ boolean chkMetrics, boolean chkCfg, Set<String> cfgsToVerify,
+ boolean checkIdPrefix) throws IOException {
BufferedReader reader = null;
String strLine;
try {
reader = new BufferedReader(new FileReader(entityFile));
+ long idPrefix = -1;
while ((strLine = reader.readLine()) != null) {
if (strLine.trim().length() > 0) {
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
@@ -394,6 +395,19 @@ public class TestMRTimelineEventHandling {
strLine.trim(),
org.apache.hadoop.yarn.api.records.timelineservice.
TimelineEntity.class);
+
+ LOG.info("strLine.trim()= " + strLine.trim());
+ if (checkIdPrefix) {
+ Assert.assertTrue("Entity ID prefix expected to be > 0",
+ entity.getIdPrefix() > 0);
+ if (idPrefix == -1) {
+ idPrefix = entity.getIdPrefix();
+ } else {
+ Assert.assertEquals("Entity ID prefix should be the same across " +
+ "each publish of the same entity",
+ idPrefix, entity.getIdPrefix());
+ }
+ }
if (eventId == null) {
// Job metrics are published without any events for
// ApplicationEntity. There is also possibility that some other
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV1.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV1.java
index 447ea4e6b49..d553596b2fa 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV1.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV1.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.client.api.TimelineClient;
-import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -54,7 +53,7 @@ class JobHistoryFileReplayMapperV1 extends
public void map(IntWritable key, IntWritable val, Context context) throws IOException {
// collect the apps it needs to process
- TimelineClient tlc = new TimelineClientImpl();
+ TimelineClient tlc = TimelineClient.createTimelineClient();
TimelineEntityConverterV1 converter = new TimelineEntityConverterV1();
JobHistoryFileReplayHelper helper = new JobHistoryFileReplayHelper(context);
int replayMode = helper.getReplayMode();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV1.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV1.java
index 16d14a18c88..6d6151fa6a1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV1.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV1.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.client.api.TimelineClient;
-import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
/**
* Adds simple entities with random string payload, events, metrics, and
@@ -46,7 +45,7 @@ class SimpleEntityWriterV1
public void map(IntWritable key, IntWritable val, Context context)
throws IOException {
- TimelineClient tlc = new TimelineClientImpl();
+ TimelineClient tlc = TimelineClient.createTimelineClient();
Configuration conf = context.getConfiguration();
final int kbs = conf.getInt(KBS_SENT, KBS_SENT_DEFAULT);
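Both test mappers above now obtain the client through the TimelineClient.createTimelineClient() factory instead of instantiating the impl class directly. A hedged sketch of the surrounding lifecycle; the entity payload and method name are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
    import org.apache.hadoop.yarn.client.api.TimelineClient;

    void writeSampleEntity(Configuration conf) throws Exception {
      // The factory hides the concrete implementation, so callers no longer
      // import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl.
      TimelineClient tlc = TimelineClient.createTimelineClient();
      tlc.init(conf);
      tlc.start();
      try {
        TimelineEntity entity = new TimelineEntity(); // v1 entity, as in the mappers
        entity.setEntityType("SAMPLE_TYPE");
        entity.setEntityId("sample_entity_1");
        tlc.putEntities(entity);
      } finally {
        tlc.stop();
      }
    }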
diff --git a/hadoop-project/src/site/markdown/index.md.vm b/hadoop-project/src/site/markdown/index.md.vm
index 62e21b2edf9..bb7bda2c822 100644
--- a/hadoop-project/src/site/markdown/index.md.vm
+++ b/hadoop-project/src/site/markdown/index.md.vm
@@ -55,17 +55,15 @@ documentation.
YARN Timeline Service v.2
-------------------
-We are introducing an early preview (alpha 1) of a major revision of YARN
+We are introducing an early preview (alpha 2) of a major revision of YARN
Timeline Service: v.2. YARN Timeline Service v.2 addresses two major
challenges: improving scalability and reliability of Timeline Service, and
enhancing usability by introducing flows and aggregation.
-YARN Timeline Service v.2 alpha 1 is provided so that users and developers
+YARN Timeline Service v.2 alpha 2 is provided so that users and developers
can test it and provide feedback and suggestions for making it a ready
replacement for Timeline Service v.1.x. It should be used only in a test
-capacity. Most importantly, security is not enabled. Do not set up or use
-Timeline Service v.2 until security is implemented if security is a
-critical requirement.
+capacity.
More details are available in the
[YARN Timeline Service v.2](./hadoop-yarn/hadoop-yarn-site/TimelineServiceV2.html)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
index d3ca765f91e..9b254ae9072 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -92,21 +93,21 @@ public abstract class AllocateResponse {
.preemptionMessage(preempt).nmTokens(nmTokens).build();
}
- @Public
+ @Private
@Unstable
public static AllocateResponse newInstance(int responseId,
List<ContainerStatus> completedContainers,
List<Container> allocatedContainers, List<NodeReport> updatedNodes,
Resource availResources, AMCommand command, int numClusterNodes,
PreemptionMessage preempt, List<NMToken> nmTokens,
- List<UpdatedContainer> updatedContainers) {
+ CollectorInfo collectorInfo) {
return AllocateResponse.newBuilder().numClusterNodes(numClusterNodes)
.responseId(responseId)
.completedContainersStatuses(completedContainers)
.allocatedContainers(allocatedContainers).updatedNodes(updatedNodes)
.availableResources(availResources).amCommand(command)
.preemptionMessage(preempt).nmTokens(nmTokens)
- .updatedContainers(updatedContainers).build();
+ .collectorInfo(collectorInfo).build();
}
@Private
@@ -133,7 +134,7 @@ public abstract class AllocateResponse {
List<Container> allocatedContainers, List<NodeReport> updatedNodes,
Resource availResources, AMCommand command, int numClusterNodes,
PreemptionMessage preempt, List<NMToken> nmTokens, Token amRMToken,
- List<UpdatedContainer> updatedContainers, String collectorAddr) {
+ List<UpdatedContainer> updatedContainers, CollectorInfo collectorInfo) {
return AllocateResponse.newBuilder().numClusterNodes(numClusterNodes)
.responseId(responseId)
.completedContainersStatuses(completedContainers)
@@ -141,7 +142,7 @@ public abstract class AllocateResponse {
.availableResources(availResources).amCommand(command)
.preemptionMessage(preempt).nmTokens(nmTokens)
.updatedContainers(updatedContainers).amRmToken(amRMToken)
- .collectorAddr(collectorAddr).build();
+ .collectorInfo(collectorInfo).build();
}
/**
@@ -333,17 +334,18 @@ public abstract class AllocateResponse {
public abstract void setApplicationPriority(Priority priority);
/**
- * The address of collector that belong to this app
+ * The data associated with the collector that belongs to this app. Contains
+ * address and token along with identification information.
*
- * @return The address of collector that belong to this attempt
+ * @return The data of the collector that belongs to this attempt
*/
@Public
@Unstable
- public abstract String getCollectorAddr();
+ public abstract CollectorInfo getCollectorInfo();
@Private
@Unstable
- public abstract void setCollectorAddr(String collectorAddr);
+ public abstract void setCollectorInfo(CollectorInfo info);
/**
* Get the list of container update errors to inform the
@@ -559,15 +561,17 @@ public abstract class AllocateResponse {
}
/**
- * Set the collectorAddr of the response.
- * @see AllocateResponse#setCollectorAddr(String)
- * @param collectorAddr collectorAddr of the response
+ * Set the collectorInfo of the response.
+ * @see AllocateResponse#setCollectorInfo(CollectorInfo)
+ * @param collectorInfo collectorInfo of the response, which
+ * contains the collector address, RM id, version and collector token.
* @return {@link AllocateResponseBuilder}
*/
@Private
@Unstable
- public AllocateResponseBuilder collectorAddr(String collectorAddr) {
- allocateResponse.setCollectorAddr(collectorAddr);
+ public AllocateResponseBuilder collectorInfo(
+ CollectorInfo collectorInfo) {
+ allocateResponse.setCollectorInfo(collectorInfo);
return this;
}
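From the AM's perspective, the allocate response now hands back a structured record instead of a bare address string. A sketch of consuming it; the method name is hypothetical:

    import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
    import org.apache.hadoop.yarn.api.records.CollectorInfo;
    import org.apache.hadoop.yarn.api.records.Token;

    void onAllocateResponse(AllocateResponse response) {
      CollectorInfo info = response.getCollectorInfo();
      if (info != null) {
        String addr = info.getCollectorAddr();   // where to publish entities
        Token token = info.getCollectorToken();  // null when security is off
        // Hand both to the timeline v2 client so its PUTs can authenticate.
      }
    }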
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorsMap.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java
similarity index 50%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorsMap.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java
index 07e1d92898f..d22b9fb48db 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorsMap.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java
@@ -16,31 +16,44 @@
* limitations under the License.
*/
-package org.apache.hadoop.yarn.server.api.records;
+package org.apache.hadoop.yarn.api.records;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.util.Records;
-
+/**
+ * Collector info containing collector address and collector token passed from
+ * RM to AM in Allocate Response.
+ */
@Private
-public abstract class AppCollectorsMap {
+@InterfaceStability.Unstable
+public abstract class CollectorInfo {
- public static AppCollectorsMap newInstance(
- ApplicationId id, String collectorAddr) {
- AppCollectorsMap appCollectorsMap =
- Records.newRecord(AppCollectorsMap.class);
- appCollectorsMap.setApplicationId(id);
- appCollectorsMap.setCollectorAddr(collectorAddr);
- return appCollectorsMap;
+ protected static final long DEFAULT_TIMESTAMP_VALUE = -1;
+
+ public static CollectorInfo newInstance(String collectorAddr) {
+ return newInstance(collectorAddr, null);
}
- public abstract ApplicationId getApplicationId();
-
- public abstract void setApplicationId(ApplicationId id);
+ public static CollectorInfo newInstance(String collectorAddr, Token token) {
+ CollectorInfo amCollectorInfo =
+ Records.newRecord(CollectorInfo.class);
+ amCollectorInfo.setCollectorAddr(collectorAddr);
+ amCollectorInfo.setCollectorToken(token);
+ return amCollectorInfo;
+ }
public abstract String getCollectorAddr();
public abstract void setCollectorAddr(String addr);
+ /**
+ * Get the delegation token for the app collector, which the AM will use
+ * to publish entities.
+ * @return the delegation token for the app collector.
+ */
+ public abstract Token getCollectorToken();
+
+ public abstract void setCollectorToken(Token token);
}
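On the RM side, the record is assembled through one of the two newInstance overloads. A short sketch, under the assumption that the collector address and token are already known to the caller; the wrapping method is hypothetical:

    import org.apache.hadoop.yarn.api.records.CollectorInfo;
    import org.apache.hadoop.yarn.api.records.Token;

    CollectorInfo describeCollector(String collectorAddr, Token collectorToken) {
      // The single-argument overload is equivalent to passing a null token.
      return CollectorInfo.newInstance(collectorAddr, collectorToken);
    }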
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/ApplicationEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/ApplicationEntity.java
index 6075ec4e145..20226aa8e9c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/ApplicationEntity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/ApplicationEntity.java
@@ -49,4 +49,32 @@ public class ApplicationEntity extends HierarchicalTimelineEntity {
public void setQueue(String queue) {
addInfo(QUEUE_INFO_KEY, queue);
}
+
+ /**
+ * Checks if the input TimelineEntity object is an ApplicationEntity.
+ *
+ * @param te TimelineEntity object.
+ * @return true if input is an ApplicationEntity, false otherwise
+ */
+ public static boolean isApplicationEntity(TimelineEntity te) {
+ return (te == null ? false
+ : te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString()));
+ }
+
+ /**
+ * Returns the event from the given timeline entity that matches the
+ * given event id.
+ * @param te TimelineEntity object.
+ * @param eventId id of the event to be fetched.
+ * @return TimelineEvent if TimelineEntity contains the desired event,
+ * null otherwise.
+ */
+ public static TimelineEvent getApplicationEvent(TimelineEntity te,
+ String eventId) {
+ if (isApplicationEntity(te)) {
+ for (TimelineEvent event : te.getEvents()) {
+ if (event.getId().equals(eventId)) {
+ return event;
+ }
+ }
+ }
+ return null;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
index e43c3acb6de..0af5ea47694 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
@@ -55,6 +55,7 @@ import com.fasterxml.jackson.annotation.JsonSetter;
@InterfaceStability.Unstable
public class TimelineEntity implements Comparable<TimelineEntity> {
protected final static String SYSTEM_INFO_KEY_PREFIX = "SYSTEM_INFO_";
+ public final static long DEFAULT_ENTITY_PREFIX = 0L;
/**
* Identifier of timeline entity(entity id + entity type).
@@ -146,6 +147,7 @@ public class TimelineEntity implements Comparable {
private HashMap<String, Set<String>> isRelatedToEntities = new HashMap<>();
private HashMap<String, Set<String>> relatesToEntities = new HashMap<>();
private Long createdTime;
+ private long idPrefix;
public TimelineEntity() {
identifier = new Identifier();
@@ -548,20 +550,10 @@ public class TimelineEntity implements Comparable {
public int compareTo(TimelineEntity other) {
int comparison = getType().compareTo(other.getType());
if (comparison == 0) {
- if (getCreatedTime() == null) {
- if (other.getCreatedTime() == null) {
- return getId().compareTo(other.getId());
- } else {
- return 1;
- }
- }
- if (other.getCreatedTime() == null) {
+ if (getIdPrefix() > other.getIdPrefix()) {
+ // Descending order by entity id prefix
return -1;
- }
- if (getCreatedTime() > other.getCreatedTime()) {
- // Order by created time desc
- return -1;
- } else if (getCreatedTime() < other.getCreatedTime()) {
+ } else if (getIdPrefix() < other.getIdPrefix()) {
return 1;
} else {
return getId().compareTo(other.getId());
@@ -582,4 +574,38 @@ public class TimelineEntity implements Comparable {
return real.toString();
}
}
+
+ @XmlElement(name = "idprefix")
+ public long getIdPrefix() {
+ if (real == null) {
+ return idPrefix;
+ } else {
+ return real.getIdPrefix();
+ }
+ }
+
+ /**
+ * Sets idPrefix for an entity.
+ *
+ * Note: Entities will be stored in the order of idPrefix specified.
+ * If users decide to set idPrefix for an entity, they MUST provide
+ * the same prefix for every update of this entity.
+ *
+ * Users can use {@link TimelineServiceHelper#invertLong(long)} to invert
+ * the prefix if necessary.
+ *
+ * @param entityIdPrefix prefix for an entity.
+ */
+ @JsonSetter("idprefix")
+ public void setIdPrefix(long entityIdPrefix) {
+ if (real == null) {
+ this.idPrefix = entityIdPrefix;
+ } else {
+ real.setIdPrefix(entityIdPrefix);
+ }
+ }
}
\ No newline at end of file
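The reworked compareTo makes the id prefix, not the created time, the ordering key within a type: larger prefixes sort first, with ties broken by entity id. A small sketch of the resulting contract, using made-up ids and prefixes:

    import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;

    static void demoOrdering() {
      TimelineEntity e1 = new TimelineEntity();
      e1.setType("YARN_CONTAINER");
      e1.setId("container_e1");
      e1.setIdPrefix(5L);

      TimelineEntity e2 = new TimelineEntity();
      e2.setType("YARN_CONTAINER");
      e2.setId("container_e2");
      e2.setIdPrefix(7L);

      // Same type, so ordering is decided by the prefix, descending: the
      // entity with the larger prefix compares as "smaller" and sorts first.
      assert e2.compareTo(e1) < 0;
    }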
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 67dfeebafe6..49448217e9c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1782,6 +1782,10 @@ public class YarnConfiguration extends Configuration {
YARN_SECURITY_SERVICE_AUTHORIZATION_APPLICATIONHISTORY_PROTOCOL =
"security.applicationhistory.protocol.acl";
+ public static final String
+ YARN_SECURITY_SERVICE_AUTHORIZATION_COLLECTOR_NODEMANAGER_PROTOCOL =
+ "security.collector-nodemanager.protocol.acl";
+
/** No. of milliseconds to wait between sending a SIGTERM and SIGKILL
* to a running container */
public static final String NM_SLEEP_DELAY_BEFORE_SIGKILL_MS =
@@ -2099,7 +2103,7 @@ public class YarnConfiguration extends Configuration {
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "with-user-dir";
/**
- * Settings for timeline service v2.0
+ * Settings for timeline service v2.0.
*/
public static final String TIMELINE_SERVICE_WRITER_CLASS =
TIMELINE_SERVICE_PREFIX + "writer.class";
@@ -2112,9 +2116,20 @@ public class YarnConfiguration extends Configuration {
TIMELINE_SERVICE_PREFIX + "reader.class";
public static final String DEFAULT_TIMELINE_SERVICE_READER_CLASS =
- "org.apache.hadoop.yarn.server.timelineservice" +
- ".storage.HBaseTimelineReaderImpl";
+ "org.apache.hadoop.yarn.server.timelineservice.storage" +
+ ".HBaseTimelineReaderImpl";
+ /**
+ * Default schema prefix for HBase tables.
+ */
+ public static final String DEFAULT_TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX =
+ "prod.";
+
+ /**
+ * Config param name to override the schema prefix.
+ */
+ public static final String TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME =
+ TIMELINE_SERVICE_PREFIX + "hbase-schema.prefix";
/** The setting that controls how often the timeline collector flushes the
* timeline writer.
@@ -2134,6 +2149,58 @@ public class YarnConfiguration extends Configuration {
TIMELINE_SERVICE_PREFIX
+ "hbase.coprocessor.app-final-value-retention-milliseconds";
+ /**
+ * The name of the setting for the location of the coprocessor
+ * jar on HDFS.
+ */
+ public static final String FLOW_RUN_COPROCESSOR_JAR_HDFS_LOCATION =
+ TIMELINE_SERVICE_PREFIX
+ + "hbase.coprocessor.jar.hdfs.location";
+
+ /** Default HDFS location for the flow run coprocessor jar. */
+ public static final String DEFAULT_HDFS_LOCATION_FLOW_RUN_COPROCESSOR_JAR =
+ "/hbase/coprocessor/hadoop-yarn-server-timelineservice.jar";
+
+ /**
+ * The name for the setting that points to an optional HBase configuration
+ * (hbase-site.xml file) with settings that will override the ones found on
+ * the classpath.
+ */
+ public static final String TIMELINE_SERVICE_HBASE_CONFIGURATION_FILE =
+ TIMELINE_SERVICE_PREFIX
+ + "hbase.configuration.file";
+
+ /**
+ * The name for the setting that enables or disables authentication checks
+ * for reading timeline service v2 data.
+ */
+ public static final String TIMELINE_SERVICE_READ_AUTH_ENABLED =
+ TIMELINE_SERVICE_PREFIX + "read.authentication.enabled";
+
+ /**
+ * The default setting for authentication checks for reading timeline
+ * service v2 data.
+ */
+ public static final Boolean DEFAULT_TIMELINE_SERVICE_READ_AUTH_ENABLED =
+ false;
+
+ /**
+ * The name for the setting that lists the users and groups who are
+ * allowed to read timeline service v2 data. It is a comma-separated
+ * list of users, followed by a space, then a comma-separated list of
+ * groups. Users and groups on this list may read the data; everyone
+ * else is rejected.
+ */
+ public static final String TIMELINE_SERVICE_READ_ALLOWED_USERS =
+ TIMELINE_SERVICE_PREFIX + "read.allowed.users";
+
+ /**
+ * The default value for the list of users who are allowed to read
+ * timeline service v2 data.
+ */
+ public static final String DEFAULT_TIMELINE_SERVICE_READ_ALLOWED_USERS =
+ "";
+
/**
* The setting that controls how long the final value of a metric of a
* completed app is retained before merging into the flow sum. Up to this time
@@ -2154,6 +2221,8 @@ public class YarnConfiguration extends Configuration {
public static final int DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE = 10;
+ /** Default version for any flow. */
+ public static final String DEFAULT_FLOW_VERSION = "1";
/**
* The time period for which timeline v2 client will wait for draining
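Taken together, these keys let a deployment tune the v2 reader without code changes. A hedged sketch of reading them back; the constants are the ones added above, while the wrapping method is hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    static void logTimelineV2ReadSettings(Configuration conf) {
      // Prefix prepended to the timeline service HBase table names.
      String schemaPrefix = conf.get(
          YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME,
          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX);
      // Whether reads are gated by the allowed-users list below.
      boolean readAuthEnabled = conf.getBoolean(
          YarnConfiguration.TIMELINE_SERVICE_READ_AUTH_ENABLED,
          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_READ_AUTH_ENABLED);
      String allowedUsers = conf.get(
          YarnConfiguration.TIMELINE_SERVICE_READ_ALLOWED_USERS,
          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_READ_ALLOWED_USERS);
      System.out.println(schemaPrefix + " " + readAuthEnabled
          + " [" + allowedUsers + "]");
    }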
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/TimelineServiceHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/TimelineServiceHelper.java
index e0268a67b8f..65ed18a7a1e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/TimelineServiceHelper.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/TimelineServiceHelper.java
@@ -46,4 +46,12 @@ public final class TimelineServiceHelper {
(HashMap<E, V>) originalMap : new HashMap<E, V>(originalMap);
}
+ /**
+ * Inverts the given key.
+ * @param key value to be inverted.
+ * @return inverted long
+ */
+ public static long invertLong(long key) {
+ return Long.MAX_VALUE - key;
+ }
}
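invertLong is a simple order-reversing involution on longs: applying it twice restores the input, and larger timestamps map to smaller prefixes. A short sketch of those properties, with an arbitrary sample timestamp:

    import org.apache.hadoop.yarn.util.TimelineServiceHelper;

    static void demoInvertLong() {
      long t = 1500000000000L;
      // Definition: invertLong(t) == Long.MAX_VALUE - t.
      assert TimelineServiceHelper.invertLong(t) == Long.MAX_VALUE - t;
      // Involution: inverting twice restores the original value.
      assert TimelineServiceHelper.invertLong(
          TimelineServiceHelper.invertLong(t)) == t;
      // Order reversal: a later timestamp yields a smaller prefix.
      assert TimelineServiceHelper.invertLong(t + 1)
          < TimelineServiceHelper.invertLong(t);
    }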
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 81ebd798bb9..c5f485fc3f7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -613,3 +613,8 @@ message StringBytesMapProto {
optional string key = 1;
optional bytes value = 2;
}
+
+message CollectorInfoProto {
+ optional string collector_addr = 1;
+ optional hadoop.common.TokenProto collector_token = 2;
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index b92c46e945e..7a7f03503ca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -112,7 +112,7 @@ message AllocateResponseProto {
repeated NMTokenProto nm_tokens = 9;
optional hadoop.common.TokenProto am_rm_token = 12;
optional PriorityProto application_priority = 13;
- optional string collector_addr = 14;
+ optional CollectorInfoProto collector_info = 14;
repeated UpdateContainerErrorProto update_errors = 15;
repeated UpdatedContainerProto updated_containers = 16;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestApplicationEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestApplicationEntity.java
new file mode 100644
index 00000000000..c3f277718e7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestApplicationEntity.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records.timelineservice;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+/**
+ * Various tests for the ApplicationEntity class.
+ */
+public class TestApplicationEntity {
+
+ @Test
+ public void testIsApplicationEntity() {
+ TimelineEntity te = new TimelineEntity();
+ te.setType(TimelineEntityType.YARN_APPLICATION.toString());
+ assertTrue(ApplicationEntity.isApplicationEntity(te));
+
+ te = null;
+ assertEquals(false, ApplicationEntity.isApplicationEntity(te));
+
+ te = new TimelineEntity();
+ te.setType(TimelineEntityType.YARN_CLUSTER.toString());
+ assertEquals(false, ApplicationEntity.isApplicationEntity(te));
+ }
+
+ @Test
+ public void testGetApplicationEvent() {
+ TimelineEntity te = null;
+ TimelineEvent tEvent = ApplicationEntity.getApplicationEvent(te,
+ "no event");
+ assertEquals(null, tEvent);
+
+ te = new TimelineEntity();
+ te.setType(TimelineEntityType.YARN_APPLICATION.toString());
+ TimelineEvent event = new TimelineEvent();
+ event.setId("start_event");
+ event.setTimestamp(System.currentTimeMillis());
+ te.addEvent(event);
+ tEvent = ApplicationEntity.getApplicationEvent(te, "start_event");
+ assertEquals(event, tEvent);
+
+ te = new TimelineEntity();
+ te.setType(TimelineEntityType.YARN_CLUSTER.toString());
+ event = new TimelineEvent();
+ event.setId("start_event_cluster");
+ event.setTimestamp(System.currentTimeMillis());
+ te.addEvent(event);
+ tEvent = ApplicationEntity.getApplicationEvent(te, "start_event_cluster");
+ assertEquals(null, tEvent);
+
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index d97c6ebd885..bd7bf93566f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -66,6 +66,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
configurationPropsToSkipCompare
.add(YarnConfiguration
.YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCETRACKER_PROTOCOL);
+ configurationPropsToSkipCompare.add(YarnConfiguration
+ .YARN_SECURITY_SERVICE_AUTHORIZATION_COLLECTOR_NODEMANAGER_PROTOCOL);
configurationPropsToSkipCompare.add(YarnConfiguration.CURATOR_LEADER_ELECTOR);
// Federation default configs to be ignored
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index ab4607aca6a..a02af709116 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -104,6 +104,8 @@ import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hadoop.yarn.util.TimelineServiceHelper;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.log4j.LogManager;
@@ -313,6 +315,17 @@ public class ApplicationMaster {
protected final Set<ContainerId> launchedContainers =
Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());
+ /**
+ * Container start times used to set the id prefix while publishing
+ * entities to ATSv2.
+ */
+ private final ConcurrentMap<ContainerId, Long> containerStartTimes =
+ new ConcurrentHashMap<ContainerId, Long>();
+
+ private ConcurrentMap<ContainerId, Long> getContainerStartTimes() {
+ return containerStartTimes;
+ }
+
/**
* @param args Command line args
*/
@@ -866,7 +879,15 @@ public class ApplicationMaster {
+ containerStatus.getContainerId());
}
if (timelineServiceV2Enabled) {
- publishContainerEndEventOnTimelineServiceV2(containerStatus);
+ Long containerStartTime =
+ containerStartTimes.get(containerStatus.getContainerId());
+ if (containerStartTime == null) {
+ containerStartTime = SystemClock.getInstance().getTime();
+ containerStartTimes.put(containerStatus.getContainerId(),
+ containerStartTime);
+ }
+ publishContainerEndEventOnTimelineServiceV2(containerStatus,
+ containerStartTime);
} else if (timelineServiceV1Enabled) {
publishContainerEndEvent(timelineClient, containerStatus, domainId,
appSubmitterUgi);
@@ -994,8 +1015,10 @@ public class ApplicationMaster {
containerId, container.getNodeId());
}
if (applicationMaster.timelineServiceV2Enabled) {
- applicationMaster
- .publishContainerStartEventOnTimelineServiceV2(container);
+ long startTime = SystemClock.getInstance().getTime();
+ applicationMaster.getContainerStartTimes().put(containerId, startTime);
+ applicationMaster.publishContainerStartEventOnTimelineServiceV2(
+ container, startTime);
} else if (applicationMaster.timelineServiceV1Enabled) {
applicationMaster.publishContainerStartEvent(
applicationMaster.timelineClient, container,
@@ -1356,24 +1379,24 @@ public class ApplicationMaster {
}
private void publishContainerStartEventOnTimelineServiceV2(
- Container container) {
+ Container container, long startTime) {
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
entity =
new org.apache.hadoop.yarn.api.records.timelineservice.
TimelineEntity();
entity.setId(container.getId().toString());
entity.setType(DSEntity.DS_CONTAINER.toString());
- long ts = System.currentTimeMillis();
- entity.setCreatedTime(ts);
+ entity.setCreatedTime(startTime);
entity.addInfo("user", appSubmitterUgi.getShortUserName());
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
- event.setTimestamp(ts);
+ event.setTimestamp(startTime);
event.setId(DSEvent.DS_CONTAINER_START.toString());
event.addInfo("Node", container.getNodeId().toString());
event.addInfo("Resources", container.getResource().toString());
entity.addEvent(event);
+ entity.setIdPrefix(TimelineServiceHelper.invertLong(startTime));
try {
appSubmitterUgi.doAs(new PrivilegedExceptionAction