HBASE-20662 Increasing space quota on a violated table does not remove SpaceViolationPolicy.DISABLE enforcement
Signed-off-by: Josh Elser <elserj@apache.org>
This commit is contained in:
parent
c6e0826679
commit
7094987311
|
@ -274,6 +274,18 @@ public class QuotaTableUtil {
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a {@link Get} which returns only {@link SpaceQuotaSnapshot} from the quota table for a
|
||||||
|
* specific table.
|
||||||
|
* @param tn table name to get from. Can't be null.
|
||||||
|
*/
|
||||||
|
public static Get makeQuotaSnapshotGetForTable(TableName tn) {
|
||||||
|
Get g = new Get(getTableRowKey(tn));
|
||||||
|
// Limit to "u:v" column
|
||||||
|
g.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_POLICY);
|
||||||
|
return g;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Extracts the {@link SpaceViolationPolicy} and {@link TableName} from the provided
|
* Extracts the {@link SpaceViolationPolicy} and {@link TableName} from the provided
|
||||||
* {@link Result} and adds them to the given {@link Map}. If the result does not contain
|
* {@link Result} and adds them to the given {@link Map}. If the result does not contain
|
||||||
|
@ -286,7 +298,7 @@ public class QuotaTableUtil {
|
||||||
public static void extractQuotaSnapshot(
|
public static void extractQuotaSnapshot(
|
||||||
Result result, Map<TableName,SpaceQuotaSnapshot> snapshots) {
|
Result result, Map<TableName,SpaceQuotaSnapshot> snapshots) {
|
||||||
byte[] row = Objects.requireNonNull(result).getRow();
|
byte[] row = Objects.requireNonNull(result).getRow();
|
||||||
if (row == null) {
|
if (row == null || row.length == 0) {
|
||||||
throw new IllegalArgumentException("Provided result had a null row");
|
throw new IllegalArgumentException("Provided result had a null row");
|
||||||
}
|
}
|
||||||
final TableName targetTableName = getTableFromRowKey(row);
|
final TableName targetTableName = getTableFromRowKey(row);
|
||||||
|
@ -642,6 +654,28 @@ public class QuotaTableUtil {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the current space quota snapshot of the given {@code tableName} from
|
||||||
|
* {@code QuotaTableUtil.QUOTA_TABLE_NAME} or null if the no quota information is available for
|
||||||
|
* that tableName.
|
||||||
|
* @param conn connection to re-use
|
||||||
|
* @param tableName name of the table whose current snapshot is to be retreived
|
||||||
|
*/
|
||||||
|
public static SpaceQuotaSnapshot getCurrentSnapshotFromQuotaTable(Connection conn,
|
||||||
|
TableName tableName) throws IOException {
|
||||||
|
try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
|
||||||
|
Map<TableName, SpaceQuotaSnapshot> snapshots = new HashMap<>(1);
|
||||||
|
Result result = quotaTable.get(makeQuotaSnapshotGetForTable(tableName));
|
||||||
|
// if we don't have any row corresponding to this get, return null
|
||||||
|
if (result.isEmpty()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
// otherwise, extract quota snapshot in snapshots object
|
||||||
|
extractQuotaSnapshot(result, snapshots);
|
||||||
|
return snapshots.get(tableName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* =========================================================================
|
/* =========================================================================
|
||||||
* Quotas protobuf helpers
|
* Quotas protobuf helpers
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -159,10 +159,14 @@ import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
|
||||||
import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
|
import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
|
||||||
import org.apache.hadoop.hbase.quotas.MasterQuotasObserver;
|
import org.apache.hadoop.hbase.quotas.MasterQuotasObserver;
|
||||||
import org.apache.hadoop.hbase.quotas.QuotaObserverChore;
|
import org.apache.hadoop.hbase.quotas.QuotaObserverChore;
|
||||||
|
import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
|
||||||
import org.apache.hadoop.hbase.quotas.QuotaUtil;
|
import org.apache.hadoop.hbase.quotas.QuotaUtil;
|
||||||
import org.apache.hadoop.hbase.quotas.SnapshotQuotaObserverChore;
|
import org.apache.hadoop.hbase.quotas.SnapshotQuotaObserverChore;
|
||||||
|
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
|
||||||
|
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
|
||||||
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshotNotifier;
|
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshotNotifier;
|
||||||
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshotNotifierFactory;
|
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshotNotifierFactory;
|
||||||
|
import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
|
||||||
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
|
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||||
import org.apache.hadoop.hbase.regionserver.HStore;
|
import org.apache.hadoop.hbase.regionserver.HStore;
|
||||||
|
@ -218,8 +222,6 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceViolationPolicy;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
|
||||||
|
|
||||||
|
@ -2483,10 +2485,12 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
MasterQuotaManager quotaManager = getMasterQuotaManager();
|
MasterQuotaManager quotaManager = getMasterQuotaManager();
|
||||||
if (quotaManager != null) {
|
if (quotaManager != null) {
|
||||||
if (quotaManager.isQuotaInitialized()) {
|
if (quotaManager.isQuotaInitialized()) {
|
||||||
Quotas quotaForTable = QuotaUtil.getTableQuota(getConnection(), tableName);
|
SpaceQuotaSnapshot currSnapshotOfTable =
|
||||||
if (quotaForTable != null && quotaForTable.hasSpace()) {
|
QuotaTableUtil.getCurrentSnapshotFromQuotaTable(getConnection(), tableName);
|
||||||
SpaceViolationPolicy policy = quotaForTable.getSpace().getViolationPolicy();
|
if (currSnapshotOfTable != null) {
|
||||||
if (SpaceViolationPolicy.DISABLE == policy) {
|
SpaceQuotaStatus quotaStatus = currSnapshotOfTable.getQuotaStatus();
|
||||||
|
if (quotaStatus.isInViolation()
|
||||||
|
&& SpaceViolationPolicy.DISABLE == quotaStatus.getPolicy()) {
|
||||||
throw new AccessDeniedException("Enabling the table '" + tableName
|
throw new AccessDeniedException("Enabling the table '" + tableName
|
||||||
+ "' is disallowed due to a violated space quota.");
|
+ "' is disallowed due to a violated space quota.");
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
import org.apache.hadoop.hbase.master.MasterServices;
|
import org.apache.hadoop.hbase.master.MasterServices;
|
||||||
import org.apache.hadoop.hbase.namespace.NamespaceAuditor;
|
import org.apache.hadoop.hbase.namespace.NamespaceAuditor;
|
||||||
|
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.apache.yetus.audience.InterfaceStability;
|
import org.apache.yetus.audience.InterfaceStability;
|
||||||
|
@ -247,7 +248,16 @@ public class MasterQuotaManager implements RegionStateListener {
|
||||||
}
|
}
|
||||||
@Override
|
@Override
|
||||||
public void delete() throws IOException {
|
public void delete() throws IOException {
|
||||||
|
SpaceQuotaSnapshot currSnapshotOfTable =
|
||||||
|
QuotaTableUtil.getCurrentSnapshotFromQuotaTable(masterServices.getConnection(), table);
|
||||||
QuotaUtil.deleteTableQuota(masterServices.getConnection(), table);
|
QuotaUtil.deleteTableQuota(masterServices.getConnection(), table);
|
||||||
|
if (currSnapshotOfTable != null) {
|
||||||
|
SpaceQuotaStatus quotaStatus = currSnapshotOfTable.getQuotaStatus();
|
||||||
|
if (SpaceViolationPolicy.DISABLE == quotaStatus.getPolicy()
|
||||||
|
&& quotaStatus.isInViolation()) {
|
||||||
|
QuotaUtil.enableTableIfNotEnabled(masterServices.getConnection(), table);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@Override
|
@Override
|
||||||
public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
|
public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
|
||||||
|
|
|
@ -188,7 +188,8 @@ public class QuotaObserverChore extends ScheduledChore {
|
||||||
|
|
||||||
for (TableName tableInLimbo : tablesInLimbo) {
|
for (TableName tableInLimbo : tablesInLimbo) {
|
||||||
final SpaceQuotaSnapshot currentSnapshot = tableSnapshotStore.getCurrentState(tableInLimbo);
|
final SpaceQuotaSnapshot currentSnapshot = tableSnapshotStore.getCurrentState(tableInLimbo);
|
||||||
if (currentSnapshot.getQuotaStatus().isInViolation()) {
|
SpaceQuotaStatus currentStatus = currentSnapshot.getQuotaStatus();
|
||||||
|
if (currentStatus.isInViolation()) {
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("Moving " + tableInLimbo + " out of violation because fewer region sizes were"
|
LOG.trace("Moving " + tableInLimbo + " out of violation because fewer region sizes were"
|
||||||
+ " reported than required.");
|
+ " reported than required.");
|
||||||
|
@ -199,6 +200,10 @@ public class QuotaObserverChore extends ScheduledChore {
|
||||||
this.snapshotNotifier.transitionTable(tableInLimbo, targetSnapshot);
|
this.snapshotNotifier.transitionTable(tableInLimbo, targetSnapshot);
|
||||||
// Update it in the Table QuotaStore so that memory is consistent with no violation.
|
// Update it in the Table QuotaStore so that memory is consistent with no violation.
|
||||||
tableSnapshotStore.setCurrentState(tableInLimbo, targetSnapshot);
|
tableSnapshotStore.setCurrentState(tableInLimbo, targetSnapshot);
|
||||||
|
// In case of Disable SVP, we need to enable the table as it moves out of violation
|
||||||
|
if (SpaceViolationPolicy.DISABLE == currentStatus.getPolicy()) {
|
||||||
|
QuotaUtil.enableTableIfNotEnabled(conn, tableInLimbo);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -324,20 +329,35 @@ public class QuotaObserverChore extends ScheduledChore {
|
||||||
|
|
||||||
// If we're changing something, log it.
|
// If we're changing something, log it.
|
||||||
if (!currentSnapshot.equals(targetSnapshot)) {
|
if (!currentSnapshot.equals(targetSnapshot)) {
|
||||||
// If the target is none, we're moving out of violation. Update the hbase:quota table
|
|
||||||
if (!targetStatus.isInViolation()) {
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug(table + " moving into observance of table space quota.");
|
|
||||||
}
|
|
||||||
} else if (LOG.isDebugEnabled()) {
|
|
||||||
// We're either moving into violation or changing violation policies
|
|
||||||
LOG.debug(table + " moving into violation of table space quota with policy of "
|
|
||||||
+ targetStatus.getPolicy());
|
|
||||||
}
|
|
||||||
|
|
||||||
this.snapshotNotifier.transitionTable(table, targetSnapshot);
|
this.snapshotNotifier.transitionTable(table, targetSnapshot);
|
||||||
// Update it in memory
|
// Update it in memory
|
||||||
tableSnapshotStore.setCurrentState(table, targetSnapshot);
|
tableSnapshotStore.setCurrentState(table, targetSnapshot);
|
||||||
|
|
||||||
|
// If the target is none, we're moving out of violation. Update the hbase:quota table
|
||||||
|
SpaceViolationPolicy currPolicy = currentStatus.getPolicy();
|
||||||
|
SpaceViolationPolicy targetPolicy = targetStatus.getPolicy();
|
||||||
|
if (!targetStatus.isInViolation()) {
|
||||||
|
// In case of Disable SVP, we need to enable the table as it moves out of violation
|
||||||
|
if (isDisableSpaceViolationPolicy(currPolicy, targetPolicy)) {
|
||||||
|
QuotaUtil.enableTableIfNotEnabled(conn, table);
|
||||||
|
}
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug(table + " moved into observance of table space quota.");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// We're either moving into violation or changing violation policies
|
||||||
|
if (currPolicy != targetPolicy && SpaceViolationPolicy.DISABLE == currPolicy) {
|
||||||
|
// In case of policy switch, we need to enable the table if current policy is Disable SVP
|
||||||
|
QuotaUtil.enableTableIfNotEnabled(conn, table);
|
||||||
|
} else if (SpaceViolationPolicy.DISABLE == targetPolicy) {
|
||||||
|
// In case of Disable SVP, we need to disable the table as it moves into violation
|
||||||
|
QuotaUtil.disableTableIfNotDisabled(conn, table);
|
||||||
|
}
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug(
|
||||||
|
table + " moved into violation of table space quota with policy of " + targetPolicy);
|
||||||
|
}
|
||||||
|
}
|
||||||
} else if (LOG.isTraceEnabled()) {
|
} else if (LOG.isTraceEnabled()) {
|
||||||
// Policies are the same, so we have nothing to do except log this. Don't need to re-update
|
// Policies are the same, so we have nothing to do except log this. Don't need to re-update
|
||||||
// the quota table
|
// the quota table
|
||||||
|
@ -349,6 +369,19 @@ public class QuotaObserverChore extends ScheduledChore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method to check whether we are dealing with DISABLE {@link SpaceViolationPolicy}. In such a
|
||||||
|
* case, currPolicy or/and targetPolicy will be having DISABLE policy.
|
||||||
|
* @param currPolicy currently set space violation policy
|
||||||
|
* @param targetPolicy new space violation policy
|
||||||
|
* @return true if is DISABLE space violation policy; otherwise false
|
||||||
|
*/
|
||||||
|
private boolean isDisableSpaceViolationPolicy(final SpaceViolationPolicy currPolicy,
|
||||||
|
final SpaceViolationPolicy targetPolicy) {
|
||||||
|
return SpaceViolationPolicy.DISABLE == currPolicy
|
||||||
|
|| SpaceViolationPolicy.DISABLE == targetPolicy;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Updates the hbase:quota table with the target quota policy for this <code>namespace</code>
|
* Updates the hbase:quota table with the target quota policy for this <code>namespace</code>
|
||||||
* if necessary.
|
* if necessary.
|
||||||
|
@ -363,7 +396,7 @@ public class QuotaObserverChore extends ScheduledChore {
|
||||||
final Multimap<String,TableName> tablesByNamespace) throws IOException {
|
final Multimap<String,TableName> tablesByNamespace) throws IOException {
|
||||||
final SpaceQuotaStatus targetStatus = targetSnapshot.getQuotaStatus();
|
final SpaceQuotaStatus targetStatus = targetSnapshot.getQuotaStatus();
|
||||||
|
|
||||||
// When the policies differ, we need to move into or out of violatino
|
// When the policies differ, we need to move into or out of violation
|
||||||
if (!currentSnapshot.equals(targetSnapshot)) {
|
if (!currentSnapshot.equals(targetSnapshot)) {
|
||||||
// We want to have a policy of "NONE", moving out of violation
|
// We want to have a policy of "NONE", moving out of violation
|
||||||
if (!targetStatus.isInViolation()) {
|
if (!targetStatus.isInViolation()) {
|
||||||
|
|
|
@ -30,6 +30,9 @@ import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.TableNotDisabledException;
|
||||||
|
import org.apache.hadoop.hbase.TableNotEnabledException;
|
||||||
|
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.apache.yetus.audience.InterfaceStability;
|
import org.apache.yetus.audience.InterfaceStability;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -308,4 +311,35 @@ public class QuotaUtil extends QuotaTableUtil {
|
||||||
}
|
}
|
||||||
return size;
|
return size;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method to enable a table, if not already enabled. This method suppresses
|
||||||
|
* {@link TableNotDisabledException} and {@link TableNotFoundException}, if thrown while enabling
|
||||||
|
* the table.
|
||||||
|
* @param conn connection to re-use
|
||||||
|
* @param tableName name of the table to be enabled
|
||||||
|
*/
|
||||||
|
public static void enableTableIfNotEnabled(Connection conn, TableName tableName)
|
||||||
|
throws IOException {
|
||||||
|
try {
|
||||||
|
conn.getAdmin().enableTable(tableName);
|
||||||
|
} catch (TableNotDisabledException | TableNotFoundException e) {
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method to disable a table, if not already disabled. This method suppresses
|
||||||
|
* {@link TableNotEnabledException}, if thrown while disabling the table.
|
||||||
|
* @param conn connection to re-use
|
||||||
|
* @param tableName table name which has moved into space quota violation
|
||||||
|
*/
|
||||||
|
public static void disableTableIfNotDisabled(Connection conn, TableName tableName)
|
||||||
|
throws IOException {
|
||||||
|
try {
|
||||||
|
conn.getAdmin().disableTable(tableName);
|
||||||
|
} catch (TableNotEnabledException | TableNotFoundException e) {
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -96,18 +96,26 @@ public class SpaceQuotaRefresherChore extends ScheduledChore {
|
||||||
LOG.trace(tableName + ": current=" + currentSnapshot + ", new=" + newSnapshot);
|
LOG.trace(tableName + ": current=" + currentSnapshot + ", new=" + newSnapshot);
|
||||||
}
|
}
|
||||||
if (!newSnapshot.equals(currentSnapshot)) {
|
if (!newSnapshot.equals(currentSnapshot)) {
|
||||||
// We have a new snapshot. We might need to enforce it or disable the enforcement
|
// We have a new snapshot.
|
||||||
if (!isInViolation(currentSnapshot) && newSnapshot.getQuotaStatus().isInViolation()) {
|
// We might need to enforce it or disable the enforcement or switch policy
|
||||||
|
boolean currInViolation = isInViolation(currentSnapshot);
|
||||||
|
boolean newInViolation = newSnapshot.getQuotaStatus().isInViolation();
|
||||||
|
if (!currInViolation && newInViolation) {
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("Enabling " + newSnapshot + " on " + tableName);
|
LOG.trace("Enabling " + newSnapshot + " on " + tableName);
|
||||||
}
|
}
|
||||||
getManager().enforceViolationPolicy(tableName, newSnapshot);
|
getManager().enforceViolationPolicy(tableName, newSnapshot);
|
||||||
}
|
} else if (currInViolation && !newInViolation) {
|
||||||
if (isInViolation(currentSnapshot) && !newSnapshot.getQuotaStatus().isInViolation()) {
|
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("Removing quota violation policy on " + tableName);
|
LOG.trace("Removing quota violation policy on " + tableName);
|
||||||
}
|
}
|
||||||
getManager().disableViolationPolicyEnforcement(tableName);
|
getManager().disableViolationPolicyEnforcement(tableName);
|
||||||
|
} else if (currInViolation && newInViolation) {
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("Switching quota violation policy on " + tableName + " from "
|
||||||
|
+ currentSnapshot + " to " + newSnapshot);
|
||||||
|
}
|
||||||
|
getManager().enforceViolationPolicy(tableName, newSnapshot);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,55 +18,29 @@ package org.apache.hadoop.hbase.quotas.policies;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.TableNotDisabledException;
|
|
||||||
import org.apache.hadoop.hbase.TableNotEnabledException;
|
|
||||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.apache.hadoop.hbase.client.Mutation;
|
import org.apache.hadoop.hbase.client.Mutation;
|
||||||
import org.apache.hadoop.hbase.quotas.SpaceLimitingException;
|
import org.apache.hadoop.hbase.quotas.SpaceLimitingException;
|
||||||
import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
|
import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
|
||||||
import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
|
import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A {@link SpaceViolationPolicyEnforcement} which disables the table. The enforcement
|
* A {@link SpaceViolationPolicyEnforcement} which disables the table. The enforcement counterpart
|
||||||
* counterpart to {@link SpaceViolationPolicy#DISABLE}.
|
* to {@link SpaceViolationPolicy#DISABLE}. This violation policy is different from others as it
|
||||||
|
* doesn't take action (i.e. enable/disable table) local to the RegionServer, like the other
|
||||||
|
* ViolationPolicies do. In case of violation, the appropriate action is initiated by the master.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class DisableTableViolationPolicyEnforcement extends DefaultViolationPolicyEnforcement {
|
public class DisableTableViolationPolicyEnforcement extends DefaultViolationPolicyEnforcement {
|
||||||
private static final Logger LOG =
|
|
||||||
LoggerFactory.getLogger(DisableTableViolationPolicyEnforcement.class);
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void enable() throws IOException {
|
public void enable() throws IOException {
|
||||||
try {
|
// do nothing
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
LOG.trace("Starting disable of " + getTableName());
|
|
||||||
}
|
|
||||||
getRegionServerServices().getClusterConnection().getAdmin().disableTable(getTableName());
|
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
LOG.trace("Disable is complete for " + getTableName());
|
|
||||||
}
|
|
||||||
} catch (TableNotEnabledException tnee) {
|
|
||||||
// The state we wanted it to be in.
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void disable() throws IOException {
|
public void disable() throws IOException {
|
||||||
try {
|
// do nothing
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
LOG.trace("Starting enable of " + getTableName());
|
|
||||||
}
|
|
||||||
getRegionServerServices().getClusterConnection().getAdmin().enableTable(getTableName());
|
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
LOG.trace("Enable is complete for " + getTableName());
|
|
||||||
}
|
|
||||||
} catch (TableNotDisabledException | TableNotFoundException e) {
|
|
||||||
// The state we wanted it to be in
|
|
||||||
// Or, in case table is not found, nothing to do
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -388,7 +388,7 @@ public class TestSpaceQuotas {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSetQuotaAndThenDropTableeWithNoWritesCompactions() throws Exception {
|
public void testSetQuotaAndThenDropTableWithNoWritesCompactions() throws Exception {
|
||||||
setQuotaAndThenDropTable(SpaceViolationPolicy.NO_WRITES_COMPACTIONS);
|
setQuotaAndThenDropTable(SpaceViolationPolicy.NO_WRITES_COMPACTIONS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -412,6 +412,16 @@ public class TestSpaceQuotas {
|
||||||
setQuotaAndThenIncreaseQuota(SpaceViolationPolicy.NO_WRITES_COMPACTIONS);
|
setQuotaAndThenIncreaseQuota(SpaceViolationPolicy.NO_WRITES_COMPACTIONS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetQuotaAndThenIncreaseQuotaWithDisable() throws Exception {
|
||||||
|
setQuotaAndThenIncreaseQuota(SpaceViolationPolicy.DISABLE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetQuotaAndThenDisableIncrEnableWithDisable() throws Exception {
|
||||||
|
setQuotaNextDisableThenIncreaseFinallyEnable(SpaceViolationPolicy.DISABLE);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSetQuotaAndThenRemoveInOneWithNoInserts() throws Exception {
|
public void testSetQuotaAndThenRemoveInOneWithNoInserts() throws Exception {
|
||||||
setQuotaAndThenRemoveInOneAmongTwoTables(SpaceViolationPolicy.NO_INSERTS);
|
setQuotaAndThenRemoveInOneAmongTwoTables(SpaceViolationPolicy.NO_INSERTS);
|
||||||
|
@ -432,6 +442,36 @@ public class TestSpaceQuotas {
|
||||||
setQuotaAndThenRemoveInOneAmongTwoTables(SpaceViolationPolicy.DISABLE);
|
setQuotaAndThenRemoveInOneAmongTwoTables(SpaceViolationPolicy.DISABLE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetQuotaFirstWithDisableNextNoWrites() throws Exception {
|
||||||
|
setQuotaAndViolateNextSwitchPoliciesAndValidate(SpaceViolationPolicy.DISABLE,
|
||||||
|
SpaceViolationPolicy.NO_WRITES);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetQuotaFirstWithDisableNextAgainDisable() throws Exception {
|
||||||
|
setQuotaAndViolateNextSwitchPoliciesAndValidate(SpaceViolationPolicy.DISABLE,
|
||||||
|
SpaceViolationPolicy.DISABLE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetQuotaFirstWithDisableNextNoInserts() throws Exception {
|
||||||
|
setQuotaAndViolateNextSwitchPoliciesAndValidate(SpaceViolationPolicy.DISABLE,
|
||||||
|
SpaceViolationPolicy.NO_INSERTS);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetQuotaFirstWithDisableNextNoWritesCompaction() throws Exception {
|
||||||
|
setQuotaAndViolateNextSwitchPoliciesAndValidate(SpaceViolationPolicy.DISABLE,
|
||||||
|
SpaceViolationPolicy.NO_WRITES_COMPACTIONS);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetQuotaFirstWithNoWritesNextWithDisable() throws Exception {
|
||||||
|
setQuotaAndViolateNextSwitchPoliciesAndValidate(SpaceViolationPolicy.NO_WRITES,
|
||||||
|
SpaceViolationPolicy.DISABLE);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSetQuotaOnNonExistingTableWithNoInserts() throws Exception {
|
public void testSetQuotaOnNonExistingTableWithNoInserts() throws Exception {
|
||||||
setQuotaLimit(NON_EXISTENT_TABLE, SpaceViolationPolicy.NO_INSERTS, 2L);
|
setQuotaLimit(NON_EXISTENT_TABLE, SpaceViolationPolicy.NO_INSERTS, 2L);
|
||||||
|
@ -452,6 +492,26 @@ public class TestSpaceQuotas {
|
||||||
setQuotaLimit(NON_EXISTENT_TABLE, SpaceViolationPolicy.DISABLE, 2L);
|
setQuotaLimit(NON_EXISTENT_TABLE, SpaceViolationPolicy.DISABLE, 2L);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setQuotaAndViolateNextSwitchPoliciesAndValidate(SpaceViolationPolicy policy1,
|
||||||
|
SpaceViolationPolicy policy2) throws Exception {
|
||||||
|
Put put = new Put(Bytes.toBytes("to_reject"));
|
||||||
|
put.addColumn(Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"),
|
||||||
|
Bytes.toBytes("reject"));
|
||||||
|
|
||||||
|
// Do puts until we violate space violation policy1
|
||||||
|
final TableName tn = writeUntilViolationAndVerifyViolation(policy1, put);
|
||||||
|
|
||||||
|
// Now, change violation policy to policy2
|
||||||
|
setQuotaLimit(tn, policy2, 2L);
|
||||||
|
|
||||||
|
// The table should be in enabled state on changing violation policy
|
||||||
|
if (policy1.equals(SpaceViolationPolicy.DISABLE) && !policy1.equals(policy2)) {
|
||||||
|
TEST_UTIL.waitTableEnabled(tn, 20000);
|
||||||
|
}
|
||||||
|
// Put some row now: should still violate as quota limit still violated
|
||||||
|
verifyViolation(policy2, tn, put);
|
||||||
|
}
|
||||||
|
|
||||||
private void setQuotaAndThenRemove(SpaceViolationPolicy policy) throws Exception {
|
private void setQuotaAndThenRemove(SpaceViolationPolicy policy) throws Exception {
|
||||||
Put put = new Put(Bytes.toBytes("to_reject"));
|
Put put = new Put(Bytes.toBytes("to_reject"));
|
||||||
put.addColumn(Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"),
|
put.addColumn(Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"),
|
||||||
|
@ -502,6 +562,34 @@ public class TestSpaceQuotas {
|
||||||
verifyNoViolation(policy, tn, put);
|
verifyNoViolation(policy, tn, put);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void setQuotaNextDisableThenIncreaseFinallyEnable(SpaceViolationPolicy policy)
|
||||||
|
throws Exception {
|
||||||
|
Put put = new Put(Bytes.toBytes("to_reject"));
|
||||||
|
put.addColumn(Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"),
|
||||||
|
Bytes.toBytes("reject"));
|
||||||
|
|
||||||
|
// Do puts until we violate space policy
|
||||||
|
final TableName tn = writeUntilViolationAndVerifyViolation(policy, put);
|
||||||
|
|
||||||
|
// Disable the table; in case of SpaceViolationPolicy.DISABLE already disabled
|
||||||
|
if (!policy.equals(SpaceViolationPolicy.DISABLE)) {
|
||||||
|
TEST_UTIL.getAdmin().disableTable(tn);
|
||||||
|
TEST_UTIL.waitTableDisabled(tn, 10000);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now, increase limit and perform put
|
||||||
|
setQuotaLimit(tn, policy, 4L);
|
||||||
|
|
||||||
|
// in case of disable policy quota manager will enable it
|
||||||
|
if (!policy.equals(SpaceViolationPolicy.DISABLE)) {
|
||||||
|
TEST_UTIL.getAdmin().enableTable(tn);
|
||||||
|
}
|
||||||
|
TEST_UTIL.waitTableEnabled(tn, 10000);
|
||||||
|
|
||||||
|
// Put some row now: should not violate as quota limit increased
|
||||||
|
verifyNoViolation(policy, tn, put);
|
||||||
|
}
|
||||||
|
|
||||||
public void setQuotaAndThenRemoveInOneAmongTwoTables(SpaceViolationPolicy policy)
|
public void setQuotaAndThenRemoveInOneAmongTwoTables(SpaceViolationPolicy policy)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
Put put = new Put(Bytes.toBytes("to_reject"));
|
Put put = new Put(Bytes.toBytes("to_reject"));
|
||||||
|
@ -572,6 +660,7 @@ public class TestSpaceQuotas {
|
||||||
SpaceViolationPolicy policyToViolate, TableName tn, Mutation m) throws Exception {
|
SpaceViolationPolicy policyToViolate, TableName tn, Mutation m) throws Exception {
|
||||||
// But let's try a few times to get the exception before failing
|
// But let's try a few times to get the exception before failing
|
||||||
boolean sawError = false;
|
boolean sawError = false;
|
||||||
|
String msg = "";
|
||||||
for (int i = 0; i < NUM_RETRIES && !sawError; i++) {
|
for (int i = 0; i < NUM_RETRIES && !sawError; i++) {
|
||||||
try (Table table = TEST_UTIL.getConnection().getTable(tn)) {
|
try (Table table = TEST_UTIL.getConnection().getTable(tn)) {
|
||||||
if (m instanceof Put) {
|
if (m instanceof Put) {
|
||||||
|
@ -590,15 +679,16 @@ public class TestSpaceQuotas {
|
||||||
LOG.info("Did not reject the " + m.getClass().getSimpleName() + ", will sleep and retry");
|
LOG.info("Did not reject the " + m.getClass().getSimpleName() + ", will sleep and retry");
|
||||||
Thread.sleep(2000);
|
Thread.sleep(2000);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
String msg = StringUtils.stringifyException(e);
|
msg = StringUtils.stringifyException(e);
|
||||||
if (policyToViolate.equals(SpaceViolationPolicy.DISABLE)) {
|
if ((policyToViolate.equals(SpaceViolationPolicy.DISABLE)
|
||||||
assertTrue(e instanceof TableNotEnabledException);
|
&& e instanceof TableNotEnabledException) || msg.contains(policyToViolate.name())) {
|
||||||
} else {
|
LOG.info("Got the expected exception={}", msg);
|
||||||
assertTrue("Expected exception message to contain the word '" + policyToViolate.name()
|
|
||||||
+ "', but was " + msg,
|
|
||||||
msg.contains(policyToViolate.name()));
|
|
||||||
}
|
|
||||||
sawError = true;
|
sawError = true;
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
LOG.info("Did not get the expected exception, will sleep and retry");
|
||||||
|
Thread.sleep(2000);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!sawError) {
|
if (!sawError) {
|
||||||
|
@ -611,6 +701,15 @@ public class TestSpaceQuotas {
|
||||||
}
|
}
|
||||||
scanner.close();
|
scanner.close();
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
if (policyToViolate.equals(SpaceViolationPolicy.DISABLE)) {
|
||||||
|
assertTrue(
|
||||||
|
msg.contains("TableNotEnabledException") || msg.contains(policyToViolate.name()));
|
||||||
|
} else {
|
||||||
|
assertTrue("Expected exception message to contain the word '" + policyToViolate.name()
|
||||||
|
+ "', but was " + msg,
|
||||||
|
msg.contains(policyToViolate.name()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
assertTrue(
|
assertTrue(
|
||||||
"Expected to see an exception writing data to a table exceeding its quota", sawError);
|
"Expected to see an exception writing data to a table exceeding its quota", sawError);
|
||||||
|
|
Loading…
Reference in New Issue