HBASE-22086: Space Quota issue: Deleting snapshot doesn't update the usage of table
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
11843b7010
commit
2d78dfb2ed
|
@ -21,11 +21,14 @@ package org.apache.hadoop.hbase.quotas;
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
@ -41,6 +44,7 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||||
import org.apache.hadoop.hbase.client.Connection;
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
|
import org.apache.hadoop.hbase.client.Delete;
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.QuotaStatusCalls;
|
import org.apache.hadoop.hbase.client.QuotaStatusCalls;
|
||||||
|
@ -55,6 +59,14 @@ import org.apache.hadoop.hbase.filter.QualifierFilter;
|
||||||
import org.apache.hadoop.hbase.filter.RegexStringComparator;
|
import org.apache.hadoop.hbase.filter.RegexStringComparator;
|
||||||
import org.apache.hadoop.hbase.filter.RowFilter;
|
import org.apache.hadoop.hbase.filter.RowFilter;
|
||||||
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
|
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.apache.yetus.audience.InterfaceStability;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
|
||||||
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
|
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
|
||||||
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
|
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
|
||||||
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
|
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
|
||||||
|
@ -491,6 +503,87 @@ public class QuotaTableUtil {
|
||||||
return p;
|
return p;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a list of {@code Delete} to remove given table snapshot
|
||||||
|
* entries to remove from quota table
|
||||||
|
* @param snapshotEntriesToRemove the entries to remove
|
||||||
|
*/
|
||||||
|
static List<Delete> createDeletesForExistingTableSnapshotSizes(
|
||||||
|
Multimap<TableName, String> snapshotEntriesToRemove) {
|
||||||
|
List<Delete> deletes = new ArrayList<>();
|
||||||
|
for (Map.Entry<TableName, Collection<String>> entry : snapshotEntriesToRemove.asMap()
|
||||||
|
.entrySet()) {
|
||||||
|
for (String snapshot : entry.getValue()) {
|
||||||
|
Delete d = new Delete(getTableRowKey(entry.getKey()));
|
||||||
|
d.addColumns(QUOTA_FAMILY_USAGE,
|
||||||
|
Bytes.add(QUOTA_SNAPSHOT_SIZE_QUALIFIER, Bytes.toBytes(snapshot)));
|
||||||
|
deletes.add(d);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return deletes;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a list of {@code Delete} to remove all table snapshot entries from quota table.
|
||||||
|
* @param connection connection to re-use
|
||||||
|
*/
|
||||||
|
static List<Delete> createDeletesForExistingTableSnapshotSizes(Connection connection)
|
||||||
|
throws IOException {
|
||||||
|
return createDeletesForExistingSnapshotsFromScan(connection, createScanForSpaceSnapshotSizes());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a list of {@code Delete} to remove given namespace snapshot
|
||||||
|
* entries to removefrom quota table
|
||||||
|
* @param snapshotEntriesToRemove the entries to remove
|
||||||
|
*/
|
||||||
|
static List<Delete> createDeletesForExistingNamespaceSnapshotSizes(
|
||||||
|
Set<String> snapshotEntriesToRemove) {
|
||||||
|
List<Delete> deletes = new ArrayList<>();
|
||||||
|
for (String snapshot : snapshotEntriesToRemove) {
|
||||||
|
Delete d = new Delete(getNamespaceRowKey(snapshot));
|
||||||
|
d.addColumns(QUOTA_FAMILY_USAGE, QUOTA_SNAPSHOT_SIZE_QUALIFIER);
|
||||||
|
deletes.add(d);
|
||||||
|
}
|
||||||
|
return deletes;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a list of {@code Delete} to remove all namespace snapshot entries from quota table.
|
||||||
|
* @param connection connection to re-use
|
||||||
|
*/
|
||||||
|
static List<Delete> createDeletesForExistingNamespaceSnapshotSizes(Connection connection)
|
||||||
|
throws IOException {
|
||||||
|
return createDeletesForExistingSnapshotsFromScan(connection,
|
||||||
|
createScanForNamespaceSnapshotSizes());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a list of {@code Delete} to remove all entries returned by the passed scanner.
|
||||||
|
* @param connection connection to re-use
|
||||||
|
* @param scan the scanner to use to generate the list of deletes
|
||||||
|
*/
|
||||||
|
static List<Delete> createDeletesForExistingSnapshotsFromScan(Connection connection, Scan scan)
|
||||||
|
throws IOException {
|
||||||
|
List<Delete> deletes = new ArrayList<>();
|
||||||
|
try (Table quotaTable = connection.getTable(QUOTA_TABLE_NAME);
|
||||||
|
ResultScanner rs = quotaTable.getScanner(scan)) {
|
||||||
|
for (Result r : rs) {
|
||||||
|
CellScanner cs = r.cellScanner();
|
||||||
|
while (cs.advance()) {
|
||||||
|
Cell c = cs.current();
|
||||||
|
byte[] family = Bytes.copy(c.getFamilyArray(), c.getFamilyOffset(), c.getFamilyLength());
|
||||||
|
byte[] qual =
|
||||||
|
Bytes.copy(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength());
|
||||||
|
Delete d = new Delete(r.getRow());
|
||||||
|
d.addColumns(family, qual);
|
||||||
|
deletes.add(d);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return deletes;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Fetches the computed size of all snapshots against tables in a namespace for space quotas.
|
* Fetches the computed size of all snapshots against tables in a namespace for space quotas.
|
||||||
*/
|
*/
|
||||||
|
@ -526,6 +619,34 @@ public class QuotaTableUtil {
|
||||||
return QuotaProtos.SpaceQuotaSnapshot.parseFrom(bs).getQuotaUsage();
|
return QuotaProtos.SpaceQuotaSnapshot.parseFrom(bs).getQuotaUsage();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a scanner for all existing namespace snapshot entries.
|
||||||
|
*/
|
||||||
|
static Scan createScanForNamespaceSnapshotSizes() {
|
||||||
|
return createScanForNamespaceSnapshotSizes(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a scanner for all namespace snapshot entries of the given namespace
|
||||||
|
* @param namespace name of the namespace whose snapshot entries are to be scanned
|
||||||
|
*/
|
||||||
|
static Scan createScanForNamespaceSnapshotSizes(String namespace) {
|
||||||
|
Scan s = new Scan();
|
||||||
|
if (namespace == null || namespace.isEmpty()) {
|
||||||
|
// Read all namespaces, just look at the row prefix
|
||||||
|
s.setRowPrefixFilter(QUOTA_NAMESPACE_ROW_KEY_PREFIX);
|
||||||
|
} else {
|
||||||
|
// Fetch the exact row for the table
|
||||||
|
byte[] rowkey = getNamespaceRowKey(namespace);
|
||||||
|
// Fetch just this one row
|
||||||
|
s.withStartRow(rowkey).withStopRow(rowkey, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Just the usage family and only the snapshot size qualifiers
|
||||||
|
return s.addFamily(QUOTA_FAMILY_USAGE)
|
||||||
|
.setFilter(new ColumnPrefixFilter(QUOTA_SNAPSHOT_SIZE_QUALIFIER));
|
||||||
|
}
|
||||||
|
|
||||||
static Scan createScanForSpaceSnapshotSizes() {
|
static Scan createScanForSpaceSnapshotSizes() {
|
||||||
return createScanForSpaceSnapshotSizes(null);
|
return createScanForSpaceSnapshotSizes(null);
|
||||||
}
|
}
|
||||||
|
@ -572,6 +693,46 @@ public class QuotaTableUtil {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a multimap for all existing table snapshot entries.
|
||||||
|
* @param conn connection to re-use
|
||||||
|
*/
|
||||||
|
public static Multimap<TableName, String> getTableSnapshots(Connection conn) throws IOException {
|
||||||
|
try (Table quotaTable = conn.getTable(QUOTA_TABLE_NAME);
|
||||||
|
ResultScanner rs = quotaTable.getScanner(createScanForSpaceSnapshotSizes())) {
|
||||||
|
Multimap<TableName, String> snapshots = HashMultimap.create();
|
||||||
|
for (Result r : rs) {
|
||||||
|
CellScanner cs = r.cellScanner();
|
||||||
|
while (cs.advance()) {
|
||||||
|
Cell c = cs.current();
|
||||||
|
|
||||||
|
final String snapshot = extractSnapshotNameFromSizeCell(c);
|
||||||
|
snapshots.put(getTableFromRowKey(r.getRow()), snapshot);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return snapshots;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a set of the names of all namespaces containing snapshot entries.
|
||||||
|
* @param conn connection to re-use
|
||||||
|
*/
|
||||||
|
public static Set<String> getNamespaceSnapshots(Connection conn) throws IOException {
|
||||||
|
try (Table quotaTable = conn.getTable(QUOTA_TABLE_NAME);
|
||||||
|
ResultScanner rs = quotaTable.getScanner(createScanForNamespaceSnapshotSizes())) {
|
||||||
|
Set<String> snapshots = new HashSet<>();
|
||||||
|
for (Result r : rs) {
|
||||||
|
CellScanner cs = r.cellScanner();
|
||||||
|
while (cs.advance()) {
|
||||||
|
cs.current();
|
||||||
|
snapshots.add(getNamespaceFromRowKey(r.getRow()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return snapshots;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* =========================================================================
|
/* =========================================================================
|
||||||
* Space quota status RPC helpers
|
* Space quota status RPC helpers
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -45,6 +45,7 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hadoop.hbase.client.Admin;
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
import org.apache.hadoop.hbase.client.Connection;
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
|
import org.apache.hadoop.hbase.client.Delete;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
import org.apache.hadoop.hbase.master.HMaster;
|
import org.apache.hadoop.hbase.master.HMaster;
|
||||||
import org.apache.hadoop.hbase.master.MetricsMaster;
|
import org.apache.hadoop.hbase.master.MetricsMaster;
|
||||||
|
@ -127,6 +128,12 @@ public class SnapshotQuotaObserverChore extends ScheduledChore {
|
||||||
metrics.incrementSnapshotFetchTime((System.nanoTime() - start) / 1_000_000);
|
metrics.incrementSnapshotFetchTime((System.nanoTime() - start) / 1_000_000);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Remove old table snapshots data
|
||||||
|
pruneTableSnapshots(snapshotsToComputeSize);
|
||||||
|
|
||||||
|
// Remove old namespace snapshots data
|
||||||
|
pruneNamespaceSnapshots(snapshotsToComputeSize);
|
||||||
|
|
||||||
// For each table, compute the size of each snapshot
|
// For each table, compute the size of each snapshot
|
||||||
Multimap<TableName,SnapshotWithSize> snapshotsWithSize = computeSnapshotSizes(
|
Multimap<TableName,SnapshotWithSize> snapshotsWithSize = computeSnapshotSizes(
|
||||||
snapshotsToComputeSize);
|
snapshotsToComputeSize);
|
||||||
|
@ -135,6 +142,43 @@ public class SnapshotQuotaObserverChore extends ScheduledChore {
|
||||||
persistSnapshotSizes(snapshotsWithSize);
|
persistSnapshotSizes(snapshotsWithSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes the snapshot entries that are present in Quota table but not in snapshotsToComputeSize
|
||||||
|
*
|
||||||
|
* @param snapshotsToComputeSize list of snapshots to be persisted
|
||||||
|
*/
|
||||||
|
void pruneTableSnapshots(Multimap<TableName, String> snapshotsToComputeSize) throws IOException {
|
||||||
|
Multimap<TableName, String> existingSnapshotEntries = QuotaTableUtil.getTableSnapshots(conn);
|
||||||
|
Multimap<TableName, String> snapshotEntriesToRemove = HashMultimap.create();
|
||||||
|
for (Entry<TableName, Collection<String>> entry : existingSnapshotEntries.asMap().entrySet()) {
|
||||||
|
TableName tn = entry.getKey();
|
||||||
|
Set<String> setOfSnapshots = new HashSet<>(entry.getValue());
|
||||||
|
for (String snapshot : snapshotsToComputeSize.get(tn)) {
|
||||||
|
setOfSnapshots.remove(snapshot);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (String snapshot : setOfSnapshots) {
|
||||||
|
snapshotEntriesToRemove.put(tn, snapshot);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
removeExistingTableSnapshotSizes(snapshotEntriesToRemove);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes the snapshot entries that are present in Quota table but not in snapshotsToComputeSize
|
||||||
|
*
|
||||||
|
* @param snapshotsToComputeSize list of snapshots to be persisted
|
||||||
|
*/
|
||||||
|
void pruneNamespaceSnapshots(Multimap<TableName, String> snapshotsToComputeSize)
|
||||||
|
throws IOException {
|
||||||
|
Set<String> existingSnapshotEntries = QuotaTableUtil.getNamespaceSnapshots(conn);
|
||||||
|
for (TableName tableName : snapshotsToComputeSize.keySet()) {
|
||||||
|
existingSnapshotEntries.remove(tableName.getNamespaceAsString());
|
||||||
|
}
|
||||||
|
// here existingSnapshotEntries is left with the entries to be removed
|
||||||
|
removeExistingNamespaceSnapshotSizes(existingSnapshotEntries);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Fetches each table with a quota (table or namespace quota), and then fetch the name of each
|
* Fetches each table with a quota (table or namespace quota), and then fetch the name of each
|
||||||
* snapshot which was created from that table.
|
* snapshot which was created from that table.
|
||||||
|
@ -506,6 +550,24 @@ public class SnapshotQuotaObserverChore extends ScheduledChore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void removeExistingTableSnapshotSizes(Multimap<TableName, String> snapshotEntriesToRemove)
|
||||||
|
throws IOException {
|
||||||
|
removeExistingSnapshotSizes(
|
||||||
|
QuotaTableUtil.createDeletesForExistingTableSnapshotSizes(snapshotEntriesToRemove));
|
||||||
|
}
|
||||||
|
|
||||||
|
void removeExistingNamespaceSnapshotSizes(Set<String> snapshotEntriesToRemove)
|
||||||
|
throws IOException {
|
||||||
|
removeExistingSnapshotSizes(
|
||||||
|
QuotaTableUtil.createDeletesForExistingNamespaceSnapshotSizes(snapshotEntriesToRemove));
|
||||||
|
}
|
||||||
|
|
||||||
|
void removeExistingSnapshotSizes(List<Delete> deletes) throws IOException {
|
||||||
|
try (Table quotaTable = conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
|
||||||
|
quotaTable.delete(deletes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Extracts the period for the chore from the configuration.
|
* Extracts the period for the chore from the configuration.
|
||||||
*
|
*
|
||||||
|
|
|
@ -24,9 +24,12 @@ import static org.junit.Assert.assertTrue;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.CellScanner;
|
import org.apache.hadoop.hbase.CellScanner;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
@ -36,6 +39,7 @@ import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.Connection;
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||||
|
import org.apache.hadoop.hbase.client.Delete;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
|
@ -53,6 +57,8 @@ import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
import org.junit.rules.TestName;
|
import org.junit.rules.TestName;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
|
||||||
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
|
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||||
|
@ -109,6 +115,68 @@ public class TestQuotaTableUtil {
|
||||||
this.connection.close();
|
this.connection.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDeleteSnapshots() throws Exception {
|
||||||
|
TableName tn = TableName.valueOf(name.getMethodName());
|
||||||
|
try (Table t = connection.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
|
||||||
|
Quotas quota = Quotas.newBuilder().setSpace(
|
||||||
|
QuotaProtos.SpaceQuota.newBuilder().setSoftLimit(7L)
|
||||||
|
.setViolationPolicy(QuotaProtos.SpaceViolationPolicy.NO_WRITES).build()).build();
|
||||||
|
QuotaUtil.addTableQuota(connection, tn, quota);
|
||||||
|
|
||||||
|
String snapshotName = name.getMethodName() + "_snapshot";
|
||||||
|
t.put(QuotaTableUtil.createPutForSnapshotSize(tn, snapshotName, 3L));
|
||||||
|
t.put(QuotaTableUtil.createPutForSnapshotSize(tn, snapshotName, 5L));
|
||||||
|
assertEquals(1, QuotaTableUtil.getObservedSnapshotSizes(connection).size());
|
||||||
|
|
||||||
|
List<Delete> deletes = QuotaTableUtil.createDeletesForExistingTableSnapshotSizes(connection);
|
||||||
|
assertEquals(1, deletes.size());
|
||||||
|
|
||||||
|
t.delete(deletes);
|
||||||
|
assertEquals(0, QuotaTableUtil.getObservedSnapshotSizes(connection).size());
|
||||||
|
|
||||||
|
String ns = name.getMethodName();
|
||||||
|
t.put(QuotaTableUtil.createPutForNamespaceSnapshotSize(ns, 5L));
|
||||||
|
t.put(QuotaTableUtil.createPutForNamespaceSnapshotSize(ns, 3L));
|
||||||
|
assertEquals(3L, QuotaTableUtil.getNamespaceSnapshotSize(connection, ns));
|
||||||
|
|
||||||
|
deletes = QuotaTableUtil.createDeletesForExistingNamespaceSnapshotSizes(connection);
|
||||||
|
assertEquals(1, deletes.size());
|
||||||
|
|
||||||
|
t.delete(deletes);
|
||||||
|
assertEquals(0L, QuotaTableUtil.getNamespaceSnapshotSize(connection, ns));
|
||||||
|
|
||||||
|
t.put(QuotaTableUtil.createPutForSnapshotSize(TableName.valueOf("t1"), "s1", 3L));
|
||||||
|
t.put(QuotaTableUtil.createPutForSnapshotSize(TableName.valueOf("t2"), "s2", 3L));
|
||||||
|
t.put(QuotaTableUtil.createPutForSnapshotSize(TableName.valueOf("t3"), "s3", 3L));
|
||||||
|
t.put(QuotaTableUtil.createPutForSnapshotSize(TableName.valueOf("t4"), "s4", 3L));
|
||||||
|
t.put(QuotaTableUtil.createPutForSnapshotSize(TableName.valueOf("t1"), "s5", 3L));
|
||||||
|
|
||||||
|
t.put(QuotaTableUtil.createPutForNamespaceSnapshotSize("ns1", 3L));
|
||||||
|
t.put(QuotaTableUtil.createPutForNamespaceSnapshotSize("ns2", 3L));
|
||||||
|
t.put(QuotaTableUtil.createPutForNamespaceSnapshotSize("ns3", 3L));
|
||||||
|
|
||||||
|
assertEquals(5,QuotaTableUtil.getTableSnapshots(connection).size());
|
||||||
|
assertEquals(3,QuotaTableUtil.getNamespaceSnapshots(connection).size());
|
||||||
|
|
||||||
|
Multimap<TableName, String> tableSnapshotEntriesToRemove = HashMultimap.create();
|
||||||
|
tableSnapshotEntriesToRemove.put(TableName.valueOf("t1"), "s1");
|
||||||
|
tableSnapshotEntriesToRemove.put(TableName.valueOf("t3"), "s3");
|
||||||
|
tableSnapshotEntriesToRemove.put(TableName.valueOf("t4"), "s4");
|
||||||
|
|
||||||
|
Set<String> namespaceSnapshotEntriesToRemove = new HashSet<>();
|
||||||
|
namespaceSnapshotEntriesToRemove.add("ns2");
|
||||||
|
namespaceSnapshotEntriesToRemove.add("ns1");
|
||||||
|
|
||||||
|
deletes =
|
||||||
|
QuotaTableUtil.createDeletesForExistingTableSnapshotSizes(tableSnapshotEntriesToRemove);
|
||||||
|
assertEquals(3, deletes.size());
|
||||||
|
deletes = QuotaTableUtil
|
||||||
|
.createDeletesForExistingNamespaceSnapshotSizes(namespaceSnapshotEntriesToRemove);
|
||||||
|
assertEquals(2, deletes.size());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testTableQuotaUtil() throws Exception {
|
public void testTableQuotaUtil() throws Exception {
|
||||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||||
|
@ -266,6 +334,8 @@ public class TestQuotaTableUtil {
|
||||||
verifyTableSnapshotSize(quotaTable, tn2, "tn2snap0", 2048L);
|
verifyTableSnapshotSize(quotaTable, tn2, "tn2snap0", 2048L);
|
||||||
verifyTableSnapshotSize(quotaTable, tn2, "tn2snap1", 4096L);
|
verifyTableSnapshotSize(quotaTable, tn2, "tn2snap1", 4096L);
|
||||||
verifyTableSnapshotSize(quotaTable, tn2, "tn2snap2", 6144L);
|
verifyTableSnapshotSize(quotaTable, tn2, "tn2snap2", 6144L);
|
||||||
|
|
||||||
|
cleanUpSnapshotSizes();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -282,6 +352,8 @@ public class TestQuotaTableUtil {
|
||||||
assertEquals(1024L, QuotaTableUtil.getNamespaceSnapshotSize(connection, ns1));
|
assertEquals(1024L, QuotaTableUtil.getNamespaceSnapshotSize(connection, ns1));
|
||||||
assertEquals(2048L, QuotaTableUtil.getNamespaceSnapshotSize(connection, ns2));
|
assertEquals(2048L, QuotaTableUtil.getNamespaceSnapshotSize(connection, ns2));
|
||||||
assertEquals(8192L, QuotaTableUtil.getNamespaceSnapshotSize(connection, defaultNs));
|
assertEquals(8192L, QuotaTableUtil.getNamespaceSnapshotSize(connection, defaultNs));
|
||||||
|
|
||||||
|
cleanUpSnapshotSizes();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -300,4 +372,14 @@ public class TestQuotaTableUtil {
|
||||||
c.getValueArray(), c.getValueOffset(), c.getValueLength())).getQuotaUsage());
|
c.getValueArray(), c.getValueOffset(), c.getValueLength())).getQuotaUsage());
|
||||||
assertFalse(cs.advance());
|
assertFalse(cs.advance());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void cleanUpSnapshotSizes() throws IOException {
|
||||||
|
try (Table t = connection.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
|
||||||
|
QuotaTableUtil.createDeletesForExistingTableSnapshotSizes(connection);
|
||||||
|
List<Delete> deletes =
|
||||||
|
QuotaTableUtil.createDeletesForExistingNamespaceSnapshotSizes(connection);
|
||||||
|
deletes.addAll(QuotaTableUtil.createDeletesForExistingTableSnapshotSizes(connection));
|
||||||
|
t.delete(deletes);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,8 @@ import java.util.Arrays;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.CellScanner;
|
import org.apache.hadoop.hbase.CellScanner;
|
||||||
|
@ -34,6 +36,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.Waiter.Predicate;
|
||||||
import org.apache.hadoop.hbase.client.Admin;
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
import org.apache.hadoop.hbase.client.Connection;
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
|
@ -333,6 +336,92 @@ public class TestSnapshotQuotaObserverChore {
|
||||||
assertEquals(3072L, (long) nsSizes.get(NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR));
|
assertEquals(3072L, (long) nsSizes.get(NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRemovedSnapshots() throws Exception {
|
||||||
|
// Create a table and set a quota
|
||||||
|
TableName tn1 = helper.createTableWithRegions(1);
|
||||||
|
admin.setQuota(QuotaSettingsFactory.limitTableSpace(tn1, SpaceQuotaHelperForTests.ONE_GIGABYTE,
|
||||||
|
SpaceViolationPolicy.NO_INSERTS));
|
||||||
|
|
||||||
|
// Write some data and flush it
|
||||||
|
helper.writeData(tn1, 256L * SpaceQuotaHelperForTests.ONE_KILOBYTE); // 256 KB
|
||||||
|
|
||||||
|
final AtomicReference<Long> lastSeenSize = new AtomicReference<>();
|
||||||
|
// Wait for the Master chore to run to see the usage (with a fudge factor)
|
||||||
|
TEST_UTIL.waitFor(30_000, new SpaceQuotaSnapshotPredicate(conn, tn1) {
|
||||||
|
@Override
|
||||||
|
boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
|
||||||
|
lastSeenSize.set(snapshot.getUsage());
|
||||||
|
return snapshot.getUsage() > 230L * SpaceQuotaHelperForTests.ONE_KILOBYTE;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Create a snapshot on the table
|
||||||
|
final String snapshotName1 = tn1 + "snapshot1";
|
||||||
|
admin.snapshot(new SnapshotDescription(snapshotName1, tn1, SnapshotType.SKIPFLUSH));
|
||||||
|
|
||||||
|
// Snapshot size has to be 0 as the snapshot shares the data with the table
|
||||||
|
final Table quotaTable = conn.getTable(QuotaUtil.QUOTA_TABLE_NAME);
|
||||||
|
TEST_UTIL.waitFor(30_000, new Predicate<Exception>() {
|
||||||
|
@Override
|
||||||
|
public boolean evaluate() throws Exception {
|
||||||
|
Get g = QuotaTableUtil.makeGetForSnapshotSize(tn1, snapshotName1);
|
||||||
|
Result r = quotaTable.get(g);
|
||||||
|
if (r == null || r.isEmpty()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
r.advance();
|
||||||
|
Cell c = r.current();
|
||||||
|
return QuotaTableUtil.parseSnapshotSize(c) == 0;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
// Total usage has to remain same as what we saw before taking a snapshot
|
||||||
|
TEST_UTIL.waitFor(30_000, new SpaceQuotaSnapshotPredicate(conn, tn1) {
|
||||||
|
@Override
|
||||||
|
boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
|
||||||
|
return snapshot.getUsage() == lastSeenSize.get();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Major compact the table to force a rewrite
|
||||||
|
TEST_UTIL.compact(tn1, true);
|
||||||
|
// Now the snapshot size has to prev total size
|
||||||
|
TEST_UTIL.waitFor(30_000, new Predicate<Exception>() {
|
||||||
|
@Override
|
||||||
|
public boolean evaluate() throws Exception {
|
||||||
|
Get g = QuotaTableUtil.makeGetForSnapshotSize(tn1, snapshotName1);
|
||||||
|
Result r = quotaTable.get(g);
|
||||||
|
if (r == null || r.isEmpty()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
r.advance();
|
||||||
|
Cell c = r.current();
|
||||||
|
// The compaction result file has an additional compaction event tracker
|
||||||
|
return lastSeenSize.get() == QuotaTableUtil.parseSnapshotSize(c);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
// The total size now has to be equal/more than double of prev total size
|
||||||
|
// as double the number of store files exist now.
|
||||||
|
final AtomicReference<Long> sizeAfterCompaction = new AtomicReference<>();
|
||||||
|
TEST_UTIL.waitFor(30_000, new SpaceQuotaSnapshotPredicate(conn, tn1) {
|
||||||
|
@Override
|
||||||
|
boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
|
||||||
|
sizeAfterCompaction.set(snapshot.getUsage());
|
||||||
|
return snapshot.getUsage() >= 2 * lastSeenSize.get();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Delete the snapshot
|
||||||
|
admin.deleteSnapshot(snapshotName1);
|
||||||
|
// Total size has to come down to prev totalsize - snapshot size(which was removed)
|
||||||
|
TEST_UTIL.waitFor(30_000, new SpaceQuotaSnapshotPredicate(conn, tn1) {
|
||||||
|
@Override
|
||||||
|
boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
|
||||||
|
return snapshot.getUsage() == (sizeAfterCompaction.get() - lastSeenSize.get());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
private long count(Table t) throws IOException {
|
private long count(Table t) throws IOException {
|
||||||
try (ResultScanner rs = t.getScanner(new Scan())) {
|
try (ResultScanner rs = t.getScanner(new Scan())) {
|
||||||
long sum = 0;
|
long sum = 0;
|
||||||
|
|
Loading…
Reference in New Issue