HBASE-17748 Include HBase snapshots in space quotas

Introduces a new Chore in the Master which computes the size
of the snapshots included in a cluster. The size of these
snapshots are included in the table's which the snapshot was created
from HDFS usage.

Includes some test stabilization, trying to make the tests more
deterministic by ensuring we observe stable values as we know
that those values are mutable. This should help avoid problems
where size reports are delayed and we see an incomplete value.
This commit is contained in:
Josh Elser 2017-03-08 20:56:37 -05:00
parent a6216db16f
commit e5ea457054
28 changed files with 1977 additions and 96 deletions

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
@ -70,16 +71,18 @@ import org.apache.hadoop.hbase.util.Strings;
/**
* Helper class to interact with the quota table.
* <pre>
* ROW-KEY FAM/QUAL DATA
* n.&lt;namespace&gt; q:s &lt;global-quotas&gt;
* t.&lt;namespace&gt; u:p &lt;namespace-quota policy&gt;
* t.&lt;table&gt; q:s &lt;global-quotas&gt;
* t.&lt;table&gt; u:p &lt;table-quota policy&gt;
* u.&lt;user&gt; q:s &lt;global-quotas&gt;
* u.&lt;user&gt; q:s.&lt;table&gt; &lt;table-quotas&gt;
* u.&lt;user&gt; q:s.&lt;ns&gt;: &lt;namespace-quotas&gt;
* </pre>
* <table>
* <tr><th>ROW-KEY</th><th>FAM/QUAL</th><th>DATA</th></tr>
* <tr><td>n.&lt;namespace&gt;</td><td>q:s</td><td>&lt;global-quotas&gt;</td></tr>
* <tr><td>n.&lt;namespace&gt;</td><td>u:p</td><td>&lt;namespace-quota policy&gt;</td></tr>
* <tr><td>n.&lt;namespace&gt;</td><td>u:s</td><td>&lt;SpaceQuotaSnapshot&gt;</td></tr>
* <tr><td>t.&lt;table&gt;</td><td>q:s</td><td>&lt;global-quotas&gt;</td></tr>
* <tr><td>t.&lt;table&gt;</td><td>u:p</td><td>&lt;table-quota policy&gt;</td></tr>
* <tr><td>t.&lt;table&gt;</td><td>u:ss.&lt;snapshot name&gt;</td><td>&lt;SpaceQuotaSnapshot&gt;</td></tr>
* <tr><td>u.&lt;user&gt;</td><td>q:s</td><td>&lt;global-quotas&gt;</td></tr>
* <tr><td>u.&lt;user&gt;</td><td>q:s.&lt;table&gt;</td><td>&lt;table-quotas&gt;</td></tr>
* <tr><td>u.&lt;user&gt;</td><td>q:s.&lt;ns&gt;</td><td>&lt;namespace-quotas&gt;</td></tr>
* </table
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
@ -95,6 +98,7 @@ public class QuotaTableUtil {
protected static final byte[] QUOTA_QUALIFIER_SETTINGS = Bytes.toBytes("s");
protected static final byte[] QUOTA_QUALIFIER_SETTINGS_PREFIX = Bytes.toBytes("s.");
protected static final byte[] QUOTA_QUALIFIER_POLICY = Bytes.toBytes("p");
protected static final byte[] QUOTA_SNAPSHOT_SIZE_QUALIFIER = Bytes.toBytes("ss");
protected static final String QUOTA_POLICY_COLUMN =
Bytes.toString(QUOTA_FAMILY_USAGE) + ":" + Bytes.toString(QUOTA_QUALIFIER_POLICY);
protected static final byte[] QUOTA_USER_ROW_KEY_PREFIX = Bytes.toBytes("u.");
@ -229,12 +233,7 @@ public class QuotaTableUtil {
* Creates a {@link Scan} which returns only quota snapshots from the quota table.
*/
public static Scan makeQuotaSnapshotScan() {
Scan s = new Scan();
// Limit to "u:v" column
s.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_POLICY);
// Limit rowspace to the "t:" prefix
s.setRowPrefixFilter(QUOTA_TABLE_ROW_KEY_PREFIX);
return s;
return makeQuotaSnapshotScanForTable(null);
}
/**
@ -254,6 +253,25 @@ public class QuotaTableUtil {
return snapshots;
}
/**
* Creates a {@link Scan} which returns only {@link SpaceQuotaSnapshot} from the quota table for a
* specific table.
* @param tn Optionally, a table name to limit the scan's rowkey space. Can be null.
*/
public static Scan makeQuotaSnapshotScanForTable(TableName tn) {
Scan s = new Scan();
// Limit to "u:v" column
s.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_POLICY);
if (null == tn) {
s.setRowPrefixFilter(QUOTA_TABLE_ROW_KEY_PREFIX);
} else {
byte[] row = getTableRowKey(tn);
// Limit rowspace to the "t:" prefix
s.withStartRow(row, true).withStopRow(row, true);
}
return s;
}
/**
* Extracts the {@link SpaceViolationPolicy} and {@link TableName} from the provided
* {@link Result} and adds them to the given {@link Map}. If the result does not contain
@ -416,7 +434,7 @@ public class QuotaTableUtil {
* Creates a {@link Put} to store the given {@code snapshot} for the given {@code tableName} in
* the quota table.
*/
public static Put createPutSpaceSnapshot(TableName tableName, SpaceQuotaSnapshot snapshot) {
static Put createPutForSpaceSnapshot(TableName tableName, SpaceQuotaSnapshot snapshot) {
Put p = new Put(getTableRowKey(tableName));
p.addColumn(
QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_POLICY,
@ -424,6 +442,88 @@ public class QuotaTableUtil {
return p;
}
/**
* Creates a {@link Get} for the HBase snapshot's size against the given table.
*/
static Get makeGetForSnapshotSize(TableName tn, String snapshot) {
Get g = new Get(Bytes.add(QUOTA_TABLE_ROW_KEY_PREFIX, Bytes.toBytes(tn.toString())));
g.addColumn(
QUOTA_FAMILY_USAGE,
Bytes.add(QUOTA_SNAPSHOT_SIZE_QUALIFIER, Bytes.toBytes(snapshot)));
return g;
}
/**
* Creates a {@link Put} to persist the current size of the {@code snapshot} with respect to
* the given {@code table}.
*/
static Put createPutForSnapshotSize(TableName tableName, String snapshot, long size) {
// We just need a pb message with some `long usage`, so we can just reuse the
// SpaceQuotaSnapshot message instead of creating a new one.
Put p = new Put(getTableRowKey(tableName));
p.addColumn(QUOTA_FAMILY_USAGE, getSnapshotSizeQualifier(snapshot),
org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuotaSnapshot
.newBuilder().setQuotaUsage(size).build().toByteArray());
return p;
}
/**
* Creates a {@code Put} for the namespace's total snapshot size.
*/
static Put createPutForNamespaceSnapshotSize(String namespace, long size) {
Put p = new Put(getNamespaceRowKey(namespace));
p.addColumn(QUOTA_FAMILY_USAGE, QUOTA_SNAPSHOT_SIZE_QUALIFIER,
org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuotaSnapshot
.newBuilder().setQuotaUsage(size).build().toByteArray());
return p;
}
/**
* Fetches the computed size of all snapshots against tables in a namespace for space quotas.
*/
static long getNamespaceSnapshotSize(
Connection conn, String namespace) throws IOException {
try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
Result r = quotaTable.get(createGetNamespaceSnapshotSize(namespace));
if (r.isEmpty()) {
return 0L;
}
r.advance();
return parseSnapshotSize(r.current());
} catch (InvalidProtocolBufferException e) {
throw new IOException("Could not parse snapshot size value for namespace " + namespace, e);
}
}
/**
* Creates a {@code Get} to fetch the namespace's total snapshot size.
*/
static Get createGetNamespaceSnapshotSize(String namespace) {
Get g = new Get(getNamespaceRowKey(namespace));
g.addColumn(QUOTA_FAMILY_USAGE, QUOTA_SNAPSHOT_SIZE_QUALIFIER);
return g;
}
/**
* Parses the snapshot size from the given Cell's value.
*/
static long parseSnapshotSize(Cell c) throws InvalidProtocolBufferException {
ByteString bs = UnsafeByteOperations.unsafeWrap(
c.getValueArray(), c.getValueOffset(), c.getValueLength());
return QuotaProtos.SpaceQuotaSnapshot.parseFrom(bs).getQuotaUsage();
}
static Scan createScanForSnapshotSizes(TableName table) {
byte[] rowkey = getTableRowKey(table);
return new Scan()
// Fetch just this one row
.withStartRow(rowkey)
.withStopRow(rowkey, true)
// Just the usage family
.addFamily(QUOTA_FAMILY_USAGE)
// Only the snapshot size qualifiers
.setFilter(new ColumnPrefixFilter(QUOTA_SNAPSHOT_SIZE_QUALIFIER));
}
/* =========================================================================
* Space quota status RPC helpers
@ -644,4 +744,15 @@ public class QuotaTableUtil {
}
return ProtobufUtil.toViolationPolicy(proto.getViolationPolicy());
}
protected static byte[] getSnapshotSizeQualifier(String snapshotName) {
return Bytes.add(QUOTA_SNAPSHOT_SIZE_QUALIFIER, Bytes.toBytes(snapshotName));
}
protected static long extractSnapshotSize(
byte[] data, int offset, int length) throws InvalidProtocolBufferException {
ByteString byteStr = UnsafeByteOperations.unsafeWrap(data, offset, length);
return org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuotaSnapshot
.parseFrom(byteStr).getQuotaUsage();
}
}

View File

@ -22,6 +22,7 @@ import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
import org.apache.hadoop.util.StringUtils;
/**
* A point-in-time view of a space quota on a table.
@ -175,7 +176,8 @@ public class SpaceQuotaSnapshot {
public String toString() {
StringBuilder sb = new StringBuilder(32);
sb.append("SpaceQuotaSnapshot[policy=").append(quotaStatus).append(", use=");
sb.append(usage).append("bytes/").append(limit).append("bytes]");
sb.append(StringUtils.byteDesc(usage)).append("/");
sb.append(StringUtils.byteDesc(limit)).append("]");
return sb.toString();
}

View File

@ -39,6 +39,15 @@ public interface MetricsMasterQuotaSource extends BaseSource {
String QUOTA_OBSERVER_CHORE_TIME_NAME = "quotaObserverChoreTime";
String QUOTA_OBSERVER_CHORE_TIME_DESC =
"Histogram for the time in millis for the QuotaObserverChore";
String SNAPSHOT_OBSERVER_CHORE_TIME_NAME = "snapshotQuotaObserverChoreTime";
String SNAPSHOT_OBSERVER_CHORE_TIME_DESC =
"Histogram for the time in millis for the SnapshotQuotaObserverChore";
String SNAPSHOT_OBSERVER_SIZE_COMPUTATION_TIME_NAME = "snapshotObserverSizeComputationTime";
String SNAPSHOT_OBSERVER_SIZE_COMPUTATION_TIME_DESC =
"Histogram for the time in millis to compute the size of each snapshot";
String SNAPSHOT_OBSERVER_FETCH_TIME_NAME = "snapshotObserverSnapshotFetchTime";
String SNAPSHOT_OBSERVER_FETCH_TIME_DESC =
"Histogram for the time in millis to fetch all snapshots from HBase";
String TABLE_QUOTA_USAGE_NAME = "tableSpaceQuotaOverview";
String TABLE_QUOTA_USAGE_DESC = "A JSON summary of the usage of all tables with space quotas";
String NS_QUOTA_USAGE_NAME = "namespaceSpaceQuotaOverview";
@ -83,4 +92,22 @@ public interface MetricsMasterQuotaSource extends BaseSource {
* @param time The execution time of the chore in milliseconds
*/
void incrementSpaceQuotaObserverChoreTime(long time);
/**
* Updates the metric tracking the amount of time taken by the {@code SnapshotQuotaObserverChore}
* which runs periodically.
*/
void incrementSnapshotObserverChoreTime(long time);
/**
* Updates the metric tracking the amount of time taken by the {@code SnapshotQuotaObserverChore}
* to compute the size of one snapshot, relative to the files referenced by the originating table.
*/
void incrementSnapshotObserverSnapshotComputationTime(long time);
/**
* Updates the metric tracking the amount of time taken by the {@code SnapshotQuotaObserverChore}
* to fetch all snapshots.
*/
void incrementSnapshotObserverSnapshotFetchTime(long time);
}

View File

@ -39,6 +39,9 @@ public class MetricsMasterQuotaSourceImpl extends BaseSourceImpl implements Metr
private final MutableGaugeLong namespacesViolatingQuotasGauge;
private final MutableGaugeLong regionSpaceReportsGauge;
private final MetricHistogram quotaObserverTimeHisto;
private final MetricHistogram snapshotObserverTimeHisto;
private final MetricHistogram snapshotObserverSizeComputationTimeHisto;
private final MetricHistogram snapshotObserverSnapshotFetchTimeHisto;
public MetricsMasterQuotaSourceImpl(MetricsMasterWrapper wrapper) {
this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT, wrapper);
@ -61,6 +64,13 @@ public class MetricsMasterQuotaSourceImpl extends BaseSourceImpl implements Metr
quotaObserverTimeHisto = getMetricsRegistry().newTimeHistogram(
QUOTA_OBSERVER_CHORE_TIME_NAME, QUOTA_OBSERVER_CHORE_TIME_DESC);
snapshotObserverTimeHisto = getMetricsRegistry().newTimeHistogram(
SNAPSHOT_OBSERVER_CHORE_TIME_NAME, SNAPSHOT_OBSERVER_CHORE_TIME_DESC);
snapshotObserverSizeComputationTimeHisto = getMetricsRegistry().newTimeHistogram(
SNAPSHOT_OBSERVER_SIZE_COMPUTATION_TIME_NAME, SNAPSHOT_OBSERVER_SIZE_COMPUTATION_TIME_DESC);
snapshotObserverSnapshotFetchTimeHisto = getMetricsRegistry().newTimeHistogram(
SNAPSHOT_OBSERVER_FETCH_TIME_NAME, SNAPSHOT_OBSERVER_FETCH_TIME_DESC);
}
@Override
@ -88,6 +98,11 @@ public class MetricsMasterQuotaSourceImpl extends BaseSourceImpl implements Metr
quotaObserverTimeHisto.add(time);
}
@Override
public void incrementSnapshotObserverChoreTime(long time) {
snapshotObserverTimeHisto.add(time);
}
@Override
public void getMetrics(MetricsCollector metricsCollector, boolean all) {
MetricsRecordBuilder record = metricsCollector.addRecord(metricsRegistry.info());
@ -130,4 +145,14 @@ public class MetricsMasterQuotaSourceImpl extends BaseSourceImpl implements Metr
sb.insert(0, "[").append("]");
return sb.toString();
}
@Override
public void incrementSnapshotObserverSnapshotComputationTime(long time) {
snapshotObserverSizeComputationTimeHisto.add(time);
}
@Override
public void incrementSnapshotObserverSnapshotFetchTime(long time) {
snapshotObserverSnapshotFetchTimeHisto.add(time);
}
}

