HBASE-16998 Implement Master-side analysis of region space reports
Adds a new Chore to the Master that analyzes the reports that are sent by RegionServers. The Master must then, for all tables with quotas, determine the tables that are violating quotas and move those tables into violation. Similarly, tables no longer violating the quota can be moved out of violation. The Chore is the "stateful" bit, managing which tables are and are not in violation. Everything else is just performing computation and informing the Chore on the updated state. Added InterfaceAudience annotations and clean up the QuotaObserverChore constructor. Cleaned up some javadoc and QuotaObserverChore. Reuse the QuotaViolationStore impl objects.
This commit is contained in:
parent
7fb0ac26e3
commit
533470f8c8
|
@ -22,6 +22,7 @@ import java.io.Closeable;
|
|||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Objects;
|
||||
import java.util.Queue;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -54,11 +55,23 @@ public class QuotaRetriever implements Closeable, Iterable<QuotaSettings> {
|
|||
private Connection connection;
|
||||
private Table table;
|
||||
|
||||
private QuotaRetriever() {
|
||||
/**
|
||||
* Should QutoaRetriever manage the state of the connection, or leave it be.
|
||||
*/
|
||||
private boolean isManagedConnection = false;
|
||||
|
||||
QuotaRetriever() {
|
||||
}
|
||||
|
||||
void init(final Configuration conf, final Scan scan) throws IOException {
|
||||
this.connection = ConnectionFactory.createConnection(conf);
|
||||
// Set this before creating the connection and passing it down to make sure
|
||||
// it's cleaned up if we fail to construct the Scanner.
|
||||
this.isManagedConnection = true;
|
||||
init(ConnectionFactory.createConnection(conf), scan);
|
||||
}
|
||||
|
||||
void init(final Connection conn, final Scan scan) throws IOException {
|
||||
this.connection = Objects.requireNonNull(conn);
|
||||
this.table = this.connection.getTable(QuotaTableUtil.QUOTA_TABLE_NAME);
|
||||
try {
|
||||
scanner = table.getScanner(scan);
|
||||
|
@ -77,11 +90,15 @@ public class QuotaRetriever implements Closeable, Iterable<QuotaSettings> {
|
|||
this.table.close();
|
||||
this.table = null;
|
||||
}
|
||||
// Null out the connection on close() even if we didn't explicitly close it
|
||||
// to maintain typical semantics.
|
||||
if (isManagedConnection) {
|
||||
if (this.connection != null) {
|
||||
this.connection.close();
|
||||
this.connection = null;
|
||||
}
|
||||
}
|
||||
this.connection = null;
|
||||
}
|
||||
|
||||
public QuotaSettings next() throws IOException {
|
||||
if (cache.isEmpty()) {
|
||||
|
|
|
@ -134,6 +134,9 @@ import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
|
|||
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
|
||||
import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
|
||||
import org.apache.hadoop.hbase.quotas.QuotaObserverChore;
|
||||
import org.apache.hadoop.hbase.quotas.SpaceQuotaViolationNotifier;
|
||||
import org.apache.hadoop.hbase.quotas.SpaceQuotaViolationNotifierForTest;
|
||||
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.HStore;
|
||||
|
@ -372,6 +375,8 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
|
||||
// it is assigned after 'initialized' guard set to true, so should be volatile
|
||||
private volatile MasterQuotaManager quotaManager;
|
||||
private SpaceQuotaViolationNotifier spaceQuotaViolationNotifier;
|
||||
private QuotaObserverChore quotaObserverChore;
|
||||
|
||||
private ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
|
||||
private WALProcedureStore procedureStore;
|
||||
|
@ -899,6 +904,10 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
|
||||
status.setStatus("Starting quota manager");
|
||||
initQuotaManager();
|
||||
this.spaceQuotaViolationNotifier = new SpaceQuotaViolationNotifierForTest();
|
||||
this.quotaObserverChore = new QuotaObserverChore(this);
|
||||
// Start the chore to read the region FS space reports and act on them
|
||||
getChoreService().scheduleChore(quotaObserverChore);
|
||||
|
||||
// clear the dead servers with same host name and port of online server because we are not
|
||||
// removing dead server with same hostname and port of rs which is trying to check in before
|
||||
|
@ -1209,6 +1218,9 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
if (this.periodicDoMetricsChore != null) {
|
||||
periodicDoMetricsChore.cancel();
|
||||
}
|
||||
if (this.quotaObserverChore != null) {
|
||||
quotaObserverChore.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -3346,4 +3358,12 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
public LockManager getLockManager() {
|
||||
return lockManager;
|
||||
}
|
||||
|
||||
public QuotaObserverChore getQuotaObserverChore() {
|
||||
return this.quotaObserverChore;
|
||||
}
|
||||
|
||||
public SpaceQuotaViolationNotifier getSpaceQuotaViolationNotifier() {
|
||||
return this.spaceQuotaViolationNotifier;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -523,6 +523,7 @@ public class MasterQuotaManager implements RegionStateListener {
|
|||
|
||||
public void addRegionSize(HRegionInfo hri, long size) {
|
||||
// TODO Make proper API
|
||||
// TODO Prevent from growing indefinitely
|
||||
regionSizes.put(hri, size);
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,127 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to you under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.quotas;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
||||
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
|
||||
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.collect.Iterables;
|
||||
|
||||
/**
|
||||
* {@link QuotaViolationStore} implementation for namespaces.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class NamespaceQuotaViolationStore implements QuotaViolationStore<String> {
|
||||
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
private final ReadLock rlock = lock.readLock();
|
||||
private final WriteLock wlock = lock.writeLock();
|
||||
|
||||
private final Connection conn;
|
||||
private final QuotaObserverChore chore;
|
||||
private Map<HRegionInfo,Long> regionUsage;
|
||||
|
||||
public NamespaceQuotaViolationStore(Connection conn, QuotaObserverChore chore, Map<HRegionInfo,Long> regionUsage) {
|
||||
this.conn = Objects.requireNonNull(conn);
|
||||
this.chore = Objects.requireNonNull(chore);
|
||||
this.regionUsage = Objects.requireNonNull(regionUsage);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SpaceQuota getSpaceQuota(String namespace) throws IOException {
|
||||
Quotas quotas = getQuotaForNamespace(namespace);
|
||||
if (null != quotas && quotas.hasSpace()) {
|
||||
return quotas.getSpace();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetches the namespace quota. Visible for mocking/testing.
|
||||
*/
|
||||
Quotas getQuotaForNamespace(String namespace) throws IOException {
|
||||
return QuotaTableUtil.getNamespaceQuota(conn, namespace);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ViolationState getCurrentState(String namespace) {
|
||||
// Defer the "current state" to the chore
|
||||
return this.chore.getNamespaceQuotaViolation(namespace);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ViolationState getTargetState(String subject, SpaceQuota spaceQuota) {
|
||||
rlock.lock();
|
||||
try {
|
||||
final long sizeLimitInBytes = spaceQuota.getSoftLimit();
|
||||
long sum = 0L;
|
||||
for (Entry<HRegionInfo,Long> entry : filterBySubject(subject)) {
|
||||
sum += entry.getValue();
|
||||
if (sum > sizeLimitInBytes) {
|
||||
// Short-circuit early
|
||||
return ViolationState.IN_VIOLATION;
|
||||
}
|
||||
}
|
||||
// Observance is defined as the size of the table being less than the limit
|
||||
return sum <= sizeLimitInBytes ? ViolationState.IN_OBSERVANCE : ViolationState.IN_VIOLATION;
|
||||
} finally {
|
||||
rlock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterable<Entry<HRegionInfo,Long>> filterBySubject(String namespace) {
|
||||
rlock.lock();
|
||||
try {
|
||||
return Iterables.filter(regionUsage.entrySet(), new Predicate<Entry<HRegionInfo,Long>>() {
|
||||
@Override
|
||||
public boolean apply(Entry<HRegionInfo,Long> input) {
|
||||
return namespace.equals(input.getKey().getTable().getNamespaceAsString());
|
||||
}
|
||||
});
|
||||
} finally {
|
||||
rlock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setCurrentState(String namespace, ViolationState state) {
|
||||
// Defer the "current state" to the chore
|
||||
this.chore.setNamespaceQuotaViolation(namespace, state);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRegionUsage(Map<HRegionInfo,Long> regionUsage) {
|
||||
wlock.lock();
|
||||
try {
|
||||
this.regionUsage = Objects.requireNonNull(regionUsage);
|
||||
} finally {
|
||||
wlock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,618 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to you under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.quotas;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.ScheduledChore;
|
||||
import org.apache.hadoop.hbase.Stoppable;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.quotas.QuotaViolationStore.ViolationState;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.HashMultimap;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Multimap;
|
||||
|
||||
/**
|
||||
* Reads the currently received Region filesystem-space use reports and acts on those which
|
||||
* violate a defined quota.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class QuotaObserverChore extends ScheduledChore {
|
||||
private static final Log LOG = LogFactory.getLog(QuotaObserverChore.class);
|
||||
static final String VIOLATION_OBSERVER_CHORE_PERIOD_KEY =
|
||||
"hbase.master.quotas.violation.observer.chore.period";
|
||||
static final int VIOLATION_OBSERVER_CHORE_PERIOD_DEFAULT = 1000 * 60 * 5; // 5 minutes in millis
|
||||
|
||||
static final String VIOLATION_OBSERVER_CHORE_DELAY_KEY =
|
||||
"hbase.master.quotas.violation.observer.chore.delay";
|
||||
static final long VIOLATION_OBSERVER_CHORE_DELAY_DEFAULT = 1000L * 60L; // 1 minute
|
||||
|
||||
static final String VIOLATION_OBSERVER_CHORE_TIMEUNIT_KEY =
|
||||
"hbase.master.quotas.violation.observer.chore.timeunit";
|
||||
static final String VIOLATION_OBSERVER_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();
|
||||
|
||||
static final String VIOLATION_OBSERVER_CHORE_REPORT_PERCENT_KEY =
|
||||
"hbase.master.quotas.violation.observer.report.percent";
|
||||
static final double VIOLATION_OBSERVER_CHORE_REPORT_PERCENT_DEFAULT= 0.95;
|
||||
|
||||
private final Connection conn;
|
||||
private final Configuration conf;
|
||||
private final MasterQuotaManager quotaManager;
|
||||
/*
|
||||
* Callback that changes in quota violation are passed to.
|
||||
*/
|
||||
private final SpaceQuotaViolationNotifier violationNotifier;
|
||||
|
||||
/*
|
||||
* Preserves the state of quota violations for tables and namespaces
|
||||
*/
|
||||
private final Map<TableName,ViolationState> tableQuotaViolationStates;
|
||||
private final Map<String,ViolationState> namespaceQuotaViolationStates;
|
||||
|
||||
/*
|
||||
* Encapsulates logic for moving tables/namespaces into or out of quota violation
|
||||
*/
|
||||
private QuotaViolationStore<TableName> tableViolationStore;
|
||||
private QuotaViolationStore<String> namespaceViolationStore;
|
||||
|
||||
public QuotaObserverChore(HMaster master) {
|
||||
this(
|
||||
master.getConnection(), master.getConfiguration(),
|
||||
master.getSpaceQuotaViolationNotifier(), master.getMasterQuotaManager(),
|
||||
master);
|
||||
}
|
||||
|
||||
QuotaObserverChore(
|
||||
Connection conn, Configuration conf, SpaceQuotaViolationNotifier violationNotifier,
|
||||
MasterQuotaManager quotaManager, Stoppable stopper) {
|
||||
super(
|
||||
QuotaObserverChore.class.getSimpleName(), stopper, getPeriod(conf),
|
||||
getInitialDelay(conf), getTimeUnit(conf));
|
||||
this.conn = conn;
|
||||
this.conf = conf;
|
||||
this.quotaManager = quotaManager;
|
||||
this.violationNotifier = violationNotifier;
|
||||
this.tableQuotaViolationStates = new HashMap<>();
|
||||
this.namespaceQuotaViolationStates = new HashMap<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void chore() {
|
||||
try {
|
||||
_chore();
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to process quota reports and update quota violation state. Will retry.", e);
|
||||
}
|
||||
}
|
||||
|
||||
void _chore() throws IOException {
|
||||
// Get the total set of tables that have quotas defined. Includes table quotas
|
||||
// and tables included by namespace quotas.
|
||||
TablesWithQuotas tablesWithQuotas = fetchAllTablesWithQuotasDefined();
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Found following tables with quotas: " + tablesWithQuotas);
|
||||
}
|
||||
|
||||
// The current "view" of region space use. Used henceforth.
|
||||
final Map<HRegionInfo,Long> reportedRegionSpaceUse = quotaManager.snapshotRegionSizes();
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Using " + reportedRegionSpaceUse.size() + " region space use reports");
|
||||
}
|
||||
|
||||
// Create the stores to track table and namespace violations
|
||||
initializeViolationStores(reportedRegionSpaceUse);
|
||||
|
||||
// Filter out tables for which we don't have adequate regionspace reports yet.
|
||||
// Important that we do this after we instantiate the stores above
|
||||
tablesWithQuotas.filterInsufficientlyReportedTables(tableViolationStore);
|
||||
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Filtered insufficiently reported tables, left with " +
|
||||
reportedRegionSpaceUse.size() + " regions reported");
|
||||
}
|
||||
|
||||
// Transition each table to/from quota violation based on the current and target state.
|
||||
// Only table quotas are enacted.
|
||||
final Set<TableName> tablesWithTableQuotas = tablesWithQuotas.getTableQuotaTables();
|
||||
processTablesWithQuotas(tablesWithTableQuotas);
|
||||
|
||||
// For each Namespace quota, transition each table in the namespace in or out of violation
|
||||
// only if a table quota violation policy has not already been applied.
|
||||
final Set<String> namespacesWithQuotas = tablesWithQuotas.getNamespacesWithQuotas();
|
||||
final Multimap<String,TableName> tablesByNamespace = tablesWithQuotas.getTablesByNamespace();
|
||||
processNamespacesWithQuotas(namespacesWithQuotas, tablesByNamespace);
|
||||
}
|
||||
|
||||
void initializeViolationStores(Map<HRegionInfo,Long> regionSizes) {
|
||||
Map<HRegionInfo,Long> immutableRegionSpaceUse = Collections.unmodifiableMap(regionSizes);
|
||||
if (null == tableViolationStore) {
|
||||
tableViolationStore = new TableQuotaViolationStore(conn, this, immutableRegionSpaceUse);
|
||||
} else {
|
||||
tableViolationStore.setRegionUsage(immutableRegionSpaceUse);
|
||||
}
|
||||
if (null == namespaceViolationStore) {
|
||||
namespaceViolationStore = new NamespaceQuotaViolationStore(
|
||||
conn, this, immutableRegionSpaceUse);
|
||||
} else {
|
||||
namespaceViolationStore.setRegionUsage(immutableRegionSpaceUse);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Processes each {@code TableName} which has a quota defined and moves it in or out of
|
||||
* violation based on the space use.
|
||||
*
|
||||
* @param tablesWithTableQuotas The HBase tables which have quotas defined
|
||||
*/
|
||||
void processTablesWithQuotas(final Set<TableName> tablesWithTableQuotas) throws IOException {
|
||||
for (TableName table : tablesWithTableQuotas) {
|
||||
final SpaceQuota spaceQuota = tableViolationStore.getSpaceQuota(table);
|
||||
if (null == spaceQuota) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Unexpectedly did not find a space quota for " + table
|
||||
+ ", maybe it was recently deleted.");
|
||||
}
|
||||
continue;
|
||||
}
|
||||
final ViolationState currentState = tableViolationStore.getCurrentState(table);
|
||||
final ViolationState targetState = tableViolationStore.getTargetState(table, spaceQuota);
|
||||
|
||||
if (currentState == ViolationState.IN_VIOLATION) {
|
||||
if (targetState == ViolationState.IN_OBSERVANCE) {
|
||||
LOG.info(table + " moving into observance of table space quota.");
|
||||
transitionTableToObservance(table);
|
||||
tableViolationStore.setCurrentState(table, ViolationState.IN_OBSERVANCE);
|
||||
} else if (targetState == ViolationState.IN_VIOLATION) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(table + " remains in violation of quota.");
|
||||
}
|
||||
tableViolationStore.setCurrentState(table, ViolationState.IN_VIOLATION);
|
||||
}
|
||||
} else if (currentState == ViolationState.IN_OBSERVANCE) {
|
||||
if (targetState == ViolationState.IN_VIOLATION) {
|
||||
LOG.info(table + " moving into violation of table space quota.");
|
||||
transitionTableToViolation(table, getViolationPolicy(spaceQuota));
|
||||
tableViolationStore.setCurrentState(table, ViolationState.IN_VIOLATION);
|
||||
} else if (targetState == ViolationState.IN_OBSERVANCE) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(table + " remains in observance of quota.");
|
||||
}
|
||||
tableViolationStore.setCurrentState(table, ViolationState.IN_OBSERVANCE);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Processes each namespace which has a quota defined and moves all of the tables contained
|
||||
* in that namespace into or out of violation of the quota. Tables which are already in
|
||||
* violation of a quota at the table level which <em>also</em> have a reside in a namespace
|
||||
* with a violated quota will not have the namespace quota enacted. The table quota takes
|
||||
* priority over the namespace quota.
|
||||
*
|
||||
* @param namespacesWithQuotas The set of namespaces that have quotas defined
|
||||
* @param tablesByNamespace A mapping of namespaces and the tables contained in those namespaces
|
||||
*/
|
||||
void processNamespacesWithQuotas(
|
||||
final Set<String> namespacesWithQuotas,
|
||||
final Multimap<String,TableName> tablesByNamespace) throws IOException {
|
||||
for (String namespace : namespacesWithQuotas) {
|
||||
// Get the quota definition for the namespace
|
||||
final SpaceQuota spaceQuota = namespaceViolationStore.getSpaceQuota(namespace);
|
||||
if (null == spaceQuota) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Could not get Namespace space quota for " + namespace
|
||||
+ ", maybe it was recently deleted.");
|
||||
}
|
||||
continue;
|
||||
}
|
||||
final ViolationState currentState = namespaceViolationStore.getCurrentState(namespace);
|
||||
final ViolationState targetState = namespaceViolationStore.getTargetState(namespace, spaceQuota);
|
||||
// When in observance, check if we need to move to violation.
|
||||
if (ViolationState.IN_OBSERVANCE == currentState) {
|
||||
if (ViolationState.IN_VIOLATION == targetState) {
|
||||
for (TableName tableInNS : tablesByNamespace.get(namespace)) {
|
||||
if (ViolationState.IN_VIOLATION == tableViolationStore.getCurrentState(tableInNS)) {
|
||||
// Table-level quota violation policy is being applied here.
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Not activating Namespace violation policy because Table violation"
|
||||
+ " policy is already in effect for " + tableInNS);
|
||||
}
|
||||
continue;
|
||||
} else {
|
||||
LOG.info(tableInNS + " moving into violation of namespace space quota");
|
||||
transitionTableToViolation(tableInNS, getViolationPolicy(spaceQuota));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// still in observance
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(namespace + " remains in observance of quota.");
|
||||
}
|
||||
}
|
||||
} else if (ViolationState.IN_VIOLATION == currentState) {
|
||||
// When in violation, check if we need to move to observance.
|
||||
if (ViolationState.IN_OBSERVANCE == targetState) {
|
||||
for (TableName tableInNS : tablesByNamespace.get(namespace)) {
|
||||
if (ViolationState.IN_VIOLATION == tableViolationStore.getCurrentState(tableInNS)) {
|
||||
// Table-level quota violation policy is being applied here.
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Not activating Namespace violation policy because Table violation"
|
||||
+ " policy is already in effect for " + tableInNS);
|
||||
}
|
||||
continue;
|
||||
} else {
|
||||
LOG.info(tableInNS + " moving into observance of namespace space quota");
|
||||
transitionTableToObservance(tableInNS);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Remains in violation
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(namespace + " remains in violation of quota.");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Computes the set of all tables that have quotas defined. This includes tables with quotas
|
||||
* explicitly set on them, in addition to tables that exist namespaces which have a quota
|
||||
* defined.
|
||||
*/
|
||||
TablesWithQuotas fetchAllTablesWithQuotasDefined() throws IOException {
|
||||
final Scan scan = QuotaTableUtil.makeScan(null);
|
||||
final QuotaRetriever scanner = new QuotaRetriever();
|
||||
final TablesWithQuotas tablesWithQuotas = new TablesWithQuotas(conn, conf);
|
||||
try {
|
||||
scanner.init(conn, scan);
|
||||
for (QuotaSettings quotaSettings : scanner) {
|
||||
// Only one of namespace and tablename should be 'null'
|
||||
final String namespace = quotaSettings.getNamespace();
|
||||
final TableName tableName = quotaSettings.getTableName();
|
||||
if (QuotaType.SPACE != quotaSettings.getQuotaType()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (null != namespace) {
|
||||
assert null == tableName;
|
||||
// Collect all of the tables in the namespace
|
||||
TableName[] tablesInNS = conn.getAdmin()
|
||||
.listTableNamesByNamespace(namespace);
|
||||
for (TableName tableUnderNs : tablesInNS) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Adding " + tableUnderNs + " under " + namespace
|
||||
+ " as having a namespace quota");
|
||||
}
|
||||
tablesWithQuotas.addNamespaceQuotaTable(tableUnderNs);
|
||||
}
|
||||
} else {
|
||||
assert null != tableName;
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Adding " + tableName + " as having table quota.");
|
||||
}
|
||||
// namespace is already null, must be a non-null tableName
|
||||
tablesWithQuotas.addTableQuotaTable(tableName);
|
||||
}
|
||||
}
|
||||
return tablesWithQuotas;
|
||||
} finally {
|
||||
if (null != scanner) {
|
||||
scanner.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
QuotaViolationStore<TableName> getTableViolationStore() {
|
||||
return tableViolationStore;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
QuotaViolationStore<String> getNamespaceViolationStore() {
|
||||
return namespaceViolationStore;
|
||||
}
|
||||
|
||||
/**
|
||||
* Transitions the given table to violation of its quota, enabling the violation policy.
|
||||
*/
|
||||
private void transitionTableToViolation(TableName table, SpaceViolationPolicy violationPolicy) {
|
||||
this.violationNotifier.transitionTableToViolation(table, violationPolicy);
|
||||
}
|
||||
|
||||
/**
|
||||
* Transitions the given table to observance of its quota, disabling the violation policy.
|
||||
*/
|
||||
private void transitionTableToObservance(TableName table) {
|
||||
this.violationNotifier.transitionTableToObservance(table);
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch the {@link ViolationState} for the given table.
|
||||
*/
|
||||
ViolationState getTableQuotaViolation(TableName table) {
|
||||
// TODO Can one instance of a Chore be executed concurrently?
|
||||
ViolationState state = this.tableQuotaViolationStates.get(table);
|
||||
if (null == state) {
|
||||
// No tracked state implies observance.
|
||||
return ViolationState.IN_OBSERVANCE;
|
||||
}
|
||||
return state;
|
||||
}
|
||||
|
||||
/**
|
||||
* Stores the quota violation state for the given table.
|
||||
*/
|
||||
void setTableQuotaViolation(TableName table, ViolationState state) {
|
||||
this.tableQuotaViolationStates.put(table, state);
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetches the {@link ViolationState} for the given namespace.
|
||||
*/
|
||||
ViolationState getNamespaceQuotaViolation(String namespace) {
|
||||
// TODO Can one instance of a Chore be executed concurrently?
|
||||
ViolationState state = this.namespaceQuotaViolationStates.get(namespace);
|
||||
if (null == state) {
|
||||
// No tracked state implies observance.
|
||||
return ViolationState.IN_OBSERVANCE;
|
||||
}
|
||||
return state;
|
||||
}
|
||||
|
||||
/**
|
||||
* Stores the quota violation state for the given namespace.
|
||||
*/
|
||||
void setNamespaceQuotaViolation(String namespace, ViolationState state) {
|
||||
this.namespaceQuotaViolationStates.put(namespace, state);
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts the {@link SpaceViolationPolicy} from the serialized {@link Quotas} protobuf.
|
||||
* @throws IllegalArgumentException If the SpaceQuota lacks a ViolationPolicy
|
||||
*/
|
||||
SpaceViolationPolicy getViolationPolicy(SpaceQuota spaceQuota) {
|
||||
if (!spaceQuota.hasViolationPolicy()) {
|
||||
throw new IllegalArgumentException("SpaceQuota had no associated violation policy: "
|
||||
+ spaceQuota);
|
||||
}
|
||||
return ProtobufUtil.toViolationPolicy(spaceQuota.getViolationPolicy());
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts the period for the chore from the configuration.
|
||||
*
|
||||
* @param conf The configuration object.
|
||||
* @return The configured chore period or the default value.
|
||||
*/
|
||||
static int getPeriod(Configuration conf) {
|
||||
return conf.getInt(VIOLATION_OBSERVER_CHORE_PERIOD_KEY,
|
||||
VIOLATION_OBSERVER_CHORE_PERIOD_DEFAULT);
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts the initial delay for the chore from the configuration.
|
||||
*
|
||||
* @param conf The configuration object.
|
||||
* @return The configured chore initial delay or the default value.
|
||||
*/
|
||||
static long getInitialDelay(Configuration conf) {
|
||||
return conf.getLong(VIOLATION_OBSERVER_CHORE_DELAY_KEY,
|
||||
VIOLATION_OBSERVER_CHORE_DELAY_DEFAULT);
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts the time unit for the chore period and initial delay from the configuration. The
|
||||
* configuration value for {@link #VIOLATION_OBSERVER_CHORE_TIMEUNIT_KEY} must correspond to
|
||||
* a {@link TimeUnit} value.
|
||||
*
|
||||
* @param conf The configuration object.
|
||||
* @return The configured time unit for the chore period and initial delay or the default value.
|
||||
*/
|
||||
static TimeUnit getTimeUnit(Configuration conf) {
|
||||
return TimeUnit.valueOf(conf.get(VIOLATION_OBSERVER_CHORE_TIMEUNIT_KEY,
|
||||
VIOLATION_OBSERVER_CHORE_TIMEUNIT_DEFAULT));
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts the percent of Regions for a table to have been reported to enable quota violation
|
||||
* state change.
|
||||
*
|
||||
* @param conf The configuration object.
|
||||
* @return The percent of regions reported to use.
|
||||
*/
|
||||
static Double getRegionReportPercent(Configuration conf) {
|
||||
return conf.getDouble(VIOLATION_OBSERVER_CHORE_REPORT_PERCENT_KEY,
|
||||
VIOLATION_OBSERVER_CHORE_REPORT_PERCENT_DEFAULT);
|
||||
}
|
||||
|
||||
/**
|
||||
* A container which encapsulates the tables which have a table quota and the tables which
|
||||
* are contained in a namespace which have a namespace quota.
|
||||
*/
|
||||
static class TablesWithQuotas {
|
||||
private final Set<TableName> tablesWithTableQuotas = new HashSet<>();
|
||||
private final Set<TableName> tablesWithNamespaceQuotas = new HashSet<>();
|
||||
private final Connection conn;
|
||||
private final Configuration conf;
|
||||
|
||||
public TablesWithQuotas(Connection conn, Configuration conf) {
|
||||
this.conn = Objects.requireNonNull(conn);
|
||||
this.conf = Objects.requireNonNull(conf);
|
||||
}
|
||||
|
||||
Configuration getConfiguration() {
|
||||
return conf;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a table with a table quota.
|
||||
*/
|
||||
public void addTableQuotaTable(TableName tn) {
|
||||
tablesWithTableQuotas.add(tn);
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a table with a namespace quota.
|
||||
*/
|
||||
public void addNamespaceQuotaTable(TableName tn) {
|
||||
tablesWithNamespaceQuotas.add(tn);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if the given table has a table quota.
|
||||
*/
|
||||
public boolean hasTableQuota(TableName tn) {
|
||||
return tablesWithTableQuotas.contains(tn);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if the table exists in a namespace with a namespace quota.
|
||||
*/
|
||||
public boolean hasNamespaceQuota(TableName tn) {
|
||||
return tablesWithNamespaceQuotas.contains(tn);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an unmodifiable view of all tables with table quotas.
|
||||
*/
|
||||
public Set<TableName> getTableQuotaTables() {
|
||||
return Collections.unmodifiableSet(tablesWithTableQuotas);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an unmodifiable view of all tables in namespaces that have
|
||||
* namespace quotas.
|
||||
*/
|
||||
public Set<TableName> getNamespaceQuotaTables() {
|
||||
return Collections.unmodifiableSet(tablesWithNamespaceQuotas);
|
||||
}
|
||||
|
||||
public Set<String> getNamespacesWithQuotas() {
|
||||
Set<String> namespaces = new HashSet<>();
|
||||
for (TableName tn : tablesWithNamespaceQuotas) {
|
||||
namespaces.add(tn.getNamespaceAsString());
|
||||
}
|
||||
return namespaces;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a view of all tables that reside in a namespace with a namespace
|
||||
* quota, grouped by the namespace itself.
|
||||
*/
|
||||
public Multimap<String,TableName> getTablesByNamespace() {
|
||||
Multimap<String,TableName> tablesByNS = HashMultimap.create();
|
||||
for (TableName tn : tablesWithNamespaceQuotas) {
|
||||
tablesByNS.put(tn.getNamespaceAsString(), tn);
|
||||
}
|
||||
return tablesByNS;
|
||||
}
|
||||
|
||||
/**
|
||||
* Filters out all tables for which the Master currently doesn't have enough region space
|
||||
* reports received from RegionServers yet.
|
||||
*/
|
||||
public void filterInsufficientlyReportedTables(QuotaViolationStore<TableName> tableStore)
|
||||
throws IOException {
|
||||
final double percentRegionsReportedThreshold = getRegionReportPercent(getConfiguration());
|
||||
Set<TableName> tablesToRemove = new HashSet<>();
|
||||
for (TableName table : Iterables.concat(tablesWithTableQuotas, tablesWithNamespaceQuotas)) {
|
||||
// Don't recompute a table we've already computed
|
||||
if (tablesToRemove.contains(table)) {
|
||||
continue;
|
||||
}
|
||||
final int numRegionsInTable = getNumRegions(table);
|
||||
// If the table doesn't exist (no regions), bail out.
|
||||
if (0 == numRegionsInTable) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Filtering " + table + " because no regions were reported");
|
||||
}
|
||||
tablesToRemove.add(table);
|
||||
continue;
|
||||
}
|
||||
final int reportedRegionsInQuota = getNumReportedRegions(table, tableStore);
|
||||
final double ratioReported = ((double) reportedRegionsInQuota) / numRegionsInTable;
|
||||
if (ratioReported < percentRegionsReportedThreshold) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Filtering " + table + " because " + reportedRegionsInQuota + " of " +
|
||||
numRegionsInTable + " were reported.");
|
||||
}
|
||||
tablesToRemove.add(table);
|
||||
} else if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Retaining " + table + " because " + reportedRegionsInQuota + " of " +
|
||||
numRegionsInTable + " were reported.");
|
||||
}
|
||||
}
|
||||
for (TableName tableToRemove : tablesToRemove) {
|
||||
tablesWithTableQuotas.remove(tableToRemove);
|
||||
tablesWithNamespaceQuotas.remove(tableToRemove);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Computes the total number of regions in a table.
|
||||
*/
|
||||
int getNumRegions(TableName table) throws IOException {
|
||||
List<HRegionInfo> regions = this.conn.getAdmin().getTableRegions(table);
|
||||
if (null == regions) {
|
||||
return 0;
|
||||
}
|
||||
return regions.size();
|
||||
}
|
||||
|
||||
/**
|
||||
* Computes the number of regions reported for a table.
|
||||
*/
|
||||
int getNumReportedRegions(TableName table, QuotaViolationStore<TableName> tableStore)
|
||||
throws IOException {
|
||||
return Iterables.size(tableStore.filterBySubject(table));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
final StringBuilder sb = new StringBuilder(32);
|
||||
sb.append(getClass().getSimpleName())
|
||||
.append(": tablesWithTableQuotas=")
|
||||
.append(this.tablesWithTableQuotas)
|
||||
.append(", tablesWithNamespaceQuotas=")
|
||||
.append(this.tablesWithNamespaceQuotas);
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,89 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to you under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.quotas;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
|
||||
|
||||
/**
|
||||
* A common interface for computing and storing space quota observance/violation for entities.
|
||||
*
|
||||
* An entity is presently a table or a namespace.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface QuotaViolationStore<T> {
|
||||
|
||||
/**
|
||||
* The current state of a table with respect to the policy set forth by a quota.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public enum ViolationState {
|
||||
IN_VIOLATION,
|
||||
IN_OBSERVANCE,
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch the Quota for the given {@code subject}. May be null.
|
||||
*
|
||||
* @param subject The object for which the quota should be fetched
|
||||
*/
|
||||
SpaceQuota getSpaceQuota(T subject) throws IOException;
|
||||
|
||||
/**
|
||||
* Returns the current {@link ViolationState} for the given {@code subject}.
|
||||
*
|
||||
* @param subject The object which the quota violation state should be fetched
|
||||
*/
|
||||
ViolationState getCurrentState(T subject);
|
||||
|
||||
/**
|
||||
* Computes the target {@link ViolationState} for the given {@code subject} and
|
||||
* {@code spaceQuota}.
|
||||
*
|
||||
* @param subject The object which to determine the target quota violation state of
|
||||
* @param spaceQuota The quota "definition" for the {@code subject}
|
||||
*/
|
||||
ViolationState getTargetState(T subject, SpaceQuota spaceQuota);
|
||||
|
||||
/**
|
||||
* Filters the provided <code>regions</code>, returning those which match the given
|
||||
* <code>subject</code>.
|
||||
*
|
||||
* @param subject The filter criteria. Only regions belonging to this parameter will be returned
|
||||
*/
|
||||
Iterable<Entry<HRegionInfo,Long>> filterBySubject(T subject);
|
||||
|
||||
/**
|
||||
* Persists the current {@link ViolationState} for the {@code subject}.
|
||||
*
|
||||
* @param subject The object which the {@link ViolationState} is being persisted for
|
||||
* @param state The current {@link ViolationState} of the {@code subject}
|
||||
*/
|
||||
void setCurrentState(T subject, ViolationState state);
|
||||
|
||||
/**
|
||||
* Updates {@code this} with the latest snapshot of filesystem use by region.
|
||||
*
|
||||
* @param regionUsage A map of {@code HRegionInfo} objects to their filesystem usage in bytes
|
||||
*/
|
||||
void setRegionUsage(Map<HRegionInfo,Long> regionUsage);
|
||||
}
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to you under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.quotas;
|
||||
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* An interface which abstract away the action taken to enable or disable
|
||||
* a space quota violation policy across the HBase cluster.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface SpaceQuotaViolationNotifier {
|
||||
|
||||
/**
|
||||
* Instructs the cluster that the given table is in violation of a space quota. The
|
||||
* provided violation policy is the action which should be taken on the table.
|
||||
*
|
||||
* @param tableName The name of the table in violation of the quota.
|
||||
* @param violationPolicy The policy which should be enacted on the table.
|
||||
*/
|
||||
void transitionTableToViolation(TableName tableName, SpaceViolationPolicy violationPolicy);
|
||||
|
||||
/**
|
||||
* Instructs the cluster that the given table is in observance of any applicable space quota.
|
||||
*
|
||||
* @param tableName The name of the table in observance.
|
||||
*/
|
||||
void transitionTableToObservance(TableName tableName);
|
||||
}
|
|
@ -0,0 +1,50 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to you under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.quotas;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* A SpaceQuotaViolationNotifier implementation for verifying testing.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class SpaceQuotaViolationNotifierForTest implements SpaceQuotaViolationNotifier {
|
||||
|
||||
private final Map<TableName,SpaceViolationPolicy> tablesInViolation = new HashMap<>();
|
||||
|
||||
@Override
|
||||
public void transitionTableToViolation(TableName tableName, SpaceViolationPolicy violationPolicy) {
|
||||
tablesInViolation.put(tableName, violationPolicy);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void transitionTableToObservance(TableName tableName) {
|
||||
tablesInViolation.remove(tableName);
|
||||
}
|
||||
|
||||
public Map<TableName,SpaceViolationPolicy> snapshotTablesInViolation() {
|
||||
return new HashMap<>(this.tablesInViolation);
|
||||
}
|
||||
|
||||
public void clearTableViolations() {
|
||||
this.tablesInViolation.clear();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,127 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to you under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.quotas;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
|
||||
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.collect.Iterables;
|
||||
|
||||
/**
|
||||
* {@link QuotaViolationStore} for tables.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class TableQuotaViolationStore implements QuotaViolationStore<TableName> {
|
||||
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
private final ReadLock rlock = lock.readLock();
|
||||
private final WriteLock wlock = lock.writeLock();
|
||||
|
||||
private final Connection conn;
|
||||
private final QuotaObserverChore chore;
|
||||
private Map<HRegionInfo,Long> regionUsage;
|
||||
|
||||
public TableQuotaViolationStore(Connection conn, QuotaObserverChore chore, Map<HRegionInfo,Long> regionUsage) {
|
||||
this.conn = Objects.requireNonNull(conn);
|
||||
this.chore = Objects.requireNonNull(chore);
|
||||
this.regionUsage = Objects.requireNonNull(regionUsage);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SpaceQuota getSpaceQuota(TableName subject) throws IOException {
|
||||
Quotas quotas = getQuotaForTable(subject);
|
||||
if (null != quotas && quotas.hasSpace()) {
|
||||
return quotas.getSpace();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
/**
|
||||
* Fetches the table quota. Visible for mocking/testing.
|
||||
*/
|
||||
Quotas getQuotaForTable(TableName table) throws IOException {
|
||||
return QuotaTableUtil.getTableQuota(conn, table);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ViolationState getCurrentState(TableName table) {
|
||||
// Defer the "current state" to the chore
|
||||
return chore.getTableQuotaViolation(table);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ViolationState getTargetState(TableName table, SpaceQuota spaceQuota) {
|
||||
rlock.lock();
|
||||
try {
|
||||
final long sizeLimitInBytes = spaceQuota.getSoftLimit();
|
||||
long sum = 0L;
|
||||
for (Entry<HRegionInfo,Long> entry : filterBySubject(table)) {
|
||||
sum += entry.getValue();
|
||||
if (sum > sizeLimitInBytes) {
|
||||
// Short-circuit early
|
||||
return ViolationState.IN_VIOLATION;
|
||||
}
|
||||
}
|
||||
// Observance is defined as the size of the table being less than the limit
|
||||
return sum <= sizeLimitInBytes ? ViolationState.IN_OBSERVANCE : ViolationState.IN_VIOLATION;
|
||||
} finally {
|
||||
rlock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterable<Entry<HRegionInfo,Long>> filterBySubject(TableName table) {
|
||||
rlock.lock();
|
||||
try {
|
||||
return Iterables.filter(regionUsage.entrySet(), new Predicate<Entry<HRegionInfo,Long>>() {
|
||||
@Override
|
||||
public boolean apply(Entry<HRegionInfo,Long> input) {
|
||||
return table.equals(input.getKey().getTable());
|
||||
}
|
||||
});
|
||||
} finally {
|
||||
rlock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setCurrentState(TableName table, ViolationState state) {
|
||||
// Defer the "current state" to the chore
|
||||
this.chore.setTableQuotaViolation(table, state);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRegionUsage(Map<HRegionInfo,Long> regionUsage) {
|
||||
wlock.lock();
|
||||
try {
|
||||
this.regionUsage = Objects.requireNonNull(regionUsage);
|
||||
} finally {
|
||||
wlock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,156 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to you under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.quotas;
|
||||
|
||||
import static com.google.common.collect.Iterables.size;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.quotas.QuotaViolationStore.ViolationState;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
/**
|
||||
* Test class for {@link NamespaceQuotaViolationStore}.
|
||||
*/
|
||||
@Category(SmallTests.class)
|
||||
public class TestNamespaceQuotaViolationStore {
|
||||
private static final long ONE_MEGABYTE = 1024L * 1024L;
|
||||
|
||||
private Connection conn;
|
||||
private QuotaObserverChore chore;
|
||||
private Map<HRegionInfo, Long> regionReports;
|
||||
private NamespaceQuotaViolationStore store;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
conn = mock(Connection.class);
|
||||
chore = mock(QuotaObserverChore.class);
|
||||
regionReports = new HashMap<>();
|
||||
store = new NamespaceQuotaViolationStore(conn, chore, regionReports);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetSpaceQuota() throws Exception {
|
||||
NamespaceQuotaViolationStore mockStore = mock(NamespaceQuotaViolationStore.class);
|
||||
when(mockStore.getSpaceQuota(any(String.class))).thenCallRealMethod();
|
||||
|
||||
Quotas quotaWithSpace = Quotas.newBuilder().setSpace(
|
||||
SpaceQuota.newBuilder()
|
||||
.setSoftLimit(1024L)
|
||||
.setViolationPolicy(QuotaProtos.SpaceViolationPolicy.DISABLE)
|
||||
.build())
|
||||
.build();
|
||||
Quotas quotaWithoutSpace = Quotas.newBuilder().build();
|
||||
|
||||
AtomicReference<Quotas> quotaRef = new AtomicReference<>();
|
||||
when(mockStore.getQuotaForNamespace(any(String.class))).then(new Answer<Quotas>() {
|
||||
@Override
|
||||
public Quotas answer(InvocationOnMock invocation) throws Throwable {
|
||||
return quotaRef.get();
|
||||
}
|
||||
});
|
||||
|
||||
quotaRef.set(quotaWithSpace);
|
||||
assertEquals(quotaWithSpace.getSpace(), mockStore.getSpaceQuota("ns"));
|
||||
quotaRef.set(quotaWithoutSpace);
|
||||
assertNull(mockStore.getSpaceQuota("ns"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTargetViolationState() {
|
||||
final String NS = "ns";
|
||||
TableName tn1 = TableName.valueOf(NS, "tn1");
|
||||
TableName tn2 = TableName.valueOf(NS, "tn2");
|
||||
TableName tn3 = TableName.valueOf("tn3");
|
||||
SpaceQuota quota = SpaceQuota.newBuilder()
|
||||
.setSoftLimit(ONE_MEGABYTE)
|
||||
.setViolationPolicy(ProtobufUtil.toProtoViolationPolicy(SpaceViolationPolicy.DISABLE))
|
||||
.build();
|
||||
|
||||
// Create some junk data to filter. Makes sure it's so large that it would
|
||||
// immediately violate the quota.
|
||||
for (int i = 0; i < 3; i++) {
|
||||
regionReports.put(new HRegionInfo(tn3, Bytes.toBytes(i), Bytes.toBytes(i + 1)),
|
||||
5L * ONE_MEGABYTE);
|
||||
}
|
||||
|
||||
regionReports.put(new HRegionInfo(tn1, Bytes.toBytes(0), Bytes.toBytes(1)), 1024L * 512L);
|
||||
regionReports.put(new HRegionInfo(tn1, Bytes.toBytes(1), Bytes.toBytes(2)), 1024L * 256L);
|
||||
|
||||
// Below the quota
|
||||
assertEquals(ViolationState.IN_OBSERVANCE, store.getTargetState(NS, quota));
|
||||
|
||||
regionReports.put(new HRegionInfo(tn2, Bytes.toBytes(2), Bytes.toBytes(3)), 1024L * 256L);
|
||||
|
||||
// Equal to the quota is still in observance
|
||||
assertEquals(ViolationState.IN_OBSERVANCE, store.getTargetState(NS, quota));
|
||||
|
||||
regionReports.put(new HRegionInfo(tn2, Bytes.toBytes(3), Bytes.toBytes(4)), 1024L);
|
||||
|
||||
// Exceeds the quota, should be in violation
|
||||
assertEquals(ViolationState.IN_VIOLATION, store.getTargetState(NS, quota));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilterRegionsByNamespace() {
|
||||
TableName tn1 = TableName.valueOf("foo");
|
||||
TableName tn2 = TableName.valueOf("sn", "bar");
|
||||
TableName tn3 = TableName.valueOf("ns", "foo");
|
||||
TableName tn4 = TableName.valueOf("ns", "bar");
|
||||
|
||||
assertEquals(0, size(store.filterBySubject("asdf")));
|
||||
|
||||
for (int i = 0; i < 5; i++) {
|
||||
regionReports.put(new HRegionInfo(tn1, Bytes.toBytes(i), Bytes.toBytes(i+1)), 0L);
|
||||
}
|
||||
for (int i = 0; i < 3; i++) {
|
||||
regionReports.put(new HRegionInfo(tn2, Bytes.toBytes(i), Bytes.toBytes(i+1)), 0L);
|
||||
}
|
||||
for (int i = 0; i < 10; i++) {
|
||||
regionReports.put(new HRegionInfo(tn3, Bytes.toBytes(i), Bytes.toBytes(i+1)), 0L);
|
||||
}
|
||||
for (int i = 0; i < 8; i++) {
|
||||
regionReports.put(new HRegionInfo(tn4, Bytes.toBytes(i), Bytes.toBytes(i+1)), 0L);
|
||||
}
|
||||
assertEquals(26, regionReports.size());
|
||||
assertEquals(5, size(store.filterBySubject(NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR)));
|
||||
assertEquals(3, size(store.filterBySubject("sn")));
|
||||
assertEquals(18, size(store.filterBySubject("ns")));
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,106 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to you under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.quotas;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import com.google.common.collect.Iterables;
|
||||
|
||||
/**
|
||||
* Non-HBase cluster unit tests for {@link QuotaObserverChore}.
|
||||
*/
|
||||
@Category(SmallTests.class)
|
||||
public class TestQuotaObserverChore {
|
||||
private Connection conn;
|
||||
private QuotaObserverChore chore;
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
conn = mock(Connection.class);
|
||||
chore = mock(QuotaObserverChore.class);
|
||||
// Set up some rules to call the real method on the mock.
|
||||
when(chore.getViolationPolicy(any(SpaceQuota.class))).thenCallRealMethod();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNumRegionsForTable() {
|
||||
TableName tn1 = TableName.valueOf("t1");
|
||||
TableName tn2 = TableName.valueOf("t2");
|
||||
TableName tn3 = TableName.valueOf("t3");
|
||||
|
||||
final int numTable1Regions = 10;
|
||||
final int numTable2Regions = 15;
|
||||
final int numTable3Regions = 8;
|
||||
Map<HRegionInfo,Long> regionReports = new HashMap<>();
|
||||
for (int i = 0; i < numTable1Regions; i++) {
|
||||
regionReports.put(new HRegionInfo(tn1, Bytes.toBytes(i), Bytes.toBytes(i + 1)), 0L);
|
||||
}
|
||||
|
||||
for (int i = 0; i < numTable2Regions; i++) {
|
||||
regionReports.put(new HRegionInfo(tn2, Bytes.toBytes(i), Bytes.toBytes(i + 1)), 0L);
|
||||
}
|
||||
|
||||
for (int i = 0; i < numTable3Regions; i++) {
|
||||
regionReports.put(new HRegionInfo(tn3, Bytes.toBytes(i), Bytes.toBytes(i + 1)), 0L);
|
||||
}
|
||||
|
||||
TableQuotaViolationStore store = new TableQuotaViolationStore(conn, chore, regionReports);
|
||||
when(chore.getTableViolationStore()).thenReturn(store);
|
||||
|
||||
assertEquals(numTable1Regions, Iterables.size(store.filterBySubject(tn1)));
|
||||
assertEquals(numTable2Regions, Iterables.size(store.filterBySubject(tn2)));
|
||||
assertEquals(numTable3Regions, Iterables.size(store.filterBySubject(tn3)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExtractViolationPolicy() {
|
||||
for (SpaceViolationPolicy policy : SpaceViolationPolicy.values()) {
|
||||
SpaceQuota spaceQuota = SpaceQuota.newBuilder()
|
||||
.setSoftLimit(1024L)
|
||||
.setViolationPolicy(ProtobufUtil.toProtoViolationPolicy(policy))
|
||||
.build();
|
||||
assertEquals(policy, chore.getViolationPolicy(spaceQuota));
|
||||
}
|
||||
SpaceQuota malformedQuota = SpaceQuota.newBuilder()
|
||||
.setSoftLimit(1024L)
|
||||
.build();
|
||||
try {
|
||||
chore.getViolationPolicy(malformedQuota);
|
||||
fail("Should have thrown an IllegalArgumentException.");
|
||||
} catch (IllegalArgumentException e) {
|
||||
// Pass
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,596 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to you under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.quotas;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||
import org.apache.hadoop.hbase.NamespaceNotFoundException;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.quotas.QuotaObserverChore.TablesWithQuotas;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
|
||||
import com.google.common.collect.HashMultimap;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Multimap;
|
||||
|
||||
/**
|
||||
* Test class for {@link QuotaObserverChore} that uses a live HBase cluster.
|
||||
*/
|
||||
@Category(LargeTests.class)
|
||||
public class TestQuotaObserverChoreWithMiniCluster {
|
||||
private static final Log LOG = LogFactory.getLog(TestQuotaObserverChoreWithMiniCluster.class);
|
||||
private static final int SIZE_PER_VALUE = 256;
|
||||
private static final String F1 = "f1";
|
||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private static final AtomicLong COUNTER = new AtomicLong(0);
|
||||
private static final long ONE_MEGABYTE = 1024L * 1024L;
|
||||
private static final long DEFAULT_WAIT_MILLIS = 500;
|
||||
|
||||
@Rule
|
||||
public TestName testName = new TestName();
|
||||
|
||||
private HMaster master;
|
||||
private QuotaObserverChore chore;
|
||||
private SpaceQuotaViolationNotifierForTest violationNotifier;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, 1000);
|
||||
conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 1000);
|
||||
conf.setInt(QuotaObserverChore.VIOLATION_OBSERVER_CHORE_DELAY_KEY, 1000);
|
||||
conf.setInt(QuotaObserverChore.VIOLATION_OBSERVER_CHORE_PERIOD_KEY, 1000);
|
||||
conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
|
||||
TEST_UTIL.startMiniCluster(1);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void removeAllQuotas() throws Exception {
|
||||
final Connection conn = TEST_UTIL.getConnection();
|
||||
// Wait for the quota table to be created
|
||||
if (!conn.getAdmin().tableExists(QuotaUtil.QUOTA_TABLE_NAME)) {
|
||||
do {
|
||||
LOG.debug("Quota table does not yet exist");
|
||||
Thread.sleep(DEFAULT_WAIT_MILLIS);
|
||||
} while (!conn.getAdmin().tableExists(QuotaUtil.QUOTA_TABLE_NAME));
|
||||
} else {
|
||||
// Or, clean up any quotas from previous test runs.
|
||||
QuotaRetriever scanner = QuotaRetriever.open(TEST_UTIL.getConfiguration());
|
||||
for (QuotaSettings quotaSettings : scanner) {
|
||||
final String namespace = quotaSettings.getNamespace();
|
||||
final TableName tableName = quotaSettings.getTableName();
|
||||
if (null != namespace) {
|
||||
LOG.debug("Deleting quota for namespace: " + namespace);
|
||||
QuotaUtil.deleteNamespaceQuota(conn, namespace);
|
||||
} else {
|
||||
assert null != tableName;
|
||||
LOG.debug("Deleting quota for table: "+ tableName);
|
||||
QuotaUtil.deleteTableQuota(conn, tableName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
master = TEST_UTIL.getMiniHBaseCluster().getMaster();
|
||||
violationNotifier =
|
||||
(SpaceQuotaViolationNotifierForTest) master.getSpaceQuotaViolationNotifier();
|
||||
violationNotifier.clearTableViolations();
|
||||
chore = master.getQuotaObserverChore();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTableViolatesQuota() throws Exception {
|
||||
TableName tn = createTableWithRegions(10);
|
||||
|
||||
final long sizeLimit = 2L * ONE_MEGABYTE;
|
||||
final SpaceViolationPolicy violationPolicy = SpaceViolationPolicy.NO_INSERTS;
|
||||
QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn, sizeLimit, violationPolicy);
|
||||
TEST_UTIL.getAdmin().setQuota(settings);
|
||||
|
||||
// Write more data than should be allowed
|
||||
writeData(tn, 3L * ONE_MEGABYTE);
|
||||
|
||||
Map<TableName,SpaceViolationPolicy> violatedQuotas = violationNotifier.snapshotTablesInViolation();
|
||||
while (violatedQuotas.isEmpty()) {
|
||||
LOG.info("Found no violated quotas, sleeping and retrying. Current reports: "
|
||||
+ master.getMasterQuotaManager().snapshotRegionSizes());
|
||||
try {
|
||||
Thread.sleep(DEFAULT_WAIT_MILLIS);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.debug("Interrupted while sleeping.", e);
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
violatedQuotas = violationNotifier.snapshotTablesInViolation();
|
||||
}
|
||||
|
||||
Entry<TableName,SpaceViolationPolicy> entry = Iterables.getOnlyElement(violatedQuotas.entrySet());
|
||||
assertEquals(tn, entry.getKey());
|
||||
assertEquals(violationPolicy, entry.getValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNamespaceViolatesQuota() throws Exception {
|
||||
final String namespace = testName.getMethodName();
|
||||
final Admin admin = TEST_UTIL.getAdmin();
|
||||
// Ensure the namespace exists
|
||||
try {
|
||||
admin.getNamespaceDescriptor(namespace);
|
||||
} catch (NamespaceNotFoundException e) {
|
||||
NamespaceDescriptor desc = NamespaceDescriptor.create(namespace).build();
|
||||
admin.createNamespace(desc);
|
||||
}
|
||||
|
||||
TableName tn1 = createTableWithRegions(namespace, 5);
|
||||
TableName tn2 = createTableWithRegions(namespace, 5);
|
||||
TableName tn3 = createTableWithRegions(namespace, 5);
|
||||
|
||||
final long sizeLimit = 5L * ONE_MEGABYTE;
|
||||
final SpaceViolationPolicy violationPolicy = SpaceViolationPolicy.DISABLE;
|
||||
QuotaSettings settings = QuotaSettingsFactory.limitNamespaceSpace(namespace, sizeLimit, violationPolicy);
|
||||
admin.setQuota(settings);
|
||||
|
||||
writeData(tn1, 2L * ONE_MEGABYTE);
|
||||
admin.flush(tn1);
|
||||
Map<TableName,SpaceViolationPolicy> violatedQuotas = violationNotifier.snapshotTablesInViolation();
|
||||
for (int i = 0; i < 5; i++) {
|
||||
// Check a few times to make sure we don't prematurely move to violation
|
||||
assertEquals("Should not see any quota violations after writing 2MB of data", 0, violatedQuotas.size());
|
||||
try {
|
||||
Thread.sleep(DEFAULT_WAIT_MILLIS);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.debug("Interrupted while sleeping." , e);
|
||||
}
|
||||
violatedQuotas = violationNotifier.snapshotTablesInViolation();
|
||||
}
|
||||
|
||||
writeData(tn2, 2L * ONE_MEGABYTE);
|
||||
admin.flush(tn2);
|
||||
violatedQuotas = violationNotifier.snapshotTablesInViolation();
|
||||
for (int i = 0; i < 5; i++) {
|
||||
// Check a few times to make sure we don't prematurely move to violation
|
||||
assertEquals("Should not see any quota violations after writing 4MB of data", 0,
|
||||
violatedQuotas.size());
|
||||
try {
|
||||
Thread.sleep(DEFAULT_WAIT_MILLIS);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.debug("Interrupted while sleeping." , e);
|
||||
}
|
||||
violatedQuotas = violationNotifier.snapshotTablesInViolation();
|
||||
}
|
||||
|
||||
// Writing the final 2MB of data will push the namespace over the 5MB limit (6MB in total)
|
||||
// and should push all three tables in the namespace into violation.
|
||||
writeData(tn3, 2L * ONE_MEGABYTE);
|
||||
admin.flush(tn3);
|
||||
violatedQuotas = violationNotifier.snapshotTablesInViolation();
|
||||
while (violatedQuotas.size() < 3) {
|
||||
LOG.debug("Saw fewer violations than desired (expected 3): " + violatedQuotas
|
||||
+ ". Current reports: " + master.getMasterQuotaManager().snapshotRegionSizes());
|
||||
try {
|
||||
Thread.sleep(DEFAULT_WAIT_MILLIS);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.debug("Interrupted while sleeping.", e);
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
violatedQuotas = violationNotifier.snapshotTablesInViolation();
|
||||
}
|
||||
|
||||
SpaceViolationPolicy vp1 = violatedQuotas.remove(tn1);
|
||||
assertNotNull("tn1 should be in violation", vp1);
|
||||
assertEquals(violationPolicy, vp1);
|
||||
SpaceViolationPolicy vp2 = violatedQuotas.remove(tn2);
|
||||
assertNotNull("tn2 should be in violation", vp2);
|
||||
assertEquals(violationPolicy, vp2);
|
||||
SpaceViolationPolicy vp3 = violatedQuotas.remove(tn3);
|
||||
assertNotNull("tn3 should be in violation", vp3);
|
||||
assertEquals(violationPolicy, vp3);
|
||||
assertTrue("Unexpected additional quota violations: " + violatedQuotas, violatedQuotas.isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTableQuotaOverridesNamespaceQuota() throws Exception {
|
||||
final String namespace = testName.getMethodName();
|
||||
final Admin admin = TEST_UTIL.getAdmin();
|
||||
// Ensure the namespace exists
|
||||
try {
|
||||
admin.getNamespaceDescriptor(namespace);
|
||||
} catch (NamespaceNotFoundException e) {
|
||||
NamespaceDescriptor desc = NamespaceDescriptor.create(namespace).build();
|
||||
admin.createNamespace(desc);
|
||||
}
|
||||
|
||||
TableName tn1 = createTableWithRegions(namespace, 5);
|
||||
TableName tn2 = createTableWithRegions(namespace, 5);
|
||||
|
||||
final long namespaceSizeLimit = 3L * ONE_MEGABYTE;
|
||||
final SpaceViolationPolicy namespaceViolationPolicy = SpaceViolationPolicy.DISABLE;
|
||||
QuotaSettings namespaceSettings = QuotaSettingsFactory.limitNamespaceSpace(namespace,
|
||||
namespaceSizeLimit, namespaceViolationPolicy);
|
||||
admin.setQuota(namespaceSettings);
|
||||
|
||||
writeData(tn1, 2L * ONE_MEGABYTE);
|
||||
admin.flush(tn1);
|
||||
Map<TableName,SpaceViolationPolicy> violatedQuotas = violationNotifier.snapshotTablesInViolation();
|
||||
for (int i = 0; i < 5; i++) {
|
||||
// Check a few times to make sure we don't prematurely move to violation
|
||||
assertEquals("Should not see any quota violations after writing 2MB of data", 0,
|
||||
violatedQuotas.size());
|
||||
try {
|
||||
Thread.sleep(DEFAULT_WAIT_MILLIS);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.debug("Interrupted while sleeping." , e);
|
||||
}
|
||||
violatedQuotas = violationNotifier.snapshotTablesInViolation();
|
||||
}
|
||||
|
||||
writeData(tn2, 2L * ONE_MEGABYTE);
|
||||
admin.flush(tn2);
|
||||
violatedQuotas = violationNotifier.snapshotTablesInViolation();
|
||||
while (violatedQuotas.size() < 2) {
|
||||
LOG.debug("Saw fewer violations than desired (expected 2): " + violatedQuotas
|
||||
+ ". Current reports: " + master.getMasterQuotaManager().snapshotRegionSizes());
|
||||
try {
|
||||
Thread.sleep(DEFAULT_WAIT_MILLIS);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.debug("Interrupted while sleeping.", e);
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
violatedQuotas = violationNotifier.snapshotTablesInViolation();
|
||||
}
|
||||
|
||||
SpaceViolationPolicy actualPolicyTN1 = violatedQuotas.get(tn1);
|
||||
assertNotNull("Expected to see violation policy for tn1", actualPolicyTN1);
|
||||
assertEquals(namespaceViolationPolicy, actualPolicyTN1);
|
||||
SpaceViolationPolicy actualPolicyTN2 = violatedQuotas.get(tn2);
|
||||
assertNotNull("Expected to see violation policy for tn2", actualPolicyTN2);
|
||||
assertEquals(namespaceViolationPolicy, actualPolicyTN2);
|
||||
|
||||
// Override the namespace quota with a table quota
|
||||
final long tableSizeLimit = ONE_MEGABYTE;
|
||||
final SpaceViolationPolicy tableViolationPolicy = SpaceViolationPolicy.NO_INSERTS;
|
||||
QuotaSettings tableSettings = QuotaSettingsFactory.limitTableSpace(tn1, tableSizeLimit,
|
||||
tableViolationPolicy);
|
||||
admin.setQuota(tableSettings);
|
||||
|
||||
// Keep checking for the table quota policy to override the namespace quota
|
||||
while (true) {
|
||||
violatedQuotas = violationNotifier.snapshotTablesInViolation();
|
||||
SpaceViolationPolicy actualTableViolationPolicy = violatedQuotas.get(tn1);
|
||||
assertNotNull("Violation policy should never be null", actualTableViolationPolicy);
|
||||
if (tableViolationPolicy != actualTableViolationPolicy) {
|
||||
LOG.debug("Saw unexpected table violation policy, waiting and re-checking.");
|
||||
try {
|
||||
Thread.sleep(DEFAULT_WAIT_MILLIS);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.debug("Interrupted while sleeping");
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
continue;
|
||||
}
|
||||
assertEquals(tableViolationPolicy, actualTableViolationPolicy);
|
||||
break;
|
||||
}
|
||||
|
||||
// This should not change with the introduction of the table quota for tn1
|
||||
actualPolicyTN2 = violatedQuotas.get(tn2);
|
||||
assertNotNull("Expected to see violation policy for tn2", actualPolicyTN2);
|
||||
assertEquals(namespaceViolationPolicy, actualPolicyTN2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetAllTablesWithQuotas() throws Exception {
|
||||
final Multimap<TableName, QuotaSettings> quotas = createTablesWithSpaceQuotas();
|
||||
Set<TableName> tablesWithQuotas = new HashSet<>();
|
||||
Set<TableName> namespaceTablesWithQuotas = new HashSet<>();
|
||||
// Partition the tables with quotas by table and ns quota
|
||||
partitionTablesByQuotaTarget(quotas, tablesWithQuotas, namespaceTablesWithQuotas);
|
||||
|
||||
TablesWithQuotas tables = chore.fetchAllTablesWithQuotasDefined();
|
||||
assertEquals("Found tables: " + tables, tablesWithQuotas, tables.getTableQuotaTables());
|
||||
assertEquals("Found tables: " + tables, namespaceTablesWithQuotas, tables.getNamespaceQuotaTables());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRpcQuotaTablesAreFiltered() throws Exception {
|
||||
final Multimap<TableName, QuotaSettings> quotas = createTablesWithSpaceQuotas();
|
||||
Set<TableName> tablesWithQuotas = new HashSet<>();
|
||||
Set<TableName> namespaceTablesWithQuotas = new HashSet<>();
|
||||
// Partition the tables with quotas by table and ns quota
|
||||
partitionTablesByQuotaTarget(quotas, tablesWithQuotas, namespaceTablesWithQuotas);
|
||||
|
||||
TableName rpcQuotaTable = createTable();
|
||||
TEST_UTIL.getAdmin().setQuota(QuotaSettingsFactory
|
||||
.throttleTable(rpcQuotaTable, ThrottleType.READ_NUMBER, 6, TimeUnit.MINUTES));
|
||||
|
||||
// The `rpcQuotaTable` should not be included in this Set
|
||||
TablesWithQuotas tables = chore.fetchAllTablesWithQuotasDefined();
|
||||
assertEquals("Found tables: " + tables, tablesWithQuotas, tables.getTableQuotaTables());
|
||||
assertEquals("Found tables: " + tables, namespaceTablesWithQuotas, tables.getNamespaceQuotaTables());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilterRegions() throws Exception {
|
||||
Map<TableName,Integer> mockReportedRegions = new HashMap<>();
|
||||
// Can't mock because of primitive int as a return type -- Mockito
|
||||
// can only handle an Integer.
|
||||
TablesWithQuotas tables = new TablesWithQuotas(TEST_UTIL.getConnection(),
|
||||
TEST_UTIL.getConfiguration()) {
|
||||
@Override
|
||||
int getNumReportedRegions(TableName table, QuotaViolationStore<TableName> tableStore) {
|
||||
Integer i = mockReportedRegions.get(table);
|
||||
if (null == i) {
|
||||
return 0;
|
||||
}
|
||||
return i;
|
||||
}
|
||||
};
|
||||
|
||||
// Create the tables
|
||||
TableName tn1 = createTableWithRegions(20);
|
||||
TableName tn2 = createTableWithRegions(20);
|
||||
TableName tn3 = createTableWithRegions(20);
|
||||
|
||||
// Add them to the Tables with Quotas object
|
||||
tables.addTableQuotaTable(tn1);
|
||||
tables.addTableQuotaTable(tn2);
|
||||
tables.addTableQuotaTable(tn3);
|
||||
|
||||
// Mock the number of regions reported
|
||||
mockReportedRegions.put(tn1, 10); // 50%
|
||||
mockReportedRegions.put(tn2, 19); // 95%
|
||||
mockReportedRegions.put(tn3, 20); // 100%
|
||||
|
||||
// Argument is un-used
|
||||
tables.filterInsufficientlyReportedTables(null);
|
||||
// The default of 95% reported should prevent tn1 from appearing
|
||||
assertEquals(new HashSet<>(Arrays.asList(tn2, tn3)), tables.getTableQuotaTables());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFetchSpaceQuota() throws Exception {
|
||||
Multimap<TableName,QuotaSettings> tables = createTablesWithSpaceQuotas();
|
||||
// Can pass in an empty map, we're not consulting it.
|
||||
chore.initializeViolationStores(Collections.emptyMap());
|
||||
// All tables that were created should have a quota defined.
|
||||
for (Entry<TableName,QuotaSettings> entry : tables.entries()) {
|
||||
final TableName table = entry.getKey();
|
||||
final QuotaSettings qs = entry.getValue();
|
||||
|
||||
assertTrue("QuotaSettings was an instance of " + qs.getClass(),
|
||||
qs instanceof SpaceLimitSettings);
|
||||
|
||||
SpaceQuota spaceQuota = null;
|
||||
if (null != qs.getTableName()) {
|
||||
spaceQuota = chore.getTableViolationStore().getSpaceQuota(table);
|
||||
assertNotNull("Could not find table space quota for " + table, spaceQuota);
|
||||
} else if (null != qs.getNamespace()) {
|
||||
spaceQuota = chore.getNamespaceViolationStore().getSpaceQuota(table.getNamespaceAsString());
|
||||
assertNotNull("Could not find namespace space quota for " + table.getNamespaceAsString(), spaceQuota);
|
||||
} else {
|
||||
fail("Expected table or namespace space quota");
|
||||
}
|
||||
|
||||
final SpaceLimitSettings sls = (SpaceLimitSettings) qs;
|
||||
assertEquals(sls.getProto().getQuota(), spaceQuota);
|
||||
}
|
||||
|
||||
TableName tableWithoutQuota = createTable();
|
||||
assertNull(chore.getTableViolationStore().getSpaceQuota(tableWithoutQuota));
|
||||
}
|
||||
|
||||
//
|
||||
// Helpers
|
||||
//
|
||||
|
||||
void writeData(TableName tn, long sizeInBytes) throws IOException {
|
||||
final Connection conn = TEST_UTIL.getConnection();
|
||||
final Table table = conn.getTable(tn);
|
||||
try {
|
||||
List<Put> updates = new ArrayList<>();
|
||||
long bytesToWrite = sizeInBytes;
|
||||
long rowKeyId = 0L;
|
||||
final StringBuilder sb = new StringBuilder();
|
||||
final Random r = new Random();
|
||||
while (bytesToWrite > 0L) {
|
||||
sb.setLength(0);
|
||||
sb.append(Long.toString(rowKeyId));
|
||||
// Use the reverse counter as the rowKey to get even spread across all regions
|
||||
Put p = new Put(Bytes.toBytes(sb.reverse().toString()));
|
||||
byte[] value = new byte[SIZE_PER_VALUE];
|
||||
r.nextBytes(value);
|
||||
p.addColumn(Bytes.toBytes(F1), Bytes.toBytes("q1"), value);
|
||||
updates.add(p);
|
||||
|
||||
// Batch 50K worth of updates
|
||||
if (updates.size() > 50) {
|
||||
table.put(updates);
|
||||
updates.clear();
|
||||
}
|
||||
|
||||
// Just count the value size, ignore the size of rowkey + column
|
||||
bytesToWrite -= SIZE_PER_VALUE;
|
||||
rowKeyId++;
|
||||
}
|
||||
|
||||
// Write the final batch
|
||||
if (!updates.isEmpty()) {
|
||||
table.put(updates);
|
||||
}
|
||||
|
||||
LOG.debug("Data was written to HBase");
|
||||
// Push the data to disk.
|
||||
TEST_UTIL.getAdmin().flush(tn);
|
||||
LOG.debug("Data flushed to disk");
|
||||
} finally {
|
||||
table.close();
|
||||
}
|
||||
}
|
||||
|
||||
Multimap<TableName, QuotaSettings> createTablesWithSpaceQuotas() throws Exception {
|
||||
final Admin admin = TEST_UTIL.getAdmin();
|
||||
final Multimap<TableName, QuotaSettings> tablesWithQuotas = HashMultimap.create();
|
||||
|
||||
final TableName tn1 = createTable();
|
||||
final TableName tn2 = createTable();
|
||||
|
||||
NamespaceDescriptor nd = NamespaceDescriptor.create("ns" + COUNTER.getAndIncrement()).build();
|
||||
admin.createNamespace(nd);
|
||||
final TableName tn3 = createTableInNamespace(nd);
|
||||
final TableName tn4 = createTableInNamespace(nd);
|
||||
final TableName tn5 = createTableInNamespace(nd);
|
||||
|
||||
final long sizeLimit1 = 1024L * 1024L * 1024L * 1024L * 5L; // 5TB
|
||||
final SpaceViolationPolicy violationPolicy1 = SpaceViolationPolicy.NO_WRITES;
|
||||
QuotaSettings qs1 = QuotaSettingsFactory.limitTableSpace(tn1, sizeLimit1, violationPolicy1);
|
||||
tablesWithQuotas.put(tn1, qs1);
|
||||
admin.setQuota(qs1);
|
||||
|
||||
final long sizeLimit2 = 1024L * 1024L * 1024L * 200L; // 200GB
|
||||
final SpaceViolationPolicy violationPolicy2 = SpaceViolationPolicy.NO_WRITES_COMPACTIONS;
|
||||
QuotaSettings qs2 = QuotaSettingsFactory.limitTableSpace(tn2, sizeLimit2, violationPolicy2);
|
||||
tablesWithQuotas.put(tn2, qs2);
|
||||
admin.setQuota(qs2);
|
||||
|
||||
final long sizeLimit3 = 1024L * 1024L * 1024L * 1024L * 100L; // 100TB
|
||||
final SpaceViolationPolicy violationPolicy3 = SpaceViolationPolicy.NO_INSERTS;
|
||||
QuotaSettings qs3 = QuotaSettingsFactory.limitNamespaceSpace(nd.getName(), sizeLimit3, violationPolicy3);
|
||||
tablesWithQuotas.put(tn3, qs3);
|
||||
tablesWithQuotas.put(tn4, qs3);
|
||||
tablesWithQuotas.put(tn5, qs3);
|
||||
admin.setQuota(qs3);
|
||||
|
||||
final long sizeLimit4 = 1024L * 1024L * 1024L * 5L; // 5GB
|
||||
final SpaceViolationPolicy violationPolicy4 = SpaceViolationPolicy.NO_INSERTS;
|
||||
QuotaSettings qs4 = QuotaSettingsFactory.limitTableSpace(tn5, sizeLimit4, violationPolicy4);
|
||||
// Override the ns quota for tn5, import edge-case to catch table quota taking
|
||||
// precedence over ns quota.
|
||||
tablesWithQuotas.put(tn5, qs4);
|
||||
admin.setQuota(qs4);
|
||||
|
||||
return tablesWithQuotas;
|
||||
}
|
||||
|
||||
TableName createTable() throws Exception {
|
||||
return createTableWithRegions(1);
|
||||
}
|
||||
|
||||
TableName createTableWithRegions(int numRegions) throws Exception {
|
||||
return createTableWithRegions(NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR, numRegions);
|
||||
}
|
||||
|
||||
TableName createTableWithRegions(String namespace, int numRegions) throws Exception {
|
||||
final Admin admin = TEST_UTIL.getAdmin();
|
||||
final TableName tn = TableName.valueOf(namespace, testName.getMethodName() + COUNTER.getAndIncrement());
|
||||
|
||||
// Delete the old table
|
||||
if (admin.tableExists(tn)) {
|
||||
admin.disableTable(tn);
|
||||
admin.deleteTable(tn);
|
||||
}
|
||||
|
||||
// Create the table
|
||||
HTableDescriptor tableDesc = new HTableDescriptor(tn);
|
||||
tableDesc.addFamily(new HColumnDescriptor(F1));
|
||||
if (numRegions == 1) {
|
||||
admin.createTable(tableDesc);
|
||||
} else {
|
||||
admin.createTable(tableDesc, Bytes.toBytes("0"), Bytes.toBytes("9"), numRegions);
|
||||
}
|
||||
return tn;
|
||||
}
|
||||
|
||||
TableName createTableInNamespace(NamespaceDescriptor nd) throws Exception {
|
||||
final Admin admin = TEST_UTIL.getAdmin();
|
||||
final TableName tn = TableName.valueOf(nd.getName(),
|
||||
testName.getMethodName() + COUNTER.getAndIncrement());
|
||||
|
||||
// Delete the old table
|
||||
if (admin.tableExists(tn)) {
|
||||
admin.disableTable(tn);
|
||||
admin.deleteTable(tn);
|
||||
}
|
||||
|
||||
// Create the table
|
||||
HTableDescriptor tableDesc = new HTableDescriptor(tn);
|
||||
tableDesc.addFamily(new HColumnDescriptor(F1));
|
||||
|
||||
admin.createTable(tableDesc);
|
||||
return tn;
|
||||
}
|
||||
|
||||
void partitionTablesByQuotaTarget(Multimap<TableName,QuotaSettings> quotas,
|
||||
Set<TableName> tablesWithTableQuota, Set<TableName> tablesWithNamespaceQuota) {
|
||||
// Partition the tables with quotas by table and ns quota
|
||||
for (Entry<TableName, QuotaSettings> entry : quotas.entries()) {
|
||||
SpaceLimitSettings settings = (SpaceLimitSettings) entry.getValue();
|
||||
TableName tn = entry.getKey();
|
||||
if (null != settings.getTableName()) {
|
||||
tablesWithTableQuota.add(tn);
|
||||
}
|
||||
if (null != settings.getNamespace()) {
|
||||
tablesWithNamespaceQuota.add(tn);
|
||||
}
|
||||
|
||||
if (null == settings.getTableName() && null == settings.getNamespace()) {
|
||||
fail("Unexpected table name with null tableName and namespace: " + tn);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -23,14 +23,11 @@ import static org.junit.Assert.assertEquals;
|
|||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle;
|
||||
|
@ -50,7 +47,6 @@ import org.junit.rules.TestName;
|
|||
*/
|
||||
@Category({MasterTests.class, MediumTests.class})
|
||||
public class TestQuotaTableUtil {
|
||||
private static final Log LOG = LogFactory.getLog(TestQuotaTableUtil.class);
|
||||
|
||||
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private Connection connection;
|
||||
|
|
|
@ -0,0 +1,151 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to you under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.quotas;
|
||||
|
||||
import static com.google.common.collect.Iterables.size;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.quotas.QuotaViolationStore.ViolationState;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
/**
|
||||
* Test class for {@link TableQuotaViolationStore}.
|
||||
*/
|
||||
@Category(SmallTests.class)
|
||||
public class TestTableQuotaViolationStore {
|
||||
private static final long ONE_MEGABYTE = 1024L * 1024L;
|
||||
|
||||
private Connection conn;
|
||||
private QuotaObserverChore chore;
|
||||
private Map<HRegionInfo, Long> regionReports;
|
||||
private TableQuotaViolationStore store;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
conn = mock(Connection.class);
|
||||
chore = mock(QuotaObserverChore.class);
|
||||
regionReports = new HashMap<>();
|
||||
store = new TableQuotaViolationStore(conn, chore, regionReports);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilterRegionsByTable() throws Exception {
|
||||
TableName tn1 = TableName.valueOf("foo");
|
||||
TableName tn2 = TableName.valueOf("bar");
|
||||
TableName tn3 = TableName.valueOf("ns", "foo");
|
||||
|
||||
assertEquals(0, size(store.filterBySubject(tn1)));
|
||||
|
||||
for (int i = 0; i < 5; i++) {
|
||||
regionReports.put(new HRegionInfo(tn1, Bytes.toBytes(i), Bytes.toBytes(i+1)), 0L);
|
||||
}
|
||||
for (int i = 0; i < 3; i++) {
|
||||
regionReports.put(new HRegionInfo(tn2, Bytes.toBytes(i), Bytes.toBytes(i+1)), 0L);
|
||||
}
|
||||
for (int i = 0; i < 10; i++) {
|
||||
regionReports.put(new HRegionInfo(tn3, Bytes.toBytes(i), Bytes.toBytes(i+1)), 0L);
|
||||
}
|
||||
assertEquals(18, regionReports.size());
|
||||
assertEquals(5, size(store.filterBySubject(tn1)));
|
||||
assertEquals(3, size(store.filterBySubject(tn2)));
|
||||
assertEquals(10, size(store.filterBySubject(tn3)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTargetViolationState() {
|
||||
TableName tn1 = TableName.valueOf("violation1");
|
||||
TableName tn2 = TableName.valueOf("observance1");
|
||||
TableName tn3 = TableName.valueOf("observance2");
|
||||
SpaceQuota quota = SpaceQuota.newBuilder()
|
||||
.setSoftLimit(1024L * 1024L)
|
||||
.setViolationPolicy(ProtobufUtil.toProtoViolationPolicy(SpaceViolationPolicy.DISABLE))
|
||||
.build();
|
||||
|
||||
// Create some junk data to filter. Makes sure it's so large that it would
|
||||
// immediately violate the quota.
|
||||
for (int i = 0; i < 3; i++) {
|
||||
regionReports.put(new HRegionInfo(tn2, Bytes.toBytes(i), Bytes.toBytes(i + 1)),
|
||||
5L * ONE_MEGABYTE);
|
||||
regionReports.put(new HRegionInfo(tn3, Bytes.toBytes(i), Bytes.toBytes(i + 1)),
|
||||
5L * ONE_MEGABYTE);
|
||||
}
|
||||
|
||||
regionReports.put(new HRegionInfo(tn1, Bytes.toBytes(0), Bytes.toBytes(1)), 1024L * 512L);
|
||||
regionReports.put(new HRegionInfo(tn1, Bytes.toBytes(1), Bytes.toBytes(2)), 1024L * 256L);
|
||||
|
||||
// Below the quota
|
||||
assertEquals(ViolationState.IN_OBSERVANCE, store.getTargetState(tn1, quota));
|
||||
|
||||
regionReports.put(new HRegionInfo(tn1, Bytes.toBytes(2), Bytes.toBytes(3)), 1024L * 256L);
|
||||
|
||||
// Equal to the quota is still in observance
|
||||
assertEquals(ViolationState.IN_OBSERVANCE, store.getTargetState(tn1, quota));
|
||||
|
||||
regionReports.put(new HRegionInfo(tn1, Bytes.toBytes(3), Bytes.toBytes(4)), 1024L);
|
||||
|
||||
// Exceeds the quota, should be in violation
|
||||
assertEquals(ViolationState.IN_VIOLATION, store.getTargetState(tn1, quota));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetSpaceQuota() throws Exception {
|
||||
TableQuotaViolationStore mockStore = mock(TableQuotaViolationStore.class);
|
||||
when(mockStore.getSpaceQuota(any(TableName.class))).thenCallRealMethod();
|
||||
|
||||
Quotas quotaWithSpace = Quotas.newBuilder().setSpace(
|
||||
SpaceQuota.newBuilder()
|
||||
.setSoftLimit(1024L)
|
||||
.setViolationPolicy(QuotaProtos.SpaceViolationPolicy.DISABLE)
|
||||
.build())
|
||||
.build();
|
||||
Quotas quotaWithoutSpace = Quotas.newBuilder().build();
|
||||
|
||||
AtomicReference<Quotas> quotaRef = new AtomicReference<>();
|
||||
when(mockStore.getQuotaForTable(any(TableName.class))).then(new Answer<Quotas>() {
|
||||
@Override
|
||||
public Quotas answer(InvocationOnMock invocation) throws Throwable {
|
||||
return quotaRef.get();
|
||||
}
|
||||
});
|
||||
|
||||
quotaRef.set(quotaWithSpace);
|
||||
assertEquals(quotaWithSpace.getSpace(), mockStore.getSpaceQuota(TableName.valueOf("foo")));
|
||||
quotaRef.set(quotaWithoutSpace);
|
||||
assertNull(mockStore.getSpaceQuota(TableName.valueOf("foo")));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,198 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to you under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.quotas;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.quotas.QuotaObserverChore.TablesWithQuotas;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import com.google.common.collect.Multimap;
|
||||
|
||||
/**
|
||||
* Non-HBase cluster unit tests for {@link TablesWithQuotas}.
|
||||
*/
|
||||
@Category(SmallTests.class)
|
||||
public class TestTablesWithQuotas {
|
||||
private Connection conn;
|
||||
private Configuration conf;
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
conn = mock(Connection.class);
|
||||
conf = HBaseConfiguration.create();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testImmutableGetters() {
|
||||
Set<TableName> tablesWithTableQuotas = new HashSet<>();
|
||||
Set<TableName> tablesWithNamespaceQuotas = new HashSet<>();
|
||||
final TablesWithQuotas tables = new TablesWithQuotas(conn, conf);
|
||||
for (int i = 0; i < 5; i++) {
|
||||
TableName tn = TableName.valueOf("tn" + i);
|
||||
tablesWithTableQuotas.add(tn);
|
||||
tables.addTableQuotaTable(tn);
|
||||
}
|
||||
for (int i = 0; i < 3; i++) {
|
||||
TableName tn = TableName.valueOf("tn_ns" + i);
|
||||
tablesWithNamespaceQuotas.add(tn);
|
||||
tables.addNamespaceQuotaTable(tn);
|
||||
}
|
||||
Set<TableName> actualTableQuotaTables = tables.getTableQuotaTables();
|
||||
Set<TableName> actualNamespaceQuotaTables = tables.getNamespaceQuotaTables();
|
||||
assertEquals(tablesWithTableQuotas, actualTableQuotaTables);
|
||||
assertEquals(tablesWithNamespaceQuotas, actualNamespaceQuotaTables);
|
||||
try {
|
||||
actualTableQuotaTables.add(null);
|
||||
fail("Should not be able to add an element");
|
||||
} catch (UnsupportedOperationException e) {
|
||||
// pass
|
||||
}
|
||||
try {
|
||||
actualNamespaceQuotaTables.add(null);
|
||||
fail("Should not be able to add an element");
|
||||
} catch (UnsupportedOperationException e) {
|
||||
// pass
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInsufficientlyReportedTableFiltering() throws Exception {
|
||||
final Map<TableName,Integer> reportedRegions = new HashMap<>();
|
||||
final Map<TableName,Integer> actualRegions = new HashMap<>();
|
||||
final Configuration conf = HBaseConfiguration.create();
|
||||
conf.setDouble(QuotaObserverChore.VIOLATION_OBSERVER_CHORE_REPORT_PERCENT_KEY, 0.95);
|
||||
|
||||
TableName tooFewRegionsTable = TableName.valueOf("tn1");
|
||||
TableName sufficientRegionsTable = TableName.valueOf("tn2");
|
||||
TableName tooFewRegionsNamespaceTable = TableName.valueOf("ns1", "tn2");
|
||||
TableName sufficientRegionsNamespaceTable = TableName.valueOf("ns1", "tn2");
|
||||
final TablesWithQuotas tables = new TablesWithQuotas(conn, conf) {
|
||||
@Override
|
||||
Configuration getConfiguration() {
|
||||
return conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
int getNumRegions(TableName tableName) {
|
||||
return actualRegions.get(tableName);
|
||||
}
|
||||
|
||||
@Override
|
||||
int getNumReportedRegions(TableName table, QuotaViolationStore<TableName> tableStore) {
|
||||
return reportedRegions.get(table);
|
||||
}
|
||||
};
|
||||
tables.addTableQuotaTable(tooFewRegionsTable);
|
||||
tables.addTableQuotaTable(sufficientRegionsTable);
|
||||
tables.addNamespaceQuotaTable(tooFewRegionsNamespaceTable);
|
||||
tables.addNamespaceQuotaTable(sufficientRegionsNamespaceTable);
|
||||
|
||||
reportedRegions.put(tooFewRegionsTable, 5);
|
||||
actualRegions.put(tooFewRegionsTable, 10);
|
||||
reportedRegions.put(sufficientRegionsTable, 19);
|
||||
actualRegions.put(sufficientRegionsTable, 20);
|
||||
reportedRegions.put(tooFewRegionsNamespaceTable, 9);
|
||||
actualRegions.put(tooFewRegionsNamespaceTable, 10);
|
||||
reportedRegions.put(sufficientRegionsNamespaceTable, 98);
|
||||
actualRegions.put(sufficientRegionsNamespaceTable, 100);
|
||||
|
||||
// Unused argument
|
||||
tables.filterInsufficientlyReportedTables(null);
|
||||
Set<TableName> filteredTablesWithTableQuotas = tables.getTableQuotaTables();
|
||||
assertEquals(Collections.singleton(sufficientRegionsTable), filteredTablesWithTableQuotas);
|
||||
Set<TableName> filteredTablesWithNamespaceQutoas = tables.getNamespaceQuotaTables();
|
||||
assertEquals(Collections.singleton(sufficientRegionsNamespaceTable), filteredTablesWithNamespaceQutoas);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetTablesByNamespace() {
|
||||
final TablesWithQuotas tables = new TablesWithQuotas(conn, conf);
|
||||
tables.addTableQuotaTable(TableName.valueOf("ignored1"));
|
||||
tables.addTableQuotaTable(TableName.valueOf("ignored2"));
|
||||
tables.addNamespaceQuotaTable(TableName.valueOf("ns1", "t1"));
|
||||
tables.addNamespaceQuotaTable(TableName.valueOf("ns1", "t2"));
|
||||
tables.addNamespaceQuotaTable(TableName.valueOf("ns1", "t3"));
|
||||
tables.addNamespaceQuotaTable(TableName.valueOf("ns2", "t1"));
|
||||
tables.addNamespaceQuotaTable(TableName.valueOf("ns2", "t2"));
|
||||
|
||||
Multimap<String,TableName> tablesByNamespace = tables.getTablesByNamespace();
|
||||
Collection<TableName> tablesInNs = tablesByNamespace.get("ns1");
|
||||
assertEquals(3, tablesInNs.size());
|
||||
assertTrue("Unexpected results for ns1: " + tablesInNs,
|
||||
tablesInNs.containsAll(Arrays.asList(
|
||||
TableName.valueOf("ns1", "t1"),
|
||||
TableName.valueOf("ns1", "t2"),
|
||||
TableName.valueOf("ns1", "t3"))));
|
||||
tablesInNs = tablesByNamespace.get("ns2");
|
||||
assertEquals(2, tablesInNs.size());
|
||||
assertTrue("Unexpected results for ns2: " + tablesInNs,
|
||||
tablesInNs.containsAll(Arrays.asList(
|
||||
TableName.valueOf("ns2", "t1"),
|
||||
TableName.valueOf("ns2", "t2"))));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilteringMissingTables() throws Exception {
|
||||
final TableName missingTable = TableName.valueOf("doesNotExist");
|
||||
// Set up Admin to return null (match the implementation)
|
||||
Admin admin = mock(Admin.class);
|
||||
when(conn.getAdmin()).thenReturn(admin);
|
||||
when(admin.getTableRegions(missingTable)).thenReturn(null);
|
||||
|
||||
QuotaObserverChore chore = mock(QuotaObserverChore.class);
|
||||
Map<HRegionInfo,Long> regionUsage = new HashMap<>();
|
||||
TableQuotaViolationStore store = new TableQuotaViolationStore(conn, chore, regionUsage);
|
||||
|
||||
// A super dirty hack to verify that, after getting no regions for our table,
|
||||
// we bail out and start processing the next element (which there is none).
|
||||
final TablesWithQuotas tables = new TablesWithQuotas(conn, conf) {
|
||||
@Override
|
||||
int getNumReportedRegions(TableName table, QuotaViolationStore<TableName> tableStore) {
|
||||
throw new RuntimeException("Should should not reach here");
|
||||
}
|
||||
};
|
||||
tables.addTableQuotaTable(missingTable);
|
||||
|
||||
tables.filterInsufficientlyReportedTables(store);
|
||||
|
||||
final Set<TableName> tablesWithQuotas = tables.getTableQuotaTables();
|
||||
assertTrue(
|
||||
"Expected to find no tables, but found " + tablesWithQuotas, tablesWithQuotas.isEmpty());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue