YARN-3649. Allow configurable prefix for hbase table names like prod, exp, test etc (Vrushali C via Varun Saxena)
This commit is contained in:
parent
1d7fc52578
commit
d19726ec37
|
@ -2189,7 +2189,7 @@ public class YarnConfiguration extends Configuration {
|
|||
+ "entity-file.fs-support-append";
|
||||
|
||||
/**
|
||||
* Settings for timeline service v2.0
|
||||
* Settings for timeline service v2.0.
|
||||
*/
|
||||
public static final String TIMELINE_SERVICE_WRITER_CLASS =
|
||||
TIMELINE_SERVICE_PREFIX + "writer.class";
|
||||
|
@ -2197,6 +2197,18 @@ public class YarnConfiguration extends Configuration {
|
|||
public static final String TIMELINE_SERVICE_READER_CLASS =
|
||||
TIMELINE_SERVICE_PREFIX + "reader.class";
|
||||
|
||||
/**
|
||||
* default schema prefix for hbase tables.
|
||||
*/
|
||||
public static final String DEFAULT_TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX =
|
||||
"prod.";
|
||||
|
||||
/**
|
||||
* config param name to override schema prefix.
|
||||
*/
|
||||
public static final String TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME =
|
||||
TIMELINE_SERVICE_PREFIX + "hbase-schema.prefix";
|
||||
|
||||
/** The setting that controls how often the timeline collector flushes the
|
||||
* timeline writer.
|
||||
*/
|
||||
|
|
|
@ -2354,6 +2354,18 @@
|
|||
<value>259200000</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>
|
||||
The value of this parameter sets the prefix for all tables that are part of
|
||||
timeline service in the hbase storage schema. It can be set to "dev."
|
||||
or "staging." if it is to be used for development or staging instances.
|
||||
This way the data in production tables stays in a separate set of tables
|
||||
prefixed by "prod.".
|
||||
</description>
|
||||
<name>yarn.timeline-service.hbase-schema.prefix</name>
|
||||
<value>prod.</value>
|
||||
</property>
|
||||
|
||||
<!-- Shared Cache Configuration -->
|
||||
|
||||
<property>
|
||||
|
|
|
@ -0,0 +1,137 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.timelineservice.storage;
|
||||
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
|
||||
|
||||
/**
|
||||
* Unit tests for checking different schema prefixes.
|
||||
*/
|
||||
public class TestHBaseTimelineStorageSchema {
|
||||
private static HBaseTestingUtility util;
|
||||
|
||||
@BeforeClass
|
||||
public static void setupBeforeClass() throws Exception {
|
||||
util = new HBaseTestingUtility();
|
||||
util.startMiniCluster();
|
||||
}
|
||||
|
||||
private static void createSchema(Configuration conf) throws IOException {
|
||||
TimelineSchemaCreator.createAllTables(conf, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void createWithDefaultPrefix() throws IOException {
|
||||
Configuration hbaseConf = util.getConfiguration();
|
||||
createSchema(hbaseConf);
|
||||
Connection conn = null;
|
||||
conn = ConnectionFactory.createConnection(hbaseConf);
|
||||
Admin admin = conn.getAdmin();
|
||||
|
||||
TableName entityTableName = BaseTable.getTableName(hbaseConf,
|
||||
EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME);
|
||||
assertTrue(admin.tableExists(entityTableName));
|
||||
assertTrue(entityTableName.getNameAsString().startsWith(
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX));
|
||||
Table entityTable = conn.getTable(BaseTable.getTableName(hbaseConf,
|
||||
EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME));
|
||||
assertNotNull(entityTable);
|
||||
|
||||
TableName flowRunTableName = BaseTable.getTableName(hbaseConf,
|
||||
FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME);
|
||||
assertTrue(admin.tableExists(flowRunTableName));
|
||||
assertTrue(flowRunTableName.getNameAsString().startsWith(
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX));
|
||||
Table flowRunTable = conn.getTable(BaseTable.getTableName(hbaseConf,
|
||||
FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
|
||||
assertNotNull(flowRunTable);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void createWithSetPrefix() throws IOException {
|
||||
Configuration hbaseConf = util.getConfiguration();
|
||||
String prefix = "unit-test.";
|
||||
hbaseConf.set(YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME,
|
||||
prefix);
|
||||
createSchema(hbaseConf);
|
||||
Connection conn = null;
|
||||
conn = ConnectionFactory.createConnection(hbaseConf);
|
||||
Admin admin = conn.getAdmin();
|
||||
|
||||
TableName entityTableName = BaseTable.getTableName(hbaseConf,
|
||||
EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME);
|
||||
assertTrue(admin.tableExists(entityTableName));
|
||||
assertTrue(entityTableName.getNameAsString().startsWith(prefix));
|
||||
Table entityTable = conn.getTable(BaseTable.getTableName(hbaseConf,
|
||||
EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME));
|
||||
assertNotNull(entityTable);
|
||||
|
||||
TableName flowRunTableName = BaseTable.getTableName(hbaseConf,
|
||||
FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME);
|
||||
assertTrue(admin.tableExists(flowRunTableName));
|
||||
assertTrue(flowRunTableName.getNameAsString().startsWith(prefix));
|
||||
Table flowRunTable = conn.getTable(BaseTable.getTableName(hbaseConf,
|
||||
FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
|
||||
assertNotNull(flowRunTable);
|
||||
|
||||
// create another set with a diff prefix
|
||||
hbaseConf
|
||||
.unset(YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME);
|
||||
prefix = "yet-another-unit-test.";
|
||||
hbaseConf.set(YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME,
|
||||
prefix);
|
||||
createSchema(hbaseConf);
|
||||
entityTableName = BaseTable.getTableName(hbaseConf,
|
||||
EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME);
|
||||
assertTrue(admin.tableExists(entityTableName));
|
||||
assertTrue(entityTableName.getNameAsString().startsWith(prefix));
|
||||
entityTable = conn.getTable(BaseTable.getTableName(hbaseConf,
|
||||
EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME));
|
||||
assertNotNull(entityTable);
|
||||
|
||||
flowRunTableName = BaseTable.getTableName(hbaseConf,
|
||||
FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME);
|
||||
assertTrue(admin.tableExists(flowRunTableName));
|
||||
assertTrue(flowRunTableName.getNameAsString().startsWith(prefix));
|
||||
flowRunTable = conn.getTable(BaseTable.getTableName(hbaseConf,
|
||||
FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
|
||||
assertNotNull(flowRunTable);
|
||||
hbaseConf
|
||||
.unset(YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME);
|
||||
}
|
||||
}
|
|
@ -30,7 +30,6 @@ import java.util.Set;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
|
@ -51,6 +50,7 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContex
|
|||
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
|
||||
import org.junit.AfterClass;
|
||||
|
@ -155,8 +155,9 @@ public class TestHBaseStorageFlowActivity {
|
|||
|
||||
Connection conn = ConnectionFactory.createConnection(c1);
|
||||
// check in flow activity table
|
||||
Table table1 = conn.getTable(TableName
|
||||
.valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
|
||||
Table table1 = conn.getTable(
|
||||
BaseTable.getTableName(c1, FlowActivityTable.TABLE_NAME_CONF_NAME,
|
||||
FlowActivityTable.DEFAULT_TABLE_NAME));
|
||||
byte[] startRow =
|
||||
new FlowActivityRowKey(cluster, minStartTs, user, flow).getRowKey();
|
||||
Get g = new Get(startRow);
|
||||
|
@ -286,8 +287,9 @@ public class TestHBaseStorageFlowActivity {
|
|||
.getRowKey();
|
||||
s.setStopRow(stopRow);
|
||||
Connection conn = ConnectionFactory.createConnection(c1);
|
||||
Table table1 = conn.getTable(TableName
|
||||
.valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
|
||||
Table table1 = conn.getTable(
|
||||
BaseTable.getTableName(c1, FlowActivityTable.TABLE_NAME_CONF_NAME,
|
||||
FlowActivityTable.DEFAULT_TABLE_NAME));
|
||||
ResultScanner scanner = table1.getScanner(s);
|
||||
int rowCount = 0;
|
||||
for (Result result : scanner) {
|
||||
|
@ -424,13 +426,13 @@ public class TestHBaseStorageFlowActivity {
|
|||
new FlowActivityRowKey(cluster, appCreatedTime, user, flow).getRowKey();
|
||||
s.setStartRow(startRow);
|
||||
String clusterStop = cluster + "1";
|
||||
byte[] stopRow =
|
||||
new FlowActivityRowKey(clusterStop, appCreatedTime, user, flow)
|
||||
.getRowKey();
|
||||
byte[] stopRow = new FlowActivityRowKey(clusterStop, appCreatedTime, user,
|
||||
flow).getRowKey();
|
||||
s.setStopRow(stopRow);
|
||||
Connection conn = ConnectionFactory.createConnection(c1);
|
||||
Table table1 = conn.getTable(TableName
|
||||
.valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
|
||||
Table table1 = conn.getTable(
|
||||
BaseTable.getTableName(c1, FlowActivityTable.TABLE_NAME_CONF_NAME,
|
||||
FlowActivityTable.DEFAULT_TABLE_NAME));
|
||||
ResultScanner scanner = table1.getScanner(s);
|
||||
int rowCount = 0;
|
||||
for (Result result : scanner) {
|
||||
|
|
|
@ -61,8 +61,8 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReader
|
|||
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
@ -94,8 +94,8 @@ public class TestHBaseStorageFlowRun {
|
|||
@Test
|
||||
public void checkCoProcessorOff() throws IOException, InterruptedException {
|
||||
Configuration hbaseConf = util.getConfiguration();
|
||||
TableName table = TableName.valueOf(hbaseConf.get(
|
||||
FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
|
||||
TableName table = BaseTable.getTableName(hbaseConf,
|
||||
FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME);
|
||||
Connection conn = null;
|
||||
conn = ConnectionFactory.createConnection(hbaseConf);
|
||||
Admin admin = conn.getAdmin();
|
||||
|
@ -109,14 +109,14 @@ public class TestHBaseStorageFlowRun {
|
|||
HRegionServer server = util.getRSForFirstRegionInTable(table);
|
||||
List<Region> regions = server.getOnlineRegions(table);
|
||||
for (Region region : regions) {
|
||||
assertTrue(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
|
||||
assertTrue(FlowRunTable.isFlowRunTable(region.getRegionInfo(),
|
||||
hbaseConf));
|
||||
}
|
||||
}
|
||||
|
||||
table = TableName.valueOf(hbaseConf.get(
|
||||
table = BaseTable.getTableName(hbaseConf,
|
||||
FlowActivityTable.TABLE_NAME_CONF_NAME,
|
||||
FlowActivityTable.DEFAULT_TABLE_NAME));
|
||||
FlowActivityTable.DEFAULT_TABLE_NAME);
|
||||
if (admin.tableExists(table)) {
|
||||
// check the regions.
|
||||
// check in flow activity table
|
||||
|
@ -124,14 +124,13 @@ public class TestHBaseStorageFlowRun {
|
|||
HRegionServer server = util.getRSForFirstRegionInTable(table);
|
||||
List<Region> regions = server.getOnlineRegions(table);
|
||||
for (Region region : regions) {
|
||||
assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
|
||||
assertFalse(FlowRunTable.isFlowRunTable(region.getRegionInfo(),
|
||||
hbaseConf));
|
||||
}
|
||||
}
|
||||
|
||||
table = TableName.valueOf(hbaseConf.get(
|
||||
EntityTable.TABLE_NAME_CONF_NAME,
|
||||
EntityTable.DEFAULT_TABLE_NAME));
|
||||
table = BaseTable.getTableName(hbaseConf, EntityTable.TABLE_NAME_CONF_NAME,
|
||||
EntityTable.DEFAULT_TABLE_NAME);
|
||||
if (admin.tableExists(table)) {
|
||||
// check the regions.
|
||||
// check in entity run table
|
||||
|
@ -139,7 +138,7 @@ public class TestHBaseStorageFlowRun {
|
|||
HRegionServer server = util.getRSForFirstRegionInTable(table);
|
||||
List<Region> regions = server.getOnlineRegions(table);
|
||||
for (Region region : regions) {
|
||||
assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
|
||||
assertFalse(FlowRunTable.isFlowRunTable(region.getRegionInfo(),
|
||||
hbaseConf));
|
||||
}
|
||||
}
|
||||
|
@ -220,8 +219,8 @@ public class TestHBaseStorageFlowRun {
|
|||
|
||||
Connection conn = ConnectionFactory.createConnection(c1);
|
||||
// check in flow run table
|
||||
Table table1 = conn.getTable(TableName
|
||||
.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
|
||||
Table table1 = conn.getTable(BaseTable.getTableName(c1,
|
||||
FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
|
||||
// scan the table and see that we get back the right min and max
|
||||
// timestamps
|
||||
byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey();
|
||||
|
@ -356,24 +355,24 @@ public class TestHBaseStorageFlowRun {
|
|||
/*
|
||||
* checks the batch limits on a scan
|
||||
*/
|
||||
void checkFlowRunTableBatchLimit(String cluster, String user,
|
||||
String flow, long runid, Configuration c1) throws IOException {
|
||||
void checkFlowRunTableBatchLimit(String cluster, String user, String flow,
|
||||
long runid, Configuration c1) throws IOException {
|
||||
|
||||
Scan s = new Scan();
|
||||
s.addFamily(FlowRunColumnFamily.INFO.getBytes());
|
||||
byte[] startRow =
|
||||
new FlowRunRowKey(cluster, user, flow, runid).getRowKey();
|
||||
byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid)
|
||||
.getRowKey();
|
||||
s.setStartRow(startRow);
|
||||
// set a batch limit
|
||||
int batchLimit = 2;
|
||||
s.setBatch(batchLimit);
|
||||
String clusterStop = cluster + "1";
|
||||
byte[] stopRow =
|
||||
new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey();
|
||||
byte[] stopRow = new FlowRunRowKey(clusterStop, user, flow, runid)
|
||||
.getRowKey();
|
||||
s.setStopRow(stopRow);
|
||||
Connection conn = ConnectionFactory.createConnection(c1);
|
||||
Table table1 = conn
|
||||
.getTable(TableName.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
|
||||
Table table1 = conn.getTable(BaseTable.getTableName(c1,
|
||||
FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
|
||||
ResultScanner scanner = table1.getScanner(s);
|
||||
|
||||
int loopCount = 0;
|
||||
|
@ -517,8 +516,8 @@ public class TestHBaseStorageFlowRun {
|
|||
new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey();
|
||||
s.setStopRow(stopRow);
|
||||
Connection conn = ConnectionFactory.createConnection(c1);
|
||||
Table table1 = conn.getTable(TableName
|
||||
.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
|
||||
Table table1 = conn.getTable(BaseTable.getTableName(c1,
|
||||
FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
|
||||
ResultScanner scanner = table1.getScanner(s);
|
||||
|
||||
int rowCount = 0;
|
||||
|
@ -782,8 +781,8 @@ public class TestHBaseStorageFlowRun {
|
|||
boolean checkMax) throws IOException {
|
||||
Connection conn = ConnectionFactory.createConnection(c1);
|
||||
// check in flow run table
|
||||
Table table1 = conn.getTable(TableName
|
||||
.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
|
||||
Table table1 = conn.getTable(BaseTable.getTableName(c1,
|
||||
FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
|
||||
// scan the table and see that we get back the right min and max
|
||||
// timestamps
|
||||
byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey();
|
||||
|
|
|
@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.Cell;
|
|||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Tag;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
|
@ -53,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
|||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
|
||||
|
@ -69,8 +69,8 @@ public class TestHBaseStorageFlowRunCompaction {
|
|||
|
||||
private static HBaseTestingUtility util;
|
||||
|
||||
private static final String METRIC_1 = "MAP_SLOT_MILLIS";
|
||||
private static final String METRIC_2 = "HDFS_BYTES_READ";
|
||||
private static final String METRIC1 = "MAP_SLOT_MILLIS";
|
||||
private static final String METRIC2 = "HDFS_BYTES_READ";
|
||||
|
||||
private final byte[] aRowKey = Bytes.toBytes("a");
|
||||
private final byte[] aFamily = Bytes.toBytes("family");
|
||||
|
@ -89,8 +89,9 @@ public class TestHBaseStorageFlowRunCompaction {
|
|||
TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
|
||||
}
|
||||
|
||||
/** Writes non numeric data into flow run table
|
||||
* reads it back.
|
||||
/**
|
||||
* writes non numeric data into flow run table.
|
||||
* reads it back
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
|
@ -106,11 +107,10 @@ public class TestHBaseStorageFlowRunCompaction {
|
|||
p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes,
|
||||
valueBytes);
|
||||
Configuration hbaseConf = util.getConfiguration();
|
||||
TableName table = TableName.valueOf(hbaseConf.get(
|
||||
FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
|
||||
Connection conn = null;
|
||||
conn = ConnectionFactory.createConnection(hbaseConf);
|
||||
Table flowRunTable = conn.getTable(table);
|
||||
Table flowRunTable = conn.getTable(BaseTable.getTableName(hbaseConf,
|
||||
FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
|
||||
flowRunTable.put(p);
|
||||
|
||||
Get g = new Get(rowKeyBytes);
|
||||
|
@ -156,11 +156,10 @@ public class TestHBaseStorageFlowRunCompaction {
|
|||
value4Bytes);
|
||||
|
||||
Configuration hbaseConf = util.getConfiguration();
|
||||
TableName table = TableName.valueOf(hbaseConf.get(
|
||||
FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
|
||||
Connection conn = null;
|
||||
conn = ConnectionFactory.createConnection(hbaseConf);
|
||||
Table flowRunTable = conn.getTable(table);
|
||||
Table flowRunTable = conn.getTable(BaseTable.getTableName(hbaseConf,
|
||||
FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
|
||||
flowRunTable.put(p);
|
||||
|
||||
String rowKey2 = "nonNumericRowKey2";
|
||||
|
@ -262,7 +261,6 @@ public class TestHBaseStorageFlowRunCompaction {
|
|||
.getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
|
||||
// we expect all back in one next call
|
||||
assertEquals(4, values.size());
|
||||
System.out.println(" values size " + values.size() + " " + batchLimit);
|
||||
rowCount++;
|
||||
}
|
||||
// should get back 1 row with each invocation
|
||||
|
@ -321,10 +319,11 @@ public class TestHBaseStorageFlowRunCompaction {
|
|||
}
|
||||
|
||||
// check in flow run table
|
||||
HRegionServer server = util.getRSForFirstRegionInTable(TableName
|
||||
.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
|
||||
List<Region> regions = server.getOnlineRegions(TableName
|
||||
.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
|
||||
HRegionServer server = util.getRSForFirstRegionInTable(
|
||||
BaseTable.getTableName(c1, FlowRunTable.TABLE_NAME_CONF_NAME,
|
||||
FlowRunTable.DEFAULT_TABLE_NAME));
|
||||
List<Region> regions = server.getOnlineRegions(BaseTable.getTableName(c1,
|
||||
FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
|
||||
assertTrue("Didn't find any regions for primary table!",
|
||||
regions.size() > 0);
|
||||
// flush and compact all the regions of the primary table
|
||||
|
@ -349,8 +348,8 @@ public class TestHBaseStorageFlowRunCompaction {
|
|||
new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey();
|
||||
s.setStopRow(stopRow);
|
||||
Connection conn = ConnectionFactory.createConnection(c1);
|
||||
Table table1 = conn.getTable(TableName
|
||||
.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
|
||||
Table table1 = conn.getTable(BaseTable.getTableName(c1,
|
||||
FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
|
||||
ResultScanner scanner = table1.getScanner(s);
|
||||
|
||||
int rowCount = 0;
|
||||
|
@ -364,13 +363,13 @@ public class TestHBaseStorageFlowRunCompaction {
|
|||
rowCount++;
|
||||
// check metric1
|
||||
byte[] q = ColumnHelper.getColumnQualifier(
|
||||
FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), METRIC_1);
|
||||
FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), METRIC1);
|
||||
assertTrue(values.containsKey(q));
|
||||
assertEquals(141, Bytes.toLong(values.get(q)));
|
||||
|
||||
// check metric2
|
||||
q = ColumnHelper.getColumnQualifier(
|
||||
FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), METRIC_2);
|
||||
FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), METRIC2);
|
||||
assertTrue(values.containsKey(q));
|
||||
assertEquals(57, Bytes.toLong(values.get(q)));
|
||||
}
|
||||
|
@ -587,9 +586,9 @@ public class TestHBaseStorageFlowRunCompaction {
|
|||
long cellTsFinalStart = 10001120L;
|
||||
long cellTsFinal = cellTsFinalStart;
|
||||
|
||||
long cellTsFinalStartNotExpire =
|
||||
TimestampGenerator.getSupplementedTimestamp(
|
||||
System.currentTimeMillis(), "application_10266666661166_118821");
|
||||
long cellTsFinalStartNotExpire = TimestampGenerator
|
||||
.getSupplementedTimestamp(System.currentTimeMillis(),
|
||||
"application_10266666661166_118821");
|
||||
long cellTsFinalNotExpire = cellTsFinalStartNotExpire;
|
||||
|
||||
long cellTsNotFinalStart = currentTimestamp - 5;
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.client.Result;
|
|||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
|
||||
/**
|
||||
* Implements behavior common to tables used in the timeline service storage. It
|
||||
|
@ -114,16 +115,42 @@ public abstract class BaseTable<T> {
|
|||
}
|
||||
|
||||
/**
|
||||
* Get the table name for this table.
|
||||
* Get the table name for the input table.
|
||||
*
|
||||
* @param hbaseConf HBase configuration from which table name will be fetched.
|
||||
* @param conf HBase configuration from which table name will be fetched.
|
||||
* @param tableName name of the table to be fetched
|
||||
* @return A {@link TableName} object.
|
||||
*/
|
||||
public TableName getTableName(Configuration hbaseConf) {
|
||||
TableName table =
|
||||
TableName.valueOf(hbaseConf.get(tableNameConfName, defaultTableName));
|
||||
return table;
|
||||
public static TableName getTableName(Configuration conf, String tableName) {
|
||||
String tableSchemaPrefix = conf.get(
|
||||
YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME,
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX);
|
||||
return TableName.valueOf(tableSchemaPrefix + tableName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the table name for this table.
|
||||
*
|
||||
* @param conf HBase configuration from which table name will be fetched.
|
||||
* @return A {@link TableName} object.
|
||||
*/
|
||||
public TableName getTableName(Configuration conf) {
|
||||
String tableName = conf.get(tableNameConfName, defaultTableName);
|
||||
return getTableName(conf, tableName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the table name based on the input config parameters.
|
||||
*
|
||||
* @param conf HBase configuration from which table name will be fetched.
|
||||
* @param tableNameInConf the table name parameter in conf.
|
||||
* @param defaultTableName the default table name.
|
||||
* @return A {@link TableName} object.
|
||||
*/
|
||||
public static TableName getTableName(Configuration conf,
|
||||
String tableNameInConf, String defaultTableName) {
|
||||
String tableName = conf.get(tableNameInConf, defaultTableName);
|
||||
return getTableName(conf, tableName);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -29,10 +29,8 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.Tag;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -50,7 +48,6 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyVa
|
|||
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
|
||||
|
||||
/**
|
||||
* A bunch of utility functions used across TimelineReader and TimelineWriter.
|
||||
|
@ -566,21 +563,4 @@ public final class TimelineStorageUtils {
|
|||
}
|
||||
return appId;
|
||||
}
|
||||
|
||||
public static boolean isFlowRunTable(HRegionInfo hRegionInfo,
|
||||
Configuration conf) {
|
||||
String regionTableName = hRegionInfo.getTable().getNameAsString();
|
||||
String flowRunTableName = conf.get(FlowRunTable.TABLE_NAME_CONF_NAME,
|
||||
FlowRunTable.DEFAULT_TABLE_NAME);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("regionTableName=" + regionTableName);
|
||||
}
|
||||
if (flowRunTableName.equalsIgnoreCase(regionTableName)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(" table is the flow run table!! " + flowRunTableName);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -71,7 +71,7 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
|
|||
if (e instanceof RegionCoprocessorEnvironment) {
|
||||
RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
|
||||
this.region = env.getRegion();
|
||||
isFlowRunRegion = TimelineStorageUtils.isFlowRunTable(
|
||||
isFlowRunRegion = FlowRunTable.isFlowRunTable(
|
||||
region.getRegionInfo(), env.getConfiguration());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
|
@ -138,4 +139,23 @@ public class FlowRunTable extends BaseTable<FlowRunTable> {
|
|||
LOG.info("Status of table creation for " + table.getNameAsString() + "="
|
||||
+ admin.tableExists(table));
|
||||
}
|
||||
|
||||
public static boolean isFlowRunTable(HRegionInfo hRegionInfo,
|
||||
Configuration conf) {
|
||||
String regionTableName = hRegionInfo.getTable().getNameAsString();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("regionTableName=" + regionTableName);
|
||||
}
|
||||
String flowRunTableName = BaseTable.getTableName(conf,
|
||||
FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)
|
||||
.getNameAsString();
|
||||
if (flowRunTableName.equalsIgnoreCase(regionTableName)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(" table is the flow run table!! "
|
||||
+ flowRunTableName);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -143,6 +143,7 @@ New configuration parameters that are introduced with v.2 are marked bold.
|
|||
| **`yarn.timeline-service.writer.class`** | The class for the backend storage writer. Defaults to HBase storage writer. |
|
||||
| **`yarn.timeline-service.reader.class`** | The class for the backend storage reader. Defaults to HBase storage reader. |
|
||||
| **`yarn.system-metrics-publisher.enabled`** | The setting that controls whether yarn system metrics is published on the Timeline service or not by RM And NM. Defaults to `false`. |
|
||||
| **`yarn.timeline-service.schema.prefix`** | The schema prefix for hbase tables. Defaults to "prod.". |
|
||||
|
||||
#### Advanced configuration
|
||||
|
||||
|
@ -194,7 +195,8 @@ Finally, run the schema creator tool to create the necessary tables:
|
|||
|
||||
The `TimelineSchemaCreator` tool supports a few options that may come handy especially when you
|
||||
are testing. For example, you can use `-skipExistingTable` (`-s` for short) to skip existing tables
|
||||
and continue to create other tables rather than failing the schema creation.
|
||||
and continue to create other tables rather than failing the schema creation. By default, the tables
|
||||
will have a schema prefix of "prod."
|
||||
|
||||
#### Enabling Timeline Service v.2
|
||||
Following are the basic configurations to start Timeline service v.2:
|
||||
|
|
Loading…
Reference in New Issue