YARN-5378. Accommodate app-id->cluster mapping (Sangjin Lee via Varun Saxena)

This commit is contained in:
Varun Saxena 2017-01-17 20:05:47 +05:30
parent 7f54ac48c6
commit 71847ed44d
7 changed files with 271 additions and 125 deletions

View File

@ -38,7 +38,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.Applica
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
@ -172,9 +172,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
FlowRunRowKey flowRunRowKey =
new FlowRunRowKey(clusterId, userId, flowName, flowRunId);
if (event != null) {
AppToFlowRowKey appToFlowRowKey =
new AppToFlowRowKey(clusterId, appId);
onApplicationCreated(flowRunRowKey, appToFlowRowKey, appId, userId,
onApplicationCreated(flowRunRowKey, clusterId, appId, userId,
flowVersion, te, event.getTimestamp());
}
// if it's an application entity, store metrics
@ -193,18 +191,22 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
}
private void onApplicationCreated(FlowRunRowKey flowRunRowKey,
AppToFlowRowKey appToFlowRowKey, String appId, String userId,
String flowVersion, TimelineEntity te, long appCreatedTimeStamp)
String clusterId, String appId, String userId, String flowVersion,
TimelineEntity te, long appCreatedTimeStamp)
throws IOException {
String flowName = flowRunRowKey.getFlowName();
Long flowRunId = flowRunRowKey.getFlowRunId();
// store in App to flow table
AppToFlowRowKey appToFlowRowKey = new AppToFlowRowKey(appId);
byte[] rowKey = appToFlowRowKey.getRowKey();
AppToFlowColumn.FLOW_ID.store(rowKey, appToFlowTable, null, flowName);
AppToFlowColumn.FLOW_RUN_ID.store(rowKey, appToFlowTable, null, flowRunId);
AppToFlowColumn.USER_ID.store(rowKey, appToFlowTable, null, userId);
AppToFlowColumnPrefix.FLOW_NAME.store(rowKey, appToFlowTable, clusterId,
null, flowName);
AppToFlowColumnPrefix.FLOW_RUN_ID.store(rowKey, appToFlowTable, clusterId,
null, flowRunId);
AppToFlowColumnPrefix.USER_ID.store(rowKey, appToFlowTable, clusterId, null,
userId);
// store in flow run table
storeAppCreatedInFlowRunTable(flowRunRowKey, appId, te);

View File

@ -0,0 +1,206 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
/**
* Identifies partially qualified columns for the app-to-flow table.
*/
public enum AppToFlowColumnPrefix implements ColumnPrefix<AppToFlowTable> {
/**
* The flow name.
*/
FLOW_NAME(AppToFlowColumnFamily.MAPPING, "flow_name"),
/**
* The flow run ID.
*/
FLOW_RUN_ID(AppToFlowColumnFamily.MAPPING, "flow_run_id"),
/**
* The user.
*/
USER_ID(AppToFlowColumnFamily.MAPPING, "user_id");
private final ColumnHelper<AppToFlowTable> column;
private final ColumnFamily<AppToFlowTable> columnFamily;
private final String columnPrefix;
private final byte[] columnPrefixBytes;
private AppToFlowColumnPrefix(ColumnFamily<AppToFlowTable> columnFamily,
String columnPrefix) {
this.columnFamily = columnFamily;
this.columnPrefix = columnPrefix;
if (columnPrefix == null) {
this.columnPrefixBytes = null;
} else {
// Future-proof by ensuring the right column prefix hygiene.
this.columnPrefixBytes =
Bytes.toBytes(Separator.SPACE.encode(columnPrefix));
}
this.column = new ColumnHelper<AppToFlowTable>(columnFamily);
}
@Override
public byte[] getColumnPrefixBytes(String qualifierPrefix) {
return ColumnHelper.getColumnQualifier(
columnPrefixBytes, qualifierPrefix);
}
@Override
public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) {
return ColumnHelper.getColumnQualifier(
columnPrefixBytes, qualifierPrefix);
}
@Override
public byte[] getColumnFamilyBytes() {
return columnFamily.getBytes();
}
@Override
public void store(byte[] rowKey,
TypedBufferedMutator<AppToFlowTable> tableMutator, byte[] qualifier,
Long timestamp, Object inputValue, Attribute... attributes)
throws IOException {
// Null check
if (qualifier == null) {
throw new IOException("Cannot store column with null qualifier in "
+ tableMutator.getName().getNameAsString());
}
byte[] columnQualifier = getColumnPrefixBytes(qualifier);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
attributes);
}
@Override
public void store(byte[] rowKey,
TypedBufferedMutator<AppToFlowTable> tableMutator, String qualifier,
Long timestamp, Object inputValue, Attribute... attributes)
throws IOException {
// Null check
if (qualifier == null) {
throw new IOException("Cannot store column with null qualifier in "
+ tableMutator.getName().getNameAsString());
}
byte[] columnQualifier = getColumnPrefixBytes(qualifier);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
attributes);
}
@Override
public ValueConverter getValueConverter() {
return column.getValueConverter();
}
@Override
public Object readResult(Result result, String qualifier) throws IOException {
byte[] columnQualifier =
ColumnHelper.getColumnQualifier(columnPrefixBytes, qualifier);
return column.readResult(result, columnQualifier);
}
@Override
public <K> Map<K, Object> readResults(Result result,
KeyConverter<K> keyConverter)
throws IOException {
return column.readResults(result, columnPrefixBytes, keyConverter);
}
@Override
public <K, V> NavigableMap<K, NavigableMap<Long, V>>
readResultsWithTimestamps(Result result,
KeyConverter<K> keyConverter) throws IOException {
return column.readResultsWithTimestamps(result, columnPrefixBytes,
keyConverter);
}
/**
* Retrieve an {@link AppToFlowColumnPrefix} given a name, or null if there
* is no match. The following holds true: {@code columnFor(x) == columnFor(y)}
* if and only if {@code x.equals(y)} or {@code (x == y == null)}
*
* @param columnPrefix Name of the column to retrieve
* @return the corresponding {@link AppToFlowColumnPrefix} or null
*/
public static final AppToFlowColumnPrefix columnFor(String columnPrefix) {
// Match column based on value, assume column family matches.
for (AppToFlowColumnPrefix afcp : AppToFlowColumnPrefix.values()) {
// Find a match based only on name.
if (afcp.columnPrefix.equals(columnPrefix)) {
return afcp;
}
}
// Default to null
return null;
}
/**
* Retrieve an {@link AppToFlowColumnPrefix} given a name, or null if there
* is no match. The following holds true:
* {@code columnFor(a,x) == columnFor(b,y)} if and only if
* {@code (x == y == null)} or {@code a.equals(b) & x.equals(y)}
*
* @param columnFamily The columnFamily for which to retrieve the column.
* @param columnPrefix Name of the column to retrieve
* @return the corresponding {@link AppToFlowColumnPrefix} or null if both
* arguments don't match.
*/
public static final AppToFlowColumnPrefix columnFor(
AppToFlowColumnFamily columnFamily, String columnPrefix) {
// TODO: needs unit test to confirm and need to update javadoc to explain
// null prefix case.
for (AppToFlowColumnPrefix afcp : AppToFlowColumnPrefix.values()) {
// Find a match based column family and on name.
if (afcp.columnFamily.equals(columnFamily)
&& (((columnPrefix == null) && (afcp.columnPrefix == null)) ||
(afcp.columnPrefix.equals(columnPrefix)))) {
return afcp;
}
}
// Default to null
return null;
}
}