View File

@ -141,6 +141,7 @@ import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
import org.apache.hadoop.hbase.quotas.MasterSpaceQuotaObserver;
import org.apache.hadoop.hbase.quotas.QuotaObserverChore;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.quotas.SnapshotQuotaObserverChore;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshotNotifier;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshotNotifierFactory;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
@ -385,6 +386,7 @@ public class HMaster extends HRegionServer implements MasterServices {
private volatile MasterQuotaManager quotaManager;
private SpaceQuotaSnapshotNotifier spaceQuotaSnapshotNotifier;
private QuotaObserverChore quotaObserverChore;
private SnapshotQuotaObserverChore snapshotQuotaChore;
private ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
private WALProcedureStore procedureStore;
@ -896,6 +898,10 @@ public class HMaster extends HRegionServer implements MasterServices {
this.quotaObserverChore = new QuotaObserverChore(this, getMasterMetrics());
// Start the chore to read the region FS space reports and act on them
getChoreService().scheduleChore(quotaObserverChore);
this.snapshotQuotaChore = new SnapshotQuotaObserverChore(this, getMasterMetrics());
// Start the chore to read snapshots and add their usage to table/NS quotas
getChoreService().scheduleChore(snapshotQuotaChore);
}
// clear the dead servers with same host name and port of online server because we are not
@ -1240,6 +1246,9 @@ public class HMaster extends HRegionServer implements MasterServices {
if (this.quotaObserverChore != null) {
quotaObserverChore.cancel();
}
if (this.snapshotQuotaChore != null) {
snapshotQuotaChore.cancel();
}
}
/**

View File

@ -156,4 +156,25 @@ public class MetricsMaster {
}
};
}
/**
* Sets the execution time of a period of the {@code SnapshotQuotaObserverChore}.
*/
public void incrementSnapshotObserverTime(final long executionTime) {
masterQuotaSource.incrementSnapshotObserverChoreTime(executionTime);
}
/**
* Sets the execution time to compute the size of a single snapshot.
*/
public void incrementSnapshotSizeComputationTime(final long executionTime) {
masterQuotaSource.incrementSnapshotObserverSnapshotComputationTime(executionTime);
}
/**
* Sets the execution time to fetch the mapping of snapshots to originating table.
*/
public void incrementSnapshotFetchTime(long executionTime) {
masterQuotaSource.incrementSnapshotObserverSnapshotFetchTime(executionTime);
}
}

View File

