HBASE-16999 Implement master and regionserver synchronization of quota state

* Implement the RegionServer reading violation from the quota table
* Implement the Master reporting violations to the quota table
* RegionServers need to track its enforced policies
This commit is contained in:
Josh Elser 2016-11-18 15:38:19 -05:00
parent 533470f8c8
commit 98b4181f43
21 changed files with 1085 additions and 30 deletions

View File

@ -24,16 +24,20 @@ import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
@ -44,7 +48,12 @@ import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Strings;
@ -53,9 +62,8 @@ import org.apache.hadoop.hbase.util.Strings;
* <pre>
* ROW-KEY FAM/QUAL DATA
* n.&lt;namespace&gt; q:s &lt;global-quotas&gt;
* n.&lt;namespace&gt; u:du &lt;size in bytes&gt;
* t.&lt;table&gt; q:s &lt;global-quotas&gt;
* t.&lt;table&gt; u:du &lt;size in bytes&gt;
* t.&lt;table&gt; u:v &lt;space violation policy&gt;
* u.&lt;user&gt; q:s &lt;global-quotas&gt;
* u.&lt;user&gt; q:s.&lt;table&gt; &lt;table-quotas&gt;
* u.&lt;user&gt; q:s.&lt;ns&gt;: &lt;namespace-quotas&gt;
@ -74,7 +82,7 @@ public class QuotaTableUtil {
protected static final byte[] QUOTA_FAMILY_USAGE = Bytes.toBytes("u");
protected static final byte[] QUOTA_QUALIFIER_SETTINGS = Bytes.toBytes("s");
protected static final byte[] QUOTA_QUALIFIER_SETTINGS_PREFIX = Bytes.toBytes("s.");
protected static final byte[] QUOTA_QUALIFIER_DISKUSAGE = Bytes.toBytes("du");
protected static final byte[] QUOTA_QUALIFIER_VIOLATION = Bytes.toBytes("v");
protected static final byte[] QUOTA_USER_ROW_KEY_PREFIX = Bytes.toBytes("u.");
protected static final byte[] QUOTA_TABLE_ROW_KEY_PREFIX = Bytes.toBytes("t.");
protected static final byte[] QUOTA_NAMESPACE_ROW_KEY_PREFIX = Bytes.toBytes("n.");
@ -203,6 +211,51 @@ public class QuotaTableUtil {
return filterList;
}
/**
* Creates a {@link Scan} which returns only quota violations from the quota table.
*/
public static Scan makeQuotaViolationScan() {
Scan s = new Scan();
// Limit to "u:v" column
s.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_VIOLATION);
// Limit rowspace to the "t:" prefix
s.setRowPrefixFilter(QUOTA_TABLE_ROW_KEY_PREFIX);
return s;
}
/**
* Extracts the {@link SpaceViolationPolicy} and {@link TableName} from the provided
* {@link Result} and adds them to the given {@link Map}. If the result does not contain
* the expected information or the serialized policy in the value is invalid, this method
* will throw an {@link IllegalArgumentException}.
*
* @param result A row from the quota table.
* @param policies A map of policies to add the result of this method into.
*/
public static void extractViolationPolicy(
Result result, Map<TableName,SpaceViolationPolicy> policies) {
byte[] row = Objects.requireNonNull(result).getRow();
if (null == row) {
throw new IllegalArgumentException("Provided result had a null row");
}
final TableName targetTableName = getTableFromRowKey(row);
Cell c = result.getColumnLatestCell(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_VIOLATION);
if (null == c) {
throw new IllegalArgumentException("Result did not contain the expected column "
+ Bytes.toString(QUOTA_FAMILY_USAGE) + ":" + Bytes.toString(QUOTA_QUALIFIER_VIOLATION)
+ ", " + result.toString());
}
ByteString buffer = UnsafeByteOperations.unsafeWrap(
c.getValueArray(), c.getValueOffset(), c.getValueLength());
try {
SpaceQuota quota = SpaceQuota.parseFrom(buffer);
policies.put(targetTableName, getViolationPolicy(quota));
} catch (InvalidProtocolBufferException e) {
throw new IllegalArgumentException(
"Result did not contain a valid SpaceQuota protocol buffer message", e);
}
}
public static interface UserQuotasVisitor {
void visitUserQuotas(final String userName, final Quotas quotas)
throws IOException;
@ -329,6 +382,26 @@ public class QuotaTableUtil {
}
}
/**
* Creates a {@link Put} to enable the given <code>policy</code> on the <code>table</code>.
*/
public static Put createEnableViolationPolicyUpdate(
TableName tableName, SpaceViolationPolicy policy) {
Put p = new Put(getTableRowKey(tableName));
SpaceQuota quota = getProtoViolationPolicy(policy);
p.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_VIOLATION, quota.toByteArray());
return p;
}
/**
* Creates a {@link Delete} to remove a policy on the given <code>table</code>.
*/
public static Delete createRemoveViolationPolicyUpdate(TableName tableName) {
Delete d = new Delete(getTableRowKey(tableName));
d.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_VIOLATION);
return d;
}
/* =========================================================================
* Quotas protobuf helpers
*/
@ -450,4 +523,17 @@ public class QuotaTableUtil {
protected static String getUserFromRowKey(final byte[] key) {
return Bytes.toString(key, QUOTA_USER_ROW_KEY_PREFIX.length);
}
protected static SpaceQuota getProtoViolationPolicy(SpaceViolationPolicy policy) {
return SpaceQuota.newBuilder()
.setViolationPolicy(ProtobufUtil.toProtoViolationPolicy(policy))
.build();
}
protected static SpaceViolationPolicy getViolationPolicy(SpaceQuota proto) {
if (!proto.hasViolationPolicy()) {
throw new IllegalArgumentException("Protobuf SpaceQuota does not have violation policy.");
}
return ProtobufUtil.toViolationPolicy(proto.getViolationPolicy());
}
}