View File

@ -17,41 +17,32 @@
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
/**
* Represents a rowkey for the app_flow table.
* Represents a row key for the app_flow table, which is the app id.
*/
public class AppToFlowRowKey {
private final String clusterId;
private final String appId;
private final KeyConverter<AppToFlowRowKey> appToFlowRowKeyConverter =
new AppToFlowRowKeyConverter();
private final KeyConverter<String> appIdKeyConverter =
new AppIdKeyConverter();
public AppToFlowRowKey(String clusterId, String appId) {
this.clusterId = clusterId;
public AppToFlowRowKey(String appId) {
this.appId = appId;
}
public String getClusterId() {
return clusterId;
}
public String getAppId() {
return appId;
}
/**
* Constructs a row key prefix for the app_flow table as follows:
* {@code clusterId!AppId}.
* Constructs a row key prefix for the app_flow table.
*
* @return byte array with the row key
*/
public byte[] getRowKey() {
return appToFlowRowKeyConverter.encode(this);
return appIdKeyConverter.encode(appId);
}
/**
@ -61,83 +52,7 @@ public class AppToFlowRowKey {
* @return an <cite>AppToFlowRowKey</cite> object.
*/
public static AppToFlowRowKey parseRowKey(byte[] rowKey) {
return new AppToFlowRowKeyConverter().decode(rowKey);
}
/**
* Encodes and decodes row key for app_flow table. The row key is of the form
* clusterId!appId. clusterId is a string and appId is encoded/decoded using
* {@link AppIdKeyConverter}.
* <p>
*/
final private static class AppToFlowRowKeyConverter implements
KeyConverter<AppToFlowRowKey> {
private final KeyConverter<String> appIDKeyConverter =
new AppIdKeyConverter();
/**
* Intended for use in AppToFlowRowKey only.
*/
private AppToFlowRowKeyConverter() {
}
/**
* App to flow row key is of the form clusterId!appId with the 2 segments
* separated by !. The sizes below indicate sizes of both of these segments
* in sequence. clusterId is a string. appId is represented as 12 bytes w.
* cluster Timestamp part of appid taking 8 bytes(long) and seq id taking 4
* bytes(int). Strings are variable in size (i.e. end whenever separator is
* encountered). This is used while decoding and helps in determining where
* to split.
*/
private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE,
Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT };
/*
* (non-Javadoc)
*
* Encodes AppToFlowRowKey object into a byte array with each
* component/field in AppToFlowRowKey separated by Separator#QUALIFIERS.
* This leads to an app to flow table row key of the form clusterId!appId
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common
* .KeyConverter#encode(java.lang.Object)
*/
@Override
public byte[] encode(AppToFlowRowKey rowKey) {
byte[] first =
Separator.encode(rowKey.getClusterId(), Separator.SPACE,
Separator.TAB, Separator.QUALIFIERS);
byte[] second = appIDKeyConverter.encode(rowKey.getAppId());
return Separator.QUALIFIERS.join(first, second);
}
/*
* (non-Javadoc)
*
* Decodes an app to flow row key of the form clusterId!appId represented
* in byte format and converts it into an AppToFlowRowKey object.
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common
* .KeyConverter#decode(byte[])
*/
@Override
public AppToFlowRowKey decode(byte[] rowKey) {
byte[][] rowKeyComponents =
Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
if (rowKeyComponents.length != 2) {
throw new IllegalArgumentException("the row key is not valid for "
+ "the app-to-flow table");
}
String clusterId =
Separator.decode(Bytes.toString(rowKeyComponents[0]),
Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
String appId = appIDKeyConverter.decode(rowKeyComponents[1]);
return new AppToFlowRowKey(clusterId, appId);
}
String appId = new AppIdKeyConverter().decode(rowKey);
return new AppToFlowRowKey(appId);
}
}

View File

@ -41,21 +41,32 @@ import java.io.IOException;
* <pre>
* |--------------------------------------|
* | Row | Column Family |
* | key | info |
* | key | mapping |
* |--------------------------------------|
* | clusterId! | flowName: |
* | AppId | foo@daily_hive_report |
* | appId | flow_name!cluster1: |
* | | foo@daily_hive_report |
* | | |
* | | flowRunId: |
* | | flow_run_id!cluster1: |
* | | 1452828720457 |
* | | |
* | | user_id: |
* | | user_id!cluster1: |
* | | admin |
* | | |
* | | flow_name!cluster2: |
* | | bar@ad_hoc_query |
* | | |
* | | flow_run_id!cluster2: |
* | | 1452828498752 |
* | | |
* | | user_id!cluster2: |
* | | joe |
* | | |
* |--------------------------------------|
* </pre>
*
* It is possible (although unlikely) in a multi-cluster environment that there
* may be more than one applications for a given app id. Different clusters are
* recorded as different sets of columns.
*/
public class AppToFlowTable extends BaseTable<AppToFlowTable> {
/** app_flow prefix. */

View File

@ -318,8 +318,9 @@ public class ColumnHelper<T> {
/**
* @param columnPrefixBytes The byte representation for the column prefix.
* Should not contain {@link Separator#QUALIFIERS}.
* @param qualifier for the remainder of the column. Any
* {@link Separator#QUALIFIERS} will be encoded in the qualifier.
* @param qualifier for the remainder of the column.
* {@link Separator#QUALIFIERS} is permissible in the qualifier
* as it is joined only with the column prefix bytes.
* @return fully sanitized column qualifier that is a combination of prefix
* and qualifier. If prefix is null, the result is simply the encoded
* qualifier without any separator.

View File

@ -17,18 +17,18 @@
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import java.io.IOException;
/**
* The base class for reading timeline data from the HBase storage. This class
* provides basic support to validate and augment reader context.
@ -53,26 +53,38 @@ public abstract class AbstractTimelineStorageReader {
* Looks up flow context from AppToFlow table.
*
* @param appToFlowRowKey to identify Cluster and App Ids.
* @param clusterId the cluster id.
* @param hbaseConf HBase configuration.
* @param conn HBase Connection.
* @return flow context information.
* @throws IOException if any problem occurs while fetching flow information.
*/
protected FlowContext lookupFlowContext(AppToFlowRowKey appToFlowRowKey,
Configuration hbaseConf, Connection conn) throws IOException {
String clusterId, Configuration hbaseConf, Connection conn)
throws IOException {
byte[] rowKey = appToFlowRowKey.getRowKey();
Get get = new Get(rowKey);
Result result = appToFlowTable.getResult(hbaseConf, conn, get);
if (result != null && !result.isEmpty()) {
return new FlowContext(AppToFlowColumn.USER_ID.readResult(result)
.toString(), AppToFlowColumn.FLOW_ID.readResult(result).toString(),
((Number) AppToFlowColumn.FLOW_RUN_ID.readResult(result))
.longValue());
Object flowName =
AppToFlowColumnPrefix.FLOW_NAME.readResult(result, clusterId);
Object flowRunId =
AppToFlowColumnPrefix.FLOW_RUN_ID.readResult(result, clusterId);
Object userId =
AppToFlowColumnPrefix.USER_ID.readResult(result, clusterId);
if (flowName == null || userId == null || flowRunId == null) {
throw new NotFoundException(
"Unable to find the context flow name, and flow run id, "
+ "and user id for clusterId=" + clusterId
+ ", appId=" + appToFlowRowKey.getAppId());
}
return new FlowContext((String)userId, (String)flowName,
((Number)flowRunId).longValue());
} else {
throw new NotFoundException(
"Unable to find the context flow ID and flow run ID for clusterId="
+ appToFlowRowKey.getClusterId() + ", appId="
+ appToFlowRowKey.getAppId());
"Unable to find the context flow name, and flow run id, "
+ "and user id for clusterId=" + clusterId
+ ", appId=" + appToFlowRowKey.getAppId());
}
}
@ -102,9 +114,10 @@ public abstract class AbstractTimelineStorageReader {
|| context.getUserId() == null) {
// Get flow context information from AppToFlow table.
AppToFlowRowKey appToFlowRowKey =
new AppToFlowRowKey(context.getClusterId(), context.getAppId());
new AppToFlowRowKey(context.getAppId());
FlowContext flowContext =
lookupFlowContext(appToFlowRowKey, hbaseConf, conn);
lookupFlowContext(appToFlowRowKey, context.getClusterId(), hbaseConf,
conn);
context.setFlowName(flowContext.flowName);
context.setFlowRunId(flowContext.flowRunId);
context.setUserId(flowContext.userId);

View File

@ -127,10 +127,8 @@ public class TestRowKeys {
*/
@Test
public void testAppToFlowRowKey() {
byte[] byteRowKey = new AppToFlowRowKey(CLUSTER,
APPLICATION_ID).getRowKey();
byte[] byteRowKey = new AppToFlowRowKey(APPLICATION_ID).getRowKey();
AppToFlowRowKey rowKey = AppToFlowRowKey.parseRowKey(byteRowKey);
assertEquals(CLUSTER, rowKey.getClusterId());
assertEquals(APPLICATION_ID, rowKey.getAppId());
}