From a7d8b4d775832ffbcc78e23335a7f50194f224b3 Mon Sep 17 00:00:00 2001 From: Haibo Chen Date: Thu, 5 Apr 2018 10:22:50 -0700 Subject: [PATCH] YARN-6936. [Atsv2] Retrospect storing entities into sub application table from client perspective. (Rohith Sharma K S via Haibo Chen) (cherry picked from commit f8b8bd53c4797d406bea5b1b0cdb179e209169cc) (cherry picked from commit 6658018410defe3d7e4ea2a465e68d1cfdde6ada) (cherry picked from commit 9cb378f5c762f3b26598d3ade448219e5801fa2f) --- .../timelineservice/SubApplicationEntity.java | 50 +++++++++++++++++++ .../yarn/client/api/TimelineV2Client.java | 47 ++++++++++++++--- .../client/api/impl/TimelineV2ClientImpl.java | 30 +++++++++-- ...TimelineReaderWebServicesHBaseStorage.java | 7 +-- .../TestHBaseTimelineStorageEntities.java | 3 +- .../storage/HBaseTimelineWriterImpl.java | 3 +- .../TimelineCollectorWebService.java | 19 +++++-- 7 files changed, 138 insertions(+), 21 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/SubApplicationEntity.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/SubApplicationEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/SubApplicationEntity.java new file mode 100644 index 00000000000..a83ef3d76e9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/SubApplicationEntity.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.records.timelineservice; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * This entity represents a user defined entities to be stored under sub + * application table. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class SubApplicationEntity extends HierarchicalTimelineEntity { + + public static final String YARN_APPLICATION_ID = "YARN_APPLICATION_ID"; + + public SubApplicationEntity(TimelineEntity entity) { + super(entity); + } + + /** + * Checks if the input TimelineEntity object is an SubApplicationEntity. + * + * @param te TimelineEntity object. + * @return true if input is an SubApplicationEntity, false otherwise + */ + public static boolean isSubApplicationEntity(TimelineEntity te) { + return (te != null && te instanceof SubApplicationEntity); + } + + public void setApplicationId(String appId) { + addInfo(YARN_APPLICATION_ID, appId); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java index 423c059319c..e987b46ae26 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java @@ -54,9 +54,10 @@ public abstract class TimelineV2Client extends CompositeService { /** *

- * Send the information of a number of conceptual entities to the timeline - * service v.2 collector. It is a blocking API. The method will not return - * until all the put entities have been persisted. + * Send the information of a number of conceptual entities within the scope + * of YARN application to the timeline service v.2 collector. It is a blocking + * API. The method will not return until all the put entities have been + * persisted. *

* * @param entities the collection of {@link TimelineEntity} @@ -69,9 +70,10 @@ public abstract class TimelineV2Client extends CompositeService { /** *

- * Send the information of a number of conceptual entities to the timeline - * service v.2 collector. It is an asynchronous API. The method will return - * once all the entities are received. + * Send the information of a number of conceptual entities within the scope + * of YARN application to the timeline service v.2 collector. It is an + * asynchronous API. The method will return once all the entities are + * received. *

* * @param entities the collection of {@link TimelineEntity} @@ -93,4 +95,37 @@ public abstract class TimelineV2Client extends CompositeService { * address and timeline delegation token. */ public abstract void setTimelineCollectorInfo(CollectorInfo collectorInfo); + + + /** + *

+ * Send the information of a number of conceptual entities within the scope of + * a sub-application to the timeline service v.2 collector. It is a blocking + * API. The method will not return until all the put entities have been + * persisted. + *

+ * + * @param entities the collection of {@link TimelineEntity} + * @throws IOException if there are I/O errors + * @throws YarnException if entities are incomplete/invalid + */ + @Public + public abstract void putSubAppEntities(TimelineEntity... entities) + throws IOException, YarnException; + + /** + *

+ * Send the information of a number of conceptual entities within the scope of + * a sub-application to the timeline service v.2 collector. It is an + * asynchronous API. The method will return once all the entities are received + * . + *

+ * + * @param entities the collection of {@link TimelineEntity} + * @throws IOException if there are I/O errors + * @throws YarnException if entities are incomplete/invalid + */ + @Public + public abstract void putSubAppEntitiesAsync(TimelineEntity... entities) + throws IOException, YarnException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java index 09bfd5832d1..fb71a8460a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java @@ -69,6 +69,7 @@ public class TimelineV2ClientImpl extends TimelineV2Client { private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/"; private TimelineEntityDispatcher entityDispatcher; + private TimelineEntityDispatcher subAppEntityDispatcher; private volatile String timelineServiceAddress; @VisibleForTesting volatile Token currentTimelineToken = null; @@ -124,6 +125,7 @@ public class TimelineV2ClientImpl extends TimelineV2Client { YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS, YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS); entityDispatcher = new TimelineEntityDispatcher(conf); + subAppEntityDispatcher = new TimelineEntityDispatcher(conf); super.serviceInit(conf); } @@ -131,24 +133,38 @@ public class TimelineV2ClientImpl extends TimelineV2Client { protected void serviceStart() throws Exception { super.serviceStart(); entityDispatcher.start(); + subAppEntityDispatcher.start(); } @Override protected void serviceStop() throws Exception { entityDispatcher.stop(); + subAppEntityDispatcher.stop(); super.serviceStop(); } @Override public void putEntities(TimelineEntity... entities) throws IOException, YarnException { - entityDispatcher.dispatchEntities(true, entities); + entityDispatcher.dispatchEntities(true, entities, false); } @Override public void putEntitiesAsync(TimelineEntity... entities) throws IOException, YarnException { - entityDispatcher.dispatchEntities(false, entities); + entityDispatcher.dispatchEntities(false, entities, false); + } + + @Override + public void putSubAppEntities(TimelineEntity... entities) + throws IOException, YarnException { + subAppEntityDispatcher.dispatchEntities(true, entities, true); + } + + @Override + public void putSubAppEntitiesAsync(TimelineEntity... entities) + throws IOException, YarnException { + subAppEntityDispatcher.dispatchEntities(false, entities, true); } @Override @@ -347,13 +363,15 @@ public class TimelineV2ClientImpl extends TimelineV2Client { private final TimelineEntities entities; private final boolean isSync; - EntitiesHolder(final TimelineEntities entities, final boolean isSync) { + EntitiesHolder(final TimelineEntities entities, final boolean isSync, + final boolean subappwrite) { super(new Callable() { // publishEntities() public Void call() throws Exception { MultivaluedMap params = new MultivaluedMapImpl(); params.add("appid", getContextAppId().toString()); params.add("async", Boolean.toString(!isSync)); + params.add("subappwrite", Boolean.toString(subappwrite)); putObjects("entities", params, entities); return null; } @@ -497,7 +515,8 @@ public class TimelineV2ClientImpl extends TimelineV2Client { } public void dispatchEntities(boolean sync, - TimelineEntity[] entitiesTobePublished) throws YarnException { + TimelineEntity[] entitiesTobePublished, boolean subappwrite) + throws YarnException { if (executor.isShutdown()) { throw new YarnException("Timeline client is in the process of stopping," + " not accepting any more TimelineEntities"); @@ -510,7 +529,8 @@ public class TimelineV2ClientImpl extends TimelineV2Client { } // created a holder and place it in queue - EntitiesHolder entitiesHolder = new EntitiesHolder(entities, sync); + EntitiesHolder entitiesHolder = + new EntitiesHolder(entities, sync, subappwrite); try { timelineEntityQueue.put(entitiesHolder); } catch (InterruptedException e) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java index 680520c54c1..ffb232c52ba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.SubApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; @@ -265,7 +266,7 @@ public class TestTimelineReaderWebServicesHBaseStorage relatesTo1.put("type3", Sets.newHashSet("entity31", "entity35", "entity32", "entity33")); entity5.addRelatesToEntities(relatesTo1); - userEntities.addEntity(entity5); + userEntities.addEntity(new SubApplicationEntity(entity5)); TimelineEntity entity6 = new TimelineEntity(); entity6.setId("entity2"); @@ -324,7 +325,7 @@ public class TestTimelineReaderWebServicesHBaseStorage relatesTo2.put("type6", Sets.newHashSet("entity61", "entity66")); relatesTo2.put("type3", Sets.newHashSet("entity31")); entity6.addRelatesToEntities(relatesTo2); - userEntities.addEntity(entity6); + userEntities.addEntity(new SubApplicationEntity(entity6)); for (long i = 1; i <= 10; i++) { TimelineEntity userEntity = new TimelineEntity(); @@ -332,7 +333,7 @@ public class TestTimelineReaderWebServicesHBaseStorage userEntity.setId("entityid-" + i); userEntity.setIdPrefix(11 - i); userEntity.setCreatedTime(ts); - userEntities.addEntity(userEntity); + userEntities.addEntity(new SubApplicationEntity(userEntity)); } HBaseTimelineWriterImpl hbi = null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java index 90a69595e55..6932a316cbc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.SubApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; @@ -195,7 +196,7 @@ public class TestHBaseTimelineStorageEntities { m1.setValues(metricValues); metrics.add(m1); entity.addMetrics(metrics); - te.addEntity(entity); + te.addEntity(new SubApplicationEntity(entity)); HBaseTimelineWriterImpl hbi = null; try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java index 027505b5601..29390c52475 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.SubApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; @@ -197,7 +198,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements store(rowKey, te, flowVersion, Tables.ENTITY_TABLE); } - if (!isApplication && !userId.equals(subApplicationUser)) { + if (!isApplication && SubApplicationEntity.isSubApplicationEntity(te)) { SubApplicationRowKey subApplicationRowKey = new SubApplicationRowKey(subApplicationUser, clusterId, te.getType(), te.getIdPrefix(), te.getId(), userId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java index 369dc0802e4..e50e4facdef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.ClusterEntity; import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity; import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; import org.apache.hadoop.yarn.api.records.timelineservice.QueueEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.SubApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; @@ -140,6 +141,7 @@ public class TimelineCollectorWebService { @Context HttpServletRequest req, @Context HttpServletResponse res, @QueryParam("async") String async, + @QueryParam("subappwrite") String isSubAppEntities, @QueryParam("appid") String appId, TimelineEntities entities) { init(res); @@ -166,10 +168,11 @@ public class TimelineCollectorWebService { boolean isAsync = async != null && async.trim().equalsIgnoreCase("true"); if (isAsync) { - collector.putEntitiesAsync( - processTimelineEntities(entities), callerUgi); + collector.putEntitiesAsync(processTimelineEntities(entities, appId, + Boolean.valueOf(isSubAppEntities)), callerUgi); } else { - collector.putEntities(processTimelineEntities(entities), callerUgi); + collector.putEntities(processTimelineEntities(entities, appId, + Boolean.valueOf(isSubAppEntities)), callerUgi); } return Response.ok().build(); @@ -210,7 +213,7 @@ public class TimelineCollectorWebService { // but let's keep it for now in case we need to use sub-classes APIs in the // future (e.g., aggregation). private static TimelineEntities processTimelineEntities( - TimelineEntities entities) { + TimelineEntities entities, String appId, boolean isSubAppWrite) { TimelineEntities entitiesToReturn = new TimelineEntities(); for (TimelineEntity entity : entities.getEntities()) { TimelineEntityType type = null; @@ -246,7 +249,13 @@ public class TimelineCollectorWebService { break; } } else { - entitiesToReturn.addEntity(entity); + if (isSubAppWrite) { + SubApplicationEntity se = new SubApplicationEntity(entity); + se.setApplicationId(appId); + entitiesToReturn.addEntity(se); + } else { + entitiesToReturn.addEntity(entity); + } } } return entitiesToReturn;