diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/DataGeneratorForTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/DataGeneratorForTest.java index 5cbb78145d3..cafacab1d66 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/DataGeneratorForTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/DataGeneratorForTest.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; +import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; final class DataGeneratorForTest { static void loadApps(HBaseTestingUtility util) throws IOException { @@ -358,6 +359,46 @@ static void loadEntities(HBaseTestingUtility util) throws IOException { relatesTo3.put("container2", relatesToSet14); entity2.setRelatesToEntities(relatesTo3); te.addEntity(entity2); + + // For listing types + for (int i = 0; i < 10; i++) { + TimelineEntity entity3 = new TimelineEntity(); + String id3 = "typeTest" + i; + entity3.setId(id3); + StringBuilder typeName = new StringBuilder("newType"); + for (int j = 0; j < (i % 3); j++) { + typeName.append(" ").append(j); + } + entity3.setType(typeName.toString()); + entity3.setCreatedTime(cTime + 80L + i); + te.addEntity(entity3); + } + + // Create app entity for app to flow table + TimelineEntities appTe1 = new TimelineEntities(); + TimelineEntity entityApp1 = new TimelineEntity(); + String appName1 = "application_1231111111_1111"; + entityApp1.setId(appName1); + entityApp1.setType(TimelineEntityType.YARN_APPLICATION.toString()); + entityApp1.setCreatedTime(cTime + 40L); + TimelineEvent appCreationEvent1 = new TimelineEvent(); + appCreationEvent1.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); + appCreationEvent1.setTimestamp(cTime); + entityApp1.addEvent(appCreationEvent1); + appTe1.addEntity(entityApp1); + + TimelineEntities appTe2 = new TimelineEntities(); + TimelineEntity entityApp2 = new TimelineEntity(); + String appName2 = "application_1231111111_1112"; + entityApp2.setId(appName2); + entityApp2.setType(TimelineEntityType.YARN_APPLICATION.toString()); + entityApp2.setCreatedTime(cTime + 50L); + TimelineEvent appCreationEvent2 = new TimelineEvent(); + appCreationEvent2.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); + appCreationEvent2.setTimestamp(cTime); + entityApp2.addEvent(appCreationEvent2); + appTe2.addEntity(entityApp2); + HBaseTimelineWriterImpl hbi = null; try { hbi = new HBaseTimelineWriterImpl(); @@ -368,8 +409,10 @@ static void loadEntities(HBaseTestingUtility util) throws IOException { String flow = "some_flow_name"; String flowVersion = "AB7822C10F1111"; long runid = 1002345678919L; - String appName = "application_1231111111_1111"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.write(cluster, user, flow, flowVersion, runid, appName1, te); + hbi.write(cluster, user, flow, flowVersion, runid, appName2, te); + hbi.write(cluster, user, flow, flowVersion, runid, appName1, appTe1); + hbi.write(cluster, user, flow, flowVersion, runid, appName2, appTe2); hbi.stop(); } finally { if (hbi != null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java index e18d0d065b4..b12f45ada64 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java @@ -1672,6 +1672,29 @@ public void testReadEntitiesInfoFilters() throws Exception { assertEquals(3, entities.size()); } + @Test(timeout = 90000) + public void testListTypesInApp() throws Exception { + Set types = reader.getEntityTypes( + new TimelineReaderContext("cluster1", "user1", "some_flow_name", + 1002345678919L, "application_1231111111_1111", null, null)); + assertEquals(4, types.size()); + + types = reader.getEntityTypes( + new TimelineReaderContext("cluster1", null, null, + null, "application_1231111111_1111", null, null)); + assertEquals(4, types.size()); + + types = reader.getEntityTypes( + new TimelineReaderContext("cluster1", null, null, + null, "application_1231111111_1112", null, null)); + assertEquals(4, types.size()); + + types = reader.getEntityTypes( + new TimelineReaderContext("cluster1", "user1", "some_flow_name", + 1002345678919L, "application_1231111111_1113", null, null)); + assertEquals(0, types.size()); + } + @AfterClass public static void tearDownAfterClass() throws Exception { util.shutdownMiniCluster(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java index d81c78ce61e..cb216dfe7d1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.reader.EntityTypeReader; import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReader; import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReaderFactory; import org.slf4j.Logger; @@ -86,4 +87,11 @@ public Set getEntities(TimelineReaderContext context, filters, dataToRetrieve); return reader.readEntities(hbaseConf, conn); } + + @Override + public Set getEntityTypes(TimelineReaderContext context) + throws IOException { + EntityTypeReader reader = new EntityTypeReader(context); + return reader.readEntityTypes(hbaseConf, conn); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java index 25834dc06ea..cdf928731f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Tag; @@ -38,6 +39,7 @@ import java.io.IOException; import java.text.NumberFormat; +import java.util.Arrays; import java.util.List; import java.util.Map; @@ -313,4 +315,38 @@ public static Configuration getTimelineServiceHBaseConf(Configuration conf) } return hbaseConf; } + + /** + * Given a row key prefix stored in a byte array, return a byte array for its + * immediate next row key. + * + * @param rowKeyPrefix The provided row key prefix, represented in an array. + * @return the closest next row key of the provided row key. + */ + public static byte[] calculateTheClosestNextRowKeyForPrefix( + byte[] rowKeyPrefix) { + // Essentially we are treating it like an 'unsigned very very long' and + // doing +1 manually. + // Search for the place where the trailing 0xFFs start + int offset = rowKeyPrefix.length; + while (offset > 0) { + if (rowKeyPrefix[offset - 1] != (byte) 0xFF) { + break; + } + offset--; + } + + if (offset == 0) { + // We got an 0xFFFF... (only FFs) stopRow value which is + // the last possible prefix before the end of the table. + // So set it to stop at the 'end of the table' + return HConstants.EMPTY_END_ROW; + } + + // Copy the right length of the original + byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset); + // And increment the last one + newStopRow[newStopRow.length - 1]++; + return newStopRow; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java new file mode 100644 index 00000000000..fcd8320d552 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.reader; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; +import org.apache.hadoop.yarn.webapp.NotFoundException; + +import java.io.IOException; + +/** + * The base class for reading timeline data from the HBase storage. This class + * provides basic support to validate and augment reader context. + */ +public abstract class AbstractTimelineStorageReader { + + private final TimelineReaderContext context; + /** + * Used to look up the flow context. + */ + private final AppToFlowTable appToFlowTable = new AppToFlowTable(); + + public AbstractTimelineStorageReader(TimelineReaderContext ctxt) { + context = ctxt; + } + + protected TimelineReaderContext getContext() { + return context; + } + + /** + * Looks up flow context from AppToFlow table. + * + * @param appToFlowRowKey to identify Cluster and App Ids. + * @param hbaseConf HBase configuration. + * @param conn HBase Connection. + * @return flow context information. + * @throws IOException if any problem occurs while fetching flow information. + */ + protected FlowContext lookupFlowContext(AppToFlowRowKey appToFlowRowKey, + Configuration hbaseConf, Connection conn) throws IOException { + byte[] rowKey = appToFlowRowKey.getRowKey(); + Get get = new Get(rowKey); + Result result = appToFlowTable.getResult(hbaseConf, conn, get); + if (result != null && !result.isEmpty()) { + return new FlowContext(AppToFlowColumn.USER_ID.readResult(result) + .toString(), AppToFlowColumn.FLOW_ID.readResult(result).toString(), + ((Number) AppToFlowColumn.FLOW_RUN_ID.readResult(result)) + .longValue()); + } else { + throw new NotFoundException( + "Unable to find the context flow ID and flow run ID for clusterId=" + + appToFlowRowKey.getClusterId() + ", appId=" + + appToFlowRowKey.getAppId()); + } + } + + /** + * Sets certain parameters to defaults if the values are not provided. + * + * @param hbaseConf HBase Configuration. + * @param conn HBase Connection. + * @throws IOException if any exception is encountered while setting params. + */ + protected void augmentParams(Configuration hbaseConf, Connection conn) + throws IOException { + defaultAugmentParams(hbaseConf, conn); + } + + /** + * Default behavior for all timeline readers to augment parameters. + * + * @param hbaseConf HBase Configuration. + * @param conn HBase Connection. + * @throws IOException if any exception is encountered while setting params. + */ + final protected void defaultAugmentParams(Configuration hbaseConf, + Connection conn) throws IOException { + // In reality all three should be null or neither should be null + if (context.getFlowName() == null || context.getFlowRunId() == null + || context.getUserId() == null) { + // Get flow context information from AppToFlow table. + AppToFlowRowKey appToFlowRowKey = + new AppToFlowRowKey(context.getClusterId(), context.getAppId()); + FlowContext flowContext = + lookupFlowContext(appToFlowRowKey, hbaseConf, conn); + context.setFlowName(flowContext.flowName); + context.setFlowRunId(flowContext.flowRunId); + context.setUserId(flowContext.userId); + } + } + + /** + * Validates the required parameters to read the entities. + */ + protected abstract void validateParams(); + + /** + * Encapsulates flow context information. + */ + protected static class FlowContext { + private final String userId; + private final String flowName; + private final Long flowRunId; + + public FlowContext(String user, String flowName, Long flowRunId) { + this.userId = user; + this.flowName = flowName; + this.flowRunId = flowRunId; + } + + protected String getUserId() { + return userId; + } + + protected String getFlowName() { + return flowName; + } + + protected Long getFlowRunId() { + return flowRunId; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java index aa2bfdae658..42a6aa88a8d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java @@ -48,7 +48,6 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; @@ -343,20 +342,9 @@ protected void validateParams() { @Override protected void augmentParams(Configuration hbaseConf, Connection conn) throws IOException { - TimelineReaderContext context = getContext(); if (isSingleEntityRead()) { // Get flow context information from AppToFlow table. - if (context.getFlowName() == null || context.getFlowRunId() == null - || context.getUserId() == null) { - AppToFlowRowKey appToFlowRowKey = - new AppToFlowRowKey(context.getClusterId(), context.getAppId()); - FlowContext flowContext = - lookupFlowContext(appToFlowRowKey, - hbaseConf, conn); - context.setFlowName(flowContext.getFlowName()); - context.setFlowRunId(flowContext.getFlowRunId()); - context.setUserId(flowContext.getUserId()); - } + defaultAugmentParams(hbaseConf, conn); } // Add configs/metrics to fields to retrieve if confsToRetrieve and/or // metricsToRetrieve are specified. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/EntityTypeReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/EntityTypeReader.java new file mode 100644 index 00000000000..ca3cfdd469a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/EntityTypeReader.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.reader; + +import com.google.common.base.Preconditions; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; +import org.apache.hadoop.hbase.filter.KeyOnlyFilter; +import org.apache.hadoop.hbase.filter.PageFilter; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Set; +import java.util.TreeSet; + +/** + * Timeline entity reader for listing all available entity types given one + * reader context. Right now only supports listing all entity types within one + * YARN application. + */ +public final class EntityTypeReader extends AbstractTimelineStorageReader { + + private static final Log LOG = LogFactory.getLog(EntityTypeReader.class); + private static final EntityTable ENTITY_TABLE = new EntityTable(); + + public EntityTypeReader(TimelineReaderContext context) { + super(context); + } + + /** + * Reads a set of timeline entity types from the HBase storage for the given + * context. + * + * @param hbaseConf HBase Configuration. + * @param conn HBase Connection. + * @return a set of TimelineEntity objects, with only type field + * set. + * @throws IOException if any exception is encountered while reading entities. + */ + public Set readEntityTypes(Configuration hbaseConf, + Connection conn) throws IOException { + + validateParams(); + augmentParams(hbaseConf, conn); + + Set types = new TreeSet<>(); + TimelineReaderContext context = getContext(); + EntityRowKeyPrefix prefix = new EntityRowKeyPrefix(context.getClusterId(), + context.getUserId(), context.getFlowName(), context.getFlowRunId(), + context.getAppId()); + byte[] currRowKey = prefix.getRowKeyPrefix(); + byte[] nextRowKey = prefix.getRowKeyPrefix(); + nextRowKey[nextRowKey.length - 1]++; + + FilterList typeFilterList = new FilterList(); + typeFilterList.addFilter(new FirstKeyOnlyFilter()); + typeFilterList.addFilter(new KeyOnlyFilter()); + typeFilterList.addFilter(new PageFilter(1)); + if (LOG.isDebugEnabled()) { + LOG.debug("FilterList created for scan is - " + typeFilterList); + } + + int counter = 0; + while (true) { + try (ResultScanner results + = getResult(hbaseConf, conn, typeFilterList, currRowKey, nextRowKey)) + { + TimelineEntity entity = parseEntityForType(results.next()); + if (entity == null) { + break; + } + ++counter; + if (!types.add(entity.getType())) { + LOG.warn("Failed to add type " + entity.getType() + + " to the result set because there is a duplicated copy. "); + } + String currType = entity.getType(); + if (LOG.isDebugEnabled()) { + LOG.debug("Current row key: " + Arrays.toString(currRowKey)); + LOG.debug("New entity type discovered: " + currType); + } + currRowKey = getNextRowKey(prefix.getRowKeyPrefix(), currType); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Scanned " + counter + "records for " + + types.size() + "types"); + } + return types; + } + + @Override + protected void validateParams() { + Preconditions.checkNotNull(getContext(), "context shouldn't be null"); + Preconditions.checkNotNull(getContext().getClusterId(), + "clusterId shouldn't be null"); + Preconditions.checkNotNull(getContext().getAppId(), + "appId shouldn't be null"); + } + + /** + * Gets the possibly next row key prefix given current prefix and type. + * + * @param currRowKeyPrefix The current prefix that contains user, cluster, + * flow, run, and application id. + * @param entityType Current entity type. + * @return A new prefix for the possibly immediately next row key. + */ + private static byte[] getNextRowKey(byte[] currRowKeyPrefix, + String entityType) { + if (currRowKeyPrefix == null || entityType == null) { + return null; + } + + byte[] entityTypeEncoded = Separator.QUALIFIERS.join( + Separator.encode(entityType, Separator.SPACE, Separator.TAB, + Separator.QUALIFIERS), + Separator.EMPTY_BYTES); + + byte[] currRowKey + = new byte[currRowKeyPrefix.length + entityTypeEncoded.length]; + System.arraycopy(currRowKeyPrefix, 0, currRowKey, 0, + currRowKeyPrefix.length); + System.arraycopy(entityTypeEncoded, 0, currRowKey, currRowKeyPrefix.length, + entityTypeEncoded.length); + + return HBaseTimelineStorageUtils. + calculateTheClosestNextRowKeyForPrefix(currRowKey); + } + + private ResultScanner getResult(Configuration hbaseConf, Connection conn, + FilterList filterList, byte[] startPrefix, byte[] endPrefix) + throws IOException { + Scan scan = new Scan(startPrefix, endPrefix); + scan.setFilter(filterList); + scan.setSmall(true); + return ENTITY_TABLE.getResultScanner(hbaseConf, conn, scan); + } + + private TimelineEntity parseEntityForType(Result result) + throws IOException { + if (result == null || result.isEmpty()) { + return null; + } + TimelineEntity entity = new TimelineEntity(); + EntityRowKey newRowKey = EntityRowKey.parseRowKey(result.getRow()); + entity.setType(newRowKey.getEntityType()); + return entity; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java index 1e78a180732..0b3f7df4000 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java @@ -41,9 +41,6 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; -import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn; -import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; @@ -56,7 +53,6 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; -import org.apache.hadoop.yarn.webapp.NotFoundException; import com.google.common.base.Preconditions; @@ -67,11 +63,6 @@ class GenericEntityReader extends TimelineEntityReader { private static final EntityTable ENTITY_TABLE = new EntityTable(); - /** - * Used to look up the flow context. - */ - private final AppToFlowTable appToFlowTable = new AppToFlowTable(); - /** * Used to convert strings key components to and from storage format. */ @@ -400,60 +391,6 @@ protected FilterList constructFilterListBasedOnFields() throws IOException { return listBasedOnFields; } - /** - * Looks up flow context from AppToFlow table. - * - * @param appToFlowRowKey to identify Cluster and App Ids. - * @param hbaseConf HBase configuration. - * @param conn HBase Connection. - * @return flow context information. - * @throws IOException if any problem occurs while fetching flow information. - */ - protected FlowContext lookupFlowContext(AppToFlowRowKey appToFlowRowKey, - Configuration hbaseConf, Connection conn) throws IOException { - byte[] rowKey = appToFlowRowKey.getRowKey(); - Get get = new Get(rowKey); - Result result = appToFlowTable.getResult(hbaseConf, conn, get); - if (result != null && !result.isEmpty()) { - return new FlowContext(AppToFlowColumn.USER_ID.readResult(result) - .toString(), AppToFlowColumn.FLOW_ID.readResult(result).toString(), - ((Number) AppToFlowColumn.FLOW_RUN_ID.readResult(result)) - .longValue()); - } else { - throw new NotFoundException( - "Unable to find the context flow ID and flow run ID for clusterId=" - + appToFlowRowKey.getClusterId() + ", appId=" - + appToFlowRowKey.getAppId()); - } - } - - /** - * Encapsulates flow context information. - */ - protected static class FlowContext { - private final String userId; - private final String flowName; - private final Long flowRunId; - - public FlowContext(String user, String flowName, Long flowRunId) { - this.userId = user; - this.flowName = flowName; - this.flowRunId = flowRunId; - } - - protected String getUserId() { - return userId; - } - - protected String getFlowName() { - return flowName; - } - - protected Long getFlowRunId() { - return flowRunId; - } - } - @Override protected void validateParams() { Preconditions.checkNotNull(getContext(), "context shouldn't be null"); @@ -474,19 +411,7 @@ protected void validateParams() { @Override protected void augmentParams(Configuration hbaseConf, Connection conn) throws IOException { - TimelineReaderContext context = getContext(); - // In reality all three should be null or neither should be null - if (context.getFlowName() == null || context.getFlowRunId() == null - || context.getUserId() == null) { - // Get flow context information from AppToFlow table. - AppToFlowRowKey appToFlowRowKey = - new AppToFlowRowKey(context.getClusterId(), context.getAppId()); - FlowContext flowContext = - lookupFlowContext(appToFlowRowKey, hbaseConf, conn); - context.setFlowName(flowContext.flowName); - context.setFlowRunId(flowContext.flowRunId); - context.setUserId(flowContext.userId); - } + defaultAugmentParams(hbaseConf, conn); // Add configs/metrics to fields to retrieve if confsToRetrieve and/or // metricsToRetrieve are specified. getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java index 424d14118b2..b521278ea8f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java @@ -60,12 +60,12 @@ * HBase storage. Different types can be defined for different types of the * entities that are being requested. */ -public abstract class TimelineEntityReader { +public abstract class TimelineEntityReader extends + AbstractTimelineStorageReader { private static final Logger LOG = LoggerFactory.getLogger(TimelineEntityReader.class); private final boolean singleEntityRead; - private TimelineReaderContext context; private TimelineDataToRetrieve dataToRetrieve; // used only for multiple entity read mode private TimelineEntityFilters filters; @@ -102,9 +102,9 @@ public abstract class TimelineEntityReader { protected TimelineEntityReader(TimelineReaderContext ctxt, TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve, boolean sortedKeys) { + super(ctxt); this.singleEntityRead = false; this.sortedKeys = sortedKeys; - this.context = ctxt; this.dataToRetrieve = toRetrieve; this.filters = entityFilters; @@ -120,8 +120,8 @@ protected TimelineEntityReader(TimelineReaderContext ctxt, */ protected TimelineEntityReader(TimelineReaderContext ctxt, TimelineDataToRetrieve toRetrieve) { + super(ctxt); this.singleEntityRead = true; - this.context = ctxt; this.dataToRetrieve = toRetrieve; this.setTable(getTable()); @@ -185,10 +185,6 @@ private FilterList createFilterList() throws IOException { return null; } - protected TimelineReaderContext getContext() { - return context; - } - protected TimelineDataToRetrieve getDataToRetrieve() { return dataToRetrieve; } @@ -229,7 +225,7 @@ public TimelineEntity readEntity(Configuration hbaseConf, Connection conn) if (result == null || result.isEmpty()) { // Could not find a matching row. LOG.info("Cannot find matching entity of type " + - context.getEntityType()); + getContext().getEntityType()); return null; } return parseEntity(result); @@ -288,21 +284,6 @@ protected BaseTable getTable() { return table; } - /** - * Validates the required parameters to read the entities. - */ - protected abstract void validateParams(); - - /** - * Sets certain parameters to defaults if the values are not provided. - * - * @param hbaseConf HBase Configuration. - * @param conn HBase Connection. - * @throws IOException if any exception is encountered while setting params. - */ - protected abstract void augmentParams(Configuration hbaseConf, - Connection conn) throws IOException; - /** * Fetches a {@link Result} instance for a single-entity read. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java index b2a947648a2..e90338e085c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java @@ -86,4 +86,17 @@ public static TimelineEntityReader createMultipleEntitiesReader( return new GenericEntityReader(context, filters, dataToRetrieve, false); } } + + /** + * Creates a timeline entity type reader that will read all available entity + * types within the specified context. + * + * @param context Reader context which defines the scope in which query has to + * be made. Limited to application level only. + * @return an EntityTypeReader object + */ + public static EntityTypeReader createEntityTypeReader( + TimelineReaderContext context) { + return new EntityTypeReader(context); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java index 4cff3bc2e4c..6e8b823bdd9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java @@ -176,4 +176,24 @@ public TimelineEntity getEntity(TimelineReaderContext context, } return entity; } + + /** + * Gets a list of available timeline entity types for an application. This can + * be done by making a call to the backend storage implementation. The meaning + * of each argument in detail is the same as {@link TimelineReader#getEntity}. + * If cluster ID has not been supplied by the client, fills the cluster id + * from config before making a call to backend storage. + * + * @param context Timeline context within the scope of which entity types + * have to be fetched. Entity type field of this context should + * be null. + * @return A set which contains available timeline entity types, represented + * as strings if found, empty otherwise. + * @throws IOException if any problem occurs while getting entity types. + */ + public Set getEntityTypes(TimelineReaderContext context) + throws IOException{ + context.setClusterId(getClusterID(context.getClusterId(), getConfig())); + return reader.getEntityTypes(new TimelineReaderContext(context)); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java index b3e3cdc5fee..af8b12b8ad9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java @@ -2860,4 +2860,106 @@ public TimelineEntity getContainer(@Context HttpServletRequest req, flowName, flowRunId, confsToRetrieve, metricsToRetrieve, fields, metricsLimit); } + + /** + * Returns a set of available entity types for a given app id. Cluster ID is + * not provided by client so default cluster ID has to be taken. If userid, + * flow name and flow run id which are optional query parameters are not + * specified, they will be queried based on app id and cluster id from the + * flow context information stored in underlying storage implementation. + * + * @param req Servlet request. + * @param res Servlet response. + * @param appId Application id to be queried(Mandatory path param). + * @param flowName Flow name which should match for the app(Optional query + * param). + * @param flowRunId Run id which should match for the app(Optional query + * param). + * @param userId User id which should match for the app(Optional query param). + * + * @return If successful, a HTTP 200(OK) response having a JSON representing a + * list contains all timeline entity types is returned.
+ * On failures,
+ * If any problem occurs in parsing request, HTTP 400(Bad Request) is + * returned.
+ * If flow context information cannot be retrieved or app for the given + * app id cannot be found, HTTP 404(Not Found) is returned.
+ * For all other errors while retrieving data, HTTP 500(Internal Server + * Error) is returned. + */ + @GET + @Path("/apps/{appid}/entity-types") + @Produces(MediaType.APPLICATION_JSON) + public Set getEntityTypes( + @Context HttpServletRequest req, + @Context HttpServletResponse res, + @PathParam("appid") String appId, + @QueryParam("flowname") String flowName, + @QueryParam("flowrunid") String flowRunId, + @QueryParam("userid") String userId) { + return getEntityTypes(req, res, null, appId, flowName, flowRunId, userId); + } + + /** + * Returns a set of available entity types for a given app id. If userid, + * flow name and flow run id which are optional query parameters are not + * specified, they will be queried based on app id and cluster id from the + * flow context information stored in underlying storage implementation. + * + * @param req Servlet request. + * @param res Servlet response. + * @param clusterId Cluster id to which the app to be queried belong to( + * Mandatory path param). + * @param appId Application id to be queried(Mandatory path param). + * @param flowName Flow name which should match for the app(Optional query + * param). + * @param flowRunId Run id which should match for the app(Optional query + * param). + * @param userId User id which should match for the app(Optional query param). + * + * @return If successful, a HTTP 200(OK) response having a JSON representing a + * list contains all timeline entity types is returned.
+ * On failures,
+ * If any problem occurs in parsing request, HTTP 400(Bad Request) is + * returned.
+ * If flow context information cannot be retrieved or app for the given + * app id cannot be found, HTTP 404(Not Found) is returned.
+ * For all other errors while retrieving data, HTTP 500(Internal Server + * Error) is returned. + */ + @GET + @Path("/clusters/{clusterid}/apps/{appid}/entity-types") + @Produces(MediaType.APPLICATION_JSON) + public Set getEntityTypes( + @Context HttpServletRequest req, + @Context HttpServletResponse res, + @PathParam("clusterid") String clusterId, + @PathParam("appid") String appId, + @QueryParam("flowname") String flowName, + @QueryParam("flowrunid") String flowRunId, + @QueryParam("userid") String userId) { + String url = req.getRequestURI() + + (req.getQueryString() == null ? "" : + QUERY_STRING_SEP + req.getQueryString()); + UserGroupInformation callerUGI = + TimelineReaderWebServicesUtils.getUser(req); + LOG.info("Received URL " + url + " from user " + + TimelineReaderWebServicesUtils.getUserName(callerUGI)); + long startTime = Time.monotonicNow(); + init(res); + TimelineReaderManager timelineReaderManager = getTimelineReaderManager(); + Set results = null; + try { + results = timelineReaderManager.getEntityTypes( + TimelineReaderWebServicesUtils.createTimelineReaderContext( + clusterId, userId, flowName, flowRunId, appId, + null, null)); + } catch (Exception e) { + handleException(e, url, startTime, "flowrunid"); + } + long endTime = Time.monotonicNow(); + LOG.info("Processed URL " + url + + " (Took " + (endTime - startTime) + " ms.)"); + return results; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java index b4e792b0ea5..adb8821c3f0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java @@ -32,6 +32,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; +import java.util.TreeSet; import com.fasterxml.jackson.core.JsonGenerationException; import com.fasterxml.jackson.databind.JsonMappingException; @@ -408,4 +409,24 @@ public Set getEntities(TimelineReaderContext context, context.getEntityType()); return getEntities(dir, context.getEntityType(), filters, dataToRetrieve); } + + @Override public Set getEntityTypes(TimelineReaderContext context) + throws IOException { + Set result = new TreeSet<>(); + String flowRunPath = getFlowRunPath(context.getUserId(), + context.getClusterId(), context.getFlowName(), context.getFlowRunId(), + context.getAppId()); + File dir = new File(new File(rootPath, ENTITIES_DIR), + context.getClusterId() + File.separator + flowRunPath + + File.separator + context.getAppId()); + File[] fileList = dir.listFiles(); + if (fileList != null) { + for (File f : fileList) { + if (f.isDirectory()) { + result.add(f.getName()); + } + } + } + return result; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java index e8eabf161a9..d7c1552d0aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java @@ -177,4 +177,17 @@ Set getEntities( TimelineReaderContext context, TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve) throws IOException; + + /** + * The API to list all available entity types of the given context. + * + * @param context A context defines the scope of this query. The incoming + * context should contain at least the cluster id and application id. + * + * @return A set of entity types available in the given context. + * + * @throws IOException if an exception occurred while listing from backend + * storage. + */ + Set getEntityTypes(TimelineReaderContext context) throws IOException; } \ No newline at end of file