diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/SubApplicationEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/SubApplicationEntity.java
new file mode 100644
index 00000000000..a83ef3d76e9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/SubApplicationEntity.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.records.timelineservice;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This entity represents a user defined entities to be stored under sub
+ * application table.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class SubApplicationEntity extends HierarchicalTimelineEntity {
+
+ public static final String YARN_APPLICATION_ID = "YARN_APPLICATION_ID";
+
+ public SubApplicationEntity(TimelineEntity entity) {
+ super(entity);
+ }
+
+ /**
+ * Checks if the input TimelineEntity object is an SubApplicationEntity.
+ *
+ * @param te TimelineEntity object.
+ * @return true if input is an SubApplicationEntity, false otherwise
+ */
+ public static boolean isSubApplicationEntity(TimelineEntity te) {
+ return (te != null && te instanceof SubApplicationEntity);
+ }
+
+ public void setApplicationId(String appId) {
+ addInfo(YARN_APPLICATION_ID, appId);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java
index 423c059319c..e987b46ae26 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java
@@ -54,9 +54,10 @@ public abstract class TimelineV2Client extends CompositeService {
/**
*
- * Send the information of a number of conceptual entities to the timeline
- * service v.2 collector. It is a blocking API. The method will not return
- * until all the put entities have been persisted.
+ * Send the information of a number of conceptual entities within the scope
+ * of YARN application to the timeline service v.2 collector. It is a blocking
+ * API. The method will not return until all the put entities have been
+ * persisted.
*
*
* @param entities the collection of {@link TimelineEntity}
@@ -69,9 +70,10 @@ public abstract class TimelineV2Client extends CompositeService {
/**
*
- * Send the information of a number of conceptual entities to the timeline
- * service v.2 collector. It is an asynchronous API. The method will return
- * once all the entities are received.
+ * Send the information of a number of conceptual entities within the scope
+ * of YARN application to the timeline service v.2 collector. It is an
+ * asynchronous API. The method will return once all the entities are
+ * received.
*
*
* @param entities the collection of {@link TimelineEntity}
@@ -93,4 +95,37 @@ public abstract class TimelineV2Client extends CompositeService {
* address and timeline delegation token.
*/
public abstract void setTimelineCollectorInfo(CollectorInfo collectorInfo);
+
+
+ /**
+ *
+ * Send the information of a number of conceptual entities within the scope of
+ * a sub-application to the timeline service v.2 collector. It is a blocking
+ * API. The method will not return until all the put entities have been
+ * persisted.
+ *
+ *
+ * @param entities the collection of {@link TimelineEntity}
+ * @throws IOException if there are I/O errors
+ * @throws YarnException if entities are incomplete/invalid
+ */
+ @Public
+ public abstract void putSubAppEntities(TimelineEntity... entities)
+ throws IOException, YarnException;
+
+ /**
+ *
+ * Send the information of a number of conceptual entities within the scope of
+ * a sub-application to the timeline service v.2 collector. It is an
+ * asynchronous API. The method will return once all the entities are received
+ * .
+ *
+ *
+ * @param entities the collection of {@link TimelineEntity}
+ * @throws IOException if there are I/O errors
+ * @throws YarnException if entities are incomplete/invalid
+ */
+ @Public
+ public abstract void putSubAppEntitiesAsync(TimelineEntity... entities)
+ throws IOException, YarnException;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java
index 09bfd5832d1..fb71a8460a3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java
@@ -69,6 +69,7 @@ public class TimelineV2ClientImpl extends TimelineV2Client {
private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/";
private TimelineEntityDispatcher entityDispatcher;
+ private TimelineEntityDispatcher subAppEntityDispatcher;
private volatile String timelineServiceAddress;
@VisibleForTesting
volatile Token currentTimelineToken = null;
@@ -124,6 +125,7 @@ public class TimelineV2ClientImpl extends TimelineV2Client {
YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
entityDispatcher = new TimelineEntityDispatcher(conf);
+ subAppEntityDispatcher = new TimelineEntityDispatcher(conf);
super.serviceInit(conf);
}
@@ -131,24 +133,38 @@ public class TimelineV2ClientImpl extends TimelineV2Client {
protected void serviceStart() throws Exception {
super.serviceStart();
entityDispatcher.start();
+ subAppEntityDispatcher.start();
}
@Override
protected void serviceStop() throws Exception {
entityDispatcher.stop();
+ subAppEntityDispatcher.stop();
super.serviceStop();
}
@Override
public void putEntities(TimelineEntity... entities)
throws IOException, YarnException {
- entityDispatcher.dispatchEntities(true, entities);
+ entityDispatcher.dispatchEntities(true, entities, false);
}
@Override
public void putEntitiesAsync(TimelineEntity... entities)
throws IOException, YarnException {
- entityDispatcher.dispatchEntities(false, entities);
+ entityDispatcher.dispatchEntities(false, entities, false);
+ }
+
+ @Override
+ public void putSubAppEntities(TimelineEntity... entities)
+ throws IOException, YarnException {
+ subAppEntityDispatcher.dispatchEntities(true, entities, true);
+ }
+
+ @Override
+ public void putSubAppEntitiesAsync(TimelineEntity... entities)
+ throws IOException, YarnException {
+ subAppEntityDispatcher.dispatchEntities(false, entities, true);
}
@Override
@@ -347,13 +363,15 @@ public class TimelineV2ClientImpl extends TimelineV2Client {
private final TimelineEntities entities;
private final boolean isSync;
- EntitiesHolder(final TimelineEntities entities, final boolean isSync) {
+ EntitiesHolder(final TimelineEntities entities, final boolean isSync,
+ final boolean subappwrite) {
super(new Callable() {
// publishEntities()
public Void call() throws Exception {
MultivaluedMap params = new MultivaluedMapImpl();
params.add("appid", getContextAppId().toString());
params.add("async", Boolean.toString(!isSync));
+ params.add("subappwrite", Boolean.toString(subappwrite));
putObjects("entities", params, entities);
return null;
}
@@ -497,7 +515,8 @@ public class TimelineV2ClientImpl extends TimelineV2Client {
}
public void dispatchEntities(boolean sync,
- TimelineEntity[] entitiesTobePublished) throws YarnException {
+ TimelineEntity[] entitiesTobePublished, boolean subappwrite)
+ throws YarnException {
if (executor.isShutdown()) {
throw new YarnException("Timeline client is in the process of stopping,"
+ " not accepting any more TimelineEntities");
@@ -510,7 +529,8 @@ public class TimelineV2ClientImpl extends TimelineV2Client {
}
// created a holder and place it in queue
- EntitiesHolder entitiesHolder = new EntitiesHolder(entities, sync);
+ EntitiesHolder entitiesHolder =
+ new EntitiesHolder(entities, sync, subappwrite);
try {
timelineEntityQueue.put(entitiesHolder);
} catch (InterruptedException e) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
index 680520c54c1..ffb232c52ba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.SubApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
@@ -265,7 +266,7 @@ public class TestTimelineReaderWebServicesHBaseStorage
relatesTo1.put("type3",
Sets.newHashSet("entity31", "entity35", "entity32", "entity33"));
entity5.addRelatesToEntities(relatesTo1);
- userEntities.addEntity(entity5);
+ userEntities.addEntity(new SubApplicationEntity(entity5));
TimelineEntity entity6 = new TimelineEntity();
entity6.setId("entity2");
@@ -324,7 +325,7 @@ public class TestTimelineReaderWebServicesHBaseStorage
relatesTo2.put("type6", Sets.newHashSet("entity61", "entity66"));
relatesTo2.put("type3", Sets.newHashSet("entity31"));
entity6.addRelatesToEntities(relatesTo2);
- userEntities.addEntity(entity6);
+ userEntities.addEntity(new SubApplicationEntity(entity6));
for (long i = 1; i <= 10; i++) {
TimelineEntity userEntity = new TimelineEntity();
@@ -332,7 +333,7 @@ public class TestTimelineReaderWebServicesHBaseStorage
userEntity.setId("entityid-" + i);
userEntity.setIdPrefix(11 - i);
userEntity.setCreatedTime(ts);
- userEntities.addEntity(userEntity);
+ userEntities.addEntity(new SubApplicationEntity(userEntity));
}
HBaseTimelineWriterImpl hbi = null;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java
index 90a69595e55..6932a316cbc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.SubApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
@@ -195,7 +196,7 @@ public class TestHBaseTimelineStorageEntities {
m1.setValues(metricValues);
metrics.add(m1);
entity.addMetrics(metrics);
- te.addEntity(entity);
+ te.addEntity(new SubApplicationEntity(entity));
HBaseTimelineWriterImpl hbi = null;
try {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index 027505b5601..29390c52475 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.SubApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
@@ -197,7 +198,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
store(rowKey, te, flowVersion, Tables.ENTITY_TABLE);
}
- if (!isApplication && !userId.equals(subApplicationUser)) {
+ if (!isApplication && SubApplicationEntity.isSubApplicationEntity(te)) {
SubApplicationRowKey subApplicationRowKey =
new SubApplicationRowKey(subApplicationUser, clusterId,
te.getType(), te.getIdPrefix(), te.getId(), userId);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
index 369dc0802e4..e50e4facdef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.ClusterEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.QueueEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.SubApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
@@ -140,6 +141,7 @@ public class TimelineCollectorWebService {
@Context HttpServletRequest req,
@Context HttpServletResponse res,
@QueryParam("async") String async,
+ @QueryParam("subappwrite") String isSubAppEntities,
@QueryParam("appid") String appId,
TimelineEntities entities) {
init(res);
@@ -166,10 +168,11 @@ public class TimelineCollectorWebService {
boolean isAsync = async != null && async.trim().equalsIgnoreCase("true");
if (isAsync) {
- collector.putEntitiesAsync(
- processTimelineEntities(entities), callerUgi);
+ collector.putEntitiesAsync(processTimelineEntities(entities, appId,
+ Boolean.valueOf(isSubAppEntities)), callerUgi);
} else {
- collector.putEntities(processTimelineEntities(entities), callerUgi);
+ collector.putEntities(processTimelineEntities(entities, appId,
+ Boolean.valueOf(isSubAppEntities)), callerUgi);
}
return Response.ok().build();
@@ -210,7 +213,7 @@ public class TimelineCollectorWebService {
// but let's keep it for now in case we need to use sub-classes APIs in the
// future (e.g., aggregation).
private static TimelineEntities processTimelineEntities(
- TimelineEntities entities) {
+ TimelineEntities entities, String appId, boolean isSubAppWrite) {
TimelineEntities entitiesToReturn = new TimelineEntities();
for (TimelineEntity entity : entities.getEntities()) {
TimelineEntityType type = null;
@@ -246,7 +249,13 @@ public class TimelineCollectorWebService {
break;
}
} else {
- entitiesToReturn.addEntity(entity);
+ if (isSubAppWrite) {
+ SubApplicationEntity se = new SubApplicationEntity(entity);
+ se.setApplicationId(appId);
+ entitiesToReturn.addEntity(se);
+ } else {
+ entitiesToReturn.addEntity(entity);
+ }
}
}
return entitiesToReturn;