HBASE-17516 Correctly handle case where table and NS quotas both apply
The logic surrounding when a table and namespace quota both apply to a table was incorrect, leading to a case where a table quota violation which should have fired did not because of the less-strict namespace quota.
This commit is contained in:
parent
80a1f8fa2a
commit
f031b69969
|
@ -287,7 +287,8 @@ public class QuotaObserverChore extends ScheduledChore {
|
|||
// We want to have a policy of "NONE", moving out of violation
|
||||
if (!targetStatus.isInViolation()) {
|
||||
for (TableName tableInNS : tablesByNamespace.get(namespace)) {
|
||||
if (!tableSnapshotStore.getCurrentState(tableInNS).getQuotaStatus().isInViolation()) {
|
||||
// If there is a quota on this table in violation
|
||||
if (tableSnapshotStore.getCurrentState(tableInNS).getQuotaStatus().isInViolation()) {
|
||||
// Table-level quota violation policy is being applied here.
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Not activating Namespace violation policy because a Table violation"
|
||||
|
@ -298,16 +299,21 @@ public class QuotaObserverChore extends ScheduledChore {
|
|||
this.snapshotNotifier.transitionTable(tableInNS, targetSnapshot);
|
||||
}
|
||||
}
|
||||
// We want to move into violation at the NS level
|
||||
} else {
|
||||
// Moving tables in the namespace into violation or to a different violation policy
|
||||
for (TableName tableInNS : tablesByNamespace.get(namespace)) {
|
||||
if (tableSnapshotStore.getCurrentState(tableInNS).getQuotaStatus().isInViolation()) {
|
||||
final SpaceQuotaSnapshot tableQuotaSnapshot =
|
||||
tableSnapshotStore.getCurrentState(tableInNS);
|
||||
final boolean hasTableQuota = QuotaSnapshotStore.NO_QUOTA != tableQuotaSnapshot;
|
||||
if (hasTableQuota && tableQuotaSnapshot.getQuotaStatus().isInViolation()) {
|
||||
// Table-level quota violation policy is being applied here.
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Not activating Namespace violation policy because a Table violation"
|
||||
+ " policy is already in effect for " + tableInNS);
|
||||
}
|
||||
} else {
|
||||
// No table quota present or a table quota present that is not in violation
|
||||
LOG.info(tableInNS + " moving into violation of namespace space quota with policy " + targetStatus.getPolicy());
|
||||
this.snapshotNotifier.transitionTable(tableInNS, targetSnapshot);
|
||||
}
|
||||
|
|
|
@ -193,40 +193,42 @@ public class TestQuotaObserverChoreWithMiniCluster {
|
|||
|
||||
helper.writeData(tn1, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
|
||||
admin.flush(tn1);
|
||||
Map<TableName,SpaceQuotaSnapshot> violatedQuotas = snapshotNotifier.copySnapshots();
|
||||
Map<TableName,SpaceQuotaSnapshot> snapshots = snapshotNotifier.copySnapshots();
|
||||
for (int i = 0; i < 5; i++) {
|
||||
// Check a few times to make sure we don't prematurely move to violation
|
||||
assertEquals("Should not see any quota violations after writing 2MB of data", 0, violatedQuotas.size());
|
||||
assertEquals(
|
||||
"Should not see any quota violations after writing 2MB of data", 0,
|
||||
numSnapshotsInViolation(snapshots));
|
||||
try {
|
||||
Thread.sleep(DEFAULT_WAIT_MILLIS);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.debug("Interrupted while sleeping." , e);
|
||||
}
|
||||
violatedQuotas = snapshotNotifier.copySnapshots();
|
||||
snapshots = snapshotNotifier.copySnapshots();
|
||||
}
|
||||
|
||||
helper.writeData(tn2, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
|
||||
admin.flush(tn2);
|
||||
violatedQuotas = snapshotNotifier.copySnapshots();
|
||||
snapshots = snapshotNotifier.copySnapshots();
|
||||
for (int i = 0; i < 5; i++) {
|
||||
// Check a few times to make sure we don't prematurely move to violation
|
||||
assertEquals("Should not see any quota violations after writing 4MB of data", 0,
|
||||
violatedQuotas.size());
|
||||
numSnapshotsInViolation(snapshots));
|
||||
try {
|
||||
Thread.sleep(DEFAULT_WAIT_MILLIS);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.debug("Interrupted while sleeping." , e);
|
||||
}
|
||||
violatedQuotas = snapshotNotifier.copySnapshots();
|
||||
snapshots = snapshotNotifier.copySnapshots();
|
||||
}
|
||||
|
||||
// Writing the final 2MB of data will push the namespace over the 5MB limit (6MB in total)
|
||||
// and should push all three tables in the namespace into violation.
|
||||
helper.writeData(tn3, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
|
||||
admin.flush(tn3);
|
||||
violatedQuotas = snapshotNotifier.copySnapshots();
|
||||
while (violatedQuotas.size() < 3) {
|
||||
LOG.debug("Saw fewer violations than desired (expected 3): " + violatedQuotas
|
||||
snapshots = snapshotNotifier.copySnapshots();
|
||||
while (numSnapshotsInViolation(snapshots) < 3) {
|
||||
LOG.debug("Saw fewer violations than desired (expected 3): " + snapshots
|
||||
+ ". Current reports: " + master.getMasterQuotaManager().snapshotRegionSizes());
|
||||
try {
|
||||
Thread.sleep(DEFAULT_WAIT_MILLIS);
|
||||
|
@ -234,19 +236,19 @@ public class TestQuotaObserverChoreWithMiniCluster {
|
|||
LOG.debug("Interrupted while sleeping.", e);
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
violatedQuotas = snapshotNotifier.copySnapshots();
|
||||
snapshots = snapshotNotifier.copySnapshots();
|
||||
}
|
||||
|
||||
SpaceQuotaSnapshot snapshot1 = violatedQuotas.remove(tn1);
|
||||
SpaceQuotaSnapshot snapshot1 = snapshots.remove(tn1);
|
||||
assertNotNull("tn1 should be in violation", snapshot1);
|
||||
assertEquals(violationPolicy, snapshot1.getQuotaStatus().getPolicy());
|
||||
SpaceQuotaSnapshot snapshot2 = violatedQuotas.remove(tn2);
|
||||
SpaceQuotaSnapshot snapshot2 = snapshots.remove(tn2);
|
||||
assertNotNull("tn2 should be in violation", snapshot2);
|
||||
assertEquals(violationPolicy, snapshot2.getQuotaStatus().getPolicy());
|
||||
SpaceQuotaSnapshot snapshot3 = violatedQuotas.remove(tn3);
|
||||
SpaceQuotaSnapshot snapshot3 = snapshots.remove(tn3);
|
||||
assertNotNull("tn3 should be in violation", snapshot3);
|
||||
assertEquals(violationPolicy, snapshot3.getQuotaStatus().getPolicy());
|
||||
assertTrue("Unexpected additional quota violations: " + violatedQuotas, violatedQuotas.isEmpty());
|
||||
assertTrue("Unexpected additional quota violations: " + snapshots, snapshots.isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -272,24 +274,24 @@ public class TestQuotaObserverChoreWithMiniCluster {
|
|||
|
||||
helper.writeData(tn1, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
|
||||
admin.flush(tn1);
|
||||
Map<TableName,SpaceQuotaSnapshot> violatedQuotas = snapshotNotifier.copySnapshots();
|
||||
Map<TableName,SpaceQuotaSnapshot> snapshots = snapshotNotifier.copySnapshots();
|
||||
for (int i = 0; i < 5; i++) {
|
||||
// Check a few times to make sure we don't prematurely move to violation
|
||||
assertEquals("Should not see any quota violations after writing 2MB of data", 0,
|
||||
violatedQuotas.size());
|
||||
assertEquals("Should not see any quota violations after writing 2MB of data: " + snapshots, 0,
|
||||
numSnapshotsInViolation(snapshots));
|
||||
try {
|
||||
Thread.sleep(DEFAULT_WAIT_MILLIS);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.debug("Interrupted while sleeping." , e);
|
||||
}
|
||||
violatedQuotas = snapshotNotifier.copySnapshots();
|
||||
snapshots = snapshotNotifier.copySnapshots();
|
||||
}
|
||||
|
||||
helper.writeData(tn2, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
|
||||
admin.flush(tn2);
|
||||
violatedQuotas = snapshotNotifier.copySnapshots();
|
||||
while (violatedQuotas.size() < 2) {
|
||||
LOG.debug("Saw fewer violations than desired (expected 2): " + violatedQuotas
|
||||
snapshots = snapshotNotifier.copySnapshots();
|
||||
while (numSnapshotsInViolation(snapshots) < 2) {
|
||||
LOG.debug("Saw fewer violations than desired (expected 2): " + snapshots
|
||||
+ ". Current reports: " + master.getMasterQuotaManager().snapshotRegionSizes());
|
||||
try {
|
||||
Thread.sleep(DEFAULT_WAIT_MILLIS);
|
||||
|
@ -297,13 +299,13 @@ public class TestQuotaObserverChoreWithMiniCluster {
|
|||
LOG.debug("Interrupted while sleeping.", e);
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
violatedQuotas = snapshotNotifier.copySnapshots();
|
||||
snapshots = snapshotNotifier.copySnapshots();
|
||||
}
|
||||
|
||||
SpaceQuotaSnapshot actualPolicyTN1 = violatedQuotas.get(tn1);
|
||||
SpaceQuotaSnapshot actualPolicyTN1 = snapshots.get(tn1);
|
||||
assertNotNull("Expected to see violation policy for tn1", actualPolicyTN1);
|
||||
assertEquals(namespaceViolationPolicy, actualPolicyTN1.getQuotaStatus().getPolicy());
|
||||
SpaceQuotaSnapshot actualPolicyTN2 = violatedQuotas.get(tn2);
|
||||
SpaceQuotaSnapshot actualPolicyTN2 = snapshots.get(tn2);
|
||||
assertNotNull("Expected to see violation policy for tn2", actualPolicyTN2);
|
||||
assertEquals(namespaceViolationPolicy, actualPolicyTN2.getQuotaStatus().getPolicy());
|
||||
|
||||
|
@ -316,8 +318,8 @@ public class TestQuotaObserverChoreWithMiniCluster {
|
|||
|
||||
// Keep checking for the table quota policy to override the namespace quota
|
||||
while (true) {
|
||||
violatedQuotas = snapshotNotifier.copySnapshots();
|
||||
SpaceQuotaSnapshot actualTableSnapshot = violatedQuotas.get(tn1);
|
||||
snapshots = snapshotNotifier.copySnapshots();
|
||||
SpaceQuotaSnapshot actualTableSnapshot = snapshots.get(tn1);
|
||||
assertNotNull("Violation policy should never be null", actualTableSnapshot);
|
||||
if (tableViolationPolicy != actualTableSnapshot.getQuotaStatus().getPolicy()) {
|
||||
LOG.debug("Saw unexpected table violation policy, waiting and re-checking.");
|
||||
|
@ -334,7 +336,7 @@ public class TestQuotaObserverChoreWithMiniCluster {
|
|||
}
|
||||
|
||||
// This should not change with the introduction of the table quota for tn1
|
||||
actualPolicyTN2 = violatedQuotas.get(tn2);
|
||||
actualPolicyTN2 = snapshots.get(tn2);
|
||||
assertNotNull("Expected to see violation policy for tn2", actualPolicyTN2);
|
||||
assertEquals(namespaceViolationPolicy, actualPolicyTN2.getQuotaStatus().getPolicy());
|
||||
}
|
||||
|
@ -440,6 +442,16 @@ public class TestQuotaObserverChoreWithMiniCluster {
|
|||
assertNull(chore.getTableSnapshotStore().getSpaceQuota(tableWithoutQuota));
|
||||
}
|
||||
|
||||
private int numSnapshotsInViolation(Map<TableName,SpaceQuotaSnapshot> snapshots) {
|
||||
int sum = 0;
|
||||
for (SpaceQuotaSnapshot snapshot : snapshots.values()) {
|
||||
if (snapshot.getQuotaStatus().isInViolation()) {
|
||||
sum++;
|
||||
}
|
||||
}
|
||||
return sum;
|
||||
}
|
||||
|
||||
private void sleepWithInterrupt(long millis) {
|
||||
try {
|
||||
Thread.sleep(millis);
|
||||
|
|
|
@ -22,8 +22,11 @@ import static org.junit.Assert.assertNotNull;
|
|||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
|
@ -47,6 +50,7 @@ import org.junit.rules.TestName;
|
|||
*/
|
||||
@Category({MediumTests.class})
|
||||
public class TestQuotaStatusRPCs {
|
||||
private static final Log LOG = LogFactory.getLog(TestQuotaStatusRPCs.class);
|
||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private static final AtomicLong COUNTER = new AtomicLong(0);
|
||||
|
||||
|
@ -92,7 +96,10 @@ public class TestQuotaStatusRPCs {
|
|||
Waiter.waitFor(TEST_UTIL.getConfiguration(), 30 * 1000, new Predicate<Exception>() {
|
||||
@Override
|
||||
public boolean evaluate() throws Exception {
|
||||
return numRegions == countRegionsForTable(tn, quotaManager.snapshotRegionSizes());
|
||||
Map<HRegionInfo,Long> regionSizes = quotaManager.snapshotRegionSizes();
|
||||
LOG.trace("Region sizes=" + regionSizes);
|
||||
return numRegions == countRegionsForTable(tn, regionSizes) &&
|
||||
tableSize <= getTableSize(tn, regionSizes);
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -189,4 +196,16 @@ public class TestQuotaStatusRPCs {
|
|||
}
|
||||
return size;
|
||||
}
|
||||
|
||||
private int getTableSize(TableName tn, Map<HRegionInfo,Long> regionSizes) {
|
||||
int tableSize = 0;
|
||||
for (Entry<HRegionInfo,Long> entry : regionSizes.entrySet()) {
|
||||
HRegionInfo regionInfo = entry.getKey();
|
||||
long regionSize = entry.getValue();
|
||||
if (tn.equals(regionInfo.getTable())) {
|
||||
tableSize += regionSize;
|
||||
}
|
||||
}
|
||||
return tableSize;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -340,6 +340,31 @@ public class TestSpaceQuotas {
|
|||
}
|
||||
}
|
||||
|
||||
@Test(timeout=120000)
|
||||
public void testTableQuotaOverridesNamespaceQuota() throws Exception {
|
||||
final SpaceViolationPolicy policy = SpaceViolationPolicy.NO_INSERTS;
|
||||
final TableName tn = helper.createTableWithRegions(10);
|
||||
|
||||
// 2MB limit on the table, 1GB limit on the namespace
|
||||
final long tableLimit = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
|
||||
final long namespaceLimit = 1024L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
|
||||
TEST_UTIL.getAdmin().setQuota(QuotaSettingsFactory.limitTableSpace(tn, tableLimit, policy));
|
||||
TEST_UTIL.getAdmin().setQuota(QuotaSettingsFactory.limitNamespaceSpace(
|
||||
tn.getNamespaceAsString(), namespaceLimit, policy));
|
||||
|
||||
// Write more data than should be allowed and flush it to disk
|
||||
helper.writeData(tn, 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
|
||||
|
||||
// This should be sufficient time for the chores to run and see the change.
|
||||
Thread.sleep(5000);
|
||||
|
||||
// The write should be rejected because the table quota takes priority over the namespace
|
||||
Put p = new Put(Bytes.toBytes("to_reject"));
|
||||
p.addColumn(
|
||||
Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"), Bytes.toBytes("reject"));
|
||||
verifyViolation(policy, tn, p);
|
||||
}
|
||||
|
||||
private Map<HRegionInfo,Long> getReportedSizesForTable(TableName tn) {
|
||||
HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
|
||||
MasterQuotaManager quotaManager = master.getMasterQuotaManager();
|
||||
|
@ -371,7 +396,12 @@ public class TestSpaceQuotas {
|
|||
private TableName writeUntilViolationAndVerifyViolation(
|
||||
SpaceViolationPolicy policyToViolate, Mutation m) throws Exception {
|
||||
final TableName tn = writeUntilViolation(policyToViolate);
|
||||
verifyViolation(policyToViolate, tn, m);
|
||||
return tn;
|
||||
}
|
||||
|
||||
private void verifyViolation(
|
||||
SpaceViolationPolicy policyToViolate, TableName tn, Mutation m) throws Exception {
|
||||
// But let's try a few times to get the exception before failing
|
||||
boolean sawError = false;
|
||||
for (int i = 0; i < NUM_RETRIES && !sawError; i++) {
|
||||
|
@ -411,8 +441,6 @@ public class TestSpaceQuotas {
|
|||
}
|
||||
assertTrue(
|
||||
"Expected to see an exception writing data to a table exceeding its quota", sawError);
|
||||
|
||||
return tn;
|
||||
}
|
||||
|
||||
private ClientServiceCallable<Void> generateFileToLoad(
|
||||
|
|
Loading…
Reference in New Issue