HBASE-16095 Add priority to TableDescriptor and priority region open thread pool

Conflicts:
	hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
This commit is contained in:
Enis Soztutar 2016-07-13 10:31:55 -07:00
parent 8cf6adae72
commit 09c7b1e962
9 changed files with 192 additions and 14 deletions

View File

@ -212,6 +212,13 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
/** Default durability for HTD is USE_DEFAULT, which defaults to HBase-global default value */ /** Default durability for HTD is USE_DEFAULT, which defaults to HBase-global default value */
private static final Durability DEFAULT_DURABLITY = Durability.USE_DEFAULT; private static final Durability DEFAULT_DURABLITY = Durability.USE_DEFAULT;
public static final String PRIORITY = "PRIORITY";
private static final ImmutableBytesWritable PRIORITY_KEY =
new ImmutableBytesWritable(Bytes.toBytes(PRIORITY));
/** Relative priority of the table used for rpc scheduling */
private static final int DEFAULT_PRIORITY = HConstants.NORMAL_QOS;
/* /*
* The below are ugly but better than creating them each time till we * The below are ugly but better than creating them each time till we
* replace booleans being saved as Strings with plain booleans. Need a * replace booleans being saved as Strings with plain booleans. Need a
@ -265,6 +272,7 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
DEFAULT_VALUES.put(DURABILITY, DEFAULT_DURABLITY.name()); //use the enum name DEFAULT_VALUES.put(DURABILITY, DEFAULT_DURABLITY.name()); //use the enum name
DEFAULT_VALUES.put(REGION_REPLICATION, String.valueOf(DEFAULT_REGION_REPLICATION)); DEFAULT_VALUES.put(REGION_REPLICATION, String.valueOf(DEFAULT_REGION_REPLICATION));
DEFAULT_VALUES.put(NORMALIZATION_ENABLED, String.valueOf(DEFAULT_NORMALIZATION_ENABLED)); DEFAULT_VALUES.put(NORMALIZATION_ENABLED, String.valueOf(DEFAULT_NORMALIZATION_ENABLED));
DEFAULT_VALUES.put(PRIORITY, String.valueOf(DEFAULT_PRIORITY));
for (String s : DEFAULT_VALUES.keySet()) { for (String s : DEFAULT_VALUES.keySet()) {
RESERVED_KEYWORDS.add(new ImmutableBytesWritable(Bytes.toBytes(s))); RESERVED_KEYWORDS.add(new ImmutableBytesWritable(Bytes.toBytes(s)));
} }
@ -1211,9 +1219,13 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
* Returns the configured replicas per region * Returns the configured replicas per region
*/ */
public int getRegionReplication() { public int getRegionReplication() {
byte[] val = getValue(REGION_REPLICATION_KEY); return getIntValue(REGION_REPLICATION_KEY, DEFAULT_REGION_REPLICATION);
}
private int getIntValue(ImmutableBytesWritable key, int defaultVal) {
byte[] val = getValue(key);
if (val == null || val.length == 0) { if (val == null || val.length == 0) {
return DEFAULT_REGION_REPLICATION; return defaultVal;
} }
return Integer.parseInt(Bytes.toString(val)); return Integer.parseInt(Bytes.toString(val));
} }
@ -1253,6 +1265,15 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
return this; return this;
} }
public HTableDescriptor setPriority(int priority) {
setValue(PRIORITY_KEY, Integer.toString(priority));
return this;
}
public int getPriority() {
return getIntValue(PRIORITY_KEY, DEFAULT_PRIORITY);
}
/** /**
* Returns all the column family names of the current table. The map of * Returns all the column family names of the current table. The map of
* HTableDescriptor contains mapping of family name to HColumnDescriptors. * HTableDescriptor contains mapping of family name to HColumnDescriptors.

View File

@ -137,6 +137,12 @@ public enum EventType {
* Master asking RS to close meta. * Master asking RS to close meta.
*/ */
M_RS_CLOSE_META (25, ExecutorType.RS_CLOSE_META), M_RS_CLOSE_META (25, ExecutorType.RS_CLOSE_META),
/**
* Messages originating from Master to RS.<br>
* M_RS_OPEN_PRIORITY_REGION<br>
* Master asking RS to open a priority region.
*/
M_RS_OPEN_PRIORITY_REGION (26, ExecutorType.RS_OPEN_PRIORITY_REGION),
/** /**
* Messages originating from Client to Master.<br> * Messages originating from Client to Master.<br>

View File

@ -47,7 +47,8 @@ public enum ExecutorType {
RS_PARALLEL_SEEK (26), RS_PARALLEL_SEEK (26),
RS_LOG_REPLAY_OPS (27), RS_LOG_REPLAY_OPS (27),
RS_REGION_REPLICA_FLUSH_OPS (28), RS_REGION_REPLICA_FLUSH_OPS (28),
RS_COMPACTED_FILES_DISCHARGER (29); RS_COMPACTED_FILES_DISCHARGER (29),
RS_OPEN_PRIORITY_REGION (30);
ExecutorType(int value) {} ExecutorType(int value) {}

View File

@ -307,4 +307,11 @@ public class TestHTableDescriptor {
hcd.setBlocksize(2000); hcd.setBlocksize(2000);
htd.addFamily(hcd); htd.addFamily(hcd);
} }
@Test
public void testPriority() {
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("table"));
htd.setPriority(42);
assertEquals(42, htd.getPriority());
}
} }

View File

@ -127,6 +127,10 @@ public class ExecutorService {
return executor; return executor;
} }
@VisibleForTesting
public ThreadPoolExecutor getExecutorThreadPool(final ExecutorType type) {
return getExecutor(type).getThreadPoolExecutor();
}
public void startExecutorService(final ExecutorType type, final int maxThreads) { public void startExecutorService(final ExecutorType type, final int maxThreads) {
String name = type.getExecutorName(this.servername); String name = type.getExecutorName(this.servername);
@ -226,6 +230,11 @@ public class ExecutorService {
this.threadPoolExecutor.execute(event); this.threadPoolExecutor.execute(event);
} }
TrackingThreadPoolExecutor getThreadPoolExecutor() {
return threadPoolExecutor;
}
@Override
public String toString() { public String toString() {
return getClass().getSimpleName() + "-" + id + "-" + name; return getClass().getSimpleName() + "-" + id + "-" + name;
} }

View File

@ -1725,6 +1725,8 @@ public class HRegionServer extends HasThread implements
conf.getInt("hbase.regionserver.executor.openregion.threads", 3)); conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
this.service.startExecutorService(ExecutorType.RS_OPEN_META, this.service.startExecutorService(ExecutorType.RS_OPEN_META,
conf.getInt("hbase.regionserver.executor.openmeta.threads", 1)); conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
this.service.startExecutorService(ExecutorType.RS_OPEN_PRIORITY_REGION,
conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3));
this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION, this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
conf.getInt("hbase.regionserver.executor.closeregion.threads", 3)); conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
this.service.startExecutorService(ExecutorType.RS_CLOSE_META, this.service.startExecutorService(ExecutorType.RS_CLOSE_META,

View File

@ -168,6 +168,7 @@ import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.regionserver.Region.Operation; import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@ -1653,8 +1654,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} else { } else {
regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(), regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
regionOpenInfo.getFavoredNodesList()); regionOpenInfo.getFavoredNodesList());
regionServer.service.submit(new OpenRegionHandler( if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) {
regionServer, regionServer, region, htd, masterSystemTime, coordination, ord)); regionServer.service.submit(new OpenPriorityRegionHandler(
regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
} else {
regionServer.service.submit(new OpenRegionHandler(
regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
}
} }
} }

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.handler;
import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
/**
* Handles opening of a high priority region on a region server.
* <p>
* This is executed after receiving an OPEN RPC from the master or client.
*/
@InterfaceAudience.Private
public class OpenPriorityRegionHandler extends OpenRegionHandler {
public OpenPriorityRegionHandler(Server server, RegionServerServices rsServices,
HRegionInfo regionInfo, HTableDescriptor htd, long masterSystemTime,
OpenRegionCoordination coordination, OpenRegionCoordination.OpenRegionDetails ord) {
super(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_PRIORITY_REGION,
masterSystemTime, coordination, ord);
}
}

View File

@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({MediumTests.class, RegionServerTests.class})
public class TestRegionOpen {
@SuppressWarnings("unused")
private static final Log LOG = LogFactory.getLog(TestRegionOpen.class);
private static final int NB_SERVERS = 1;
private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
final TableName tableName = TableName.valueOf(TestRegionOpen.class.getSimpleName());
@BeforeClass
public static void before() throws Exception {
HTU.startMiniCluster(NB_SERVERS);
}
@AfterClass
public static void afterClass() throws Exception {
HTU.shutdownMiniCluster();
}
private static HRegionServer getRS() {
return HTU.getHBaseCluster().getLiveRegionServerThreads().get(0).getRegionServer();
}
@Test(timeout = 60000)
public void testPriorityRegionIsOpenedWithSeparateThreadPool() throws Exception {
ThreadPoolExecutor exec = getRS().getExecutorService()
.getExecutorThreadPool(ExecutorType.RS_OPEN_PRIORITY_REGION);
assertEquals(1, exec.getCompletedTaskCount()); // namespace region
HTableDescriptor htd = new HTableDescriptor(tableName);
htd.setPriority(HConstants.HIGH_QOS);
htd.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
Admin admin = connection.getAdmin()) {
admin.createTable(htd);
}
assertEquals(2, exec.getCompletedTaskCount());
}
}