View File

@ -135,8 +135,9 @@ import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
import org.apache.hadoop.hbase.quotas.QuotaObserverChore;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.quotas.SpaceQuotaViolationNotifier;
import org.apache.hadoop.hbase.quotas.SpaceQuotaViolationNotifierForTest;
import org.apache.hadoop.hbase.quotas.SpaceQuotaViolationNotifierFactory;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
@ -152,10 +153,13 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
import org.apache.hadoop.hbase.replication.master.TableCFsUpdater;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceViolationPolicy;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.util.Addressing;
@ -904,7 +908,7 @@ public class HMaster extends HRegionServer implements MasterServices {
status.setStatus("Starting quota manager");
initQuotaManager();
this.spaceQuotaViolationNotifier = new SpaceQuotaViolationNotifierForTest();
this.spaceQuotaViolationNotifier = createQuotaViolationNotifier();
this.quotaObserverChore = new QuotaObserverChore(this);
// Start the chore to read the region FS space reports and act on them
getChoreService().scheduleChore(quotaObserverChore);
@ -995,6 +999,13 @@ public class HMaster extends HRegionServer implements MasterServices {
this.quotaManager = quotaManager;
}
SpaceQuotaViolationNotifier createQuotaViolationNotifier() {
SpaceQuotaViolationNotifier notifier =
SpaceQuotaViolationNotifierFactory.getInstance().create(getConfiguration());
notifier.initialize(getClusterConnection());
return notifier;
}
boolean isCatalogJanitorEnabled() {
return catalogJanitorChore != null ?
catalogJanitorChore.getEnabled() : false;
@ -2199,6 +2210,26 @@ public class HMaster extends HRegionServer implements MasterServices {
protected void run() throws IOException {
getMaster().getMasterCoprocessorHost().preEnableTable(tableName);
// Normally, it would make sense for this authorization check to exist inside
// AccessController, but because the authorization check is done based on internal state
// (rather than explicit permissions) we'll do the check here instead of in the
// coprocessor.
MasterQuotaManager quotaManager = getMasterQuotaManager();
if (null != quotaManager) {
if (quotaManager.isQuotaEnabled()) {
Quotas quotaForTable = QuotaUtil.getTableQuota(getConnection(), tableName);
if (null != quotaForTable && quotaForTable.hasSpace()) {
SpaceViolationPolicy policy = quotaForTable.getSpace().getViolationPolicy();
if (SpaceViolationPolicy.DISABLE == policy) {
throw new AccessDeniedException("Enabling the table '" + tableName
+ "' is disallowed due to a violated space quota.");
}
}
} else if (LOG.isTraceEnabled()) {
LOG.trace("Unable to check for space quotas as the MasterQuotaManager is not enabled");
}
}
LOG.info(getClientIdAuditPrefix() + " enable " + tableName);
// Execute the operation asynchronously - client will check the progress of the operation

View File

@ -352,14 +352,15 @@ public class QuotaObserverChore extends ScheduledChore {
/**
* Transitions the given table to violation of its quota, enabling the violation policy.
*/
private void transitionTableToViolation(TableName table, SpaceViolationPolicy violationPolicy) {
private void transitionTableToViolation(TableName table, SpaceViolationPolicy violationPolicy)
throws IOException {
this.violationNotifier.transitionTableToViolation(table, violationPolicy);
}
/**
* Transitions the given table to observance of its quota, disabling the violation policy.
*/
private void transitionTableToObservance(TableName table) {
private void transitionTableToObservance(TableName table) throws IOException {
this.violationNotifier.transitionTableToObservance(table);
}

View File

@ -46,14 +46,14 @@ import com.google.common.annotations.VisibleForTesting;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class RegionServerQuotaManager {
private static final Log LOG = LogFactory.getLog(RegionServerQuotaManager.class);
public class RegionServerRpcQuotaManager {
private static final Log LOG = LogFactory.getLog(RegionServerRpcQuotaManager.class);
private final RegionServerServices rsServices;
private QuotaCache quotaCache = null;
public RegionServerQuotaManager(final RegionServerServices rsServices) {
public RegionServerRpcQuotaManager(final RegionServerServices rsServices) {
this.rsServices = rsServices;
}
@ -63,7 +63,7 @@ public class RegionServerQuotaManager {
return;
}
LOG.info("Initializing quota support");
LOG.info("Initializing RPC quota support");
// Initialize quota cache
quotaCache = new QuotaCache(rsServices);

View File

@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.quotas;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.util.Bytes;
/**
* A manager for filesystem space quotas in the RegionServer.
*
* This class is responsible for reading quota violation policies from the quota
* table and then enacting them on the given table.
*/
@InterfaceAudience.Private
public class RegionServerSpaceQuotaManager {
private static final Log LOG = LogFactory.getLog(RegionServerSpaceQuotaManager.class);
private final RegionServerServices rsServices;
private SpaceQuotaViolationPolicyRefresherChore spaceQuotaRefresher;
private Map<TableName,SpaceViolationPolicy> enforcedPolicies;
private boolean started = false;
public RegionServerSpaceQuotaManager(RegionServerServices rsServices) {
this.rsServices = Objects.requireNonNull(rsServices);
}
public synchronized void start() throws IOException {
if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) {
LOG.info("Quota support disabled, not starting space quota manager.");
return;
}
spaceQuotaRefresher = new SpaceQuotaViolationPolicyRefresherChore(this);
enforcedPolicies = new HashMap<>();
started = true;
}
public synchronized void stop() {
if (null != spaceQuotaRefresher) {
spaceQuotaRefresher.cancel();
spaceQuotaRefresher = null;
}
started = false;
}
/**
* @return if the {@code Chore} has been started.
*/
public boolean isStarted() {
return started;
}
Connection getConnection() {
return rsServices.getConnection();
}
/**
* Returns the collection of tables which have quota violation policies enforced on
* this RegionServer.
*/
public synchronized Map<TableName,SpaceViolationPolicy> getActiveViolationPolicyEnforcements()
throws IOException {
return new HashMap<>(this.enforcedPolicies);
}
/**
* Wrapper around {@link QuotaTableUtil#extractViolationPolicy(Result, Map)} for testing.
*/
void extractViolationPolicy(Result result, Map<TableName,SpaceViolationPolicy> activePolicies) {
QuotaTableUtil.extractViolationPolicy(result, activePolicies);
}
/**
* Reads all quota violation policies which are to be enforced from the quota table.
*
* @return The collection of tables which are in violation of their quota and the policy which
* should be enforced.
*/
public Map<TableName, SpaceViolationPolicy> getViolationPoliciesToEnforce() throws IOException {
try (Table quotaTable = getConnection().getTable(QuotaUtil.QUOTA_TABLE_NAME);
ResultScanner scanner = quotaTable.getScanner(QuotaTableUtil.makeQuotaViolationScan())) {
Map<TableName,SpaceViolationPolicy> activePolicies = new HashMap<>();
for (Result result : scanner) {
try {
extractViolationPolicy(result, activePolicies);
} catch (IllegalArgumentException e) {
final String msg = "Failed to parse result for row " + Bytes.toString(result.getRow());
LOG.error(msg, e);
throw new IOException(msg, e);
}
}
return activePolicies;
}
}
/**
* Enforces the given violationPolicy on the given table in this RegionServer.
*/
synchronized void enforceViolationPolicy(
TableName tableName, SpaceViolationPolicy violationPolicy) {
if (LOG.isTraceEnabled()) {
LOG.trace(
"Enabling violation policy enforcement on " + tableName
+ " with policy " + violationPolicy);
}
// Enact the policy
enforceOnRegionServer(tableName, violationPolicy);
// Publicize our enacting of the policy
enforcedPolicies.put(tableName, violationPolicy);
}
/**
* Enacts the given violation policy on this table in the RegionServer.
*/
void enforceOnRegionServer(TableName tableName, SpaceViolationPolicy violationPolicy) {
throw new UnsupportedOperationException("TODO");
}
/**
* Disables enforcement on any violation policy on the given <code>tableName</code>.
*/
synchronized void disableViolationPolicyEnforcement(TableName tableName) {
if (LOG.isTraceEnabled()) {
LOG.trace("Disabling violation policy enforcement on " + tableName);
}
disableOnRegionServer(tableName);
enforcedPolicies.remove(tableName);
}
/**
* Disables any violation policy on this table in the RegionServer.
*/
void disableOnRegionServer(TableName tableName) {
throw new UnsupportedOperationException("TODO");
}
RegionServerServices getRegionServerServices() {
return rsServices;
}
}

View File

@ -16,16 +16,25 @@
*/
package org.apache.hadoop.hbase.quotas;
import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
/**
* An interface which abstract away the action taken to enable or disable
* a space quota violation policy across the HBase cluster.
* a space quota violation policy across the HBase cluster. Implementations
* must have a no-args constructor.
*/
@InterfaceAudience.Private
public interface SpaceQuotaViolationNotifier {
/**
* Initializes the notifier.
*/
void initialize(Connection conn);
/**
* Instructs the cluster that the given table is in violation of a space quota. The
* provided violation policy is the action which should be taken on the table.
@ -33,12 +42,13 @@ public interface SpaceQuotaViolationNotifier {
* @param tableName The name of the table in violation of the quota.
* @param violationPolicy The policy which should be enacted on the table.
*/
void transitionTableToViolation(TableName tableName, SpaceViolationPolicy violationPolicy);
void transitionTableToViolation(
TableName tableName, SpaceViolationPolicy violationPolicy) throws IOException;
/**
* Instructs the cluster that the given table is in observance of any applicable space quota.
*
* @param tableName The name of the table in observance.
*/
void transitionTableToObservance(TableName tableName);
void transitionTableToObservance(TableName tableName) throws IOException;
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.quotas;
import java.util.Objects;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* Factory for creating {@link SpaceQuotaViolationNotifier} implementations. Implementations
* must have a no-args constructor.
*/
@InterfaceAudience.Private
public class SpaceQuotaViolationNotifierFactory {
private static final SpaceQuotaViolationNotifierFactory INSTANCE =
new SpaceQuotaViolationNotifierFactory();
public static final String VIOLATION_NOTIFIER_KEY = "hbase.master.quota.violation.notifier.impl";
public static final Class<? extends SpaceQuotaViolationNotifier> VIOLATION_NOTIFIER_DEFAULT =
SpaceQuotaViolationNotifierForTest.class;
// Private
private SpaceQuotaViolationNotifierFactory() {}
public static SpaceQuotaViolationNotifierFactory getInstance() {
return INSTANCE;
}
/**
* Instantiates the {@link SpaceQuotaViolationNotifier} implementation as defined in the
* configuration provided.
*
* @param conf Configuration object
* @return The SpaceQuotaViolationNotifier implementation
* @throws IllegalArgumentException if the class could not be instantiated
*/
public SpaceQuotaViolationNotifier create(Configuration conf) {
Class<? extends SpaceQuotaViolationNotifier> clz = Objects.requireNonNull(conf)
.getClass(VIOLATION_NOTIFIER_KEY, VIOLATION_NOTIFIER_DEFAULT,
SpaceQuotaViolationNotifier.class);
try {
return clz.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
throw new IllegalArgumentException("Failed to instantiate the implementation", e);
}
}
}

View File

@ -21,6 +21,7 @@ import java.util.Map;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
/**
* A SpaceQuotaViolationNotifier implementation for verifying testing.
@ -30,6 +31,9 @@ public class SpaceQuotaViolationNotifierForTest implements SpaceQuotaViolationNo
private final Map<TableName,SpaceViolationPolicy> tablesInViolation = new HashMap<>();
@Override
public void initialize(Connection conn) {}
@Override
public void transitionTableToViolation(TableName tableName, SpaceViolationPolicy violationPolicy) {
tablesInViolation.put(tableName, violationPolicy);

View File

@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.quotas;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* A {@link ScheduledChore} which periodically updates a local copy of tables which have
* space quota violation policies enacted on them.
*/
@InterfaceAudience.Private
public class SpaceQuotaViolationPolicyRefresherChore extends ScheduledChore {
private static final Log LOG = LogFactory.getLog(SpaceQuotaViolationPolicyRefresherChore.class);
static final String POLICY_REFRESHER_CHORE_PERIOD_KEY =
"hbase.regionserver.quotas.policy.refresher.chore.period";
static final int POLICY_REFRESHER_CHORE_PERIOD_DEFAULT = 1000 * 60 * 5; // 5 minutes in millis
static final String POLICY_REFRESHER_CHORE_DELAY_KEY =
"hbase.regionserver.quotas.policy.refresher.chore.delay";
static final long POLICY_REFRESHER_CHORE_DELAY_DEFAULT = 1000L * 60L; // 1 minute
static final String POLICY_REFRESHER_CHORE_TIMEUNIT_KEY =
"hbase.regionserver.quotas.policy.refresher.chore.timeunit";
static final String POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();
static final String POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY =
"hbase.regionserver.quotas.policy.refresher.report.percent";
static final double POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT= 0.95;
private final RegionServerSpaceQuotaManager manager;
public SpaceQuotaViolationPolicyRefresherChore(RegionServerSpaceQuotaManager manager) {
super(SpaceQuotaViolationPolicyRefresherChore.class.getSimpleName(),
manager.getRegionServerServices(),
getPeriod(manager.getRegionServerServices().getConfiguration()),
getInitialDelay(manager.getRegionServerServices().getConfiguration()),
getTimeUnit(manager.getRegionServerServices().getConfiguration()));
this.manager = manager;
}
@Override
protected void chore() {
// Tables with a policy currently enforced
final Map<TableName, SpaceViolationPolicy> activeViolationPolicies;
// Tables with policies that should be enforced
final Map<TableName, SpaceViolationPolicy> violationPolicies;
try {
// Tables with a policy currently enforced
activeViolationPolicies = manager.getActiveViolationPolicyEnforcements();
// Tables with policies that should be enforced
violationPolicies = manager.getViolationPoliciesToEnforce();
} catch (IOException e) {
LOG.warn("Failed to fetch enforced quota violation policies, will retry.", e);
return;
}
// Ensure each policy which should be enacted is enacted.
for (Entry<TableName, SpaceViolationPolicy> entry : violationPolicies.entrySet()) {
final TableName tableName = entry.getKey();
final SpaceViolationPolicy policyToEnforce = entry.getValue();
final SpaceViolationPolicy currentPolicy = activeViolationPolicies.get(tableName);
if (currentPolicy != policyToEnforce) {
if (LOG.isTraceEnabled()) {
LOG.trace("Enabling " + policyToEnforce + " on " + tableName);
}
manager.enforceViolationPolicy(tableName, policyToEnforce);
}
}
// Remove policies which should no longer be enforced
Iterator<TableName> iter = activeViolationPolicies.keySet().iterator();
while (iter.hasNext()) {
final TableName localTableWithPolicy = iter.next();
if (!violationPolicies.containsKey(localTableWithPolicy)) {
if (LOG.isTraceEnabled()) {
LOG.trace("Removing quota violation policy on " + localTableWithPolicy);
}
manager.disableViolationPolicyEnforcement(localTableWithPolicy);
iter.remove();
}
}
}
/**
* Extracts the period for the chore from the configuration.
*
* @param conf The configuration object.
* @return The configured chore period or the default value.
*/
static int getPeriod(Configuration conf) {
return conf.getInt(POLICY_REFRESHER_CHORE_PERIOD_KEY,
POLICY_REFRESHER_CHORE_PERIOD_DEFAULT);
}
/**
* Extracts the initial delay for the chore from the configuration.
*
* @param conf The configuration object.
* @return The configured chore initial delay or the default value.
*/
static long getInitialDelay(Configuration conf) {
return conf.getLong(POLICY_REFRESHER_CHORE_DELAY_KEY,
POLICY_REFRESHER_CHORE_DELAY_DEFAULT);
}
/**
* Extracts the time unit for the chore period and initial delay from the configuration. The
* configuration value for {@link #POLICY_REFRESHER_CHORE_TIMEUNIT_KEY} must correspond to
* a {@link TimeUnit} value.
*
* @param conf The configuration object.
* @return The configured time unit for the chore period and initial delay or the default value.
*/
static TimeUnit getTimeUnit(Configuration conf) {
return TimeUnit.valueOf(conf.get(POLICY_REFRESHER_CHORE_TIMEUNIT_KEY,
POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT));
}
/**
* Extracts the percent of Regions for a table to have been reported to enable quota violation
* state change.
*
* @param conf The configuration object.
* @return The percent of regions reported to use.
*/
static Double getRegionReportPercent(Configuration conf) {
return conf.getDouble(POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY,
POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT);
}
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.quotas;
import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
/**
* A {@link SpaceQuotaViolationNotifier} which uses the hbase:quota table.
*/
public class TableSpaceQuotaViolationNotifier implements SpaceQuotaViolationNotifier {
private Connection conn;
@Override
public void transitionTableToViolation(
TableName tableName, SpaceViolationPolicy violationPolicy) throws IOException {
final Put p = QuotaTableUtil.createEnableViolationPolicyUpdate(tableName, violationPolicy);
try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
quotaTable.put(p);
}
}
@Override
public void transitionTableToObservance(TableName tableName) throws IOException {
final Delete d = QuotaTableUtil.createRemoveViolationPolicyUpdate(tableName);
try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
quotaTable.delete(d);
}
}
@Override
public void initialize(Connection conn) {
this.conn = conn;
}
}

View File

@ -119,7 +119,8 @@ import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.mob.MobCacheConfig;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
@ -477,7 +478,8 @@ public class HRegionServer extends HasThread implements
private RegionServerProcedureManagerHost rspmHost;
private RegionServerQuotaManager rsQuotaManager;
private RegionServerRpcQuotaManager rsQuotaManager;
private RegionServerSpaceQuotaManager rsSpaceQuotaManager;
/**
* Nonce manager. Nonces are used to make operations like increment and append idempotent
@ -928,7 +930,8 @@ public class HRegionServer extends HasThread implements
}
// Setup the Quota Manager
rsQuotaManager = new RegionServerQuotaManager(this);
rsQuotaManager = new RegionServerRpcQuotaManager(this);
rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this);
this.fsUtilizationChore = new FileSystemUtilizationChore(this);
@ -1000,6 +1003,7 @@ public class HRegionServer extends HasThread implements
// Start the Quota Manager
rsQuotaManager.start(getRpcServer().getScheduler());
rsSpaceQuotaManager.start();
}
// We registered with the Master. Go into run mode.
@ -1091,6 +1095,10 @@ public class HRegionServer extends HasThread implements
if (rsQuotaManager != null) {
rsQuotaManager.stop();
}
if (rsSpaceQuotaManager != null) {
rsSpaceQuotaManager.stop();
rsSpaceQuotaManager = null;
}
// Stop the snapshot and other procedure handlers, forcefully killing all running tasks
if (rspmHost != null) {
@ -2882,7 +2890,7 @@ public class HRegionServer extends HasThread implements
}
@Override
public RegionServerQuotaManager getRegionServerQuotaManager() {
public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
return rsQuotaManager;
}
@ -3745,4 +3753,9 @@ public class HRegionServer extends HasThread implements
public void unassign(byte[] regionName) throws IOException {
clusterConnection.getAdmin().unassign(regionName, false);
}
@Override
public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
return this.rsSpaceQuotaManager;
}
}

View File

@ -90,7 +90,7 @@ import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.master.MasterRpcServices;
import org.apache.hadoop.hbase.quotas.OperationQuota;
import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
import org.apache.hadoop.hbase.regionserver.Leases.Lease;
import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
@ -190,6 +190,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescr
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.DNS;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -1305,8 +1306,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return regionServer.getConfiguration();
}
private RegionServerQuotaManager getQuotaManager() {
return regionServer.getRegionServerQuotaManager();
private RegionServerRpcQuotaManager getQuotaManager() {
return regionServer.getRegionServerRpcQuotaManager();
}
void start() {

View File

@ -35,7 +35,8 @@ import org.apache.hadoop.hbase.client.locking.EntityLock;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.zookeeper.KeeperException;
@ -78,15 +79,20 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi
RegionServerAccounting getRegionServerAccounting();
/**
* @return RegionServer's instance of {@link RegionServerQuotaManager}
* @return RegionServer's instance of {@link RegionServerRpcQuotaManager}
*/
RegionServerQuotaManager getRegionServerQuotaManager();
RegionServerRpcQuotaManager getRegionServerRpcQuotaManager();
/**
* @return RegionServer's instance of {@link SecureBulkLoadManager}
*/
SecureBulkLoadManager getSecureBulkLoadManager();
/**
* @return RegionServer's instance of {@link RegionServerSpaceQuotaManager}
*/
RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager();
/**
* Context for postOpenDeployTasks().
*/

View File

@ -38,7 +38,8 @@ import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
@ -189,7 +190,7 @@ public class MockRegionServerServices implements RegionServerServices {
}
@Override
public RegionServerQuotaManager getRegionServerQuotaManager() {
public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
return null;
}
@ -360,4 +361,9 @@ public class MockRegionServerServices implements RegionServerServices {
@Override
public void unassign(byte[] regionName) throws IOException {
}
@Override
public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
return null;
}
}

View File

@ -103,7 +103,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBul
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;
@ -333,7 +334,7 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
}
@Override
public RegionServerQuotaManager getRegionServerQuotaManager() {
public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
return null;
}
@ -728,4 +729,9 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
@Override
public void unassign(byte[] regionName) throws IOException {
}
@Override
public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
return null;
}
}

View File

@ -94,6 +94,8 @@ public class TestQuotaObserverChoreWithMiniCluster {
conf.setInt(QuotaObserverChore.VIOLATION_OBSERVER_CHORE_DELAY_KEY, 1000);
conf.setInt(QuotaObserverChore.VIOLATION_OBSERVER_CHORE_PERIOD_KEY, 1000);
conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
conf.setClass(SpaceQuotaViolationNotifierFactory.VIOLATION_NOTIFIER_KEY,
SpaceQuotaViolationNotifierForTest.class, SpaceQuotaViolationNotifier.class);
TEST_UTIL.startMiniCluster(1);
}

View File

@ -21,6 +21,10 @@ package org.apache.hadoop.hbase.quotas;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@ -28,6 +32,10 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle;
@ -50,6 +58,10 @@ public class TestQuotaTableUtil {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private Connection connection;
private int tableNameCounter;
@Rule
public TestName testName = new TestName();
@Rule
public TestName name = new TestName();
@ -75,6 +87,7 @@ public class TestQuotaTableUtil {
@Before
public void before() throws IOException {
this.connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
this.tableNameCounter = 0;
}
@After
@ -184,4 +197,38 @@ public class TestQuotaTableUtil {
resQuotaNS = QuotaUtil.getUserQuota(this.connection, user, namespace);
assertEquals(null, resQuotaNS);
}
@Test
public void testSerDeViolationPolicies() throws Exception {
final TableName tn1 = getUniqueTableName();
final SpaceViolationPolicy policy1 = SpaceViolationPolicy.DISABLE;
final TableName tn2 = getUniqueTableName();
final SpaceViolationPolicy policy2 = SpaceViolationPolicy.NO_INSERTS;
final TableName tn3 = getUniqueTableName();
final SpaceViolationPolicy policy3 = SpaceViolationPolicy.NO_WRITES;
List<Put> puts = new ArrayList<>();
puts.add(QuotaTableUtil.createEnableViolationPolicyUpdate(tn1, policy1));
puts.add(QuotaTableUtil.createEnableViolationPolicyUpdate(tn2, policy2));
puts.add(QuotaTableUtil.createEnableViolationPolicyUpdate(tn3, policy3));
final Map<TableName,SpaceViolationPolicy> expectedPolicies = new HashMap<>();
expectedPolicies.put(tn1, policy1);
expectedPolicies.put(tn2, policy2);
expectedPolicies.put(tn3, policy3);
final Map<TableName,SpaceViolationPolicy> actualPolicies = new HashMap<>();
try (Table quotaTable = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
quotaTable.put(puts);
ResultScanner scanner = quotaTable.getScanner(QuotaTableUtil.makeQuotaViolationScan());
for (Result r : scanner) {
QuotaTableUtil.extractViolationPolicy(r, actualPolicies);
}
scanner.close();
}
assertEquals(expectedPolicies, actualPolicies);
}
private TableName getUniqueTableName() {
return TableName.valueOf(testName.getMethodName() + "_" + tableNameCounter++);
}
}

View File

@ -105,7 +105,7 @@ public class TestQuotaThrottle {
@After
public void tearDown() throws Exception {
for (RegionServerThread rst: TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
RegionServerQuotaManager quotaManager = rst.getRegionServer().getRegionServerQuotaManager();
RegionServerRpcQuotaManager quotaManager = rst.getRegionServer().getRegionServerRpcQuotaManager();
QuotaCache quotaCache = quotaManager.getQuotaCache();
quotaCache.getNamespaceQuotaCache().clear();
quotaCache.getTableQuotaCache().clear();
@ -557,7 +557,7 @@ public class TestQuotaThrottle {
boolean nsLimiter, final TableName... tables) throws Exception {
envEdge.incValue(2 * REFRESH_TIME);
for (RegionServerThread rst: TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
RegionServerQuotaManager quotaManager = rst.getRegionServer().getRegionServerQuotaManager();
RegionServerRpcQuotaManager quotaManager = rst.getRegionServer().getRegionServerRpcQuotaManager();
QuotaCache quotaCache = quotaManager.getQuotaCache();
quotaCache.triggerCacheRefresh();

View File

@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.quotas;
import static org.apache.hadoop.hbase.util.Bytes.toBytes;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/**
* Test class for {@link RegionServerSpaceQuotaManager}.
*/
@Category(SmallTests.class)
public class TestRegionServerSpaceQuotaManager {
private RegionServerSpaceQuotaManager quotaManager;
private Connection conn;
private Table quotaTable;
private ResultScanner scanner;
@Before
@SuppressWarnings("unchecked")
public void setup() throws Exception {
quotaManager = mock(RegionServerSpaceQuotaManager.class);
conn = mock(Connection.class);
quotaTable = mock(Table.class);
scanner = mock(ResultScanner.class);
// Call the real getViolationPoliciesToEnforce()
when(quotaManager.getViolationPoliciesToEnforce()).thenCallRealMethod();
// Mock out creating a scanner
when(quotaManager.getConnection()).thenReturn(conn);
when(conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)).thenReturn(quotaTable);
when(quotaTable.getScanner(any(Scan.class))).thenReturn(scanner);
// Mock out the static method call with some indirection
doAnswer(new Answer<Void>(){
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
Result result = invocation.getArgumentAt(0, Result.class);
Map<TableName,SpaceViolationPolicy> policies = invocation.getArgumentAt(1, Map.class);
QuotaTableUtil.extractViolationPolicy(result, policies);
return null;
}
}).when(quotaManager).extractViolationPolicy(any(Result.class), any(Map.class));
}
@Test
public void testMissingAllColumns() {
List<Result> results = new ArrayList<>();
results.add(Result.create(Collections.emptyList()));
when(scanner.iterator()).thenReturn(results.iterator());
try {
quotaManager.getViolationPoliciesToEnforce();
fail("Expected an IOException, but did not receive one.");
} catch (IOException e) {
// Expected an error because we had no cells in the row.
// This should only happen due to programmer error.
}
}
@Test
public void testMissingDesiredColumn() {
List<Result> results = new ArrayList<>();
// Give a column that isn't the one we want
Cell c = new KeyValue(toBytes("t:inviolation"), toBytes("q"), toBytes("s"), new byte[0]);
results.add(Result.create(Collections.singletonList(c)));
when(scanner.iterator()).thenReturn(results.iterator());
try {
quotaManager.getViolationPoliciesToEnforce();
fail("Expected an IOException, but did not receive one.");
} catch (IOException e) {
// Expected an error because we were missing the column we expected in this row.
// This should only happen due to programmer error.
}
}
@Test
public void testParsingError() {
List<Result> results = new ArrayList<>();
Cell c = new KeyValue(toBytes("t:inviolation"), toBytes("u"), toBytes("v"), new byte[0]);
results.add(Result.create(Collections.singletonList(c)));
when(scanner.iterator()).thenReturn(results.iterator());
try {
quotaManager.getViolationPoliciesToEnforce();
fail("Expected an IOException, but did not receive one.");
} catch (IOException e) {
// We provided a garbage serialized protobuf message (empty byte array), this should
// in turn throw an IOException
}
}
}

View File

@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.quotas;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Test class for {@link SpaceQuotaViolationPolicyRefresherChore}.
*/
@Category(SmallTests.class)
public class TestSpaceQuotaViolationPolicyRefresherChore {
private RegionServerSpaceQuotaManager manager;
private RegionServerServices rss;
private SpaceQuotaViolationPolicyRefresherChore chore;
private Configuration conf;
@Before
public void setup() {
conf = HBaseConfiguration.create();
rss = mock(RegionServerServices.class);
manager = mock(RegionServerSpaceQuotaManager.class);
when(manager.getRegionServerServices()).thenReturn(rss);
when(rss.getConfiguration()).thenReturn(conf);
chore = new SpaceQuotaViolationPolicyRefresherChore(manager);
}
@Test
public void testPoliciesAreEnforced() throws IOException {
final Map<TableName,SpaceViolationPolicy> policiesToEnforce = new HashMap<>();
policiesToEnforce.put(TableName.valueOf("table1"), SpaceViolationPolicy.DISABLE);
policiesToEnforce.put(TableName.valueOf("table2"), SpaceViolationPolicy.NO_INSERTS);
policiesToEnforce.put(TableName.valueOf("table3"), SpaceViolationPolicy.NO_WRITES);
policiesToEnforce.put(TableName.valueOf("table4"), SpaceViolationPolicy.NO_WRITES_COMPACTIONS);
// No active enforcements
when(manager.getActiveViolationPolicyEnforcements()).thenReturn(Collections.emptyMap());
// Policies to enforce
when(manager.getViolationPoliciesToEnforce()).thenReturn(policiesToEnforce);
chore.chore();
for (Entry<TableName,SpaceViolationPolicy> entry : policiesToEnforce.entrySet()) {
// Ensure we enforce the policy
verify(manager).enforceViolationPolicy(entry.getKey(), entry.getValue());
// Don't disable any policies
verify(manager, never()).disableViolationPolicyEnforcement(entry.getKey());
}
}
@Test
public void testOldPoliciesAreRemoved() throws IOException {
final Map<TableName,SpaceViolationPolicy> policiesToEnforce = new HashMap<>();
policiesToEnforce.put(TableName.valueOf("table1"), SpaceViolationPolicy.DISABLE);
policiesToEnforce.put(TableName.valueOf("table2"), SpaceViolationPolicy.NO_INSERTS);
final Map<TableName,SpaceViolationPolicy> previousPolicies = new HashMap<>();
previousPolicies.put(TableName.valueOf("table3"), SpaceViolationPolicy.NO_WRITES);
previousPolicies.put(TableName.valueOf("table4"), SpaceViolationPolicy.NO_WRITES);
// No active enforcements
when(manager.getActiveViolationPolicyEnforcements()).thenReturn(previousPolicies);
// Policies to enforce
when(manager.getViolationPoliciesToEnforce()).thenReturn(policiesToEnforce);
chore.chore();
for (Entry<TableName,SpaceViolationPolicy> entry : policiesToEnforce.entrySet()) {
verify(manager).enforceViolationPolicy(entry.getKey(), entry.getValue());
}
for (Entry<TableName,SpaceViolationPolicy> entry : previousPolicies.entrySet()) {
verify(manager).disableViolationPolicyEnforcement(entry.getKey());
}
}
@Test
public void testNewPolicyOverridesOld() throws IOException {
final Map<TableName,SpaceViolationPolicy> policiesToEnforce = new HashMap<>();
policiesToEnforce.put(TableName.valueOf("table1"), SpaceViolationPolicy.DISABLE);
policiesToEnforce.put(TableName.valueOf("table2"), SpaceViolationPolicy.NO_WRITES);
policiesToEnforce.put(TableName.valueOf("table3"), SpaceViolationPolicy.NO_INSERTS);
final Map<TableName,SpaceViolationPolicy> previousPolicies = new HashMap<>();
previousPolicies.put(TableName.valueOf("table1"), SpaceViolationPolicy.NO_WRITES);
// No active enforcements
when(manager.getActiveViolationPolicyEnforcements()).thenReturn(previousPolicies);
// Policies to enforce
when(manager.getViolationPoliciesToEnforce()).thenReturn(policiesToEnforce);
chore.chore();
for (Entry<TableName,SpaceViolationPolicy> entry : policiesToEnforce.entrySet()) {
verify(manager).enforceViolationPolicy(entry.getKey(), entry.getValue());
}
verify(manager, never()).disableViolationPolicyEnforcement(TableName.valueOf("table1"));
}
}

View File

@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.quotas;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.List;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Objects;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.ArgumentMatcher;
/**
* Test case for {@link TableSpaceQuotaViolationNotifier}.
*/
@Category(SmallTests.class)
public class TestTableSpaceQuotaViolationNotifier {
private TableSpaceQuotaViolationNotifier notifier;
private Connection conn;
@Before
public void setup() throws Exception {
notifier = new TableSpaceQuotaViolationNotifier();
conn = mock(Connection.class);
notifier.initialize(conn);
}
@Test
public void testToViolation() throws Exception {
final TableName tn = TableName.valueOf("inviolation");
final SpaceViolationPolicy policy = SpaceViolationPolicy.NO_INSERTS;
final Table quotaTable = mock(Table.class);
when(conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)).thenReturn(quotaTable);
final Put expectedPut = new Put(Bytes.toBytes("t." + tn.getNameAsString()));
final SpaceQuota protoQuota = SpaceQuota.newBuilder()
.setViolationPolicy(ProtobufUtil.toProtoViolationPolicy(policy))
.build();
expectedPut.addColumn(Bytes.toBytes("u"), Bytes.toBytes("v"), protoQuota.toByteArray());
notifier.transitionTableToViolation(tn, policy);
verify(quotaTable).put(argThat(new SingleCellPutMatcher(expectedPut)));
}
@Test
public void testToObservance() throws Exception {
final TableName tn = TableName.valueOf("notinviolation");
final Table quotaTable = mock(Table.class);
when(conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)).thenReturn(quotaTable);
final Delete expectedDelete = new Delete(Bytes.toBytes("t." + tn.getNameAsString()));
expectedDelete.addColumn(Bytes.toBytes("u"), Bytes.toBytes("v"));
notifier.transitionTableToObservance(tn);
verify(quotaTable).delete(argThat(new SingleCellDeleteMatcher(expectedDelete)));
}
/**
* Parameterized for Puts.
*/
private static class SingleCellPutMatcher extends SingleCellMutationMatcher<Put> {
private SingleCellPutMatcher(Put expected) {
super(expected);
}
}
/**
* Parameterized for Deletes.
*/
private static class SingleCellDeleteMatcher extends SingleCellMutationMatcher<Delete> {
private SingleCellDeleteMatcher(Delete expected) {
super(expected);
}
}
/**
* Quick hack to verify a Mutation with one column.
*/
private static class SingleCellMutationMatcher<T> extends ArgumentMatcher<T> {
private final Mutation expected;
private SingleCellMutationMatcher(Mutation expected) {
this.expected = expected;
}
@Override
public boolean matches(Object argument) {
if (!expected.getClass().isAssignableFrom(argument.getClass())) {
return false;
}
Mutation actual = (Mutation) argument;
if (!Arrays.equals(expected.getRow(), actual.getRow())) {
return false;
}
if (expected.size() != actual.size()) {
return false;
}
NavigableMap<byte[],List<Cell>> expectedCells = expected.getFamilyCellMap();
NavigableMap<byte[],List<Cell>> actualCells = actual.getFamilyCellMap();
Entry<byte[],List<Cell>> expectedEntry = expectedCells.entrySet().iterator().next();
Entry<byte[],List<Cell>> actualEntry = actualCells.entrySet().iterator().next();
if (!Arrays.equals(expectedEntry.getKey(), actualEntry.getKey())) {
return false;
}
return Objects.equals(expectedEntry.getValue(), actualEntry.getValue());
}
}
}