@ -33,6 +33,8 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
@ -168,9 +170,10 @@ public class FileSystemUtilizationChore extends ScheduledChore {
long computeSize(Region r) {
long regionSize = 0L;
for (Store store : r.getStores()) {
// StoreFile/StoreFileReaders are already instantiated with the file length cached.
// Can avoid extra NN ops.
regionSize += store.getStorefilesSize();
regionSize += store.getHFilesSize();
}
if (LOG.isTraceEnabled()) {
LOG.trace("Size of " + r + " is " + regionSize);
}
return regionSize;
}

View File

@ -18,8 +18,8 @@ package org.apache.hadoop.hbase.quotas;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@ -77,7 +77,8 @@ public class NamespaceQuotaSnapshotStore implements QuotaSnapshotStore<String> {
}
@Override
public SpaceQuotaSnapshot getTargetState(String subject, SpaceQuota spaceQuota) {
public SpaceQuotaSnapshot getTargetState(
String subject, SpaceQuota spaceQuota) throws IOException {
rlock.lock();
try {
final long sizeLimitInBytes = spaceQuota.getSoftLimit();
@ -85,6 +86,8 @@ public class NamespaceQuotaSnapshotStore implements QuotaSnapshotStore<String> {
for (Entry<HRegionInfo,Long> entry : filterBySubject(subject)) {
sum += entry.getValue();
}
// Add in the size for any snapshots against this table
sum += QuotaTableUtil.getNamespaceSnapshotSize(conn, subject);
// Observance is defined as the size of the table being less than the limit
SpaceQuotaStatus status = sum <= sizeLimitInBytes ? SpaceQuotaStatus.notInViolation()
: new SpaceQuotaStatus(ProtobufUtil.toViolationPolicy(spaceQuota.getViolationPolicy()));

View File

@ -161,7 +161,9 @@ public class QuotaObserverChore extends ScheduledChore {
// The current "view" of region space use. Used henceforth.
final Map<HRegionInfo,Long> reportedRegionSpaceUse = quotaManager.snapshotRegionSizes();
if (LOG.isTraceEnabled()) {
LOG.trace("Using " + reportedRegionSpaceUse.size() + " region space use reports");
LOG.trace(
"Using " + reportedRegionSpaceUse.size() + " region space use reports: " +
reportedRegionSpaceUse);
}
// Remove the "old" region reports

View File

@ -69,7 +69,7 @@ public interface QuotaSnapshotStore<T> {
* @param subject The object which to determine the target SpaceQuotaSnapshot of
* @param spaceQuota The quota "definition" for the {@code subject}
*/
SpaceQuotaSnapshot getTargetState(T subject, SpaceQuota spaceQuota);
SpaceQuotaSnapshot getTargetState(T subject, SpaceQuota spaceQuota) throws IOException;
/**
* Filters the provided <code>regions</code>, returning those which match the given

View File

@ -0,0 +1,543 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.quotas;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MetricsMaster;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest.FamilyFiles;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest.StoreFile;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.util.StringUtils;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
/**
* A Master-invoked {@code Chore} that computes the size of each snapshot which was created from
* a table which has a space quota.
*/
@InterfaceAudience.Private
public class SnapshotQuotaObserverChore extends ScheduledChore {
private static final Log LOG = LogFactory.getLog(SnapshotQuotaObserverChore.class);
static final String SNAPSHOT_QUOTA_CHORE_PERIOD_KEY =
"hbase.master.quotas.snapshot.chore.period";
static final int SNAPSHOT_QUOTA_CHORE_PERIOD_DEFAULT = 1000 * 60 * 5; // 5 minutes in millis
static final String SNAPSHOT_QUOTA_CHORE_DELAY_KEY =
"hbase.master.quotas.snapshot.chore.delay";
static final long SNAPSHOT_QUOTA_CHORE_DELAY_DEFAULT = 1000L * 60L; // 1 minute in millis
static final String SNAPSHOT_QUOTA_CHORE_TIMEUNIT_KEY =
"hbase.master.quotas.snapshot.chore.timeunit";
static final String SNAPSHOT_QUOTA_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();
private final Connection conn;
private final Configuration conf;
private final MetricsMaster metrics;
private final FileSystem fs;
public SnapshotQuotaObserverChore(HMaster master, MetricsMaster metrics) {
this(
master.getConnection(), master.getConfiguration(), master.getFileSystem(), master, metrics);
}
SnapshotQuotaObserverChore(
Connection conn, Configuration conf, FileSystem fs, Stoppable stopper,
MetricsMaster metrics) {
super(
QuotaObserverChore.class.getSimpleName(), stopper, getPeriod(conf),
getInitialDelay(conf), getTimeUnit(conf));
this.conn = conn;
this.conf = conf;
this.metrics = metrics;
this.fs = fs;
}
@Override
protected void chore() {
try {
if (LOG.isTraceEnabled()) {
LOG.trace("Computing sizes of snapshots for quota management.");
}
long start = System.nanoTime();
_chore();
if (null != metrics) {
metrics.incrementSnapshotObserverTime((System.nanoTime() - start) / 1_000_000);
}
} catch (IOException e) {
LOG.warn("Failed to compute the size of snapshots, will retry", e);
}
}
void _chore() throws IOException {
// Gets all tables with quotas that also have snapshots.
// This values are all of the snapshots that we need to compute the size of.
long start = System.nanoTime();
Multimap<TableName,String> snapshotsToComputeSize = getSnapshotsToComputeSize();
if (null != metrics) {
metrics.incrementSnapshotFetchTime((System.nanoTime() - start) / 1_000_000);
}
// For each table, compute the size of each snapshot
Multimap<TableName,SnapshotWithSize> snapshotsWithSize = computeSnapshotSizes(
snapshotsToComputeSize);
// Write the size data to the quota table.
persistSnapshotSizes(snapshotsWithSize);
}
/**
* Fetches each table with a quota (table or namespace quota), and then fetch the name of each
* snapshot which was created from that table.
*
* @return A mapping of table to snapshots created from that table
*/
Multimap<TableName,String> getSnapshotsToComputeSize() throws IOException {
Set<TableName> tablesToFetchSnapshotsFrom = new HashSet<>();
QuotaFilter filter = new QuotaFilter();
filter.addTypeFilter(QuotaType.SPACE);
try (Admin admin = conn.getAdmin()) {
// Pull all of the tables that have quotas (direct, or from namespace)
for (QuotaSettings qs : QuotaRetriever.open(conf, filter)) {
String ns = qs.getNamespace();
TableName tn = qs.getTableName();
if ((null == ns && null == tn) || (null != ns && null != tn)) {
throw new IllegalStateException(
"Expected only one of namespace and tablename to be null");
}
// Collect either the table name itself, or all of the tables in the namespace
if (null != ns) {
tablesToFetchSnapshotsFrom.addAll(Arrays.asList(admin.listTableNamesByNamespace(ns)));
} else {
tablesToFetchSnapshotsFrom.add(tn);
}
}
// Fetch all snapshots that were created from these tables
return getSnapshotsFromTables(admin, tablesToFetchSnapshotsFrom);
}
}
/**
* Computes a mapping of originating {@code TableName} to snapshots, when the {@code TableName}
* exists in the provided {@code Set}.
*/
Multimap<TableName,String> getSnapshotsFromTables(
Admin admin, Set<TableName> tablesToFetchSnapshotsFrom) throws IOException {
Multimap<TableName,String> snapshotsToCompute = HashMultimap.create();
for (org.apache.hadoop.hbase.client.SnapshotDescription sd : admin.listSnapshots()) {
TableName tn = sd.getTableName();
if (tablesToFetchSnapshotsFrom.contains(tn)) {
snapshotsToCompute.put(tn, sd.getName());
}
}
return snapshotsToCompute;
}
/**
* Computes the size of each snapshot provided given the current files referenced by the table.
*
* @param snapshotsToComputeSize The snapshots to compute the size of
* @return A mapping of table to snapshot created from that table and the snapshot's size.
*/
Multimap<TableName,SnapshotWithSize> computeSnapshotSizes(
Multimap<TableName,String> snapshotsToComputeSize) throws IOException {
Multimap<TableName,SnapshotWithSize> snapshotSizes = HashMultimap.create();
for (Entry<TableName,Collection<String>> entry : snapshotsToComputeSize.asMap().entrySet()) {
final TableName tn = entry.getKey();
final List<String> snapshotNames = new ArrayList<>(entry.getValue());
// Sort the snapshots so we process them in lexicographic order. This ensures that multiple
// invocations of this Chore do not more the size ownership of some files between snapshots
// that reference the file (prevents size ownership from moving between snapshots).
Collections.sort(snapshotNames);
final Path rootDir = FSUtils.getRootDir(conf);
// Get the map of store file names to store file path for this table
// TODO is the store-file name unique enough? Does this need to be region+family+storefile?
final Set<String> tableReferencedStoreFiles;
try {
tableReferencedStoreFiles = FSUtils.getTableStoreFilePathMap(fs, rootDir).keySet();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
if (LOG.isTraceEnabled()) {
LOG.trace("Paths for " + tn + ": " + tableReferencedStoreFiles);
}
// For each snapshot on this table, get the files which the snapshot references which
// the table does not.
Set<String> snapshotReferencedFiles = new HashSet<>();
for (String snapshotName : snapshotNames) {
final long start = System.nanoTime();
Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
SnapshotDescription sd = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, sd);
if (LOG.isTraceEnabled()) {
LOG.trace("Files referenced by other snapshots: " + snapshotReferencedFiles);
}
// Get the set of files from the manifest that this snapshot references which are not also
// referenced by the originating table.
Set<StoreFileReference> unreferencedStoreFileNames = getStoreFilesFromSnapshot(
manifest, (sfn) -> !tableReferencedStoreFiles.contains(sfn)
&& !snapshotReferencedFiles.contains(sfn));
if (LOG.isTraceEnabled()) {
LOG.trace("Snapshot " + snapshotName + " solely references the files: "
+ unreferencedStoreFileNames);
}
// Compute the size of the store files for this snapshot
long size = getSizeOfStoreFiles(tn, unreferencedStoreFileNames);
if (LOG.isTraceEnabled()) {
LOG.trace("Computed size of " + snapshotName + " to be " + size);
}
// Persist this snapshot's size into the map
snapshotSizes.put(tn, new SnapshotWithSize(snapshotName, size));
// Make sure that we don't double-count the same file
for (StoreFileReference ref : unreferencedStoreFileNames) {
for (String fileName : ref.getFamilyToFilesMapping().values()) {
snapshotReferencedFiles.add(fileName);
}
}
// Update the amount of time it took to compute the snapshot's size
if (null != metrics) {
metrics.incrementSnapshotSizeComputationTime((System.nanoTime() - start) / 1_000_000);
}
}
}
return snapshotSizes;
}
/**
* Extracts the names of the store files referenced by this snapshot which satisfy the given
* predicate (the predicate returns {@code true}).
*/
Set<StoreFileReference> getStoreFilesFromSnapshot(
SnapshotManifest manifest, Predicate<String> filter) {
Set<StoreFileReference> references = new HashSet<>();
// For each region referenced by the snapshot
for (SnapshotRegionManifest rm : manifest.getRegionManifests()) {
StoreFileReference regionReference = new StoreFileReference(
HRegionInfo.convert(rm.getRegionInfo()).getEncodedName());
// For each column family in this region
for (FamilyFiles ff : rm.getFamilyFilesList()) {
final String familyName = ff.getFamilyName().toStringUtf8();
// And each store file in that family
for (StoreFile sf : ff.getStoreFilesList()) {
String storeFileName = sf.getName();
// A snapshot only "inherits" a files size if it uniquely refers to it (no table
// and no other snapshot references it).
if (filter.test(storeFileName)) {
regionReference.addFamilyStoreFile(familyName, storeFileName);
}
}
}
// Only add this Region reference if we retained any files.
if (!regionReference.getFamilyToFilesMapping().isEmpty()) {
references.add(regionReference);
}
}
return references;
}
/**
* Calculates the directory in HDFS for a table based on the configuration.
*/
Path getTableDir(TableName tn) throws IOException {
Path rootDir = FSUtils.getRootDir(conf);
return FSUtils.getTableDir(rootDir, tn);
}
/**
* Computes the size of each store file in {@code storeFileNames}
*/
long getSizeOfStoreFiles(TableName tn, Set<StoreFileReference> storeFileNames) {
return storeFileNames.stream()
.collect(Collectors.summingLong((sfr) -> getSizeOfStoreFile(tn, sfr)));
}
/**
* Computes the size of the store files for a single region.
*/
long getSizeOfStoreFile(TableName tn, StoreFileReference storeFileName) {
String regionName = storeFileName.getRegionName();
return storeFileName.getFamilyToFilesMapping()
.entries().stream()
.collect(Collectors.summingLong((e) ->
getSizeOfStoreFile(tn, regionName, e.getKey(), e.getValue())));
}
/**
* Computes the size of the store file given its name, region and family name in
* the archive directory.
*/
long getSizeOfStoreFile(
TableName tn, String regionName, String family, String storeFile) {
Path familyArchivePath;
try {
familyArchivePath = HFileArchiveUtil.getStoreArchivePath(conf, tn, regionName, family);
} catch (IOException e) {
LOG.warn("Could not compute path for the archive directory for the region", e);
return 0L;
}
Path fileArchivePath = new Path(familyArchivePath, storeFile);
try {
if (fs.exists(fileArchivePath)) {
FileStatus[] status = fs.listStatus(fileArchivePath);
if (1 != status.length) {
LOG.warn("Expected " + fileArchivePath +
" to be a file but was a directory, ignoring reference");
return 0L;
}
return status[0].getLen();
}
} catch (IOException e) {
LOG.warn("Could not obtain the status of " + fileArchivePath, e);
return 0L;
}
LOG.warn("Expected " + fileArchivePath + " to exist but does not, ignoring reference.");
return 0L;
}
/**
* Writes the snapshot sizes to the {@code hbase:quota} table.
*
* @param snapshotsWithSize The snapshot sizes to write.
*/
void persistSnapshotSizes(
Multimap<TableName,SnapshotWithSize> snapshotsWithSize) throws IOException {
try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
// Write each snapshot size for the table
persistSnapshotSizes(quotaTable, snapshotsWithSize);
// Write a size entry for all snapshots in a namespace
persistSnapshotSizesByNS(quotaTable, snapshotsWithSize);
}
}
/**
* Writes the snapshot sizes to the provided {@code table}.
*/
void persistSnapshotSizes(
Table table, Multimap<TableName,SnapshotWithSize> snapshotsWithSize) throws IOException {
// Convert each entry in the map to a Put and write them to the quota table
table.put(snapshotsWithSize.entries()
.stream()
.map(e -> QuotaTableUtil.createPutForSnapshotSize(
e.getKey(), e.getValue().getName(), e.getValue().getSize()))
.collect(Collectors.toList()));
}
/**
* Rolls up the snapshot sizes by namespace and writes a single record for each namespace
* which is the size of all snapshots in that namespace.
*/
void persistSnapshotSizesByNS(
Table quotaTable, Multimap<TableName,SnapshotWithSize> snapshotsWithSize) throws IOException {
Map<String,Long> namespaceSnapshotSizes = groupSnapshotSizesByNamespace(snapshotsWithSize);
quotaTable.put(namespaceSnapshotSizes.entrySet().stream()
.map(e -> QuotaTableUtil.createPutForNamespaceSnapshotSize(
e.getKey(), e.getValue()))
.collect(Collectors.toList()));
}
/**
* Sums the snapshot sizes for each namespace.
*/
Map<String,Long> groupSnapshotSizesByNamespace(
Multimap<TableName,SnapshotWithSize> snapshotsWithSize) {
return snapshotsWithSize.entries().stream()
.collect(Collectors.groupingBy(
// Convert TableName into the namespace string
(e) -> e.getKey().getNamespaceAsString(),
// Sum the values for namespace
Collectors.mapping(
Map.Entry::getValue, Collectors.summingLong((sws) -> sws.getSize()))));
}
/**
* A struct encapsulating the name of a snapshot and its "size" on the filesystem. This size is
* defined as the amount of filesystem space taken by the files the snapshot refers to which
* the originating table no longer refers to.
*/
static class SnapshotWithSize {
private final String name;
private final long size;
SnapshotWithSize(String name, long size) {
this.name = Objects.requireNonNull(name);
this.size = size;
}
String getName() {
return name;
}
long getSize() {
return size;
}
@Override
public int hashCode() {
return new HashCodeBuilder().append(name).append(size).toHashCode();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof SnapshotWithSize)) {
return false;
}
SnapshotWithSize other = (SnapshotWithSize) o;
return name.equals(other.name) && size == other.size;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder(32);
return sb.append("SnapshotWithSize:[").append(name).append(" ")
.append(StringUtils.byteDesc(size)).append("]").toString();
}
}
/**
* A reference to a collection of files in the archive directory for a single region.
*/
static class StoreFileReference {
private final String regionName;
private final Multimap<String,String> familyToFiles;
StoreFileReference(String regionName) {
this.regionName = Objects.requireNonNull(regionName);
familyToFiles = HashMultimap.create();
}
String getRegionName() {
return regionName;
}
Multimap<String,String> getFamilyToFilesMapping() {
return familyToFiles;
}
void addFamilyStoreFile(String family, String storeFileName) {
familyToFiles.put(family, storeFileName);
}
@Override
public int hashCode() {
return new HashCodeBuilder().append(regionName).append(familyToFiles).toHashCode();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof StoreFileReference)) {
return false;
}
StoreFileReference other = (StoreFileReference) o;
return regionName.equals(other.regionName) && familyToFiles.equals(other.familyToFiles);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
return sb.append("StoreFileReference[region=").append(regionName).append(", files=")
.append(familyToFiles).append("]").toString();
}
}
/**
* Extracts the period for the chore from the configuration.
*
* @param conf The configuration object.
* @return The configured chore period or the default value.
*/
static int getPeriod(Configuration conf) {
return conf.getInt(SNAPSHOT_QUOTA_CHORE_PERIOD_KEY,
SNAPSHOT_QUOTA_CHORE_PERIOD_DEFAULT);
}
/**
* Extracts the initial delay for the chore from the configuration.
*
* @param conf The configuration object.
* @return The configured chore initial delay or the default value.
*/
static long getInitialDelay(Configuration conf) {
return conf.getLong(SNAPSHOT_QUOTA_CHORE_DELAY_KEY,
SNAPSHOT_QUOTA_CHORE_DELAY_DEFAULT);
}
/**
* Extracts the time unit for the chore period and initial delay from the configuration. The
* configuration value for {@link #SNAPSHOT_QUOTA_CHORE_TIMEUNIT_KEY} must correspond to
* a {@link TimeUnit} value.
*
* @param conf The configuration object.
* @return The configured time unit for the chore period and initial delay or the default value.
*/
static TimeUnit getTimeUnit(Configuration conf) {
return TimeUnit.valueOf(conf.get(SNAPSHOT_QUOTA_CHORE_TIMEUNIT_KEY,
SNAPSHOT_QUOTA_CHORE_TIMEUNIT_DEFAULT));
}
}

View File

@ -18,17 +18,26 @@ package org.apache.hadoop.hbase.quotas;
import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
@ -41,6 +50,8 @@ import com.google.common.collect.Iterables;
*/
@InterfaceAudience.Private
public class TableQuotaSnapshotStore implements QuotaSnapshotStore<TableName> {
private static final Log LOG = LogFactory.getLog(TableQuotaSnapshotStore.class);
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final ReadLock rlock = lock.readLock();
private final WriteLock wlock = lock.writeLock();
@ -77,7 +88,8 @@ public class TableQuotaSnapshotStore implements QuotaSnapshotStore<TableName> {
}
@Override
public SpaceQuotaSnapshot getTargetState(TableName table, SpaceQuota spaceQuota) {
public SpaceQuotaSnapshot getTargetState(
TableName table, SpaceQuota spaceQuota) throws IOException {
rlock.lock();
try {
final long sizeLimitInBytes = spaceQuota.getSoftLimit();
@ -85,6 +97,8 @@ public class TableQuotaSnapshotStore implements QuotaSnapshotStore<TableName> {
for (Entry<HRegionInfo,Long> entry : filterBySubject(table)) {
sum += entry.getValue();
}
// Add in the size for any snapshots against this table
sum += getSnapshotSizesForTable(table);
// Observance is defined as the size of the table being less than the limit
SpaceQuotaStatus status = sum <= sizeLimitInBytes ? SpaceQuotaStatus.notInViolation()
: new SpaceQuotaStatus(ProtobufUtil.toViolationPolicy(spaceQuota.getViolationPolicy()));
@ -94,6 +108,42 @@ public class TableQuotaSnapshotStore implements QuotaSnapshotStore<TableName> {
}
}
/**
* Fetches any serialized snapshot sizes from the quota table for the {@code tn} provided. Any
* malformed records are skipped with a warning printed out.
*/
long getSnapshotSizesForTable(TableName tn) throws IOException {
try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
Scan s = QuotaTableUtil.createScanForSnapshotSizes(tn);
ResultScanner rs = quotaTable.getScanner(s);
try {
long size = 0L;
// Should just be a single row (for our table)
for (Result result : rs) {
// May have multiple columns, one for each snapshot
CellScanner cs = result.cellScanner();
while (cs.advance()) {
Cell current = cs.current();
try {
long snapshotSize = QuotaTableUtil.parseSnapshotSize(current);
if (LOG.isTraceEnabled()) {
LOG.trace("Saw snapshot size of " + snapshotSize + " for " + current);
}
size += snapshotSize;
} catch (InvalidProtocolBufferException e) {
LOG.warn("Failed to parse snapshot size from cell: " + current);
}
}
}
return size;
} finally {
if (null != rs) {
rs.close();
}
}
}
}
@Override
public Iterable<Entry<HRegionInfo,Long>> filterBySubject(TableName table) {
rlock.lock();

View File

@ -36,7 +36,7 @@ public class TableSpaceQuotaSnapshotNotifier implements SpaceQuotaSnapshotNotifi
@Override
public void transitionTable(
TableName tableName, SpaceQuotaSnapshot snapshot) throws IOException {
final Put p = QuotaTableUtil.createPutSpaceSnapshot(tableName, snapshot);
final Put p = QuotaTableUtil.createPutForSpaceSnapshot(tableName, snapshot);
try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
if (LOG.isTraceEnabled()) {
LOG.trace("Persisting a space quota snapshot " + snapshot + " for " + tableName);

View File

@ -40,6 +40,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -2026,6 +2027,17 @@ public class HStore implements Store {
@Override
public long getStorefilesSize() {
// Include all StoreFiles
return getStorefilesSize(storeFile -> true);
}
@Override
public long getHFilesSize() {
// Include only StoreFiles which are HFiles
return getStorefilesSize(storeFile -> storeFile.isHFile());
}
private long getStorefilesSize(Predicate<StoreFile> predicate) {
long size = 0;
for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
StoreFileReader r = s.getReader();
@ -2033,7 +2045,9 @@ public class HStore implements Store {
LOG.warn("StoreFile " + s + " has a null Reader");
continue;
}
size += r.length();
if (predicate.test(s)) {
size += r.length();
}
}
return size;
}

View File

@ -401,6 +401,11 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
*/
long getStorefilesSize();
/**
* @return The size of only the store files which are HFiles, in bytes.
*/
long getHFilesSize();
/**
* @return The size of the store file indexes, in bytes.
*/

View File

@ -20,6 +20,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map.Entry;
import java.util.Objects;
@ -29,9 +30,11 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.Predicate;
@ -40,6 +43,10 @@ import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.rules.TestName;
@ -55,6 +62,7 @@ public class SpaceQuotaHelperForTests {
public static final String F1 = "f1";
public static final long ONE_KILOBYTE = 1024L;
public static final long ONE_MEGABYTE = ONE_KILOBYTE * ONE_KILOBYTE;
public static final long ONE_GIGABYTE = ONE_MEGABYTE * ONE_KILOBYTE;
private final HBaseTestingUtility testUtil;
private final TestName testName;
@ -67,6 +75,25 @@ public class SpaceQuotaHelperForTests {
this.counter = Objects.requireNonNull(counter);
}
//
// Static helpers
//
static void updateConfigForQuotas(Configuration conf) {
// Increase the frequency of some of the chores for responsiveness of the test
conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, 1000);
conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 1000);
conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_DELAY_KEY, 1000);
conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_PERIOD_KEY, 1000);
conf.setInt(SpaceQuotaRefresherChore.POLICY_REFRESHER_CHORE_DELAY_KEY, 1000);
conf.setInt(SpaceQuotaRefresherChore.POLICY_REFRESHER_CHORE_PERIOD_KEY, 1000);
conf.setInt(SnapshotQuotaObserverChore.SNAPSHOT_QUOTA_CHORE_DELAY_KEY, 1000);
conf.setInt(SnapshotQuotaObserverChore.SNAPSHOT_QUOTA_CHORE_PERIOD_KEY, 1000);
// The period at which we check for compacted files that should be deleted from HDFS
conf.setInt("hbase.hfile.compaction.discharger.interval", 5 * 1000);
conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
}
//
// Helpers
//
@ -88,24 +115,33 @@ public class SpaceQuotaHelperForTests {
/**
* Removes all quotas defined in the HBase quota table.
*/
void removeAllQuotas(Connection conn) throws IOException {
QuotaRetriever scanner = QuotaRetriever.open(conn.getConfiguration());
try {
for (QuotaSettings quotaSettings : scanner) {
final String namespace = quotaSettings.getNamespace();
final TableName tableName = quotaSettings.getTableName();
if (namespace != null) {
LOG.debug("Deleting quota for namespace: " + namespace);
QuotaUtil.deleteNamespaceQuota(conn, namespace);
} else {
assert tableName != null;
LOG.debug("Deleting quota for table: "+ tableName);
QuotaUtil.deleteTableQuota(conn, tableName);
void removeAllQuotas(Connection conn) throws IOException, InterruptedException {
// Wait for the quota table to be created
if (!conn.getAdmin().tableExists(QuotaUtil.QUOTA_TABLE_NAME)) {
do {
LOG.debug("Quota table does not yet exist");
Thread.sleep(1000);
} while (!conn.getAdmin().tableExists(QuotaUtil.QUOTA_TABLE_NAME));
} else {
// Or, clean up any quotas from previous test runs.
QuotaRetriever scanner = QuotaRetriever.open(conn.getConfiguration());
try {
for (QuotaSettings quotaSettings : scanner) {
final String namespace = quotaSettings.getNamespace();
final TableName tableName = quotaSettings.getTableName();
if (namespace != null) {
LOG.debug("Deleting quota for namespace: " + namespace);
QuotaUtil.deleteNamespaceQuota(conn, namespace);
} else {
assert tableName != null;
LOG.debug("Deleting quota for table: "+ tableName);
QuotaUtil.deleteTableQuota(conn, tableName);
}
}
} finally {
if (scanner != null) {
scanner.close();
}
}
} finally {
if (scanner != null) {
scanner.close();
}
}
}
@ -146,6 +182,15 @@ public class SpaceQuotaHelperForTests {
}
void writeData(Connection conn, TableName tn, long sizeInBytes) throws IOException {
writeData(tn, sizeInBytes, Bytes.toBytes("q1"));
}
void writeData(TableName tn, long sizeInBytes, String qual) throws IOException {
writeData(tn, sizeInBytes, Bytes.toBytes(qual));
}
void writeData(TableName tn, long sizeInBytes, byte[] qual) throws IOException {
final Connection conn = testUtil.getConnection();
final Table table = conn.getTable(tn);
try {
List<Put> updates = new ArrayList<>();
@ -160,7 +205,7 @@ public class SpaceQuotaHelperForTests {
Put p = new Put(Bytes.toBytes(sb.reverse().toString()));
byte[] value = new byte[SIZE_PER_VALUE];
r.nextBytes(value);
p.addColumn(Bytes.toBytes(F1), Bytes.toBytes("q1"), value);
p.addColumn(Bytes.toBytes(F1), qual, value);
updates.add(p);
// Batch ~13KB worth of updates
@ -188,6 +233,12 @@ public class SpaceQuotaHelperForTests {
}
}
NamespaceDescriptor createNamespace() throws Exception {
NamespaceDescriptor nd = NamespaceDescriptor.create("ns" + counter.getAndIncrement()).build();
testUtil.getAdmin().createNamespace(nd);
return nd;
}
Multimap<TableName, QuotaSettings> createTablesWithSpaceQuotas() throws Exception {
final Admin admin = testUtil.getAdmin();
final Multimap<TableName, QuotaSettings> tablesWithQuotas = HashMultimap.create();
@ -195,8 +246,7 @@ public class SpaceQuotaHelperForTests {
final TableName tn1 = createTable();
final TableName tn2 = createTable();
NamespaceDescriptor nd = NamespaceDescriptor.create("ns" + counter.getAndIncrement()).build();
admin.createNamespace(nd);
NamespaceDescriptor nd = createNamespace();
final TableName tn3 = createTableInNamespace(nd);
final TableName tn4 = createTableInNamespace(nd);
final TableName tn5 = createTableInNamespace(nd);
@ -233,6 +283,14 @@ public class SpaceQuotaHelperForTests {
return tablesWithQuotas;
}
TableName getNextTableName() {
return getNextTableName(NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR);
}
TableName getNextTableName(String namespace) {
return TableName.valueOf(namespace, testName.getMethodName() + counter.getAndIncrement());
}
TableName createTable() throws Exception {
return createTableWithRegions(1);
}
@ -251,8 +309,7 @@ public class SpaceQuotaHelperForTests {
}
TableName createTableWithRegions(Admin admin, String namespace, int numRegions) throws Exception {
final TableName tn = TableName.valueOf(
namespace, testName.getMethodName() + counter.getAndIncrement());
final TableName tn = getNextTableName(namespace);
// Delete the old table
if (admin.tableExists(tn)) {
@ -308,4 +365,87 @@ public class SpaceQuotaHelperForTests {
}
}
}
/**
* Abstraction to simplify the case where a test needs to verify a certain state
* on a {@code SpaceQuotaSnapshot}. This class fails-fast when there is no such
* snapshot obtained from the Master. As such, it is not useful to verify the
* lack of a snapshot.
*/
static abstract class SpaceQuotaSnapshotPredicate implements Predicate<Exception> {
private final Connection conn;
private final TableName tn;
private final String ns;
SpaceQuotaSnapshotPredicate(Connection conn, TableName tn) {
this(Objects.requireNonNull(conn), Objects.requireNonNull(tn), null);
}
SpaceQuotaSnapshotPredicate(Connection conn, String ns) {
this(Objects.requireNonNull(conn), null, Objects.requireNonNull(ns));
}
SpaceQuotaSnapshotPredicate(Connection conn, TableName tn, String ns) {
if ((null != tn && null != ns) || (null == tn && null == ns)) {
throw new IllegalArgumentException(
"One of TableName and Namespace must be non-null, and the other null");
}
this.conn = conn;
this.tn = tn;
this.ns = ns;
}
@Override
public boolean evaluate() throws Exception {
SpaceQuotaSnapshot snapshot;
if (null == ns) {
snapshot = QuotaTableUtil.getCurrentSnapshot(conn, tn);
} else {
snapshot = QuotaTableUtil.getCurrentSnapshot(conn, ns);
}
LOG.debug("Saw quota snapshot for " + (null == tn ? ns : tn) + ": " + snapshot);
if (null == snapshot) {
return false;
}
return evaluate(snapshot);
}
/**
* Must determine if the given {@code SpaceQuotaSnapshot} meets some criteria.
*
* @param snapshot a non-null snapshot obtained from the HBase Master
* @return true if the criteria is met, false otherwise
*/
abstract boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception;
}
/**
* Predicate that waits for all store files in a table to have no compacted files.
*/
static class NoFilesToDischarge implements Predicate<Exception> {
private final MiniHBaseCluster cluster;
private final TableName tn;
NoFilesToDischarge(MiniHBaseCluster cluster, TableName tn) {
this.cluster = cluster;
this.tn = tn;
}
@Override
public boolean evaluate() throws Exception {
for (HRegion region : cluster.getRegions(tn)) {
for (Store store : region.getStores()) {
HStore hstore = (HStore) store;
Collection<StoreFile> files =
hstore.getStoreEngine().getStoreFileManager().getCompactedfiles();
if (null != files && !files.isEmpty()) {
LOG.debug(region.getRegionInfo().getEncodedName() + " still has compacted files");
return false;
}
}
}
return true;
}
}
}

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -238,13 +239,13 @@ public class TestFileSystemUtilizationChore {
final Configuration conf = getDefaultHBaseConfiguration();
final HRegionServer rs = mockRegionServer(conf);
// Three regions with multiple store sizes
// Two regions with multiple store sizes
final List<Long> r1Sizes = Arrays.asList(1024L, 2048L);
final long r1Sum = sum(r1Sizes);
final List<Long> r2Sizes = Arrays.asList(1024L * 1024L);
final FileSystemUtilizationChore chore = new FileSystemUtilizationChore(rs);
doAnswer(new ExpectedRegionSizeSummationAnswer(sum(Arrays.asList(r1Sum))))
doAnswer(new ExpectedRegionSizeSummationAnswer(r1Sum))
.when(rs)
.reportRegionSizesForQuotas((Map<HRegionInfo,Long>) any(Map.class));
@ -254,6 +255,33 @@ public class TestFileSystemUtilizationChore {
chore.chore();
}
@SuppressWarnings("unchecked")
@Test
public void testNonHFilesAreIgnored() {
final Configuration conf = getDefaultHBaseConfiguration();
final HRegionServer rs = mockRegionServer(conf);
// Region r1 has two store files, one hfile link and one hfile
final List<Long> r1StoreFileSizes = Arrays.asList(1024L, 2048L);
final List<Long> r1HFileSizes = Arrays.asList(0L, 2048L);
final long r1HFileSizeSum = sum(r1HFileSizes);
// Region r2 has one store file which is a hfile link
final List<Long> r2StoreFileSizes = Arrays.asList(1024L * 1024L);
final List<Long> r2HFileSizes = Arrays.asList(0L);
final long r2HFileSizeSum = sum(r2HFileSizes);
// We expect that only the hfiles would be counted (hfile links are ignored)
final FileSystemUtilizationChore chore = new FileSystemUtilizationChore(rs);
doAnswer(new ExpectedRegionSizeSummationAnswer(
sum(Arrays.asList(r1HFileSizeSum, r2HFileSizeSum))))
.when(rs).reportRegionSizesForQuotas((Map<HRegionInfo,Long>) any(Map.class));
final Region r1 = mockRegionWithHFileLinks(r1StoreFileSizes, r1HFileSizes);
final Region r2 = mockRegionWithHFileLinks(r2StoreFileSizes, r2HFileSizes);
when(rs.getOnlineRegions()).thenReturn(Arrays.asList(r1, r2));
chore.chore();
}
/**
* Creates an HBase Configuration object for the default values.
*/
@ -298,9 +326,31 @@ public class TestFileSystemUtilizationChore {
List<Store> stores = new ArrayList<>();
when(r.getStores()).thenReturn(stores);
for (Long storeSize : storeSizes) {
final Store s = mock(Store.class);
stores.add(s);
when(s.getHFilesSize()).thenReturn(storeSize);
}
return r;
}
private Region mockRegionWithHFileLinks(Collection<Long> storeSizes, Collection<Long> hfileSizes) {
final Region r = mock(Region.class);
final HRegionInfo info = mock(HRegionInfo.class);
when(r.getRegionInfo()).thenReturn(info);
List<Store> stores = new ArrayList<>();
when(r.getStores()).thenReturn(stores);
assertEquals(
"Logic error, storeSizes and linkSizes must be equal in size", storeSizes.size(),
hfileSizes.size());
Iterator<Long> storeSizeIter = storeSizes.iterator();
Iterator<Long> hfileSizeIter = hfileSizes.iterator();
while (storeSizeIter.hasNext() && hfileSizeIter.hasNext()) {
final long storeSize = storeSizeIter.next();
final long hfileSize = hfileSizeIter.next();
final Store s = mock(Store.class);
stores.add(s);
when(s.getStorefilesSize()).thenReturn(storeSize);
when(s.getHFilesSize()).thenReturn(hfileSize);
}
return r;
}

View File

@ -23,6 +23,7 @@ import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
@ -31,6 +32,9 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
@ -91,7 +95,8 @@ public class TestNamespaceQuotaViolationStore {
}
@Test
public void testTargetViolationState() {
public void testTargetViolationState() throws IOException {
mockNoSnapshotSizes();
final String NS = "ns";
TableName tn1 = TableName.valueOf(NS, "tn1");
TableName tn2 = TableName.valueOf(NS, "tn2");
@ -123,7 +128,8 @@ public class TestNamespaceQuotaViolationStore {
// Exceeds the quota, should be in violation
assertEquals(true, store.getTargetState(NS, quota).getQuotaStatus().isInViolation());
assertEquals(SpaceViolationPolicy.DISABLE, store.getTargetState(NS, quota).getQuotaStatus().getPolicy());
assertEquals(
SpaceViolationPolicy.DISABLE, store.getTargetState(NS, quota).getQuotaStatus().getPolicy());
}
@Test
@ -153,4 +159,9 @@ public class TestNamespaceQuotaViolationStore {
assertEquals(18, size(store.filterBySubject("ns")));
}
void mockNoSnapshotSizes() throws IOException {
Table quotaTable = mock(Table.class);
when(conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)).thenReturn(quotaTable);
when(quotaTable.get(any(Get.class))).thenReturn(new Result());
}
}

View File

@ -65,11 +65,8 @@ public class TestQuotaObserverChoreRegionReports {
@Before
public void setUp() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, 1000);
conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 1000);
conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_DELAY_KEY, 1000);
conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_PERIOD_KEY, 1000);
conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
// Increase the frequency of some of the chores for responsiveness of the test
SpaceQuotaHelperForTests.updateConfigForQuotas(conf);
conf.setInt(QuotaObserverChore.REGION_REPORT_RETENTION_DURATION_KEY, 1000);
}

View File

@ -77,11 +77,7 @@ public class TestQuotaObserverChoreWithMiniCluster {
@BeforeClass
public static void setUp() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, 1000);
conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 1000);
conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_DELAY_KEY, 1000);
conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_PERIOD_KEY, 1000);
conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
SpaceQuotaHelperForTests.updateConfigForQuotas(conf);
conf.setClass(SpaceQuotaSnapshotNotifierFactory.SNAPSHOT_NOTIFIER_KEY,
SpaceQuotaSnapshotNotifierForTest.class, SpaceQuotaSnapshotNotifier.class);
TEST_UTIL.startMiniCluster(1);

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
import org.apache.hadoop.hbase.quotas.policies.MissingSnapshotViolationPolicyEnforcement;
@ -65,13 +66,7 @@ public class TestQuotaStatusRPCs {
public static void setUp() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
// Increase the frequency of some of the chores for responsiveness of the test
conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, 1000);
conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 1000);
conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_DELAY_KEY, 1000);
conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_PERIOD_KEY, 1000);
conf.setInt(SpaceQuotaRefresherChore.POLICY_REFRESHER_CHORE_DELAY_KEY, 1000);
conf.setInt(SpaceQuotaRefresherChore.POLICY_REFRESHER_CHORE_PERIOD_KEY, 1000);
conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
SpaceQuotaHelperForTests.updateConfigForQuotas(conf);
TEST_UTIL.startMiniCluster(1);
}
@ -167,7 +162,7 @@ public class TestQuotaStatusRPCs {
// Write at least `tableSize` data
try {
helper.writeData(tn, tableSize);
} catch (SpaceLimitingException e) {
} catch (RetriesExhaustedWithDetailsException | SpaceLimitingException e) {
// Pass
}
@ -245,7 +240,7 @@ public class TestQuotaStatusRPCs {
try {
helper.writeData(tn, tableSize * 2L);
} catch (SpaceLimitingException e) {
} catch (RetriesExhaustedWithDetailsException | SpaceLimitingException e) {
// Pass
}

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.hbase.quotas;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
@ -27,8 +29,11 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
@ -37,7 +42,9 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle;
import org.apache.hadoop.hbase.testclassification.MasterTests;
@ -211,9 +218,9 @@ public class TestQuotaTableUtil {
final SpaceQuotaSnapshot snapshot3 = new SpaceQuotaSnapshot(
new SpaceQuotaStatus(SpaceViolationPolicy.NO_WRITES), 512L, 1024L);
List<Put> puts = new ArrayList<>();
puts.add(QuotaTableUtil.createPutSpaceSnapshot(tn1, snapshot1));
puts.add(QuotaTableUtil.createPutSpaceSnapshot(tn2, snapshot2));
puts.add(QuotaTableUtil.createPutSpaceSnapshot(tn3, snapshot3));
puts.add(QuotaTableUtil.createPutForSpaceSnapshot(tn1, snapshot1));
puts.add(QuotaTableUtil.createPutForSpaceSnapshot(tn2, snapshot2));
puts.add(QuotaTableUtil.createPutForSpaceSnapshot(tn3, snapshot3));
final Map<TableName,SpaceQuotaSnapshot> expectedPolicies = new HashMap<>();
expectedPolicies.put(tn1, snapshot1);
expectedPolicies.put(tn2, snapshot2);
@ -232,7 +239,59 @@ public class TestQuotaTableUtil {
assertEquals(expectedPolicies, actualPolicies);
}
@Test
public void testSerdeTableSnapshotSizes() throws Exception {
TableName tn1 = TableName.valueOf("tn1");
TableName tn2 = TableName.valueOf("tn2");
try (Table quotaTable = connection.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
for (int i = 0; i < 3; i++) {
Put p = QuotaTableUtil.createPutForSnapshotSize(tn1, "tn1snap" + i, 1024L * (1+i));
quotaTable.put(p);
}
for (int i = 0; i < 3; i++) {
Put p = QuotaTableUtil.createPutForSnapshotSize(tn2, "tn2snap" + i, 2048L * (1+i));
quotaTable.put(p);
}
verifyTableSnapshotSize(quotaTable, tn1, "tn1snap0", 1024L);
verifyTableSnapshotSize(quotaTable, tn1, "tn1snap1", 2048L);
verifyTableSnapshotSize(quotaTable, tn1, "tn1snap2", 3072L);
verifyTableSnapshotSize(quotaTable, tn2, "tn2snap0", 2048L);
verifyTableSnapshotSize(quotaTable, tn2, "tn2snap1", 4096L);
verifyTableSnapshotSize(quotaTable, tn2, "tn2snap2", 6144L);
}
}
@Test
public void testReadNamespaceSnapshotSizes() throws Exception {
String ns1 = "ns1";
String ns2 = "ns2";
String defaultNs = NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR;
try (Table quotaTable = connection.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
quotaTable.put(QuotaTableUtil.createPutForNamespaceSnapshotSize(ns1, 1024L));
quotaTable.put(QuotaTableUtil.createPutForNamespaceSnapshotSize(ns2, 2048L));
quotaTable.put(QuotaTableUtil.createPutForNamespaceSnapshotSize(defaultNs, 8192L));
assertEquals(1024L, QuotaTableUtil.getNamespaceSnapshotSize(connection, ns1));
assertEquals(2048L, QuotaTableUtil.getNamespaceSnapshotSize(connection, ns2));
assertEquals(8192L, QuotaTableUtil.getNamespaceSnapshotSize(connection, defaultNs));
}
}
private TableName getUniqueTableName() {
return TableName.valueOf(testName.getMethodName() + "_" + tableNameCounter++);
}
private void verifyTableSnapshotSize(
Table quotaTable, TableName tn, String snapshotName, long expectedSize) throws IOException {
Result r = quotaTable.get(QuotaTableUtil.makeGetForSnapshotSize(tn, snapshotName));
CellScanner cs = r.cellScanner();
assertTrue(cs.advance());
Cell c = cs.current();
assertEquals(expectedSize, QuotaProtos.SpaceQuotaSnapshot.parseFrom(
UnsafeByteOperations.unsafeWrap(
c.getValueArray(), c.getValueOffset(), c.getValueLength())).getQuotaUsage());
assertFalse(cs.advance());
}
}

View File

@ -67,9 +67,8 @@ public class TestRegionSizeUse {
@Before
public void setUp() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, 1000);
conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 1000);
conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
// Increase the frequency of some of the chores for responsiveness of the test
SpaceQuotaHelperForTests.updateConfigForQuotas(conf);
cluster = TEST_UTIL.startMiniCluster(2);
}

View File

@ -0,0 +1,368 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.quotas;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.SnapshotDescription;
import org.apache.hadoop.hbase.client.SnapshotType;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.quotas.SnapshotQuotaObserverChore.SnapshotWithSize;
import org.apache.hadoop.hbase.quotas.SpaceQuotaHelperForTests.NoFilesToDischarge;
import org.apache.hadoop.hbase.quotas.SpaceQuotaHelperForTests.SpaceQuotaSnapshotPredicate;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
/**
* Test class for the {@link SnapshotQuotaObserverChore}.
*/
@Category(MediumTests.class)
public class TestSnapshotQuotaObserverChore {
private static final Log LOG = LogFactory.getLog(TestSnapshotQuotaObserverChore.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final AtomicLong COUNTER = new AtomicLong();
@Rule
public TestName testName = new TestName();
private Connection conn;
private Admin admin;
private SpaceQuotaHelperForTests helper;
private HMaster master;
private SnapshotQuotaObserverChore testChore;
@BeforeClass
public static void setUp() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
SpaceQuotaHelperForTests.updateConfigForQuotas(conf);
// Clean up the compacted files faster than normal (15s instead of 2mins)
conf.setInt("hbase.hfile.compaction.discharger.interval", 15 * 1000);
TEST_UTIL.startMiniCluster(1);
}
@AfterClass
public static void tearDown() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Before
public void setup() throws Exception {
conn = TEST_UTIL.getConnection();
admin = TEST_UTIL.getAdmin();
helper = new SpaceQuotaHelperForTests(TEST_UTIL, testName, COUNTER);
master = TEST_UTIL.getHBaseCluster().getMaster();
helper.removeAllQuotas(conn);
testChore = new SnapshotQuotaObserverChore(
TEST_UTIL.getConnection(), TEST_UTIL.getConfiguration(), master.getFileSystem(), master,
null);
}
@Test
public void testSnapshotSizePersistence() throws IOException {
final Admin admin = TEST_UTIL.getAdmin();
final TableName tn = TableName.valueOf("quota_snapshotSizePersistence");
if (admin.tableExists(tn)) {
admin.disableTable(tn);
admin.deleteTable(tn);
}
HTableDescriptor desc = new HTableDescriptor(tn);
desc.addFamily(new HColumnDescriptor(QuotaTableUtil.QUOTA_FAMILY_USAGE));
admin.createTable(desc);
Multimap<TableName,SnapshotWithSize> snapshotsWithSizes = HashMultimap.create();
try (Table table = conn.getTable(tn)) {
// Writing no values will result in no records written.
verify(table, () -> {
testChore.persistSnapshotSizes(table, snapshotsWithSizes);
assertEquals(0, count(table));
});
verify(table, () -> {
TableName originatingTable = TableName.valueOf("t1");
snapshotsWithSizes.put(originatingTable, new SnapshotWithSize("ss1", 1024L));
snapshotsWithSizes.put(originatingTable, new SnapshotWithSize("ss2", 4096L));
testChore.persistSnapshotSizes(table, snapshotsWithSizes);
assertEquals(2, count(table));
assertEquals(1024L, extractSnapshotSize(table, originatingTable, "ss1"));
assertEquals(4096L, extractSnapshotSize(table, originatingTable, "ss2"));
});
snapshotsWithSizes.clear();
verify(table, () -> {
snapshotsWithSizes.put(TableName.valueOf("t1"), new SnapshotWithSize("ss1", 1024L));
snapshotsWithSizes.put(TableName.valueOf("t2"), new SnapshotWithSize("ss2", 4096L));
snapshotsWithSizes.put(TableName.valueOf("t3"), new SnapshotWithSize("ss3", 8192L));
testChore.persistSnapshotSizes(table, snapshotsWithSizes);
assertEquals(3, count(table));
assertEquals(1024L, extractSnapshotSize(table, TableName.valueOf("t1"), "ss1"));
assertEquals(4096L, extractSnapshotSize(table, TableName.valueOf("t2"), "ss2"));
assertEquals(8192L, extractSnapshotSize(table, TableName.valueOf("t3"), "ss3"));
});
}
}
@Test
public void testSnapshotsFromTables() throws Exception {
TableName tn1 = helper.createTableWithRegions(1);
TableName tn2 = helper.createTableWithRegions(1);
TableName tn3 = helper.createTableWithRegions(1);
// Set a space quota on table 1 and 2 (but not 3)
admin.setQuota(QuotaSettingsFactory.limitTableSpace(
tn1, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS));
admin.setQuota(QuotaSettingsFactory.limitTableSpace(
tn2, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS));
// Create snapshots on each table (we didn't write any data, so just skipflush)
admin.snapshot(new SnapshotDescription(tn1 + "snapshot", tn1, SnapshotType.SKIPFLUSH));
admin.snapshot(new SnapshotDescription(tn2 + "snapshot", tn2, SnapshotType.SKIPFLUSH));
admin.snapshot(new SnapshotDescription(tn3 + "snapshot", tn3, SnapshotType.SKIPFLUSH));
Multimap<TableName,String> mapping = testChore.getSnapshotsToComputeSize();
assertEquals(2, mapping.size());
assertEquals(1, mapping.get(tn1).size());
assertEquals(tn1 + "snapshot", mapping.get(tn1).iterator().next());
assertEquals(1, mapping.get(tn2).size());
assertEquals(tn2 + "snapshot", mapping.get(tn2).iterator().next());
admin.snapshot(new SnapshotDescription(tn2 + "snapshot1", tn2, SnapshotType.SKIPFLUSH));
admin.snapshot(new SnapshotDescription(tn3 + "snapshot1", tn3, SnapshotType.SKIPFLUSH));
mapping = testChore.getSnapshotsToComputeSize();
assertEquals(3, mapping.size());
assertEquals(1, mapping.get(tn1).size());
assertEquals(tn1 + "snapshot", mapping.get(tn1).iterator().next());
assertEquals(2, mapping.get(tn2).size());
assertEquals(
new HashSet<String>(Arrays.asList(tn2 + "snapshot", tn2 + "snapshot1")), mapping.get(tn2));
}
@Test
public void testSnapshotsFromNamespaces() throws Exception {
NamespaceDescriptor ns = NamespaceDescriptor.create("snapshots_from_namespaces").build();
admin.createNamespace(ns);
TableName tn1 = helper.createTableWithRegions(ns.getName(), 1);
TableName tn2 = helper.createTableWithRegions(ns.getName(), 1);
TableName tn3 = helper.createTableWithRegions(1);
// Set a space quota on the namespace
admin.setQuota(QuotaSettingsFactory.limitNamespaceSpace(
ns.getName(), SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS));
// Create snapshots on each table (we didn't write any data, so just skipflush)
admin.snapshot(new SnapshotDescription(
tn1.getQualifierAsString() + "snapshot", tn1, SnapshotType.SKIPFLUSH));
admin.snapshot(new SnapshotDescription(
tn2.getQualifierAsString() + "snapshot", tn2, SnapshotType.SKIPFLUSH));
admin.snapshot(new SnapshotDescription(
tn3.getQualifierAsString() + "snapshot", tn3, SnapshotType.SKIPFLUSH));
Multimap<TableName,String> mapping = testChore.getSnapshotsToComputeSize();
assertEquals(2, mapping.size());
assertEquals(1, mapping.get(tn1).size());
assertEquals(tn1.getQualifierAsString() + "snapshot", mapping.get(tn1).iterator().next());
assertEquals(1, mapping.get(tn2).size());
assertEquals(tn2.getQualifierAsString() + "snapshot", mapping.get(tn2).iterator().next());
admin.snapshot(new SnapshotDescription(
tn2.getQualifierAsString() + "snapshot1", tn2, SnapshotType.SKIPFLUSH));
admin.snapshot(new SnapshotDescription(
tn3.getQualifierAsString() + "snapshot2", tn3, SnapshotType.SKIPFLUSH));
mapping = testChore.getSnapshotsToComputeSize();
assertEquals(3, mapping.size());
assertEquals(1, mapping.get(tn1).size());
assertEquals(tn1.getQualifierAsString() + "snapshot", mapping.get(tn1).iterator().next());
assertEquals(2, mapping.get(tn2).size());
assertEquals(
new HashSet<String>(Arrays.asList(tn2.getQualifierAsString() + "snapshot",
tn2.getQualifierAsString() + "snapshot1")), mapping.get(tn2));
}
@Test
public void testSnapshotSize() throws Exception {
// Create a table and set a quota
TableName tn1 = helper.createTableWithRegions(5);
admin.setQuota(QuotaSettingsFactory.limitTableSpace(
tn1, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS));
// Write some data and flush it
helper.writeData(tn1, 256L * SpaceQuotaHelperForTests.ONE_KILOBYTE);
admin.flush(tn1);
final AtomicReference<Long> lastSeenSize = new AtomicReference<>();
// Wait for the Master chore to run to see the usage (with a fudge factor)
TEST_UTIL.waitFor(30_000, new SpaceQuotaSnapshotPredicate(conn, tn1) {
@Override
boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
lastSeenSize.set(snapshot.getUsage());
return snapshot.getUsage() > 230L * SpaceQuotaHelperForTests.ONE_KILOBYTE;
}
});
// Create a snapshot on the table
final String snapshotName = tn1 + "snapshot";
admin.snapshot(new SnapshotDescription(snapshotName, tn1, SnapshotType.SKIPFLUSH));
// Get the snapshots
Multimap<TableName,String> snapshotsToCompute = testChore.getSnapshotsToComputeSize();
assertEquals(
"Expected to see the single snapshot: " + snapshotsToCompute, 1, snapshotsToCompute.size());
// Get the size of our snapshot
Multimap<TableName,SnapshotWithSize> snapshotsWithSize = testChore.computeSnapshotSizes(
snapshotsToCompute);
assertEquals(1, snapshotsWithSize.size());
SnapshotWithSize sws = Iterables.getOnlyElement(snapshotsWithSize.get(tn1));
assertEquals(snapshotName, sws.getName());
// The snapshot should take up no space since the table refers to it completely
assertEquals(0, sws.getSize());
// Write some more data, flush it, and then major_compact the table
helper.writeData(tn1, 256L * SpaceQuotaHelperForTests.ONE_KILOBYTE);
admin.flush(tn1);
TEST_UTIL.compact(tn1, true);
// Test table should reflect it's original size since ingest was deterministic
TEST_UTIL.waitFor(30_000, new SpaceQuotaSnapshotPredicate(conn, tn1) {
@Override
boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
LOG.debug("Current usage=" + snapshot.getUsage() + " lastSeenSize=" + lastSeenSize.get());
return closeInSize(
snapshot.getUsage(), lastSeenSize.get(), SpaceQuotaHelperForTests.ONE_KILOBYTE);
}
});
// Wait for no compacted files on the regions of our table
TEST_UTIL.waitFor(30_000, new NoFilesToDischarge(TEST_UTIL.getMiniHBaseCluster(), tn1));
// Still should see only one snapshot
snapshotsToCompute = testChore.getSnapshotsToComputeSize();
assertEquals(
"Expected to see the single snapshot: " + snapshotsToCompute, 1, snapshotsToCompute.size());
snapshotsWithSize = testChore.computeSnapshotSizes(
snapshotsToCompute);
assertEquals(1, snapshotsWithSize.size());
sws = Iterables.getOnlyElement(snapshotsWithSize.get(tn1));
assertEquals(snapshotName, sws.getName());
// The snapshot should take up the size the table originally took up
assertEquals(lastSeenSize.get().longValue(), sws.getSize());
}
@Test
public void testPersistingSnapshotsForNamespaces() throws Exception {
Multimap<TableName,SnapshotWithSize> snapshotsWithSizes = HashMultimap.create();
TableName tn1 = TableName.valueOf("ns1:tn1");
TableName tn2 = TableName.valueOf("ns1:tn2");
TableName tn3 = TableName.valueOf("ns2:tn1");
TableName tn4 = TableName.valueOf("ns2:tn2");
TableName tn5 = TableName.valueOf("tn1");
snapshotsWithSizes.put(tn1, new SnapshotWithSize("", 1024L));
snapshotsWithSizes.put(tn2, new SnapshotWithSize("", 1024L));
snapshotsWithSizes.put(tn3, new SnapshotWithSize("", 512L));
snapshotsWithSizes.put(tn4, new SnapshotWithSize("", 1024L));
snapshotsWithSizes.put(tn5, new SnapshotWithSize("", 3072L));
Map<String,Long> nsSizes = testChore.groupSnapshotSizesByNamespace(snapshotsWithSizes);
assertEquals(3, nsSizes.size());
assertEquals(2048L, (long) nsSizes.get("ns1"));
assertEquals(1536L, (long) nsSizes.get("ns2"));
assertEquals(3072L, (long) nsSizes.get(NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR));
}
private long count(Table t) throws IOException {
try (ResultScanner rs = t.getScanner(new Scan())) {
long sum = 0;
for (Result r : rs) {
while (r.advance()) {
sum++;
}
}
return sum;
}
}
private long extractSnapshotSize(
Table quotaTable, TableName tn, String snapshot) throws IOException {
Get g = QuotaTableUtil.makeGetForSnapshotSize(tn, snapshot);
Result r = quotaTable.get(g);
assertNotNull(r);
CellScanner cs = r.cellScanner();
cs.advance();
Cell c = cs.current();
assertNotNull(c);
return QuotaTableUtil.extractSnapshotSize(
c.getValueArray(), c.getValueOffset(), c.getValueLength());
}
private void verify(Table t, IOThrowingRunnable test) throws IOException {
admin.disableTable(t.getName());
admin.truncateTable(t.getName(), false);
test.run();
}
@FunctionalInterface
private interface IOThrowingRunnable {
void run() throws IOException;
}
/**
* Computes if {@code size2} is within {@code delta} of {@code size1}, inclusive.
*/
boolean closeInSize(long size1, long size2, long delta) {
long lower = size1 - delta;
long upper = size1 + delta;
return lower <= size2 && size2 <= upper;
}
}

View File

@ -79,6 +79,7 @@ import org.junit.rules.TestName;
public class TestSpaceQuotas {
private static final Log LOG = LogFactory.getLog(TestSpaceQuotas.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
// Global for all tests in the class
private static final AtomicLong COUNTER = new AtomicLong(0);
private static final int NUM_RETRIES = 10;
@ -89,14 +90,7 @@ public class TestSpaceQuotas {
@BeforeClass
public static void setUp() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
// Increase the frequency of some of the chores for responsiveness of the test
conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, 1000);
conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 1000);
conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_DELAY_KEY, 1000);
conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_PERIOD_KEY, 1000);
conf.setInt(SpaceQuotaRefresherChore.POLICY_REFRESHER_CHORE_DELAY_KEY, 1000);
conf.setInt(SpaceQuotaRefresherChore.POLICY_REFRESHER_CHORE_PERIOD_KEY, 1000);
conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
SpaceQuotaHelperForTests.updateConfigForQuotas(conf);
TEST_UTIL.startMiniCluster(1);
}

View File

@ -0,0 +1,448 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.quotas;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.SnapshotType;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.quotas.SpaceQuotaHelperForTests.SpaceQuotaSnapshotPredicate;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import com.google.common.collect.Iterables;
/**
* Test class to exercise the inclusion of snapshots in space quotas
*/
@Category({LargeTests.class})
public class TestSpaceQuotasWithSnapshots {
private static final Log LOG = LogFactory.getLog(TestSpaceQuotasWithSnapshots.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
// Global for all tests in the class
private static final AtomicLong COUNTER = new AtomicLong(0);
private static final long FUDGE_FOR_TABLE_SIZE = 500L * SpaceQuotaHelperForTests.ONE_KILOBYTE;
@Rule
public TestName testName = new TestName();
private SpaceQuotaHelperForTests helper;
private Connection conn;
private Admin admin;
@BeforeClass
public static void setUp() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
SpaceQuotaHelperForTests.updateConfigForQuotas(conf);
TEST_UTIL.startMiniCluster(1);
}
@AfterClass
public static void tearDown() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Before
public void removeAllQuotas() throws Exception {
helper = new SpaceQuotaHelperForTests(TEST_UTIL, testName, COUNTER);
conn = TEST_UTIL.getConnection();
admin = TEST_UTIL.getAdmin();
}
@Test
public void testTablesInheritSnapshotSize() throws Exception {
TableName tn = helper.createTableWithRegions(1);
LOG.info("Writing data");
// Set a quota
QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(
tn, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
admin.setQuota(settings);
// Write some data
final long initialSize = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
helper.writeData(tn, initialSize);
LOG.info("Waiting until table size reflects written data");
// Wait until that data is seen by the master
TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
@Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
return snapshot.getUsage() >= initialSize;
}
});
// Make sure we see the final quota usage size
waitForStableQuotaSize(conn, tn, null);
// The actual size on disk after we wrote our data the first time
final long actualInitialSize = QuotaTableUtil.getCurrentSnapshot(conn, tn).getUsage();
LOG.info("Initial table size was " + actualInitialSize);
LOG.info("Snapshot the table");
final String snapshot1 = tn.toString() + "_snapshot1";
admin.snapshot(snapshot1, tn);
// Write the same data again, then flush+compact. This should make sure that
// the snapshot is referencing files that the table no longer references.
LOG.info("Write more data");
helper.writeData(tn, initialSize);
LOG.info("Flush the table");
admin.flush(tn);
LOG.info("Synchronously compacting the table");
TEST_UTIL.compact(tn, true);
final long upperBound = initialSize + FUDGE_FOR_TABLE_SIZE;
final long lowerBound = initialSize - FUDGE_FOR_TABLE_SIZE;
// Store the actual size after writing more data and then compacting it down to one file
LOG.info("Waiting for the region reports to reflect the correct size, between ("
+ lowerBound + ", " + upperBound + ")");
TEST_UTIL.waitFor(30 * 1000, 500, new Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
long size = getRegionSizeReportForTable(conn, tn);
return size < upperBound && size > lowerBound;
}
});
// Make sure we see the "final" new size for the table, not some intermediate
waitForStableRegionSizeReport(conn, tn);
final long finalSize = getRegionSizeReportForTable(conn, tn);
assertNotNull("Did not expect to see a null size", finalSize);
LOG.info("Last seen size: " + finalSize);
// Make sure the QuotaObserverChore has time to reflect the new region size reports
// (we saw above). The usage of the table should *not* decrease when we check it below,
// though, because the snapshot on our table will cause the table to "retain" the size.
TEST_UTIL.waitFor(20 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
@Override
public boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
return snapshot.getUsage() >= finalSize;
}
});
// The final usage should be the sum of the initial size (referenced by the snapshot) and the
// new size we just wrote above.
long expectedFinalSize = actualInitialSize + finalSize;
LOG.info(
"Expecting table usage to be " + actualInitialSize + " + " + finalSize
+ " = " + expectedFinalSize);
// The size of the table (WRT quotas) should now be approximately double what it was previously
TEST_UTIL.waitFor(30 * 1000, 1000, new SpaceQuotaSnapshotPredicate(conn, tn) {
@Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
LOG.debug("Checking for " + expectedFinalSize + " == " + snapshot.getUsage());
return expectedFinalSize == snapshot.getUsage();
}
});
}
@Test
public void testNamespacesInheritSnapshotSize() throws Exception {
String ns = helper.createNamespace().getName();
TableName tn = helper.createTableWithRegions(ns, 1);
LOG.info("Writing data");
// Set a quota
QuotaSettings settings = QuotaSettingsFactory.limitNamespaceSpace(
ns, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
admin.setQuota(settings);
// Write some data and flush it to disk
final long initialSize = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
helper.writeData(tn, initialSize);
admin.flush(tn);
LOG.info("Waiting until namespace size reflects written data");
// Wait until that data is seen by the master
TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, ns) {
@Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
return snapshot.getUsage() >= initialSize;
}
});
// Make sure we see the "final" new size for the table, not some intermediate
waitForStableQuotaSize(conn, null, ns);
// The actual size on disk after we wrote our data the first time
final long actualInitialSize = QuotaTableUtil.getCurrentSnapshot(conn, ns).getUsage();
LOG.info("Initial table size was " + actualInitialSize);
LOG.info("Snapshot the table");
final String snapshot1 = tn.getQualifierAsString() + "_snapshot1";
admin.snapshot(snapshot1, tn);
// Write the same data again, then flush+compact. This should make sure that
// the snapshot is referencing files that the table no longer references.
LOG.info("Write more data");
helper.writeData(tn, initialSize);
LOG.info("Flush the table");
admin.flush(tn);
LOG.info("Synchronously compacting the table");
TEST_UTIL.compact(tn, true);
final long upperBound = initialSize + FUDGE_FOR_TABLE_SIZE;
final long lowerBound = initialSize - FUDGE_FOR_TABLE_SIZE;
LOG.info("Waiting for the region reports to reflect the correct size, between ("
+ lowerBound + ", " + upperBound + ")");
TEST_UTIL.waitFor(30 * 1000, 500, new Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
Map<TableName,Long> sizes = QuotaTableUtil.getMasterReportedTableSizes(conn);
LOG.debug("Master observed table sizes from region size reports: " + sizes);
Long size = sizes.get(tn);
if (null == size) {
return false;
}
return size < upperBound && size > lowerBound;
}
});
// Make sure we see the "final" new size for the table, not some intermediate
waitForStableRegionSizeReport(conn, tn);
final long finalSize = getRegionSizeReportForTable(conn, tn);
assertNotNull("Did not expect to see a null size", finalSize);
LOG.info("Final observed size of table: " + finalSize);
// Make sure the QuotaObserverChore has time to reflect the new region size reports
// (we saw above). The usage of the table should *not* decrease when we check it below,
// though, because the snapshot on our table will cause the table to "retain" the size.
TEST_UTIL.waitFor(20 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, ns) {
@Override
public boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
return snapshot.getUsage() >= finalSize;
}
});
// The final usage should be the sum of the initial size (referenced by the snapshot) and the
// new size we just wrote above.
long expectedFinalSize = actualInitialSize + finalSize;
LOG.info(
"Expecting namespace usage to be " + actualInitialSize + " + " + finalSize
+ " = " + expectedFinalSize);
// The size of the table (WRT quotas) should now be approximately double what it was previously
TEST_UTIL.waitFor(30 * 1000, 1000, new SpaceQuotaSnapshotPredicate(conn, ns) {
@Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
LOG.debug("Checking for " + expectedFinalSize + " == " + snapshot.getUsage());
return expectedFinalSize == snapshot.getUsage();
}
});
}
@Test
public void testTablesWithSnapshots() throws Exception {
final Connection conn = TEST_UTIL.getConnection();
final SpaceViolationPolicy policy = SpaceViolationPolicy.NO_INSERTS;
final TableName tn = helper.createTableWithRegions(10);
// 3MB limit on the table
final long tableLimit = 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
TEST_UTIL.getAdmin().setQuota(QuotaSettingsFactory.limitTableSpace(tn, tableLimit, policy));
LOG.info("Writing first data set");
// Write more data than should be allowed and flush it to disk
helper.writeData(tn, 1L * SpaceQuotaHelperForTests.ONE_MEGABYTE, "q1");
LOG.info("Creating snapshot");
TEST_UTIL.getAdmin().snapshot(tn.toString() + "snap1", tn, SnapshotType.FLUSH);
LOG.info("Writing second data set");
// Write some more data
helper.writeData(tn, 1L * SpaceQuotaHelperForTests.ONE_MEGABYTE, "q2");
LOG.info("Flushing and major compacting table");
// Compact the table to force the snapshot to own all of its files
TEST_UTIL.getAdmin().flush(tn);
TEST_UTIL.compact(tn, true);
LOG.info("Checking for quota violation");
// Wait to observe the quota moving into violation
TEST_UTIL.waitFor(60_000, 1_000, new Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
Scan s = QuotaTableUtil.makeQuotaSnapshotScanForTable(tn);
try (Table t = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
ResultScanner rs = t.getScanner(s);
try {
Result r = Iterables.getOnlyElement(rs);
CellScanner cs = r.cellScanner();
assertTrue(cs.advance());
Cell c = cs.current();
SpaceQuotaSnapshot snapshot = SpaceQuotaSnapshot.toSpaceQuotaSnapshot(
QuotaProtos.SpaceQuotaSnapshot.parseFrom(
UnsafeByteOperations.unsafeWrap(
c.getValueArray(), c.getValueOffset(), c.getValueLength())));
LOG.info(
snapshot.getUsage() + "/" + snapshot.getLimit() + " " + snapshot.getQuotaStatus());
// We expect to see the table move to violation
return snapshot.getQuotaStatus().isInViolation();
} finally {
if (null != rs) {
rs.close();
}
}
}
}
});
}
@Test
public void testRematerializedTablesDoNoInheritSpace() throws Exception {
TableName tn = helper.createTableWithRegions(1);
TableName tn2 = helper.getNextTableName();
LOG.info("Writing data");
// Set a quota on both tables
QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(
tn, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
admin.setQuota(settings);
QuotaSettings settings2 = QuotaSettingsFactory.limitTableSpace(
tn2, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
admin.setQuota(settings2);
// Write some data
final long initialSize = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
helper.writeData(tn, initialSize);
LOG.info("Waiting until table size reflects written data");
// Wait until that data is seen by the master
TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
@Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
return snapshot.getUsage() >= initialSize;
}
});
// Make sure we see the final quota usage size
waitForStableQuotaSize(conn, tn, null);
// The actual size on disk after we wrote our data the first time
final long actualInitialSize = QuotaTableUtil.getCurrentSnapshot(conn, tn).getUsage();
LOG.info("Initial table size was " + actualInitialSize);
LOG.info("Snapshot the table");
final String snapshot1 = tn.toString() + "_snapshot1";
admin.snapshot(snapshot1, tn);
admin.cloneSnapshot(snapshot1, tn2);
// Write some more data to the first table
helper.writeData(tn, initialSize, "q2");
admin.flush(tn);
// Watch the usage of the first table with some more data to know when the new
// region size reports were sent to the master
TEST_UTIL.waitFor(30_000, 1_000, new SpaceQuotaSnapshotPredicate(conn, tn) {
@Override
boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
return snapshot.getUsage() >= actualInitialSize * 2;
}
});
// We know that reports were sent by our RS, verify that they take up zero size.
SpaceQuotaSnapshot snapshot = QuotaTableUtil.getCurrentSnapshot(conn, tn2);
assertNotNull(snapshot);
assertEquals(0, snapshot.getUsage());
// Compact the cloned table to force it to own its own files.
TEST_UTIL.compact(tn2, true);
// After the table is compacted, it should have its own files and be the same size as originally
TEST_UTIL.waitFor(30_000, 1_000, new SpaceQuotaSnapshotPredicate(conn, tn2) {
@Override
boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
return snapshot.getUsage() == actualInitialSize;
}
});
}
void waitForStableQuotaSize(Connection conn, TableName tn, String ns) throws Exception {
// For some stability in the value before proceeding
// Helps make sure that we got the actual last value, not some inbetween
AtomicLong lastValue = new AtomicLong(-1);
AtomicInteger counter = new AtomicInteger(0);
TEST_UTIL.waitFor(15_000, 500, new SpaceQuotaSnapshotPredicate(conn, tn, ns) {
@Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
LOG.debug("Last observed size=" + lastValue.get());
if (snapshot.getUsage() == lastValue.get()) {
int numMatches = counter.incrementAndGet();
if (numMatches >= 5) {
return true;
}
// Not yet..
return false;
}
counter.set(0);
lastValue.set(snapshot.getUsage());
return false;
}
});
}
long getRegionSizeReportForTable(Connection conn, TableName tn) throws IOException {
Map<TableName,Long> sizes = QuotaTableUtil.getMasterReportedTableSizes(conn);
Long value = sizes.get(tn);
if (null == value) {
return 0L;
}
return value.longValue();
}
void waitForStableRegionSizeReport(Connection conn, TableName tn) throws Exception {
// For some stability in the value before proceeding
// Helps make sure that we got the actual last value, not some inbetween
AtomicLong lastValue = new AtomicLong(-1);
AtomicInteger counter = new AtomicInteger(0);
TEST_UTIL.waitFor(15_000, 500, new Predicate<Exception>() {
@Override public boolean evaluate() throws Exception {
LOG.debug("Last observed size=" + lastValue.get());
long actual = getRegionSizeReportForTable(conn, tn);
if (actual == lastValue.get()) {
int numMatches = counter.incrementAndGet();
if (numMatches >= 5) {
return true;
}
// Not yet..
return false;
}
counter.set(0);
lastValue.set(actual);
return false;
}
});
}
}

