YARN-5739. Provide timeline reader API to list available timeline entity types for one application. Contributed by Li Lu.
This commit is contained in:
parent
c3d6c7494f
commit
27b8a3fc75
@ -31,6 +31,7 @@
|
|||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
|
||||||
|
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
|
||||||
|
|
||||||
final class DataGeneratorForTest {
|
final class DataGeneratorForTest {
|
||||||
static void loadApps(HBaseTestingUtility util) throws IOException {
|
static void loadApps(HBaseTestingUtility util) throws IOException {
|
||||||
@ -358,6 +359,46 @@ static void loadEntities(HBaseTestingUtility util) throws IOException {
|
|||||||
relatesTo3.put("container2", relatesToSet14);
|
relatesTo3.put("container2", relatesToSet14);
|
||||||
entity2.setRelatesToEntities(relatesTo3);
|
entity2.setRelatesToEntities(relatesTo3);
|
||||||
te.addEntity(entity2);
|
te.addEntity(entity2);
|
||||||
|
|
||||||
|
// For listing types
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
TimelineEntity entity3 = new TimelineEntity();
|
||||||
|
String id3 = "typeTest" + i;
|
||||||
|
entity3.setId(id3);
|
||||||
|
StringBuilder typeName = new StringBuilder("newType");
|
||||||
|
for (int j = 0; j < (i % 3); j++) {
|
||||||
|
typeName.append(" ").append(j);
|
||||||
|
}
|
||||||
|
entity3.setType(typeName.toString());
|
||||||
|
entity3.setCreatedTime(cTime + 80L + i);
|
||||||
|
te.addEntity(entity3);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create app entity for app to flow table
|
||||||
|
TimelineEntities appTe1 = new TimelineEntities();
|
||||||
|
TimelineEntity entityApp1 = new TimelineEntity();
|
||||||
|
String appName1 = "application_1231111111_1111";
|
||||||
|
entityApp1.setId(appName1);
|
||||||
|
entityApp1.setType(TimelineEntityType.YARN_APPLICATION.toString());
|
||||||
|
entityApp1.setCreatedTime(cTime + 40L);
|
||||||
|
TimelineEvent appCreationEvent1 = new TimelineEvent();
|
||||||
|
appCreationEvent1.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
|
||||||
|
appCreationEvent1.setTimestamp(cTime);
|
||||||
|
entityApp1.addEvent(appCreationEvent1);
|
||||||
|
appTe1.addEntity(entityApp1);
|
||||||
|
|
||||||
|
TimelineEntities appTe2 = new TimelineEntities();
|
||||||
|
TimelineEntity entityApp2 = new TimelineEntity();
|
||||||
|
String appName2 = "application_1231111111_1112";
|
||||||
|
entityApp2.setId(appName2);
|
||||||
|
entityApp2.setType(TimelineEntityType.YARN_APPLICATION.toString());
|
||||||
|
entityApp2.setCreatedTime(cTime + 50L);
|
||||||
|
TimelineEvent appCreationEvent2 = new TimelineEvent();
|
||||||
|
appCreationEvent2.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
|
||||||
|
appCreationEvent2.setTimestamp(cTime);
|
||||||
|
entityApp2.addEvent(appCreationEvent2);
|
||||||
|
appTe2.addEntity(entityApp2);
|
||||||
|
|
||||||
HBaseTimelineWriterImpl hbi = null;
|
HBaseTimelineWriterImpl hbi = null;
|
||||||
try {
|
try {
|
||||||
hbi = new HBaseTimelineWriterImpl();
|
hbi = new HBaseTimelineWriterImpl();
|
||||||
@ -368,8 +409,10 @@ static void loadEntities(HBaseTestingUtility util) throws IOException {
|
|||||||
String flow = "some_flow_name";
|
String flow = "some_flow_name";
|
||||||
String flowVersion = "AB7822C10F1111";
|
String flowVersion = "AB7822C10F1111";
|
||||||
long runid = 1002345678919L;
|
long runid = 1002345678919L;
|
||||||
String appName = "application_1231111111_1111";
|
hbi.write(cluster, user, flow, flowVersion, runid, appName1, te);
|
||||||
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
|
hbi.write(cluster, user, flow, flowVersion, runid, appName2, te);
|
||||||
|
hbi.write(cluster, user, flow, flowVersion, runid, appName1, appTe1);
|
||||||
|
hbi.write(cluster, user, flow, flowVersion, runid, appName2, appTe2);
|
||||||
hbi.stop();
|
hbi.stop();
|
||||||
} finally {
|
} finally {
|
||||||
if (hbi != null) {
|
if (hbi != null) {
|
||||||
|
@ -1672,6 +1672,29 @@ public void testReadEntitiesInfoFilters() throws Exception {
|
|||||||
assertEquals(3, entities.size());
|
assertEquals(3, entities.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 90000)
|
||||||
|
public void testListTypesInApp() throws Exception {
|
||||||
|
Set<String> types = reader.getEntityTypes(
|
||||||
|
new TimelineReaderContext("cluster1", "user1", "some_flow_name",
|
||||||
|
1002345678919L, "application_1231111111_1111", null, null));
|
||||||
|
assertEquals(4, types.size());
|
||||||
|
|
||||||
|
types = reader.getEntityTypes(
|
||||||
|
new TimelineReaderContext("cluster1", null, null,
|
||||||
|
null, "application_1231111111_1111", null, null));
|
||||||
|
assertEquals(4, types.size());
|
||||||
|
|
||||||
|
types = reader.getEntityTypes(
|
||||||
|
new TimelineReaderContext("cluster1", null, null,
|
||||||
|
null, "application_1231111111_1112", null, null));
|
||||||
|
assertEquals(4, types.size());
|
||||||
|
|
||||||
|
types = reader.getEntityTypes(
|
||||||
|
new TimelineReaderContext("cluster1", "user1", "some_flow_name",
|
||||||
|
1002345678919L, "application_1231111111_1113", null, null));
|
||||||
|
assertEquals(0, types.size());
|
||||||
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void tearDownAfterClass() throws Exception {
|
public static void tearDownAfterClass() throws Exception {
|
||||||
util.shutdownMiniCluster();
|
util.shutdownMiniCluster();
|
||||||
|
@ -31,6 +31,7 @@
|
|||||||
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
|
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.reader.EntityTypeReader;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReader;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReader;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReaderFactory;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReaderFactory;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@ -86,4 +87,11 @@ public Set<TimelineEntity> getEntities(TimelineReaderContext context,
|
|||||||
filters, dataToRetrieve);
|
filters, dataToRetrieve);
|
||||||
return reader.readEntities(hbaseConf, conn);
|
return reader.readEntities(hbaseConf, conn);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<String> getEntityTypes(TimelineReaderContext context)
|
||||||
|
throws IOException {
|
||||||
|
EntityTypeReader reader = new EntityTypeReader(context);
|
||||||
|
return reader.readEntityTypes(hbaseConf, conn);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -23,6 +23,7 @@
|
|||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.CellUtil;
|
import org.apache.hadoop.hbase.CellUtil;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.Tag;
|
import org.apache.hadoop.hbase.Tag;
|
||||||
@ -38,6 +39,7 @@
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.text.NumberFormat;
|
import java.text.NumberFormat;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
@ -313,4 +315,38 @@ public static Configuration getTimelineServiceHBaseConf(Configuration conf)
|
|||||||
}
|
}
|
||||||
return hbaseConf;
|
return hbaseConf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Given a row key prefix stored in a byte array, return a byte array for its
|
||||||
|
* immediate next row key.
|
||||||
|
*
|
||||||
|
* @param rowKeyPrefix The provided row key prefix, represented in an array.
|
||||||
|
* @return the closest next row key of the provided row key.
|
||||||
|
*/
|
||||||
|
public static byte[] calculateTheClosestNextRowKeyForPrefix(
|
||||||
|
byte[] rowKeyPrefix) {
|
||||||
|
// Essentially we are treating it like an 'unsigned very very long' and
|
||||||
|
// doing +1 manually.
|
||||||
|
// Search for the place where the trailing 0xFFs start
|
||||||
|
int offset = rowKeyPrefix.length;
|
||||||
|
while (offset > 0) {
|
||||||
|
if (rowKeyPrefix[offset - 1] != (byte) 0xFF) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
offset--;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (offset == 0) {
|
||||||
|
// We got an 0xFFFF... (only FFs) stopRow value which is
|
||||||
|
// the last possible prefix before the end of the table.
|
||||||
|
// So set it to stop at the 'end of the table'
|
||||||
|
return HConstants.EMPTY_END_ROW;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Copy the right length of the original
|
||||||
|
byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset);
|
||||||
|
// And increment the last one
|
||||||
|
newStopRow[newStopRow.length - 1]++;
|
||||||
|
return newStopRow;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,145 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
|
||||||
|
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The base class for reading timeline data from the HBase storage. This class
|
||||||
|
* provides basic support to validate and augment reader context.
|
||||||
|
*/
|
||||||
|
public abstract class AbstractTimelineStorageReader {
|
||||||
|
|
||||||
|
private final TimelineReaderContext context;
|
||||||
|
/**
|
||||||
|
* Used to look up the flow context.
|
||||||
|
*/
|
||||||
|
private final AppToFlowTable appToFlowTable = new AppToFlowTable();
|
||||||
|
|
||||||
|
public AbstractTimelineStorageReader(TimelineReaderContext ctxt) {
|
||||||
|
context = ctxt;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected TimelineReaderContext getContext() {
|
||||||
|
return context;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Looks up flow context from AppToFlow table.
|
||||||
|
*
|
||||||
|
* @param appToFlowRowKey to identify Cluster and App Ids.
|
||||||
|
* @param hbaseConf HBase configuration.
|
||||||
|
* @param conn HBase Connection.
|
||||||
|
* @return flow context information.
|
||||||
|
* @throws IOException if any problem occurs while fetching flow information.
|
||||||
|
*/
|
||||||
|
protected FlowContext lookupFlowContext(AppToFlowRowKey appToFlowRowKey,
|
||||||
|
Configuration hbaseConf, Connection conn) throws IOException {
|
||||||
|
byte[] rowKey = appToFlowRowKey.getRowKey();
|
||||||
|
Get get = new Get(rowKey);
|
||||||
|
Result result = appToFlowTable.getResult(hbaseConf, conn, get);
|
||||||
|
if (result != null && !result.isEmpty()) {
|
||||||
|
return new FlowContext(AppToFlowColumn.USER_ID.readResult(result)
|
||||||
|
.toString(), AppToFlowColumn.FLOW_ID.readResult(result).toString(),
|
||||||
|
((Number) AppToFlowColumn.FLOW_RUN_ID.readResult(result))
|
||||||
|
.longValue());
|
||||||
|
} else {
|
||||||
|
throw new NotFoundException(
|
||||||
|
"Unable to find the context flow ID and flow run ID for clusterId="
|
||||||
|
+ appToFlowRowKey.getClusterId() + ", appId="
|
||||||
|
+ appToFlowRowKey.getAppId());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets certain parameters to defaults if the values are not provided.
|
||||||
|
*
|
||||||
|
* @param hbaseConf HBase Configuration.
|
||||||
|
* @param conn HBase Connection.
|
||||||
|
* @throws IOException if any exception is encountered while setting params.
|
||||||
|
*/
|
||||||
|
protected void augmentParams(Configuration hbaseConf, Connection conn)
|
||||||
|
throws IOException {
|
||||||
|
defaultAugmentParams(hbaseConf, conn);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Default behavior for all timeline readers to augment parameters.
|
||||||
|
*
|
||||||
|
* @param hbaseConf HBase Configuration.
|
||||||
|
* @param conn HBase Connection.
|
||||||
|
* @throws IOException if any exception is encountered while setting params.
|
||||||
|
*/
|
||||||
|
final protected void defaultAugmentParams(Configuration hbaseConf,
|
||||||
|
Connection conn) throws IOException {
|
||||||
|
// In reality all three should be null or neither should be null
|
||||||
|
if (context.getFlowName() == null || context.getFlowRunId() == null
|
||||||
|
|| context.getUserId() == null) {
|
||||||
|
// Get flow context information from AppToFlow table.
|
||||||
|
AppToFlowRowKey appToFlowRowKey =
|
||||||
|
new AppToFlowRowKey(context.getClusterId(), context.getAppId());
|
||||||
|
FlowContext flowContext =
|
||||||
|
lookupFlowContext(appToFlowRowKey, hbaseConf, conn);
|
||||||
|
context.setFlowName(flowContext.flowName);
|
||||||
|
context.setFlowRunId(flowContext.flowRunId);
|
||||||
|
context.setUserId(flowContext.userId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Validates the required parameters to read the entities.
|
||||||
|
*/
|
||||||
|
protected abstract void validateParams();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Encapsulates flow context information.
|
||||||
|
*/
|
||||||
|
protected static class FlowContext {
|
||||||
|
private final String userId;
|
||||||
|
private final String flowName;
|
||||||
|
private final Long flowRunId;
|
||||||
|
|
||||||
|
public FlowContext(String user, String flowName, Long flowRunId) {
|
||||||
|
this.userId = user;
|
||||||
|
this.flowName = flowName;
|
||||||
|
this.flowRunId = flowRunId;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String getUserId() {
|
||||||
|
return userId;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String getFlowName() {
|
||||||
|
return flowName;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Long getFlowRunId() {
|
||||||
|
return flowRunId;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -48,7 +48,6 @@
|
|||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
|
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
|
||||||
@ -343,20 +342,9 @@ protected void validateParams() {
|
|||||||
@Override
|
@Override
|
||||||
protected void augmentParams(Configuration hbaseConf, Connection conn)
|
protected void augmentParams(Configuration hbaseConf, Connection conn)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
TimelineReaderContext context = getContext();
|
|
||||||
if (isSingleEntityRead()) {
|
if (isSingleEntityRead()) {
|
||||||
// Get flow context information from AppToFlow table.
|
// Get flow context information from AppToFlow table.
|
||||||
if (context.getFlowName() == null || context.getFlowRunId() == null
|
defaultAugmentParams(hbaseConf, conn);
|
||||||
|| context.getUserId() == null) {
|
|
||||||
AppToFlowRowKey appToFlowRowKey =
|
|
||||||
new AppToFlowRowKey(context.getClusterId(), context.getAppId());
|
|
||||||
FlowContext flowContext =
|
|
||||||
lookupFlowContext(appToFlowRowKey,
|
|
||||||
hbaseConf, conn);
|
|
||||||
context.setFlowName(flowContext.getFlowName());
|
|
||||||
context.setFlowRunId(flowContext.getFlowRunId());
|
|
||||||
context.setUserId(flowContext.getUserId());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// Add configs/metrics to fields to retrieve if confsToRetrieve and/or
|
// Add configs/metrics to fields to retrieve if confsToRetrieve and/or
|
||||||
// metricsToRetrieve are specified.
|
// metricsToRetrieve are specified.
|
||||||
|
@ -0,0 +1,181 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.filter.FilterList;
|
||||||
|
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
|
||||||
|
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
|
||||||
|
import org.apache.hadoop.hbase.filter.PageFilter;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.TreeSet;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Timeline entity reader for listing all available entity types given one
|
||||||
|
* reader context. Right now only supports listing all entity types within one
|
||||||
|
* YARN application.
|
||||||
|
*/
|
||||||
|
public final class EntityTypeReader extends AbstractTimelineStorageReader {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(EntityTypeReader.class);
|
||||||
|
private static final EntityTable ENTITY_TABLE = new EntityTable();
|
||||||
|
|
||||||
|
public EntityTypeReader(TimelineReaderContext context) {
|
||||||
|
super(context);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reads a set of timeline entity types from the HBase storage for the given
|
||||||
|
* context.
|
||||||
|
*
|
||||||
|
* @param hbaseConf HBase Configuration.
|
||||||
|
* @param conn HBase Connection.
|
||||||
|
* @return a set of <cite>TimelineEntity</cite> objects, with only type field
|
||||||
|
* set.
|
||||||
|
* @throws IOException if any exception is encountered while reading entities.
|
||||||
|
*/
|
||||||
|
public Set<String> readEntityTypes(Configuration hbaseConf,
|
||||||
|
Connection conn) throws IOException {
|
||||||
|
|
||||||
|
validateParams();
|
||||||
|
augmentParams(hbaseConf, conn);
|
||||||
|
|
||||||
|
Set<String> types = new TreeSet<>();
|
||||||
|
TimelineReaderContext context = getContext();
|
||||||
|
EntityRowKeyPrefix prefix = new EntityRowKeyPrefix(context.getClusterId(),
|
||||||
|
context.getUserId(), context.getFlowName(), context.getFlowRunId(),
|
||||||
|
context.getAppId());
|
||||||
|
byte[] currRowKey = prefix.getRowKeyPrefix();
|
||||||
|
byte[] nextRowKey = prefix.getRowKeyPrefix();
|
||||||
|
nextRowKey[nextRowKey.length - 1]++;
|
||||||
|
|
||||||
|
FilterList typeFilterList = new FilterList();
|
||||||
|
typeFilterList.addFilter(new FirstKeyOnlyFilter());
|
||||||
|
typeFilterList.addFilter(new KeyOnlyFilter());
|
||||||
|
typeFilterList.addFilter(new PageFilter(1));
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("FilterList created for scan is - " + typeFilterList);
|
||||||
|
}
|
||||||
|
|
||||||
|
int counter = 0;
|
||||||
|
while (true) {
|
||||||
|
try (ResultScanner results
|
||||||
|
= getResult(hbaseConf, conn, typeFilterList, currRowKey, nextRowKey))
|
||||||
|
{
|
||||||
|
TimelineEntity entity = parseEntityForType(results.next());
|
||||||
|
if (entity == null) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
++counter;
|
||||||
|
if (!types.add(entity.getType())) {
|
||||||
|
LOG.warn("Failed to add type " + entity.getType()
|
||||||
|
+ " to the result set because there is a duplicated copy. ");
|
||||||
|
}
|
||||||
|
String currType = entity.getType();
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Current row key: " + Arrays.toString(currRowKey));
|
||||||
|
LOG.debug("New entity type discovered: " + currType);
|
||||||
|
}
|
||||||
|
currRowKey = getNextRowKey(prefix.getRowKeyPrefix(), currType);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Scanned " + counter + "records for "
|
||||||
|
+ types.size() + "types");
|
||||||
|
}
|
||||||
|
return types;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void validateParams() {
|
||||||
|
Preconditions.checkNotNull(getContext(), "context shouldn't be null");
|
||||||
|
Preconditions.checkNotNull(getContext().getClusterId(),
|
||||||
|
"clusterId shouldn't be null");
|
||||||
|
Preconditions.checkNotNull(getContext().getAppId(),
|
||||||
|
"appId shouldn't be null");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the possibly next row key prefix given current prefix and type.
|
||||||
|
*
|
||||||
|
* @param currRowKeyPrefix The current prefix that contains user, cluster,
|
||||||
|
* flow, run, and application id.
|
||||||
|
* @param entityType Current entity type.
|
||||||
|
* @return A new prefix for the possibly immediately next row key.
|
||||||
|
*/
|
||||||
|
private static byte[] getNextRowKey(byte[] currRowKeyPrefix,
|
||||||
|
String entityType) {
|
||||||
|
if (currRowKeyPrefix == null || entityType == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
byte[] entityTypeEncoded = Separator.QUALIFIERS.join(
|
||||||
|
Separator.encode(entityType, Separator.SPACE, Separator.TAB,
|
||||||
|
Separator.QUALIFIERS),
|
||||||
|
Separator.EMPTY_BYTES);
|
||||||
|
|
||||||
|
byte[] currRowKey
|
||||||
|
= new byte[currRowKeyPrefix.length + entityTypeEncoded.length];
|
||||||
|
System.arraycopy(currRowKeyPrefix, 0, currRowKey, 0,
|
||||||
|
currRowKeyPrefix.length);
|
||||||
|
System.arraycopy(entityTypeEncoded, 0, currRowKey, currRowKeyPrefix.length,
|
||||||
|
entityTypeEncoded.length);
|
||||||
|
|
||||||
|
return HBaseTimelineStorageUtils.
|
||||||
|
calculateTheClosestNextRowKeyForPrefix(currRowKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
private ResultScanner getResult(Configuration hbaseConf, Connection conn,
|
||||||
|
FilterList filterList, byte[] startPrefix, byte[] endPrefix)
|
||||||
|
throws IOException {
|
||||||
|
Scan scan = new Scan(startPrefix, endPrefix);
|
||||||
|
scan.setFilter(filterList);
|
||||||
|
scan.setSmall(true);
|
||||||
|
return ENTITY_TABLE.getResultScanner(hbaseConf, conn, scan);
|
||||||
|
}
|
||||||
|
|
||||||
|
private TimelineEntity parseEntityForType(Result result)
|
||||||
|
throws IOException {
|
||||||
|
if (result == null || result.isEmpty()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
TimelineEntity entity = new TimelineEntity();
|
||||||
|
EntityRowKey newRowKey = EntityRowKey.parseRowKey(result.getRow());
|
||||||
|
entity.setType(newRowKey.getEntityType());
|
||||||
|
return entity;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -41,9 +41,6 @@
|
|||||||
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
|
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
|
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
|
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
|
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
|
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
|
||||||
@ -56,7 +53,6 @@
|
|||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
|
||||||
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
@ -67,11 +63,6 @@
|
|||||||
class GenericEntityReader extends TimelineEntityReader {
|
class GenericEntityReader extends TimelineEntityReader {
|
||||||
private static final EntityTable ENTITY_TABLE = new EntityTable();
|
private static final EntityTable ENTITY_TABLE = new EntityTable();
|
||||||
|
|
||||||
/**
|
|
||||||
* Used to look up the flow context.
|
|
||||||
*/
|
|
||||||
private final AppToFlowTable appToFlowTable = new AppToFlowTable();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used to convert strings key components to and from storage format.
|
* Used to convert strings key components to and from storage format.
|
||||||
*/
|
*/
|
||||||
@ -400,60 +391,6 @@ protected FilterList constructFilterListBasedOnFields() throws IOException {
|
|||||||
return listBasedOnFields;
|
return listBasedOnFields;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Looks up flow context from AppToFlow table.
|
|
||||||
*
|
|
||||||
* @param appToFlowRowKey to identify Cluster and App Ids.
|
|
||||||
* @param hbaseConf HBase configuration.
|
|
||||||
* @param conn HBase Connection.
|
|
||||||
* @return flow context information.
|
|
||||||
* @throws IOException if any problem occurs while fetching flow information.
|
|
||||||
*/
|
|
||||||
protected FlowContext lookupFlowContext(AppToFlowRowKey appToFlowRowKey,
|
|
||||||
Configuration hbaseConf, Connection conn) throws IOException {
|
|
||||||
byte[] rowKey = appToFlowRowKey.getRowKey();
|
|
||||||
Get get = new Get(rowKey);
|
|
||||||
Result result = appToFlowTable.getResult(hbaseConf, conn, get);
|
|
||||||
if (result != null && !result.isEmpty()) {
|
|
||||||
return new FlowContext(AppToFlowColumn.USER_ID.readResult(result)
|
|
||||||
.toString(), AppToFlowColumn.FLOW_ID.readResult(result).toString(),
|
|
||||||
((Number) AppToFlowColumn.FLOW_RUN_ID.readResult(result))
|
|
||||||
.longValue());
|
|
||||||
} else {
|
|
||||||
throw new NotFoundException(
|
|
||||||
"Unable to find the context flow ID and flow run ID for clusterId="
|
|
||||||
+ appToFlowRowKey.getClusterId() + ", appId="
|
|
||||||
+ appToFlowRowKey.getAppId());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Encapsulates flow context information.
|
|
||||||
*/
|
|
||||||
protected static class FlowContext {
|
|
||||||
private final String userId;
|
|
||||||
private final String flowName;
|
|
||||||
private final Long flowRunId;
|
|
||||||
|
|
||||||
public FlowContext(String user, String flowName, Long flowRunId) {
|
|
||||||
this.userId = user;
|
|
||||||
this.flowName = flowName;
|
|
||||||
this.flowRunId = flowRunId;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected String getUserId() {
|
|
||||||
return userId;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected String getFlowName() {
|
|
||||||
return flowName;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected Long getFlowRunId() {
|
|
||||||
return flowRunId;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void validateParams() {
|
protected void validateParams() {
|
||||||
Preconditions.checkNotNull(getContext(), "context shouldn't be null");
|
Preconditions.checkNotNull(getContext(), "context shouldn't be null");
|
||||||
@ -474,19 +411,7 @@ protected void validateParams() {
|
|||||||
@Override
|
@Override
|
||||||
protected void augmentParams(Configuration hbaseConf, Connection conn)
|
protected void augmentParams(Configuration hbaseConf, Connection conn)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
TimelineReaderContext context = getContext();
|
defaultAugmentParams(hbaseConf, conn);
|
||||||
// In reality all three should be null or neither should be null
|
|
||||||
if (context.getFlowName() == null || context.getFlowRunId() == null
|
|
||||||
|| context.getUserId() == null) {
|
|
||||||
// Get flow context information from AppToFlow table.
|
|
||||||
AppToFlowRowKey appToFlowRowKey =
|
|
||||||
new AppToFlowRowKey(context.getClusterId(), context.getAppId());
|
|
||||||
FlowContext flowContext =
|
|
||||||
lookupFlowContext(appToFlowRowKey, hbaseConf, conn);
|
|
||||||
context.setFlowName(flowContext.flowName);
|
|
||||||
context.setFlowRunId(flowContext.flowRunId);
|
|
||||||
context.setUserId(flowContext.userId);
|
|
||||||
}
|
|
||||||
// Add configs/metrics to fields to retrieve if confsToRetrieve and/or
|
// Add configs/metrics to fields to retrieve if confsToRetrieve and/or
|
||||||
// metricsToRetrieve are specified.
|
// metricsToRetrieve are specified.
|
||||||
getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve();
|
getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve();
|
||||||
|
@ -60,12 +60,12 @@
|
|||||||
* HBase storage. Different types can be defined for different types of the
|
* HBase storage. Different types can be defined for different types of the
|
||||||
* entities that are being requested.
|
* entities that are being requested.
|
||||||
*/
|
*/
|
||||||
public abstract class TimelineEntityReader {
|
public abstract class TimelineEntityReader extends
|
||||||
|
AbstractTimelineStorageReader {
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
LoggerFactory.getLogger(TimelineEntityReader.class);
|
LoggerFactory.getLogger(TimelineEntityReader.class);
|
||||||
|
|
||||||
private final boolean singleEntityRead;
|
private final boolean singleEntityRead;
|
||||||
private TimelineReaderContext context;
|
|
||||||
private TimelineDataToRetrieve dataToRetrieve;
|
private TimelineDataToRetrieve dataToRetrieve;
|
||||||
// used only for multiple entity read mode
|
// used only for multiple entity read mode
|
||||||
private TimelineEntityFilters filters;
|
private TimelineEntityFilters filters;
|
||||||
@ -102,9 +102,9 @@ public abstract class TimelineEntityReader {
|
|||||||
protected TimelineEntityReader(TimelineReaderContext ctxt,
|
protected TimelineEntityReader(TimelineReaderContext ctxt,
|
||||||
TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve,
|
TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve,
|
||||||
boolean sortedKeys) {
|
boolean sortedKeys) {
|
||||||
|
super(ctxt);
|
||||||
this.singleEntityRead = false;
|
this.singleEntityRead = false;
|
||||||
this.sortedKeys = sortedKeys;
|
this.sortedKeys = sortedKeys;
|
||||||
this.context = ctxt;
|
|
||||||
this.dataToRetrieve = toRetrieve;
|
this.dataToRetrieve = toRetrieve;
|
||||||
this.filters = entityFilters;
|
this.filters = entityFilters;
|
||||||
|
|
||||||
@ -120,8 +120,8 @@ protected TimelineEntityReader(TimelineReaderContext ctxt,
|
|||||||
*/
|
*/
|
||||||
protected TimelineEntityReader(TimelineReaderContext ctxt,
|
protected TimelineEntityReader(TimelineReaderContext ctxt,
|
||||||
TimelineDataToRetrieve toRetrieve) {
|
TimelineDataToRetrieve toRetrieve) {
|
||||||
|
super(ctxt);
|
||||||
this.singleEntityRead = true;
|
this.singleEntityRead = true;
|
||||||
this.context = ctxt;
|
|
||||||
this.dataToRetrieve = toRetrieve;
|
this.dataToRetrieve = toRetrieve;
|
||||||
|
|
||||||
this.setTable(getTable());
|
this.setTable(getTable());
|
||||||
@ -185,10 +185,6 @@ private FilterList createFilterList() throws IOException {
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected TimelineReaderContext getContext() {
|
|
||||||
return context;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected TimelineDataToRetrieve getDataToRetrieve() {
|
protected TimelineDataToRetrieve getDataToRetrieve() {
|
||||||
return dataToRetrieve;
|
return dataToRetrieve;
|
||||||
}
|
}
|
||||||
@ -229,7 +225,7 @@ public TimelineEntity readEntity(Configuration hbaseConf, Connection conn)
|
|||||||
if (result == null || result.isEmpty()) {
|
if (result == null || result.isEmpty()) {
|
||||||
// Could not find a matching row.
|
// Could not find a matching row.
|
||||||
LOG.info("Cannot find matching entity of type " +
|
LOG.info("Cannot find matching entity of type " +
|
||||||
context.getEntityType());
|
getContext().getEntityType());
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
return parseEntity(result);
|
return parseEntity(result);
|
||||||
@ -288,21 +284,6 @@ protected BaseTable<?> getTable() {
|
|||||||
return table;
|
return table;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Validates the required parameters to read the entities.
|
|
||||||
*/
|
|
||||||
protected abstract void validateParams();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Sets certain parameters to defaults if the values are not provided.
|
|
||||||
*
|
|
||||||
* @param hbaseConf HBase Configuration.
|
|
||||||
* @param conn HBase Connection.
|
|
||||||
* @throws IOException if any exception is encountered while setting params.
|
|
||||||
*/
|
|
||||||
protected abstract void augmentParams(Configuration hbaseConf,
|
|
||||||
Connection conn) throws IOException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Fetches a {@link Result} instance for a single-entity read.
|
* Fetches a {@link Result} instance for a single-entity read.
|
||||||
*
|
*
|
||||||
|
@ -86,4 +86,17 @@ public static TimelineEntityReader createMultipleEntitiesReader(
|
|||||||
return new GenericEntityReader(context, filters, dataToRetrieve, false);
|
return new GenericEntityReader(context, filters, dataToRetrieve, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a timeline entity type reader that will read all available entity
|
||||||
|
* types within the specified context.
|
||||||
|
*
|
||||||
|
* @param context Reader context which defines the scope in which query has to
|
||||||
|
* be made. Limited to application level only.
|
||||||
|
* @return an <cite>EntityTypeReader</cite> object
|
||||||
|
*/
|
||||||
|
public static EntityTypeReader createEntityTypeReader(
|
||||||
|
TimelineReaderContext context) {
|
||||||
|
return new EntityTypeReader(context);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -176,4 +176,24 @@ public TimelineEntity getEntity(TimelineReaderContext context,
|
|||||||
}
|
}
|
||||||
return entity;
|
return entity;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets a list of available timeline entity types for an application. This can
|
||||||
|
* be done by making a call to the backend storage implementation. The meaning
|
||||||
|
* of each argument in detail is the same as {@link TimelineReader#getEntity}.
|
||||||
|
* If cluster ID has not been supplied by the client, fills the cluster id
|
||||||
|
* from config before making a call to backend storage.
|
||||||
|
*
|
||||||
|
* @param context Timeline context within the scope of which entity types
|
||||||
|
* have to be fetched. Entity type field of this context should
|
||||||
|
* be null.
|
||||||
|
* @return A set which contains available timeline entity types, represented
|
||||||
|
* as strings if found, empty otherwise.
|
||||||
|
* @throws IOException if any problem occurs while getting entity types.
|
||||||
|
*/
|
||||||
|
public Set<String> getEntityTypes(TimelineReaderContext context)
|
||||||
|
throws IOException{
|
||||||
|
context.setClusterId(getClusterID(context.getClusterId(), getConfig()));
|
||||||
|
return reader.getEntityTypes(new TimelineReaderContext(context));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -2860,4 +2860,106 @@ public TimelineEntity getContainer(@Context HttpServletRequest req,
|
|||||||
flowName, flowRunId, confsToRetrieve, metricsToRetrieve, fields,
|
flowName, flowRunId, confsToRetrieve, metricsToRetrieve, fields,
|
||||||
metricsLimit);
|
metricsLimit);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a set of available entity types for a given app id. Cluster ID is
|
||||||
|
* not provided by client so default cluster ID has to be taken. If userid,
|
||||||
|
* flow name and flow run id which are optional query parameters are not
|
||||||
|
* specified, they will be queried based on app id and cluster id from the
|
||||||
|
* flow context information stored in underlying storage implementation.
|
||||||
|
*
|
||||||
|
* @param req Servlet request.
|
||||||
|
* @param res Servlet response.
|
||||||
|
* @param appId Application id to be queried(Mandatory path param).
|
||||||
|
* @param flowName Flow name which should match for the app(Optional query
|
||||||
|
* param).
|
||||||
|
* @param flowRunId Run id which should match for the app(Optional query
|
||||||
|
* param).
|
||||||
|
* @param userId User id which should match for the app(Optional query param).
|
||||||
|
*
|
||||||
|
* @return If successful, a HTTP 200(OK) response having a JSON representing a
|
||||||
|
* list contains all timeline entity types is returned.<br>
|
||||||
|
* On failures,<br>
|
||||||
|
* If any problem occurs in parsing request, HTTP 400(Bad Request) is
|
||||||
|
* returned.<br>
|
||||||
|
* If flow context information cannot be retrieved or app for the given
|
||||||
|
* app id cannot be found, HTTP 404(Not Found) is returned.<br>
|
||||||
|
* For all other errors while retrieving data, HTTP 500(Internal Server
|
||||||
|
* Error) is returned.
|
||||||
|
*/
|
||||||
|
@GET
|
||||||
|
@Path("/apps/{appid}/entity-types")
|
||||||
|
@Produces(MediaType.APPLICATION_JSON)
|
||||||
|
public Set<String> getEntityTypes(
|
||||||
|
@Context HttpServletRequest req,
|
||||||
|
@Context HttpServletResponse res,
|
||||||
|
@PathParam("appid") String appId,
|
||||||
|
@QueryParam("flowname") String flowName,
|
||||||
|
@QueryParam("flowrunid") String flowRunId,
|
||||||
|
@QueryParam("userid") String userId) {
|
||||||
|
return getEntityTypes(req, res, null, appId, flowName, flowRunId, userId);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a set of available entity types for a given app id. If userid,
|
||||||
|
* flow name and flow run id which are optional query parameters are not
|
||||||
|
* specified, they will be queried based on app id and cluster id from the
|
||||||
|
* flow context information stored in underlying storage implementation.
|
||||||
|
*
|
||||||
|
* @param req Servlet request.
|
||||||
|
* @param res Servlet response.
|
||||||
|
* @param clusterId Cluster id to which the app to be queried belong to(
|
||||||
|
* Mandatory path param).
|
||||||
|
* @param appId Application id to be queried(Mandatory path param).
|
||||||
|
* @param flowName Flow name which should match for the app(Optional query
|
||||||
|
* param).
|
||||||
|
* @param flowRunId Run id which should match for the app(Optional query
|
||||||
|
* param).
|
||||||
|
* @param userId User id which should match for the app(Optional query param).
|
||||||
|
*
|
||||||
|
* @return If successful, a HTTP 200(OK) response having a JSON representing a
|
||||||
|
* list contains all timeline entity types is returned.<br>
|
||||||
|
* On failures,<br>
|
||||||
|
* If any problem occurs in parsing request, HTTP 400(Bad Request) is
|
||||||
|
* returned.<br>
|
||||||
|
* If flow context information cannot be retrieved or app for the given
|
||||||
|
* app id cannot be found, HTTP 404(Not Found) is returned.<br>
|
||||||
|
* For all other errors while retrieving data, HTTP 500(Internal Server
|
||||||
|
* Error) is returned.
|
||||||
|
*/
|
||||||
|
@GET
|
||||||
|
@Path("/clusters/{clusterid}/apps/{appid}/entity-types")
|
||||||
|
@Produces(MediaType.APPLICATION_JSON)
|
||||||
|
public Set<String> getEntityTypes(
|
||||||
|
@Context HttpServletRequest req,
|
||||||
|
@Context HttpServletResponse res,
|
||||||
|
@PathParam("clusterid") String clusterId,
|
||||||
|
@PathParam("appid") String appId,
|
||||||
|
@QueryParam("flowname") String flowName,
|
||||||
|
@QueryParam("flowrunid") String flowRunId,
|
||||||
|
@QueryParam("userid") String userId) {
|
||||||
|
String url = req.getRequestURI() +
|
||||||
|
(req.getQueryString() == null ? "" :
|
||||||
|
QUERY_STRING_SEP + req.getQueryString());
|
||||||
|
UserGroupInformation callerUGI =
|
||||||
|
TimelineReaderWebServicesUtils.getUser(req);
|
||||||
|
LOG.info("Received URL " + url + " from user " +
|
||||||
|
TimelineReaderWebServicesUtils.getUserName(callerUGI));
|
||||||
|
long startTime = Time.monotonicNow();
|
||||||
|
init(res);
|
||||||
|
TimelineReaderManager timelineReaderManager = getTimelineReaderManager();
|
||||||
|
Set<String> results = null;
|
||||||
|
try {
|
||||||
|
results = timelineReaderManager.getEntityTypes(
|
||||||
|
TimelineReaderWebServicesUtils.createTimelineReaderContext(
|
||||||
|
clusterId, userId, flowName, flowRunId, appId,
|
||||||
|
null, null));
|
||||||
|
} catch (Exception e) {
|
||||||
|
handleException(e, url, startTime, "flowrunid");
|
||||||
|
}
|
||||||
|
long endTime = Time.monotonicNow();
|
||||||
|
LOG.info("Processed URL " + url +
|
||||||
|
" (Took " + (endTime - startTime) + " ms.)");
|
||||||
|
return results;
|
||||||
|
}
|
||||||
}
|
}
|
@ -32,6 +32,7 @@
|
|||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
|
import java.util.TreeSet;
|
||||||
|
|
||||||
import com.fasterxml.jackson.core.JsonGenerationException;
|
import com.fasterxml.jackson.core.JsonGenerationException;
|
||||||
import com.fasterxml.jackson.databind.JsonMappingException;
|
import com.fasterxml.jackson.databind.JsonMappingException;
|
||||||
@ -408,4 +409,24 @@ public Set<TimelineEntity> getEntities(TimelineReaderContext context,
|
|||||||
context.getEntityType());
|
context.getEntityType());
|
||||||
return getEntities(dir, context.getEntityType(), filters, dataToRetrieve);
|
return getEntities(dir, context.getEntityType(), filters, dataToRetrieve);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override public Set<String> getEntityTypes(TimelineReaderContext context)
|
||||||
|
throws IOException {
|
||||||
|
Set<String> result = new TreeSet<>();
|
||||||
|
String flowRunPath = getFlowRunPath(context.getUserId(),
|
||||||
|
context.getClusterId(), context.getFlowName(), context.getFlowRunId(),
|
||||||
|
context.getAppId());
|
||||||
|
File dir = new File(new File(rootPath, ENTITIES_DIR),
|
||||||
|
context.getClusterId() + File.separator + flowRunPath
|
||||||
|
+ File.separator + context.getAppId());
|
||||||
|
File[] fileList = dir.listFiles();
|
||||||
|
if (fileList != null) {
|
||||||
|
for (File f : fileList) {
|
||||||
|
if (f.isDirectory()) {
|
||||||
|
result.add(f.getName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
}
|
}
|
@ -177,4 +177,17 @@ Set<TimelineEntity> getEntities(
|
|||||||
TimelineReaderContext context,
|
TimelineReaderContext context,
|
||||||
TimelineEntityFilters filters,
|
TimelineEntityFilters filters,
|
||||||
TimelineDataToRetrieve dataToRetrieve) throws IOException;
|
TimelineDataToRetrieve dataToRetrieve) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The API to list all available entity types of the given context.
|
||||||
|
*
|
||||||
|
* @param context A context defines the scope of this query. The incoming
|
||||||
|
* context should contain at least the cluster id and application id.
|
||||||
|
*
|
||||||
|
* @return A set of entity types available in the given context.
|
||||||
|
*
|
||||||
|
* @throws IOException if an exception occurred while listing from backend
|
||||||
|
* storage.
|
||||||
|
*/
|
||||||
|
Set<String> getEntityTypes(TimelineReaderContext context) throws IOException;
|
||||||
}
|
}
|
Loading…
x
Reference in New Issue
Block a user