diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java index d4c4c2b2613..a29481da408 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java @@ -212,6 +212,13 @@ public class HTableDescriptor implements WritableComparable { /** Default durability for HTD is USE_DEFAULT, which defaults to HBase-global default value */ private static final Durability DEFAULT_DURABLITY = Durability.USE_DEFAULT; + public static final String PRIORITY = "PRIORITY"; + private static final ImmutableBytesWritable PRIORITY_KEY = + new ImmutableBytesWritable(Bytes.toBytes(PRIORITY)); + + /** Relative priority of the table used for rpc scheduling */ + private static final int DEFAULT_PRIORITY = HConstants.NORMAL_QOS; + /* * The below are ugly but better than creating them each time till we * replace booleans being saved as Strings with plain booleans. Need a @@ -265,6 +272,7 @@ public class HTableDescriptor implements WritableComparable { DEFAULT_VALUES.put(DURABILITY, DEFAULT_DURABLITY.name()); //use the enum name DEFAULT_VALUES.put(REGION_REPLICATION, String.valueOf(DEFAULT_REGION_REPLICATION)); DEFAULT_VALUES.put(NORMALIZATION_ENABLED, String.valueOf(DEFAULT_NORMALIZATION_ENABLED)); + DEFAULT_VALUES.put(PRIORITY, String.valueOf(DEFAULT_PRIORITY)); for (String s : DEFAULT_VALUES.keySet()) { RESERVED_KEYWORDS.add(new ImmutableBytesWritable(Bytes.toBytes(s))); } @@ -1211,9 +1219,13 @@ public class HTableDescriptor implements WritableComparable { * Returns the configured replicas per region */ public int getRegionReplication() { - byte[] val = getValue(REGION_REPLICATION_KEY); + return getIntValue(REGION_REPLICATION_KEY, DEFAULT_REGION_REPLICATION); + } + + private int getIntValue(ImmutableBytesWritable key, int defaultVal) { + byte[] val = getValue(key); if (val == null || val.length == 0) { - return DEFAULT_REGION_REPLICATION; + return defaultVal; } return Integer.parseInt(Bytes.toString(val)); } @@ -1253,6 +1265,15 @@ public class HTableDescriptor implements WritableComparable { return this; } + public HTableDescriptor setPriority(int priority) { + setValue(PRIORITY_KEY, Integer.toString(priority)); + return this; + } + + public int getPriority() { + return getIntValue(PRIORITY_KEY, DEFAULT_PRIORITY); + } + /** * Returns all the column family names of the current table. The map of * HTableDescriptor contains mapping of family name to HColumnDescriptors. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java index a7759c53c78..9b7751d48a3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java @@ -137,6 +137,12 @@ public enum EventType { * Master asking RS to close meta. */ M_RS_CLOSE_META (25, ExecutorType.RS_CLOSE_META), + /** + * Messages originating from Master to RS.
+ * M_RS_OPEN_PRIORITY_REGION
+ * Master asking RS to open a priority region. + */ + M_RS_OPEN_PRIORITY_REGION (26, ExecutorType.RS_OPEN_PRIORITY_REGION), /** * Messages originating from Client to Master.
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java index 5a161497b91..e9b0ad58c57 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java @@ -47,7 +47,8 @@ public enum ExecutorType { RS_PARALLEL_SEEK (26), RS_LOG_REPLAY_OPS (27), RS_REGION_REPLICA_FLUSH_OPS (28), - RS_COMPACTED_FILES_DISCHARGER (29); + RS_COMPACTED_FILES_DISCHARGER (29), + RS_OPEN_PRIORITY_REGION (30); ExecutorType(int value) {} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java index c09e41b7b9e..d126994b614 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java @@ -307,4 +307,11 @@ public class TestHTableDescriptor { hcd.setBlocksize(2000); htd.addFamily(hcd); } + + @Test + public void testPriority() { + HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("table")); + htd.setPriority(42); + assertEquals(42, htd.getPriority()); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java index 018e173f2d7..479184feb1e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java @@ -127,6 +127,10 @@ public class ExecutorService { return executor; } + @VisibleForTesting + public ThreadPoolExecutor getExecutorThreadPool(final ExecutorType type) { + return getExecutor(type).getThreadPoolExecutor(); + } public void startExecutorService(final ExecutorType type, final int maxThreads) { String name = type.getExecutorName(this.servername); @@ -180,7 +184,7 @@ public class ExecutorService { } return ret; } - + /** * Executor instance. */ @@ -225,7 +229,12 @@ public class ExecutorService { } this.threadPoolExecutor.execute(event); } - + + TrackingThreadPoolExecutor getThreadPoolExecutor() { + return threadPoolExecutor; + } + + @Override public String toString() { return getClass().getSimpleName() + "-" + id + "-" + name; } @@ -239,7 +248,7 @@ public class ExecutorService { } queuedEvents.add((EventHandler)r); } - + List running = Lists.newArrayList(); for (Map.Entry e : threadPoolExecutor.getRunningTasks().entrySet()) { @@ -250,18 +259,18 @@ public class ExecutorService { } running.add(new RunningEventStatus(e.getKey(), (EventHandler)r)); } - + return new ExecutorStatus(this, queuedEvents, running); } } - + /** * A subclass of ThreadPoolExecutor that keeps track of the Runnables that * are executing at any given point in time. */ static class TrackingThreadPoolExecutor extends ThreadPoolExecutor { - private ConcurrentMap running = Maps.newConcurrentMap(); - + private ConcurrentMap running = Maps.newConcurrentMap(); + public TrackingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); @@ -279,7 +288,7 @@ public class ExecutorService { assert oldPut == null : "inconsistency for thread " + t; super.beforeExecute(t, r); } - + /** * @return a map of the threads currently running tasks * inside this executor. Each key is an active thread, @@ -310,7 +319,7 @@ public class ExecutorService { this.queuedEvents = queuedEvents; this.running = running; } - + /** * Dump a textual representation of the executor's status * to the given writer. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 5e7ce5a5da1..2b8f84027fb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1725,6 +1725,8 @@ public class HRegionServer extends HasThread implements conf.getInt("hbase.regionserver.executor.openregion.threads", 3)); this.service.startExecutorService(ExecutorType.RS_OPEN_META, conf.getInt("hbase.regionserver.executor.openmeta.threads", 1)); + this.service.startExecutorService(ExecutorType.RS_OPEN_PRIORITY_REGION, + conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3)); this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION, conf.getInt("hbase.regionserver.executor.closeregion.threads", 3)); this.service.startExecutorService(ExecutorType.RS_CLOSE_META, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 640cd8facbb..329abb6950b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -168,6 +168,7 @@ import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; import org.apache.hadoop.hbase.regionserver.Region.Operation; import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; +import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -1653,8 +1654,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } else { regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(), regionOpenInfo.getFavoredNodesList()); - regionServer.service.submit(new OpenRegionHandler( - regionServer, regionServer, region, htd, masterSystemTime, coordination, ord)); + if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) { + regionServer.service.submit(new OpenPriorityRegionHandler( + regionServer, regionServer, region, htd, masterSystemTime, coordination, ord)); + } else { + regionServer.service.submit(new OpenRegionHandler( + regionServer, regionServer, region, htd, masterSystemTime, coordination, ord)); + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenPriorityRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenPriorityRegionHandler.java new file mode 100644 index 00000000000..7ce2ac03f0d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenPriorityRegionHandler.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.handler; + +import org.apache.hadoop.hbase.coordination.OpenRegionCoordination; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; + +/** + * Handles opening of a high priority region on a region server. + *

+ * This is executed after receiving an OPEN RPC from the master or client. + */ +@InterfaceAudience.Private +public class OpenPriorityRegionHandler extends OpenRegionHandler { + public OpenPriorityRegionHandler(Server server, RegionServerServices rsServices, + HRegionInfo regionInfo, HTableDescriptor htd, long masterSystemTime, + OpenRegionCoordination coordination, OpenRegionCoordination.OpenRegionDetails ord) { + super(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_PRIORITY_REGION, + masterSystemTime, coordination, ord); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionOpen.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionOpen.java new file mode 100644 index 00000000000..aac872d9b3e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionOpen.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; + +import java.util.concurrent.ThreadPoolExecutor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.executor.ExecutorType; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({MediumTests.class, RegionServerTests.class}) +public class TestRegionOpen { + @SuppressWarnings("unused") + private static final Log LOG = LogFactory.getLog(TestRegionOpen.class); + private static final int NB_SERVERS = 1; + + private static final HBaseTestingUtility HTU = new HBaseTestingUtility(); + final TableName tableName = TableName.valueOf(TestRegionOpen.class.getSimpleName()); + + @BeforeClass + public static void before() throws Exception { + HTU.startMiniCluster(NB_SERVERS); + } + + @AfterClass + public static void afterClass() throws Exception { + HTU.shutdownMiniCluster(); + } + + private static HRegionServer getRS() { + return HTU.getHBaseCluster().getLiveRegionServerThreads().get(0).getRegionServer(); + } + + @Test(timeout = 60000) + public void testPriorityRegionIsOpenedWithSeparateThreadPool() throws Exception { + ThreadPoolExecutor exec = getRS().getExecutorService() + .getExecutorThreadPool(ExecutorType.RS_OPEN_PRIORITY_REGION); + + assertEquals(1, exec.getCompletedTaskCount()); // namespace region + + HTableDescriptor htd = new HTableDescriptor(tableName); + htd.setPriority(HConstants.HIGH_QOS); + htd.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY)); + try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); + Admin admin = connection.getAdmin()) { + admin.createTable(htd); + } + + assertEquals(2, exec.getCompletedTaskCount()); + } +}