YARN-6936. [Atsv2] Retrospect storing entities into sub application table from client perspective. (Rohith Sharma K S via Haibo Chen)

(cherry picked from commit f8b8bd53c4)
This commit is contained in:
Haibo Chen 2018-04-05 10:22:50 -07:00
parent b11c228660
commit 6658018410
7 changed files with 138 additions and 21 deletions

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records.timelineservice;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* This entity represents a user defined entities to be stored under sub
* application table.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class SubApplicationEntity extends HierarchicalTimelineEntity {
public static final String YARN_APPLICATION_ID = "YARN_APPLICATION_ID";
public SubApplicationEntity(TimelineEntity entity) {
super(entity);
}
/**
* Checks if the input TimelineEntity object is an SubApplicationEntity.
*
* @param te TimelineEntity object.
* @return true if input is an SubApplicationEntity, false otherwise
*/
public static boolean isSubApplicationEntity(TimelineEntity te) {
return (te != null && te instanceof SubApplicationEntity);
}
public void setApplicationId(String appId) {
addInfo(YARN_APPLICATION_ID, appId);
}
}

View File

@ -54,9 +54,10 @@ public abstract class TimelineV2Client extends CompositeService {
/** /**
* <p> * <p>
* Send the information of a number of conceptual entities to the timeline * Send the information of a number of conceptual entities within the scope
* service v.2 collector. It is a blocking API. The method will not return * of YARN application to the timeline service v.2 collector. It is a blocking
* until all the put entities have been persisted. * API. The method will not return until all the put entities have been
* persisted.
* </p> * </p>
* *
* @param entities the collection of {@link TimelineEntity} * @param entities the collection of {@link TimelineEntity}
@ -69,9 +70,10 @@ public abstract class TimelineV2Client extends CompositeService {
/** /**
* <p> * <p>
* Send the information of a number of conceptual entities to the timeline * Send the information of a number of conceptual entities within the scope
* service v.2 collector. It is an asynchronous API. The method will return * of YARN application to the timeline service v.2 collector. It is an
* once all the entities are received. * asynchronous API. The method will return once all the entities are
* received.
* </p> * </p>
* *
* @param entities the collection of {@link TimelineEntity} * @param entities the collection of {@link TimelineEntity}
@ -93,4 +95,37 @@ public abstract class TimelineV2Client extends CompositeService {
* address and timeline delegation token. * address and timeline delegation token.
*/ */
public abstract void setTimelineCollectorInfo(CollectorInfo collectorInfo); public abstract void setTimelineCollectorInfo(CollectorInfo collectorInfo);
/**
* <p>
* Send the information of a number of conceptual entities within the scope of
* a sub-application to the timeline service v.2 collector. It is a blocking
* API. The method will not return until all the put entities have been
* persisted.
* </p>
*
* @param entities the collection of {@link TimelineEntity}
* @throws IOException if there are I/O errors
* @throws YarnException if entities are incomplete/invalid
*/
@Public
public abstract void putSubAppEntities(TimelineEntity... entities)
throws IOException, YarnException;
/**
* <p>
* Send the information of a number of conceptual entities within the scope of
* a sub-application to the timeline service v.2 collector. It is an
* asynchronous API. The method will return once all the entities are received
* .
* </p>
*
* @param entities the collection of {@link TimelineEntity}
* @throws IOException if there are I/O errors
* @throws YarnException if entities are incomplete/invalid
*/
@Public
public abstract void putSubAppEntitiesAsync(TimelineEntity... entities)
throws IOException, YarnException;
} }

View File

