YARN-5739. Provide timeline reader API to list available timeline entity types for one application. Contributed by Li Lu.
(cherry picked from commit 3b2e80881eaeaa82667496d706496765ed7e29f5)
parent 2fa2c3169e · commit 4f595889e7
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;

final class DataGeneratorForTest {
  static void loadApps(HBaseTestingUtility util) throws IOException {
@@ -358,6 +359,46 @@ final class DataGeneratorForTest {
    relatesTo3.put("container2", relatesToSet14);
    entity2.setRelatesToEntities(relatesTo3);
    te.addEntity(entity2);

    // For listing types
    for (int i = 0; i < 10; i++) {
      TimelineEntity entity3 = new TimelineEntity();
      String id3 = "typeTest" + i;
      entity3.setId(id3);
      StringBuilder typeName = new StringBuilder("newType");
      for (int j = 0; j < (i % 3); j++) {
        typeName.append(" ").append(j);
      }
      entity3.setType(typeName.toString());
      entity3.setCreatedTime(cTime + 80L + i);
      te.addEntity(entity3);
    }

    // Create app entity for app to flow table
    TimelineEntities appTe1 = new TimelineEntities();
    TimelineEntity entityApp1 = new TimelineEntity();
    String appName1 = "application_1231111111_1111";
    entityApp1.setId(appName1);
    entityApp1.setType(TimelineEntityType.YARN_APPLICATION.toString());
    entityApp1.setCreatedTime(cTime + 40L);
    TimelineEvent appCreationEvent1 = new TimelineEvent();
    appCreationEvent1.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
    appCreationEvent1.setTimestamp(cTime);
    entityApp1.addEvent(appCreationEvent1);
    appTe1.addEntity(entityApp1);

    TimelineEntities appTe2 = new TimelineEntities();
    TimelineEntity entityApp2 = new TimelineEntity();
    String appName2 = "application_1231111111_1112";
    entityApp2.setId(appName2);
    entityApp2.setType(TimelineEntityType.YARN_APPLICATION.toString());
    entityApp2.setCreatedTime(cTime + 50L);
    TimelineEvent appCreationEvent2 = new TimelineEvent();
    appCreationEvent2.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
    appCreationEvent2.setTimestamp(cTime);
    entityApp2.addEvent(appCreationEvent2);
    appTe2.addEntity(entityApp2);

    HBaseTimelineWriterImpl hbi = null;
    try {
      hbi = new HBaseTimelineWriterImpl(util.getConfiguration());

@@ -368,8 +409,10 @@ final class DataGeneratorForTest {
      String flow = "some_flow_name";
      String flowVersion = "AB7822C10F1111";
      long runid = 1002345678919L;
      String appName = "application_1231111111_1111";
      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
      hbi.write(cluster, user, flow, flowVersion, runid, appName1, te);
      hbi.write(cluster, user, flow, flowVersion, runid, appName2, te);
      hbi.write(cluster, user, flow, flowVersion, runid, appName1, appTe1);
      hbi.write(cluster, user, flow, flowVersion, runid, appName2, appTe2);
      hbi.stop();
    } finally {
      if (hbi != null) {
@@ -1668,6 +1668,29 @@ public class TestHBaseTimelineStorageEntities {
    assertEquals(3, entities.size());
  }

  @Test(timeout = 90000)
  public void testListTypesInApp() throws Exception {
    Set<String> types = reader.getEntityTypes(
        new TimelineReaderContext("cluster1", "user1", "some_flow_name",
            1002345678919L, "application_1231111111_1111", null, null));
    assertEquals(4, types.size());

    types = reader.getEntityTypes(
        new TimelineReaderContext("cluster1", null, null,
            null, "application_1231111111_1111", null, null));
    assertEquals(4, types.size());

    types = reader.getEntityTypes(
        new TimelineReaderContext("cluster1", null, null,
            null, "application_1231111111_1112", null, null));
    assertEquals(4, types.size());

    types = reader.getEntityTypes(
        new TimelineReaderContext("cluster1", "user1", "some_flow_name",
            1002345678919L, "application_1231111111_1113", null, null));
    assertEquals(0, types.size());
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    util.shutdownMiniCluster();
@@ -176,4 +176,24 @@ public class TimelineReaderManager extends AbstractService {
    }
    return entity;
  }

  /**
   * Gets a set of available timeline entity types for an application. This is
   * done by making a call to the backend storage implementation. The detailed
   * meaning of each argument is the same as for
   * {@link TimelineReader#getEntity}. If the cluster ID has not been supplied
   * by the client, it is filled in from the configuration before the call to
   * backend storage is made.
   *
   * @param context Timeline context within the scope of which entity types
   *     have to be fetched. The entity type field of this context should be
   *     null.
   * @return A set containing the available timeline entity types, represented
   *     as strings if found, empty otherwise.
   * @throws IOException if any problem occurs while getting entity types.
   */
  public Set<String> getEntityTypes(TimelineReaderContext context)
      throws IOException {
    context.setClusterId(getClusterID(context.getClusterId(), getConfig()));
    return reader.getEntityTypes(new TimelineReaderContext(context));
  }
}
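For orientation, a minimal sketch of a call into the new manager API (illustrative only; the context values mirror the test data added in this change, and a null cluster ID is filled in from configuration):

    // Hypothetical caller holding a started TimelineReaderManager instance.
    Set<String> types = timelineReaderManager.getEntityTypes(
        new TimelineReaderContext(null, "user1", "some_flow_name",
            1002345678919L, "application_1231111111_1111", null, null));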
@@ -2859,4 +2859,106 @@ public class TimelineReaderWebServices {
        flowName, flowRunId, confsToRetrieve, metricsToRetrieve, fields,
        metricsLimit);
  }

  /**
   * Returns a set of available entity types for a given app id. The cluster
   * ID is not provided by the client, so the default cluster ID is used. If
   * the user id, flow name and flow run id, which are optional query
   * parameters, are not specified, they will be queried based on the app id
   * and cluster id from the flow context information stored in the underlying
   * storage implementation.
   *
   * @param req Servlet request.
   * @param res Servlet response.
   * @param appId Application id to be queried (Mandatory path param).
   * @param flowName Flow name which should match for the app (Optional query
   *     param).
   * @param flowRunId Run id which should match for the app (Optional query
   *     param).
   * @param userId User id which should match for the app (Optional query
   *     param).
   *
   * @return If successful, an HTTP 200 (OK) response carrying a JSON list of
   *     all available timeline entity types is returned.<br>
   *     On failures,<br>
   *     If any problem occurs in parsing the request, HTTP 400 (Bad Request)
   *     is returned.<br>
   *     If flow context information cannot be retrieved or the app for the
   *     given app id cannot be found, HTTP 404 (Not Found) is returned.<br>
   *     For all other errors while retrieving data, HTTP 500 (Internal Server
   *     Error) is returned.
   */
  @GET
  @Path("/apps/{appid}/entity-types")
  @Produces(MediaType.APPLICATION_JSON)
  public Set<String> getEntityTypes(
      @Context HttpServletRequest req,
      @Context HttpServletResponse res,
      @PathParam("appid") String appId,
      @QueryParam("flowname") String flowName,
      @QueryParam("flowrunid") String flowRunId,
      @QueryParam("userid") String userId) {
    return getEntityTypes(req, res, null, appId, flowName, flowRunId, userId);
  }

  /**
   * Returns a set of available entity types for a given app id. If the user
   * id, flow name and flow run id, which are optional query parameters, are
   * not specified, they will be queried based on the app id and cluster id
   * from the flow context information stored in the underlying storage
   * implementation.
   *
   * @param req Servlet request.
   * @param res Servlet response.
   * @param clusterId Cluster id to which the app to be queried belongs
   *     (Mandatory path param).
   * @param appId Application id to be queried (Mandatory path param).
   * @param flowName Flow name which should match for the app (Optional query
   *     param).
   * @param flowRunId Run id which should match for the app (Optional query
   *     param).
   * @param userId User id which should match for the app (Optional query
   *     param).
   *
   * @return If successful, an HTTP 200 (OK) response carrying a JSON list of
   *     all available timeline entity types is returned.<br>
   *     On failures,<br>
   *     If any problem occurs in parsing the request, HTTP 400 (Bad Request)
   *     is returned.<br>
   *     If flow context information cannot be retrieved or the app for the
   *     given app id cannot be found, HTTP 404 (Not Found) is returned.<br>
   *     For all other errors while retrieving data, HTTP 500 (Internal Server
   *     Error) is returned.
   */
  @GET
  @Path("/clusters/{clusterid}/apps/{appid}/entity-types")
  @Produces(MediaType.APPLICATION_JSON)
  public Set<String> getEntityTypes(
      @Context HttpServletRequest req,
      @Context HttpServletResponse res,
      @PathParam("clusterid") String clusterId,
      @PathParam("appid") String appId,
      @QueryParam("flowname") String flowName,
      @QueryParam("flowrunid") String flowRunId,
      @QueryParam("userid") String userId) {
    String url = req.getRequestURI() +
        (req.getQueryString() == null ? "" :
            QUERY_STRING_SEP + req.getQueryString());
    UserGroupInformation callerUGI =
        TimelineReaderWebServicesUtils.getUser(req);
    LOG.info("Received URL " + url + " from user " +
        TimelineReaderWebServicesUtils.getUserName(callerUGI));
    long startTime = Time.monotonicNow();
    init(res);
    TimelineReaderManager timelineReaderManager = getTimelineReaderManager();
    Set<String> results = null;
    try {
      results = timelineReaderManager.getEntityTypes(
          TimelineReaderWebServicesUtils.createTimelineReaderContext(
              clusterId, userId, flowName, flowRunId, appId, null, null));
    } catch (Exception e) {
      handleException(e, url, startTime, "flowrunid");
    }
    long endTime = Time.monotonicNow();
    LOG.info("Processed URL " + url +
        " (Took " + (endTime - startTime) + " ms.)");
    return results;
  }
}
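As a usage sketch, the two endpoints above can be exercised with requests like the following (host, port and the /ws/v2/timeline web root are assumptions based on a standard timeline reader setup; the app id mirrors the test data):

    GET http://localhost:8188/ws/v2/timeline/apps/application_1231111111_1111/entity-types

    GET http://localhost:8188/ws/v2/timeline/clusters/cluster1/apps/application_1231111111_1111/entity-types?userid=user1&flowname=some_flow_name&flowrunid=1002345678919

Both would return a JSON array of type strings, e.g. ["YARN_APPLICATION", "newType", ...].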
@@ -32,6 +32,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;

@@ -403,4 +404,24 @@ public class FileSystemTimelineReaderImpl extends AbstractService
        context.getEntityType());
    return getEntities(dir, context.getEntityType(), filters, dataToRetrieve);
  }

  @Override public Set<String> getEntityTypes(TimelineReaderContext context)
      throws IOException {
    Set<String> result = new TreeSet<>();
    String flowRunPath = getFlowRunPath(context.getUserId(),
        context.getClusterId(), context.getFlowName(), context.getFlowRunId(),
        context.getAppId());
    File dir = new File(new File(rootPath, ENTITIES_DIR),
        context.getClusterId() + File.separator + flowRunPath
            + File.separator + context.getAppId());
    File[] fileList = dir.listFiles();
    if (fileList != null) {
      for (File f : fileList) {
        if (f.isDirectory()) {
          result.add(f.getName());
        }
      }
    }
    return result;
  }
}
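This implementation derives the type list directly from the on-disk layout: every subdirectory of the application directory is one entity type. Leaving the flow run path unexpanded (its exact composition is an assumption about getFlowRunPath), the tree looks roughly like:

    <rootPath>/entities/cluster1/<flowRunPath>/application_1231111111_1111/
        YARN_APPLICATION/    <- one directory per entity type
        newType/
        newType 0/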
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.reader.EntityTypeReader;
import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReader;
import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReaderFactory;

@@ -85,4 +86,11 @@ public class HBaseTimelineReaderImpl
        filters, dataToRetrieve);
    return reader.readEntities(hbaseConf, conn);
  }

  @Override
  public Set<String> getEntityTypes(TimelineReaderContext context)
      throws IOException {
    EntityTypeReader reader = new EntityTypeReader(context);
    return reader.readEntityTypes(hbaseConf, conn);
  }
}
@@ -177,4 +177,17 @@ public interface TimelineReader extends Service {
      TimelineReaderContext context,
      TimelineEntityFilters filters,
      TimelineDataToRetrieve dataToRetrieve) throws IOException;

  /**
   * The API to list all available entity types of the given context.
   *
   * @param context A context that defines the scope of this query. The
   *     incoming context should contain at least the cluster id and the
   *     application id.
   *
   * @return A set of entity types available in the given context.
   *
   * @throws IOException if an exception occurred while listing from the
   *     backend storage.
   */
  Set<String> getEntityTypes(TimelineReaderContext context) throws IOException;
}
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.common;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;

@@ -35,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.util.Bytes;

@@ -605,4 +607,38 @@ public final class TimelineStorageUtils {
    return hbaseConf;
  }

  /**
   * Given a row key prefix stored in a byte array, return a byte array for
   * its immediate next row key.
   *
   * @param rowKeyPrefix The provided row key prefix, represented in an array.
   * @return the closest next row key of the provided row key.
   */
  public static byte[] calculateTheClosestNextRowKeyForPrefix(
      byte[] rowKeyPrefix) {
    // Essentially we are treating it like an 'unsigned very very long' and
    // doing +1 manually.
    // Search for the place where the trailing 0xFFs start.
    int offset = rowKeyPrefix.length;
    while (offset > 0) {
      if (rowKeyPrefix[offset - 1] != (byte) 0xFF) {
        break;
      }
      offset--;
    }

    if (offset == 0) {
      // We got an 0xFFFF... (only FFs) stopRow value which is
      // the last possible prefix before the end of the table.
      // So set it to stop at the 'end of the table'.
      return HConstants.EMPTY_END_ROW;
    }

    // Copy the right length of the original.
    byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset);
    // And increment the last one.
    newStopRow[newStopRow.length - 1]++;
    return newStopRow;
  }
}
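A short worked example of the increment (values illustrative; the behavior follows directly from the method above):

    // {0x01, 0x02, 0xFF}: trailing 0xFF bytes are dropped and the preceding
    // byte is incremented, yielding {0x01, 0x03}.
    byte[] next = TimelineStorageUtils.calculateTheClosestNextRowKeyForPrefix(
        new byte[] {0x01, 0x02, (byte) 0xFF});

    // {0xFF, 0xFF}: every byte is 0xFF, so HConstants.EMPTY_END_ROW is
    // returned and the scan simply runs to the end of the table.
    byte[] end = TimelineStorageUtils.calculateTheClosestNextRowKeyForPrefix(
        new byte[] {(byte) 0xFF, (byte) 0xFF});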
@@ -0,0 +1,145 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.timelineservice.storage.reader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
import org.apache.hadoop.yarn.webapp.NotFoundException;

import java.io.IOException;

/**
 * The base class for reading timeline data from the HBase storage. This class
 * provides basic support to validate and augment reader context.
 */
public abstract class AbstractTimelineStorageReader {

  private final TimelineReaderContext context;
  /**
   * Used to look up the flow context.
   */
  private final AppToFlowTable appToFlowTable = new AppToFlowTable();

  public AbstractTimelineStorageReader(TimelineReaderContext ctxt) {
    context = ctxt;
  }

  protected TimelineReaderContext getContext() {
    return context;
  }

  /**
   * Looks up flow context from the AppToFlow table.
   *
   * @param appToFlowRowKey to identify Cluster and App Ids.
   * @param hbaseConf HBase configuration.
   * @param conn HBase Connection.
   * @return flow context information.
   * @throws IOException if any problem occurs while fetching flow information.
   */
  protected FlowContext lookupFlowContext(AppToFlowRowKey appToFlowRowKey,
      Configuration hbaseConf, Connection conn) throws IOException {
    byte[] rowKey = appToFlowRowKey.getRowKey();
    Get get = new Get(rowKey);
    Result result = appToFlowTable.getResult(hbaseConf, conn, get);
    if (result != null && !result.isEmpty()) {
      return new FlowContext(AppToFlowColumn.USER_ID.readResult(result)
          .toString(), AppToFlowColumn.FLOW_ID.readResult(result).toString(),
          ((Number) AppToFlowColumn.FLOW_RUN_ID.readResult(result))
              .longValue());
    } else {
      throw new NotFoundException(
          "Unable to find the context flow ID and flow run ID for clusterId="
              + appToFlowRowKey.getClusterId() + ", appId="
              + appToFlowRowKey.getAppId());
    }
  }

  /**
   * Sets certain parameters to defaults if the values are not provided.
   *
   * @param hbaseConf HBase Configuration.
   * @param conn HBase Connection.
   * @throws IOException if any exception is encountered while setting params.
   */
  protected void augmentParams(Configuration hbaseConf, Connection conn)
      throws IOException {
    defaultAugmentParams(hbaseConf, conn);
  }

  /**
   * Default behavior for all timeline readers to augment parameters.
   *
   * @param hbaseConf HBase Configuration.
   * @param conn HBase Connection.
   * @throws IOException if any exception is encountered while setting params.
   */
  protected final void defaultAugmentParams(Configuration hbaseConf,
      Connection conn) throws IOException {
    // In reality all three should be null or neither should be null.
    if (context.getFlowName() == null || context.getFlowRunId() == null
        || context.getUserId() == null) {
      // Get flow context information from the AppToFlow table.
      AppToFlowRowKey appToFlowRowKey =
          new AppToFlowRowKey(context.getClusterId(), context.getAppId());
      FlowContext flowContext =
          lookupFlowContext(appToFlowRowKey, hbaseConf, conn);
      context.setFlowName(flowContext.flowName);
      context.setFlowRunId(flowContext.flowRunId);
      context.setUserId(flowContext.userId);
    }
  }

  /**
   * Validates the required parameters to read the entities.
   */
  protected abstract void validateParams();

  /**
   * Encapsulates flow context information.
   */
  protected static class FlowContext {
    private final String userId;
    private final String flowName;
    private final Long flowRunId;

    public FlowContext(String user, String flowName, Long flowRunId) {
      this.userId = user;
      this.flowName = flowName;
      this.flowRunId = flowRunId;
    }

    protected String getUserId() {
      return userId;
    }

    protected String getFlowName() {
      return flowName;
    }

    protected Long getFlowRunId() {
      return flowRunId;
    }
  }
}
@@ -48,7 +48,6 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.Applica
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;

@@ -343,20 +342,9 @@ class ApplicationEntityReader extends GenericEntityReader {
  @Override
  protected void augmentParams(Configuration hbaseConf, Connection conn)
      throws IOException {
    TimelineReaderContext context = getContext();
    if (isSingleEntityRead()) {
      // Get flow context information from the AppToFlow table.
      if (context.getFlowName() == null || context.getFlowRunId() == null
          || context.getUserId() == null) {
        AppToFlowRowKey appToFlowRowKey =
            new AppToFlowRowKey(context.getClusterId(), context.getAppId());
        FlowContext flowContext =
            lookupFlowContext(appToFlowRowKey,
                hbaseConf, conn);
        context.setFlowName(flowContext.getFlowName());
        context.setFlowRunId(flowContext.getFlowRunId());
        context.setUserId(flowContext.getUserId());
      }
      defaultAugmentParams(hbaseConf, conn);
    }
    // Add configs/metrics to fields to retrieve if confsToRetrieve and/or
    // metricsToRetrieve are specified.
@@ -0,0 +1,180 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.timelineservice.storage.reader;

import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;

import java.io.IOException;
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

/**
 * Timeline entity reader for listing all available entity types given one
 * reader context. Right now it only supports listing all entity types within
 * one YARN application.
 */
public final class EntityTypeReader extends AbstractTimelineStorageReader {

  private static final Log LOG = LogFactory.getLog(EntityTypeReader.class);
  private static final EntityTable ENTITY_TABLE = new EntityTable();

  public EntityTypeReader(TimelineReaderContext context) {
    super(context);
  }

  /**
   * Reads the set of timeline entity types from the HBase storage for the
   * given context.
   *
   * @param hbaseConf HBase Configuration.
   * @param conn HBase Connection.
   * @return a set of entity type strings found for the context.
   * @throws IOException if any exception is encountered while reading
   *     entities.
   */
  public Set<String> readEntityTypes(Configuration hbaseConf,
      Connection conn) throws IOException {

    validateParams();
    augmentParams(hbaseConf, conn);

    Set<String> types = new TreeSet<>();
    TimelineReaderContext context = getContext();
    EntityRowKeyPrefix prefix = new EntityRowKeyPrefix(context.getClusterId(),
        context.getUserId(), context.getFlowName(), context.getFlowRunId(),
        context.getAppId());
    byte[] currRowKey = prefix.getRowKeyPrefix();
    byte[] nextRowKey = prefix.getRowKeyPrefix();
    nextRowKey[nextRowKey.length - 1]++;

    FilterList typeFilterList = new FilterList();
    typeFilterList.addFilter(new FirstKeyOnlyFilter());
    typeFilterList.addFilter(new KeyOnlyFilter());
    typeFilterList.addFilter(new PageFilter(1));
    if (LOG.isDebugEnabled()) {
      LOG.debug("FilterList created for scan is - " + typeFilterList);
    }

    int counter = 0;
    while (true) {
      try (ResultScanner results =
          getResult(hbaseConf, conn, typeFilterList, currRowKey, nextRowKey)) {
        TimelineEntity entity = parseEntityForType(results.next());
        if (entity == null) {
          break;
        }
        ++counter;
        if (!types.add(entity.getType())) {
          LOG.warn("Failed to add type " + entity.getType()
              + " to the result set because there is a duplicated copy.");
        }
        String currType = entity.getType();
        if (LOG.isDebugEnabled()) {
          LOG.debug("Current row key: " + Arrays.toString(currRowKey));
          LOG.debug("New entity type discovered: " + currType);
        }
        currRowKey = getNextRowKey(prefix.getRowKeyPrefix(), currType);
      }
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Scanned " + counter + " records for "
          + types.size() + " types");
    }
    return types;
  }

  @Override
  protected void validateParams() {
    Preconditions.checkNotNull(getContext(), "context shouldn't be null");
    Preconditions.checkNotNull(getContext().getClusterId(),
        "clusterId shouldn't be null");
    Preconditions.checkNotNull(getContext().getAppId(),
        "appId shouldn't be null");
  }

  /**
   * Gets the possible next row key prefix given the current prefix and type.
   *
   * @param currRowKeyPrefix The current prefix that contains user, cluster,
   *     flow, run, and application id.
   * @param entityType Current entity type.
   * @return A new prefix for the possibly immediately next row key.
   */
  private static byte[] getNextRowKey(byte[] currRowKeyPrefix,
      String entityType) {
    if (currRowKeyPrefix == null || entityType == null) {
      return null;
    }

    byte[] entityTypeEncoded = Separator.QUALIFIERS.join(
        Separator.encode(entityType, Separator.SPACE, Separator.TAB,
            Separator.QUALIFIERS),
        Separator.EMPTY_BYTES);

    byte[] currRowKey =
        new byte[currRowKeyPrefix.length + entityTypeEncoded.length];
    System.arraycopy(currRowKeyPrefix, 0, currRowKey, 0,
        currRowKeyPrefix.length);
    System.arraycopy(entityTypeEncoded, 0, currRowKey, currRowKeyPrefix.length,
        entityTypeEncoded.length);

    return
        TimelineStorageUtils.calculateTheClosestNextRowKeyForPrefix(currRowKey);
  }

  private ResultScanner getResult(Configuration hbaseConf, Connection conn,
      FilterList filterList, byte[] startPrefix, byte[] endPrefix)
      throws IOException {
    Scan scan = new Scan(startPrefix, endPrefix);
    scan.setFilter(filterList);
    scan.setSmall(true);
    return ENTITY_TABLE.getResultScanner(hbaseConf, conn, scan);
  }

  private TimelineEntity parseEntityForType(Result result)
      throws IOException {
    if (result == null || result.isEmpty()) {
      return null;
    }
    TimelineEntity entity = new TimelineEntity();
    EntityRowKey newRowKey = EntityRowKey.parseRowKey(result.getRow());
    entity.setType(newRowKey.getEntityType());
    return entity;
  }
}
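To make the scan strategy concrete, here is a sketch of how the loop in readEntityTypes walks the entity table for an application with hypothetical types "A" and "B":

    // Pass 1: a small scan over [appPrefix, appPrefix + 1) with
    //         FirstKeyOnlyFilter/KeyOnlyFilter/PageFilter(1) returns a single
    //         row; its row key parses to type "A".
    // Jump:   currRowKey = closestNextRowKey(appPrefix + encode("A")), which
    //         skips every remaining row of type "A" without reading it.
    // Pass 2: the first row at or after the new start key parses to type "B".
    // Jump:   currRowKey = closestNextRowKey(appPrefix + encode("B")).
    // Pass 3: no row is found, parseEntityForType returns null, the loop ends.

The number of scans is therefore proportional to the number of distinct entity types, not to the number of entity rows.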
@@ -41,9 +41,6 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;

@@ -56,7 +53,6 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
import org.apache.hadoop.yarn.webapp.NotFoundException;

import com.google.common.base.Preconditions;

@@ -67,11 +63,6 @@ import com.google.common.base.Preconditions;
class GenericEntityReader extends TimelineEntityReader {
  private static final EntityTable ENTITY_TABLE = new EntityTable();

  /**
   * Used to look up the flow context.
   */
  private final AppToFlowTable appToFlowTable = new AppToFlowTable();

  /**
   * Used to convert strings key components to and from storage format.
   */

@@ -400,60 +391,6 @@ class GenericEntityReader extends TimelineEntityReader {
    return listBasedOnFields;
  }

  /**
   * Looks up flow context from the AppToFlow table.
   *
   * @param appToFlowRowKey to identify Cluster and App Ids.
   * @param hbaseConf HBase configuration.
   * @param conn HBase Connection.
   * @return flow context information.
   * @throws IOException if any problem occurs while fetching flow information.
   */
  protected FlowContext lookupFlowContext(AppToFlowRowKey appToFlowRowKey,
      Configuration hbaseConf, Connection conn) throws IOException {
    byte[] rowKey = appToFlowRowKey.getRowKey();
    Get get = new Get(rowKey);
    Result result = appToFlowTable.getResult(hbaseConf, conn, get);
    if (result != null && !result.isEmpty()) {
      return new FlowContext(AppToFlowColumn.USER_ID.readResult(result)
          .toString(), AppToFlowColumn.FLOW_ID.readResult(result).toString(),
          ((Number) AppToFlowColumn.FLOW_RUN_ID.readResult(result))
              .longValue());
    } else {
      throw new NotFoundException(
          "Unable to find the context flow ID and flow run ID for clusterId="
              + appToFlowRowKey.getClusterId() + ", appId="
              + appToFlowRowKey.getAppId());
    }
  }

  /**
   * Encapsulates flow context information.
   */
  protected static class FlowContext {
    private final String userId;
    private final String flowName;
    private final Long flowRunId;

    public FlowContext(String user, String flowName, Long flowRunId) {
      this.userId = user;
      this.flowName = flowName;
      this.flowRunId = flowRunId;
    }

    protected String getUserId() {
      return userId;
    }

    protected String getFlowName() {
      return flowName;
    }

    protected Long getFlowRunId() {
      return flowRunId;
    }
  }

  @Override
  protected void validateParams() {
    Preconditions.checkNotNull(getContext(), "context shouldn't be null");

@@ -474,19 +411,7 @@ class GenericEntityReader extends TimelineEntityReader {
  @Override
  protected void augmentParams(Configuration hbaseConf, Connection conn)
      throws IOException {
    TimelineReaderContext context = getContext();
    // In reality all three should be null or neither should be null.
    if (context.getFlowName() == null || context.getFlowRunId() == null
        || context.getUserId() == null) {
      // Get flow context information from the AppToFlow table.
      AppToFlowRowKey appToFlowRowKey =
          new AppToFlowRowKey(context.getClusterId(), context.getAppId());
      FlowContext flowContext =
          lookupFlowContext(appToFlowRowKey, hbaseConf, conn);
      context.setFlowName(flowContext.flowName);
      context.setFlowRunId(flowContext.flowRunId);
      context.setUserId(flowContext.userId);
    }
    defaultAugmentParams(hbaseConf, conn);
    // Add configs/metrics to fields to retrieve if confsToRetrieve and/or
    // metricsToRetrieve are specified.
    getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve();
@@ -60,11 +60,11 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn
 * HBase storage. Different types can be defined for different types of the
 * entities that are being requested.
 */
public abstract class TimelineEntityReader {
public abstract class TimelineEntityReader extends
    AbstractTimelineStorageReader {
  private static final Log LOG = LogFactory.getLog(TimelineEntityReader.class);

  private final boolean singleEntityRead;
  private TimelineReaderContext context;
  private TimelineDataToRetrieve dataToRetrieve;
  // used only for multiple entity read mode
  private TimelineEntityFilters filters;

@@ -101,9 +101,9 @@ public abstract class TimelineEntityReader {
  protected TimelineEntityReader(TimelineReaderContext ctxt,
      TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve,
      boolean sortedKeys) {
    super(ctxt);
    this.singleEntityRead = false;
    this.sortedKeys = sortedKeys;
    this.context = ctxt;
    this.dataToRetrieve = toRetrieve;
    this.filters = entityFilters;

@@ -119,8 +119,8 @@ public abstract class TimelineEntityReader {
   */
  protected TimelineEntityReader(TimelineReaderContext ctxt,
      TimelineDataToRetrieve toRetrieve) {
    super(ctxt);
    this.singleEntityRead = true;
    this.context = ctxt;
    this.dataToRetrieve = toRetrieve;

    this.setTable(getTable());

@@ -184,10 +184,6 @@ public abstract class TimelineEntityReader {
    return null;
  }

  protected TimelineReaderContext getContext() {
    return context;
  }

  protected TimelineDataToRetrieve getDataToRetrieve() {
    return dataToRetrieve;
  }

@@ -228,7 +224,7 @@ public abstract class TimelineEntityReader {
    if (result == null || result.isEmpty()) {
      // Could not find a matching row.
      LOG.info("Cannot find matching entity of type " +
          context.getEntityType());
          getContext().getEntityType());
      return null;
    }
    return parseEntity(result);

@@ -287,21 +283,6 @@ public abstract class TimelineEntityReader {
    return table;
  }

  /**
   * Validates the required parameters to read the entities.
   */
  protected abstract void validateParams();

  /**
   * Sets certain parameters to defaults if the values are not provided.
   *
   * @param hbaseConf HBase Configuration.
   * @param conn HBase Connection.
   * @throws IOException if any exception is encountered while setting params.
   */
  protected abstract void augmentParams(Configuration hbaseConf,
      Connection conn) throws IOException;

  /**
   * Fetches a {@link Result} instance for a single-entity read.
   *
@@ -86,4 +86,17 @@ public final class TimelineEntityReaderFactory {
      return new GenericEntityReader(context, filters, dataToRetrieve, false);
    }
  }

  /**
   * Creates a timeline entity type reader that will read all available entity
   * types within the specified context.
   *
   * @param context Reader context which defines the scope in which the query
   *     has to be made. Limited to application level only.
   * @return an <cite>EntityTypeReader</cite> object
   */
  public static EntityTypeReader createEntityTypeReader(
      TimelineReaderContext context) {
    return new EntityTypeReader(context);
  }
}
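A minimal sketch of listing entity types through the factory (the context values mirror the test data; hbaseConf and conn are assumed to be an existing HBase Configuration and Connection, as in HBaseTimelineReaderImpl above):

    TimelineReaderContext context = new TimelineReaderContext(
        "cluster1", "user1", "some_flow_name", 1002345678919L,
        "application_1231111111_1111", null, null);
    EntityTypeReader typeReader =
        TimelineEntityReaderFactory.createEntityTypeReader(context);
    Set<String> types = typeReader.readEntityTypes(hbaseConf, conn);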