HBASE-13438 [branch-1] Backport Basic quota support for namespaces (Ashish Singhi)
This commit is contained in:
parent
bcd5c4d137
commit
daa82b9fd9
|
@ -45,7 +45,6 @@ import java.util.concurrent.locks.ReentrantLock;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -55,6 +54,7 @@ import org.apache.hadoop.hbase.HConstants;
|
|||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.MetaTableAccessor;
|
||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||
import org.apache.hadoop.hbase.RegionLocations;
|
||||
import org.apache.hadoop.hbase.RegionTransition;
|
||||
|
@ -63,9 +63,8 @@ import org.apache.hadoop.hbase.ServerName;
|
|||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||
import org.apache.hadoop.hbase.TableStateManager;
|
||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
|
||||
import org.apache.hadoop.hbase.MetaTableAccessor;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
|
||||
import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
|
||||
|
@ -90,11 +89,11 @@ import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
|
|||
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
|
||||
import org.apache.hadoop.hbase.quotas.RegionStateListener;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
|
||||
import org.apache.hadoop.hbase.util.ConfigUtil;
|
||||
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.KeyLocker;
|
||||
|
@ -102,11 +101,13 @@ import org.apache.hadoop.hbase.util.Pair;
|
|||
import org.apache.hadoop.hbase.util.PairOfSameType;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.util.Triple;
|
||||
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
|
||||
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.zookeeper.AsyncCallback;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.KeeperException.NoNodeException;
|
||||
|
@ -255,6 +256,8 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
/** Listeners that are called on assignment events. */
|
||||
private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();
|
||||
|
||||
private RegionStateListener regionStateListener;
|
||||
|
||||
/**
|
||||
* Constructs a new assignment manager.
|
||||
*
|
||||
|
@ -4211,17 +4214,33 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
break;
|
||||
|
||||
case READY_TO_SPLIT:
|
||||
try {
|
||||
regionStateListener.onRegionSplit(hri);
|
||||
} catch (IOException exp) {
|
||||
errorMsg = StringUtils.stringifyException(exp);
|
||||
}
|
||||
case SPLIT_PONR:
|
||||
case SPLIT:
|
||||
case SPLIT_REVERTED:
|
||||
errorMsg = onRegionSplit(serverName, code, hri,
|
||||
HRegionInfo.convert(transition.getRegionInfo(1)),
|
||||
errorMsg =
|
||||
onRegionSplit(serverName, code, hri, HRegionInfo.convert(transition.getRegionInfo(1)),
|
||||
HRegionInfo.convert(transition.getRegionInfo(2)));
|
||||
if (org.apache.commons.lang.StringUtils.isEmpty(errorMsg)) {
|
||||
try {
|
||||
regionStateListener.onRegionSplitReverted(hri);
|
||||
} catch (IOException exp) {
|
||||
LOG.warn(StringUtils.stringifyException(exp));
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
case READY_TO_MERGE:
|
||||
case MERGE_PONR:
|
||||
case MERGED:
|
||||
try {
|
||||
regionStateListener.onRegionMerged(hri);
|
||||
} catch (IOException exp) {
|
||||
errorMsg = StringUtils.stringifyException(exp);
|
||||
}
|
||||
case MERGE_REVERTED:
|
||||
errorMsg = onRegionMerge(serverName, code, hri,
|
||||
HRegionInfo.convert(transition.getRegionInfo(1)),
|
||||
|
@ -4249,4 +4268,8 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
getSnapShotOfAssignment(Collection<HRegionInfo> infos) {
|
||||
return getRegionStates().getRegionAssignments(infos);
|
||||
}
|
||||
|
||||
void setRegionStateListener(RegionStateListener listener) {
|
||||
this.regionStateListener = listener;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -110,6 +110,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
|
|||
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
|
||||
import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
|
||||
import org.apache.hadoop.hbase.quotas.RegionStateListener;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
|
||||
|
@ -725,9 +726,6 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
status.setStatus("Starting namespace manager");
|
||||
initNamespace();
|
||||
|
||||
status.setStatus("Starting quota manager");
|
||||
initQuotaManager();
|
||||
|
||||
if (this.cpHost != null) {
|
||||
try {
|
||||
this.cpHost.preMasterInitialization();
|
||||
|
@ -741,6 +739,9 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
configurationManager.registerObserver(this.balancer);
|
||||
initialized = true;
|
||||
|
||||
status.setStatus("Starting quota manager");
|
||||
initQuotaManager();
|
||||
|
||||
// assign the meta replicas
|
||||
Set<ServerName> EMPTY_SET = new HashSet<ServerName>();
|
||||
int numReplicas = conf.getInt(HConstants.META_REPLICAS_NUM,
|
||||
|
@ -769,6 +770,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
|
||||
private void initQuotaManager() throws IOException {
|
||||
quotaManager = new MasterQuotaManager(this);
|
||||
this.assignmentManager.setRegionStateListener((RegionStateListener) quotaManager);
|
||||
quotaManager.start();
|
||||
}
|
||||
|
||||
|
@ -1324,6 +1326,8 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
HRegionInfo[] newRegions = getHRegionInfos(hTableDescriptor, splitKeys);
|
||||
checkInitialized();
|
||||
sanityCheckTableDescriptor(hTableDescriptor);
|
||||
this.quotaManager.checkNamespaceTableAndRegionQuota(hTableDescriptor.getTableName(),
|
||||
newRegions.length);
|
||||
if (cpHost != null) {
|
||||
cpHost.preCreateTable(hTableDescriptor, newRegions);
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.io.IOException;
|
|||
import java.io.InterruptedIOException;
|
||||
import java.util.NavigableSet;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
|
@ -30,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
|
@ -71,6 +73,9 @@ public class TableNamespaceManager {
|
|||
private ZKNamespaceManager zkNamespaceManager;
|
||||
private boolean initialized;
|
||||
|
||||
public static final String KEY_MAX_REGIONS = "hbase.namespace.quota.maxregions";
|
||||
public static final String KEY_MAX_TABLES = "hbase.namespace.quota.maxtables";
|
||||
|
||||
static final String NS_INIT_TIMEOUT = "hbase.master.namespace.init.timeout";
|
||||
static final int DEFAULT_NS_INIT_TIMEOUT = 300000;
|
||||
|
||||
|
@ -149,13 +154,18 @@ public class TableNamespaceManager {
|
|||
if (get(table, ns.getName()) != null) {
|
||||
throw new NamespaceExistException(ns.getName());
|
||||
}
|
||||
validateTableAndRegionCount(ns);
|
||||
FileSystem fs = masterServices.getMasterFileSystem().getFileSystem();
|
||||
fs.mkdirs(FSUtils.getNamespaceDir(
|
||||
masterServices.getMasterFileSystem().getRootDir(), ns.getName()));
|
||||
upsert(table, ns);
|
||||
if (this.masterServices.isInitialized()) {
|
||||
this.masterServices.getMasterQuotaManager().setNamespaceQuota(ns);
|
||||
}
|
||||
}
|
||||
|
||||
private void upsert(Table table, NamespaceDescriptor ns) throws IOException {
|
||||
validateTableAndRegionCount(ns);
|
||||
Put p = new Put(Bytes.toBytes(ns.getName()));
|
||||
p.addImmutable(HTableDescriptor.NAMESPACE_FAMILY_INFO_BYTES,
|
||||
HTableDescriptor.NAMESPACE_COL_DESC_BYTES,
|
||||
|
@ -204,6 +214,7 @@ public class TableNamespaceManager {
|
|||
masterServices.getMasterFileSystem().getRootDir(), name), true)) {
|
||||
throw new IOException("Failed to remove namespace: "+name);
|
||||
}
|
||||
this.masterServices.getMasterQuotaManager().removeNamespaceQuota(name);
|
||||
}
|
||||
|
||||
public synchronized NavigableSet<NamespaceDescriptor> list() throws IOException {
|
||||
|
@ -300,4 +311,47 @@ public class TableNamespaceManager {
|
|||
return !masterServices.getAssignmentManager().getRegionStates().
|
||||
getRegionsOfTable(TableName.NAMESPACE_TABLE_NAME).isEmpty();
|
||||
}
|
||||
|
||||
void validateTableAndRegionCount(NamespaceDescriptor desc) throws IOException {
|
||||
if (getMaxRegions(desc) <= 0) {
|
||||
throw new ConstraintException("The max region quota for " + desc.getName()
|
||||
+ " is less than or equal to zero.");
|
||||
}
|
||||
if (getMaxTables(desc) <= 0) {
|
||||
throw new ConstraintException("The max tables quota for " + desc.getName()
|
||||
+ " is less than or equal to zero.");
|
||||
}
|
||||
}
|
||||
|
||||
public static long getMaxTables(NamespaceDescriptor ns) throws IOException {
|
||||
String value = ns.getConfigurationValue(KEY_MAX_TABLES);
|
||||
long maxTables = 0;
|
||||
if (StringUtils.isNotEmpty(value)) {
|
||||
try {
|
||||
maxTables = Long.parseLong(value);
|
||||
} catch (NumberFormatException exp) {
|
||||
throw new DoNotRetryIOException("NumberFormatException while getting max tables.", exp);
|
||||
}
|
||||
} else {
|
||||
// The property is not set, so assume its the max long value.
|
||||
maxTables = Long.MAX_VALUE;
|
||||
}
|
||||
return maxTables;
|
||||
}
|
||||
|
||||
public static long getMaxRegions(NamespaceDescriptor ns) throws IOException {
|
||||
String value = ns.getConfigurationValue(KEY_MAX_REGIONS);
|
||||
long maxRegions = 0;
|
||||
if (StringUtils.isNotEmpty(value)) {
|
||||
try {
|
||||
maxRegions = Long.parseLong(value);
|
||||
} catch (NumberFormatException exp) {
|
||||
throw new DoNotRetryIOException("NumberFormatException while getting max regions.", exp);
|
||||
}
|
||||
} else {
|
||||
// The property is not set, so assume its the max long value.
|
||||
maxRegions = Long.MAX_VALUE;
|
||||
}
|
||||
return maxRegions;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -186,9 +186,9 @@ public class CreateTableHandler extends EventHandler {
|
|||
public void process() {
|
||||
TableName tableName = this.hTableDescriptor.getTableName();
|
||||
LOG.info("Create table " + tableName);
|
||||
|
||||
HMaster master = ((HMaster) this.server);
|
||||
try {
|
||||
final MasterCoprocessorHost cpHost = ((HMaster) this.server).getMasterCoprocessorHost();
|
||||
final MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
|
||||
if (cpHost != null) {
|
||||
cpHost.preCreateTableHandler(this.hTableDescriptor, this.newRegions);
|
||||
}
|
||||
|
@ -205,6 +205,14 @@ public class CreateTableHandler extends EventHandler {
|
|||
}
|
||||
} catch (Throwable e) {
|
||||
LOG.error("Error trying to create the table " + tableName, e);
|
||||
if (master.isInitialized()) {
|
||||
try {
|
||||
((HMaster) this.server).getMasterQuotaManager().removeTableFromNamespaceQuota(
|
||||
hTableDescriptor.getTableName());
|
||||
} catch (IOException e1) {
|
||||
LOG.error("Error trying to update namespace quota " + e1);
|
||||
}
|
||||
}
|
||||
completed(e);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -133,6 +133,7 @@ public class DeleteTableHandler extends TableEventHandler {
|
|||
if (cpHost != null) {
|
||||
cpHost.postDeleteTableHandler(this.tableName);
|
||||
}
|
||||
((HMaster) this.server).getMasterQuotaManager().removeTableFromNamespaceQuota(tableName);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -0,0 +1,139 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
|
||||
* agreements. See the NOTICE file distributed with this work for additional information regarding
|
||||
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
|
||||
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
|
||||
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
|
||||
* for the specific language governing permissions and limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.namespace;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.MetaTableAccessor;
|
||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||
import org.apache.hadoop.hbase.TableExistsException;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.quotas.QuotaExceededException;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* The Class NamespaceAuditor performs checks to ensure operations like table creation and region
|
||||
* splitting preserve namespace quota. The namespace quota can be specified while namespace
|
||||
* creation.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
public class NamespaceAuditor {
|
||||
private static Log LOG = LogFactory.getLog(NamespaceAuditor.class);
|
||||
static final String NS_AUDITOR_INIT_TIMEOUT = "hbase.namespace.auditor.init.timeout";
|
||||
static final int DEFAULT_NS_AUDITOR_INIT_TIMEOUT = 120000;
|
||||
private NamespaceStateManager stateManager;
|
||||
private MasterServices masterServices;
|
||||
|
||||
public NamespaceAuditor(MasterServices masterServices) {
|
||||
this.masterServices = masterServices;
|
||||
stateManager = new NamespaceStateManager(masterServices, masterServices.getZooKeeper());
|
||||
}
|
||||
|
||||
public void start() throws IOException {
|
||||
stateManager.start();
|
||||
LOG.info("NamespaceAuditor started.");
|
||||
}
|
||||
|
||||
/**
|
||||
* Check quota to create table. We add the table information to namespace state cache, assuming
|
||||
* the operation will pass. If the operation fails, then the next time namespace state chore runs
|
||||
* namespace state cache will be corrected.
|
||||
* @param tName - The table name to check quota.
|
||||
* @param regions - Number of regions that will be added.
|
||||
* @throws IOException Signals that an I/O exception has occurred.
|
||||
*/
|
||||
public void checkQuotaToCreateTable(TableName tName, int regions) throws IOException {
|
||||
if (stateManager.isInitialized()) {
|
||||
// We do this check to fail fast.
|
||||
if (MetaTableAccessor.tableExists(this.masterServices.getConnection(), tName)) {
|
||||
throw new TableExistsException(tName);
|
||||
}
|
||||
stateManager.checkAndUpdateNamespaceTableCount(tName, regions);
|
||||
} else {
|
||||
checkTableTypeAndThrowException(tName);
|
||||
}
|
||||
}
|
||||
|
||||
private void checkTableTypeAndThrowException(TableName name) throws IOException {
|
||||
if (name.isSystemTable()) {
|
||||
LOG.debug("Namespace auditor checks not performed for table " + name.getNameAsString());
|
||||
} else {
|
||||
throw new HBaseIOException(name
|
||||
+ " is being created even before namespace auditor has been initialized.");
|
||||
}
|
||||
}
|
||||
|
||||
public void checkQuotaToSplitRegion(HRegionInfo hri) throws IOException {
|
||||
if (!stateManager.isInitialized()) {
|
||||
throw new IOException(
|
||||
"Split operation is being performed even before namespace auditor is initialized.");
|
||||
} else if (!stateManager.checkAndUpdateNamespaceRegionCount(hri.getTable(),
|
||||
hri.getRegionName(), 1)) {
|
||||
throw new QuotaExceededException("Region split not possible for :" + hri.getEncodedName()
|
||||
+ " as quota limits are exceeded ");
|
||||
}
|
||||
}
|
||||
|
||||
public void updateQuotaForRegionMerge(HRegionInfo hri) throws IOException {
|
||||
if (!stateManager.isInitialized()) {
|
||||
throw new IOException(
|
||||
"Merge operation is being performed even before namespace auditor is initialized.");
|
||||
} else if (!stateManager
|
||||
.checkAndUpdateNamespaceRegionCount(hri.getTable(), hri.getRegionName(), -1)) {
|
||||
throw new QuotaExceededException("Region split not possible for :" + hri.getEncodedName()
|
||||
+ " as quota limits are exceeded ");
|
||||
}
|
||||
}
|
||||
|
||||
public void addNamespace(NamespaceDescriptor ns) throws IOException {
|
||||
stateManager.addNamespace(ns.getName());
|
||||
}
|
||||
|
||||
public void deleteNamespace(String namespace) throws IOException {
|
||||
stateManager.deleteNamespace(namespace);
|
||||
}
|
||||
|
||||
public void removeFromNamespaceUsage(TableName tableName) throws IOException {
|
||||
stateManager.removeTable(tableName);
|
||||
}
|
||||
|
||||
public void removeRegionFromNamespaceUsage(HRegionInfo hri) throws IOException {
|
||||
stateManager.removeRegionFromTable(hri);
|
||||
}
|
||||
|
||||
/**
|
||||
* Used only for unit tests.
|
||||
* @param namespace The name of the namespace
|
||||
* @return An instance of NamespaceTableAndRegionInfo
|
||||
*/
|
||||
@VisibleForTesting
|
||||
NamespaceTableAndRegionInfo getState(String namespace) {
|
||||
if (stateManager.isInitialized()) {
|
||||
return stateManager.getState(namespace);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if namespace auditor is initialized. Used only for testing.
|
||||
* @return true, if is initialized
|
||||
*/
|
||||
public boolean isInitialized() {
|
||||
return stateManager.isInitialized();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,273 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
|
||||
* agreements. See the NOTICE file distributed with this work for additional information regarding
|
||||
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
|
||||
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
|
||||
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
|
||||
* for the specific language governing permissions and limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.namespace;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||
import org.apache.hadoop.hbase.RegionTransition;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.MetaScanner;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.executor.EventType;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.TableNamespaceManager;
|
||||
import org.apache.hadoop.hbase.quotas.QuotaExceededException;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
|
||||
/**
|
||||
* NamespaceStateManager manages state (in terms of quota) of all the namespaces. It contains a
|
||||
* cache which is updated based on the hooks in the NamespaceAuditor class.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class NamespaceStateManager extends ZooKeeperListener {
|
||||
|
||||
private static Log LOG = LogFactory.getLog(NamespaceStateManager.class);
|
||||
private ConcurrentMap<String, NamespaceTableAndRegionInfo> nsStateCache;
|
||||
private MasterServices master;
|
||||
private volatile boolean initialized = false;
|
||||
|
||||
public NamespaceStateManager(MasterServices masterServices, ZooKeeperWatcher zkw) {
|
||||
super(zkw);
|
||||
nsStateCache = new ConcurrentHashMap<String, NamespaceTableAndRegionInfo>();
|
||||
master = masterServices;
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts the NamespaceStateManager. The boot strap of cache is done in the post master start hook
|
||||
* of the NamespaceAuditor class.
|
||||
* @throws IOException Signals that an I/O exception has occurred.
|
||||
*/
|
||||
public void start() throws IOException {
|
||||
LOG.info("Namespace State Manager started.");
|
||||
initialize();
|
||||
watcher.registerListenerFirst(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets an instance of NamespaceTableAndRegionInfo associated with namespace.
|
||||
* @param The name of the namespace
|
||||
* @return An instance of NamespaceTableAndRegionInfo.
|
||||
*/
|
||||
public NamespaceTableAndRegionInfo getState(String name) {
|
||||
return nsStateCache.get(name);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if adding a region violates namespace quota, if not update namespace cache.
|
||||
* @param TableName
|
||||
* @param regionName
|
||||
* @param incr
|
||||
* @return true, if region can be added to table.
|
||||
* @throws IOException Signals that an I/O exception has occurred.
|
||||
*/
|
||||
synchronized boolean checkAndUpdateNamespaceRegionCount(TableName name, byte[] regionName,
|
||||
int incr) throws IOException {
|
||||
String namespace = name.getNamespaceAsString();
|
||||
NamespaceDescriptor nspdesc = getNamespaceDescriptor(namespace);
|
||||
if (nspdesc != null) {
|
||||
NamespaceTableAndRegionInfo currentStatus;
|
||||
currentStatus = getState(namespace);
|
||||
if (incr > 0
|
||||
&& currentStatus.getRegionCount() >= TableNamespaceManager.getMaxRegions(nspdesc)) {
|
||||
LOG.warn("The region " + Bytes.toStringBinary(regionName)
|
||||
+ " cannot be created. The region count will exceed quota on the namespace. "
|
||||
+ "This may be transient, please retry later if there are any ongoing split"
|
||||
+ " operations in the namespace.");
|
||||
return false;
|
||||
}
|
||||
NamespaceTableAndRegionInfo nsInfo = nsStateCache.get(namespace);
|
||||
if (nsInfo != null) {
|
||||
nsInfo.incRegionCountForTable(name, incr);
|
||||
} else {
|
||||
LOG.warn("Namespace state found null for namespace : " + namespace);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private NamespaceDescriptor getNamespaceDescriptor(String namespaceAsString) {
|
||||
try {
|
||||
return this.master.getNamespaceDescriptor(namespaceAsString);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error while fetching namespace descriptor for namespace : " + namespaceAsString);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
synchronized void checkAndUpdateNamespaceTableCount(TableName table, int numRegions)
|
||||
throws IOException {
|
||||
String namespace = table.getNamespaceAsString();
|
||||
NamespaceDescriptor nspdesc = getNamespaceDescriptor(namespace);
|
||||
if (nspdesc != null) {
|
||||
NamespaceTableAndRegionInfo currentStatus;
|
||||
currentStatus = getState(nspdesc.getName());
|
||||
if ((currentStatus.getTables().size()) >= TableNamespaceManager.getMaxTables(nspdesc)) {
|
||||
throw new QuotaExceededException("The table " + table.getNameAsString()
|
||||
+ "cannot be created as it would exceed maximum number of tables allowed "
|
||||
+ " in the namespace. The total number of tables permitted is "
|
||||
+ TableNamespaceManager.getMaxTables(nspdesc));
|
||||
}
|
||||
if ((currentStatus.getRegionCount() + numRegions) > TableNamespaceManager
|
||||
.getMaxRegions(nspdesc)) {
|
||||
throw new QuotaExceededException("The table " + table.getNameAsString()
|
||||
+ " is not allowed to have " + numRegions
|
||||
+ " regions. The total number of regions permitted is only "
|
||||
+ TableNamespaceManager.getMaxRegions(nspdesc) + ", while current region count is "
|
||||
+ currentStatus.getRegionCount()
|
||||
+ ". This may be transient, please retry later if there are any"
|
||||
+ " ongoing split operations in the namespace.");
|
||||
}
|
||||
} else {
|
||||
throw new IOException("Namespace Descriptor found null for " + namespace
|
||||
+ " This is unexpected.");
|
||||
}
|
||||
addTable(table, numRegions);
|
||||
}
|
||||
|
||||
NamespaceTableAndRegionInfo addNamespace(String namespace) {
|
||||
if (!nsStateCache.containsKey(namespace)) {
|
||||
NamespaceTableAndRegionInfo a1 = new NamespaceTableAndRegionInfo(namespace);
|
||||
nsStateCache.put(namespace, a1);
|
||||
}
|
||||
return nsStateCache.get(namespace);
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete the namespace state.
|
||||
* @param An instance of NamespaceTableAndRegionInfo
|
||||
*/
|
||||
void deleteNamespace(String namespace) {
|
||||
this.nsStateCache.remove(namespace);
|
||||
}
|
||||
|
||||
private void addTable(TableName tableName, int regionCount) throws IOException {
|
||||
NamespaceTableAndRegionInfo info = nsStateCache.get(tableName.getNamespaceAsString());
|
||||
if (info != null) {
|
||||
info.addTable(tableName, regionCount);
|
||||
} else {
|
||||
throw new IOException("Bad state : Namespace quota information not found for namespace : "
|
||||
+ tableName.getNamespaceAsString());
|
||||
}
|
||||
}
|
||||
|
||||
synchronized void removeTable(TableName tableName) {
|
||||
NamespaceTableAndRegionInfo info = nsStateCache.get(tableName.getNamespaceAsString());
|
||||
if (info != null) {
|
||||
info.removeTable(tableName);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize namespace state cache by scanning meta table.
|
||||
*/
|
||||
private void initialize() throws IOException {
|
||||
List<NamespaceDescriptor> namespaces = this.master.listNamespaceDescriptors();
|
||||
for (NamespaceDescriptor namespace : namespaces) {
|
||||
addNamespace(namespace.getName());
|
||||
List<TableName> tables = this.master.listTableNamesByNamespace(namespace.getName());
|
||||
for (TableName table : tables) {
|
||||
if (table.isSystemTable()) {
|
||||
continue;
|
||||
}
|
||||
int regionCount = 0;
|
||||
Map<HRegionInfo, ServerName> regions =
|
||||
MetaScanner.allTableRegions(this.master.getConnection(), table);
|
||||
for (HRegionInfo info : regions.keySet()) {
|
||||
if (!info.isSplit()) {
|
||||
regionCount++;
|
||||
}
|
||||
}
|
||||
addTable(table, regionCount);
|
||||
}
|
||||
}
|
||||
LOG.info("Finished updating state of " + nsStateCache.size() + " namespaces. ");
|
||||
initialized = true;
|
||||
}
|
||||
|
||||
boolean isInitialized() {
|
||||
return initialized;
|
||||
}
|
||||
|
||||
public synchronized void removeRegionFromTable(HRegionInfo hri) throws IOException {
|
||||
String namespace = hri.getTable().getNamespaceAsString();
|
||||
NamespaceTableAndRegionInfo nsInfo = nsStateCache.get(namespace);
|
||||
if (nsInfo != null) {
|
||||
nsInfo.decrementRegionCountForTable(hri.getTable(), 1);
|
||||
} else {
|
||||
throw new IOException("Namespace state found null for namespace : " + namespace);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void nodeCreated(String path) {
|
||||
checkSplittingOrMergingNode(path);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void nodeChildrenChanged(String path) {
|
||||
checkSplittingOrMergingNode(path);
|
||||
}
|
||||
|
||||
private void checkSplittingOrMergingNode(String path) {
|
||||
String msg = "Error reading data from zookeeper, ";
|
||||
try {
|
||||
if (path.startsWith(watcher.assignmentZNode)) {
|
||||
List<String> children =
|
||||
ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.assignmentZNode);
|
||||
if (children != null) {
|
||||
for (String child : children) {
|
||||
Stat stat = new Stat();
|
||||
byte[] data =
|
||||
ZKAssign.getDataAndWatch(watcher, ZKUtil.joinZNode(watcher.assignmentZNode, child),
|
||||
stat);
|
||||
if (data != null) {
|
||||
RegionTransition rt = RegionTransition.parseFrom(data);
|
||||
if (rt.getEventType().equals(EventType.RS_ZK_REQUEST_REGION_SPLIT)) {
|
||||
TableName table = HRegionInfo.getTable(rt.getRegionName());
|
||||
if (!checkAndUpdateNamespaceRegionCount(table, rt.getRegionName(), 1)) {
|
||||
ZKUtil.deleteNode(watcher, ZKUtil.joinZNode(watcher.assignmentZNode, child));
|
||||
}
|
||||
} else if (rt.getEventType().equals(EventType.RS_ZK_REQUEST_REGION_MERGE)) {
|
||||
TableName table = HRegionInfo.getTable(rt.getRegionName());
|
||||
checkAndUpdateNamespaceRegionCount(table, rt.getRegionName(), -1);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (KeeperException ke) {
|
||||
LOG.error(msg, ke);
|
||||
watcher.abort(msg, ke);
|
||||
} catch (DeserializationException e) {
|
||||
LOG.error(msg, e);
|
||||
watcher.abort(msg, e);
|
||||
} catch (IOException e) {
|
||||
LOG.error(msg, e);
|
||||
watcher.abort(msg, e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,109 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
|
||||
* agreements. See the NOTICE file distributed with this work for additional information regarding
|
||||
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
|
||||
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
|
||||
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
|
||||
* for the specific language governing permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.namespace;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* NamespaceTableAndRegionInfo is a helper class that contains information about current state of
|
||||
* tables and regions in a namespace.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class NamespaceTableAndRegionInfo {
|
||||
private String name;
|
||||
private Map<TableName, AtomicInteger> tableAndRegionInfo;
|
||||
|
||||
public NamespaceTableAndRegionInfo(String namespace) {
|
||||
this.name = namespace;
|
||||
this.tableAndRegionInfo = new HashMap<TableName, AtomicInteger>();
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the name of the namespace.
|
||||
* @return name of the namespace.
|
||||
*/
|
||||
String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the set of table names belonging to namespace.
|
||||
* @return A set of table names.
|
||||
*/
|
||||
synchronized Set<TableName> getTables() {
|
||||
return this.tableAndRegionInfo.keySet();
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the total number of regions in namespace.
|
||||
* @return the region count
|
||||
*/
|
||||
synchronized int getRegionCount() {
|
||||
int regionCount = 0;
|
||||
for (Entry<TableName, AtomicInteger> entry : this.tableAndRegionInfo.entrySet()) {
|
||||
regionCount = regionCount + entry.getValue().get();
|
||||
}
|
||||
return regionCount;
|
||||
}
|
||||
|
||||
synchronized int getRegionCountOfTable(TableName tableName) {
|
||||
if (tableAndRegionInfo.containsKey(tableName)) {
|
||||
return this.tableAndRegionInfo.get(tableName).get();
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
synchronized boolean containsTable(TableName tableName) {
|
||||
return this.tableAndRegionInfo.containsKey(tableName);
|
||||
}
|
||||
|
||||
synchronized void addTable(TableName tableName, int regionCount) {
|
||||
if (!name.equalsIgnoreCase(tableName.getNamespaceAsString())) {
|
||||
throw new IllegalStateException("Table : " + tableName + " does not belong to namespace "
|
||||
+ name);
|
||||
}
|
||||
if (!tableAndRegionInfo.containsKey(tableName)) {
|
||||
tableAndRegionInfo.put(tableName, new AtomicInteger(regionCount));
|
||||
} else {
|
||||
throw new IllegalStateException("Table already in the cache " + tableName);
|
||||
}
|
||||
}
|
||||
|
||||
synchronized void removeTable(TableName tableName) {
|
||||
tableAndRegionInfo.remove(tableName);
|
||||
}
|
||||
|
||||
synchronized int incRegionCountForTable(TableName tableName, int count) {
|
||||
return tableAndRegionInfo.get(tableName).addAndGet(count);
|
||||
}
|
||||
|
||||
synchronized int decrementRegionCountForTable(TableName tableName, int count) {
|
||||
return tableAndRegionInfo.get(tableName).decrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
Joiner.MapJoiner mapJoiner = Joiner.on(',').withKeyValueSeparator("=");
|
||||
return "NamespaceTableAndRegionInfo [name=" + name + ", tableAndRegionInfo="
|
||||
+ mapJoiner.join(tableAndRegionInfo) + "]";
|
||||
}
|
||||
}
|
|
@ -19,11 +19,13 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.MetaTableAccessor;
|
||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.handler.CreateTableHandler;
|
||||
import org.apache.hadoop.hbase.namespace.NamespaceAuditor;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaResponse;
|
||||
|
@ -40,7 +42,7 @@ import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class MasterQuotaManager {
|
||||
public class MasterQuotaManager implements RegionStateListener {
|
||||
private static final Log LOG = LogFactory.getLog(MasterQuotaManager.class);
|
||||
|
||||
private final MasterServices masterServices;
|
||||
|
@ -48,6 +50,7 @@ public class MasterQuotaManager {
|
|||
private NamedLock<TableName> tableLocks;
|
||||
private NamedLock<String> userLocks;
|
||||
private boolean enabled = false;
|
||||
private NamespaceAuditor namespaceQuotaManager;
|
||||
|
||||
public MasterQuotaManager(final MasterServices masterServices) {
|
||||
this.masterServices = masterServices;
|
||||
|
@ -72,6 +75,8 @@ public class MasterQuotaManager {
|
|||
tableLocks = new NamedLock<TableName>();
|
||||
userLocks = new NamedLock<String>();
|
||||
|
||||
namespaceQuotaManager = new NamespaceAuditor(masterServices);
|
||||
namespaceQuotaManager.start();
|
||||
enabled = true;
|
||||
}
|
||||
|
||||
|
@ -79,7 +84,7 @@ public class MasterQuotaManager {
|
|||
}
|
||||
|
||||
public boolean isQuotaEnabled() {
|
||||
return enabled;
|
||||
return enabled && namespaceQuotaManager.isInitialized();
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -276,6 +281,18 @@ public class MasterQuotaManager {
|
|||
});
|
||||
}
|
||||
|
||||
public void setNamespaceQuota(NamespaceDescriptor desc) throws IOException {
|
||||
if (enabled) {
|
||||
this.namespaceQuotaManager.addNamespace(desc);
|
||||
}
|
||||
}
|
||||
|
||||
public void removeNamespaceQuota(String namespace) throws IOException {
|
||||
if (enabled) {
|
||||
this.namespaceQuotaManager.deleteNamespace(namespace);
|
||||
}
|
||||
}
|
||||
|
||||
private void setQuota(final SetQuotaRequest req, final SetQuotaOperations quotaOps)
|
||||
throws IOException, InterruptedException {
|
||||
if (req.hasRemoveAll() && req.getRemoveAll() == true) {
|
||||
|
@ -303,6 +320,39 @@ public class MasterQuotaManager {
|
|||
quotaOps.postApply(quotas);
|
||||
}
|
||||
|
||||
public void checkNamespaceTableAndRegionQuota(TableName tName, int regions) throws IOException {
|
||||
if (enabled) {
|
||||
namespaceQuotaManager.checkQuotaToCreateTable(tName, regions);
|
||||
}
|
||||
}
|
||||
|
||||
public void onRegionMerged(HRegionInfo hri) throws IOException {
|
||||
if (enabled) {
|
||||
namespaceQuotaManager.updateQuotaForRegionMerge(hri);
|
||||
}
|
||||
}
|
||||
|
||||
public void onRegionSplit(HRegionInfo hri) throws IOException {
|
||||
if (enabled) {
|
||||
namespaceQuotaManager.checkQuotaToSplitRegion(hri);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove table from namespace quota.
|
||||
* @param tName - The table name to update quota usage.
|
||||
* @throws IOException Signals that an I/O exception has occurred.
|
||||
*/
|
||||
public void removeTableFromNamespaceQuota(TableName tName) throws IOException {
|
||||
if (enabled) {
|
||||
namespaceQuotaManager.removeFromNamespaceUsage(tName);
|
||||
}
|
||||
}
|
||||
|
||||
public NamespaceAuditor getNamespaceQuotaManager() {
|
||||
return this.namespaceQuotaManager;
|
||||
}
|
||||
|
||||
private static interface SetQuotaOperations {
|
||||
Quotas fetch() throws IOException;
|
||||
|
||||
|
@ -438,4 +488,11 @@ public class MasterQuotaManager {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRegionSplitReverted(HRegionInfo hri) throws IOException {
|
||||
if (enabled) {
|
||||
this.namespaceQuotaManager.removeRegionFromNamespaceUsage(hri);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
|
||||
* agreements. See the NOTICE file distributed with this work for additional information regarding
|
||||
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
|
||||
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
|
||||
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
|
||||
* for the specific language governing permissions and limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.quotas;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* The listener interface for receiving region state events.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface RegionStateListener {
|
||||
|
||||
/**
|
||||
* Process region split event.
|
||||
* @param hri An instance of HRegionInfo
|
||||
* @throws IOException
|
||||
*/
|
||||
void onRegionSplit(HRegionInfo hri) throws IOException;
|
||||
|
||||
/**
|
||||
* Process region split reverted event.
|
||||
* @param hri An instance of HRegionInfo
|
||||
* @throws IOException Signals that an I/O exception has occurred.
|
||||
*/
|
||||
void onRegionSplitReverted(HRegionInfo hri) throws IOException;
|
||||
|
||||
/**
|
||||
* Process region merge event.
|
||||
* @param hri An instance of HRegionInfo
|
||||
* @throws IOException
|
||||
*/
|
||||
void onRegionMerged(HRegionInfo hri) throws IOException;
|
||||
}
|
|
@ -0,0 +1,618 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
|
||||
* agreements. See the NOTICE file distributed with this work for additional information regarding
|
||||
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
|
||||
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
|
||||
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
|
||||
* for the specific language governing permissions and limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.namespace;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Coprocessor;
|
||||
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Mutation;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver;
|
||||
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
|
||||
import org.apache.hadoop.hbase.coprocessor.BaseRegionServerObserver;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
|
||||
import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.master.RegionState;
|
||||
import org.apache.hadoop.hbase.master.RegionStates;
|
||||
import org.apache.hadoop.hbase.master.TableNamespaceManager;
|
||||
import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
|
||||
import org.apache.hadoop.hbase.quotas.QuotaExceededException;
|
||||
import org.apache.hadoop.hbase.quotas.QuotaUtil;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.Region;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
@Category(MediumTests.class)
|
||||
public class TestNamespaceAuditor {
|
||||
private static final Log LOG = LogFactory.getLog(TestNamespaceAuditor.class);
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
private static HBaseAdmin ADMIN;
|
||||
private String prefix = "TestNamespaceAuditor";
|
||||
|
||||
@BeforeClass
|
||||
public static void before() throws Exception {
|
||||
UTIL.getConfiguration().setBoolean("hbase.assignment.usezk", true);
|
||||
setupOnce();
|
||||
}
|
||||
|
||||
public static void setupOnce() throws Exception, IOException {
|
||||
Configuration conf = UTIL.getConfiguration();
|
||||
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, CustomObserver.class.getName());
|
||||
conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, MasterSyncObserver.class.getName());
|
||||
conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
|
||||
conf.setClass("hbase.coprocessor.regionserver.classes", CPRegionServerObserver.class,
|
||||
RegionServerObserver.class);
|
||||
UTIL.startMiniCluster(1, 1);
|
||||
waitForQuotaEnabled();
|
||||
ADMIN = UTIL.getHBaseAdmin();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanup() throws Exception, KeeperException {
|
||||
for (HTableDescriptor table : ADMIN.listTables()) {
|
||||
ADMIN.disableTable(table.getTableName());
|
||||
deleteTable(table.getTableName());
|
||||
}
|
||||
for (NamespaceDescriptor ns : ADMIN.listNamespaceDescriptors()) {
|
||||
if (ns.getName().startsWith(prefix)) {
|
||||
ADMIN.deleteNamespace(ns.getName());
|
||||
}
|
||||
}
|
||||
assertTrue("Quota manager not enabled", UTIL.getHBaseCluster().getMaster()
|
||||
.getMasterQuotaManager().isQuotaEnabled());
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testTableOperations() throws Exception {
|
||||
String nsp = prefix + "_np2";
|
||||
NamespaceDescriptor nspDesc =
|
||||
NamespaceDescriptor.create(nsp)
|
||||
.addConfiguration(TableNamespaceManager.KEY_MAX_REGIONS, "5")
|
||||
.addConfiguration(TableNamespaceManager.KEY_MAX_TABLES, "2").build();
|
||||
ADMIN.createNamespace(nspDesc);
|
||||
assertNotNull("Namespace descriptor found null.", ADMIN.getNamespaceDescriptor(nsp));
|
||||
assertEquals(ADMIN.listNamespaceDescriptors().length, 3);
|
||||
HTableDescriptor tableDescOne =
|
||||
new HTableDescriptor(TableName.valueOf(nsp + TableName.NAMESPACE_DELIM + "table1"));
|
||||
HTableDescriptor tableDescTwo =
|
||||
new HTableDescriptor(TableName.valueOf(nsp + TableName.NAMESPACE_DELIM + "table2"));
|
||||
HTableDescriptor tableDescThree =
|
||||
new HTableDescriptor(TableName.valueOf(nsp + TableName.NAMESPACE_DELIM + "table3"));
|
||||
ADMIN.createTable(tableDescOne);
|
||||
boolean constraintViolated = false;
|
||||
try {
|
||||
ADMIN.createTable(tableDescTwo, Bytes.toBytes("AAA"), Bytes.toBytes("ZZZ"), 5);
|
||||
} catch (Exception exp) {
|
||||
assertTrue(exp instanceof IOException);
|
||||
constraintViolated = true;
|
||||
} finally {
|
||||
assertTrue("Constraint not violated for table " + tableDescTwo.getTableName(),
|
||||
constraintViolated);
|
||||
}
|
||||
ADMIN.createTable(tableDescTwo, Bytes.toBytes("AAA"), Bytes.toBytes("ZZZ"), 4);
|
||||
NamespaceTableAndRegionInfo nspState = getQuotaManager().getState(nsp);
|
||||
assertNotNull(nspState);
|
||||
assertTrue(nspState.getTables().size() == 2);
|
||||
assertTrue(nspState.getRegionCount() == 5);
|
||||
constraintViolated = false;
|
||||
try {
|
||||
ADMIN.createTable(tableDescThree);
|
||||
} catch (Exception exp) {
|
||||
assertTrue(exp instanceof IOException);
|
||||
constraintViolated = true;
|
||||
} finally {
|
||||
assertTrue("Constraint not violated for table " + tableDescThree.getTableName(),
|
||||
constraintViolated);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidQuotas() throws Exception {
|
||||
boolean exceptionCaught = false;
|
||||
FileSystem fs = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
|
||||
Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
|
||||
NamespaceDescriptor nspDesc =
|
||||
NamespaceDescriptor.create(prefix + "vq1")
|
||||
.addConfiguration(TableNamespaceManager.KEY_MAX_REGIONS, "hihdufh")
|
||||
.addConfiguration(TableNamespaceManager.KEY_MAX_TABLES, "2").build();
|
||||
try {
|
||||
ADMIN.createNamespace(nspDesc);
|
||||
} catch (Exception exp) {
|
||||
LOG.warn(exp);
|
||||
exceptionCaught = true;
|
||||
} finally {
|
||||
assertTrue(exceptionCaught);
|
||||
assertFalse(fs.exists(FSUtils.getNamespaceDir(rootDir, nspDesc.getName())));
|
||||
}
|
||||
nspDesc =
|
||||
NamespaceDescriptor.create(prefix + "vq2")
|
||||
.addConfiguration(TableNamespaceManager.KEY_MAX_REGIONS, "-456")
|
||||
.addConfiguration(TableNamespaceManager.KEY_MAX_TABLES, "2").build();
|
||||
try {
|
||||
ADMIN.createNamespace(nspDesc);
|
||||
} catch (Exception exp) {
|
||||
LOG.warn(exp);
|
||||
exceptionCaught = true;
|
||||
} finally {
|
||||
assertTrue(exceptionCaught);
|
||||
assertFalse(fs.exists(FSUtils.getNamespaceDir(rootDir, nspDesc.getName())));
|
||||
}
|
||||
nspDesc =
|
||||
NamespaceDescriptor.create(prefix + "vq3")
|
||||
.addConfiguration(TableNamespaceManager.KEY_MAX_REGIONS, "10")
|
||||
.addConfiguration(TableNamespaceManager.KEY_MAX_TABLES, "sciigd").build();
|
||||
try {
|
||||
ADMIN.createNamespace(nspDesc);
|
||||
} catch (Exception exp) {
|
||||
LOG.warn(exp);
|
||||
exceptionCaught = true;
|
||||
} finally {
|
||||
assertTrue(exceptionCaught);
|
||||
assertFalse(fs.exists(FSUtils.getNamespaceDir(rootDir, nspDesc.getName())));
|
||||
}
|
||||
nspDesc =
|
||||
NamespaceDescriptor.create(prefix + "vq4")
|
||||
.addConfiguration(TableNamespaceManager.KEY_MAX_REGIONS, "10")
|
||||
.addConfiguration(TableNamespaceManager.KEY_MAX_TABLES, "-1500").build();
|
||||
try {
|
||||
ADMIN.createNamespace(nspDesc);
|
||||
} catch (Exception exp) {
|
||||
LOG.warn(exp);
|
||||
exceptionCaught = true;
|
||||
} finally {
|
||||
assertTrue(exceptionCaught);
|
||||
assertFalse(fs.exists(FSUtils.getNamespaceDir(rootDir, nspDesc.getName())));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteTable() throws Exception {
|
||||
String namespace = prefix + "_dummy";
|
||||
NamespaceDescriptor nspDesc =
|
||||
NamespaceDescriptor.create(namespace)
|
||||
.addConfiguration(TableNamespaceManager.KEY_MAX_REGIONS, "100")
|
||||
.addConfiguration(TableNamespaceManager.KEY_MAX_TABLES, "3").build();
|
||||
ADMIN.createNamespace(nspDesc);
|
||||
assertNotNull("Namespace descriptor found null.", ADMIN.getNamespaceDescriptor(namespace));
|
||||
NamespaceTableAndRegionInfo stateInfo = getNamespaceState(nspDesc.getName());
|
||||
assertNotNull("Namespace state found null for " + namespace, stateInfo);
|
||||
HTableDescriptor tableDescOne =
|
||||
new HTableDescriptor(TableName.valueOf(namespace + TableName.NAMESPACE_DELIM + "table1"));
|
||||
HTableDescriptor tableDescTwo =
|
||||
new HTableDescriptor(TableName.valueOf(namespace + TableName.NAMESPACE_DELIM + "table2"));
|
||||
ADMIN.createTable(tableDescOne);
|
||||
ADMIN.createTable(tableDescTwo, Bytes.toBytes("AAA"), Bytes.toBytes("ZZZ"), 5);
|
||||
stateInfo = getNamespaceState(nspDesc.getName());
|
||||
assertNotNull("Namespace state found to be null.", stateInfo);
|
||||
assertEquals(2, stateInfo.getTables().size());
|
||||
assertEquals(5, stateInfo.getRegionCountOfTable(tableDescTwo.getTableName()));
|
||||
assertEquals(6, stateInfo.getRegionCount());
|
||||
ADMIN.disableTable(tableDescOne.getTableName());
|
||||
deleteTable(tableDescOne.getTableName());
|
||||
stateInfo = getNamespaceState(nspDesc.getName());
|
||||
assertNotNull("Namespace state found to be null.", stateInfo);
|
||||
assertEquals(5, stateInfo.getRegionCount());
|
||||
assertEquals(1, stateInfo.getTables().size());
|
||||
ADMIN.disableTable(tableDescTwo.getTableName());
|
||||
deleteTable(tableDescTwo.getTableName());
|
||||
ADMIN.deleteNamespace(namespace);
|
||||
stateInfo = getNamespaceState(namespace);
|
||||
assertNull("Namespace state not found to be null.", stateInfo);
|
||||
}
|
||||
|
||||
public static class CPRegionServerObserver extends BaseRegionServerObserver {
|
||||
private volatile boolean shouldFailMerge = false;
|
||||
|
||||
public void failMerge(boolean fail) {
|
||||
shouldFailMerge = fail;
|
||||
}
|
||||
|
||||
private boolean triggered = false;
|
||||
|
||||
public synchronized void waitUtilTriggered() throws InterruptedException {
|
||||
while (!triggered) {
|
||||
wait();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void preMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
|
||||
Region regionA, Region regionB) throws IOException {
|
||||
triggered = true;
|
||||
notifyAll();
|
||||
if (shouldFailMerge) {
|
||||
throw new IOException("fail merge");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRegionMerge() throws Exception {
|
||||
String nsp1 = prefix + "_regiontest";
|
||||
NamespaceDescriptor nspDesc =
|
||||
NamespaceDescriptor.create(nsp1)
|
||||
.addConfiguration(TableNamespaceManager.KEY_MAX_REGIONS, "3")
|
||||
.addConfiguration(TableNamespaceManager.KEY_MAX_TABLES, "2").build();
|
||||
ADMIN.createNamespace(nspDesc);
|
||||
final TableName tableTwo = TableName.valueOf(nsp1 + TableName.NAMESPACE_DELIM + "table2");
|
||||
byte[] columnFamily = Bytes.toBytes("info");
|
||||
HTableDescriptor tableDescOne = new HTableDescriptor(tableTwo);
|
||||
tableDescOne.addFamily(new HColumnDescriptor(columnFamily));
|
||||
final int initialRegions = 3;
|
||||
ADMIN.createTable(tableDescOne, Bytes.toBytes("1"), Bytes.toBytes("2000"), initialRegions);
|
||||
try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration());
|
||||
Table table = connection.getTable(tableTwo)) {
|
||||
UTIL.loadNumericRows(table, Bytes.toBytes("info"), 1000, 1999);
|
||||
}
|
||||
ADMIN.flush(tableTwo);
|
||||
List<HRegionInfo> hris = ADMIN.getTableRegions(tableTwo);
|
||||
Collections.sort(hris);
|
||||
// merge the two regions
|
||||
final Set<String> encodedRegionNamesToMerge =
|
||||
Sets.newHashSet(hris.get(0).getEncodedName(), hris.get(1).getEncodedName());
|
||||
ADMIN.mergeRegions(hris.get(0).getEncodedNameAsBytes(), hris.get(1).getEncodedNameAsBytes(),
|
||||
false);
|
||||
waitForMergeToComplete(tableTwo, encodedRegionNamesToMerge);
|
||||
hris = ADMIN.getTableRegions(tableTwo);
|
||||
assertEquals(initialRegions - 1, hris.size());
|
||||
Collections.sort(hris);
|
||||
|
||||
final HRegionInfo hriToSplit = hris.get(1);
|
||||
ADMIN.split(tableTwo, Bytes.toBytes("500"));
|
||||
|
||||
UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
|
||||
|
||||
@Override
|
||||
public boolean evaluate() throws Exception {
|
||||
RegionStates regionStates =
|
||||
UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
|
||||
for (HRegionInfo hri : ADMIN.getTableRegions(tableTwo)) {
|
||||
if (hri.getEncodedName().equals(hriToSplit.getEncodedName())) {
|
||||
return false;
|
||||
}
|
||||
if (!regionStates.isRegionInState(hri, RegionState.State.OPEN)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String explainFailure() throws Exception {
|
||||
RegionStates regionStates =
|
||||
UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
|
||||
for (HRegionInfo hri : ADMIN.getTableRegions(tableTwo)) {
|
||||
if (hri.getEncodedName().equals(hriToSplit.getEncodedName())) {
|
||||
return hriToSplit + " which is expected to be split is still online";
|
||||
}
|
||||
if (!regionStates.isRegionInState(hri, RegionState.State.OPEN)) {
|
||||
return hri + " is still in not opened";
|
||||
}
|
||||
}
|
||||
return "Unknown";
|
||||
}
|
||||
});
|
||||
hris = ADMIN.getTableRegions(tableTwo);
|
||||
assertEquals(initialRegions, hris.size());
|
||||
Collections.sort(hris);
|
||||
|
||||
// fail region merge through Coprocessor hook
|
||||
MiniHBaseCluster cluster = UTIL.getHBaseCluster();
|
||||
HRegionServer regionServer = cluster.getRegionServer(0);
|
||||
RegionServerCoprocessorHost cpHost = regionServer.getRegionServerCoprocessorHost();
|
||||
Coprocessor coprocessor = cpHost.findCoprocessor(CPRegionServerObserver.class.getName());
|
||||
CPRegionServerObserver regionServerObserver = (CPRegionServerObserver) coprocessor;
|
||||
regionServerObserver.failMerge(true);
|
||||
regionServerObserver.triggered = false;
|
||||
|
||||
ADMIN.mergeRegions(hris.get(1).getEncodedNameAsBytes(), hris.get(2).getEncodedNameAsBytes(),
|
||||
false);
|
||||
regionServerObserver.waitUtilTriggered();
|
||||
hris = ADMIN.getTableRegions(tableTwo);
|
||||
assertEquals(initialRegions, hris.size());
|
||||
Collections.sort(hris);
|
||||
// verify that we cannot split
|
||||
HRegionInfo hriToSplit2 = hris.get(1);
|
||||
ADMIN.split(tableTwo,
|
||||
TableInputFormatBase.getSplitKey(hriToSplit2.getStartKey(), hriToSplit2.getEndKey(), true));
|
||||
waitForMergeToComplete(tableTwo, encodedRegionNamesToMerge);
|
||||
assertEquals(initialRegions, ADMIN.getTableRegions(tableTwo).size());
|
||||
}
|
||||
|
||||
private void waitForMergeToComplete(final TableName tableTwo,
|
||||
final Set<String> encodedRegionNamesToMerge) throws Exception {
|
||||
UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
|
||||
|
||||
@Override
|
||||
public boolean evaluate() throws Exception {
|
||||
RegionStates regionStates =
|
||||
UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
|
||||
for (HRegionInfo hri : ADMIN.getTableRegions(tableTwo)) {
|
||||
if (encodedRegionNamesToMerge.contains(hri.getEncodedName())) {
|
||||
return false;
|
||||
}
|
||||
if (!regionStates.isRegionInState(hri, RegionState.State.OPEN)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String explainFailure() throws Exception {
|
||||
RegionStates regionStates =
|
||||
UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
|
||||
for (HRegionInfo hri : ADMIN.getTableRegions(tableTwo)) {
|
||||
if (encodedRegionNamesToMerge.contains(hri.getEncodedName())) {
|
||||
return hri + " which is expected to be merged is still online";
|
||||
}
|
||||
if (!regionStates.isRegionInState(hri, RegionState.State.OPEN)) {
|
||||
return hri + " is still in not opened";
|
||||
}
|
||||
}
|
||||
return "Unknown";
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRegionOperations() throws Exception {
|
||||
String nsp1 = prefix + "_regiontest";
|
||||
NamespaceDescriptor nspDesc =
|
||||
NamespaceDescriptor.create(nsp1)
|
||||
.addConfiguration(TableNamespaceManager.KEY_MAX_REGIONS, "2")
|
||||
.addConfiguration(TableNamespaceManager.KEY_MAX_TABLES, "2").build();
|
||||
ADMIN.createNamespace(nspDesc);
|
||||
boolean constraintViolated = false;
|
||||
final TableName tableOne = TableName.valueOf(nsp1 + TableName.NAMESPACE_DELIM + "table1");
|
||||
byte[] columnFamily = Bytes.toBytes("info");
|
||||
HTableDescriptor tableDescOne = new HTableDescriptor(tableOne);
|
||||
tableDescOne.addFamily(new HColumnDescriptor(columnFamily));
|
||||
NamespaceTableAndRegionInfo stateInfo;
|
||||
try {
|
||||
ADMIN.createTable(tableDescOne, Bytes.toBytes("1"), Bytes.toBytes("1000"), 7);
|
||||
} catch (Exception exp) {
|
||||
assertTrue(exp instanceof DoNotRetryIOException);
|
||||
LOG.info(exp);
|
||||
constraintViolated = true;
|
||||
} finally {
|
||||
assertTrue(constraintViolated);
|
||||
}
|
||||
assertFalse(ADMIN.tableExists(tableOne));
|
||||
// This call will pass.
|
||||
ADMIN.createTable(tableDescOne);
|
||||
Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration());
|
||||
HTable htable = (HTable) connection.getTable(tableOne);
|
||||
UTIL.loadNumericRows(htable, Bytes.toBytes("info"), 1, 1000);
|
||||
ADMIN.flush(tableOne);
|
||||
stateInfo = getNamespaceState(nsp1);
|
||||
assertEquals(1, stateInfo.getTables().size());
|
||||
assertEquals(1, stateInfo.getRegionCount());
|
||||
restartMaster();
|
||||
ADMIN.split(tableOne, Bytes.toBytes("500"));
|
||||
HRegion actualRegion = UTIL.getHBaseCluster().getRegions(tableOne).get(0);
|
||||
CustomObserver observer =
|
||||
(CustomObserver) actualRegion.getCoprocessorHost().findCoprocessor(
|
||||
CustomObserver.class.getName());
|
||||
assertNotNull(observer);
|
||||
observer.postSplit.await();
|
||||
assertEquals(2, ADMIN.getTableRegions(tableOne).size());
|
||||
actualRegion = UTIL.getHBaseCluster().getRegions(tableOne).get(0);
|
||||
observer =
|
||||
(CustomObserver) actualRegion.getCoprocessorHost().findCoprocessor(
|
||||
CustomObserver.class.getName());
|
||||
assertNotNull(observer);
|
||||
ADMIN.split(
|
||||
tableOne,
|
||||
getSplitKey(actualRegion.getRegionInfo().getStartKey(), actualRegion.getRegionInfo()
|
||||
.getEndKey()));
|
||||
observer.postSplit.await();
|
||||
// Make sure no regions have been added.
|
||||
List<HRegionInfo> hris = ADMIN.getTableRegions(tableOne);
|
||||
assertEquals(2, hris.size());
|
||||
assertTrue("split completed", observer.preSplitBeforePONR.getCount() == 1);
|
||||
|
||||
htable.close();
|
||||
}
|
||||
|
||||
private NamespaceTableAndRegionInfo getNamespaceState(String namespace) throws KeeperException,
|
||||
IOException {
|
||||
return getQuotaManager().getState(namespace);
|
||||
}
|
||||
|
||||
byte[] getSplitKey(byte[] startKey, byte[] endKey) {
|
||||
String skey = Bytes.toString(startKey);
|
||||
int key;
|
||||
if (StringUtils.isBlank(skey)) {
|
||||
key = Integer.parseInt(Bytes.toString(endKey)) / 2;
|
||||
} else {
|
||||
key = (int) (Integer.parseInt(skey) * 1.5);
|
||||
}
|
||||
return Bytes.toBytes("" + key);
|
||||
}
|
||||
|
||||
public static class CustomObserver extends BaseRegionObserver {
|
||||
volatile CountDownLatch postSplit;
|
||||
volatile CountDownLatch preSplitBeforePONR;
|
||||
|
||||
@Override
|
||||
public void postCompleteSplit(ObserverContext<RegionCoprocessorEnvironment> ctx)
|
||||
throws IOException {
|
||||
postSplit.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preSplitBeforePONR(ObserverContext<RegionCoprocessorEnvironment> ctx,
|
||||
byte[] splitKey, List<Mutation> metaEntries) throws IOException {
|
||||
preSplitBeforePONR.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start(CoprocessorEnvironment e) throws IOException {
|
||||
postSplit = new CountDownLatch(1);
|
||||
preSplitBeforePONR = new CountDownLatch(1);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStatePreserve() throws Exception {
|
||||
final String nsp1 = prefix + "_testStatePreserve";
|
||||
NamespaceDescriptor nspDesc =
|
||||
NamespaceDescriptor.create(nsp1)
|
||||
.addConfiguration(TableNamespaceManager.KEY_MAX_REGIONS, "20")
|
||||
.addConfiguration(TableNamespaceManager.KEY_MAX_TABLES, "10").build();
|
||||
ADMIN.createNamespace(nspDesc);
|
||||
TableName tableOne = TableName.valueOf(nsp1 + TableName.NAMESPACE_DELIM + "table1");
|
||||
TableName tableTwo = TableName.valueOf(nsp1 + TableName.NAMESPACE_DELIM + "table2");
|
||||
TableName tableThree = TableName.valueOf(nsp1 + TableName.NAMESPACE_DELIM + "table3");
|
||||
HTableDescriptor tableDescOne = new HTableDescriptor(tableOne);
|
||||
HTableDescriptor tableDescTwo = new HTableDescriptor(tableTwo);
|
||||
HTableDescriptor tableDescThree = new HTableDescriptor(tableThree);
|
||||
ADMIN.createTable(tableDescOne, Bytes.toBytes("1"), Bytes.toBytes("1000"), 3);
|
||||
ADMIN.createTable(tableDescTwo, Bytes.toBytes("1"), Bytes.toBytes("1000"), 3);
|
||||
ADMIN.createTable(tableDescThree, Bytes.toBytes("1"), Bytes.toBytes("1000"), 4);
|
||||
ADMIN.disableTable(tableThree);
|
||||
deleteTable(tableThree);
|
||||
// wait for chore to complete
|
||||
UTIL.waitFor(1000, new Waiter.Predicate<Exception>() {
|
||||
@Override
|
||||
public boolean evaluate() throws Exception {
|
||||
return (getNamespaceState(nsp1).getTables().size() == 2);
|
||||
}
|
||||
});
|
||||
NamespaceTableAndRegionInfo before = getNamespaceState(nsp1);
|
||||
restartMaster();
|
||||
NamespaceTableAndRegionInfo after = getNamespaceState(nsp1);
|
||||
assertEquals("Expected: " + before.getTables() + " Found: " + after.getTables(), before
|
||||
.getTables().size(), after.getTables().size());
|
||||
}
|
||||
|
||||
private static void waitForQuotaEnabled() throws Exception {
|
||||
UTIL.waitFor(60000, new Waiter.Predicate<Exception>() {
|
||||
@Override
|
||||
public boolean evaluate() throws Exception {
|
||||
HMaster master = UTIL.getHBaseCluster().getMaster();
|
||||
if (master == null) {
|
||||
return false;
|
||||
}
|
||||
MasterQuotaManager quotaManager = master.getMasterQuotaManager();
|
||||
return quotaManager != null && quotaManager.isQuotaEnabled();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void restartMaster() throws Exception {
|
||||
UTIL.getHBaseCluster().getMaster(0).stop("Stopping to start again");
|
||||
UTIL.getHBaseCluster().waitOnMaster(0);
|
||||
UTIL.getHBaseCluster().startMaster();
|
||||
waitForQuotaEnabled();
|
||||
}
|
||||
|
||||
private NamespaceAuditor getQuotaManager() {
|
||||
return UTIL.getHBaseCluster().getMaster().getMasterQuotaManager().getNamespaceQuotaManager();
|
||||
}
|
||||
|
||||
public static class MasterSyncObserver extends BaseMasterObserver {
|
||||
volatile CountDownLatch tableDeletionLatch;
|
||||
|
||||
@Override
|
||||
public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
TableName tableName) throws IOException {
|
||||
tableDeletionLatch = new CountDownLatch(1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postDeleteTableHandler(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
TableName tableName) throws IOException {
|
||||
tableDeletionLatch.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
private void deleteTable(final TableName tableName) throws Exception {
|
||||
// NOTE: We need a latch because admin is not sync,
|
||||
// so the postOp coprocessor method may be called after the admin operation returned.
|
||||
MasterSyncObserver observer =
|
||||
(MasterSyncObserver) UTIL.getHBaseCluster().getMaster().getMasterCoprocessorHost()
|
||||
.findCoprocessor(MasterSyncObserver.class.getName());
|
||||
ADMIN.deleteTable(tableName);
|
||||
observer.tableDeletionLatch.await();
|
||||
}
|
||||
|
||||
@Test(expected = QuotaExceededException.class, timeout = 30000)
|
||||
public void testExceedTableQuotaInNamespace() throws Exception {
|
||||
String nsp = prefix + "_testExceedTableQuotaInNamespace";
|
||||
NamespaceDescriptor nspDesc =
|
||||
NamespaceDescriptor.create(nsp).addConfiguration(TableNamespaceManager.KEY_MAX_TABLES, "1")
|
||||
.build();
|
||||
ADMIN.createNamespace(nspDesc);
|
||||
assertNotNull("Namespace descriptor found null.", ADMIN.getNamespaceDescriptor(nsp));
|
||||
assertEquals(ADMIN.listNamespaceDescriptors().length, 3);
|
||||
HTableDescriptor tableDescOne =
|
||||
new HTableDescriptor(TableName.valueOf(nsp + TableName.NAMESPACE_DELIM + "table1"));
|
||||
HTableDescriptor tableDescTwo =
|
||||
new HTableDescriptor(TableName.valueOf(nsp + TableName.NAMESPACE_DELIM + "table2"));
|
||||
ADMIN.createTable(tableDescOne);
|
||||
ADMIN.createTable(tableDescTwo, Bytes.toBytes("AAA"), Bytes.toBytes("ZZZ"), 4);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,33 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
|
||||
* agreements. See the NOTICE file distributed with this work for additional information regarding
|
||||
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
|
||||
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
|
||||
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
|
||||
* for the specific language governing permissions and limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.namespace;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category(MediumTests.class)
|
||||
public class TestZKLessNamespaceAuditor extends TestNamespaceAuditor {
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
|
||||
@BeforeClass
|
||||
public static void before() throws Exception {
|
||||
UTIL.getConfiguration().setBoolean("hbase.assignment.usezk", false);
|
||||
setupOnce();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
TestNamespaceAuditor.tearDown();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue