diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java
index 1b670e67a3e..c1863a7d63e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
 import org.apache.hadoop.hbase.filter.CompareFilter;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
@@ -70,16 +71,18 @@ import org.apache.hadoop.hbase.util.Strings;
 /**
  * Helper class to interact with the quota table.
- * <pre>
- *     ROW-KEY      FAM/QUAL        DATA
- *   n.<namespace> q:s         <global-quotas>
- *   t.<namespace> u:p        <namespace-quota policy>
- *   t.<table>     q:s         <global-quotas>
- *   t.<table>     u:p        <table-quota policy>
- *   u.<user>      q:s         <global-quotas>
- *   u.<user>      q:s.<table> <table-quotas>
- *   u.<user>      q:s.<ns>:   <namespace-quotas>
- * </pre>
+ * <table>
+ *   <tr><th>ROW-KEY</th><th>FAM/QUAL</th><th>DATA</th></tr>
+ *   <tr><td>n.<namespace></td><td>q:s</td><td><global-quotas></td></tr>
+ *   <tr><td>n.<namespace></td><td>u:p</td><td><namespace-quota policy></td></tr>
+ *   <tr><td>n.<namespace></td><td>u:s</td><td><SpaceQuotaSnapshot></td></tr>
+ *   <tr><td>t.<table></td><td>q:s</td><td><global-quotas></td></tr>
+ *   <tr><td>t.<table></td><td>u:p</td><td><table-quota policy></td></tr>
+ *   <tr><td>t.<table></td><td>u:ss.<snapshot name></td><td><SpaceQuotaSnapshot></td></tr>
+ *   <tr><td>u.<user></td><td>q:s</td><td><global-quotas></td></tr>
+ *   <tr><td>u.<user></td><td>q:s.<table></td><td><table-quotas></td></tr>
+ *   <tr><td>u.<user></td><td>q:s.<ns></td><td><namespace-quotas></td></tr>
+ * </table>
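To make the new layout concrete: a minimal sketch, outside of this patch, of how a per-snapshot size cell would be addressed under the scheme above. The constants and class name here are illustrative assumptions, not the patch's actual code; the patch's real entry point for this is QuotaTableUtil.createPutForSnapshotSize.

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class SnapshotSizeCellSketch {
      // Assumed constants mirroring the table above: row "t.<table>", column u:ss.<snapshot name>
      static final byte[] QUOTA_FAMILY_USAGE = Bytes.toBytes("u");
      static final String SNAPSHOT_SIZE_QUALIFIER_PREFIX = "ss.";

      // Builds a Put addressing the cell holding the size of one snapshot of a table.
      // The value is left opaque here; the patch stores a serialized SpaceQuotaSnapshot protobuf.
      static Put snapshotSizePut(TableName tn, String snapshotName, byte[] serializedSize) {
        byte[] rowKey = Bytes.toBytes("t." + tn.getNameAsString());
        byte[] qualifier = Bytes.toBytes(SNAPSHOT_SIZE_QUALIFIER_PREFIX + snapshotName);
        return new Put(rowKey).addColumn(QUOTA_FAMILY_USAGE, qualifier, serializedSize);
      }
    }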
procedureExecutor; private WALProcedureStore procedureStore; @@ -896,6 +898,10 @@ public class HMaster extends HRegionServer implements MasterServices { this.quotaObserverChore = new QuotaObserverChore(this, getMasterMetrics()); // Start the chore to read the region FS space reports and act on them getChoreService().scheduleChore(quotaObserverChore); + + this.snapshotQuotaChore = new SnapshotQuotaObserverChore(this, getMasterMetrics()); + // Start the chore to read snapshots and add their usage to table/NS quotas + getChoreService().scheduleChore(snapshotQuotaChore); } // clear the dead servers with same host name and port of online server because we are not @@ -1240,6 +1246,9 @@ public class HMaster extends HRegionServer implements MasterServices { if (this.quotaObserverChore != null) { quotaObserverChore.cancel(); } + if (this.snapshotQuotaChore != null) { + snapshotQuotaChore.cancel(); + } } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMaster.java index 2810d14570e..c896441c5f2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMaster.java @@ -156,4 +156,25 @@ public class MetricsMaster { } }; } + + /** + * Sets the execution time of a period of the {@code SnapshotQuotaObserverChore}. + */ + public void incrementSnapshotObserverTime(final long executionTime) { + masterQuotaSource.incrementSnapshotObserverChoreTime(executionTime); + } + + /** + * Sets the execution time to compute the size of a single snapshot. + */ + public void incrementSnapshotSizeComputationTime(final long executionTime) { + masterQuotaSource.incrementSnapshotObserverSnapshotComputationTime(executionTime); + } + + /** + * Sets the execution time to fetch the mapping of snapshots to originating table. + */ + public void incrementSnapshotFetchTime(long executionTime) { + masterQuotaSource.incrementSnapshotObserverSnapshotFetchTime(executionTime); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileSystemUtilizationChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileSystemUtilizationChore.java index 418a1635a1b..1e4468647b0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileSystemUtilizationChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileSystemUtilizationChore.java @@ -33,6 +33,8 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFileReader; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** @@ -168,9 +170,10 @@ public class FileSystemUtilizationChore extends ScheduledChore { long computeSize(Region r) { long regionSize = 0L; for (Store store : r.getStores()) { - // StoreFile/StoreFileReaders are already instantiated with the file length cached. - // Can avoid extra NN ops. 
-      regionSize += store.getStorefilesSize();
+      regionSize += store.getHFilesSize();
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Size of " + r + " is " + regionSize);
     }
     return regionSize;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NamespaceQuotaSnapshotStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NamespaceQuotaSnapshotStore.java
index f93d33dedbc..170879ae469 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NamespaceQuotaSnapshotStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NamespaceQuotaSnapshotStore.java
@@ -18,8 +18,8 @@ package org.apache.hadoop.hbase.quotas;
 
 import java.io.IOException;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -77,7 +77,8 @@ public class NamespaceQuotaSnapshotStore implements QuotaSnapshotStore<String>
   }
 
   @Override
-  public SpaceQuotaSnapshot getTargetState(String subject, SpaceQuota spaceQuota) {
+  public SpaceQuotaSnapshot getTargetState(
+      String subject, SpaceQuota spaceQuota) throws IOException {
     rlock.lock();
     try {
       final long sizeLimitInBytes = spaceQuota.getSoftLimit();
@@ -85,6 +86,8 @@ public class NamespaceQuotaSnapshotStore implements QuotaSnapshotStore<String>
       for (Entry<HRegionInfo, Long> entry : filterBySubject(subject)) {
         sum += entry.getValue();
       }
+      // Add in the size for any snapshots against this namespace
+      sum += QuotaTableUtil.getNamespaceSnapshotSize(conn, subject);
       // Observance is defined as the size of the table being less than the limit
       SpaceQuotaStatus status = sum <= sizeLimitInBytes ? SpaceQuotaStatus.notInViolation()
           : new SpaceQuotaStatus(ProtobufUtil.toViolationPolicy(spaceQuota.getViolationPolicy()));
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java
index 4404b27761d..9600d1797dc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java
@@ -161,7 +161,9 @@ public class QuotaObserverChore extends ScheduledChore {
     // The current "view" of region space use. Used henceforth.
     final Map<HRegionInfo, Long> reportedRegionSpaceUse = quotaManager.snapshotRegionSizes();
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Using " + reportedRegionSpaceUse.size() + " region space use reports");
+      LOG.trace(
+          "Using " + reportedRegionSpaceUse.size() + " region space use reports: "
+          + reportedRegionSpaceUse);
     }
 
     // Remove the "old" region reports
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSnapshotStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSnapshotStore.java
index 8b0b3a72124..2b5ba59ab66 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSnapshotStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSnapshotStore.java
@@ -69,7 +69,7 @@
    * @param subject The object which to determine the target SpaceQuotaSnapshot of
    * @param spaceQuota The quota "definition" for the {@code subject}
    */
-  SpaceQuotaSnapshot getTargetState(T subject, SpaceQuota spaceQuota);
+  SpaceQuotaSnapshot getTargetState(T subject, SpaceQuota spaceQuota) throws IOException;
 
   /**
    * Filters the provided regions, returning those which match the given
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SnapshotQuotaObserverChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SnapshotQuotaObserverChore.java
new file mode 100644
index 00000000000..46f5a642861
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SnapshotQuotaObserverChore.java
@@ -0,0 +1,543 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MetricsMaster;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest.FamilyFiles;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest.StoreFile;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+/**
+ * A Master-invoked {@code Chore} that computes the size of each snapshot which was created from
+ * a table which has a space quota.
+ */
+@InterfaceAudience.Private
+public class SnapshotQuotaObserverChore extends ScheduledChore {
+  private static final Log LOG = LogFactory.getLog(SnapshotQuotaObserverChore.class);
+  static final String SNAPSHOT_QUOTA_CHORE_PERIOD_KEY =
+      "hbase.master.quotas.snapshot.chore.period";
+  static final int SNAPSHOT_QUOTA_CHORE_PERIOD_DEFAULT = 1000 * 60 * 5; // 5 minutes in millis
+
+  static final String SNAPSHOT_QUOTA_CHORE_DELAY_KEY =
+      "hbase.master.quotas.snapshot.chore.delay";
+  static final long SNAPSHOT_QUOTA_CHORE_DELAY_DEFAULT = 1000L * 60L; // 1 minute in millis
+
+  static final String SNAPSHOT_QUOTA_CHORE_TIMEUNIT_KEY =
+      "hbase.master.quotas.snapshot.chore.timeunit";
+  static final String SNAPSHOT_QUOTA_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();
+
+  private final Connection conn;
+  private final Configuration conf;
+  private final MetricsMaster metrics;
+  private final FileSystem fs;
+
+  public SnapshotQuotaObserverChore(HMaster master, MetricsMaster metrics) {
+    this(
+        master.getConnection(), master.getConfiguration(), master.getFileSystem(), master, metrics);
+  }
+
+  SnapshotQuotaObserverChore(
+      Connection conn, Configuration conf, FileSystem fs, Stoppable stopper,
+      MetricsMaster metrics) {
+    super(
+        SnapshotQuotaObserverChore.class.getSimpleName(), stopper, getPeriod(conf),
+        getInitialDelay(conf), getTimeUnit(conf));
+    this.conn = conn;
+    this.conf = conf;
+    this.metrics = metrics;
+    this.fs = fs;
+  }
+
+  @Override
+  protected void chore() {
+    try {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Computing sizes of snapshots for quota management.");
+      }
+      long start = System.nanoTime();
+      _chore();
+      if (null != metrics) {
+        metrics.incrementSnapshotObserverTime((System.nanoTime() - start) / 1_000_000);
+      }
+    } catch (IOException e) {
+      LOG.warn("Failed to compute the size of snapshots, will retry", e);
+    }
+  }
+
+  void _chore() throws IOException {
+    // Gets all tables with quotas that also have snapshots.
+    // These values are all of the snapshots that we need to compute the size of.
+    long start = System.nanoTime();
+    Multimap<TableName, String> snapshotsToComputeSize = getSnapshotsToComputeSize();
+    if (null != metrics) {
+      metrics.incrementSnapshotFetchTime((System.nanoTime() - start) / 1_000_000);
+    }
+
+    // For each table, compute the size of each snapshot
+    Multimap<TableName, SnapshotWithSize> snapshotsWithSize = computeSnapshotSizes(
+        snapshotsToComputeSize);
+
+    // Write the size data to the quota table.
+    persistSnapshotSizes(snapshotsWithSize);
+  }
+
+  /**
+   * Fetches each table with a quota (table or namespace quota), and then fetches the name of
+   * each snapshot which was created from that table.
+   *
+   * @return A mapping of table to snapshots created from that table
+   */
+  Multimap<TableName, String> getSnapshotsToComputeSize() throws IOException {
+    Set<TableName> tablesToFetchSnapshotsFrom = new HashSet<>();
+    QuotaFilter filter = new QuotaFilter();
+    filter.addTypeFilter(QuotaType.SPACE);
+    try (Admin admin = conn.getAdmin()) {
+      // Pull all of the tables that have quotas (direct, or from namespace)
+      for (QuotaSettings qs : QuotaRetriever.open(conf, filter)) {
+        String ns = qs.getNamespace();
+        TableName tn = qs.getTableName();
+        if ((null == ns && null == tn) || (null != ns && null != tn)) {
+          throw new IllegalStateException(
+              "Expected only one of namespace and tablename to be null");
+        }
+        // Collect either the table name itself, or all of the tables in the namespace
+        if (null != ns) {
+          tablesToFetchSnapshotsFrom.addAll(Arrays.asList(admin.listTableNamesByNamespace(ns)));
+        } else {
+          tablesToFetchSnapshotsFrom.add(tn);
+        }
+      }
+      // Fetch all snapshots that were created from these tables
+      return getSnapshotsFromTables(admin, tablesToFetchSnapshotsFrom);
+    }
+  }
+
+  /**
+   * Computes a mapping of originating {@code TableName} to snapshots, when the {@code TableName}
+   * exists in the provided {@code Set}.
+   */
+  Multimap<TableName, String> getSnapshotsFromTables(
+      Admin admin, Set<TableName> tablesToFetchSnapshotsFrom) throws IOException {
+    Multimap<TableName, String> snapshotsToCompute = HashMultimap.create();
+    for (org.apache.hadoop.hbase.client.SnapshotDescription sd : admin.listSnapshots()) {
+      TableName tn = sd.getTableName();
+      if (tablesToFetchSnapshotsFrom.contains(tn)) {
+        snapshotsToCompute.put(tn, sd.getName());
+      }
+    }
+    return snapshotsToCompute;
+  }
+
+  /**
+   * Computes the size of each snapshot provided given the current files referenced by the table.
+   *
+   * @param snapshotsToComputeSize The snapshots to compute the size of
+   * @return A mapping of table to snapshot created from that table and the snapshot's size.
+   */
+  Multimap<TableName, SnapshotWithSize> computeSnapshotSizes(
+      Multimap<TableName, String> snapshotsToComputeSize) throws IOException {
+    Multimap<TableName, SnapshotWithSize> snapshotSizes = HashMultimap.create();
+    for (Entry<TableName, Collection<String>> entry : snapshotsToComputeSize.asMap().entrySet()) {
+      final TableName tn = entry.getKey();
+      final List<String> snapshotNames = new ArrayList<>(entry.getValue());
+      // Sort the snapshots so we process them in lexicographic order. This ensures that multiple
+      // invocations of this Chore do not move the size ownership of some files between snapshots
+      // that reference the file (prevents size ownership from moving between snapshots).
+      Collections.sort(snapshotNames);
+      final Path rootDir = FSUtils.getRootDir(conf);
+      // Get the map of store file names to store file path for this table
+      // TODO is the store-file name unique enough? Does this need to be region+family+storefile?
+      final Set<String> tableReferencedStoreFiles;
+      try {
+        tableReferencedStoreFiles = FSUtils.getTableStoreFilePathMap(fs, rootDir).keySet();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return null;
+      }
+
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Paths for " + tn + ": " + tableReferencedStoreFiles);
+      }
+
+      // For each snapshot on this table, get the files which the snapshot references which
+      // the table does not.
+      Set<String> snapshotReferencedFiles = new HashSet<>();
+      for (String snapshotName : snapshotNames) {
+        final long start = System.nanoTime();
+        Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
+        SnapshotDescription sd = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
+        SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, sd);
+
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Files referenced by other snapshots: " + snapshotReferencedFiles);
+        }
+
+        // Get the set of files from the manifest that this snapshot references which are not also
+        // referenced by the originating table.
+        Set<StoreFileReference> unreferencedStoreFileNames = getStoreFilesFromSnapshot(
+            manifest, (sfn) -> !tableReferencedStoreFiles.contains(sfn)
+                && !snapshotReferencedFiles.contains(sfn));
+
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Snapshot " + snapshotName + " solely references the files: "
+              + unreferencedStoreFileNames);
+        }
+
+        // Compute the size of the store files for this snapshot
+        long size = getSizeOfStoreFiles(tn, unreferencedStoreFileNames);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Computed size of " + snapshotName + " to be " + size);
+        }
+
+        // Persist this snapshot's size into the map
+        snapshotSizes.put(tn, new SnapshotWithSize(snapshotName, size));
+
+        // Make sure that we don't double-count the same file
+        for (StoreFileReference ref : unreferencedStoreFileNames) {
+          for (String fileName : ref.getFamilyToFilesMapping().values()) {
+            snapshotReferencedFiles.add(fileName);
+          }
+        }
+        // Update the amount of time it took to compute the snapshot's size
+        if (null != metrics) {
+          metrics.incrementSnapshotSizeComputationTime((System.nanoTime() - start) / 1_000_000);
+        }
+      }
+    }
+    return snapshotSizes;
+  }
+
+  /**
+   * Extracts the names of the store files referenced by this snapshot which satisfy the given
+   * predicate (the predicate returns {@code true}).
+   */
+  Set<StoreFileReference> getStoreFilesFromSnapshot(
+      SnapshotManifest manifest, Predicate<String> filter) {
+    Set<StoreFileReference> references = new HashSet<>();
+    // For each region referenced by the snapshot
+    for (SnapshotRegionManifest rm : manifest.getRegionManifests()) {
+      StoreFileReference regionReference = new StoreFileReference(
+          HRegionInfo.convert(rm.getRegionInfo()).getEncodedName());
+
+      // For each column family in this region
+      for (FamilyFiles ff : rm.getFamilyFilesList()) {
+        final String familyName = ff.getFamilyName().toStringUtf8();
+        // And each store file in that family
+        for (StoreFile sf : ff.getStoreFilesList()) {
+          String storeFileName = sf.getName();
+          // A snapshot only "inherits" a file's size if it uniquely refers to it (no table
+          // and no other snapshot references it).
+          if (filter.test(storeFileName)) {
+            regionReference.addFamilyStoreFile(familyName, storeFileName);
+          }
+        }
+      }
+      // Only add this Region reference if we retained any files.
+      if (!regionReference.getFamilyToFilesMapping().isEmpty()) {
+        references.add(regionReference);
+      }
+    }
+    return references;
+  }
+
+  /**
+   * Calculates the directory in HDFS for a table based on the configuration.
+ */ + Path getTableDir(TableName tn) throws IOException { + Path rootDir = FSUtils.getRootDir(conf); + return FSUtils.getTableDir(rootDir, tn); + } + + /** + * Computes the size of each store file in {@code storeFileNames} + */ + long getSizeOfStoreFiles(TableName tn, Set storeFileNames) { + return storeFileNames.stream() + .collect(Collectors.summingLong((sfr) -> getSizeOfStoreFile(tn, sfr))); + } + + /** + * Computes the size of the store files for a single region. + */ + long getSizeOfStoreFile(TableName tn, StoreFileReference storeFileName) { + String regionName = storeFileName.getRegionName(); + return storeFileName.getFamilyToFilesMapping() + .entries().stream() + .collect(Collectors.summingLong((e) -> + getSizeOfStoreFile(tn, regionName, e.getKey(), e.getValue()))); + } + + /** + * Computes the size of the store file given its name, region and family name in + * the archive directory. + */ + long getSizeOfStoreFile( + TableName tn, String regionName, String family, String storeFile) { + Path familyArchivePath; + try { + familyArchivePath = HFileArchiveUtil.getStoreArchivePath(conf, tn, regionName, family); + } catch (IOException e) { + LOG.warn("Could not compute path for the archive directory for the region", e); + return 0L; + } + Path fileArchivePath = new Path(familyArchivePath, storeFile); + try { + if (fs.exists(fileArchivePath)) { + FileStatus[] status = fs.listStatus(fileArchivePath); + if (1 != status.length) { + LOG.warn("Expected " + fileArchivePath + + " to be a file but was a directory, ignoring reference"); + return 0L; + } + return status[0].getLen(); + } + } catch (IOException e) { + LOG.warn("Could not obtain the status of " + fileArchivePath, e); + return 0L; + } + LOG.warn("Expected " + fileArchivePath + " to exist but does not, ignoring reference."); + return 0L; + } + + /** + * Writes the snapshot sizes to the {@code hbase:quota} table. + * + * @param snapshotsWithSize The snapshot sizes to write. + */ + void persistSnapshotSizes( + Multimap snapshotsWithSize) throws IOException { + try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) { + // Write each snapshot size for the table + persistSnapshotSizes(quotaTable, snapshotsWithSize); + // Write a size entry for all snapshots in a namespace + persistSnapshotSizesByNS(quotaTable, snapshotsWithSize); + } + } + + /** + * Writes the snapshot sizes to the provided {@code table}. + */ + void persistSnapshotSizes( + Table table, Multimap snapshotsWithSize) throws IOException { + // Convert each entry in the map to a Put and write them to the quota table + table.put(snapshotsWithSize.entries() + .stream() + .map(e -> QuotaTableUtil.createPutForSnapshotSize( + e.getKey(), e.getValue().getName(), e.getValue().getSize())) + .collect(Collectors.toList())); + } + + /** + * Rolls up the snapshot sizes by namespace and writes a single record for each namespace + * which is the size of all snapshots in that namespace. + */ + void persistSnapshotSizesByNS( + Table quotaTable, Multimap snapshotsWithSize) throws IOException { + Map namespaceSnapshotSizes = groupSnapshotSizesByNamespace(snapshotsWithSize); + quotaTable.put(namespaceSnapshotSizes.entrySet().stream() + .map(e -> QuotaTableUtil.createPutForNamespaceSnapshotSize( + e.getKey(), e.getValue())) + .collect(Collectors.toList())); + } + + /** + * Sums the snapshot sizes for each namespace. 
+ */ + Map groupSnapshotSizesByNamespace( + Multimap snapshotsWithSize) { + return snapshotsWithSize.entries().stream() + .collect(Collectors.groupingBy( + // Convert TableName into the namespace string + (e) -> e.getKey().getNamespaceAsString(), + // Sum the values for namespace + Collectors.mapping( + Map.Entry::getValue, Collectors.summingLong((sws) -> sws.getSize())))); + } + + /** + * A struct encapsulating the name of a snapshot and its "size" on the filesystem. This size is + * defined as the amount of filesystem space taken by the files the snapshot refers to which + * the originating table no longer refers to. + */ + static class SnapshotWithSize { + private final String name; + private final long size; + + SnapshotWithSize(String name, long size) { + this.name = Objects.requireNonNull(name); + this.size = size; + } + + String getName() { + return name; + } + + long getSize() { + return size; + } + + @Override + public int hashCode() { + return new HashCodeBuilder().append(name).append(size).toHashCode(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (!(o instanceof SnapshotWithSize)) { + return false; + } + + SnapshotWithSize other = (SnapshotWithSize) o; + return name.equals(other.name) && size == other.size; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(32); + return sb.append("SnapshotWithSize:[").append(name).append(" ") + .append(StringUtils.byteDesc(size)).append("]").toString(); + } + } + + /** + * A reference to a collection of files in the archive directory for a single region. + */ + static class StoreFileReference { + private final String regionName; + private final Multimap familyToFiles; + + StoreFileReference(String regionName) { + this.regionName = Objects.requireNonNull(regionName); + familyToFiles = HashMultimap.create(); + } + + String getRegionName() { + return regionName; + } + + Multimap getFamilyToFilesMapping() { + return familyToFiles; + } + + void addFamilyStoreFile(String family, String storeFileName) { + familyToFiles.put(family, storeFileName); + } + + @Override + public int hashCode() { + return new HashCodeBuilder().append(regionName).append(familyToFiles).toHashCode(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof StoreFileReference)) { + return false; + } + StoreFileReference other = (StoreFileReference) o; + return regionName.equals(other.regionName) && familyToFiles.equals(other.familyToFiles); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + return sb.append("StoreFileReference[region=").append(regionName).append(", files=") + .append(familyToFiles).append("]").toString(); + } + } + + /** + * Extracts the period for the chore from the configuration. + * + * @param conf The configuration object. + * @return The configured chore period or the default value. + */ + static int getPeriod(Configuration conf) { + return conf.getInt(SNAPSHOT_QUOTA_CHORE_PERIOD_KEY, + SNAPSHOT_QUOTA_CHORE_PERIOD_DEFAULT); + } + + /** + * Extracts the initial delay for the chore from the configuration. + * + * @param conf The configuration object. + * @return The configured chore initial delay or the default value. + */ + static long getInitialDelay(Configuration conf) { + return conf.getLong(SNAPSHOT_QUOTA_CHORE_DELAY_KEY, + SNAPSHOT_QUOTA_CHORE_DELAY_DEFAULT); + } + + /** + * Extracts the time unit for the chore period and initial delay from the configuration. 
The + * configuration value for {@link #SNAPSHOT_QUOTA_CHORE_TIMEUNIT_KEY} must correspond to + * a {@link TimeUnit} value. + * + * @param conf The configuration object. + * @return The configured time unit for the chore period and initial delay or the default value. + */ + static TimeUnit getTimeUnit(Configuration conf) { + return TimeUnit.valueOf(conf.get(SNAPSHOT_QUOTA_CHORE_TIMEUNIT_KEY, + SNAPSHOT_QUOTA_CHORE_TIMEUNIT_DEFAULT)); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableQuotaSnapshotStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableQuotaSnapshotStore.java index 1abf34737b4..6a29a828e72 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableQuotaSnapshotStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableQuotaSnapshotStore.java @@ -18,17 +18,26 @@ package org.apache.hadoop.hbase.quotas; import java.io.IOException; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; -import java.util.Map.Entry; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota; @@ -41,6 +50,8 @@ import com.google.common.collect.Iterables; */ @InterfaceAudience.Private public class TableQuotaSnapshotStore implements QuotaSnapshotStore { + private static final Log LOG = LogFactory.getLog(TableQuotaSnapshotStore.class); + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final ReadLock rlock = lock.readLock(); private final WriteLock wlock = lock.writeLock(); @@ -77,7 +88,8 @@ public class TableQuotaSnapshotStore implements QuotaSnapshotStore { } @Override - public SpaceQuotaSnapshot getTargetState(TableName table, SpaceQuota spaceQuota) { + public SpaceQuotaSnapshot getTargetState( + TableName table, SpaceQuota spaceQuota) throws IOException { rlock.lock(); try { final long sizeLimitInBytes = spaceQuota.getSoftLimit(); @@ -85,6 +97,8 @@ public class TableQuotaSnapshotStore implements QuotaSnapshotStore { for (Entry entry : filterBySubject(table)) { sum += entry.getValue(); } + // Add in the size for any snapshots against this table + sum += getSnapshotSizesForTable(table); // Observance is defined as the size of the table being less than the limit SpaceQuotaStatus status = sum <= sizeLimitInBytes ? 
SpaceQuotaStatus.notInViolation() : new SpaceQuotaStatus(ProtobufUtil.toViolationPolicy(spaceQuota.getViolationPolicy())); @@ -94,6 +108,42 @@ public class TableQuotaSnapshotStore implements QuotaSnapshotStore { } } + /** + * Fetches any serialized snapshot sizes from the quota table for the {@code tn} provided. Any + * malformed records are skipped with a warning printed out. + */ + long getSnapshotSizesForTable(TableName tn) throws IOException { + try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) { + Scan s = QuotaTableUtil.createScanForSnapshotSizes(tn); + ResultScanner rs = quotaTable.getScanner(s); + try { + long size = 0L; + // Should just be a single row (for our table) + for (Result result : rs) { + // May have multiple columns, one for each snapshot + CellScanner cs = result.cellScanner(); + while (cs.advance()) { + Cell current = cs.current(); + try { + long snapshotSize = QuotaTableUtil.parseSnapshotSize(current); + if (LOG.isTraceEnabled()) { + LOG.trace("Saw snapshot size of " + snapshotSize + " for " + current); + } + size += snapshotSize; + } catch (InvalidProtocolBufferException e) { + LOG.warn("Failed to parse snapshot size from cell: " + current); + } + } + } + return size; + } finally { + if (null != rs) { + rs.close(); + } + } + } + } + @Override public Iterable> filterBySubject(TableName table) { rlock.lock(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaSnapshotNotifier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaSnapshotNotifier.java index 548faf878d2..f9813e55f43 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaSnapshotNotifier.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaSnapshotNotifier.java @@ -36,7 +36,7 @@ public class TableSpaceQuotaSnapshotNotifier implements SpaceQuotaSnapshotNotifi @Override public void transitionTable( TableName tableName, SpaceQuotaSnapshot snapshot) throws IOException { - final Put p = QuotaTableUtil.createPutSpaceSnapshot(tableName, snapshot); + final Put p = QuotaTableUtil.createPutForSpaceSnapshot(tableName, snapshot); try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) { if (LOG.isTraceEnabled()) { LOG.trace("Persisting a space quota snapshot " + snapshot + " for " + tableName); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 17e255a3e18..743c9cd3039 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -40,6 +40,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -2026,6 +2027,17 @@ public class HStore implements Store { @Override public long getStorefilesSize() { + // Include all StoreFiles + return getStorefilesSize(storeFile -> true); + } + + @Override + public long getHFilesSize() { + // Include only StoreFiles which are HFiles + return getStorefilesSize(storeFile -> storeFile.isHFile()); + } + + private long getStorefilesSize(Predicate predicate) { long size = 0; for (StoreFile s: 
this.storeEngine.getStoreFileManager().getStorefiles()) { StoreFileReader r = s.getReader(); @@ -2033,7 +2045,9 @@ public class HStore implements Store { LOG.warn("StoreFile " + s + " has a null Reader"); continue; } - size += r.length(); + if (predicate.test(s)) { + size += r.length(); + } } return size; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 76595f3622f..f5e90eb85c9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -401,6 +401,11 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf */ long getStorefilesSize(); + /** + * @return The size of only the store files which are HFiles, in bytes. + */ + long getHFilesSize(); + /** * @return The size of the store file indexes, in bytes. */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaHelperForTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaHelperForTests.java index 1e2235a3f97..c31bfe50802 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaHelperForTests.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaHelperForTests.java @@ -20,6 +20,7 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map.Entry; import java.util.Objects; @@ -29,9 +30,11 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter.Predicate; @@ -40,6 +43,10 @@ import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.util.Bytes; import org.junit.rules.TestName; @@ -55,6 +62,7 @@ public class SpaceQuotaHelperForTests { public static final String F1 = "f1"; public static final long ONE_KILOBYTE = 1024L; public static final long ONE_MEGABYTE = ONE_KILOBYTE * ONE_KILOBYTE; + public static final long ONE_GIGABYTE = ONE_MEGABYTE * ONE_KILOBYTE; private final HBaseTestingUtility testUtil; private final TestName testName; @@ -67,6 +75,25 @@ public class SpaceQuotaHelperForTests { this.counter = Objects.requireNonNull(counter); } + // + // Static helpers + // + + static void updateConfigForQuotas(Configuration conf) { + // Increase the frequency of some of the chores for responsiveness of the test + conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, 1000); + conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 1000); + conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_DELAY_KEY, 1000); + 
conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_PERIOD_KEY, 1000); + conf.setInt(SpaceQuotaRefresherChore.POLICY_REFRESHER_CHORE_DELAY_KEY, 1000); + conf.setInt(SpaceQuotaRefresherChore.POLICY_REFRESHER_CHORE_PERIOD_KEY, 1000); + conf.setInt(SnapshotQuotaObserverChore.SNAPSHOT_QUOTA_CHORE_DELAY_KEY, 1000); + conf.setInt(SnapshotQuotaObserverChore.SNAPSHOT_QUOTA_CHORE_PERIOD_KEY, 1000); + // The period at which we check for compacted files that should be deleted from HDFS + conf.setInt("hbase.hfile.compaction.discharger.interval", 5 * 1000); + conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); + } + // // Helpers // @@ -88,24 +115,33 @@ public class SpaceQuotaHelperForTests { /** * Removes all quotas defined in the HBase quota table. */ - void removeAllQuotas(Connection conn) throws IOException { - QuotaRetriever scanner = QuotaRetriever.open(conn.getConfiguration()); - try { - for (QuotaSettings quotaSettings : scanner) { - final String namespace = quotaSettings.getNamespace(); - final TableName tableName = quotaSettings.getTableName(); - if (namespace != null) { - LOG.debug("Deleting quota for namespace: " + namespace); - QuotaUtil.deleteNamespaceQuota(conn, namespace); - } else { - assert tableName != null; - LOG.debug("Deleting quota for table: "+ tableName); - QuotaUtil.deleteTableQuota(conn, tableName); + void removeAllQuotas(Connection conn) throws IOException, InterruptedException { + // Wait for the quota table to be created + if (!conn.getAdmin().tableExists(QuotaUtil.QUOTA_TABLE_NAME)) { + do { + LOG.debug("Quota table does not yet exist"); + Thread.sleep(1000); + } while (!conn.getAdmin().tableExists(QuotaUtil.QUOTA_TABLE_NAME)); + } else { + // Or, clean up any quotas from previous test runs. + QuotaRetriever scanner = QuotaRetriever.open(conn.getConfiguration()); + try { + for (QuotaSettings quotaSettings : scanner) { + final String namespace = quotaSettings.getNamespace(); + final TableName tableName = quotaSettings.getTableName(); + if (namespace != null) { + LOG.debug("Deleting quota for namespace: " + namespace); + QuotaUtil.deleteNamespaceQuota(conn, namespace); + } else { + assert tableName != null; + LOG.debug("Deleting quota for table: "+ tableName); + QuotaUtil.deleteTableQuota(conn, tableName); + } + } + } finally { + if (scanner != null) { + scanner.close(); } - } - } finally { - if (scanner != null) { - scanner.close(); } } } @@ -146,6 +182,15 @@ public class SpaceQuotaHelperForTests { } void writeData(Connection conn, TableName tn, long sizeInBytes) throws IOException { + writeData(tn, sizeInBytes, Bytes.toBytes("q1")); + } + + void writeData(TableName tn, long sizeInBytes, String qual) throws IOException { + writeData(tn, sizeInBytes, Bytes.toBytes(qual)); + } + + void writeData(TableName tn, long sizeInBytes, byte[] qual) throws IOException { + final Connection conn = testUtil.getConnection(); final Table table = conn.getTable(tn); try { List updates = new ArrayList<>(); @@ -160,7 +205,7 @@ public class SpaceQuotaHelperForTests { Put p = new Put(Bytes.toBytes(sb.reverse().toString())); byte[] value = new byte[SIZE_PER_VALUE]; r.nextBytes(value); - p.addColumn(Bytes.toBytes(F1), Bytes.toBytes("q1"), value); + p.addColumn(Bytes.toBytes(F1), qual, value); updates.add(p); // Batch ~13KB worth of updates @@ -188,6 +233,12 @@ public class SpaceQuotaHelperForTests { } } + NamespaceDescriptor createNamespace() throws Exception { + NamespaceDescriptor nd = NamespaceDescriptor.create("ns" + counter.getAndIncrement()).build(); + testUtil.getAdmin().createNamespace(nd); + 
return nd; + } + Multimap createTablesWithSpaceQuotas() throws Exception { final Admin admin = testUtil.getAdmin(); final Multimap tablesWithQuotas = HashMultimap.create(); @@ -195,8 +246,7 @@ public class SpaceQuotaHelperForTests { final TableName tn1 = createTable(); final TableName tn2 = createTable(); - NamespaceDescriptor nd = NamespaceDescriptor.create("ns" + counter.getAndIncrement()).build(); - admin.createNamespace(nd); + NamespaceDescriptor nd = createNamespace(); final TableName tn3 = createTableInNamespace(nd); final TableName tn4 = createTableInNamespace(nd); final TableName tn5 = createTableInNamespace(nd); @@ -233,6 +283,14 @@ public class SpaceQuotaHelperForTests { return tablesWithQuotas; } + TableName getNextTableName() { + return getNextTableName(NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR); + } + + TableName getNextTableName(String namespace) { + return TableName.valueOf(namespace, testName.getMethodName() + counter.getAndIncrement()); + } + TableName createTable() throws Exception { return createTableWithRegions(1); } @@ -251,8 +309,7 @@ public class SpaceQuotaHelperForTests { } TableName createTableWithRegions(Admin admin, String namespace, int numRegions) throws Exception { - final TableName tn = TableName.valueOf( - namespace, testName.getMethodName() + counter.getAndIncrement()); + final TableName tn = getNextTableName(namespace); // Delete the old table if (admin.tableExists(tn)) { @@ -308,4 +365,87 @@ public class SpaceQuotaHelperForTests { } } } + + /** + * Abstraction to simplify the case where a test needs to verify a certain state + * on a {@code SpaceQuotaSnapshot}. This class fails-fast when there is no such + * snapshot obtained from the Master. As such, it is not useful to verify the + * lack of a snapshot. + */ + static abstract class SpaceQuotaSnapshotPredicate implements Predicate { + private final Connection conn; + private final TableName tn; + private final String ns; + + SpaceQuotaSnapshotPredicate(Connection conn, TableName tn) { + this(Objects.requireNonNull(conn), Objects.requireNonNull(tn), null); + } + + SpaceQuotaSnapshotPredicate(Connection conn, String ns) { + this(Objects.requireNonNull(conn), null, Objects.requireNonNull(ns)); + } + + SpaceQuotaSnapshotPredicate(Connection conn, TableName tn, String ns) { + if ((null != tn && null != ns) || (null == tn && null == ns)) { + throw new IllegalArgumentException( + "One of TableName and Namespace must be non-null, and the other null"); + } + this.conn = conn; + this.tn = tn; + this.ns = ns; + } + + @Override + public boolean evaluate() throws Exception { + SpaceQuotaSnapshot snapshot; + if (null == ns) { + snapshot = QuotaTableUtil.getCurrentSnapshot(conn, tn); + } else { + snapshot = QuotaTableUtil.getCurrentSnapshot(conn, ns); + } + + LOG.debug("Saw quota snapshot for " + (null == tn ? ns : tn) + ": " + snapshot); + if (null == snapshot) { + return false; + } + return evaluate(snapshot); + } + + /** + * Must determine if the given {@code SpaceQuotaSnapshot} meets some criteria. + * + * @param snapshot a non-null snapshot obtained from the HBase Master + * @return true if the criteria is met, false otherwise + */ + abstract boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception; + } + + /** + * Predicate that waits for all store files in a table to have no compacted files. 
+ */ + static class NoFilesToDischarge implements Predicate { + private final MiniHBaseCluster cluster; + private final TableName tn; + + NoFilesToDischarge(MiniHBaseCluster cluster, TableName tn) { + this.cluster = cluster; + this.tn = tn; + } + + @Override + public boolean evaluate() throws Exception { + for (HRegion region : cluster.getRegions(tn)) { + for (Store store : region.getStores()) { + HStore hstore = (HStore) store; + Collection files = + hstore.getStoreEngine().getStoreFileManager().getCompactedfiles(); + if (null != files && !files.isEmpty()) { + LOG.debug(region.getRegionInfo().getEncodedName() + " still has compacted files"); + return false; + } + } + } + return true; + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFileSystemUtilizationChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFileSystemUtilizationChore.java index 18e47af7d1c..823b1f7b898 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFileSystemUtilizationChore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFileSystemUtilizationChore.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -238,13 +239,13 @@ public class TestFileSystemUtilizationChore { final Configuration conf = getDefaultHBaseConfiguration(); final HRegionServer rs = mockRegionServer(conf); - // Three regions with multiple store sizes + // Two regions with multiple store sizes final List r1Sizes = Arrays.asList(1024L, 2048L); final long r1Sum = sum(r1Sizes); final List r2Sizes = Arrays.asList(1024L * 1024L); final FileSystemUtilizationChore chore = new FileSystemUtilizationChore(rs); - doAnswer(new ExpectedRegionSizeSummationAnswer(sum(Arrays.asList(r1Sum)))) + doAnswer(new ExpectedRegionSizeSummationAnswer(r1Sum)) .when(rs) .reportRegionSizesForQuotas((Map) any(Map.class)); @@ -254,6 +255,33 @@ public class TestFileSystemUtilizationChore { chore.chore(); } + @SuppressWarnings("unchecked") + @Test + public void testNonHFilesAreIgnored() { + final Configuration conf = getDefaultHBaseConfiguration(); + final HRegionServer rs = mockRegionServer(conf); + + // Region r1 has two store files, one hfile link and one hfile + final List r1StoreFileSizes = Arrays.asList(1024L, 2048L); + final List r1HFileSizes = Arrays.asList(0L, 2048L); + final long r1HFileSizeSum = sum(r1HFileSizes); + // Region r2 has one store file which is a hfile link + final List r2StoreFileSizes = Arrays.asList(1024L * 1024L); + final List r2HFileSizes = Arrays.asList(0L); + final long r2HFileSizeSum = sum(r2HFileSizes); + + // We expect that only the hfiles would be counted (hfile links are ignored) + final FileSystemUtilizationChore chore = new FileSystemUtilizationChore(rs); + doAnswer(new ExpectedRegionSizeSummationAnswer( + sum(Arrays.asList(r1HFileSizeSum, r2HFileSizeSum)))) + .when(rs).reportRegionSizesForQuotas((Map) any(Map.class)); + + final Region r1 = mockRegionWithHFileLinks(r1StoreFileSizes, r1HFileSizes); + final Region r2 = mockRegionWithHFileLinks(r2StoreFileSizes, r2HFileSizes); + when(rs.getOnlineRegions()).thenReturn(Arrays.asList(r1, r2)); + chore.chore(); + } + /** * Creates an HBase 
Configuration object for the default values. */ @@ -298,9 +326,31 @@ public class TestFileSystemUtilizationChore { List stores = new ArrayList<>(); when(r.getStores()).thenReturn(stores); for (Long storeSize : storeSizes) { + final Store s = mock(Store.class); + stores.add(s); + when(s.getHFilesSize()).thenReturn(storeSize); + } + return r; + } + + private Region mockRegionWithHFileLinks(Collection storeSizes, Collection hfileSizes) { + final Region r = mock(Region.class); + final HRegionInfo info = mock(HRegionInfo.class); + when(r.getRegionInfo()).thenReturn(info); + List stores = new ArrayList<>(); + when(r.getStores()).thenReturn(stores); + assertEquals( + "Logic error, storeSizes and linkSizes must be equal in size", storeSizes.size(), + hfileSizes.size()); + Iterator storeSizeIter = storeSizes.iterator(); + Iterator hfileSizeIter = hfileSizes.iterator(); + while (storeSizeIter.hasNext() && hfileSizeIter.hasNext()) { + final long storeSize = storeSizeIter.next(); + final long hfileSize = hfileSizeIter.next(); final Store s = mock(Store.class); stores.add(s); when(s.getStorefilesSize()).thenReturn(storeSize); + when(s.getHFilesSize()).thenReturn(hfileSize); } return r; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNamespaceQuotaViolationStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNamespaceQuotaViolationStore.java index 4a7258fd983..385f8c4d72d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNamespaceQuotaViolationStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNamespaceQuotaViolationStore.java @@ -23,6 +23,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; @@ -31,6 +32,9 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas; @@ -91,7 +95,8 @@ public class TestNamespaceQuotaViolationStore { } @Test - public void testTargetViolationState() { + public void testTargetViolationState() throws IOException { + mockNoSnapshotSizes(); final String NS = "ns"; TableName tn1 = TableName.valueOf(NS, "tn1"); TableName tn2 = TableName.valueOf(NS, "tn2"); @@ -123,7 +128,8 @@ public class TestNamespaceQuotaViolationStore { // Exceeds the quota, should be in violation assertEquals(true, store.getTargetState(NS, quota).getQuotaStatus().isInViolation()); - assertEquals(SpaceViolationPolicy.DISABLE, store.getTargetState(NS, quota).getQuotaStatus().getPolicy()); + assertEquals( + SpaceViolationPolicy.DISABLE, store.getTargetState(NS, quota).getQuotaStatus().getPolicy()); } @Test @@ -153,4 +159,9 @@ public class TestNamespaceQuotaViolationStore { assertEquals(18, size(store.filterBySubject("ns"))); } + void mockNoSnapshotSizes() throws IOException { + Table quotaTable = mock(Table.class); + when(conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)).thenReturn(quotaTable); + when(quotaTable.get(any(Get.class))).thenReturn(new 
Result()); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreRegionReports.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreRegionReports.java index ae315a8c8c6..44b73b0f431 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreRegionReports.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreRegionReports.java @@ -65,11 +65,8 @@ public class TestQuotaObserverChoreRegionReports { @Before public void setUp() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); - conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, 1000); - conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 1000); - conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_DELAY_KEY, 1000); - conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_PERIOD_KEY, 1000); - conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); + // Increase the frequency of some of the chores for responsiveness of the test + SpaceQuotaHelperForTests.updateConfigForQuotas(conf); conf.setInt(QuotaObserverChore.REGION_REPORT_RETENTION_DURATION_KEY, 1000); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java index dde9e71efce..4b0fa247913 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java @@ -77,11 +77,7 @@ public class TestQuotaObserverChoreWithMiniCluster { @BeforeClass public static void setUp() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); - conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, 1000); - conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 1000); - conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_DELAY_KEY, 1000); - conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_PERIOD_KEY, 1000); - conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); + SpaceQuotaHelperForTests.updateConfigForQuotas(conf); conf.setClass(SpaceQuotaSnapshotNotifierFactory.SNAPSHOT_NOTIFIER_KEY, SpaceQuotaSnapshotNotifierForTest.class, SpaceQuotaSnapshotNotifier.class); TEST_UTIL.startMiniCluster(1); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaStatusRPCs.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaStatusRPCs.java index 2020e3cf41f..aeae80ad2f6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaStatusRPCs.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaStatusRPCs.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter.Predicate; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus; import org.apache.hadoop.hbase.quotas.policies.MissingSnapshotViolationPolicyEnforcement; @@ -65,13 +66,7 @@ public class TestQuotaStatusRPCs { public static void setUp() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); // Increase the frequency of some of the chores for 
responsiveness of the test - conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, 1000); - conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 1000); - conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_DELAY_KEY, 1000); - conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_PERIOD_KEY, 1000); - conf.setInt(SpaceQuotaRefresherChore.POLICY_REFRESHER_CHORE_DELAY_KEY, 1000); - conf.setInt(SpaceQuotaRefresherChore.POLICY_REFRESHER_CHORE_PERIOD_KEY, 1000); - conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); + SpaceQuotaHelperForTests.updateConfigForQuotas(conf); TEST_UTIL.startMiniCluster(1); } @@ -167,7 +162,7 @@ public class TestQuotaStatusRPCs { // Write at least `tableSize` data try { helper.writeData(tn, tableSize); - } catch (SpaceLimitingException e) { + } catch (RetriesExhaustedWithDetailsException | SpaceLimitingException e) { // Pass } @@ -245,7 +240,7 @@ public class TestQuotaStatusRPCs { try { helper.writeData(tn, tableSize * 2L); - } catch (SpaceLimitingException e) { + } catch (RetriesExhaustedWithDetailsException | SpaceLimitingException e) { // Pass } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java index f10cdef4870..e4dbdc9c199 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.quotas; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.ArrayList; @@ -27,8 +29,11 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; @@ -37,7 +42,9 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle; import org.apache.hadoop.hbase.testclassification.MasterTests; @@ -211,9 +218,9 @@ public class TestQuotaTableUtil { final SpaceQuotaSnapshot snapshot3 = new SpaceQuotaSnapshot( new SpaceQuotaStatus(SpaceViolationPolicy.NO_WRITES), 512L, 1024L); List puts = new ArrayList<>(); - puts.add(QuotaTableUtil.createPutSpaceSnapshot(tn1, snapshot1)); - puts.add(QuotaTableUtil.createPutSpaceSnapshot(tn2, snapshot2)); - puts.add(QuotaTableUtil.createPutSpaceSnapshot(tn3, snapshot3)); + puts.add(QuotaTableUtil.createPutForSpaceSnapshot(tn1, snapshot1)); + puts.add(QuotaTableUtil.createPutForSpaceSnapshot(tn2, snapshot2)); + puts.add(QuotaTableUtil.createPutForSpaceSnapshot(tn3, snapshot3)); final Map 
expectedPolicies = new HashMap<>(); expectedPolicies.put(tn1, snapshot1); expectedPolicies.put(tn2, snapshot2); @@ -232,7 +239,59 @@ public class TestQuotaTableUtil { assertEquals(expectedPolicies, actualPolicies); } + @Test + public void testSerdeTableSnapshotSizes() throws Exception { + TableName tn1 = TableName.valueOf("tn1"); + TableName tn2 = TableName.valueOf("tn2"); + try (Table quotaTable = connection.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) { + for (int i = 0; i < 3; i++) { + Put p = QuotaTableUtil.createPutForSnapshotSize(tn1, "tn1snap" + i, 1024L * (1+i)); + quotaTable.put(p); + } + for (int i = 0; i < 3; i++) { + Put p = QuotaTableUtil.createPutForSnapshotSize(tn2, "tn2snap" + i, 2048L * (1+i)); + quotaTable.put(p); + } + + verifyTableSnapshotSize(quotaTable, tn1, "tn1snap0", 1024L); + verifyTableSnapshotSize(quotaTable, tn1, "tn1snap1", 2048L); + verifyTableSnapshotSize(quotaTable, tn1, "tn1snap2", 3072L); + + verifyTableSnapshotSize(quotaTable, tn2, "tn2snap0", 2048L); + verifyTableSnapshotSize(quotaTable, tn2, "tn2snap1", 4096L); + verifyTableSnapshotSize(quotaTable, tn2, "tn2snap2", 6144L); + } + } + + @Test + public void testReadNamespaceSnapshotSizes() throws Exception { + String ns1 = "ns1"; + String ns2 = "ns2"; + String defaultNs = NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR; + try (Table quotaTable = connection.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) { + quotaTable.put(QuotaTableUtil.createPutForNamespaceSnapshotSize(ns1, 1024L)); + quotaTable.put(QuotaTableUtil.createPutForNamespaceSnapshotSize(ns2, 2048L)); + quotaTable.put(QuotaTableUtil.createPutForNamespaceSnapshotSize(defaultNs, 8192L)); + + assertEquals(1024L, QuotaTableUtil.getNamespaceSnapshotSize(connection, ns1)); + assertEquals(2048L, QuotaTableUtil.getNamespaceSnapshotSize(connection, ns2)); + assertEquals(8192L, QuotaTableUtil.getNamespaceSnapshotSize(connection, defaultNs)); + } + } + private TableName getUniqueTableName() { return TableName.valueOf(testName.getMethodName() + "_" + tableNameCounter++); } + + private void verifyTableSnapshotSize( + Table quotaTable, TableName tn, String snapshotName, long expectedSize) throws IOException { + Result r = quotaTable.get(QuotaTableUtil.makeGetForSnapshotSize(tn, snapshotName)); + CellScanner cs = r.cellScanner(); + assertTrue(cs.advance()); + Cell c = cs.current(); + assertEquals(expectedSize, QuotaProtos.SpaceQuotaSnapshot.parseFrom( + UnsafeByteOperations.unsafeWrap( + c.getValueArray(), c.getValueOffset(), c.getValueLength())).getQuotaUsage()); + assertFalse(cs.advance()); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionSizeUse.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionSizeUse.java index ed8a2f3b6fe..8584d557bb1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionSizeUse.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionSizeUse.java @@ -67,9 +67,8 @@ public class TestRegionSizeUse { @Before public void setUp() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); - conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, 1000); - conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 1000); - conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); + // Increase the frequency of some of the chores for responsiveness of the test + SpaceQuotaHelperForTests.updateConfigForQuotas(conf); cluster = TEST_UTIL.startMiniCluster(2); } diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSnapshotQuotaObserverChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSnapshotQuotaObserverChore.java new file mode 100644 index 00000000000..4022e3ffeba --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSnapshotQuotaObserverChore.java @@ -0,0 +1,368 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.SnapshotDescription; +import org.apache.hadoop.hbase.client.SnapshotType; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.quotas.SnapshotQuotaObserverChore.SnapshotWithSize; +import org.apache.hadoop.hbase.quotas.SpaceQuotaHelperForTests.NoFilesToDischarge; +import org.apache.hadoop.hbase.quotas.SpaceQuotaHelperForTests.SpaceQuotaSnapshotPredicate; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Multimap; + +/** + * Test class for the {@link SnapshotQuotaObserverChore}. 
+ */ +@Category(MediumTests.class) +public class TestSnapshotQuotaObserverChore { + private static final Log LOG = LogFactory.getLog(TestSnapshotQuotaObserverChore.class); + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final AtomicLong COUNTER = new AtomicLong(); + + @Rule + public TestName testName = new TestName(); + + private Connection conn; + private Admin admin; + private SpaceQuotaHelperForTests helper; + private HMaster master; + private SnapshotQuotaObserverChore testChore; + + @BeforeClass + public static void setUp() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + SpaceQuotaHelperForTests.updateConfigForQuotas(conf); + // Clean up the compacted files faster than normal (15s instead of 2mins) + conf.setInt("hbase.hfile.compaction.discharger.interval", 15 * 1000); + TEST_UTIL.startMiniCluster(1); + } + + @AfterClass + public static void tearDown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void setup() throws Exception { + conn = TEST_UTIL.getConnection(); + admin = TEST_UTIL.getAdmin(); + helper = new SpaceQuotaHelperForTests(TEST_UTIL, testName, COUNTER); + master = TEST_UTIL.getHBaseCluster().getMaster(); + helper.removeAllQuotas(conn); + testChore = new SnapshotQuotaObserverChore( + TEST_UTIL.getConnection(), TEST_UTIL.getConfiguration(), master.getFileSystem(), master, + null); + } + + @Test + public void testSnapshotSizePersistence() throws IOException { + final Admin admin = TEST_UTIL.getAdmin(); + final TableName tn = TableName.valueOf("quota_snapshotSizePersistence"); + if (admin.tableExists(tn)) { + admin.disableTable(tn); + admin.deleteTable(tn); + } + HTableDescriptor desc = new HTableDescriptor(tn); + desc.addFamily(new HColumnDescriptor(QuotaTableUtil.QUOTA_FAMILY_USAGE)); + admin.createTable(desc); + + Multimap<TableName, SnapshotWithSize> snapshotsWithSizes = HashMultimap.create(); + try (Table table = conn.getTable(tn)) { + // Writing no values will result in no records written.
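+ // Note: verify() is the helper defined near the bottom of this class; it disables and
+ // truncates the table before running the given assertions, so each block below starts
+ // from a freshly-emptied table.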
+ verify(table, () -> { + testChore.persistSnapshotSizes(table, snapshotsWithSizes); + assertEquals(0, count(table)); + }); + + verify(table, () -> { + TableName originatingTable = TableName.valueOf("t1"); + snapshotsWithSizes.put(originatingTable, new SnapshotWithSize("ss1", 1024L)); + snapshotsWithSizes.put(originatingTable, new SnapshotWithSize("ss2", 4096L)); + testChore.persistSnapshotSizes(table, snapshotsWithSizes); + assertEquals(2, count(table)); + assertEquals(1024L, extractSnapshotSize(table, originatingTable, "ss1")); + assertEquals(4096L, extractSnapshotSize(table, originatingTable, "ss2")); + }); + + snapshotsWithSizes.clear(); + verify(table, () -> { + snapshotsWithSizes.put(TableName.valueOf("t1"), new SnapshotWithSize("ss1", 1024L)); + snapshotsWithSizes.put(TableName.valueOf("t2"), new SnapshotWithSize("ss2", 4096L)); + snapshotsWithSizes.put(TableName.valueOf("t3"), new SnapshotWithSize("ss3", 8192L)); + testChore.persistSnapshotSizes(table, snapshotsWithSizes); + assertEquals(3, count(table)); + assertEquals(1024L, extractSnapshotSize(table, TableName.valueOf("t1"), "ss1")); + assertEquals(4096L, extractSnapshotSize(table, TableName.valueOf("t2"), "ss2")); + assertEquals(8192L, extractSnapshotSize(table, TableName.valueOf("t3"), "ss3")); + }); + } + } + + @Test + public void testSnapshotsFromTables() throws Exception { + TableName tn1 = helper.createTableWithRegions(1); + TableName tn2 = helper.createTableWithRegions(1); + TableName tn3 = helper.createTableWithRegions(1); + + // Set a space quota on tables 1 and 2 (but not 3) + admin.setQuota(QuotaSettingsFactory.limitTableSpace( + tn1, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS)); + admin.setQuota(QuotaSettingsFactory.limitTableSpace( + tn2, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS)); + + // Create snapshots on each table (we didn't write any data, so just skipflush) + admin.snapshot(new SnapshotDescription(tn1 + "snapshot", tn1, SnapshotType.SKIPFLUSH)); + admin.snapshot(new SnapshotDescription(tn2 + "snapshot", tn2, SnapshotType.SKIPFLUSH)); + admin.snapshot(new SnapshotDescription(tn3 + "snapshot", tn3, SnapshotType.SKIPFLUSH)); + + Multimap<TableName, String> mapping = testChore.getSnapshotsToComputeSize(); + assertEquals(2, mapping.size()); + assertEquals(1, mapping.get(tn1).size()); + assertEquals(tn1 + "snapshot", mapping.get(tn1).iterator().next()); + assertEquals(1, mapping.get(tn2).size()); + assertEquals(tn2 + "snapshot", mapping.get(tn2).iterator().next()); + + admin.snapshot(new SnapshotDescription(tn2 + "snapshot1", tn2, SnapshotType.SKIPFLUSH)); + admin.snapshot(new SnapshotDescription(tn3 + "snapshot1", tn3, SnapshotType.SKIPFLUSH)); + + mapping = testChore.getSnapshotsToComputeSize(); + assertEquals(3, mapping.size()); + assertEquals(1, mapping.get(tn1).size()); + assertEquals(tn1 + "snapshot", mapping.get(tn1).iterator().next()); + assertEquals(2, mapping.get(tn2).size()); + assertEquals( + new HashSet<>(Arrays.asList(tn2 + "snapshot", tn2 + "snapshot1")), mapping.get(tn2)); + } + + @Test + public void testSnapshotsFromNamespaces() throws Exception { + NamespaceDescriptor ns = NamespaceDescriptor.create("snapshots_from_namespaces").build(); + admin.createNamespace(ns); + + TableName tn1 = helper.createTableWithRegions(ns.getName(), 1); + TableName tn2 = helper.createTableWithRegions(ns.getName(), 1); + TableName tn3 = helper.createTableWithRegions(1); + + // Set a space quota on the namespace + admin.setQuota(QuotaSettingsFactory.limitNamespaceSpace( + ns.getName(),
SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS)); + + // Create snapshots on each table (we didn't write any data, so just skipflush) + admin.snapshot(new SnapshotDescription( + tn1.getQualifierAsString() + "snapshot", tn1, SnapshotType.SKIPFLUSH)); + admin.snapshot(new SnapshotDescription( + tn2.getQualifierAsString() + "snapshot", tn2, SnapshotType.SKIPFLUSH)); + admin.snapshot(new SnapshotDescription( + tn3.getQualifierAsString() + "snapshot", tn3, SnapshotType.SKIPFLUSH)); + + Multimap<TableName, String> mapping = testChore.getSnapshotsToComputeSize(); + assertEquals(2, mapping.size()); + assertEquals(1, mapping.get(tn1).size()); + assertEquals(tn1.getQualifierAsString() + "snapshot", mapping.get(tn1).iterator().next()); + assertEquals(1, mapping.get(tn2).size()); + assertEquals(tn2.getQualifierAsString() + "snapshot", mapping.get(tn2).iterator().next()); + + admin.snapshot(new SnapshotDescription( + tn2.getQualifierAsString() + "snapshot1", tn2, SnapshotType.SKIPFLUSH)); + admin.snapshot(new SnapshotDescription( + tn3.getQualifierAsString() + "snapshot2", tn3, SnapshotType.SKIPFLUSH)); + + mapping = testChore.getSnapshotsToComputeSize(); + assertEquals(3, mapping.size()); + assertEquals(1, mapping.get(tn1).size()); + assertEquals(tn1.getQualifierAsString() + "snapshot", mapping.get(tn1).iterator().next()); + assertEquals(2, mapping.get(tn2).size()); + assertEquals( + new HashSet<>(Arrays.asList(tn2.getQualifierAsString() + "snapshot", + tn2.getQualifierAsString() + "snapshot1")), mapping.get(tn2)); + } + + @Test + public void testSnapshotSize() throws Exception { + // Create a table and set a quota + TableName tn1 = helper.createTableWithRegions(5); + admin.setQuota(QuotaSettingsFactory.limitTableSpace( + tn1, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS)); + + // Write some data and flush it + helper.writeData(tn1, 256L * SpaceQuotaHelperForTests.ONE_KILOBYTE); + admin.flush(tn1); + + final AtomicReference<Long> lastSeenSize = new AtomicReference<>(); + // Wait for the Master chore to run to see the usage (with a fudge factor) + TEST_UTIL.waitFor(30_000, new SpaceQuotaSnapshotPredicate(conn, tn1) { + @Override + boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception { + lastSeenSize.set(snapshot.getUsage()); + return snapshot.getUsage() > 230L * SpaceQuotaHelperForTests.ONE_KILOBYTE; + } + }); + + // Create a snapshot on the table + final String snapshotName = tn1 + "snapshot"; + admin.snapshot(new SnapshotDescription(snapshotName, tn1, SnapshotType.SKIPFLUSH)); + + // Get the snapshots + Multimap<TableName, String> snapshotsToCompute = testChore.getSnapshotsToComputeSize(); + assertEquals( + "Expected to see the single snapshot: " + snapshotsToCompute, 1, snapshotsToCompute.size()); + + // Get the size of our snapshot + Multimap<TableName, SnapshotWithSize> snapshotsWithSize = testChore.computeSnapshotSizes( + snapshotsToCompute); + assertEquals(1, snapshotsWithSize.size()); + SnapshotWithSize sws = Iterables.getOnlyElement(snapshotsWithSize.get(tn1)); + assertEquals(snapshotName, sws.getName()); + // The snapshot should take up no space, since the table still wholly refers to the same files + assertEquals(0, sws.getSize()); + + // Write some more data, flush it, and then major_compact the table + helper.writeData(tn1, 256L * SpaceQuotaHelperForTests.ONE_KILOBYTE); + admin.flush(tn1); + TEST_UTIL.compact(tn1, true); + + // Test table should reflect its original size since ingest was deterministic + TEST_UTIL.waitFor(30_000, new SpaceQuotaSnapshotPredicate(conn, tn1) { + @Override + boolean
evaluate(SpaceQuotaSnapshot snapshot) throws Exception { + LOG.debug("Current usage=" + snapshot.getUsage() + " lastSeenSize=" + lastSeenSize.get()); + return closeInSize( + snapshot.getUsage(), lastSeenSize.get(), SpaceQuotaHelperForTests.ONE_KILOBYTE); + } + }); + + // Wait for no compacted files on the regions of our table + TEST_UTIL.waitFor(30_000, new NoFilesToDischarge(TEST_UTIL.getMiniHBaseCluster(), tn1)); + + // Still should see only one snapshot + snapshotsToCompute = testChore.getSnapshotsToComputeSize(); + assertEquals( + "Expected to see the single snapshot: " + snapshotsToCompute, 1, snapshotsToCompute.size()); + snapshotsWithSize = testChore.computeSnapshotSizes( + snapshotsToCompute); + assertEquals(1, snapshotsWithSize.size()); + sws = Iterables.getOnlyElement(snapshotsWithSize.get(tn1)); + assertEquals(snapshotName, sws.getName()); + // The snapshot should take up the size the table originally took up + assertEquals(lastSeenSize.get().longValue(), sws.getSize()); + } + + @Test + public void testPersistingSnapshotsForNamespaces() throws Exception { + Multimap<TableName, SnapshotWithSize> snapshotsWithSizes = HashMultimap.create(); + TableName tn1 = TableName.valueOf("ns1:tn1"); + TableName tn2 = TableName.valueOf("ns1:tn2"); + TableName tn3 = TableName.valueOf("ns2:tn1"); + TableName tn4 = TableName.valueOf("ns2:tn2"); + TableName tn5 = TableName.valueOf("tn1"); + + snapshotsWithSizes.put(tn1, new SnapshotWithSize("", 1024L)); + snapshotsWithSizes.put(tn2, new SnapshotWithSize("", 1024L)); + snapshotsWithSizes.put(tn3, new SnapshotWithSize("", 512L)); + snapshotsWithSizes.put(tn4, new SnapshotWithSize("", 1024L)); + snapshotsWithSizes.put(tn5, new SnapshotWithSize("", 3072L)); + + Map<String, Long> nsSizes = testChore.groupSnapshotSizesByNamespace(snapshotsWithSizes); + assertEquals(3, nsSizes.size()); + assertEquals(2048L, (long) nsSizes.get("ns1")); + assertEquals(1536L, (long) nsSizes.get("ns2")); + assertEquals(3072L, (long) nsSizes.get(NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR)); + } + + private long count(Table t) throws IOException { + try (ResultScanner rs = t.getScanner(new Scan())) { + long sum = 0; + for (Result r : rs) { + while (r.advance()) { + sum++; + } + } + return sum; + } + } + + private long extractSnapshotSize( + Table quotaTable, TableName tn, String snapshot) throws IOException { + Get g = QuotaTableUtil.makeGetForSnapshotSize(tn, snapshot); + Result r = quotaTable.get(g); + assertNotNull(r); + CellScanner cs = r.cellScanner(); + cs.advance(); + Cell c = cs.current(); + assertNotNull(c); + return QuotaTableUtil.extractSnapshotSize( + c.getValueArray(), c.getValueOffset(), c.getValueLength()); + } + + private void verify(Table t, IOThrowingRunnable test) throws IOException { + admin.disableTable(t.getName()); + admin.truncateTable(t.getName(), false); + test.run(); + } + + @FunctionalInterface + private interface IOThrowingRunnable { + void run() throws IOException; + } + + /** + * Computes if {@code size2} is within {@code delta} of {@code size1}, inclusive.
+ */ + boolean closeInSize(long size1, long size2, long delta) { + long lower = size1 - delta; + long upper = size1 + delta; + return lower <= size2 && size2 <= upper; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java index e21647f14a1..888ad9eab24 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java @@ -79,6 +79,7 @@ import org.junit.rules.TestName; public class TestSpaceQuotas { private static final Log LOG = LogFactory.getLog(TestSpaceQuotas.class); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + // Global for all tests in the class private static final AtomicLong COUNTER = new AtomicLong(0); private static final int NUM_RETRIES = 10; @@ -89,14 +90,7 @@ public class TestSpaceQuotas { @BeforeClass public static void setUp() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); - // Increase the frequency of some of the chores for responsiveness of the test - conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, 1000); - conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 1000); - conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_DELAY_KEY, 1000); - conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_PERIOD_KEY, 1000); - conf.setInt(SpaceQuotaRefresherChore.POLICY_REFRESHER_CHORE_DELAY_KEY, 1000); - conf.setInt(SpaceQuotaRefresherChore.POLICY_REFRESHER_CHORE_PERIOD_KEY, 1000); - conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); + SpaceQuotaHelperForTests.updateConfigForQuotas(conf); TEST_UTIL.startMiniCluster(1); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotasWithSnapshots.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotasWithSnapshots.java new file mode 100644 index 00000000000..ebb1a9e2577 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotasWithSnapshots.java @@ -0,0 +1,448 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.quotas; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter.Predicate; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.SnapshotType; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.quotas.SpaceQuotaHelperForTests.SpaceQuotaSnapshotPredicate; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import com.google.common.collect.Iterables; + +/** + * Test class to exercise the inclusion of snapshots in space quotas + */ +@Category({LargeTests.class}) +public class TestSpaceQuotasWithSnapshots { + private static final Log LOG = LogFactory.getLog(TestSpaceQuotasWithSnapshots.class); + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + // Global for all tests in the class + private static final AtomicLong COUNTER = new AtomicLong(0); + private static final long FUDGE_FOR_TABLE_SIZE = 500L * SpaceQuotaHelperForTests.ONE_KILOBYTE; + + @Rule + public TestName testName = new TestName(); + private SpaceQuotaHelperForTests helper; + private Connection conn; + private Admin admin; + + @BeforeClass + public static void setUp() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + SpaceQuotaHelperForTests.updateConfigForQuotas(conf); + TEST_UTIL.startMiniCluster(1); + } + + @AfterClass + public static void tearDown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void removeAllQuotas() throws Exception { + helper = new SpaceQuotaHelperForTests(TEST_UTIL, testName, COUNTER); + conn = TEST_UTIL.getConnection(); + admin = TEST_UTIL.getAdmin(); + } + + @Test + public void testTablesInheritSnapshotSize() throws Exception { + TableName tn = helper.createTableWithRegions(1); + LOG.info("Writing data"); + // Set a quota + QuotaSettings settings = QuotaSettingsFactory.limitTableSpace( + tn, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS); + admin.setQuota(settings); + // Write some data + final long initialSize = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE; + helper.writeData(tn, initialSize); + + LOG.info("Waiting until table size reflects written data"); + // Wait until that data is seen by the master + TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) { + @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws 
Exception { + return snapshot.getUsage() >= initialSize; + } + }); + + // Make sure we see the final quota usage size + waitForStableQuotaSize(conn, tn, null); + + // The actual size on disk after we wrote our data the first time + final long actualInitialSize = QuotaTableUtil.getCurrentSnapshot(conn, tn).getUsage(); + LOG.info("Initial table size was " + actualInitialSize); + + LOG.info("Snapshot the table"); + final String snapshot1 = tn.toString() + "_snapshot1"; + admin.snapshot(snapshot1, tn); + + // Write the same data again, then flush+compact. This should make sure that + // the snapshot is referencing files that the table no longer references. + LOG.info("Write more data"); + helper.writeData(tn, initialSize); + LOG.info("Flush the table"); + admin.flush(tn); + LOG.info("Synchronously compacting the table"); + TEST_UTIL.compact(tn, true); + + final long upperBound = initialSize + FUDGE_FOR_TABLE_SIZE; + final long lowerBound = initialSize - FUDGE_FOR_TABLE_SIZE; + + // Store the actual size after writing more data and then compacting it down to one file + LOG.info("Waiting for the region reports to reflect the correct size, between (" + + lowerBound + ", " + upperBound + ")"); + TEST_UTIL.waitFor(30 * 1000, 500, new Predicate<Exception>() { + @Override + public boolean evaluate() throws Exception { + long size = getRegionSizeReportForTable(conn, tn); + return size < upperBound && size > lowerBound; + } + }); + + // Make sure we see the "final" new size for the table, not some intermediate + waitForStableRegionSizeReport(conn, tn); + final long finalSize = getRegionSizeReportForTable(conn, tn); + assertTrue("Expected a non-zero size for the table", finalSize > 0); + LOG.info("Last seen size: " + finalSize); + + // Make sure the QuotaObserverChore has time to reflect the new region size reports + // (we saw above). The usage of the table should *not* decrease when we check it below, + // though, because the snapshot on our table will cause the table to "retain" the size. + TEST_UTIL.waitFor(20 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) { + @Override + public boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception { + return snapshot.getUsage() >= finalSize; + } + }); + + // The final usage should be the sum of the initial size (referenced by the snapshot) and the + // new size we just wrote above.
+ long expectedFinalSize = actualInitialSize + finalSize; + LOG.info( + "Expecting table usage to be " + actualInitialSize + " + " + finalSize + + " = " + expectedFinalSize); + // The size of the table (WRT quotas) should now be approximately double what it was previously + TEST_UTIL.waitFor(30 * 1000, 1000, new SpaceQuotaSnapshotPredicate(conn, tn) { + @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception { + LOG.debug("Checking for " + expectedFinalSize + " == " + snapshot.getUsage()); + return expectedFinalSize == snapshot.getUsage(); + } + }); + } + + @Test + public void testNamespacesInheritSnapshotSize() throws Exception { + String ns = helper.createNamespace().getName(); + TableName tn = helper.createTableWithRegions(ns, 1); + LOG.info("Writing data"); + // Set a quota + QuotaSettings settings = QuotaSettingsFactory.limitNamespaceSpace( + ns, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS); + admin.setQuota(settings); + + // Write some data and flush it to disk + final long initialSize = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE; + helper.writeData(tn, initialSize); + admin.flush(tn); + + LOG.info("Waiting until namespace size reflects written data"); + // Wait until that data is seen by the master + TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, ns) { + @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception { + return snapshot.getUsage() >= initialSize; + } + }); + + // Make sure we see the "final" new size for the namespace, not some intermediate + waitForStableQuotaSize(conn, null, ns); + + // The actual size on disk after we wrote our data the first time + final long actualInitialSize = QuotaTableUtil.getCurrentSnapshot(conn, ns).getUsage(); + LOG.info("Initial namespace size was " + actualInitialSize); + + LOG.info("Snapshot the table"); + final String snapshot1 = tn.getQualifierAsString() + "_snapshot1"; + admin.snapshot(snapshot1, tn); + + // Write the same data again, then flush+compact. This should make sure that + // the snapshot is referencing files that the table no longer references. + LOG.info("Write more data"); + helper.writeData(tn, initialSize); + LOG.info("Flush the table"); + admin.flush(tn); + LOG.info("Synchronously compacting the table"); + TEST_UTIL.compact(tn, true); + + final long upperBound = initialSize + FUDGE_FOR_TABLE_SIZE; + final long lowerBound = initialSize - FUDGE_FOR_TABLE_SIZE; + + LOG.info("Waiting for the region reports to reflect the correct size, between (" + + lowerBound + ", " + upperBound + ")"); + TEST_UTIL.waitFor(30 * 1000, 500, new Predicate<Exception>() { + @Override + public boolean evaluate() throws Exception { + Map<TableName, Long> sizes = QuotaTableUtil.getMasterReportedTableSizes(conn); + LOG.debug("Master observed table sizes from region size reports: " + sizes); + Long size = sizes.get(tn); + if (null == size) { + return false; + } + return size < upperBound && size > lowerBound; + } + }); + + // Make sure we see the "final" new size for the table, not some intermediate + waitForStableRegionSizeReport(conn, tn); + final long finalSize = getRegionSizeReportForTable(conn, tn); + assertTrue("Expected a non-zero size for the table", finalSize > 0); + LOG.info("Final observed size of table: " + finalSize); + + // Make sure the QuotaObserverChore has time to reflect the new region size reports + // (we saw above). The usage of the table should *not* decrease when we check it below, + // though, because the snapshot on our table will cause the table to "retain" the size.
+ TEST_UTIL.waitFor(20 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, ns) { + @Override + public boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception { + return snapshot.getUsage() >= finalSize; + } + }); + + // The final usage should be the sum of the initial size (referenced by the snapshot) and the + // new size we just wrote above. + long expectedFinalSize = actualInitialSize + finalSize; + LOG.info( + "Expecting namespace usage to be " + actualInitialSize + " + " + finalSize + + " = " + expectedFinalSize); + // The size of the namespace (WRT quotas) should now be approximately double what it was previously + TEST_UTIL.waitFor(30 * 1000, 1000, new SpaceQuotaSnapshotPredicate(conn, ns) { + @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception { + LOG.debug("Checking for " + expectedFinalSize + " == " + snapshot.getUsage()); + return expectedFinalSize == snapshot.getUsage(); + } + }); + } + + @Test + public void testTablesWithSnapshots() throws Exception { + final Connection conn = TEST_UTIL.getConnection(); + final SpaceViolationPolicy policy = SpaceViolationPolicy.NO_INSERTS; + final TableName tn = helper.createTableWithRegions(10); + + // 3MB limit on the table + final long tableLimit = 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE; + TEST_UTIL.getAdmin().setQuota(QuotaSettingsFactory.limitTableSpace(tn, tableLimit, policy)); + + LOG.info("Writing first data set"); + // Write some data and flush it to disk + helper.writeData(tn, 1L * SpaceQuotaHelperForTests.ONE_MEGABYTE, "q1"); + + LOG.info("Creating snapshot"); + TEST_UTIL.getAdmin().snapshot(tn.toString() + "snap1", tn, SnapshotType.FLUSH); + + LOG.info("Writing second data set"); + // Write some more data + helper.writeData(tn, 1L * SpaceQuotaHelperForTests.ONE_MEGABYTE, "q2"); + + LOG.info("Flushing and major compacting table"); + // Compact the table to force the snapshot to own all of its files + TEST_UTIL.getAdmin().flush(tn); + TEST_UTIL.compact(tn, true); + + LOG.info("Checking for quota violation"); + // Wait to observe the quota moving into violation + TEST_UTIL.waitFor(60_000, 1_000, new Predicate<Exception>() { + @Override + public boolean evaluate() throws Exception { + Scan s = QuotaTableUtil.makeQuotaSnapshotScanForTable(tn); + try (Table t = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) { + ResultScanner rs = t.getScanner(s); + try { + Result r = Iterables.getOnlyElement(rs); + CellScanner cs = r.cellScanner(); + assertTrue(cs.advance()); + Cell c = cs.current(); + SpaceQuotaSnapshot snapshot = SpaceQuotaSnapshot.toSpaceQuotaSnapshot( + QuotaProtos.SpaceQuotaSnapshot.parseFrom( + UnsafeByteOperations.unsafeWrap( + c.getValueArray(), c.getValueOffset(), c.getValueLength()))); + LOG.info( + snapshot.getUsage() + "/" + snapshot.getLimit() + " " + snapshot.getQuotaStatus()); + // We expect to see the table move to violation + return snapshot.getQuotaStatus().isInViolation(); + } finally { + if (null != rs) { + rs.close(); + } + } + } + } + }); + } + + @Test + public void testRematerializedTablesDoNotInheritSpace() throws Exception { + TableName tn = helper.createTableWithRegions(1); + TableName tn2 = helper.getNextTableName(); + LOG.info("Writing data"); + // Set a quota on both tables + QuotaSettings settings = QuotaSettingsFactory.limitTableSpace( + tn, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS); + admin.setQuota(settings); + QuotaSettings settings2 = QuotaSettingsFactory.limitTableSpace( + tn2, SpaceQuotaHelperForTests.ONE_GIGABYTE,
SpaceViolationPolicy.NO_INSERTS); + admin.setQuota(settings2); + // Write some data + final long initialSize = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE; + helper.writeData(tn, initialSize); + + LOG.info("Waiting until table size reflects written data"); + // Wait until that data is seen by the master + TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) { + @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception { + return snapshot.getUsage() >= initialSize; + } + }); + + // Make sure we see the final quota usage size + waitForStableQuotaSize(conn, tn, null); + + // The actual size on disk after we wrote our data the first time + final long actualInitialSize = QuotaTableUtil.getCurrentSnapshot(conn, tn).getUsage(); + LOG.info("Initial table size was " + actualInitialSize); + + LOG.info("Snapshot the table"); + final String snapshot1 = tn.toString() + "_snapshot1"; + admin.snapshot(snapshot1, tn); + + admin.cloneSnapshot(snapshot1, tn2); + + // Write some more data to the first table + helper.writeData(tn, initialSize, "q2"); + admin.flush(tn); + + // Watch the usage of the first table, now with more data, to know when its new + // region size reports have reached the master + TEST_UTIL.waitFor(30_000, 1_000, new SpaceQuotaSnapshotPredicate(conn, tn) { + @Override + boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception { + return snapshot.getUsage() >= actualInitialSize * 2; + } + }); + + // We know reports were sent by our RS; verify the cloned table takes up zero size. + SpaceQuotaSnapshot snapshot = QuotaTableUtil.getCurrentSnapshot(conn, tn2); + assertNotNull(snapshot); + assertEquals(0, snapshot.getUsage()); + + // Compact the cloned table to force it to own its own files. + TEST_UTIL.compact(tn2, true); + // After the table is compacted, it should have its own files and be the same size as originally + TEST_UTIL.waitFor(30_000, 1_000, new SpaceQuotaSnapshotPredicate(conn, tn2) { + @Override + boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception { + return snapshot.getUsage() == actualInitialSize; + } + }); + } + + void waitForStableQuotaSize(Connection conn, TableName tn, String ns) throws Exception { + // Wait for some stability in the value before proceeding; helps make sure that + // we got the actual last value, not some in-between value + AtomicLong lastValue = new AtomicLong(-1); + AtomicInteger counter = new AtomicInteger(0); + TEST_UTIL.waitFor(15_000, 500, new SpaceQuotaSnapshotPredicate(conn, tn, ns) { + @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception { + LOG.debug("Last observed size=" + lastValue.get()); + if (snapshot.getUsage() == lastValue.get()) { + int numMatches = counter.incrementAndGet(); + if (numMatches >= 5) { + return true; + } + // Not yet..
+ return false; + } + counter.set(0); + lastValue.set(snapshot.getUsage()); + return false; + } + }); + } + + long getRegionSizeReportForTable(Connection conn, TableName tn) throws IOException { + Map<TableName, Long> sizes = QuotaTableUtil.getMasterReportedTableSizes(conn); + Long value = sizes.get(tn); + if (null == value) { + return 0L; + } + return value.longValue(); + } + + void waitForStableRegionSizeReport(Connection conn, TableName tn) throws Exception { + // Wait for some stability in the value before proceeding; helps make sure that + // we got the actual last value, not some in-between value + AtomicLong lastValue = new AtomicLong(-1); + AtomicInteger counter = new AtomicInteger(0); + TEST_UTIL.waitFor(15_000, 500, new Predicate<Exception>() { + @Override public boolean evaluate() throws Exception { + LOG.debug("Last observed size=" + lastValue.get()); + long actual = getRegionSizeReportForTable(conn, tn); + if (actual == lastValue.get()) { + int numMatches = counter.incrementAndGet(); + if (numMatches >= 5) { + return true; + } + // Not yet.. + return false; + } + counter.set(0); + lastValue.set(actual); + return false; + } + }); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSuperUserQuotaPermissions.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSuperUserQuotaPermissions.java index 812e7e0c2b2..3a60cbbd196 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSuperUserQuotaPermissions.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSuperUserQuotaPermissions.java @@ -76,13 +76,7 @@ public class TestSuperUserQuotaPermissions { public static void setupMiniCluster() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); // Increase the frequency of some of the chores for responsiveness of the test - conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, 1000); - conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 1000); - conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_DELAY_KEY, 1000); - conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_PERIOD_KEY, 1000); - conf.setInt(SpaceQuotaRefresherChore.POLICY_REFRESHER_CHORE_DELAY_KEY, 1000); - conf.setInt(SpaceQuotaRefresherChore.POLICY_REFRESHER_CHORE_PERIOD_KEY, 1000); - conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); + SpaceQuotaHelperForTests.updateConfigForQuotas(conf); conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, AccessController.class.getName()); conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, AccessController.class.getName()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTableQuotaViolationStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTableQuotaViolationStore.java index cefed67d9f3..5a4969a4a69 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTableQuotaViolationStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTableQuotaViolationStore.java @@ -23,6 +23,8 @@ import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; @@ -30,6 +32,10 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos; @@ -87,7 +93,8 @@ public class TestTableQuotaViolationStore { } @Test - public void testTargetViolationState() { + public void testTargetViolationState() throws IOException { + mockNoSnapshotSizes(); TableName tn1 = TableName.valueOf("violation1"); TableName tn2 = TableName.valueOf("observance1"); TableName tn3 = TableName.valueOf("observance2"); @@ -154,4 +161,12 @@ public class TestTableQuotaViolationStore { quotaRef.set(quotaWithoutSpace); assertNull(mockStore.getSpaceQuota(TableName.valueOf("foo"))); } + + void mockNoSnapshotSizes() throws IOException { + Table quotaTable = mock(Table.class); + ResultScanner scanner = mock(ResultScanner.class); + when(conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)).thenReturn(quotaTable); + when(quotaTable.getScanner(any(Scan.class))).thenReturn(scanner); + when(scanner.iterator()).thenReturn(Collections.<Result> emptyList().iterator()); + } }
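A note for readers adapting these tests: the mockNoSnapshotSizes() helper above is the general pattern for making QuotaTableUtil's snapshot-size scan come back empty without a live cluster. The sketch below pulls that pattern out into a self-contained form; the DemoEmptyQuotaScan class and the connectionWithEmptyQuotaTable() method are illustrative names, not part of the patch, while the HBase and Mockito calls are the same ones the patch itself uses.

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.Collections;

import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.quotas.QuotaTableUtil;

public class DemoEmptyQuotaScan {
  // Returns a mocked Connection whose quota-table scans yield no rows, so code under
  // test observes "no snapshot sizes recorded" without touching a real cluster.
  static Connection connectionWithEmptyQuotaTable() throws IOException {
    Connection conn = mock(Connection.class);
    Table quotaTable = mock(Table.class);
    ResultScanner scanner = mock(ResultScanner.class);
    when(conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)).thenReturn(quotaTable);
    when(quotaTable.getScanner(any(Scan.class))).thenReturn(scanner);
    // ResultScanner is Iterable<Result>; an empty iterator ends any for-each immediately.
    when(scanner.iterator()).thenReturn(Collections.<Result> emptyList().iterator());
    return conn;
  }
}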