HBASE-17568 Better handle stale/missing region size reports

* Expire region reports in the master after a timeout.
* Move regions in violation out of violation when insufficient
    region size reports are observed.
Josh Elser 2017-02-03 16:33:47 -05:00
parent 8159eae781
commit 91b4d2e827
5 changed files with 412 additions and 12 deletions
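
At its core, the patch pairs every reported region size with the wall-clock time at which the master received it, and periodically discards entries older than a configurable retention period. The class below is a minimal, standalone sketch of that pattern using plain Java collections; its names (StaleableReportStore, Report, pruneOlderThan) are illustrative only and are not the HBase types the patch itself modifies.

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Standalone illustration of the "timestamped report + pruning" pattern.
// The patch itself keys MasterQuotaManager's map by HRegionInfo and stores a
// SizeSnapshotWithTimestamp; this sketch keeps only the shape of the idea.
public class StaleableReportStore<K> {

  // A size observation paired with the time it was received.
  private static final class Report {
    final long size;
    final long time;
    Report(long size, long time) {
      this.size = size;
      this.time = time;
    }
  }

  private final ConcurrentHashMap<K, Report> reports = new ConcurrentHashMap<>();

  // Record (or overwrite) the latest report for a key.
  public void add(K key, long size, long time) {
    reports.put(key, new Report(size, time));
  }

  // Drop every report received before the cutoff; returns how many were removed.
  public int pruneOlderThan(long cutoff) {
    int removed = 0;
    Iterator<Map.Entry<K, Report>> it = reports.entrySet().iterator();
    while (it.hasNext()) {
      if (it.next().getValue().time < cutoff) {
        it.remove();
        removed++;
      }
    }
    return removed;
  }

  public static void main(String[] args) {
    StaleableReportStore<String> store = new StaleableReportStore<>();
    store.add("region-a", 1024, 10L);
    store.add("region-b", 2048, 30L);
    // Only "region-a" was reported before the cutoff of 20, so one entry is removed.
    System.out.println(store.pruneOlderThan(20L)); // prints 1
  }
}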


@@ -150,6 +150,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.Snapshot
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.zookeeper.KeeperException;
@@ -1942,8 +1943,9 @@ public class MasterRpcServices extends RSRpcServices
return RegionSpaceUseReportResponse.newBuilder().build();
}
MasterQuotaManager quotaManager = this.master.getMasterQuotaManager();
final long now = EnvironmentEdgeManager.currentTime();
for (RegionSpaceUse report : request.getSpaceUseList()) {
quotaManager.addRegionSize(HRegionInfo.convert(report.getRegion()), report.getSize());
quotaManager.addRegionSize(HRegionInfo.convert(report.getRegion()), report.getSize(), now);
}
return RegionSpaceUseReportResponse.newBuilder().build();
} catch (Exception e) {


@@ -22,9 +22,12 @@ import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -47,6 +50,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.ThrottleRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.TimedQuota;
import com.google.common.annotations.VisibleForTesting;
/**
* Master Quota Manager.
* It is responsible for initialize the quota table on the first-run and
@@ -68,7 +73,7 @@ public class MasterQuotaManager implements RegionStateListener {
private NamedLock<String> userLocks;
private boolean enabled = false;
private NamespaceAuditor namespaceQuotaManager;
private ConcurrentHashMap<HRegionInfo, Long> regionSizes;
private ConcurrentHashMap<HRegionInfo, SizeSnapshotWithTimestamp> regionSizes;
public MasterQuotaManager(final MasterServices masterServices) {
this.masterServices = masterServices;
@@ -531,21 +536,88 @@ public class MasterQuotaManager implements RegionStateListener {
}
}
public void addRegionSize(HRegionInfo hri, long size) {
/**
* Holds the size of a region at the given time, millis since the epoch.
*/
private static class SizeSnapshotWithTimestamp {
private final long size;
private final long time;
public SizeSnapshotWithTimestamp(long size, long time) {
this.size = size;
this.time = time;
}
public long getSize() {
return size;
}
public long getTime() {
return time;
}
public boolean equals(Object o) {
if (o instanceof SizeSnapshotWithTimestamp) {
SizeSnapshotWithTimestamp other = (SizeSnapshotWithTimestamp) o;
return size == other.size && time == other.time;
}
return false;
}
public int hashCode() {
HashCodeBuilder hcb = new HashCodeBuilder();
return hcb.append(size).append(time).toHashCode();
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder(32);
sb.append("SizeSnapshotWithTimestamp={size=").append(size).append("B, ");
sb.append("time=").append(time).append("}");
return sb.toString();
}
}
@VisibleForTesting
void initializeRegionSizes() {
assert null == regionSizes;
this.regionSizes = new ConcurrentHashMap<>();
}
public void addRegionSize(HRegionInfo hri, long size, long time) {
if (null == regionSizes) {
return;
}
// TODO Make proper API?
// TODO Prevent from growing indefinitely
regionSizes.put(hri, size);
regionSizes.put(hri, new SizeSnapshotWithTimestamp(size, time));
}
public Map<HRegionInfo, Long> snapshotRegionSizes() {
if (null == regionSizes) {
return EMPTY_MAP;
}
// TODO Make proper API?
return new HashMap<>(regionSizes);
Map<HRegionInfo, Long> copy = new HashMap<>();
for (Entry<HRegionInfo, SizeSnapshotWithTimestamp> entry : regionSizes.entrySet()) {
copy.put(entry.getKey(), entry.getValue().getSize());
}
return copy;
}
int pruneEntriesOlderThan(long timeToPruneBefore) {
if (null == regionSizes) {
return 0;
}
int numEntriesRemoved = 0;
Iterator<Entry<HRegionInfo,SizeSnapshotWithTimestamp>> iterator =
regionSizes.entrySet().iterator();
while (iterator.hasNext()) {
long currentEntryTime = iterator.next().getValue().getTime();
if (currentEntryTime < timeToPruneBefore) {
iterator.remove();
numEntriesRemoved++;
}
}
return numEntriesRemoved;
}
}


@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultimap;
@@ -69,6 +70,11 @@ public class QuotaObserverChore extends ScheduledChore {
"hbase.master.quotas.observer.report.percent";
static final double QUOTA_OBSERVER_CHORE_REPORT_PERCENT_DEFAULT= 0.95;
static final String REGION_REPORT_RETENTION_DURATION_KEY =
"hbase.master.quotas.region.report.retention.millis";
static final long REGION_REPORT_RETENTION_DURATION_DEFAULT =
1000 * 60 * 10; // 10 minutes
private final Connection conn;
private final Configuration conf;
private final MasterQuotaManager quotaManager;
@@ -83,6 +89,9 @@ public class QuotaObserverChore extends ScheduledChore {
private final Map<TableName,SpaceQuotaSnapshot> tableQuotaSnapshots;
private final Map<String,SpaceQuotaSnapshot> namespaceQuotaSnapshots;
// The time, in millis, that region reports should be kept by the master
private final long regionReportLifetimeMillis;
/*
* Encapsulates logic for tracking the state of a table/namespace WRT space quotas
*/
@@ -108,6 +117,8 @@ public class QuotaObserverChore extends ScheduledChore {
this.snapshotNotifier = Objects.requireNonNull(snapshotNotifier);
this.tableQuotaSnapshots = new HashMap<>();
this.namespaceQuotaSnapshots = new HashMap<>();
this.regionReportLifetimeMillis = conf.getLong(
REGION_REPORT_RETENTION_DURATION_KEY, REGION_REPORT_RETENTION_DURATION_DEFAULT);
}
@Override
@@ -136,18 +147,40 @@ public class QuotaObserverChore extends ScheduledChore {
LOG.trace("Using " + reportedRegionSpaceUse.size() + " region space use reports");
}
// Remove the "old" region reports
pruneOldRegionReports();
// Create the stores to track table and namespace snapshots
initializeSnapshotStores(reportedRegionSpaceUse);
// Filter out tables for which we don't have adequate regionspace reports yet.
// Important that we do this after we instantiate the stores above
tablesWithQuotas.filterInsufficientlyReportedTables(tableSnapshotStore);
// This gives us a set of Tables which may or may not be violating their quota.
// To be safe, we want to make sure that these are not in violation.
Set<TableName> tablesInLimbo = tablesWithQuotas.filterInsufficientlyReportedTables(
tableSnapshotStore);
if (LOG.isTraceEnabled()) {
LOG.trace("Filtered insufficiently reported tables, left with " +
reportedRegionSpaceUse.size() + " regions reported");
}
for (TableName tableInLimbo : tablesInLimbo) {
final SpaceQuotaSnapshot currentSnapshot = tableSnapshotStore.getCurrentState(tableInLimbo);
if (currentSnapshot.getQuotaStatus().isInViolation()) {
if (LOG.isTraceEnabled()) {
LOG.trace("Moving " + tableInLimbo + " out of violation because fewer region sizes were"
+ " reported than required.");
}
SpaceQuotaSnapshot targetSnapshot = new SpaceQuotaSnapshot(
SpaceQuotaStatus.notInViolation(), currentSnapshot.getUsage(),
currentSnapshot.getLimit());
this.snapshotNotifier.transitionTable(tableInLimbo, targetSnapshot);
// Update it in the Table QuotaStore so that memory is consistent with no violation.
tableSnapshotStore.setCurrentState(tableInLimbo, targetSnapshot);
}
}
// Transition each table to/from quota violation based on the current and target state.
// Only table quotas are enacted.
final Set<TableName> tablesWithTableQuotas = tablesWithQuotas.getTableQuotaTables();
@@ -346,6 +379,19 @@ public class QuotaObserverChore extends ScheduledChore {
}
}
/**
* Removes region reports over a certain age.
*/
void pruneOldRegionReports() {
final long now = EnvironmentEdgeManager.currentTime();
final long pruneTime = now - regionReportLifetimeMillis;
final int numRemoved = quotaManager.pruneEntriesOlderThan(pruneTime);
if (LOG.isTraceEnabled()) {
LOG.trace("Removed " + numRemoved + " old region size reports that were older than "
+ pruneTime + ".");
}
}
/**
* Computes the set of all tables that have quotas defined. This includes tables with quotas
explicitly set on them, in addition to tables that exist in namespaces which have a quota
@@ -577,8 +623,8 @@ public class QuotaObserverChore extends ScheduledChore {
* Filters out all tables for which the Master currently doesn't have enough region space
* reports received from RegionServers yet.
*/
public void filterInsufficientlyReportedTables(QuotaSnapshotStore<TableName> tableStore)
throws IOException {
public Set<TableName> filterInsufficientlyReportedTables(
QuotaSnapshotStore<TableName> tableStore) throws IOException {
final double percentRegionsReportedThreshold = getRegionReportPercent(getConfiguration());
Set<TableName> tablesToRemove = new HashSet<>();
for (TableName table : Iterables.concat(tablesWithTableQuotas, tablesWithNamespaceQuotas)) {
@@ -612,6 +658,7 @@ public class QuotaObserverChore extends ScheduledChore {
tablesWithTableQuotas.remove(tableToRemove);
tablesWithNamespaceQuotas.remove(tableToRemove);
}
return tablesToRemove;
}
/**
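
For reference, the retention period used by the pruning above is read from the new hbase.master.quotas.region.report.retention.millis key, which defaults to ten minutes. The snippet below is a small illustrative sketch of overriding it on a Hadoop Configuration, much as the tests further down do; the five-second value and the class name RetentionConfigSketch are examples, not part of the patch.

import org.apache.hadoop.conf.Configuration;

public class RetentionConfigSketch {
  public static void main(String[] args) {
    // Shorten the master-side retention of region size reports to 5 seconds
    // (illustrative value only; the default introduced by this patch is 10 minutes).
    Configuration conf = new Configuration();
    conf.setLong("hbase.master.quotas.region.report.retention.millis", 5000L);
    System.out.println(conf.getLong("hbase.master.quotas.region.report.retention.millis", 0L));
  }
}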


@@ -16,9 +16,13 @@
*/
package org.apache.hadoop.hbase.quotas;
import static org.apache.hadoop.hbase.util.Bytes.toBytes;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Test;
@@ -31,7 +35,49 @@ public class TestMasterQuotaManager {
public void testUninitializedQuotaManangerDoesNotFail() {
MasterServices masterServices = mock(MasterServices.class);
MasterQuotaManager manager = new MasterQuotaManager(masterServices);
manager.addRegionSize(null, 0);
manager.addRegionSize(null, 0, 0);
assertNotNull(manager.snapshotRegionSizes());
}
@Test
public void testOldEntriesRemoved() {
MasterServices masterServices = mock(MasterServices.class);
MasterQuotaManager manager = new MasterQuotaManager(masterServices);
manager.initializeRegionSizes();
// Mock out some regions
TableName tableName = TableName.valueOf("foo");
HRegionInfo region1 = new HRegionInfo(tableName, null, toBytes("a"));
HRegionInfo region2 = new HRegionInfo(tableName, toBytes("a"), toBytes("b"));
HRegionInfo region3 = new HRegionInfo(tableName, toBytes("b"), toBytes("c"));
HRegionInfo region4 = new HRegionInfo(tableName, toBytes("c"), toBytes("d"));
HRegionInfo region5 = new HRegionInfo(tableName, toBytes("d"), null);
final long size = 0;
long time1 = 10;
manager.addRegionSize(region1, size, time1);
manager.addRegionSize(region2, size, time1);
long time2 = 20;
manager.addRegionSize(region3, size, time2);
manager.addRegionSize(region4, size, time2);
long time3 = 30;
manager.addRegionSize(region5, size, time3);
assertEquals(5, manager.snapshotRegionSizes().size());
// Prune nothing
assertEquals(0, manager.pruneEntriesOlderThan(0));
assertEquals(5, manager.snapshotRegionSizes().size());
assertEquals(0, manager.pruneEntriesOlderThan(10));
assertEquals(5, manager.snapshotRegionSizes().size());
// Prune the elements at time1
assertEquals(2, manager.pruneEntriesOlderThan(15));
assertEquals(3, manager.snapshotRegionSizes().size());
// Prune the elements at time2
assertEquals(2, manager.pruneEntriesOlderThan(30));
assertEquals(1, manager.snapshotRegionSizes().size());
}
}


@@ -0,0 +1,233 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.quotas;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
/**
* A test case to verify that region reports are expired when they are not sent.
*/
@Category(LargeTests.class)
public class TestQuotaObserverChoreRegionReports {
private static final Log LOG = LogFactory.getLog(TestQuotaObserverChoreRegionReports.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@Rule
public TestName testName = new TestName();
@Before
public void setUp() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, 1000);
conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 1000);
conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_DELAY_KEY, 1000);
conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_PERIOD_KEY, 1000);
conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
conf.setInt(QuotaObserverChore.REGION_REPORT_RETENTION_DURATION_KEY, 1000);
}
@After
public void tearDown() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void testReportExpiration() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
// Send reports every 30 seconds
conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 25000);
// Expire the reports after 5 seconds
conf.setInt(QuotaObserverChore.REGION_REPORT_RETENTION_DURATION_KEY, 5000);
TEST_UTIL.startMiniCluster(1);
final String FAM1 = "f1";
final HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
// Wait for the master to finish initialization.
while (null == master.getMasterQuotaManager()) {
LOG.debug("MasterQuotaManager is null, waiting...");
Thread.sleep(500);
}
final MasterQuotaManager quotaManager = master.getMasterQuotaManager();
// Create a table
final TableName tn = TableName.valueOf("reportExpiration");
HTableDescriptor tableDesc = new HTableDescriptor(tn);
tableDesc.addFamily(new HColumnDescriptor(FAM1));
TEST_UTIL.getAdmin().createTable(tableDesc);
// No reports right after we created this table.
assertEquals(0, getRegionReportsForTable(quotaManager.snapshotRegionSizes(), tn));
// Set a quota
final long sizeLimit = 100L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
final SpaceViolationPolicy violationPolicy = SpaceViolationPolicy.NO_INSERTS;
QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn, sizeLimit, violationPolicy);
TEST_UTIL.getAdmin().setQuota(settings);
// We should get one report for the one region we have.
Waiter.waitFor(TEST_UTIL.getConfiguration(), 45000, 1000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
int numReports = getRegionReportsForTable(quotaManager.snapshotRegionSizes(), tn);
LOG.debug("Saw " + numReports + " reports for " + tn + " while waiting for 1");
return numReports == 1;
}
});
// We should then see no reports for the single region
Waiter.waitFor(TEST_UTIL.getConfiguration(), 15000, 1000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
int numReports = getRegionReportsForTable(quotaManager.snapshotRegionSizes(), tn);
LOG.debug("Saw " + numReports + " reports for " + tn + " while waiting for none");
return numReports == 0;
}
});
}
@Test
public void testMissingReportsRemovesQuota() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
// Expire the reports after 5 seconds
conf.setInt(QuotaObserverChore.REGION_REPORT_RETENTION_DURATION_KEY, 5000);
TEST_UTIL.startMiniCluster(1);
final String FAM1 = "f1";
// Create a table
final TableName tn = TableName.valueOf("quotaAcceptanceWithoutReports");
HTableDescriptor tableDesc = new HTableDescriptor(tn);
tableDesc.addFamily(new HColumnDescriptor(FAM1));
TEST_UTIL.getAdmin().createTable(tableDesc);
// Set a quota
final long sizeLimit = 1L * SpaceQuotaHelperForTests.ONE_KILOBYTE;
final SpaceViolationPolicy violationPolicy = SpaceViolationPolicy.NO_INSERTS;
QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn, sizeLimit, violationPolicy);
final Admin admin = TEST_UTIL.getAdmin();
admin.setQuota(settings);
final Connection conn = TEST_UTIL.getConnection();
// Write enough data to invalidate the quota
Put p = new Put(Bytes.toBytes("row1"));
byte[] bytes = new byte[10];
Arrays.fill(bytes, (byte) 2);
for (int i = 0; i < 200; i++) {
p.addColumn(Bytes.toBytes(FAM1), Bytes.toBytes("qual" + i), bytes);
}
conn.getTable(tn).put(p);
admin.flush(tn);
// Wait for the table to move into violation
Waiter.waitFor(TEST_UTIL.getConfiguration(), 30000, 1000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
SpaceQuotaSnapshot snapshot = getSnapshotForTable(conn, tn);
if (null == snapshot) {
return false;
}
return snapshot.getQuotaStatus().isInViolation();
}
});
// Close the region, prevent the server from sending new status reports.
List<HRegionInfo> regions = admin.getTableRegions(tn);
assertEquals(1, regions.size());
HRegionInfo hri = regions.get(0);
admin.closeRegion(TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(), hri);
// We should see this table move out of violation after the report expires.
Waiter.waitFor(TEST_UTIL.getConfiguration(), 30000, 1000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
SpaceQuotaSnapshot snapshot = getSnapshotForTable(conn, tn);
if (null == snapshot) {
return false;
}
return !snapshot.getQuotaStatus().isInViolation();
}
});
// The QuotaObserverChore's memory should also show it not in violation.
final HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
QuotaSnapshotStore<TableName> tableStore =
master.getQuotaObserverChore().getTableSnapshotStore();
SpaceQuotaSnapshot snapshot = tableStore.getCurrentState(tn);
assertFalse("Quota should not be in violation", snapshot.getQuotaStatus().isInViolation());
}
private SpaceQuotaSnapshot getSnapshotForTable(
Connection conn, TableName tn) throws IOException {
try (Table quotaTable = conn.getTable(QuotaUtil.QUOTA_TABLE_NAME);
ResultScanner scanner = quotaTable.getScanner(QuotaTableUtil.makeQuotaSnapshotScan())) {
Map<TableName,SpaceQuotaSnapshot> activeViolations = new HashMap<>();
for (Result result : scanner) {
try {
QuotaTableUtil.extractQuotaSnapshot(result, activeViolations);
} catch (IllegalArgumentException e) {
final String msg = "Failed to parse result for row " + Bytes.toString(result.getRow());
LOG.error(msg, e);
throw new IOException(msg, e);
}
}
return activeViolations.get(tn);
}
}
private int getRegionReportsForTable(Map<HRegionInfo,Long> reports, TableName tn) {
int numReports = 0;
for (Entry<HRegionInfo,Long> entry : reports.entrySet()) {
if (tn.equals(entry.getKey().getTable())) {
numReports++;
}
}
return numReports;
}
}