YARN-3649. Allow configurable prefix for hbase table names like prod, exp, test etc (Vrushali C via Varun Saxena)

Varun Saxena 2016-10-27 16:01:45 +05:30
parent 78b7e070d8
commit 9bb5cc3d49
11 changed files with 280 additions and 72 deletions

View File

@@ -2099,7 +2099,7 @@ public static boolean isAclEnabled(Configuration conf) {
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "with-user-dir";

/**
-* Settings for timeline service v2.0
+* Settings for timeline service v2.0.
*/
public static final String TIMELINE_SERVICE_WRITER_CLASS =
TIMELINE_SERVICE_PREFIX + "writer.class";
@@ -2115,6 +2115,17 @@ public static boolean isAclEnabled(Configuration conf) {
"org.apache.hadoop.yarn.server.timelineservice" +
".storage.HBaseTimelineReaderImpl";

+/**
+* default schema prefix for hbase tables.
+*/
+public static final String DEFAULT_TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX =
+"prod.";
+
+/**
+* config param name to override schema prefix.
+*/
+public static final String TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME =
+TIMELINE_SERVICE_PREFIX + "hbase-schema.prefix";
+
/** The setting that controls how often the timeline collector flushes the
* timeline writer.

View File

@@ -2341,6 +2341,18 @@
<value>259200000</value>
</property>

+<property>
+<description>
+The value of this parameter sets the prefix for all tables that are part of
+timeline service in the hbase storage schema. It can be set to "dev."
+or "staging." if it is to be used for development or staging instances.
+This way the data in production tables stays in a separate set of tables
+prefixed by "prod.".
+</description>
+<name>yarn.timeline-service.hbase-schema.prefix</name>
+<value>prod.</value>
+</property>
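
For context, a minimal sketch (not part of this patch) of what the property does at runtime: every timeline service table name is resolved through the configured prefix, so a "dev." instance writes to a different set of tables than a "prod." one. The wrapper class and the "dev." value below are illustrative assumptions.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;

// Illustrative only: shows that table names are resolved through the
// configured prefix, keeping "dev." data separate from "prod." data.
public class PrefixOverrideExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set(YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME,
        "dev.");
    TableName entity = BaseTable.getTableName(conf,
        EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME);
    // Prints the entity table name with a "dev." prefix instead of "prod.".
    System.out.println(entity.getNameAsString());
  }
}
```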
<!-- Shared Cache Configuration -->

<property>

View File

@@ -0,0 +1,137 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
/**
* Unit tests for checking different schema prefixes.
*/
public class TestHBaseTimelineStorageSchema {
private static HBaseTestingUtility util;
@BeforeClass
public static void setupBeforeClass() throws Exception {
util = new HBaseTestingUtility();
util.startMiniCluster();
}
private static void createSchema(Configuration conf) throws IOException {
TimelineSchemaCreator.createAllTables(conf, false);
}
@Test
public void createWithDefaultPrefix() throws IOException {
Configuration hbaseConf = util.getConfiguration();
createSchema(hbaseConf);
Connection conn = null;
conn = ConnectionFactory.createConnection(hbaseConf);
Admin admin = conn.getAdmin();
TableName entityTableName = BaseTable.getTableName(hbaseConf,
EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME);
assertTrue(admin.tableExists(entityTableName));
assertTrue(entityTableName.getNameAsString().startsWith(
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX));
Table entityTable = conn.getTable(BaseTable.getTableName(hbaseConf,
EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME));
assertNotNull(entityTable);
TableName flowRunTableName = BaseTable.getTableName(hbaseConf,
FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME);
assertTrue(admin.tableExists(flowRunTableName));
assertTrue(flowRunTableName.getNameAsString().startsWith(
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX));
Table flowRunTable = conn.getTable(BaseTable.getTableName(hbaseConf,
FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
assertNotNull(flowRunTable);
}
@Test
public void createWithSetPrefix() throws IOException {
Configuration hbaseConf = util.getConfiguration();
String prefix = "unit-test.";
hbaseConf.set(YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME,
prefix);
createSchema(hbaseConf);
Connection conn = null;
conn = ConnectionFactory.createConnection(hbaseConf);
Admin admin = conn.getAdmin();
TableName entityTableName = BaseTable.getTableName(hbaseConf,
EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME);
assertTrue(admin.tableExists(entityTableName));
assertTrue(entityTableName.getNameAsString().startsWith(prefix));
Table entityTable = conn.getTable(BaseTable.getTableName(hbaseConf,
EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME));
assertNotNull(entityTable);
TableName flowRunTableName = BaseTable.getTableName(hbaseConf,
FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME);
assertTrue(admin.tableExists(flowRunTableName));
assertTrue(flowRunTableName.getNameAsString().startsWith(prefix));
Table flowRunTable = conn.getTable(BaseTable.getTableName(hbaseConf,
FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
assertNotNull(flowRunTable);
// create another set with a diff prefix
hbaseConf
.unset(YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME);
prefix = "yet-another-unit-test.";
hbaseConf.set(YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME,
prefix);
createSchema(hbaseConf);
entityTableName = BaseTable.getTableName(hbaseConf,
EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME);
assertTrue(admin.tableExists(entityTableName));
assertTrue(entityTableName.getNameAsString().startsWith(prefix));
entityTable = conn.getTable(BaseTable.getTableName(hbaseConf,
EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME));
assertNotNull(entityTable);
flowRunTableName = BaseTable.getTableName(hbaseConf,
FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME);
assertTrue(admin.tableExists(flowRunTableName));
assertTrue(flowRunTableName.getNameAsString().startsWith(prefix));
flowRunTable = conn.getTable(BaseTable.getTableName(hbaseConf,
FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
assertNotNull(flowRunTable);
hbaseConf
.unset(YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME);
}
}

View File

@@ -30,7 +30,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
@@ -51,6 +50,7 @@
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.junit.AfterClass;
@@ -155,8 +155,9 @@ public void testWriteFlowRunMinMax() throws Exception {
Connection conn = ConnectionFactory.createConnection(c1);
// check in flow activity table
-Table table1 = conn.getTable(TableName
-.valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
+Table table1 = conn.getTable(
+BaseTable.getTableName(c1, FlowActivityTable.TABLE_NAME_CONF_NAME,
+FlowActivityTable.DEFAULT_TABLE_NAME));
byte[] startRow =
new FlowActivityRowKey(cluster, minStartTs, user, flow).getRowKey();
Get g = new Get(startRow);
@@ -286,8 +287,9 @@ private void checkFlowActivityTable(String cluster, String user, String flow,
.getRowKey();
s.setStopRow(stopRow);
Connection conn = ConnectionFactory.createConnection(c1);
-Table table1 = conn.getTable(TableName
-.valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
+Table table1 = conn.getTable(
+BaseTable.getTableName(c1, FlowActivityTable.TABLE_NAME_CONF_NAME,
+FlowActivityTable.DEFAULT_TABLE_NAME));
ResultScanner scanner = table1.getScanner(s);
int rowCount = 0;
for (Result result : scanner) {
@@ -425,13 +427,13 @@ private void checkFlowActivityTableSeveralRuns(String cluster, String user,
new FlowActivityRowKey(cluster, appCreatedTime, user, flow).getRowKey();
s.setStartRow(startRow);
String clusterStop = cluster + "1";
-byte[] stopRow =
-new FlowActivityRowKey(clusterStop, appCreatedTime, user, flow)
-.getRowKey();
+byte[] stopRow = new FlowActivityRowKey(clusterStop, appCreatedTime, user,
+flow).getRowKey();
s.setStopRow(stopRow);
Connection conn = ConnectionFactory.createConnection(c1);
-Table table1 = conn.getTable(TableName
-.valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
+Table table1 = conn.getTable(
+BaseTable.getTableName(c1, FlowActivityTable.TABLE_NAME_CONF_NAME,
+FlowActivityTable.DEFAULT_TABLE_NAME));
ResultScanner scanner = table1.getScanner(s);
int rowCount = 0;
for (Result result : scanner) {

View File

@@ -61,8 +61,8 @@
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -94,8 +94,8 @@ private static void createSchema() throws IOException {
@Test
public void checkCoProcessorOff() throws IOException, InterruptedException {
Configuration hbaseConf = util.getConfiguration();
-TableName table = TableName.valueOf(hbaseConf.get(
-FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
+TableName table = BaseTable.getTableName(hbaseConf,
+FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME);
Connection conn = null;
conn = ConnectionFactory.createConnection(hbaseConf);
Admin admin = conn.getAdmin();
@@ -109,14 +109,14 @@ public void checkCoProcessorOff() throws IOException, InterruptedException {
HRegionServer server = util.getRSForFirstRegionInTable(table);
List<Region> regions = server.getOnlineRegions(table);
for (Region region : regions) {
-assertTrue(HBaseTimelineStorageUtils.isFlowRunTable(
-region.getRegionInfo(), hbaseConf));
+assertTrue(FlowRunTable.isFlowRunTable(region.getRegionInfo(),
+hbaseConf));
}
}
-table = TableName.valueOf(hbaseConf.get(
-FlowActivityTable.TABLE_NAME_CONF_NAME,
-FlowActivityTable.DEFAULT_TABLE_NAME));
+table = BaseTable.getTableName(hbaseConf,
+FlowActivityTable.TABLE_NAME_CONF_NAME,
+FlowActivityTable.DEFAULT_TABLE_NAME);
if (admin.tableExists(table)) {
// check the regions.
// check in flow activity table
@@ -124,14 +124,13 @@ public void checkCoProcessorOff() throws IOException, InterruptedException {
HRegionServer server = util.getRSForFirstRegionInTable(table);
List<Region> regions = server.getOnlineRegions(table);
for (Region region : regions) {
-assertFalse(HBaseTimelineStorageUtils.isFlowRunTable(
-region.getRegionInfo(), hbaseConf));
+assertFalse(FlowRunTable.isFlowRunTable(region.getRegionInfo(),
+hbaseConf));
}
}
-table = TableName.valueOf(hbaseConf.get(
-EntityTable.TABLE_NAME_CONF_NAME,
-EntityTable.DEFAULT_TABLE_NAME));
+table = BaseTable.getTableName(hbaseConf, EntityTable.TABLE_NAME_CONF_NAME,
+EntityTable.DEFAULT_TABLE_NAME);
if (admin.tableExists(table)) {
// check the regions.
// check in entity run table
@@ -139,8 +138,8 @@ public void checkCoProcessorOff() throws IOException, InterruptedException {
HRegionServer server = util.getRSForFirstRegionInTable(table);
List<Region> regions = server.getOnlineRegions(table);
for (Region region : regions) {
-assertFalse(HBaseTimelineStorageUtils.isFlowRunTable(
-region.getRegionInfo(), hbaseConf));
+assertFalse(FlowRunTable.isFlowRunTable(region.getRegionInfo(),
+hbaseConf));
}
}
}
@@ -220,8 +219,8 @@ public void testWriteFlowRunMinMax() throws Exception {
Connection conn = ConnectionFactory.createConnection(c1);
// check in flow run table
-Table table1 = conn.getTable(TableName
-.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+Table table1 = conn.getTable(BaseTable.getTableName(c1,
+FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
// scan the table and see that we get back the right min and max
// timestamps
byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey();
@@ -356,24 +355,24 @@ public void testWriteFlowRunMetricsOneFlow() throws Exception {
/*
* checks the batch limits on a scan
*/
-void checkFlowRunTableBatchLimit(String cluster, String user,
-String flow, long runid, Configuration c1) throws IOException {
+void checkFlowRunTableBatchLimit(String cluster, String user, String flow,
+long runid, Configuration c1) throws IOException {
Scan s = new Scan();
s.addFamily(FlowRunColumnFamily.INFO.getBytes());
-byte[] startRow =
-new FlowRunRowKey(cluster, user, flow, runid).getRowKey();
+byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid)
+.getRowKey();
s.setStartRow(startRow);
// set a batch limit
int batchLimit = 2;
s.setBatch(batchLimit);
String clusterStop = cluster + "1";
-byte[] stopRow =
-new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey();
+byte[] stopRow = new FlowRunRowKey(clusterStop, user, flow, runid)
+.getRowKey();
s.setStopRow(stopRow);
Connection conn = ConnectionFactory.createConnection(c1);
-Table table1 = conn
-.getTable(TableName.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+Table table1 = conn.getTable(BaseTable.getTableName(c1,
+FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
ResultScanner scanner = table1.getScanner(s);
int loopCount = 0;
@@ -517,8 +516,8 @@ private void checkFlowRunTable(String cluster, String user, String flow,
new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey();
s.setStopRow(stopRow);
Connection conn = ConnectionFactory.createConnection(c1);
-Table table1 = conn.getTable(TableName
-.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+Table table1 = conn.getTable(BaseTable.getTableName(c1,
+FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
ResultScanner scanner = table1.getScanner(s);
int rowCount = 0;
@@ -782,8 +781,8 @@ private void checkMinMaxFlush(Configuration c1, long minTS, long startTs,
boolean checkMax) throws IOException {
Connection conn = ConnectionFactory.createConnection(c1);
// check in flow run table
-Table table1 = conn.getTable(TableName
-.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+Table table1 = conn.getTable(BaseTable.getTableName(c1,
+FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
// scan the table and see that we get back the right min and max
// timestamps
byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey();

View File

@@ -35,7 +35,6 @@
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -53,6 +52,7 @@
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
@@ -69,8 +69,8 @@ public class TestHBaseStorageFlowRunCompaction {
private static HBaseTestingUtility util;
-private static final String METRIC_1 = "MAP_SLOT_MILLIS";
-private static final String METRIC_2 = "HDFS_BYTES_READ";
+private static final String METRIC1 = "MAP_SLOT_MILLIS";
+private static final String METRIC2 = "HDFS_BYTES_READ";
private final byte[] aRowKey = Bytes.toBytes("a");
private final byte[] aFamily = Bytes.toBytes("family");
@@ -89,8 +89,9 @@ private static void createSchema() throws IOException {
TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
}
-/** Writes non numeric data into flow run table
-* reads it back.
+/**
+* writes non numeric data into flow run table.
+* reads it back
*
* @throws Exception
*/
@@ -106,11 +107,10 @@ public void testWriteNonNumericData() throws Exception {
p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes,
valueBytes);
Configuration hbaseConf = util.getConfiguration();
-TableName table = TableName.valueOf(hbaseConf.get(
-FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
Connection conn = null;
conn = ConnectionFactory.createConnection(hbaseConf);
-Table flowRunTable = conn.getTable(table);
+Table flowRunTable = conn.getTable(BaseTable.getTableName(hbaseConf,
+FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
flowRunTable.put(p);
Get g = new Get(rowKeyBytes);
@@ -156,11 +156,10 @@ public void testWriteScanBatchLimit() throws Exception {
value4Bytes);
Configuration hbaseConf = util.getConfiguration();
-TableName table = TableName.valueOf(hbaseConf.get(
-FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
Connection conn = null;
conn = ConnectionFactory.createConnection(hbaseConf);
-Table flowRunTable = conn.getTable(table);
+Table flowRunTable = conn.getTable(BaseTable.getTableName(hbaseConf,
+FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
flowRunTable.put(p);
String rowKey2 = "nonNumericRowKey2";
@@ -262,7 +261,6 @@ public void testWriteScanBatchLimit() throws Exception {
.getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
// we expect all back in one next call
assertEquals(4, values.size());
-System.out.println(" values size " + values.size() + " " + batchLimit);
rowCount++;
}
// should get back 1 row with each invocation
@@ -321,10 +319,11 @@ public void testWriteFlowRunCompaction() throws Exception {
}
// check in flow run table
-HRegionServer server = util.getRSForFirstRegionInTable(TableName
-.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
-List<Region> regions = server.getOnlineRegions(TableName
-.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+HRegionServer server = util.getRSForFirstRegionInTable(
+BaseTable.getTableName(c1, FlowRunTable.TABLE_NAME_CONF_NAME,
+FlowRunTable.DEFAULT_TABLE_NAME));
+List<Region> regions = server.getOnlineRegions(BaseTable.getTableName(c1,
+FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
assertTrue("Didn't find any regions for primary table!",
regions.size() > 0);
// flush and compact all the regions of the primary table
@@ -349,8 +348,8 @@ private void checkFlowRunTable(String cluster, String user, String flow,
new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey();
s.setStopRow(stopRow);
Connection conn = ConnectionFactory.createConnection(c1);
-Table table1 = conn.getTable(TableName
-.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+Table table1 = conn.getTable(BaseTable.getTableName(c1,
+FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
ResultScanner scanner = table1.getScanner(s);
int rowCount = 0;
@@ -364,13 +363,13 @@ private void checkFlowRunTable(String cluster, String user, String flow,
rowCount++;
// check metric1
byte[] q = ColumnHelper.getColumnQualifier(
-FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), METRIC_1);
+FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), METRIC1);
assertTrue(values.containsKey(q));
assertEquals(141, Bytes.toLong(values.get(q)));
// check metric2
q = ColumnHelper.getColumnQualifier(
-FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), METRIC_2);
+FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), METRIC2);
assertTrue(values.containsKey(q));
assertEquals(57, Bytes.toLong(values.get(q)));
}
@@ -587,9 +586,9 @@ public void checkProcessSummationMoreCellsSumFinalVariedTags()
long cellTsFinalStart = 10001120L;
long cellTsFinal = cellTsFinalStart;
-long cellTsFinalStartNotExpire =
-TimestampGenerator.getSupplementedTimestamp(
-System.currentTimeMillis(), "application_10266666661166_118821");
+long cellTsFinalStartNotExpire = TimestampGenerator
+.getSupplementedTimestamp(System.currentTimeMillis(),
+"application_10266666661166_118821");
long cellTsFinalNotExpire = cellTsFinalStartNotExpire;
long cellTsNotFinalStart = currentTimestamp - 5;

View File

@@ -29,6 +29,7 @@
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;

/**
* Implements behavior common to tables used in the timeline service storage. It
@@ -114,16 +115,42 @@ public Result getResult(Configuration hbaseConf, Connection conn, Get get)
}

/**
-* Get the table name for this table.
+* Get the table name for the input table.
*
-* @param hbaseConf HBase configuration from which table name will be fetched.
+* @param conf HBase configuration from which table name will be fetched.
+* @param tableName name of the table to be fetched
* @return A {@link TableName} object.
*/
-public TableName getTableName(Configuration hbaseConf) {
-TableName table =
-TableName.valueOf(hbaseConf.get(tableNameConfName, defaultTableName));
-return table;
+public static TableName getTableName(Configuration conf, String tableName) {
+String tableSchemaPrefix = conf.get(
+YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME,
+YarnConfiguration.DEFAULT_TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX);
+return TableName.valueOf(tableSchemaPrefix + tableName);
+}
+
+/**
+* Get the table name for this table.
+*
+* @param conf HBase configuration from which table name will be fetched.
+* @return A {@link TableName} object.
+*/
+public TableName getTableName(Configuration conf) {
+String tableName = conf.get(tableNameConfName, defaultTableName);
+return getTableName(conf, tableName);
+}
+
+/**
+* Get the table name based on the input config parameters.
+*
+* @param conf HBase configuration from which table name will be fetched.
+* @param tableNameInConf the table name parameter in conf.
+* @param defaultTableName the default table name.
+* @return A {@link TableName} object.
+*/
+public static TableName getTableName(Configuration conf,
+String tableNameInConf, String defaultTableName) {
+String tableName = conf.get(tableNameInConf, defaultTableName);
+return getTableName(conf, tableName);
}

/**
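
A brief usage sketch of the new overloads (illustrative only, not part of the patch): resolving through the configuration key with a default, and resolving a literal table name directly, yield the same prefix-qualified name as long as the table name has not been overridden in the configuration.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;

// Illustrative only, assuming the default "prod." prefix is in effect.
public class GetTableNameExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Resolve via the config key, falling back to the default table name.
    TableName byConfKey = BaseTable.getTableName(conf,
        FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME);
    // Resolve a literal table name directly.
    TableName byName = BaseTable.getTableName(conf,
        FlowRunTable.DEFAULT_TABLE_NAME);
    // Both carry the configured schema prefix, so they are equal here.
    System.out.println(byConfKey.equals(byName));
  }
}
```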

View File

@@ -72,7 +72,7 @@ public void start(CoprocessorEnvironment e) throws IOException {
if (e instanceof RegionCoprocessorEnvironment) {
RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
this.region = env.getRegion();
-isFlowRunRegion = HBaseTimelineStorageUtils.isFlowRunTable(
+isFlowRunRegion = FlowRunTable.isFlowRunTable(
region.getRegionInfo(), env.getConfiguration());
}
}

View File

@@ -21,6 +21,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
@@ -139,4 +140,23 @@ public void createTable(Admin admin, Configuration hbaseConf)
LOG.info("Status of table creation for " + table.getNameAsString() + "="
+ admin.tableExists(table));
}
+
+public static boolean isFlowRunTable(HRegionInfo hRegionInfo,
+Configuration conf) {
+String regionTableName = hRegionInfo.getTable().getNameAsString();
+if (LOG.isDebugEnabled()) {
+LOG.debug("regionTableName=" + regionTableName);
+}
+String flowRunTableName = BaseTable.getTableName(conf,
+FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)
+.getNameAsString();
+if (flowRunTableName.equalsIgnoreCase(regionTableName)) {
+if (LOG.isDebugEnabled()) {
+LOG.debug(" table is the flow run table!! "
++ flowRunTableName);
+}
+return true;
+}
+return false;
+}
}
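
A hedged sketch of the check above: because BaseTable.getTableName() returns the prefix-qualified name, the comparison holds for whatever prefix is configured. The standalone class and the synthetic HRegionInfo below are illustrative assumptions, not part of the patch.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;

// Illustrative only: builds a region for the (prefixed) flow run table name
// and checks it the same way the coprocessor does. The Configuration is
// assumed to carry the same prefix settings as the cluster owning the region.
public class IsFlowRunTableExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    HRegionInfo flowRunRegion = new HRegionInfo(BaseTable.getTableName(conf,
        FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
    // Prints true: the region's table name matches the configured,
    // prefix-qualified flow run table name.
    System.out.println(FlowRunTable.isFlowRunTable(flowRunRegion, conf));
  }
}
```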

View File

@@ -371,5 +371,4 @@ public static boolean isIntegralValue(Object obj) {
return (obj instanceof Short) || (obj instanceof Integer) ||
(obj instanceof Long);
}
}

View File

@@ -127,6 +127,7 @@ New configuration parameters that are introduced with v.2 are marked bold.
| **`yarn.timeline-service.writer.class`** | The class for the backend storage writer. Defaults to HBase storage writer. |
| **`yarn.timeline-service.reader.class`** | The class for the backend storage reader. Defaults to HBase storage reader. |
| **`yarn.system-metrics-publisher.enabled`** | The setting that controls whether yarn system metrics is published on the Timeline service or not by RM And NM. Defaults to `false`. |
+| **`yarn.timeline-service.hbase-schema.prefix`** | The schema prefix for hbase tables. Defaults to "prod.". |
#### Advanced configuration #### Advanced configuration
@@ -187,8 +188,9 @@ Finally, run the schema creator tool to create the necessary tables:
The `TimelineSchemaCreator` tool supports a few options that may come handy especially when you
are testing. For example, you can use `-skipExistingTable` (`-s` for short) to skip existing tables
-and continue to create other tables rather than failing the schema creation. When no option or '-help'
-('-h' for short) is provided, the command usage is printed.
+and continue to create other tables rather than failing the schema creation. By default, the tables
+will have a schema prefix of "prod.". When no option or '-help' ('-h' for short) is provided, the
+command usage is printed.
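
As a hedged illustration (not part of this change), one way to confirm which prefix the schema creator and the readers/writers will pick up from the deployed configuration is to resolve a table name through the same helper the storage code uses; the class name below is hypothetical.

```java
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;

// Hypothetical check: prints the fully prefixed flow run table name that the
// timeline service will use with the yarn-site.xml on the classpath.
public class ShowTimelineTableName {
  public static void main(String[] args) {
    YarnConfiguration conf = new YarnConfiguration();
    TableName flowRun = BaseTable.getTableName(conf,
        FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME);
    System.out.println("flow run table: " + flowRun.getNameAsString());
  }
}
```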
#### Enabling Timeline Service v.2

Following are the basic configurations to start Timeline service v.2: