HBASE-12936 Quota support for namespace should take region merge into account
This commit is contained in:
parent
7c8aa2e963
commit
6bfa8ea977
|
@ -2802,7 +2802,12 @@ public class AssignmentManager {
|
|||
errorMsg = onRegionMergePONR(current, hri, serverName, transition);
|
||||
break;
|
||||
case MERGED:
|
||||
errorMsg = onRegionMerged(current, hri, serverName, transition);
|
||||
try {
|
||||
errorMsg = onRegionMerged(current, hri, serverName, transition);
|
||||
regionStateListener.onRegionMerged(hri);
|
||||
} catch (IOException exp) {
|
||||
errorMsg = StringUtils.stringifyException(exp);
|
||||
}
|
||||
break;
|
||||
case MERGE_REVERTED:
|
||||
errorMsg = onRegionMergeReverted(current, hri, serverName, transition);
|
||||
|
|
|
@ -108,7 +108,18 @@ public class NamespaceAuditor {
|
|||
throw new IOException(
|
||||
"Split operation is being performed even before namespace auditor is initialized.");
|
||||
} else if (!stateManager
|
||||
.checkAndUpdateNamespaceRegionCount(hri.getTable(), hri.getRegionName())) {
|
||||
.checkAndUpdateNamespaceRegionCount(hri.getTable(), hri.getRegionName(), 1)) {
|
||||
throw new QuotaExceededException("Region split not possible for :" + hri.getEncodedName()
|
||||
+ " as quota limits are exceeded ");
|
||||
}
|
||||
}
|
||||
|
||||
public void updateQuotaForRegionMerge(HRegionInfo hri) throws IOException {
|
||||
if (!stateManager.isInitialized()) {
|
||||
throw new IOException(
|
||||
"Merge operation is being performed even before namespace auditor is initialized.");
|
||||
} else if (!stateManager
|
||||
.checkAndUpdateNamespaceRegionCount(hri.getTable(), hri.getRegionName(), -1)) {
|
||||
throw new QuotaExceededException("Region split not possible for :" + hri.getEncodedName()
|
||||
+ " as quota limits are exceeded ");
|
||||
}
|
||||
|
|
|
@ -79,17 +79,19 @@ class NamespaceStateManager {
|
|||
*
|
||||
* @param TableName
|
||||
* @param regionName
|
||||
* @param incr
|
||||
* @return true, if region can be added to table.
|
||||
* @throws IOException Signals that an I/O exception has occurred.
|
||||
*/
|
||||
synchronized boolean checkAndUpdateNamespaceRegionCount(TableName name,
|
||||
byte[] regionName) throws IOException {
|
||||
byte[] regionName, int incr) throws IOException {
|
||||
String namespace = name.getNamespaceAsString();
|
||||
NamespaceDescriptor nspdesc = getNamespaceDescriptor(namespace);
|
||||
if (nspdesc != null) {
|
||||
NamespaceTableAndRegionInfo currentStatus;
|
||||
currentStatus = getState(namespace);
|
||||
if (currentStatus.getRegionCount() >= TableNamespaceManager.getMaxRegions(nspdesc)) {
|
||||
if (incr > 0 &&
|
||||
currentStatus.getRegionCount() >= TableNamespaceManager.getMaxRegions(nspdesc)) {
|
||||
LOG.warn("The region " + Bytes.toStringBinary(regionName)
|
||||
+ " cannot be created. The region count will exceed quota on the namespace. "
|
||||
+ "This may be transient, please retry later if there are any ongoing split"
|
||||
|
@ -98,7 +100,7 @@ class NamespaceStateManager {
|
|||
}
|
||||
NamespaceTableAndRegionInfo nsInfo = nsStateCache.get(namespace);
|
||||
if (nsInfo != null) {
|
||||
nsInfo.incRegionCountForTable(name, 1);
|
||||
nsInfo.incRegionCountForTable(name, incr);
|
||||
} else {
|
||||
LOG.warn("Namespace state found null for namespace : " + namespace);
|
||||
}
|
||||
|
|
|
@ -313,6 +313,12 @@ public class MasterQuotaManager implements RegionStateListener {
|
|||
}
|
||||
}
|
||||
|
||||
public void onRegionMerged(HRegionInfo hri) throws IOException {
|
||||
if (enabled) {
|
||||
namespaceQuotaManager.updateQuotaForRegionMerge(hri);
|
||||
}
|
||||
}
|
||||
|
||||
public void onRegionSplit(HRegionInfo hri) throws IOException {
|
||||
if (enabled) {
|
||||
namespaceQuotaManager.checkQuotaToSplitRegion(hri);
|
||||
|
|
|
@ -44,4 +44,11 @@ public interface RegionStateListener {
|
|||
*/
|
||||
void onRegionSplitReverted(HRegionInfo hri) throws IOException;
|
||||
|
||||
/**
|
||||
* Process region merge event.
|
||||
*
|
||||
* @param hri An instance of HRegionInfo
|
||||
* @throws IOException
|
||||
*/
|
||||
void onRegionMerged(HRegionInfo hri) throws IOException;
|
||||
}
|
||||
|
|
|
@ -31,13 +31,17 @@ import java.util.concurrent.CountDownLatch;
|
|||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Coprocessor;
|
||||
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
|
@ -47,12 +51,19 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
|
|||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Mutation;
|
||||
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
|
||||
import org.apache.hadoop.hbase.coprocessor.BaseRegionServerObserver;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
|
||||
import org.apache.hadoop.hbase.coprocessor.TestRegionServerObserver.CPRegionServerObserver;
|
||||
import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase;
|
||||
import org.apache.hadoop.hbase.master.TableNamespaceManager;
|
||||
import org.apache.hadoop.hbase.quotas.QuotaUtil;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
|
@ -74,7 +85,10 @@ public class TestNamespaceAuditor {
|
|||
public static void before() throws Exception {
|
||||
UTIL.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
|
||||
CustomObserver.class.getName());
|
||||
UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
|
||||
Configuration conf = UTIL.getConfiguration();
|
||||
conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
|
||||
conf.setClass("hbase.coprocessor.regionserver.classes", CPRegionServerObserver.class,
|
||||
RegionServerObserver.class);
|
||||
UTIL.startMiniCluster(1, 3);
|
||||
UTIL.waitFor(60000, new Waiter.Predicate<Exception>() {
|
||||
@Override
|
||||
|
@ -242,6 +256,84 @@ public class TestNamespaceAuditor {
|
|||
assertNull("Namespace state not found to be null.", stateInfo);
|
||||
}
|
||||
|
||||
public static class CPRegionServerObserver extends BaseRegionServerObserver {
|
||||
private boolean shouldFailMerge = false;
|
||||
|
||||
public void failMerge(boolean fail) {
|
||||
shouldFailMerge = fail;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx, HRegion regionA,
|
||||
HRegion regionB) throws IOException {
|
||||
if (shouldFailMerge) {
|
||||
throw new IOException("fail merge");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRegionMerge() throws Exception {
|
||||
String nsp1 = prefix + "_regiontest";
|
||||
NamespaceDescriptor nspDesc = NamespaceDescriptor.create(nsp1)
|
||||
.addConfiguration(TableNamespaceManager.KEY_MAX_REGIONS, "3")
|
||||
.addConfiguration(TableNamespaceManager.KEY_MAX_TABLES, "2").build();
|
||||
admin.createNamespace(nspDesc);
|
||||
final TableName tableTwo = TableName.valueOf(nsp1 + TableName.NAMESPACE_DELIM + "table2");
|
||||
byte[] columnFamily = Bytes.toBytes("info");
|
||||
HTableDescriptor tableDescOne = new HTableDescriptor(tableTwo);
|
||||
tableDescOne.addFamily(new HColumnDescriptor(columnFamily));
|
||||
NamespaceTableAndRegionInfo stateInfo;
|
||||
final int initialRegions = 3;
|
||||
admin.createTable(tableDescOne, Bytes.toBytes("1"), Bytes.toBytes("2000"), initialRegions);
|
||||
Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration());
|
||||
HTable htable = (HTable)connection.getTable(tableTwo);
|
||||
UTIL.loadNumericRows(htable, Bytes.toBytes("info"), 1, 1000);
|
||||
admin.flush(tableTwo);
|
||||
stateInfo = getNamespaceState(nsp1);
|
||||
List<HRegionInfo> hris = admin.getTableRegions(tableTwo);
|
||||
// merge the two regions
|
||||
admin.mergeRegions(hris.get(0).getEncodedNameAsBytes(),
|
||||
hris.get(1).getEncodedNameAsBytes(), false);
|
||||
while (admin.getTableRegions(tableTwo).size() == initialRegions) {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
hris = admin.getTableRegions(tableTwo);
|
||||
assertEquals(initialRegions-1, hris.size());
|
||||
|
||||
HRegion actualRegion = UTIL.getHBaseCluster().getRegions(tableTwo).get(0);
|
||||
byte[] splitKey = getSplitKey(actualRegion.getStartKey(), actualRegion.getEndKey());
|
||||
admin.split(tableTwo, Bytes.toBytes("500"));
|
||||
while (admin.getTableRegions(tableTwo).size() != initialRegions) {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
assertEquals(initialRegions, admin.getTableRegions(tableTwo).size());
|
||||
|
||||
// fail region merge through Coprocessor hook
|
||||
MiniHBaseCluster cluster = UTIL.getHBaseCluster();
|
||||
hris = admin.getTableRegions(tableTwo);
|
||||
HRegionServer regionServer = cluster.getRegionServer(
|
||||
cluster.getServerWith(hris.get(0).getRegionName()));
|
||||
RegionServerCoprocessorHost cpHost = regionServer.getRegionServerCoprocessorHost();
|
||||
Coprocessor coprocessor = cpHost.findCoprocessor(CPRegionServerObserver.class.getName());
|
||||
CPRegionServerObserver regionServerObserver = (CPRegionServerObserver) coprocessor;
|
||||
regionServerObserver.failMerge(true);
|
||||
admin.mergeRegions(hris.get(0).getEncodedNameAsBytes(),
|
||||
hris.get(1).getEncodedNameAsBytes(), false);
|
||||
assertEquals(initialRegions, admin.getTableRegions(tableTwo).size());
|
||||
// verify that we cannot split
|
||||
actualRegion = UTIL.getHBaseCluster().getRegions(tableTwo).get(0);
|
||||
admin.split(tableTwo, TableInputFormatBase.getSplitKey(actualRegion.getStartKey(),
|
||||
actualRegion.getEndKey(), true));
|
||||
while (admin.getTableRegions(tableTwo).size() != initialRegions) {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
assertEquals(initialRegions, admin.getTableRegions(tableTwo).size());
|
||||
regionServerObserver.failMerge(true);
|
||||
|
||||
htable.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRegionOperations() throws Exception {
|
||||
String nsp1 = prefix + "_regiontest";
|
||||
|
@ -289,8 +381,10 @@ public class TestNamespaceAuditor {
|
|||
admin.split(tableOne, getSplitKey(actualRegion.getStartKey(), actualRegion.getEndKey()));
|
||||
observer.postSplit.await();
|
||||
// Make sure no regions have been added.
|
||||
assertEquals(2, admin.getTableRegions(tableOne).size());
|
||||
List<HRegionInfo> hris = admin.getTableRegions(tableOne);
|
||||
assertEquals(2, hris.size());
|
||||
assertTrue("split completed", observer.preSplitBeforePONR.getCount() == 1);
|
||||
|
||||
htable.close();
|
||||
}
|
||||
|
||||
|
@ -313,6 +407,7 @@ public class TestNamespaceAuditor {
|
|||
public static class CustomObserver extends BaseRegionObserver{
|
||||
volatile CountDownLatch postSplit;
|
||||
volatile CountDownLatch preSplitBeforePONR;
|
||||
|
||||
@Override
|
||||
public void postCompleteSplit(ObserverContext<RegionCoprocessorEnvironment> ctx)
|
||||
throws IOException {
|
||||
|
|
Loading…
Reference in New Issue