From a2b195ee8df1197379a70a02d60b8c8584c3f713 Mon Sep 17 00:00:00 2001
From: Varun Saxena
Date: Thu, 27 Oct 2016 16:01:45 +0530
Subject: [PATCH] YARN-3649. Allow configurable prefix for hbase table names like prod, exp, test etc (Vrushali C via Varun Saxena)

---
 .../hadoop/yarn/conf/YarnConfiguration.java | 13 +-
 .../src/main/resources/yarn-default.xml | 12 ++
 .../TestHBaseTimelineStorageSchema.java | 137 ++++++++++++++++++
 .../flow/TestHBaseStorageFlowActivity.java | 22 +--
 .../storage/flow/TestHBaseStorageFlowRun.java | 55 ++++---
 .../TestHBaseStorageFlowRunCompaction.java | 45 +++---
 .../storage/common/BaseTable.java | 39 ++++-
 .../storage/flow/FlowRunCoprocessor.java | 2 +-
 .../storage/flow/FlowRunTable.java | 20 +++
 .../storage/common/TimelineStorageUtils.java | 1 -
 .../src/site/markdown/TimelineServiceV2.md | 6 +-
 11 files changed, 280 insertions(+), 72 deletions(-)
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageSchema.java

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 16bd73a121e..a2fe351378f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2099,7 +2099,7 @@ public class YarnConfiguration extends Configuration {
       TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "with-user-dir";
 
   /**
-   * Settings for timeline service v2.0
+   * Settings for timeline service v2.0.
    */
   public static final String TIMELINE_SERVICE_WRITER_CLASS =
       TIMELINE_SERVICE_PREFIX + "writer.class";
@@ -2115,6 +2115,17 @@ public class YarnConfiguration extends Configuration {
       "org.apache.hadoop.yarn.server.timelineservice" +
           ".storage.HBaseTimelineReaderImpl";
 
+  /**
+   * Default schema prefix for hbase tables.
+   */
+  public static final String DEFAULT_TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX =
+      "prod.";
+
+  /**
+   * Config param name to override schema prefix.
+   */
+  public static final String TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME =
+      TIMELINE_SERVICE_PREFIX + "hbase-schema.prefix";
 
   /** The setting that controls how often the timeline collector flushes the
    * timeline writer.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 0823dfe7531..3d25722309b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2341,6 +2341,18 @@
     <value>259200000</value>
   </property>
 
+  <property>
+    <description>
+    The value of this parameter sets the prefix for all tables that are part of
+    timeline service in the hbase storage schema. It can be set to "dev."
+    or "staging." if it is to be used for development or staging instances.
+    This way the data in production tables stays in a separate set of tables
+    prefixed by "prod.".
+    </description>
+    <name>yarn.timeline-service.hbase-schema.prefix</name>
+    <value>prod.</value>
+ + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageSchema.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageSchema.java new file mode 100644 index 00000000000..53045e57009 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageSchema.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Table; + +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; + +/** + * Unit tests for checking different schema prefixes. 
+ */ +public class TestHBaseTimelineStorageSchema { + private static HBaseTestingUtility util; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + util = new HBaseTestingUtility(); + util.startMiniCluster(); + } + + private static void createSchema(Configuration conf) throws IOException { + TimelineSchemaCreator.createAllTables(conf, false); + } + + @Test + public void createWithDefaultPrefix() throws IOException { + Configuration hbaseConf = util.getConfiguration(); + createSchema(hbaseConf); + Connection conn = null; + conn = ConnectionFactory.createConnection(hbaseConf); + Admin admin = conn.getAdmin(); + + TableName entityTableName = BaseTable.getTableName(hbaseConf, + EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME); + assertTrue(admin.tableExists(entityTableName)); + assertTrue(entityTableName.getNameAsString().startsWith( + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX)); + Table entityTable = conn.getTable(BaseTable.getTableName(hbaseConf, + EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME)); + assertNotNull(entityTable); + + TableName flowRunTableName = BaseTable.getTableName(hbaseConf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME); + assertTrue(admin.tableExists(flowRunTableName)); + assertTrue(flowRunTableName.getNameAsString().startsWith( + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX)); + Table flowRunTable = conn.getTable(BaseTable.getTableName(hbaseConf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); + assertNotNull(flowRunTable); + } + + @Test + public void createWithSetPrefix() throws IOException { + Configuration hbaseConf = util.getConfiguration(); + String prefix = "unit-test."; + hbaseConf.set(YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME, + prefix); + createSchema(hbaseConf); + Connection conn = null; + conn = ConnectionFactory.createConnection(hbaseConf); + Admin admin = conn.getAdmin(); + + TableName entityTableName = BaseTable.getTableName(hbaseConf, + EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME); + assertTrue(admin.tableExists(entityTableName)); + assertTrue(entityTableName.getNameAsString().startsWith(prefix)); + Table entityTable = conn.getTable(BaseTable.getTableName(hbaseConf, + EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME)); + assertNotNull(entityTable); + + TableName flowRunTableName = BaseTable.getTableName(hbaseConf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME); + assertTrue(admin.tableExists(flowRunTableName)); + assertTrue(flowRunTableName.getNameAsString().startsWith(prefix)); + Table flowRunTable = conn.getTable(BaseTable.getTableName(hbaseConf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); + assertNotNull(flowRunTable); + + // create another set with a diff prefix + hbaseConf + .unset(YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME); + prefix = "yet-another-unit-test."; + hbaseConf.set(YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME, + prefix); + createSchema(hbaseConf); + entityTableName = BaseTable.getTableName(hbaseConf, + EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME); + assertTrue(admin.tableExists(entityTableName)); + assertTrue(entityTableName.getNameAsString().startsWith(prefix)); + entityTable = conn.getTable(BaseTable.getTableName(hbaseConf, + EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME)); + assertNotNull(entityTable); + + 
flowRunTableName = BaseTable.getTableName(hbaseConf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME); + assertTrue(admin.tableExists(flowRunTableName)); + assertTrue(flowRunTableName.getNameAsString().startsWith(prefix)); + flowRunTable = conn.getTable(BaseTable.getTableName(hbaseConf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); + assertNotNull(flowRunTable); + hbaseConf + .unset(YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java index 2778f50df73..97d40fdd185 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java @@ -30,7 +30,6 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Get; @@ -51,6 +50,7 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContex import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import org.junit.AfterClass; @@ -155,8 +155,9 @@ public class TestHBaseStorageFlowActivity { Connection conn = ConnectionFactory.createConnection(c1); // check in flow activity table - Table table1 = conn.getTable(TableName - .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME)); + Table table1 = conn.getTable( + BaseTable.getTableName(c1, FlowActivityTable.TABLE_NAME_CONF_NAME, + FlowActivityTable.DEFAULT_TABLE_NAME)); byte[] startRow = new FlowActivityRowKey(cluster, minStartTs, user, flow).getRowKey(); Get g = new Get(startRow); @@ -286,8 +287,9 @@ public class TestHBaseStorageFlowActivity { .getRowKey(); s.setStopRow(stopRow); Connection conn = ConnectionFactory.createConnection(c1); - Table table1 = conn.getTable(TableName - .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME)); + Table table1 = conn.getTable( + BaseTable.getTableName(c1, FlowActivityTable.TABLE_NAME_CONF_NAME, + FlowActivityTable.DEFAULT_TABLE_NAME)); ResultScanner scanner = table1.getScanner(s); int rowCount = 0; for (Result result : scanner) { @@ -425,13 +427,13 @@ public class TestHBaseStorageFlowActivity { new FlowActivityRowKey(cluster, appCreatedTime, user, flow).getRowKey(); s.setStartRow(startRow); String clusterStop = cluster + "1"; - 
byte[] stopRow = - new FlowActivityRowKey(clusterStop, appCreatedTime, user, flow) - .getRowKey(); + byte[] stopRow = new FlowActivityRowKey(clusterStop, appCreatedTime, user, + flow).getRowKey(); s.setStopRow(stopRow); Connection conn = ConnectionFactory.createConnection(c1); - Table table1 = conn.getTable(TableName - .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME)); + Table table1 = conn.getTable( + BaseTable.getTableName(c1, FlowActivityTable.TABLE_NAME_CONF_NAME, + FlowActivityTable.DEFAULT_TABLE_NAME)); ResultScanner scanner = table1.getScanner(s); int rowCount = 0; for (Result result : scanner) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java index 7f46a5a0fc8..00fee69c527 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java @@ -61,8 +61,8 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReader import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -94,8 +94,8 @@ public class TestHBaseStorageFlowRun { @Test public void checkCoProcessorOff() throws IOException, InterruptedException { Configuration hbaseConf = util.getConfiguration(); - TableName table = TableName.valueOf(hbaseConf.get( - FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); + TableName table = BaseTable.getTableName(hbaseConf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME); Connection conn = null; conn = ConnectionFactory.createConnection(hbaseConf); Admin admin = conn.getAdmin(); @@ -109,14 +109,14 @@ public class TestHBaseStorageFlowRun { HRegionServer server = util.getRSForFirstRegionInTable(table); List regions = server.getOnlineRegions(table); for (Region region : regions) { - assertTrue(HBaseTimelineStorageUtils.isFlowRunTable( - region.getRegionInfo(), hbaseConf)); + assertTrue(FlowRunTable.isFlowRunTable(region.getRegionInfo(), + hbaseConf)); } } - table = TableName.valueOf(hbaseConf.get( + table = BaseTable.getTableName(hbaseConf, FlowActivityTable.TABLE_NAME_CONF_NAME, - FlowActivityTable.DEFAULT_TABLE_NAME)); + FlowActivityTable.DEFAULT_TABLE_NAME); if (admin.tableExists(table)) { // check the regions. 
// check in flow activity table @@ -124,14 +124,13 @@ public class TestHBaseStorageFlowRun { HRegionServer server = util.getRSForFirstRegionInTable(table); List regions = server.getOnlineRegions(table); for (Region region : regions) { - assertFalse(HBaseTimelineStorageUtils.isFlowRunTable( - region.getRegionInfo(), hbaseConf)); + assertFalse(FlowRunTable.isFlowRunTable(region.getRegionInfo(), + hbaseConf)); } } - table = TableName.valueOf(hbaseConf.get( - EntityTable.TABLE_NAME_CONF_NAME, - EntityTable.DEFAULT_TABLE_NAME)); + table = BaseTable.getTableName(hbaseConf, EntityTable.TABLE_NAME_CONF_NAME, + EntityTable.DEFAULT_TABLE_NAME); if (admin.tableExists(table)) { // check the regions. // check in entity run table @@ -139,8 +138,8 @@ public class TestHBaseStorageFlowRun { HRegionServer server = util.getRSForFirstRegionInTable(table); List regions = server.getOnlineRegions(table); for (Region region : regions) { - assertFalse(HBaseTimelineStorageUtils.isFlowRunTable( - region.getRegionInfo(), hbaseConf)); + assertFalse(FlowRunTable.isFlowRunTable(region.getRegionInfo(), + hbaseConf)); } } } @@ -220,8 +219,8 @@ public class TestHBaseStorageFlowRun { Connection conn = ConnectionFactory.createConnection(c1); // check in flow run table - Table table1 = conn.getTable(TableName - .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + Table table1 = conn.getTable(BaseTable.getTableName(c1, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); // scan the table and see that we get back the right min and max // timestamps byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey(); @@ -356,24 +355,24 @@ public class TestHBaseStorageFlowRun { /* * checks the batch limits on a scan */ - void checkFlowRunTableBatchLimit(String cluster, String user, - String flow, long runid, Configuration c1) throws IOException { + void checkFlowRunTableBatchLimit(String cluster, String user, String flow, + long runid, Configuration c1) throws IOException { Scan s = new Scan(); s.addFamily(FlowRunColumnFamily.INFO.getBytes()); - byte[] startRow = - new FlowRunRowKey(cluster, user, flow, runid).getRowKey(); + byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid) + .getRowKey(); s.setStartRow(startRow); // set a batch limit int batchLimit = 2; s.setBatch(batchLimit); String clusterStop = cluster + "1"; - byte[] stopRow = - new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey(); + byte[] stopRow = new FlowRunRowKey(clusterStop, user, flow, runid) + .getRowKey(); s.setStopRow(stopRow); Connection conn = ConnectionFactory.createConnection(c1); - Table table1 = conn - .getTable(TableName.valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + Table table1 = conn.getTable(BaseTable.getTableName(c1, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); ResultScanner scanner = table1.getScanner(s); int loopCount = 0; @@ -517,8 +516,8 @@ public class TestHBaseStorageFlowRun { new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey(); s.setStopRow(stopRow); Connection conn = ConnectionFactory.createConnection(c1); - Table table1 = conn.getTable(TableName - .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + Table table1 = conn.getTable(BaseTable.getTableName(c1, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); ResultScanner scanner = table1.getScanner(s); int rowCount = 0; @@ -782,8 +781,8 @@ public class TestHBaseStorageFlowRun { boolean checkMax) throws IOException { Connection conn = ConnectionFactory.createConnection(c1); // check in flow run table - 
Table table1 = conn.getTable(TableName - .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + Table table1 = conn.getTable(BaseTable.getTableName(c1, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); // scan the table and see that we get back the right min and max // timestamps byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java index eb18e28243d..a4c0e4498ef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java @@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; @@ -53,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; @@ -69,8 +69,8 @@ public class TestHBaseStorageFlowRunCompaction { private static HBaseTestingUtility util; - private static final String METRIC_1 = "MAP_SLOT_MILLIS"; - private static final String METRIC_2 = "HDFS_BYTES_READ"; + private static final String METRIC1 = "MAP_SLOT_MILLIS"; + private static final String METRIC2 = "HDFS_BYTES_READ"; private final byte[] aRowKey = Bytes.toBytes("a"); private final byte[] aFamily = Bytes.toBytes("family"); @@ -89,8 +89,9 @@ public class TestHBaseStorageFlowRunCompaction { TimelineSchemaCreator.createAllTables(util.getConfiguration(), false); } - /** Writes non numeric data into flow run table - * reads it back. + /** + * writes non numeric data into flow run table. 
+ * reads it back * * @throws Exception */ @@ -106,11 +107,10 @@ public class TestHBaseStorageFlowRunCompaction { p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes, valueBytes); Configuration hbaseConf = util.getConfiguration(); - TableName table = TableName.valueOf(hbaseConf.get( - FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); Connection conn = null; conn = ConnectionFactory.createConnection(hbaseConf); - Table flowRunTable = conn.getTable(table); + Table flowRunTable = conn.getTable(BaseTable.getTableName(hbaseConf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); flowRunTable.put(p); Get g = new Get(rowKeyBytes); @@ -156,11 +156,10 @@ public class TestHBaseStorageFlowRunCompaction { value4Bytes); Configuration hbaseConf = util.getConfiguration(); - TableName table = TableName.valueOf(hbaseConf.get( - FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); Connection conn = null; conn = ConnectionFactory.createConnection(hbaseConf); - Table flowRunTable = conn.getTable(table); + Table flowRunTable = conn.getTable(BaseTable.getTableName(hbaseConf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); flowRunTable.put(p); String rowKey2 = "nonNumericRowKey2"; @@ -262,7 +261,6 @@ public class TestHBaseStorageFlowRunCompaction { .getFamilyMap(FlowRunColumnFamily.INFO.getBytes()); // we expect all back in one next call assertEquals(4, values.size()); - System.out.println(" values size " + values.size() + " " + batchLimit); rowCount++; } // should get back 1 row with each invocation @@ -321,10 +319,11 @@ public class TestHBaseStorageFlowRunCompaction { } // check in flow run table - HRegionServer server = util.getRSForFirstRegionInTable(TableName - .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); - List regions = server.getOnlineRegions(TableName - .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + HRegionServer server = util.getRSForFirstRegionInTable( + BaseTable.getTableName(c1, FlowRunTable.TABLE_NAME_CONF_NAME, + FlowRunTable.DEFAULT_TABLE_NAME)); + List regions = server.getOnlineRegions(BaseTable.getTableName(c1, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); assertTrue("Didn't find any regions for primary table!", regions.size() > 0); // flush and compact all the regions of the primary table @@ -349,8 +348,8 @@ public class TestHBaseStorageFlowRunCompaction { new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey(); s.setStopRow(stopRow); Connection conn = ConnectionFactory.createConnection(c1); - Table table1 = conn.getTable(TableName - .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + Table table1 = conn.getTable(BaseTable.getTableName(c1, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); ResultScanner scanner = table1.getScanner(s); int rowCount = 0; @@ -364,13 +363,13 @@ public class TestHBaseStorageFlowRunCompaction { rowCount++; // check metric1 byte[] q = ColumnHelper.getColumnQualifier( - FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), METRIC_1); + FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), METRIC1); assertTrue(values.containsKey(q)); assertEquals(141, Bytes.toLong(values.get(q))); // check metric2 q = ColumnHelper.getColumnQualifier( - FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), METRIC_2); + FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), METRIC2); assertTrue(values.containsKey(q)); assertEquals(57, Bytes.toLong(values.get(q))); } @@ -587,9 +586,9 @@ public class TestHBaseStorageFlowRunCompaction { long cellTsFinalStart = 
10001120L; long cellTsFinal = cellTsFinalStart; - long cellTsFinalStartNotExpire = - TimestampGenerator.getSupplementedTimestamp( - System.currentTimeMillis(), "application_10266666661166_118821"); + long cellTsFinalStartNotExpire = TimestampGenerator + .getSupplementedTimestamp(System.currentTimeMillis(), + "application_10266666661166_118821"); long cellTsFinalNotExpire = cellTsFinalStartNotExpire; long cellTsNotFinalStart = currentTimestamp - 5; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java index 8581aa45f28..93d809c003e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.yarn.conf.YarnConfiguration; /** * Implements behavior common to tables used in the timeline service storage. It @@ -114,16 +115,42 @@ public abstract class BaseTable { } /** - * Get the table name for this table. + * Get the table name for the input table. * - * @param hbaseConf HBase configuration from which table name will be fetched. + * @param conf HBase configuration from which table name will be fetched. + * @param tableName name of the table to be fetched * @return A {@link TableName} object. */ - public TableName getTableName(Configuration hbaseConf) { - TableName table = - TableName.valueOf(hbaseConf.get(tableNameConfName, defaultTableName)); - return table; + public static TableName getTableName(Configuration conf, String tableName) { + String tableSchemaPrefix = conf.get( + YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX); + return TableName.valueOf(tableSchemaPrefix + tableName); + } + /** + * Get the table name for this table. + * + * @param conf HBase configuration from which table name will be fetched. + * @return A {@link TableName} object. + */ + public TableName getTableName(Configuration conf) { + String tableName = conf.get(tableNameConfName, defaultTableName); + return getTableName(conf, tableName); + } + + /** + * Get the table name based on the input config parameters. + * + * @param conf HBase configuration from which table name will be fetched. + * @param tableNameInConf the table name parameter in conf. + * @param defaultTableName the default table name. + * @return A {@link TableName} object. 
+   */
+  public static TableName getTableName(Configuration conf,
+      String tableNameInConf, String defaultTableName) {
+    String tableName = conf.get(tableNameInConf, defaultTableName);
+    return getTableName(conf, tableName);
   }
 
   /**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
index 221420eb503..a3c355fdee5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
@@ -72,7 +72,7 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
     if (e instanceof RegionCoprocessorEnvironment) {
       RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
       this.region = env.getRegion();
-      isFlowRunRegion = HBaseTimelineStorageUtils.isFlowRunTable(
+      isFlowRunRegion = FlowRunTable.isFlowRunTable(
           region.getRegionInfo(), env.getConfiguration());
     }
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java
index 9c6549ffc12..8fdd68527a3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
@@ -139,4 +140,23 @@ public class FlowRunTable extends BaseTable<FlowRunTable> {
     LOG.info("Status of table creation for " + table.getNameAsString() + "="
         + admin.tableExists(table));
   }
+
+  public static boolean isFlowRunTable(HRegionInfo hRegionInfo,
+      Configuration conf) {
+    String regionTableName = hRegionInfo.getTable().getNameAsString();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("regionTableName=" + regionTableName);
+    }
+    String flowRunTableName = BaseTable.getTableName(conf,
+        FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)
+        .getNameAsString();
+    if (flowRunTableName.equalsIgnoreCase(regionTableName)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(" table is the flow run table!! "
+            + flowRunTableName);
+      }
+      return true;
+    }
+    return false;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
index 7f7d6405ae9..203d950fa81 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
@@ -371,5 +371,4 @@ public final class TimelineStorageUtils {
     return (obj instanceof Short) || (obj instanceof Integer) ||
         (obj instanceof Long);
   }
-
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md
index 04822c9fb06..6f98dcc53a7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md
@@ -127,6 +127,7 @@ New configuration parameters that are introduced with v.2 are marked bold.
 | **`yarn.timeline-service.writer.class`** | The class for the backend storage writer. Defaults to HBase storage writer. |
 | **`yarn.timeline-service.reader.class`** | The class for the backend storage reader. Defaults to HBase storage reader. |
 | **`yarn.system-metrics-publisher.enabled`** | The setting that controls whether yarn system metrics is published on the Timeline service or not by RM And NM. Defaults to `false`. |
+| **`yarn.timeline-service.hbase-schema.prefix`** | The schema prefix for HBase tables. Defaults to "prod.". |
 
 #### Advanced configuration
 
@@ -187,8 +188,9 @@ Finally, run the schema creator tool to create the necessary tables:
 
 The `TimelineSchemaCreator` tool supports a few options that may come handy especially when you
 are testing. For example, you can use `-skipExistingTable` (`-s` for short) to skip existing tables
-and continue to create other tables rather than failing the schema creation. When no option or '-help'
-('-h' for short) is provided, the command usage is printed.
+and continue to create other tables rather than failing the schema creation. By default, the tables
+will have a schema prefix of "prod.". When no option or '-help' ('-h' for short) is provided, the
+command usage is printed.
 
 #### Enabling Timeline Service v.2
 Following are the basic configurations to start Timeline service v.2: