YARN-6733. Add table for storing sub-application entities. Contributed by Vrushali C.

Rohith Sharma K S 2017-07-25 15:25:21 +05:30 committed by Varun Saxena
parent 61136d03f2
commit a990ff70c2
13 changed files with 1114 additions and 4 deletions

View File

@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelin
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationTable;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
@@ -63,7 +64,9 @@ public final class TimelineSchemaCreator {
LoggerFactory.getLogger(TimelineSchemaCreator.class);
private static final String SKIP_EXISTING_TABLE_OPTION_SHORT = "s";
private static final String APP_METRICS_TTL_OPTION_SHORT = "ma";
private static final String SUB_APP_METRICS_TTL_OPTION_SHORT = "msa";
private static final String APP_TABLE_NAME_SHORT = "a";
private static final String SUB_APP_TABLE_NAME_SHORT = "sa";
private static final String APP_TO_FLOW_TABLE_NAME_SHORT = "a2f";
private static final String ENTITY_METRICS_TTL_OPTION_SHORT = "me";
private static final String ENTITY_TABLE_NAME_SHORT = "e";
@@ -121,6 +124,21 @@ public final class TimelineSchemaCreator {
new ApplicationTable().setMetricsTTL(appMetricsTTL, hbaseConf);
}
// Grab the subApplicationTableName argument
String subApplicationTableName = commandLine.getOptionValue(
SUB_APP_TABLE_NAME_SHORT);
if (StringUtils.isNotBlank(subApplicationTableName)) {
hbaseConf.set(SubApplicationTable.TABLE_NAME_CONF_NAME,
subApplicationTableName);
}
// Grab the subApplication metrics TTL
String subApplicationTableMetricsTTL = commandLine
.getOptionValue(SUB_APP_METRICS_TTL_OPTION_SHORT);
if (StringUtils.isNotBlank(subApplicationTableMetricsTTL)) {
int subAppMetricsTTL = Integer.parseInt(subApplicationTableMetricsTTL);
new SubApplicationTable().setMetricsTTL(subAppMetricsTTL, hbaseConf);
}
// create all table schemas in hbase
final boolean skipExisting = commandLine.hasOption(
SKIP_EXISTING_TABLE_OPTION_SHORT);
@@ -182,6 +200,18 @@ public final class TimelineSchemaCreator {
o.setRequired(false);
options.addOption(o);
o = new Option(SUB_APP_TABLE_NAME_SHORT, "subApplicationTableName", true,
"subApplication table name");
o.setArgName("subApplicationTableName");
o.setRequired(false);
options.addOption(o);
o = new Option(SUB_APP_METRICS_TTL_OPTION_SHORT, "subApplicationMetricsTTL",
true, "TTL for metrics column family");
o.setArgName("subApplicationMetricsTTL");
o.setRequired(false);
options.addOption(o);
// Options without an argument
// No need to set arg name since we do not need an argument here
o = new Option(SKIP_EXISTING_TABLE_OPTION_SHORT, "skipExistingTable",
@@ -220,6 +250,11 @@ public final class TimelineSchemaCreator {
" The name of the Application table\n");
usage.append("[-applicationMetricsTTL <Application Table Metrics TTL>]" +
" TTL for metrics in the Application table\n");
usage.append("[-subApplicationTableName <SubApplication Table Name>]" +
" The name of the SubApplication table\n");
usage.append("[-subApplicationMetricsTTL " +
" <SubApplication Table Metrics TTL>]" +
" TTL for metrics in the SubApplication table\n");
usage.append("[-skipExistingTable] Whether to skip existing" +
" hbase tables\n");
System.out.println(usage.toString());
@@ -312,6 +347,15 @@ public final class TimelineSchemaCreator {
throw e;
}
}
try {
new SubApplicationTable().createTable(admin, hbaseConf);
} catch (IOException e) {
if (skipExisting) {
LOG.warn("Skip and continue on: " + e.getMessage());
} else {
throw e;
}
}
} finally {
if (conn != null) {
conn.close();
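
As a usage sketch (not part of the patch): the same schema path can be driven programmatically through the configuration key and methods added above. The HBase connection wiring is standard HBase client API; the table name and TTL values below are illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationTable;

public class SubAppSchemaSketch {
  public static void main(String[] args) throws Exception {
    Configuration hbaseConf = HBaseConfiguration.create();
    // equivalent of passing "-subApplicationTableName <name>"
    hbaseConf.set(SubApplicationTable.TABLE_NAME_CONF_NAME,
        "dev.timelineservice.subapplication");
    // equivalent of passing "-subApplicationMetricsTTL 604800" (7 days)
    new SubApplicationTable().setMetricsTTL(604800, hbaseConf);
    try (Connection conn = ConnectionFactory.createConnection(hbaseConf)) {
      // throws if the table already exists, mirroring createTable above
      new SubApplicationTable().createTable(conn.getAdmin(), hbaseConf);
    }
  }
}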

View File

@@ -70,7 +70,7 @@ import org.slf4j.LoggerFactory;
public class ApplicationTable extends BaseTable<ApplicationTable> {
/** application prefix. */
private static final String PREFIX =
YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".application";
YarnConfiguration.TIMELINE_SERVICE_PREFIX + "application";
/** config param name that specifies the application table name. */
public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";

View File

@@ -209,18 +209,18 @@ public class EntityRowKey {
Separator.EMPTY_BYTES);
}
- byte[] enitityIdPrefix = Bytes.toBytes(rowKey.getEntityIdPrefix());
+ byte[] entityIdPrefix = Bytes.toBytes(rowKey.getEntityIdPrefix());
if (rowKey.getEntityId() == null) {
return Separator.QUALIFIERS.join(first, second, third, entityType,
- enitityIdPrefix, Separator.EMPTY_BYTES);
+ entityIdPrefix, Separator.EMPTY_BYTES);
}
byte[] entityId = Separator.encode(rowKey.getEntityId(), Separator.SPACE,
Separator.TAB, Separator.QUALIFIERS);
byte[] fourth =
- Separator.QUALIFIERS.join(entityType, enitityIdPrefix, entityId);
+ Separator.QUALIFIERS.join(entityType, entityIdPrefix, entityId);
return Separator.QUALIFIERS.join(first, second, third, fourth);
}

View File

@@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.subapplication;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
/**
* Identifies fully qualified columns for the {@link SubApplicationTable}.
*/
public enum SubApplicationColumn implements Column<SubApplicationTable> {
/**
* Identifier for the sub application.
*/
ID(SubApplicationColumnFamily.INFO, "id"),
/**
* The type of sub application.
*/
TYPE(SubApplicationColumnFamily.INFO, "type"),
/**
* When the sub application was created.
*/
CREATED_TIME(SubApplicationColumnFamily.INFO, "created_time",
new LongConverter()),
/**
* The version of the flow that this sub application belongs to.
*/
FLOW_VERSION(SubApplicationColumnFamily.INFO, "flow_version");
private final ColumnHelper<SubApplicationTable> column;
private final ColumnFamily<SubApplicationTable> columnFamily;
private final String columnQualifier;
private final byte[] columnQualifierBytes;
SubApplicationColumn(ColumnFamily<SubApplicationTable> columnFamily,
String columnQualifier) {
this(columnFamily, columnQualifier, GenericConverter.getInstance());
}
SubApplicationColumn(ColumnFamily<SubApplicationTable> columnFamily,
String columnQualifier, ValueConverter converter) {
this.columnFamily = columnFamily;
this.columnQualifier = columnQualifier;
// Future-proof by ensuring the right column prefix hygiene.
this.columnQualifierBytes =
Bytes.toBytes(Separator.SPACE.encode(columnQualifier));
this.column = new ColumnHelper<SubApplicationTable>(columnFamily,
converter);
}
public void store(byte[] rowKey,
TypedBufferedMutator<SubApplicationTable> tableMutator, Long timestamp,
Object inputValue, Attribute... attributes) throws IOException {
column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
inputValue, attributes);
}
public Object readResult(Result result) throws IOException {
return column.readResult(result, columnQualifierBytes);
}
@Override
public byte[] getColumnQualifierBytes() {
return columnQualifierBytes.clone();
}
@Override
public byte[] getColumnFamilyBytes() {
return columnFamily.getBytes();
}
@Override
public ValueConverter getValueConverter() {
return column.getValueConverter();
}
}
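
A minimal write/read sketch for these columns (hypothetical helper, not in the patch): the row key would come from SubApplicationRowKey#getRowKey, the mutator from the writer's BaseTable machinery, and the Result from an HBase Get or Scan; imports match those of this file.

void roundTrip(byte[] rowKey,
    TypedBufferedMutator<SubApplicationTable> mutator,
    Result result) throws IOException {
  // a null timestamp lets HBase assign the current server time
  SubApplicationColumn.ID.store(rowKey, mutator, null, "entity1");
  // reads back from whatever Result the caller fetched for this row
  Object id = SubApplicationColumn.ID.readResult(result);
}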

View File

@@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.subapplication;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
/**
* Represents the sub application table column families.
*/
public enum SubApplicationColumnFamily
implements ColumnFamily<SubApplicationTable> {
/**
* Info column family houses known columns, specifically ones included in
* column family filters.
*/
INFO("i"),
/**
* Configurations are in a separate column family for two reasons:
* a) the size of the config values can be very large and
* b) we expect that config values
* are often accessed separately from metrics and info columns.
*/
CONFIGS("c"),
/**
* Metrics have a separate column family, because they have a separate TTL.
*/
METRICS("m");
/**
* Byte representation of this column family.
*/
private final byte[] bytes;
/**
* @param value
* create a column family with this name. Must be lower case and
* without spaces.
*/
SubApplicationColumnFamily(String value) {
// column families should be lower case and not contain any spaces.
this.bytes = Bytes.toBytes(Separator.SPACE.encode(value));
}
public byte[] getBytes() {
return Bytes.copy(bytes);
}
}

View File

@@ -0,0 +1,250 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.subapplication;
import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
/**
* Identifies partially qualified columns for the sub app table.
*/
public enum SubApplicationColumnPrefix
implements ColumnPrefix<SubApplicationTable> {
/**
* To store TimelineEntity getIsRelatedToEntities values.
*/
IS_RELATED_TO(SubApplicationColumnFamily.INFO, "s"),
/**
* To store TimelineEntity getRelatesToEntities values.
*/
RELATES_TO(SubApplicationColumnFamily.INFO, "r"),
/**
* To store TimelineEntity info values.
*/
INFO(SubApplicationColumnFamily.INFO, "i"),
/**
* Lifecycle events for an entity.
*/
EVENT(SubApplicationColumnFamily.INFO, "e", true),
/**
* Config column stores configuration with config key as the column name.
*/
CONFIG(SubApplicationColumnFamily.CONFIGS, null),
/**
* Metrics are stored with the metric name as the column name.
*/
METRIC(SubApplicationColumnFamily.METRICS, null, new LongConverter());
private final ColumnHelper<SubApplicationTable> column;
private final ColumnFamily<SubApplicationTable> columnFamily;
/**
* Can be null for those cases where the provided column qualifier is the
* entire column name.
*/
private final String columnPrefix;
private final byte[] columnPrefixBytes;
/**
* Private constructor, meant to be used by the enum definition.
*
* @param columnFamily that this column is stored in.
* @param columnPrefix for this column.
*/
SubApplicationColumnPrefix(ColumnFamily<SubApplicationTable> columnFamily,
String columnPrefix) {
this(columnFamily, columnPrefix, false, GenericConverter.getInstance());
}
SubApplicationColumnPrefix(ColumnFamily<SubApplicationTable> columnFamily,
String columnPrefix, boolean compoundColQual) {
this(columnFamily, columnPrefix, compoundColQual,
GenericConverter.getInstance());
}
SubApplicationColumnPrefix(ColumnFamily<SubApplicationTable> columnFamily,
String columnPrefix, ValueConverter converter) {
this(columnFamily, columnPrefix, false, converter);
}
/**
* Private constructor, meant to be used by the enum definition.
*
* @param columnFamily that this column is stored in.
* @param columnPrefix for this column.
* @param compoundColQual whether the column qualifier is compound (e.g.
* event columns, which combine event id, timestamp and info key).
* @param converter used to encode/decode values to be stored in HBase for
* this column prefix.
*/
SubApplicationColumnPrefix(ColumnFamily<SubApplicationTable> columnFamily,
String columnPrefix, boolean compoundColQual, ValueConverter converter) {
column = new ColumnHelper<SubApplicationTable>(columnFamily, converter);
this.columnFamily = columnFamily;
this.columnPrefix = columnPrefix;
if (columnPrefix == null) {
this.columnPrefixBytes = null;
} else {
// Future-proof by ensuring the right column prefix hygiene.
this.columnPrefixBytes =
Bytes.toBytes(Separator.SPACE.encode(columnPrefix));
}
}
/**
* @return the column name value
*/
public String getColumnPrefix() {
return columnPrefix;
}
@Override
public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) {
return ColumnHelper.getColumnQualifier(
this.columnPrefixBytes, qualifierPrefix);
}
@Override
public byte[] getColumnPrefixBytes(String qualifierPrefix) {
return ColumnHelper.getColumnQualifier(
this.columnPrefixBytes, qualifierPrefix);
}
@Override
public byte[] getColumnFamilyBytes() {
return columnFamily.getBytes();
}
@Override
public ValueConverter getValueConverter() {
return column.getValueConverter();
}
/*
* (non-Javadoc)
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
* #store(byte[],
* org.apache.hadoop.yarn.server.timelineservice.storage.common.
* TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object,
* org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[])
*/
public void store(byte[] rowKey,
TypedBufferedMutator<SubApplicationTable> tableMutator, String qualifier,
Long timestamp, Object inputValue, Attribute... attributes)
throws IOException {
// Null check
if (qualifier == null) {
throw new IOException("Cannot store column with null qualifier in "
+ tableMutator.getName().getNameAsString());
}
byte[] columnQualifier = getColumnPrefixBytes(qualifier);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
attributes);
}
/*
* (non-Javadoc)
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
* #store(byte[],
* org.apache.hadoop.yarn.server.timelineservice.storage.common.
* TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
*/
public void store(byte[] rowKey,
TypedBufferedMutator<SubApplicationTable> tableMutator, byte[] qualifier,
Long timestamp, Object inputValue, Attribute... attributes)
throws IOException {
// Null check
if (qualifier == null) {
throw new IOException("Cannot store column with null qualifier in "
+ tableMutator.getName().getNameAsString());
}
byte[] columnQualifier = getColumnPrefixBytes(qualifier);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
attributes);
}
/*
* (non-Javadoc)
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
* #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String)
*/
public Object readResult(Result result, String qualifier) throws IOException {
byte[] columnQualifier =
ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
return column.readResult(result, columnQualifier);
}
/*
* (non-Javadoc)
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
* #readResults(org.apache.hadoop.hbase.client.Result,
* org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
*/
public <K> Map<K, Object> readResults(Result result,
KeyConverter<K> keyConverter) throws IOException {
return column.readResults(result, columnPrefixBytes, keyConverter);
}
/*
* (non-Javadoc)
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
* #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result,
* org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
*/
public <K, V> NavigableMap<K, NavigableMap<Long, V>>
readResultsWithTimestamps(Result result, KeyConverter<K> keyConverter)
throws IOException {
return column.readResultsWithTimestamps(result, columnPrefixBytes,
keyConverter);
}
}
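
A sketch of one metric write plus a timeseries read (hypothetical helper, not in the patch): rowKey, mutator and result are assumed to be supplied by the caller, java.util.NavigableMap is imported above, and StringKeyConverter is the qualifier converter from the common storage package.

void metricRoundTrip(byte[] rowKey,
    TypedBufferedMutator<SubApplicationTable> mutator,
    Result result) throws IOException {
  // one metric cell: qualifier "metricId1", explicit timestamp, long value
  SubApplicationColumnPrefix.METRIC.store(rowKey, mutator, "metricId1",
      1392993084018L, 40L);
  // full timeseries: metric id -> (timestamp -> value)
  NavigableMap<String, NavigableMap<Long, Number>> metrics =
      SubApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result,
          new StringKeyConverter());
}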

View File

@@ -0,0 +1,290 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.subapplication;
import java.util.List;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverterToString;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
/**
* Represents a rowkey for the sub app table.
*/
public class SubApplicationRowKey {
private final String subAppUserId;
private final String clusterId;
private final String entityType;
private final Long entityIdPrefix;
private final String entityId;
private final String userId;
private final SubApplicationRowKeyConverter subAppRowKeyConverter =
new SubApplicationRowKeyConverter();
public SubApplicationRowKey(String subAppUserId, String clusterId,
String entityType, Long entityIdPrefix, String entityId, String userId) {
this.subAppUserId = subAppUserId;
this.clusterId = clusterId;
this.entityType = entityType;
this.entityIdPrefix = entityIdPrefix;
this.entityId = entityId;
this.userId = userId;
}
public String getClusterId() {
return clusterId;
}
public String getSubAppUserId() {
return subAppUserId;
}
public String getEntityType() {
return entityType;
}
public String getEntityId() {
return entityId;
}
public Long getEntityIdPrefix() {
return entityIdPrefix;
}
public String getUserId() {
return userId;
}
/**
* Constructs a row key for the sub app table as follows:
* {@code subAppUserId!clusterId!entityType
* !entityPrefix!entityId!userId}.
* Typically used while querying a specific sub app.
*
* subAppUserId is usually the doAsUser.
* userId is the yarn user that the AM runs as.
*
* @return byte array with the row key.
*/
public byte[] getRowKey() {
return subAppRowKeyConverter.encode(this);
}
/**
* Given the raw row key as bytes, returns the row key as an object.
*
* @param rowKey byte representation of row key.
* @return A <cite>SubApplicationRowKey</cite> object.
*/
public static SubApplicationRowKey parseRowKey(byte[] rowKey) {
return new SubApplicationRowKeyConverter().decode(rowKey);
}
/**
* Constructs a row key for the sub app table as follows:
* <p>
* {@code subAppUserId!clusterId!
* entityType!entityIdPrefix!entityId!userId}.
*
* subAppUserId is usually the doAsUser.
* userId is the yarn user that the AM runs as.
*
* </p>
*
* @return String representation of row key.
*/
public String getRowKeyAsString() {
return subAppRowKeyConverter.encodeAsString(this);
}
/**
* Given the encoded row key as string, returns the row key as an object.
*
* @param encodedRowKey String representation of row key.
* @return A <cite>SubApplicationRowKey</cite> object.
*/
public static SubApplicationRowKey parseRowKeyFromString(
String encodedRowKey) {
return new SubApplicationRowKeyConverter().decodeFromString(encodedRowKey);
}
/**
* Encodes and decodes row key for sub app table.
* The row key is of the form:
* subAppUserId!clusterId!entityType!entityIdPrefix!entityId!userId
*
* subAppUserId is usually the doAsUser.
* userId is the yarn user that the AM runs as.
*/
final private static class SubApplicationRowKeyConverter
implements KeyConverter<SubApplicationRowKey>,
KeyConverterToString<SubApplicationRowKey> {
private SubApplicationRowKeyConverter() {
}
/**
* sub app row key is of the form
* subAppUserId!clusterId!entityType!entityPrefix!entityId!userId
* with each segment separated by !.
*
* subAppUserId is usually the doAsUser.
* userId is the yarn user that the AM runs as.
*
* The sizes below indicate the byte size of each of these segments in
* sequence. subAppUserId, clusterId, entityType, entityId and userId are
* strings and variable in size (i.e. they end wherever the separator is
* encountered), while the entity prefix is a long and hence 8 bytes in
* size. This is used while decoding and helps in determining where to
* split.
*/
private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE,
Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE};
/*
* (non-Javadoc)
*
* Encodes SubApplicationRowKey object into a byte array with each
* component/field in SubApplicationRowKey separated by
* Separator#QUALIFIERS.
* This leads to a sub app table row key of the form
* subAppUserId!clusterId!entityType!entityPrefix!entityId!userId
*
* subAppUserId is usually the doAsUser.
* userId is the yarn user that the AM runs as.
*
* If entityType in the passed SubApplicationRowKey object is null (and
* the fields preceding it, i.e. subAppUserId and clusterId, are not
* null), this returns a row key prefix of the form
* subAppUserId!clusterId!
* If entityIdPrefix is null (and the components preceding it are not
* null), this returns a row key prefix of the form
* subAppUserId!clusterId!entityType!
* If entityId is null (and the components preceding it are not null),
* this returns a row key prefix of the form
* subAppUserId!clusterId!entityType!entityIdPrefix!
*
* @see org.apache.hadoop.yarn.server.timelineservice.storage.common
* .KeyConverter#encode(java.lang.Object)
*/
@Override
public byte[] encode(SubApplicationRowKey rowKey) {
byte[] subAppUser = Separator.encode(rowKey.getSubAppUserId(),
Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
byte[] cluster = Separator.encode(rowKey.getClusterId(), Separator.SPACE,
Separator.TAB, Separator.QUALIFIERS);
byte[] first = Separator.QUALIFIERS.join(subAppUser, cluster);
if (rowKey.getEntityType() == null) {
return first;
}
byte[] entityType = Separator.encode(rowKey.getEntityType(),
Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
if (rowKey.getEntityIdPrefix() == null) {
return Separator.QUALIFIERS.join(first, entityType,
Separator.EMPTY_BYTES);
}
byte[] entityIdPrefix = Bytes.toBytes(rowKey.getEntityIdPrefix());
if (rowKey.getEntityId() == null) {
return Separator.QUALIFIERS.join(first, entityType, entityIdPrefix,
Separator.EMPTY_BYTES);
}
byte[] entityId = Separator.encode(rowKey.getEntityId(), Separator.SPACE,
Separator.TAB, Separator.QUALIFIERS);
byte[] userId = Separator.encode(rowKey.getUserId(),
Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
byte[] second = Separator.QUALIFIERS.join(entityType, entityIdPrefix,
entityId, userId);
return Separator.QUALIFIERS.join(first, second);
}
/*
* (non-Javadoc)
*
* Decodes a sub application row key of the form
* subAppUserId!clusterId!entityType!entityPrefix!entityId!userId
*
* subAppUserId is usually the doAsUser.
* userId is the yarn user that the AM runs as.
*
* represented in byte format
* and converts it into an SubApplicationRowKey object.
*
* @see org.apache.hadoop.yarn.server.timelineservice.storage.common
* .KeyConverter#decode(byte[])
*/
@Override
public SubApplicationRowKey decode(byte[] rowKey) {
byte[][] rowKeyComponents =
Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
if (rowKeyComponents.length != 6) {
throw new IllegalArgumentException(
"the row key is not valid for a sub app");
}
String subAppUserId =
Separator.decode(Bytes.toString(rowKeyComponents[0]),
Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
String clusterId = Separator.decode(Bytes.toString(rowKeyComponents[1]),
Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
String entityType = Separator.decode(Bytes.toString(rowKeyComponents[2]),
Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
Long entityPrefixId = Bytes.toLong(rowKeyComponents[3]);
String entityId = Separator.decode(Bytes.toString(rowKeyComponents[4]),
Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
String userId =
Separator.decode(Bytes.toString(rowKeyComponents[5]),
Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
return new SubApplicationRowKey(subAppUserId, clusterId, entityType,
entityPrefixId, entityId, userId);
}
@Override
public String encodeAsString(SubApplicationRowKey key) {
if (key.subAppUserId == null || key.clusterId == null
|| key.entityType == null || key.entityIdPrefix == null
|| key.entityId == null || key.userId == null) {
throw new IllegalArgumentException();
}
return TimelineReaderUtils.joinAndEscapeStrings(
new String[] {key.subAppUserId, key.clusterId, key.entityType,
key.entityIdPrefix.toString(), key.entityId, key.userId});
}
@Override
public SubApplicationRowKey decodeFromString(String encodedRowKey) {
List<String> split = TimelineReaderUtils.split(encodedRowKey);
if (split == null || split.size() != 6) {
throw new IllegalArgumentException(
"Invalid row key for sub app table.");
}
Long entityIdPrefix = Long.valueOf(split.get(3));
return new SubApplicationRowKey(split.get(0), split.get(1),
split.get(2), entityIdPrefix, split.get(4), split.get(5));
}
}
}
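
A worked round trip under this encoding (a sketch; all values are illustrative):

SubApplicationRowKey key = new SubApplicationRowKey(
    "doAsUser1", "cluster1", "DAG", 54321L, "dag_0001", "amUser1");
// binary form: doAsUser1!cluster1!DAG!<8-byte 54321>!dag_0001!amUser1
byte[] bytes = key.getRowKey();
SubApplicationRowKey decoded = SubApplicationRowKey.parseRowKey(bytes);
// decoded.getEntityIdPrefix() == 54321L; decoded.getUserId() is "amUser1"

// escaped string form, usable wherever a row key travels as a string
String asString = key.getRowKeyAsString();
SubApplicationRowKey fromString =
    SubApplicationRowKey.parseRowKeyFromString(asString);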

View File

@@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.subapplication;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
/**
* Represents a partial rowkey (without the entityId, or without both the
* entityType and entityId) for the sub application table.
*/
public class SubApplicationRowKeyPrefix extends SubApplicationRowKey
implements RowKeyPrefix<SubApplicationRowKey> {
/**
* Creates a prefix which generates the following rowKeyPrefixes for the sub
* application table:
* {@code subAppUserId!clusterId!entityType!entityIdPrefix!entityId!userId}.
*
* @param subAppUserId
* identifying the subApp User
* @param clusterId
* identifying the cluster
* @param entityType
* which entity type
* @param entityIdPrefix
* for entityId
* @param entityId
* for an entity
* @param userId
* for the user who runs the AM
*
* subAppUserId is usually the doAsUser.
* userId is the yarn user that the AM runs as.
*
*/
public SubApplicationRowKeyPrefix(String subAppUserId, String clusterId,
String entityType, Long entityIdPrefix, String entityId,
String userId) {
super(subAppUserId, clusterId, entityType, entityIdPrefix, entityId,
userId);
}
/**
* Creates a prefix which generates the following rowKeyPrefixes for the sub
* application table:
* {@code subAppUserId!clusterId}.
*
* subAppUserId is usually the doAsUser.
* userId is the yarn user that the AM runs as.
*
* @param clusterId
* identifying the cluster
* @param subAppUserId
* identifying the sub app user
* @param userId
* identifying the user who runs the AM
*/
public SubApplicationRowKeyPrefix(String clusterId, String subAppUserId,
String userId) {
this(subAppUserId, clusterId, null, null, null, userId);
}
/*
* (non-Javadoc)
*
* @see org.apache.hadoop.yarn.server.timelineservice.storage.common.
* RowKeyPrefix#getRowKeyPrefix()
*/
public byte[] getRowKeyPrefix() {
return super.getRowKey();
}
}
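
A sketch of how a reader might turn this prefix into a scan; Scan#setRowPrefixFilter is stock HBase client API (org.apache.hadoop.hbase.client.Scan), and the values are illustrative. Note the (clusterId, subAppUserId, userId) parameter order of the short constructor.

// all sub-app rows written under doAsUser1 on cluster1
byte[] prefix = new SubApplicationRowKeyPrefix(
    "cluster1", "doAsUser1", "amUser1").getRowKeyPrefix();
Scan scan = new Scan();
scan.setRowPrefixFilter(prefix);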

View File

@@ -0,0 +1,174 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.subapplication;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBaseSchemaConstants;
/**
* The sub application table has column families:
* info, config and metrics.
* Info stores information about a timeline entity object,
* config stores configuration data of a timeline entity object, and
* metrics stores the metrics of a timeline entity object.
*
* Example sub application table record:
*
* <pre>
* |----------------------------------------------------------------------|
* | Row           | Column Family        | Column Family | Column Family |
* | key           | info                 | metrics       | config        |
* |----------------------------------------------------------------------|
* | subAppUserId! | id:entityId          | metricId1:    | configKey1:   |
* | clusterId!    | type:entityType      | metricValue1  | configValue1  |
* | entityType!   |                      | @timestamp1   |               |
* | idPrefix!     |                      |               | configKey2:   |
* | entityId!     | created_time:        | metricId1:    | configValue2  |
* | userId        | 1392993084018        | metricValue2  |               |
* |               |                      | @timestamp2   |               |
* |               | i!infoKey:           |               |               |
* |               | infoValue            | metricId1:    |               |
* |               |                      | metricValue1  |               |
* |               |                      | @timestamp2   |               |
* |               | e!eventId=timestamp= |               |               |
* |               | infoKey:             |               |               |
* |               | eventInfoValue       |               |               |
* |               |                      |               |               |
* |               | r!relatesToKey:      |               |               |
* |               | id3=id4=id5          |               |               |
* |               |                      |               |               |
* |               | s!isRelatedToKey     |               |               |
* |               | id7=id9=id6          |               |               |
* |               |                      |               |               |
* |               | flowVersion:         |               |               |
* |               | versionValue         |               |               |
* |----------------------------------------------------------------------|
* </pre>
*/
public class SubApplicationTable extends BaseTable<SubApplicationTable> {
/** sub app prefix. */
private static final String PREFIX =
YarnConfiguration.TIMELINE_SERVICE_PREFIX + "subapplication";
/** config param name that specifies the subapplication table name. */
public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
/**
* config param name that specifies the TTL for metrics column family in
* subapplication table.
*/
private static final String METRICS_TTL_CONF_NAME = PREFIX
+ ".table.metrics.ttl";
/**
* config param name that specifies max-versions for
* metrics column family in subapplication table.
*/
private static final String METRICS_MAX_VERSIONS =
PREFIX + ".table.metrics.max-versions";
/** default value for subapplication table name. */
public static final String DEFAULT_TABLE_NAME =
"timelineservice.subapplication";
/** default TTL is 30 days for metrics timeseries. */
private static final int DEFAULT_METRICS_TTL = 2592000;
/** default max number of versions. */
private static final int DEFAULT_METRICS_MAX_VERSIONS = 10000;
private static final Log LOG = LogFactory.getLog(
SubApplicationTable.class);
public SubApplicationTable() {
super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
}
/*
* (non-Javadoc)
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.BaseTable#createTable
* (org.apache.hadoop.hbase.client.Admin,
* org.apache.hadoop.conf.Configuration)
*/
public void createTable(Admin admin, Configuration hbaseConf)
throws IOException {
TableName table = getTableName(hbaseConf);
if (admin.tableExists(table)) {
// do not disable / delete existing table
// similar to the approach taken by map-reduce jobs when
// output directory exists
throw new IOException("Table " + table.getNameAsString()
+ " already exists.");
}
HTableDescriptor subAppTableDescp = new HTableDescriptor(table);
HColumnDescriptor infoCF =
new HColumnDescriptor(SubApplicationColumnFamily.INFO.getBytes());
infoCF.setBloomFilterType(BloomType.ROWCOL);
subAppTableDescp.addFamily(infoCF);
HColumnDescriptor configCF =
new HColumnDescriptor(SubApplicationColumnFamily.CONFIGS.getBytes());
configCF.setBloomFilterType(BloomType.ROWCOL);
configCF.setBlockCacheEnabled(true);
subAppTableDescp.addFamily(configCF);
HColumnDescriptor metricsCF =
new HColumnDescriptor(SubApplicationColumnFamily.METRICS.getBytes());
subAppTableDescp.addFamily(metricsCF);
metricsCF.setBlockCacheEnabled(true);
// always keep 1 version (the latest)
metricsCF.setMinVersions(1);
metricsCF.setMaxVersions(
hbaseConf.getInt(METRICS_MAX_VERSIONS, DEFAULT_METRICS_MAX_VERSIONS));
metricsCF.setTimeToLive(hbaseConf.getInt(METRICS_TTL_CONF_NAME,
DEFAULT_METRICS_TTL));
subAppTableDescp.setRegionSplitPolicyClassName(
"org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy");
subAppTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length",
TimelineHBaseSchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH);
admin.createTable(subAppTableDescp,
TimelineHBaseSchemaConstants.getUsernameSplits());
LOG.info("Status of table creation for " + table.getNameAsString() + "="
+ admin.tableExists(table));
}
/**
* @param metricsTTL time to live parameter for the metrics in this table.
* @param hbaseConf configuration in which to set the metrics TTL config
* variable.
*/
public void setMetricsTTL(int metricsTTL, Configuration hbaseConf) {
hbaseConf.setInt(METRICS_TTL_CONF_NAME, metricsTTL);
}
}
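
Since YarnConfiguration.TIMELINE_SERVICE_PREFIX is "yarn.timeline-service.", the keys derived above resolve to the literals below (a configuration sketch; values illustrative):

Configuration conf = new Configuration();
conf.set("yarn.timeline-service.subapplication.table.name",
    "dev.timelineservice.subapplication");
// one week instead of the 30-day default
conf.setInt("yarn.timeline-service.subapplication.table.metrics.ttl",
    7 * 24 * 60 * 60);
conf.setInt(
    "yarn.timeline-service.subapplication.table.metrics.max-versions", 1000);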

View File

@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Package org.apache.hadoop.yarn.server.timelineservice.storage.subapplication
* contains classes related to implementation for subapplication table.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.yarn.server.timelineservice.storage.subapplication;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@@ -26,6 +26,10 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.junit.Test;
/**
* Unit tests for key converters for various tables' row keys.
*
*/
public class TestKeyConverters {
@Test

View File

@@ -31,9 +31,14 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationRowKey;
import org.junit.Test;
/**
* Class to test the row key structures for various tables.
*
*/
public class TestRowKeys {
private final static String QUALIFIER_SEP = Separator.QUALIFIERS.getValue();
@@ -41,6 +46,7 @@ public class TestRowKeys {
.toBytes(QUALIFIER_SEP);
private final static String CLUSTER = "cl" + QUALIFIER_SEP + "uster";
private final static String USER = QUALIFIER_SEP + "user";
private final static String SUB_APP_USER = QUALIFIER_SEP + "subAppUser";
private final static String FLOW_NAME = "dummy_" + QUALIFIER_SEP + "flow"
+ QUALIFIER_SEP;
private final static Long FLOW_RUN_ID;
@@ -247,4 +253,24 @@
verifyRowPrefixBytes(byteRowKeyPrefix);
}
@Test
public void testSubAppRowKey() {
TimelineEntity entity = new TimelineEntity();
entity.setId("entity1");
entity.setType("DAG");
entity.setIdPrefix(54321);
byte[] byteRowKey =
new SubApplicationRowKey(SUB_APP_USER, CLUSTER,
entity.getType(), entity.getIdPrefix(),
entity.getId(), USER).getRowKey();
SubApplicationRowKey rowKey = SubApplicationRowKey.parseRowKey(byteRowKey);
assertEquals(CLUSTER, rowKey.getClusterId());
assertEquals(SUB_APP_USER, rowKey.getSubAppUserId());
assertEquals(entity.getType(), rowKey.getEntityType());
assertEquals(entity.getIdPrefix(), rowKey.getEntityIdPrefix().longValue());
assertEquals(entity.getId(), rowKey.getEntityId());
assertEquals(USER, rowKey.getUserId());
}
}

View File

@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.Applica
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationRowKey;
import org.junit.Test;
/**
@@ -38,6 +39,9 @@ public class TestRowKeysAsString {
+ TimelineReaderUtils.DEFAULT_ESCAPE_CHAR;
private final static String USER =
TimelineReaderUtils.DEFAULT_ESCAPE_CHAR + "user";
private final static String SUB_APP_USER =
TimelineReaderUtils.DEFAULT_ESCAPE_CHAR + "subAppUser";
private final static String FLOW_NAME =
"dummy_" + TimelineReaderUtils.DEFAULT_DELIMITER_CHAR
+ TimelineReaderUtils.DEFAULT_ESCAPE_CHAR + "flow"
@@ -112,4 +116,29 @@ public class TestRowKeysAsString {
assertEquals(FLOW_NAME, rowKey.getFlowName());
assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
}
@Test(timeout = 10000)
public void testSubApplicationRowKey() {
char del = TimelineReaderUtils.DEFAULT_DELIMITER_CHAR;
char esc = TimelineReaderUtils.DEFAULT_ESCAPE_CHAR;
String id = del + esc + "ent" + esc + del + "ity" + esc + del + esc + "id"
+ esc + del + esc;
String type = "entity" + esc + del + esc + "Type";
TimelineEntity entity = new TimelineEntity();
entity.setId(id);
entity.setType(type);
entity.setIdPrefix(54321);
String rowKeyAsString = new SubApplicationRowKey(SUB_APP_USER, CLUSTER,
entity.getType(), entity.getIdPrefix(), entity.getId(), USER)
.getRowKeyAsString();
SubApplicationRowKey rowKey = SubApplicationRowKey
.parseRowKeyFromString(rowKeyAsString);
assertEquals(SUB_APP_USER, rowKey.getSubAppUserId());
assertEquals(CLUSTER, rowKey.getClusterId());
assertEquals(entity.getType(), rowKey.getEntityType());
assertEquals(entity.getIdPrefix(), rowKey.getEntityIdPrefix().longValue());
assertEquals(entity.getId(), rowKey.getEntityId());
assertEquals(USER, rowKey.getUserId());
}
}