@ -69,6 +69,7 @@ public class TimelineV2ClientImpl extends TimelineV2Client {
private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/"; private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/";
private TimelineEntityDispatcher entityDispatcher; private TimelineEntityDispatcher entityDispatcher;
private TimelineEntityDispatcher subAppEntityDispatcher;
private volatile String timelineServiceAddress; private volatile String timelineServiceAddress;
@VisibleForTesting @VisibleForTesting
volatile Token currentTimelineToken = null; volatile Token currentTimelineToken = null;
@ -124,6 +125,7 @@ public class TimelineV2ClientImpl extends TimelineV2Client {
YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS, YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS); YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
entityDispatcher = new TimelineEntityDispatcher(conf); entityDispatcher = new TimelineEntityDispatcher(conf);
subAppEntityDispatcher = new TimelineEntityDispatcher(conf);
super.serviceInit(conf); super.serviceInit(conf);
} }
@ -131,24 +133,38 @@ public class TimelineV2ClientImpl extends TimelineV2Client {
protected void serviceStart() throws Exception { protected void serviceStart() throws Exception {
super.serviceStart(); super.serviceStart();
entityDispatcher.start(); entityDispatcher.start();
subAppEntityDispatcher.start();
} }
@Override @Override
protected void serviceStop() throws Exception { protected void serviceStop() throws Exception {
entityDispatcher.stop(); entityDispatcher.stop();
subAppEntityDispatcher.stop();
super.serviceStop(); super.serviceStop();
} }
@Override @Override
public void putEntities(TimelineEntity... entities) public void putEntities(TimelineEntity... entities)
throws IOException, YarnException { throws IOException, YarnException {
entityDispatcher.dispatchEntities(true, entities); entityDispatcher.dispatchEntities(true, entities, false);
} }
@Override @Override
public void putEntitiesAsync(TimelineEntity... entities) public void putEntitiesAsync(TimelineEntity... entities)
throws IOException, YarnException { throws IOException, YarnException {
entityDispatcher.dispatchEntities(false, entities); entityDispatcher.dispatchEntities(false, entities, false);
}
@Override
public void putSubAppEntities(TimelineEntity... entities)
throws IOException, YarnException {
subAppEntityDispatcher.dispatchEntities(true, entities, true);
}
@Override
public void putSubAppEntitiesAsync(TimelineEntity... entities)
throws IOException, YarnException {
subAppEntityDispatcher.dispatchEntities(false, entities, true);
} }
@Override @Override
@ -346,13 +362,15 @@ public class TimelineV2ClientImpl extends TimelineV2Client {
private final TimelineEntities entities; private final TimelineEntities entities;
private final boolean isSync; private final boolean isSync;
EntitiesHolder(final TimelineEntities entities, final boolean isSync) { EntitiesHolder(final TimelineEntities entities, final boolean isSync,
final boolean subappwrite) {
super(new Callable<Void>() { super(new Callable<Void>() {
// publishEntities() // publishEntities()
public Void call() throws Exception { public Void call() throws Exception {
MultivaluedMap<String, String> params = new MultivaluedMapImpl(); MultivaluedMap<String, String> params = new MultivaluedMapImpl();
params.add("appid", getContextAppId().toString()); params.add("appid", getContextAppId().toString());
params.add("async", Boolean.toString(!isSync)); params.add("async", Boolean.toString(!isSync));
params.add("subappwrite", Boolean.toString(subappwrite));
putObjects("entities", params, entities); putObjects("entities", params, entities);
return null; return null;
} }
@ -496,7 +514,8 @@ public class TimelineV2ClientImpl extends TimelineV2Client {
} }
public void dispatchEntities(boolean sync, public void dispatchEntities(boolean sync,
TimelineEntity[] entitiesTobePublished) throws YarnException { TimelineEntity[] entitiesTobePublished, boolean subappwrite)
throws YarnException {
if (executor.isShutdown()) { if (executor.isShutdown()) {
throw new YarnException("Timeline client is in the process of stopping," throw new YarnException("Timeline client is in the process of stopping,"
+ " not accepting any more TimelineEntities"); + " not accepting any more TimelineEntities");
@ -509,7 +528,8 @@ public class TimelineV2ClientImpl extends TimelineV2Client {
} }
// created a holder and place it in queue // created a holder and place it in queue
EntitiesHolder entitiesHolder = new EntitiesHolder(entities, sync); EntitiesHolder entitiesHolder =
new EntitiesHolder(entities, sync, subappwrite);
try { try {
timelineEntityQueue.put(entitiesHolder); timelineEntityQueue.put(entitiesHolder);
} catch (InterruptedException e) { } catch (InterruptedException e) {

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.SubApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
@ -265,7 +266,7 @@ public class TestTimelineReaderWebServicesHBaseStorage
relatesTo1.put("type3", relatesTo1.put("type3",
Sets.newHashSet("entity31", "entity35", "entity32", "entity33")); Sets.newHashSet("entity31", "entity35", "entity32", "entity33"));
entity5.addRelatesToEntities(relatesTo1); entity5.addRelatesToEntities(relatesTo1);
userEntities.addEntity(entity5); userEntities.addEntity(new SubApplicationEntity(entity5));
TimelineEntity entity6 = new TimelineEntity(); TimelineEntity entity6 = new TimelineEntity();
entity6.setId("entity2"); entity6.setId("entity2");
@ -324,7 +325,7 @@ public class TestTimelineReaderWebServicesHBaseStorage
relatesTo2.put("type6", Sets.newHashSet("entity61", "entity66")); relatesTo2.put("type6", Sets.newHashSet("entity61", "entity66"));
relatesTo2.put("type3", Sets.newHashSet("entity31")); relatesTo2.put("type3", Sets.newHashSet("entity31"));
entity6.addRelatesToEntities(relatesTo2); entity6.addRelatesToEntities(relatesTo2);
userEntities.addEntity(entity6); userEntities.addEntity(new SubApplicationEntity(entity6));
for (long i = 1; i <= 10; i++) { for (long i = 1; i <= 10; i++) {
TimelineEntity userEntity = new TimelineEntity(); TimelineEntity userEntity = new TimelineEntity();
@ -332,7 +333,7 @@ public class TestTimelineReaderWebServicesHBaseStorage
userEntity.setId("entityid-" + i); userEntity.setId("entityid-" + i);
userEntity.setIdPrefix(11 - i); userEntity.setIdPrefix(11 - i);
userEntity.setCreatedTime(ts); userEntity.setCreatedTime(ts);
userEntities.addEntity(userEntity); userEntities.addEntity(new SubApplicationEntity(userEntity));
} }
HBaseTimelineWriterImpl hbi = null; HBaseTimelineWriterImpl hbi = null;

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.SubApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
@ -195,7 +196,7 @@ public class TestHBaseTimelineStorageEntities {
m1.setValues(metricValues); m1.setValues(metricValues);
metrics.add(m1); metrics.add(m1);
entity.addMetrics(metrics); entity.addMetrics(metrics);
te.addEntity(entity); te.addEntity(new SubApplicationEntity(entity));
HBaseTimelineWriterImpl hbi = null; HBaseTimelineWriterImpl hbi = null;
try { try {

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.SubApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
@ -197,7 +198,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
store(rowKey, te, flowVersion, Tables.ENTITY_TABLE); store(rowKey, te, flowVersion, Tables.ENTITY_TABLE);
} }
if (!isApplication && !userId.equals(subApplicationUser)) { if (!isApplication && SubApplicationEntity.isSubApplicationEntity(te)) {
SubApplicationRowKey subApplicationRowKey = SubApplicationRowKey subApplicationRowKey =
new SubApplicationRowKey(subApplicationUser, clusterId, new SubApplicationRowKey(subApplicationUser, clusterId,
te.getType(), te.getIdPrefix(), te.getId(), userId); te.getType(), te.getIdPrefix(), te.getId(), userId);

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.ClusterEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity; import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.QueueEntity; import org.apache.hadoop.yarn.api.records.timelineservice.QueueEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.SubApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
@ -142,6 +143,7 @@ public class TimelineCollectorWebService {
@Context HttpServletRequest req, @Context HttpServletRequest req,
@Context HttpServletResponse res, @Context HttpServletResponse res,
@QueryParam("async") String async, @QueryParam("async") String async,
@QueryParam("subappwrite") String isSubAppEntities,
@QueryParam("appid") String appId, @QueryParam("appid") String appId,
TimelineEntities entities) { TimelineEntities entities) {
init(res); init(res);
@ -168,10 +170,11 @@ public class TimelineCollectorWebService {
boolean isAsync = async != null && async.trim().equalsIgnoreCase("true"); boolean isAsync = async != null && async.trim().equalsIgnoreCase("true");
if (isAsync) { if (isAsync) {
collector.putEntitiesAsync( collector.putEntitiesAsync(processTimelineEntities(entities, appId,
processTimelineEntities(entities), callerUgi); Boolean.valueOf(isSubAppEntities)), callerUgi);
} else { } else {
collector.putEntities(processTimelineEntities(entities), callerUgi); collector.putEntities(processTimelineEntities(entities, appId,
Boolean.valueOf(isSubAppEntities)), callerUgi);
} }
return Response.ok().build(); return Response.ok().build();
@ -212,7 +215,7 @@ public class TimelineCollectorWebService {
// but let's keep it for now in case we need to use sub-classes APIs in the // but let's keep it for now in case we need to use sub-classes APIs in the
// future (e.g., aggregation). // future (e.g., aggregation).
private static TimelineEntities processTimelineEntities( private static TimelineEntities processTimelineEntities(
TimelineEntities entities) { TimelineEntities entities, String appId, boolean isSubAppWrite) {
TimelineEntities entitiesToReturn = new TimelineEntities(); TimelineEntities entitiesToReturn = new TimelineEntities();
for (TimelineEntity entity : entities.getEntities()) { for (TimelineEntity entity : entities.getEntities()) {
TimelineEntityType type = null; TimelineEntityType type = null;
@ -247,10 +250,16 @@ public class TimelineCollectorWebService {
default: default:
break; break;
} }
} else {
if (isSubAppWrite) {
SubApplicationEntity se = new SubApplicationEntity(entity);
se.setApplicationId(appId);
entitiesToReturn.addEntity(se);
} else { } else {
entitiesToReturn.addEntity(entity); entitiesToReturn.addEntity(entity);
} }
} }
}
return entitiesToReturn; return entitiesToReturn;
} }
} }