HBASE-16095 Add priority to TableDescriptor and priority region open thread pool
This commit is contained in:
parent
86f3768627
commit
a07892558b
|
@ -191,6 +191,13 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
|
||||||
/** Default durability for HTD is USE_DEFAULT, which defaults to HBase-global default value */
|
/** Default durability for HTD is USE_DEFAULT, which defaults to HBase-global default value */
|
||||||
private static final Durability DEFAULT_DURABLITY = Durability.USE_DEFAULT;
|
private static final Durability DEFAULT_DURABLITY = Durability.USE_DEFAULT;
|
||||||
|
|
||||||
|
public static final String PRIORITY = "PRIORITY";
|
||||||
|
private static final Bytes PRIORITY_KEY =
|
||||||
|
new Bytes(Bytes.toBytes(PRIORITY));
|
||||||
|
|
||||||
|
/** Relative priority of the table used for rpc scheduling */
|
||||||
|
private static final int DEFAULT_PRIORITY = HConstants.NORMAL_QOS;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* The below are ugly but better than creating them each time till we
|
* The below are ugly but better than creating them each time till we
|
||||||
* replace booleans being saved as Strings with plain booleans. Need a
|
* replace booleans being saved as Strings with plain booleans. Need a
|
||||||
|
@ -245,6 +252,7 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
|
||||||
DEFAULT_VALUES.put(DURABILITY, DEFAULT_DURABLITY.name()); //use the enum name
|
DEFAULT_VALUES.put(DURABILITY, DEFAULT_DURABLITY.name()); //use the enum name
|
||||||
DEFAULT_VALUES.put(REGION_REPLICATION, String.valueOf(DEFAULT_REGION_REPLICATION));
|
DEFAULT_VALUES.put(REGION_REPLICATION, String.valueOf(DEFAULT_REGION_REPLICATION));
|
||||||
DEFAULT_VALUES.put(NORMALIZATION_ENABLED, String.valueOf(DEFAULT_NORMALIZATION_ENABLED));
|
DEFAULT_VALUES.put(NORMALIZATION_ENABLED, String.valueOf(DEFAULT_NORMALIZATION_ENABLED));
|
||||||
|
DEFAULT_VALUES.put(PRIORITY, String.valueOf(DEFAULT_PRIORITY));
|
||||||
for (String s : DEFAULT_VALUES.keySet()) {
|
for (String s : DEFAULT_VALUES.keySet()) {
|
||||||
RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(s)));
|
RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(s)));
|
||||||
}
|
}
|
||||||
|
@ -1110,9 +1118,13 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
|
||||||
* Returns the configured replicas per region
|
* Returns the configured replicas per region
|
||||||
*/
|
*/
|
||||||
public int getRegionReplication() {
|
public int getRegionReplication() {
|
||||||
byte[] val = getValue(REGION_REPLICATION_KEY);
|
return getIntValue(REGION_REPLICATION_KEY, DEFAULT_REGION_REPLICATION);
|
||||||
|
}
|
||||||
|
|
||||||
|
private int getIntValue(Bytes key, int defaultVal) {
|
||||||
|
byte[] val = getValue(key);
|
||||||
if (val == null || val.length == 0) {
|
if (val == null || val.length == 0) {
|
||||||
return DEFAULT_REGION_REPLICATION;
|
return defaultVal;
|
||||||
}
|
}
|
||||||
return Integer.parseInt(Bytes.toString(val));
|
return Integer.parseInt(Bytes.toString(val));
|
||||||
}
|
}
|
||||||
|
@ -1152,6 +1164,15 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public HTableDescriptor setPriority(int priority) {
|
||||||
|
setValue(PRIORITY_KEY, Integer.toString(priority));
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getPriority() {
|
||||||
|
return getIntValue(PRIORITY_KEY, DEFAULT_PRIORITY);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns all the column family names of the current table. The map of
|
* Returns all the column family names of the current table. The map of
|
||||||
* HTableDescriptor contains mapping of family name to HColumnDescriptors.
|
* HTableDescriptor contains mapping of family name to HColumnDescriptors.
|
||||||
|
|
|
@ -137,6 +137,12 @@ public enum EventType {
|
||||||
* Master asking RS to close meta.
|
* Master asking RS to close meta.
|
||||||
*/
|
*/
|
||||||
M_RS_CLOSE_META (25, ExecutorType.RS_CLOSE_META),
|
M_RS_CLOSE_META (25, ExecutorType.RS_CLOSE_META),
|
||||||
|
/**
|
||||||
|
* Messages originating from Master to RS.<br>
|
||||||
|
* M_RS_OPEN_PRIORITY_REGION<br>
|
||||||
|
* Master asking RS to open a priority region.
|
||||||
|
*/
|
||||||
|
M_RS_OPEN_PRIORITY_REGION (26, ExecutorType.RS_OPEN_PRIORITY_REGION),
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Messages originating from Client to Master.<br>
|
* Messages originating from Client to Master.<br>
|
||||||
|
|
|
@ -47,7 +47,8 @@ public enum ExecutorType {
|
||||||
RS_PARALLEL_SEEK (26),
|
RS_PARALLEL_SEEK (26),
|
||||||
RS_LOG_REPLAY_OPS (27),
|
RS_LOG_REPLAY_OPS (27),
|
||||||
RS_REGION_REPLICA_FLUSH_OPS (28),
|
RS_REGION_REPLICA_FLUSH_OPS (28),
|
||||||
RS_COMPACTED_FILES_DISCHARGER (29);
|
RS_COMPACTED_FILES_DISCHARGER (29),
|
||||||
|
RS_OPEN_PRIORITY_REGION (30);
|
||||||
|
|
||||||
ExecutorType(int value) {}
|
ExecutorType(int value) {}
|
||||||
|
|
||||||
|
|
|
@ -308,4 +308,11 @@ public class TestHTableDescriptor {
|
||||||
hcd.setBlocksize(2000);
|
hcd.setBlocksize(2000);
|
||||||
htd.addFamily(hcd);
|
htd.addFamily(hcd);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPriority() {
|
||||||
|
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("table"));
|
||||||
|
htd.setPriority(42);
|
||||||
|
assertEquals(42, htd.getPriority());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -119,6 +119,10 @@ public class ExecutorService {
|
||||||
return executor;
|
return executor;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public ThreadPoolExecutor getExecutorThreadPool(final ExecutorType type) {
|
||||||
|
return getExecutor(type).getThreadPoolExecutor();
|
||||||
|
}
|
||||||
|
|
||||||
public void startExecutorService(final ExecutorType type, final int maxThreads) {
|
public void startExecutorService(final ExecutorType type, final int maxThreads) {
|
||||||
String name = type.getExecutorName(this.servername);
|
String name = type.getExecutorName(this.servername);
|
||||||
|
@ -150,7 +154,7 @@ public class ExecutorService {
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Executor instance.
|
* Executor instance.
|
||||||
*/
|
*/
|
||||||
|
@ -187,7 +191,12 @@ public class ExecutorService {
|
||||||
// and after process methods.
|
// and after process methods.
|
||||||
this.threadPoolExecutor.execute(event);
|
this.threadPoolExecutor.execute(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TrackingThreadPoolExecutor getThreadPoolExecutor() {
|
||||||
|
return threadPoolExecutor;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return getClass().getSimpleName() + "-" + id + "-" + name;
|
return getClass().getSimpleName() + "-" + id + "-" + name;
|
||||||
}
|
}
|
||||||
|
@ -201,7 +210,7 @@ public class ExecutorService {
|
||||||
}
|
}
|
||||||
queuedEvents.add((EventHandler)r);
|
queuedEvents.add((EventHandler)r);
|
||||||
}
|
}
|
||||||
|
|
||||||
List<RunningEventStatus> running = Lists.newArrayList();
|
List<RunningEventStatus> running = Lists.newArrayList();
|
||||||
for (Map.Entry<Thread, Runnable> e :
|
for (Map.Entry<Thread, Runnable> e :
|
||||||
threadPoolExecutor.getRunningTasks().entrySet()) {
|
threadPoolExecutor.getRunningTasks().entrySet()) {
|
||||||
|
@ -212,18 +221,18 @@ public class ExecutorService {
|
||||||
}
|
}
|
||||||
running.add(new RunningEventStatus(e.getKey(), (EventHandler)r));
|
running.add(new RunningEventStatus(e.getKey(), (EventHandler)r));
|
||||||
}
|
}
|
||||||
|
|
||||||
return new ExecutorStatus(this, queuedEvents, running);
|
return new ExecutorStatus(this, queuedEvents, running);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A subclass of ThreadPoolExecutor that keeps track of the Runnables that
|
* A subclass of ThreadPoolExecutor that keeps track of the Runnables that
|
||||||
* are executing at any given point in time.
|
* are executing at any given point in time.
|
||||||
*/
|
*/
|
||||||
static class TrackingThreadPoolExecutor extends ThreadPoolExecutor {
|
static class TrackingThreadPoolExecutor extends ThreadPoolExecutor {
|
||||||
private ConcurrentMap<Thread, Runnable> running = Maps.newConcurrentMap();
|
private ConcurrentMap<Thread, Runnable> running = Maps.newConcurrentMap();
|
||||||
|
|
||||||
public TrackingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
|
public TrackingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
|
||||||
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
|
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
|
||||||
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
|
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
|
||||||
|
@ -241,7 +250,7 @@ public class ExecutorService {
|
||||||
assert oldPut == null : "inconsistency for thread " + t;
|
assert oldPut == null : "inconsistency for thread " + t;
|
||||||
super.beforeExecute(t, r);
|
super.beforeExecute(t, r);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return a map of the threads currently running tasks
|
* @return a map of the threads currently running tasks
|
||||||
* inside this executor. Each key is an active thread,
|
* inside this executor. Each key is an active thread,
|
||||||
|
@ -272,7 +281,7 @@ public class ExecutorService {
|
||||||
this.queuedEvents = queuedEvents;
|
this.queuedEvents = queuedEvents;
|
||||||
this.running = running;
|
this.running = running;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Dump a textual representation of the executor's status
|
* Dump a textual representation of the executor's status
|
||||||
* to the given writer.
|
* to the given writer.
|
||||||
|
|
|
@ -1735,6 +1735,8 @@ public class HRegionServer extends HasThread implements
|
||||||
conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
|
conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
|
||||||
this.service.startExecutorService(ExecutorType.RS_OPEN_META,
|
this.service.startExecutorService(ExecutorType.RS_OPEN_META,
|
||||||
conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
|
conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
|
||||||
|
this.service.startExecutorService(ExecutorType.RS_OPEN_PRIORITY_REGION,
|
||||||
|
conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3));
|
||||||
this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
|
this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
|
||||||
conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
|
conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
|
||||||
this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
|
this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
|
||||||
|
|
|
@ -170,6 +170,7 @@ import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
|
||||||
import org.apache.hadoop.hbase.regionserver.Region.Operation;
|
import org.apache.hadoop.hbase.regionserver.Region.Operation;
|
||||||
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
|
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
|
||||||
import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
|
import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
|
||||||
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
|
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||||
import org.apache.hadoop.hbase.security.User;
|
import org.apache.hadoop.hbase.security.User;
|
||||||
|
@ -1725,8 +1726,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
} else {
|
} else {
|
||||||
regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
|
regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
|
||||||
regionOpenInfo.getFavoredNodesList());
|
regionOpenInfo.getFavoredNodesList());
|
||||||
regionServer.service.submit(new OpenRegionHandler(
|
if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) {
|
||||||
regionServer, regionServer, region, htd, masterSystemTime));
|
regionServer.service.submit(new OpenPriorityRegionHandler(
|
||||||
|
regionServer, regionServer, region, htd, masterSystemTime));
|
||||||
|
} else {
|
||||||
|
regionServer.service.submit(new OpenRegionHandler(
|
||||||
|
regionServer, regionServer, region, htd, masterSystemTime));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,41 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.regionserver.handler;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.Server;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.executor.EventType;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handles opening of a high priority region on a region server.
|
||||||
|
* <p>
|
||||||
|
* This is executed after receiving an OPEN RPC from the master or client.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class OpenPriorityRegionHandler extends OpenRegionHandler {
|
||||||
|
public OpenPriorityRegionHandler(Server server, RegionServerServices rsServices,
|
||||||
|
HRegionInfo regionInfo, HTableDescriptor htd, long masterSystemTime) {
|
||||||
|
super(server, rsServices, regionInfo, htd, masterSystemTime,
|
||||||
|
EventType.M_RS_OPEN_PRIORITY_REGION);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,83 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||||
|
import org.apache.hadoop.hbase.executor.ExecutorType;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||||
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@Category({MediumTests.class, RegionServerTests.class})
|
||||||
|
public class TestRegionOpen {
|
||||||
|
@SuppressWarnings("unused")
|
||||||
|
private static final Log LOG = LogFactory.getLog(TestRegionOpen.class);
|
||||||
|
private static final int NB_SERVERS = 1;
|
||||||
|
|
||||||
|
private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
|
||||||
|
final TableName tableName = TableName.valueOf(TestRegionOpen.class.getSimpleName());
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void before() throws Exception {
|
||||||
|
HTU.startMiniCluster(NB_SERVERS);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void afterClass() throws Exception {
|
||||||
|
HTU.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static HRegionServer getRS() {
|
||||||
|
return HTU.getHBaseCluster().getLiveRegionServerThreads().get(0).getRegionServer();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testPriorityRegionIsOpenedWithSeparateThreadPool() throws Exception {
|
||||||
|
ThreadPoolExecutor exec = getRS().getExecutorService()
|
||||||
|
.getExecutorThreadPool(ExecutorType.RS_OPEN_PRIORITY_REGION);
|
||||||
|
|
||||||
|
assertEquals(0, exec.getCompletedTaskCount());
|
||||||
|
|
||||||
|
HTableDescriptor htd = new HTableDescriptor(tableName);
|
||||||
|
htd.setPriority(HConstants.HIGH_QOS);
|
||||||
|
htd.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
|
||||||
|
try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
|
||||||
|
Admin admin = connection.getAdmin()) {
|
||||||
|
admin.createTable(htd);
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(1, exec.getCompletedTaskCount());
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue