From d19726ec37704904dbbcae6d1a8e9fb4dab89c74 Mon Sep 17 00:00:00 2001 From: Varun Saxena Date: Thu, 27 Oct 2016 16:12:06 +0530 Subject: [PATCH] YARN-3649. Allow configurable prefix for hbase table names like prod, exp, test etc (Vrushali C via Varun Saxena) --- .../hadoop/yarn/conf/YarnConfiguration.java | 14 +- .../src/main/resources/yarn-default.xml | 12 ++ .../TestHBaseTimelineStorageSchema.java | 137 ++++++++++++++++++ .../flow/TestHBaseStorageFlowActivity.java | 22 +-- .../storage/flow/TestHBaseStorageFlowRun.java | 49 +++---- .../TestHBaseStorageFlowRunCompaction.java | 45 +++--- .../storage/common/BaseTable.java | 39 ++++- .../storage/common/TimelineStorageUtils.java | 20 --- .../storage/flow/FlowRunCoprocessor.java | 2 +- .../storage/flow/FlowRunTable.java | 20 +++ .../src/site/markdown/TimelineServiceV2.md | 4 +- 11 files changed, 277 insertions(+), 87 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageSchema.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 863f11c68bd..6f796163b2c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2189,7 +2189,7 @@ public class YarnConfiguration extends Configuration { + "entity-file.fs-support-append"; /** - * Settings for timeline service v2.0 + * Settings for timeline service v2.0. */ public static final String TIMELINE_SERVICE_WRITER_CLASS = TIMELINE_SERVICE_PREFIX + "writer.class"; @@ -2197,6 +2197,18 @@ public class YarnConfiguration extends Configuration { public static final String TIMELINE_SERVICE_READER_CLASS = TIMELINE_SERVICE_PREFIX + "reader.class"; + /** + * default schema prefix for hbase tables. + */ + public static final String DEFAULT_TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX = + "prod."; + + /** + * config param name to override schema prefix. + */ + public static final String TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME = + TIMELINE_SERVICE_PREFIX + "hbase-schema.prefix"; + /** The setting that controls how often the timeline collector flushes the * timeline writer. */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index cfe5488654e..6159178a8ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2354,6 +2354,18 @@ 259200000 + + + The value of this parameter sets the prefix for all tables that are part of + timeline service in the hbase storage schema. It can be set to "dev." + or "staging." if it is to be used for development or staging instances. + This way the data in production tables stays in a separate set of tables + prefixed by "prod.". + + yarn.timeline-service.hbase-schema.prefix + prod. + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageSchema.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageSchema.java new file mode 100644 index 00000000000..53045e57009 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageSchema.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Table; + +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; + +/** + * Unit tests for checking different schema prefixes. + */ +public class TestHBaseTimelineStorageSchema { + private static HBaseTestingUtility util; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + util = new HBaseTestingUtility(); + util.startMiniCluster(); + } + + private static void createSchema(Configuration conf) throws IOException { + TimelineSchemaCreator.createAllTables(conf, false); + } + + @Test + public void createWithDefaultPrefix() throws IOException { + Configuration hbaseConf = util.getConfiguration(); + createSchema(hbaseConf); + Connection conn = null; + conn = ConnectionFactory.createConnection(hbaseConf); + Admin admin = conn.getAdmin(); + + TableName entityTableName = BaseTable.getTableName(hbaseConf, + EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME); + assertTrue(admin.tableExists(entityTableName)); + assertTrue(entityTableName.getNameAsString().startsWith( + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX)); + Table entityTable = conn.getTable(BaseTable.getTableName(hbaseConf, + EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME)); + assertNotNull(entityTable); + + TableName flowRunTableName = BaseTable.getTableName(hbaseConf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME); + assertTrue(admin.tableExists(flowRunTableName)); + assertTrue(flowRunTableName.getNameAsString().startsWith( + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX)); + Table flowRunTable = conn.getTable(BaseTable.getTableName(hbaseConf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); + assertNotNull(flowRunTable); + } + + @Test + public void createWithSetPrefix() throws IOException { + Configuration hbaseConf = util.getConfiguration(); + String prefix = "unit-test."; + hbaseConf.set(YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME, + prefix); + createSchema(hbaseConf); + Connection conn = null; + conn = ConnectionFactory.createConnection(hbaseConf); + Admin admin = conn.getAdmin(); + + TableName entityTableName = BaseTable.getTableName(hbaseConf, + EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME); + assertTrue(admin.tableExists(entityTableName)); + assertTrue(entityTableName.getNameAsString().startsWith(prefix)); + Table entityTable = conn.getTable(BaseTable.getTableName(hbaseConf, + EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME)); + assertNotNull(entityTable); + + TableName flowRunTableName = BaseTable.getTableName(hbaseConf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME); + assertTrue(admin.tableExists(flowRunTableName)); + assertTrue(flowRunTableName.getNameAsString().startsWith(prefix)); + Table flowRunTable = conn.getTable(BaseTable.getTableName(hbaseConf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); + assertNotNull(flowRunTable); + + // create another set with a diff prefix + hbaseConf + .unset(YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME); + prefix = "yet-another-unit-test."; + hbaseConf.set(YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME, + prefix); + createSchema(hbaseConf); + entityTableName = BaseTable.getTableName(hbaseConf, + EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME); + assertTrue(admin.tableExists(entityTableName)); + assertTrue(entityTableName.getNameAsString().startsWith(prefix)); + entityTable = conn.getTable(BaseTable.getTableName(hbaseConf, + EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME)); + assertNotNull(entityTable); + + flowRunTableName = BaseTable.getTableName(hbaseConf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME); + assertTrue(admin.tableExists(flowRunTableName)); + assertTrue(flowRunTableName.getNameAsString().startsWith(prefix)); + flowRunTable = conn.getTable(BaseTable.getTableName(hbaseConf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); + assertNotNull(flowRunTable); + hbaseConf + .unset(YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java index 1906574b8cc..1db0649e0ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java @@ -30,7 +30,6 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Get; @@ -51,6 +50,7 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContex import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.junit.AfterClass; @@ -155,8 +155,9 @@ public class TestHBaseStorageFlowActivity { Connection conn = ConnectionFactory.createConnection(c1); // check in flow activity table - Table table1 = conn.getTable(TableName - .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME)); + Table table1 = conn.getTable( + BaseTable.getTableName(c1, FlowActivityTable.TABLE_NAME_CONF_NAME, + FlowActivityTable.DEFAULT_TABLE_NAME)); byte[] startRow = new FlowActivityRowKey(cluster, minStartTs, user, flow).getRowKey(); Get g = new Get(startRow); @@ -286,8 +287,9 @@ public class TestHBaseStorageFlowActivity { .getRowKey(); s.setStopRow(stopRow); Connection conn = ConnectionFactory.createConnection(c1); - Table table1 = conn.getTable(TableName - .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME)); + Table table1 = conn.getTable( + BaseTable.getTableName(c1, FlowActivityTable.TABLE_NAME_CONF_NAME, + FlowActivityTable.DEFAULT_TABLE_NAME)); ResultScanner scanner = table1.getScanner(s); int rowCount = 0; for (Result result : scanner) { @@ -424,13 +426,13 @@ public class TestHBaseStorageFlowActivity { new FlowActivityRowKey(cluster, appCreatedTime, user, flow).getRowKey(); s.setStartRow(startRow); String clusterStop = cluster + "1"; - byte[] stopRow = - new FlowActivityRowKey(clusterStop, appCreatedTime, user, flow) - .getRowKey(); + byte[] stopRow = new FlowActivityRowKey(clusterStop, appCreatedTime, user, + flow).getRowKey(); s.setStopRow(stopRow); Connection conn = ConnectionFactory.createConnection(c1); - Table table1 = conn.getTable(TableName - .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME)); + Table table1 = conn.getTable( + BaseTable.getTableName(c1, FlowActivityTable.TABLE_NAME_CONF_NAME, + FlowActivityTable.DEFAULT_TABLE_NAME)); ResultScanner scanner = table1.getScanner(s); int rowCount = 0; for (Result result : scanner) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java index 74b9e501ca4..c066a1f85f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java @@ -61,8 +61,8 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReader import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -94,8 +94,8 @@ public class TestHBaseStorageFlowRun { @Test public void checkCoProcessorOff() throws IOException, InterruptedException { Configuration hbaseConf = util.getConfiguration(); - TableName table = TableName.valueOf(hbaseConf.get( - FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); + TableName table = BaseTable.getTableName(hbaseConf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME); Connection conn = null; conn = ConnectionFactory.createConnection(hbaseConf); Admin admin = conn.getAdmin(); @@ -109,14 +109,14 @@ public class TestHBaseStorageFlowRun { HRegionServer server = util.getRSForFirstRegionInTable(table); List regions = server.getOnlineRegions(table); for (Region region : regions) { - assertTrue(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(), + assertTrue(FlowRunTable.isFlowRunTable(region.getRegionInfo(), hbaseConf)); } } - table = TableName.valueOf(hbaseConf.get( + table = BaseTable.getTableName(hbaseConf, FlowActivityTable.TABLE_NAME_CONF_NAME, - FlowActivityTable.DEFAULT_TABLE_NAME)); + FlowActivityTable.DEFAULT_TABLE_NAME); if (admin.tableExists(table)) { // check the regions. // check in flow activity table @@ -124,14 +124,13 @@ public class TestHBaseStorageFlowRun { HRegionServer server = util.getRSForFirstRegionInTable(table); List regions = server.getOnlineRegions(table); for (Region region : regions) { - assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(), + assertFalse(FlowRunTable.isFlowRunTable(region.getRegionInfo(), hbaseConf)); } } - table = TableName.valueOf(hbaseConf.get( - EntityTable.TABLE_NAME_CONF_NAME, - EntityTable.DEFAULT_TABLE_NAME)); + table = BaseTable.getTableName(hbaseConf, EntityTable.TABLE_NAME_CONF_NAME, + EntityTable.DEFAULT_TABLE_NAME); if (admin.tableExists(table)) { // check the regions. // check in entity run table @@ -139,7 +138,7 @@ public class TestHBaseStorageFlowRun { HRegionServer server = util.getRSForFirstRegionInTable(table); List regions = server.getOnlineRegions(table); for (Region region : regions) { - assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(), + assertFalse(FlowRunTable.isFlowRunTable(region.getRegionInfo(), hbaseConf)); } } @@ -220,8 +219,8 @@ public class TestHBaseStorageFlowRun { Connection conn = ConnectionFactory.createConnection(c1); // check in flow run table - Table table1 = conn.getTable(TableName - .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + Table table1 = conn.getTable(BaseTable.getTableName(c1, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); // scan the table and see that we get back the right min and max // timestamps byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey(); @@ -356,24 +355,24 @@ public class TestHBaseStorageFlowRun { /* * checks the batch limits on a scan */ - void checkFlowRunTableBatchLimit(String cluster, String user, - String flow, long runid, Configuration c1) throws IOException { + void checkFlowRunTableBatchLimit(String cluster, String user, String flow, + long runid, Configuration c1) throws IOException { Scan s = new Scan(); s.addFamily(FlowRunColumnFamily.INFO.getBytes()); - byte[] startRow = - new FlowRunRowKey(cluster, user, flow, runid).getRowKey(); + byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid) + .getRowKey(); s.setStartRow(startRow); // set a batch limit int batchLimit = 2; s.setBatch(batchLimit); String clusterStop = cluster + "1"; - byte[] stopRow = - new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey(); + byte[] stopRow = new FlowRunRowKey(clusterStop, user, flow, runid) + .getRowKey(); s.setStopRow(stopRow); Connection conn = ConnectionFactory.createConnection(c1); - Table table1 = conn - .getTable(TableName.valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + Table table1 = conn.getTable(BaseTable.getTableName(c1, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); ResultScanner scanner = table1.getScanner(s); int loopCount = 0; @@ -517,8 +516,8 @@ public class TestHBaseStorageFlowRun { new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey(); s.setStopRow(stopRow); Connection conn = ConnectionFactory.createConnection(c1); - Table table1 = conn.getTable(TableName - .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + Table table1 = conn.getTable(BaseTable.getTableName(c1, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); ResultScanner scanner = table1.getScanner(s); int rowCount = 0; @@ -782,8 +781,8 @@ public class TestHBaseStorageFlowRun { boolean checkMax) throws IOException { Connection conn = ConnectionFactory.createConnection(c1); // check in flow run table - Table table1 = conn.getTable(TableName - .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + Table table1 = conn.getTable(BaseTable.getTableName(c1, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); // scan the table and see that we get back the right min and max // timestamps byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java index 30940886f32..5fe8b1bacb9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java @@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; @@ -53,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; @@ -69,8 +69,8 @@ public class TestHBaseStorageFlowRunCompaction { private static HBaseTestingUtility util; - private static final String METRIC_1 = "MAP_SLOT_MILLIS"; - private static final String METRIC_2 = "HDFS_BYTES_READ"; + private static final String METRIC1 = "MAP_SLOT_MILLIS"; + private static final String METRIC2 = "HDFS_BYTES_READ"; private final byte[] aRowKey = Bytes.toBytes("a"); private final byte[] aFamily = Bytes.toBytes("family"); @@ -89,8 +89,9 @@ public class TestHBaseStorageFlowRunCompaction { TimelineSchemaCreator.createAllTables(util.getConfiguration(), false); } - /** Writes non numeric data into flow run table - * reads it back. + /** + * writes non numeric data into flow run table. + * reads it back * * @throws Exception */ @@ -106,11 +107,10 @@ public class TestHBaseStorageFlowRunCompaction { p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes, valueBytes); Configuration hbaseConf = util.getConfiguration(); - TableName table = TableName.valueOf(hbaseConf.get( - FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); Connection conn = null; conn = ConnectionFactory.createConnection(hbaseConf); - Table flowRunTable = conn.getTable(table); + Table flowRunTable = conn.getTable(BaseTable.getTableName(hbaseConf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); flowRunTable.put(p); Get g = new Get(rowKeyBytes); @@ -156,11 +156,10 @@ public class TestHBaseStorageFlowRunCompaction { value4Bytes); Configuration hbaseConf = util.getConfiguration(); - TableName table = TableName.valueOf(hbaseConf.get( - FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); Connection conn = null; conn = ConnectionFactory.createConnection(hbaseConf); - Table flowRunTable = conn.getTable(table); + Table flowRunTable = conn.getTable(BaseTable.getTableName(hbaseConf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); flowRunTable.put(p); String rowKey2 = "nonNumericRowKey2"; @@ -262,7 +261,6 @@ public class TestHBaseStorageFlowRunCompaction { .getFamilyMap(FlowRunColumnFamily.INFO.getBytes()); // we expect all back in one next call assertEquals(4, values.size()); - System.out.println(" values size " + values.size() + " " + batchLimit); rowCount++; } // should get back 1 row with each invocation @@ -321,10 +319,11 @@ public class TestHBaseStorageFlowRunCompaction { } // check in flow run table - HRegionServer server = util.getRSForFirstRegionInTable(TableName - .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); - List regions = server.getOnlineRegions(TableName - .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + HRegionServer server = util.getRSForFirstRegionInTable( + BaseTable.getTableName(c1, FlowRunTable.TABLE_NAME_CONF_NAME, + FlowRunTable.DEFAULT_TABLE_NAME)); + List regions = server.getOnlineRegions(BaseTable.getTableName(c1, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); assertTrue("Didn't find any regions for primary table!", regions.size() > 0); // flush and compact all the regions of the primary table @@ -349,8 +348,8 @@ public class TestHBaseStorageFlowRunCompaction { new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey(); s.setStopRow(stopRow); Connection conn = ConnectionFactory.createConnection(c1); - Table table1 = conn.getTable(TableName - .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + Table table1 = conn.getTable(BaseTable.getTableName(c1, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); ResultScanner scanner = table1.getScanner(s); int rowCount = 0; @@ -364,13 +363,13 @@ public class TestHBaseStorageFlowRunCompaction { rowCount++; // check metric1 byte[] q = ColumnHelper.getColumnQualifier( - FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), METRIC_1); + FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), METRIC1); assertTrue(values.containsKey(q)); assertEquals(141, Bytes.toLong(values.get(q))); // check metric2 q = ColumnHelper.getColumnQualifier( - FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), METRIC_2); + FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), METRIC2); assertTrue(values.containsKey(q)); assertEquals(57, Bytes.toLong(values.get(q))); } @@ -587,9 +586,9 @@ public class TestHBaseStorageFlowRunCompaction { long cellTsFinalStart = 10001120L; long cellTsFinal = cellTsFinalStart; - long cellTsFinalStartNotExpire = - TimestampGenerator.getSupplementedTimestamp( - System.currentTimeMillis(), "application_10266666661166_118821"); + long cellTsFinalStartNotExpire = TimestampGenerator + .getSupplementedTimestamp(System.currentTimeMillis(), + "application_10266666661166_118821"); long cellTsFinalNotExpire = cellTsFinalStartNotExpire; long cellTsNotFinalStart = currentTimestamp - 5; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java index 8581aa45f28..93d809c003e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.yarn.conf.YarnConfiguration; /** * Implements behavior common to tables used in the timeline service storage. It @@ -114,16 +115,42 @@ public abstract class BaseTable { } /** - * Get the table name for this table. + * Get the table name for the input table. * - * @param hbaseConf HBase configuration from which table name will be fetched. + * @param conf HBase configuration from which table name will be fetched. + * @param tableName name of the table to be fetched * @return A {@link TableName} object. */ - public TableName getTableName(Configuration hbaseConf) { - TableName table = - TableName.valueOf(hbaseConf.get(tableNameConfName, defaultTableName)); - return table; + public static TableName getTableName(Configuration conf, String tableName) { + String tableSchemaPrefix = conf.get( + YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX); + return TableName.valueOf(tableSchemaPrefix + tableName); + } + /** + * Get the table name for this table. + * + * @param conf HBase configuration from which table name will be fetched. + * @return A {@link TableName} object. + */ + public TableName getTableName(Configuration conf) { + String tableName = conf.get(tableNameConfName, defaultTableName); + return getTableName(conf, tableName); + } + + /** + * Get the table name based on the input config parameters. + * + * @param conf HBase configuration from which table name will be fetched. + * @param tableNameInConf the table name parameter in conf. + * @param defaultTableName the default table name. + * @return A {@link TableName} object. + */ + public static TableName getTableName(Configuration conf, + String tableNameInConf, String defaultTableName) { + String tableName = conf.get(tableNameInConf, defaultTableName); + return getTableName(conf, tableName); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java index aa9a793ef48..99e16a17f5f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java @@ -29,10 +29,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.util.Bytes; @@ -50,7 +48,6 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyVa import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; /** * A bunch of utility functions used across TimelineReader and TimelineWriter. @@ -566,21 +563,4 @@ public final class TimelineStorageUtils { } return appId; } - - public static boolean isFlowRunTable(HRegionInfo hRegionInfo, - Configuration conf) { - String regionTableName = hRegionInfo.getTable().getNameAsString(); - String flowRunTableName = conf.get(FlowRunTable.TABLE_NAME_CONF_NAME, - FlowRunTable.DEFAULT_TABLE_NAME); - if (LOG.isDebugEnabled()) { - LOG.debug("regionTableName=" + regionTableName); - } - if (flowRunTableName.equalsIgnoreCase(regionTableName)) { - if (LOG.isDebugEnabled()) { - LOG.debug(" table is the flow run table!! " + flowRunTableName); - } - return true; - } - return false; - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java index a9dcfaad245..5c7b0698867 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java @@ -71,7 +71,7 @@ public class FlowRunCoprocessor extends BaseRegionObserver { if (e instanceof RegionCoprocessorEnvironment) { RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e; this.region = env.getRegion(); - isFlowRunRegion = TimelineStorageUtils.isFlowRunTable( + isFlowRunRegion = FlowRunTable.isFlowRunTable( region.getRegionInfo(), env.getConfiguration()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java index 547bef075a5..4cd581b38a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java @@ -23,6 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; @@ -138,4 +139,23 @@ public class FlowRunTable extends BaseTable { LOG.info("Status of table creation for " + table.getNameAsString() + "=" + admin.tableExists(table)); } + + public static boolean isFlowRunTable(HRegionInfo hRegionInfo, + Configuration conf) { + String regionTableName = hRegionInfo.getTable().getNameAsString(); + if (LOG.isDebugEnabled()) { + LOG.debug("regionTableName=" + regionTableName); + } + String flowRunTableName = BaseTable.getTableName(conf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME) + .getNameAsString(); + if (flowRunTableName.equalsIgnoreCase(regionTableName)) { + if (LOG.isDebugEnabled()) { + LOG.debug(" table is the flow run table!! " + + flowRunTableName); + } + return true; + } + return false; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md index b6a0da42988..4197d99f16a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md @@ -143,6 +143,7 @@ New configuration parameters that are introduced with v.2 are marked bold. | **`yarn.timeline-service.writer.class`** | The class for the backend storage writer. Defaults to HBase storage writer. | | **`yarn.timeline-service.reader.class`** | The class for the backend storage reader. Defaults to HBase storage reader. | | **`yarn.system-metrics-publisher.enabled`** | The setting that controls whether yarn system metrics is published on the Timeline service or not by RM And NM. Defaults to `false`. | +| **`yarn.timeline-service.schema.prefix`** | The schema prefix for hbase tables. Defaults to "prod.". | #### Advanced configuration @@ -194,7 +195,8 @@ Finally, run the schema creator tool to create the necessary tables: The `TimelineSchemaCreator` tool supports a few options that may come handy especially when you are testing. For example, you can use `-skipExistingTable` (`-s` for short) to skip existing tables -and continue to create other tables rather than failing the schema creation. +and continue to create other tables rather than failing the schema creation. By default, the tables +will have a schema prefix of "prod." #### Enabling Timeline Service v.2 Following are the basic configurations to start Timeline service v.2: