From 3429517a00d2647ecbc9079a8afcaf036c1160c6 Mon Sep 17 00:00:00 2001 From: Varun Saxena Date: Tue, 17 Jan 2017 20:05:47 +0530 Subject: [PATCH] YARN-5378. Accommodate app-id->cluster mapping (Sangjin Lee via Varun Saxena) (cherry picked from commit 6baea680ba6e5df6f254ced086d6defa64fb99f0) --- .../storage/HBaseTimelineWriterImpl.java | 20 +- .../storage/apptoflow/AppToFlowColumn.java | 148 ------------- .../apptoflow/AppToFlowColumnPrefix.java | 206 ++++++++++++++++++ .../storage/apptoflow/AppToFlowRowKey.java | 101 +-------- .../storage/apptoflow/AppToFlowTable.java | 21 +- .../storage/common/ColumnHelper.java | 5 +- .../reader/AbstractTimelineStorageReader.java | 39 ++-- .../storage/common/TestRowKeys.java | 4 +- 8 files changed, 271 insertions(+), 273 deletions(-) delete mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumnPrefix.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java index c1c2a5ee307..dfd63bf08b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.Applica import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; @@ -172,9 +172,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements FlowRunRowKey flowRunRowKey = new FlowRunRowKey(clusterId, userId, flowName, flowRunId); if (event != null) { - AppToFlowRowKey appToFlowRowKey = - new AppToFlowRowKey(clusterId, appId); - onApplicationCreated(flowRunRowKey, appToFlowRowKey, appId, userId, + onApplicationCreated(flowRunRowKey, clusterId, appId, userId, flowVersion, te, event.getTimestamp()); } // if it's an application entity, store metrics @@ -193,18 +191,22 @@ public class HBaseTimelineWriterImpl extends AbstractService implements } private void onApplicationCreated(FlowRunRowKey flowRunRowKey, - AppToFlowRowKey appToFlowRowKey, String appId, String userId, - String 
flowVersion, TimelineEntity te, long appCreatedTimeStamp) + String clusterId, String appId, String userId, String flowVersion, + TimelineEntity te, long appCreatedTimeStamp) throws IOException { String flowName = flowRunRowKey.getFlowName(); Long flowRunId = flowRunRowKey.getFlowRunId(); // store in App to flow table + AppToFlowRowKey appToFlowRowKey = new AppToFlowRowKey(appId); byte[] rowKey = appToFlowRowKey.getRowKey(); - AppToFlowColumn.FLOW_ID.store(rowKey, appToFlowTable, null, flowName); - AppToFlowColumn.FLOW_RUN_ID.store(rowKey, appToFlowTable, null, flowRunId); - AppToFlowColumn.USER_ID.store(rowKey, appToFlowTable, null, userId); + AppToFlowColumnPrefix.FLOW_NAME.store(rowKey, appToFlowTable, clusterId, + null, flowName); + AppToFlowColumnPrefix.FLOW_RUN_ID.store(rowKey, appToFlowTable, clusterId, + null, flowRunId); + AppToFlowColumnPrefix.USER_ID.store(rowKey, appToFlowTable, clusterId, null, + userId); // store in flow run table storeAppCreatedInFlowRunTable(flowRunRowKey, appId, te); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java deleted file mode 100644 index ff616336e17..00000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java +++ /dev/null @@ -1,148 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow; - - -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; - -import java.io.IOException; - -/** - * Identifies fully qualified columns for the {@link AppToFlowTable}. - */ -public enum AppToFlowColumn implements Column { - - /** - * The flow ID. - */ - FLOW_ID(AppToFlowColumnFamily.MAPPING, "flow_id"), - - /** - * The flow run ID. 
- */ - FLOW_RUN_ID(AppToFlowColumnFamily.MAPPING, "flow_run_id"), - - /** - * The user. - */ - USER_ID(AppToFlowColumnFamily.MAPPING, "user_id"); - - private final ColumnHelper column; - private final ColumnFamily columnFamily; - private final String columnQualifier; - private final byte[] columnQualifierBytes; - - AppToFlowColumn(ColumnFamily columnFamily, - String columnQualifier) { - this.columnFamily = columnFamily; - this.columnQualifier = columnQualifier; - // Future-proof by ensuring the right column prefix hygiene. - this.columnQualifierBytes = - Bytes.toBytes(Separator.SPACE.encode(columnQualifier)); - this.column = new ColumnHelper(columnFamily); - } - - /** - * @return the column name value - */ - private String getColumnQualifier() { - return columnQualifier; - } - - @Override - public byte[] getColumnQualifierBytes() { - return columnQualifierBytes.clone(); - } - - public void store(byte[] rowKey, - TypedBufferedMutator tableMutator, Long timestamp, - Object inputValue, Attribute... attributes) throws IOException { - column.store(rowKey, tableMutator, columnQualifierBytes, timestamp, - inputValue, attributes); - } - - @Override - public byte[] getColumnFamilyBytes() { - return columnFamily.getBytes(); - } - - @Override - public ValueConverter getValueConverter() { - return column.getValueConverter(); - } - - public Object readResult(Result result) throws IOException { - return column.readResult(result, columnQualifierBytes); - } - - /** - * Retrieve an {@link AppToFlowColumn} given a name, or null if there is no - * match. The following holds true: {@code columnFor(x) == columnFor(y)} if - * and only if {@code x.equals(y)} or {@code (x == y == null)} - * - * @param columnQualifier Name of the column to retrieve - * @return the corresponding {@link AppToFlowColumn} or null - */ - public static final AppToFlowColumn columnFor(String columnQualifier) { - - // Match column based on value, assume column family matches. - for (AppToFlowColumn ec : AppToFlowColumn.values()) { - // Find a match based only on name. - if (ec.getColumnQualifier().equals(columnQualifier)) { - return ec; - } - } - - // Default to null - return null; - } - - /** - * Retrieve an {@link AppToFlowColumn} given a name, or null if there is no - * match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)} - * if and only if {@code a.equals(b) & x.equals(y)} or - * {@code (x == y == null)} - * - * @param columnFamily The columnFamily for which to retrieve the column. - * @param name Name of the column to retrieve - * @return the corresponding {@link AppToFlowColumn} or null if both arguments - * don't match. - */ - public static final AppToFlowColumn columnFor( - AppToFlowColumnFamily columnFamily, String name) { - - for (AppToFlowColumn ec : AppToFlowColumn.values()) { - // Find a match based column family and on name. 
- if (ec.columnFamily.equals(columnFamily) - && ec.getColumnQualifier().equals(name)) { - return ec; - } - } - - // Default to null - return null; - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumnPrefix.java new file mode 100644 index 00000000000..f1e44956087 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumnPrefix.java @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow; + +import java.io.IOException; +import java.util.Map; +import java.util.NavigableMap; + +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; + +/** + * Identifies partially qualified columns for the app-to-flow table. + */ +public enum AppToFlowColumnPrefix implements ColumnPrefix { + + /** + * The flow name. + */ + FLOW_NAME(AppToFlowColumnFamily.MAPPING, "flow_name"), + + /** + * The flow run ID. + */ + FLOW_RUN_ID(AppToFlowColumnFamily.MAPPING, "flow_run_id"), + + /** + * The user. + */ + USER_ID(AppToFlowColumnFamily.MAPPING, "user_id"); + + private final ColumnHelper column; + private final ColumnFamily columnFamily; + private final String columnPrefix; + private final byte[] columnPrefixBytes; + + private AppToFlowColumnPrefix(ColumnFamily columnFamily, + String columnPrefix) { + this.columnFamily = columnFamily; + this.columnPrefix = columnPrefix; + if (columnPrefix == null) { + this.columnPrefixBytes = null; + } else { + // Future-proof by ensuring the right column prefix hygiene. 
+ this.columnPrefixBytes = + Bytes.toBytes(Separator.SPACE.encode(columnPrefix)); + } + this.column = new ColumnHelper(columnFamily); + } + + @Override + public byte[] getColumnPrefixBytes(String qualifierPrefix) { + return ColumnHelper.getColumnQualifier( + columnPrefixBytes, qualifierPrefix); + } + + @Override + public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) { + return ColumnHelper.getColumnQualifier( + columnPrefixBytes, qualifierPrefix); + } + + @Override + public byte[] getColumnFamilyBytes() { + return columnFamily.getBytes(); + } + + @Override + public void store(byte[] rowKey, + TypedBufferedMutator tableMutator, byte[] qualifier, + Long timestamp, Object inputValue, Attribute... attributes) + throws IOException { + + // Null check + if (qualifier == null) { + throw new IOException("Cannot store column with null qualifier in " + + tableMutator.getName().getNameAsString()); + } + + byte[] columnQualifier = getColumnPrefixBytes(qualifier); + + column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, + attributes); + } + + @Override + public void store(byte[] rowKey, + TypedBufferedMutator tableMutator, String qualifier, + Long timestamp, Object inputValue, Attribute... attributes) + throws IOException { + + // Null check + if (qualifier == null) { + throw new IOException("Cannot store column with null qualifier in " + + tableMutator.getName().getNameAsString()); + } + + byte[] columnQualifier = getColumnPrefixBytes(qualifier); + + column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, + attributes); + } + + @Override + public ValueConverter getValueConverter() { + return column.getValueConverter(); + } + + @Override + public Object readResult(Result result, String qualifier) throws IOException { + byte[] columnQualifier = + ColumnHelper.getColumnQualifier(columnPrefixBytes, qualifier); + return column.readResult(result, columnQualifier); + } + + @Override + public Map readResults(Result result, + KeyConverter keyConverter) + throws IOException { + return column.readResults(result, columnPrefixBytes, keyConverter); + } + + @Override + public NavigableMap> + readResultsWithTimestamps(Result result, + KeyConverter keyConverter) throws IOException { + return column.readResultsWithTimestamps(result, columnPrefixBytes, + keyConverter); + } + + /** + * Retrieve an {@link AppToFlowColumnPrefix} given a name, or null if there + * is no match. The following holds true: {@code columnFor(x) == columnFor(y)} + * if and only if {@code x.equals(y)} or {@code (x == y == null)} + * + * @param columnPrefix Name of the column to retrieve + * @return the corresponding {@link AppToFlowColumnPrefix} or null + */ + public static final AppToFlowColumnPrefix columnFor(String columnPrefix) { + + // Match column based on value, assume column family matches. + for (AppToFlowColumnPrefix afcp : AppToFlowColumnPrefix.values()) { + // Find a match based only on name. + if (afcp.columnPrefix.equals(columnPrefix)) { + return afcp; + } + } + + // Default to null + return null; + } + + /** + * Retrieve an {@link AppToFlowColumnPrefix} given a name, or null if there + * is no match. The following holds true: + * {@code columnFor(a,x) == columnFor(b,y)} if and only if + * {@code (x == y == null)} or {@code a.equals(b) & x.equals(y)} + * + * @param columnFamily The columnFamily for which to retrieve the column. + * @param columnPrefix Name of the column to retrieve + * @return the corresponding {@link AppToFlowColumnPrefix} or null if both + * arguments don't match. 
+ */ + public static final AppToFlowColumnPrefix columnFor( + AppToFlowColumnFamily columnFamily, String columnPrefix) { + + // TODO: needs unit test to confirm and need to update javadoc to explain + // null prefix case. + + for (AppToFlowColumnPrefix afcp : AppToFlowColumnPrefix.values()) { + // Find a match based column family and on name. + if (afcp.columnFamily.equals(columnFamily) + && (((columnPrefix == null) && (afcp.columnPrefix == null)) || + (afcp.columnPrefix.equals(columnPrefix)))) { + return afcp; + } + } + + // Default to null + return null; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java index 8df440788de..146c47520d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java @@ -17,41 +17,32 @@ */ package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; /** - * Represents a rowkey for the app_flow table. + * Represents a row key for the app_flow table, which is the app id. */ public class AppToFlowRowKey { - private final String clusterId; private final String appId; - private final KeyConverter appToFlowRowKeyConverter = - new AppToFlowRowKeyConverter(); + private final KeyConverter appIdKeyConverter = + new AppIdKeyConverter(); - public AppToFlowRowKey(String clusterId, String appId) { - this.clusterId = clusterId; + public AppToFlowRowKey(String appId) { this.appId = appId; } - public String getClusterId() { - return clusterId; - } - public String getAppId() { return appId; } /** - * Constructs a row key prefix for the app_flow table as follows: - * {@code clusterId!AppId}. + * Constructs a row key prefix for the app_flow table. * * @return byte array with the row key */ public byte[] getRowKey() { - return appToFlowRowKeyConverter.encode(this); + return appIdKeyConverter.encode(appId); } /** @@ -61,83 +52,7 @@ public class AppToFlowRowKey { * @return an AppToFlowRowKey object. */ public static AppToFlowRowKey parseRowKey(byte[] rowKey) { - return new AppToFlowRowKeyConverter().decode(rowKey); - } - - /** - * Encodes and decodes row key for app_flow table. The row key is of the form - * clusterId!appId. clusterId is a string and appId is encoded/decoded using - * {@link AppIdKeyConverter}. - *

- */ - final private static class AppToFlowRowKeyConverter implements - KeyConverter { - - private final KeyConverter appIDKeyConverter = - new AppIdKeyConverter(); - - /** - * Intended for use in AppToFlowRowKey only. - */ - private AppToFlowRowKeyConverter() { - } - - - /** - * App to flow row key is of the form clusterId!appId with the 2 segments - * separated by !. The sizes below indicate sizes of both of these segments - * in sequence. clusterId is a string. appId is represented as 12 bytes w. - * cluster Timestamp part of appid taking 8 bytes(long) and seq id taking 4 - * bytes(int). Strings are variable in size (i.e. end whenever separator is - * encountered). This is used while decoding and helps in determining where - * to split. - */ - private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE, - Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT }; - - /* - * (non-Javadoc) - * - * Encodes AppToFlowRowKey object into a byte array with each - * component/field in AppToFlowRowKey separated by Separator#QUALIFIERS. - * This leads to an app to flow table row key of the form clusterId!appId - * - * @see - * org.apache.hadoop.yarn.server.timelineservice.storage.common - * .KeyConverter#encode(java.lang.Object) - */ - @Override - public byte[] encode(AppToFlowRowKey rowKey) { - byte[] first = - Separator.encode(rowKey.getClusterId(), Separator.SPACE, - Separator.TAB, Separator.QUALIFIERS); - byte[] second = appIDKeyConverter.encode(rowKey.getAppId()); - return Separator.QUALIFIERS.join(first, second); - } - - /* - * (non-Javadoc) - * - * Decodes an app to flow row key of the form clusterId!appId represented - * in byte format and converts it into an AppToFlowRowKey object. - * - * @see - * org.apache.hadoop.yarn.server.timelineservice.storage.common - * .KeyConverter#decode(byte[]) - */ - @Override - public AppToFlowRowKey decode(byte[] rowKey) { - byte[][] rowKeyComponents = - Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES); - if (rowKeyComponents.length != 2) { - throw new IllegalArgumentException("the row key is not valid for " - + "the app-to-flow table"); - } - String clusterId = - Separator.decode(Bytes.toString(rowKeyComponents[0]), - Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); - String appId = appIDKeyConverter.decode(rowKeyComponents[1]); - return new AppToFlowRowKey(clusterId, appId); - } + String appId = new AppIdKeyConverter().decode(rowKey); + return new AppToFlowRowKey(appId); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java index 301cf997026..583ee049207 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java @@ -41,21 +41,32 @@ import java.io.IOException; *

  * |--------------------------------------|
  * |  Row       | Column Family           |
- * |  key       | info                    |
+ * |  key       | mapping                 |
  * |--------------------------------------|
- * | clusterId! | flowName:               |
- * | AppId      | foo@daily_hive_report   |
+ * | appId      | flow_name!cluster1:     |
+ * |            | foo@daily_hive_report   |
  * |            |                         |
- * |            | flowRunId:              |
+ * |            | flow_run_id!cluster1:   |
  * |            | 1452828720457           |
  * |            |                         |
- * |            | user_id:                |
+ * |            | user_id!cluster1:       |
  * |            | admin                   |
  * |            |                         |
+ * |            | flow_name!cluster2:     |
+ * |            | bar@ad_hoc_query        |
  * |            |                         |
+ * |            | flow_run_id!cluster2:   |
+ * |            | 1452828498752           |
+ * |            |                         |
+ * |            | user_id!cluster2:       |
+ * |            | joe                     |
  * |            |                         |
  * |--------------------------------------|
  * </pre>
+ *
+ * It is possible (although unlikely) in a multi-cluster environment that there
+ * may be more than one application for a given app id. Different clusters are
+ * recorded as different sets of columns.
  */
 public class AppToFlowTable extends BaseTable<AppToFlowTable> {
   /** app_flow prefix. */
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
index be55db50a2b..b9815eb0911 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
@@ -316,8 +316,9 @@ public class ColumnHelper {
   /**
    * @param columnPrefixBytes The byte representation for the column prefix.
    *          Should not contain {@link Separator#QUALIFIERS}.
-   * @param qualifier for the remainder of the column. Any
-   *          {@link Separator#QUALIFIERS} will be encoded in the qualifier.
+   * @param qualifier for the remainder of the column.
+   *          {@link Separator#QUALIFIERS} is permissible in the qualifier
+   *          as it is joined only with the column prefix bytes.
    * @return fully sanitized column qualifier that is a combination of prefix
    *         and qualifier. If prefix is null, the result is simply the encoded
    *         qualifier without any separator.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java
index fcd8320d552..5bacf66fb45 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java
@@ -17,18 +17,18 @@
  */
 package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
 
+import java.io.IOException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
 import org.apache.hadoop.yarn.webapp.NotFoundException;
 
-import java.io.IOException;
-
 /**
  * The base class for reading timeline data from the HBase storage. This class
  * provides basic support to validate and augment reader context.
@@ -53,26 +53,38 @@ public abstract class AbstractTimelineStorageReader {
    * Looks up flow context from AppToFlow table.
    *
    * @param appToFlowRowKey to identify Cluster and App Ids.
+   * @param clusterId the cluster id.
    * @param hbaseConf HBase configuration.
    * @param conn HBase Connection.
    * @return flow context information.
    * @throws IOException if any problem occurs while fetching flow information.
    */
   protected FlowContext lookupFlowContext(AppToFlowRowKey appToFlowRowKey,
-      Configuration hbaseConf, Connection conn) throws IOException {
+      String clusterId, Configuration hbaseConf, Connection conn)
+      throws IOException {
     byte[] rowKey = appToFlowRowKey.getRowKey();
     Get get = new Get(rowKey);
     Result result = appToFlowTable.getResult(hbaseConf, conn, get);
     if (result != null && !result.isEmpty()) {
-      return new FlowContext(AppToFlowColumn.USER_ID.readResult(result)
-          .toString(), AppToFlowColumn.FLOW_ID.readResult(result).toString(),
-          ((Number) AppToFlowColumn.FLOW_RUN_ID.readResult(result))
-              .longValue());
+      Object flowName =
+          AppToFlowColumnPrefix.FLOW_NAME.readResult(result, clusterId);
+      Object flowRunId =
+          AppToFlowColumnPrefix.FLOW_RUN_ID.readResult(result, clusterId);
+      Object userId =
+          AppToFlowColumnPrefix.USER_ID.readResult(result, clusterId);
+      if (flowName == null || userId == null || flowRunId == null) {
+        throw new NotFoundException(
+            "Unable to find the context flow name, flow run id and " +
+            "user id for clusterId=" + clusterId +
+            ", appId=" + appToFlowRowKey.getAppId());
+      }
+      return new FlowContext((String)userId, (String)flowName,
+          ((Number)flowRunId).longValue());
     } else {
       throw new NotFoundException(
-          "Unable to find the context flow ID and flow run ID for clusterId="
-              + appToFlowRowKey.getClusterId() + ", appId="
-              + appToFlowRowKey.getAppId());
+          "Unable to find the context flow name, flow run id and " +
+          "user id for clusterId=" + clusterId +
+          ", appId=" + appToFlowRowKey.getAppId());
     }
   }
 
@@ -102,9 +114,10 @@ public abstract class AbstractTimelineStorageReader {
         || context.getUserId() == null) {
       // Get flow context information from AppToFlow table.
       AppToFlowRowKey appToFlowRowKey =
-          new AppToFlowRowKey(context.getClusterId(), context.getAppId());
+          new AppToFlowRowKey(context.getAppId());
       FlowContext flowContext =
-          lookupFlowContext(appToFlowRowKey, hbaseConf, conn);
+          lookupFlowContext(appToFlowRowKey, context.getClusterId(), hbaseConf,
+              conn);
       context.setFlowName(flowContext.flowName);
       context.setFlowRunId(flowContext.flowRunId);
       context.setUserId(flowContext.userId);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java
index 7560f3345dc..cbd2273dc26 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java
@@ -127,10 +127,8 @@ public class TestRowKeys {
    */
   @Test
   public void testAppToFlowRowKey() {
-    byte[] byteRowKey = new AppToFlowRowKey(CLUSTER,
-        APPLICATION_ID).getRowKey();
+    byte[] byteRowKey = new AppToFlowRowKey(APPLICATION_ID).getRowKey();
     AppToFlowRowKey rowKey = AppToFlowRowKey.parseRowKey(byteRowKey);
-    assertEquals(CLUSTER, rowKey.getClusterId());
     assertEquals(APPLICATION_ID, rowKey.getAppId());
   }
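
For readers following the new layout, below is a minimal reader-side sketch of how the cluster-qualified mapping columns introduced by this patch can be consumed. It mirrors lookupFlowContext and uses only APIs visible in the diff; the HBase connection setup, the class name AppToFlowLookupSketch, and the sample cluster/application ids are illustrative assumptions, not part of the change, and the sketch assumes the timeline service tables already exist.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;

/** Illustrative only; not part of the patch. */
public final class AppToFlowLookupSketch {

  public static void main(String[] args) throws IOException {
    // Sample ids; a real caller takes these from the reader context.
    String clusterId = "cluster1";
    String appId = "application_1452828720457_0001";

    Configuration hbaseConf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(hbaseConf)) {
      // The row key is now just the encoded app id; the cluster id no longer
      // participates in the key.
      byte[] rowKey = new AppToFlowRowKey(appId).getRowKey();
      AppToFlowTable appToFlowTable = new AppToFlowTable();
      Result result = appToFlowTable.getResult(hbaseConf, conn, new Get(rowKey));

      if (result != null && !result.isEmpty()) {
        // Each cluster writes its own flow_name!<clusterId>,
        // flow_run_id!<clusterId> and user_id!<clusterId> columns, so the
        // cluster id is supplied as the column qualifier when reading.
        Object flowName =
            AppToFlowColumnPrefix.FLOW_NAME.readResult(result, clusterId);
        Object flowRunId =
            AppToFlowColumnPrefix.FLOW_RUN_ID.readResult(result, clusterId);
        Object userId =
            AppToFlowColumnPrefix.USER_ID.readResult(result, clusterId);
        System.out.println("user=" + userId + ", flow=" + flowName
            + ", runId=" + flowRunId);
      }
    }
  }
}

Because the cluster id moves from the row key into the column qualifier, a single GET on the app id retrieves the mapping for every cluster that has reported the application, which is what supports the multi-cluster case described in the AppToFlowTable javadoc.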