HBASE-16996 Implement storage/retrieval of filesystem-use quotas into quota table (Josh Elser)
This commit is contained in:
parent
c5172169f2
commit
f74e051bce
|
@ -53,7 +53,9 @@ import org.apache.hadoop.hbase.util.Strings;
|
||||||
* <pre>
|
* <pre>
|
||||||
* ROW-KEY FAM/QUAL DATA
|
* ROW-KEY FAM/QUAL DATA
|
||||||
* n.<namespace> q:s <global-quotas>
|
* n.<namespace> q:s <global-quotas>
|
||||||
|
* n.<namespace> u:du <size in bytes>
|
||||||
* t.<table> q:s <global-quotas>
|
* t.<table> q:s <global-quotas>
|
||||||
|
* t.<table> u:du <size in bytes>
|
||||||
* u.<user> q:s <global-quotas>
|
* u.<user> q:s <global-quotas>
|
||||||
* u.<user> q:s.<table> <table-quotas>
|
* u.<user> q:s.<table> <table-quotas>
|
||||||
* u.<user> q:s.<ns>: <namespace-quotas>
|
* u.<user> q:s.<ns>: <namespace-quotas>
|
||||||
|
@ -72,6 +74,7 @@ public class QuotaTableUtil {
|
||||||
protected static final byte[] QUOTA_FAMILY_USAGE = Bytes.toBytes("u");
|
protected static final byte[] QUOTA_FAMILY_USAGE = Bytes.toBytes("u");
|
||||||
protected static final byte[] QUOTA_QUALIFIER_SETTINGS = Bytes.toBytes("s");
|
protected static final byte[] QUOTA_QUALIFIER_SETTINGS = Bytes.toBytes("s");
|
||||||
protected static final byte[] QUOTA_QUALIFIER_SETTINGS_PREFIX = Bytes.toBytes("s.");
|
protected static final byte[] QUOTA_QUALIFIER_SETTINGS_PREFIX = Bytes.toBytes("s.");
|
||||||
|
protected static final byte[] QUOTA_QUALIFIER_DISKUSAGE = Bytes.toBytes("du");
|
||||||
protected static final byte[] QUOTA_USER_ROW_KEY_PREFIX = Bytes.toBytes("u.");
|
protected static final byte[] QUOTA_USER_ROW_KEY_PREFIX = Bytes.toBytes("u.");
|
||||||
protected static final byte[] QUOTA_TABLE_ROW_KEY_PREFIX = Bytes.toBytes("t.");
|
protected static final byte[] QUOTA_TABLE_ROW_KEY_PREFIX = Bytes.toBytes("t.");
|
||||||
protected static final byte[] QUOTA_NAMESPACE_ROW_KEY_PREFIX = Bytes.toBytes("n.");
|
protected static final byte[] QUOTA_NAMESPACE_ROW_KEY_PREFIX = Bytes.toBytes("n.");
|
||||||
|
@ -330,11 +333,16 @@ public class QuotaTableUtil {
|
||||||
* Quotas protobuf helpers
|
* Quotas protobuf helpers
|
||||||
*/
|
*/
|
||||||
protected static Quotas quotasFromData(final byte[] data) throws IOException {
|
protected static Quotas quotasFromData(final byte[] data) throws IOException {
|
||||||
|
return quotasFromData(data, 0, data.length);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static Quotas quotasFromData(
|
||||||
|
final byte[] data, int offset, int length) throws IOException {
|
||||||
int magicLen = ProtobufMagic.lengthOfPBMagic();
|
int magicLen = ProtobufMagic.lengthOfPBMagic();
|
||||||
if (!ProtobufMagic.isPBMagicPrefix(data, 0, magicLen)) {
|
if (!ProtobufMagic.isPBMagicPrefix(data, offset, magicLen)) {
|
||||||
throw new IOException("Missing pb magic prefix");
|
throw new IOException("Missing pb magic prefix");
|
||||||
}
|
}
|
||||||
return Quotas.parseFrom(new ByteArrayInputStream(data, magicLen, data.length - magicLen));
|
return Quotas.parseFrom(new ByteArrayInputStream(data, offset + magicLen, length - magicLen));
|
||||||
}
|
}
|
||||||
|
|
||||||
protected static byte[] quotasToData(final Quotas data) throws IOException {
|
protected static byte[] quotasToData(final Quotas data) throws IOException {
|
||||||
|
@ -348,6 +356,7 @@ public class QuotaTableUtil {
|
||||||
boolean hasSettings = false;
|
boolean hasSettings = false;
|
||||||
hasSettings |= quotas.hasThrottle();
|
hasSettings |= quotas.hasThrottle();
|
||||||
hasSettings |= quotas.hasBypassGlobals();
|
hasSettings |= quotas.hasBypassGlobals();
|
||||||
|
hasSettings |= quotas.hasSpace();
|
||||||
return !hasSettings;
|
return !hasSettings;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,6 +37,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceLimitRequest;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.ThrottleRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.ThrottleRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.TimedQuota;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.TimedQuota;
|
||||||
|
@ -293,9 +295,11 @@ public class MasterQuotaManager implements RegionStateListener {
|
||||||
Quotas quotas = quotaOps.fetch();
|
Quotas quotas = quotaOps.fetch();
|
||||||
quotaOps.preApply(quotas);
|
quotaOps.preApply(quotas);
|
||||||
|
|
||||||
|
// Copy the user request into the Quotas object
|
||||||
Quotas.Builder builder = (quotas != null) ? quotas.toBuilder() : Quotas.newBuilder();
|
Quotas.Builder builder = (quotas != null) ? quotas.toBuilder() : Quotas.newBuilder();
|
||||||
if (req.hasThrottle()) applyThrottle(builder, req.getThrottle());
|
if (req.hasThrottle()) applyThrottle(builder, req.getThrottle());
|
||||||
if (req.hasBypassGlobals()) applyBypassGlobals(builder, req.getBypassGlobals());
|
if (req.hasBypassGlobals()) applyBypassGlobals(builder, req.getBypassGlobals());
|
||||||
|
if (req.hasSpaceLimit()) applySpaceLimit(builder, req.getSpaceLimit());
|
||||||
|
|
||||||
// Submit new changes
|
// Submit new changes
|
||||||
quotas = builder.build();
|
quotas = builder.build();
|
||||||
|
@ -437,6 +441,32 @@ public class MasterQuotaManager implements RegionStateListener {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds the information from the provided {@link SpaceLimitRequest} to the {@link Quotas} builder.
|
||||||
|
*
|
||||||
|
* @param quotas The builder to update.
|
||||||
|
* @param req The request to extract space quota information from.
|
||||||
|
*/
|
||||||
|
void applySpaceLimit(final Quotas.Builder quotas, final SpaceLimitRequest req) {
|
||||||
|
if (req.hasQuota()) {
|
||||||
|
applySpaceQuota(quotas, req.getQuota());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Merges the provided {@link SpaceQuota} into the given {@link Quotas} builder.
|
||||||
|
*
|
||||||
|
* @param quotas The Quotas builder instance to update
|
||||||
|
* @param quota The SpaceQuota instance to update from
|
||||||
|
*/
|
||||||
|
void applySpaceQuota(final Quotas.Builder quotas, final SpaceQuota quota) {
|
||||||
|
// Create a builder for Quotas
|
||||||
|
SpaceQuota.Builder builder = quotas.hasSpace() ? quotas.getSpace().toBuilder() :
|
||||||
|
SpaceQuota.newBuilder();
|
||||||
|
// Update the values from the provided quota into the new one and set it on Quotas.
|
||||||
|
quotas.setSpace(builder.mergeFrom(quota).build());
|
||||||
|
}
|
||||||
|
|
||||||
private void validateTimedQuota(final TimedQuota timedQuota) throws IOException {
|
private void validateTimedQuota(final TimedQuota timedQuota) throws IOException {
|
||||||
if (timedQuota.getSoftLimit() < 1) {
|
if (timedQuota.getSoftLimit() < 1) {
|
||||||
throw new DoNotRetryIOException(new UnsupportedOperationException(
|
throw new DoNotRetryIOException(new UnsupportedOperationException(
|
||||||
|
|
|
@ -22,20 +22,32 @@ import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
import org.apache.hadoop.hbase.CellScanner;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.Admin;
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
import org.apache.hadoop.hbase.security.User;
|
import org.apache.hadoop.hbase.security.User;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceLimitRequest;
|
||||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
import com.google.common.collect.Iterables;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -233,10 +245,121 @@ public class TestQuotaAdmin {
|
||||||
assertNumResults(0, null);
|
assertNumResults(0, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetAndGetSpaceQuota() throws Exception {
|
||||||
|
Admin admin = TEST_UTIL.getAdmin();
|
||||||
|
final TableName tn = TableName.valueOf("table1");
|
||||||
|
final long sizeLimit = 1024L * 1024L * 1024L * 1024L * 5L; // 5TB
|
||||||
|
final SpaceViolationPolicy violationPolicy = SpaceViolationPolicy.NO_WRITES;
|
||||||
|
QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn, sizeLimit, violationPolicy);
|
||||||
|
admin.setQuota(settings);
|
||||||
|
|
||||||
|
// Verify the Quotas in the table
|
||||||
|
try (Table quotaTable = TEST_UTIL.getConnection().getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
|
||||||
|
ResultScanner scanner = quotaTable.getScanner(new Scan());
|
||||||
|
try {
|
||||||
|
Result r = Iterables.getOnlyElement(scanner);
|
||||||
|
CellScanner cells = r.cellScanner();
|
||||||
|
assertTrue("Expected to find a cell", cells.advance());
|
||||||
|
assertSpaceQuota(sizeLimit, violationPolicy, cells.current());
|
||||||
|
} finally {
|
||||||
|
scanner.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify we can retrieve it via the QuotaRetriever API
|
||||||
|
QuotaRetriever scanner = QuotaRetriever.open(admin.getConfiguration());
|
||||||
|
try {
|
||||||
|
assertSpaceQuota(sizeLimit, violationPolicy, Iterables.getOnlyElement(scanner));
|
||||||
|
} finally {
|
||||||
|
scanner.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetAndModifyQuota() throws Exception {
|
||||||
|
Admin admin = TEST_UTIL.getAdmin();
|
||||||
|
final TableName tn = TableName.valueOf("table1");
|
||||||
|
final long originalSizeLimit = 1024L * 1024L * 1024L * 1024L * 5L; // 5TB
|
||||||
|
final SpaceViolationPolicy violationPolicy = SpaceViolationPolicy.NO_WRITES;
|
||||||
|
QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(
|
||||||
|
tn, originalSizeLimit, violationPolicy);
|
||||||
|
admin.setQuota(settings);
|
||||||
|
|
||||||
|
// Verify the Quotas in the table
|
||||||
|
try (Table quotaTable = TEST_UTIL.getConnection().getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
|
||||||
|
ResultScanner scanner = quotaTable.getScanner(new Scan());
|
||||||
|
try {
|
||||||
|
Result r = Iterables.getOnlyElement(scanner);
|
||||||
|
CellScanner cells = r.cellScanner();
|
||||||
|
assertTrue("Expected to find a cell", cells.advance());
|
||||||
|
assertSpaceQuota(originalSizeLimit, violationPolicy, cells.current());
|
||||||
|
} finally {
|
||||||
|
scanner.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify we can retrieve it via the QuotaRetriever API
|
||||||
|
QuotaRetriever quotaScanner = QuotaRetriever.open(admin.getConfiguration());
|
||||||
|
try {
|
||||||
|
assertSpaceQuota(originalSizeLimit, violationPolicy, Iterables.getOnlyElement(quotaScanner));
|
||||||
|
} finally {
|
||||||
|
quotaScanner.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Setting a new size and policy should be reflected
|
||||||
|
final long newSizeLimit = 1024L * 1024L * 1024L * 1024L; // 1TB
|
||||||
|
final SpaceViolationPolicy newViolationPolicy = SpaceViolationPolicy.NO_WRITES_COMPACTIONS;
|
||||||
|
QuotaSettings newSettings = QuotaSettingsFactory.limitTableSpace(
|
||||||
|
tn, newSizeLimit, newViolationPolicy);
|
||||||
|
admin.setQuota(newSettings);
|
||||||
|
|
||||||
|
// Verify the new Quotas in the table
|
||||||
|
try (Table quotaTable = TEST_UTIL.getConnection().getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
|
||||||
|
ResultScanner scanner = quotaTable.getScanner(new Scan());
|
||||||
|
try {
|
||||||
|
Result r = Iterables.getOnlyElement(scanner);
|
||||||
|
CellScanner cells = r.cellScanner();
|
||||||
|
assertTrue("Expected to find a cell", cells.advance());
|
||||||
|
assertSpaceQuota(newSizeLimit, newViolationPolicy, cells.current());
|
||||||
|
} finally {
|
||||||
|
scanner.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify we can retrieve the new quota via the QuotaRetriever API
|
||||||
|
quotaScanner = QuotaRetriever.open(admin.getConfiguration());
|
||||||
|
try {
|
||||||
|
assertSpaceQuota(newSizeLimit, newViolationPolicy, Iterables.getOnlyElement(quotaScanner));
|
||||||
|
} finally {
|
||||||
|
quotaScanner.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void assertNumResults(int expected, final QuotaFilter filter) throws Exception {
|
private void assertNumResults(int expected, final QuotaFilter filter) throws Exception {
|
||||||
assertEquals(expected, countResults(filter));
|
assertEquals(expected, countResults(filter));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void assertSpaceQuota(
|
||||||
|
long sizeLimit, SpaceViolationPolicy violationPolicy, Cell cell) throws Exception {
|
||||||
|
Quotas q = QuotaTableUtil.quotasFromData(
|
||||||
|
cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
|
||||||
|
assertTrue("Quota should have space quota defined", q.hasSpace());
|
||||||
|
QuotaProtos.SpaceQuota spaceQuota = q.getSpace();
|
||||||
|
assertEquals(sizeLimit, spaceQuota.getSoftLimit());
|
||||||
|
assertEquals(violationPolicy, ProtobufUtil.toViolationPolicy(spaceQuota.getViolationPolicy()));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertSpaceQuota(
|
||||||
|
long sizeLimit, SpaceViolationPolicy violationPolicy, QuotaSettings actualSettings) {
|
||||||
|
assertTrue("The actual QuotaSettings was not an instance of " + SpaceLimitSettings.class
|
||||||
|
+ " but of " + actualSettings.getClass(), actualSettings instanceof SpaceLimitSettings);
|
||||||
|
SpaceLimitRequest spaceLimitRequest = ((SpaceLimitSettings) actualSettings).getProto();
|
||||||
|
assertEquals(sizeLimit, spaceLimitRequest.getQuota().getSoftLimit());
|
||||||
|
assertEquals(violationPolicy,
|
||||||
|
ProtobufUtil.toViolationPolicy(spaceLimitRequest.getQuota().getViolationPolicy()));
|
||||||
|
}
|
||||||
|
|
||||||
private int countResults(final QuotaFilter filter) throws Exception {
|
private int countResults(final QuotaFilter filter) throws Exception {
|
||||||
QuotaRetriever scanner = QuotaRetriever.open(TEST_UTIL.getConfiguration(), filter);
|
QuotaRetriever scanner = QuotaRetriever.open(TEST_UTIL.getConfiguration(), filter);
|
||||||
try {
|
try {
|
||||||
|
|
Loading…
Reference in New Issue