View File

@ -76,13 +76,7 @@ public class TestSuperUserQuotaPermissions {
public static void setupMiniCluster() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
// Increase the frequency of some of the chores for responsiveness of the test
conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, 1000);
conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 1000);
conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_DELAY_KEY, 1000);
conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_PERIOD_KEY, 1000);
conf.setInt(SpaceQuotaRefresherChore.POLICY_REFRESHER_CHORE_DELAY_KEY, 1000);
conf.setInt(SpaceQuotaRefresherChore.POLICY_REFRESHER_CHORE_PERIOD_KEY, 1000);
conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
SpaceQuotaHelperForTests.updateConfigForQuotas(conf);
conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, AccessController.class.getName());
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, AccessController.class.getName());

View File

@ -23,6 +23,8 @@ import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
@ -30,6 +32,10 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
@ -87,7 +93,8 @@ public class TestTableQuotaViolationStore {
}
@Test
public void testTargetViolationState() {
public void testTargetViolationState() throws IOException {
mockNoSnapshotSizes();
TableName tn1 = TableName.valueOf("violation1");
TableName tn2 = TableName.valueOf("observance1");
TableName tn3 = TableName.valueOf("observance2");
@ -154,4 +161,12 @@ public class TestTableQuotaViolationStore {
quotaRef.set(quotaWithoutSpace);
assertNull(mockStore.getSpaceQuota(TableName.valueOf("foo")));
}
void mockNoSnapshotSizes() throws IOException {
Table quotaTable = mock(Table.class);
ResultScanner scanner = mock(ResultScanner.class);
when(conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)).thenReturn(quotaTable);
when(quotaTable.getScanner(any(Scan.class))).thenReturn(scanner);
when(scanner.iterator()).thenReturn(Collections.<Result> emptyList().iterator());
}
}