HBASE-22460 : Reopen regions with very high Store Ref Counts (#600)

Signed-off-by Anoop Sam John <anoopsamjohn@apache.org>
This commit is contained in:
Viraj Jasani 2019-10-23 21:16:47 +05:30 committed by Anoop Sam John
parent 2ad62b0162
commit 14dcf1d0c6
21 changed files with 1077 additions and 4 deletions

View File

@ -154,4 +154,10 @@ public interface RegionMetrics {
* @return the reference count for the stores of this region
*/
int getStoreRefCount();
/**
* @return the max reference count for any store file among all store files
* of this region
*/
int getMaxStoreFileRefCount();
}

View File

@ -66,6 +66,7 @@ public final class RegionMetricsBuilder {
.setStoreCount(regionLoadPB.getStores())
.setStoreFileCount(regionLoadPB.getStorefiles())
.setStoreRefCount(regionLoadPB.getStoreRefCount())
.setMaxStoreFileRefCount(regionLoadPB.getMaxStoreFileRefCount())
.setStoreFileSize(new Size(regionLoadPB.getStorefileSizeMB(), Size.Unit.MEGABYTE))
.setStoreSequenceIds(regionLoadPB.getStoreCompleteSequenceIdList().stream()
.collect(Collectors.toMap(
@ -113,6 +114,7 @@ public final class RegionMetricsBuilder {
.setStores(regionMetrics.getStoreCount())
.setStorefiles(regionMetrics.getStoreFileCount())
.setStoreRefCount(regionMetrics.getStoreRefCount())
.setMaxStoreFileRefCount(regionMetrics.getMaxStoreFileRefCount())
.setStorefileSizeMB((int) regionMetrics.getStoreFileSize().get(Size.Unit.MEGABYTE))
.addAllStoreCompleteSequenceId(toStoreSequenceId(regionMetrics.getStoreSequenceId()))
.setStoreUncompressedSizeMB(
@ -128,6 +130,7 @@ public final class RegionMetricsBuilder {
private int storeCount;
private int storeFileCount;
private int storeRefCount;
private int maxStoreFileRefCount;
private long compactingCellCount;
private long compactedCellCount;
private Size storeFileSize = Size.ZERO;
@ -161,6 +164,10 @@ public final class RegionMetricsBuilder {
this.storeRefCount = value;
return this;
}
/**
 * Records the max reference count for any store file of the region being built.
 *
 * @param value the max store file reference count
 * @return this builder, so calls can be chained
 */
public RegionMetricsBuilder setMaxStoreFileRefCount(int value) {
  maxStoreFileRefCount = value;
  return this;
}
public RegionMetricsBuilder setCompactingCellCount(long value) {
this.compactingCellCount = value;
return this;
@ -235,6 +242,7 @@ public final class RegionMetricsBuilder {
storeCount,
storeFileCount,
storeRefCount,
maxStoreFileRefCount,
compactingCellCount,
compactedCellCount,
storeFileSize,
@ -259,6 +267,7 @@ public final class RegionMetricsBuilder {
private final int storeCount;
private final int storeFileCount;
private final int storeRefCount;
private final int maxStoreFileRefCount;
private final long compactingCellCount;
private final long compactedCellCount;
private final Size storeFileSize;
@ -280,6 +289,7 @@ public final class RegionMetricsBuilder {
int storeCount,
int storeFileCount,
int storeRefCount,
int maxStoreFileRefCount,
final long compactingCellCount,
long compactedCellCount,
Size storeFileSize,
@ -301,6 +311,7 @@ public final class RegionMetricsBuilder {
this.storeCount = storeCount;
this.storeFileCount = storeFileCount;
this.storeRefCount = storeRefCount;
this.maxStoreFileRefCount = maxStoreFileRefCount;
this.compactingCellCount = compactingCellCount;
this.compactedCellCount = compactedCellCount;
this.storeFileSize = Preconditions.checkNotNull(storeFileSize);
@ -340,6 +351,11 @@ public final class RegionMetricsBuilder {
return storeRefCount;
}
/** {@inheritDoc} */
@Override
public int getMaxStoreFileRefCount() {
  return this.maxStoreFileRefCount;
}
@Override
public Size getStoreFileSize() {
return storeFileSize;
@ -433,6 +449,8 @@ public final class RegionMetricsBuilder {
this.getStoreFileCount());
Strings.appendKeyValue(sb, "storeRefCount",
this.getStoreRefCount());
Strings.appendKeyValue(sb, "maxStoreFileRefCount",
this.getMaxStoreFileRefCount());
Strings.appendKeyValue(sb, "uncompressedStoreFileSize",
this.getUncompressedStoreFileSize());
Strings.appendKeyValue(sb, "lastMajorCompactionTimestamp",

View File

@ -343,6 +343,8 @@ public final class ServerMetricsBuilder {
public String toString() {
int storeCount = 0;
int storeFileCount = 0;
int storeRefCount = 0;
int maxStoreFileRefCount = 0;
long uncompressedStoreFileSizeMB = 0;
long storeFileSizeMB = 0;
long memStoreSizeMB = 0;
@ -358,6 +360,9 @@ public final class ServerMetricsBuilder {
for (RegionMetrics r : getRegionMetrics().values()) {
storeCount += r.getStoreCount();
storeFileCount += r.getStoreFileCount();
storeRefCount += r.getStoreRefCount();
int currentMaxStoreFileRefCount = r.getMaxStoreFileRefCount();
maxStoreFileRefCount = Math.max(maxStoreFileRefCount, currentMaxStoreFileRefCount);
uncompressedStoreFileSizeMB += r.getUncompressedStoreFileSize().get(Size.Unit.MEGABYTE);
storeFileSizeMB += r.getStoreFileSize().get(Size.Unit.MEGABYTE);
memStoreSizeMB += r.getMemStoreSize().get(Size.Unit.MEGABYTE);
@ -379,6 +384,8 @@ public final class ServerMetricsBuilder {
Strings.appendKeyValue(sb, "maxHeapMB", getMaxHeapSize());
Strings.appendKeyValue(sb, "numberOfStores", storeCount);
Strings.appendKeyValue(sb, "numberOfStorefiles", storeFileCount);
Strings.appendKeyValue(sb, "storeRefCount", storeRefCount);
Strings.appendKeyValue(sb, "maxStoreFileRefCount", maxStoreFileRefCount);
Strings.appendKeyValue(sb, "storefileUncompressedSizeMB", uncompressedStoreFileSizeMB);
Strings.appendKeyValue(sb, "storefileSizeMB", storeFileSizeMB);
if (uncompressedStoreFileSizeMB != 0) {

View File

@ -1474,6 +1474,13 @@ public final class HConstants {
// User defined Default TTL config key
public static final String DEFAULT_SNAPSHOT_TTL_CONFIG_KEY = "hbase.master.snapshot.ttl";
// Regions Recovery based on high storeFileRefCount threshold value
public static final String STORE_FILE_REF_COUNT_THRESHOLD =
"hbase.regions.recovery.store.file.ref.count";
// default -1 indicates the feature is disabled (no threshold on storeFileRefCount)
public static final int DEFAULT_STORE_FILE_REF_COUNT_THRESHOLD = -1;
/**
* Configurations for master executor services.
*/

View File

@ -1901,4 +1901,33 @@ possible configurations would overwhelm and obscure the important.
automatically deleted until it is manually deleted
</description>
</property>
<property>
<name>hbase.master.regions.recovery.check.interval</name>
<value>1200000</value>
<description>
Regions Recovery Chore interval in milliseconds.
This chore keeps running at this interval to
find all regions with configurable max store file ref count
and reopens them.
</description>
</property>
<property>
<name>hbase.regions.recovery.store.file.ref.count</name>
<value>-1</value>
<description>
A very large ref count on a store file indicates
a ref leak on that object. Such files
cannot be removed even after they are invalidated
via compaction. The only way to recover in such a
scenario is to reopen the region, which
releases all resources such as the refcount, leases, etc.
This config is the store file ref count threshold
considered for reopening regions.
Any region with a store file ref count > this value
is eligible for reopening by the master.
The default value -1 indicates this feature is turned off.
Only a positive integer value should be provided to enable
this feature.
</description>
</property>
</configuration>

View File

@ -233,6 +233,7 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSo
String STOREFILE_COUNT_DESC = "Number of Store Files";
String STORE_REF_COUNT = "storeRefCount";
String STORE_REF_COUNT_DESC = "Store reference count";
String MAX_STORE_FILE_REF_COUNT = "maxStoreFileRefCount";
String MEMSTORE_SIZE = "memStoreSize";
String MEMSTORE_SIZE_DESC = "Size of the memstore";
String STOREFILE_SIZE = "storeFileSize";

View File

@ -164,4 +164,10 @@ public interface MetricsRegionWrapper {
* @return the number of references active on the store
*/
long getStoreRefCount();
/**
* Max store file reference count for the region.
*
* @return the max number of references active on any store file among
* all store files that belong to this region
*/
long getMaxStoreFileRefCount();
}

View File

@ -217,6 +217,10 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource {
regionNamePrefix + MetricsRegionServerSource.STORE_REF_COUNT,
MetricsRegionServerSource.STORE_REF_COUNT),
this.regionWrapper.getStoreRefCount());
mrb.addGauge(Interns.info(
regionNamePrefix + MetricsRegionServerSource.MAX_STORE_FILE_REF_COUNT,
MetricsRegionServerSource.MAX_STORE_FILE_REF_COUNT),
this.regionWrapper.getMaxStoreFileRefCount());
mrb.addGauge(Interns.info(
regionNamePrefix + MetricsRegionServerSource.MEMSTORE_SIZE,
MetricsRegionServerSource.MEMSTORE_SIZE_DESC),

View File

@ -99,6 +99,11 @@ public class TestMetricsRegionSourceImpl {
return 0;
}
// Test stub: this wrapper reports no store file references.
@Override
public long getMaxStoreFileRefCount() {
  return 0L;
}
@Override
public long getMemStoreSize() {
return 0;

View File

@ -149,6 +149,12 @@ message RegionLoad {
/** the number of references active on the store */
optional int32 store_ref_count = 21 [default = 0];
/**
* The max number of references active on single store file among all store files
* that belong to given region
*/
optional int32 max_store_file_ref_count = 22 [default = 0];
}
/* Server-level protobufs */

View File

@ -145,6 +145,12 @@ message RegionLoad {
/** the number of references active on the store */
optional int32 store_ref_count = 21 [default = 0];
/**
* The max number of references active on single store file among all store files
* that belong to given region
*/
optional int32 max_store_file_ref_count = 22 [default = 0];
}
/* Server-level protobufs */

View File

@ -136,6 +136,7 @@ import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
import org.apache.hadoop.hbase.master.procedure.RecoverMetaProcedure;
import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
import org.apache.hadoop.hbase.master.replication.AbstractPeerProcedure;
@ -421,6 +422,7 @@ public class HMaster extends HRegionServer implements MasterServices {
// monitor for distributed procedures
private MasterProcedureManagerHost mpmHost;
private RegionsRecoveryChore regionsRecoveryChore = null;
// it is assigned after 'initialized' guard set to true, so should be volatile
private volatile MasterQuotaManager quotaManager;
private SpaceQuotaSnapshotNotifier spaceQuotaSnapshotNotifier;
@ -1464,6 +1466,20 @@ public class HMaster extends HRegionServer implements MasterServices {
getMasterFileSystem().getFileSystem(), archiveDir, cleanerPool, params);
getChoreService().scheduleChore(hfileCleaner);
// Regions Reopen based on very high storeFileRefCount is considered enabled
// only if hbase.regions.recovery.store.file.ref.count has value > 0
final int maxStoreFileRefCount = conf.getInt(
HConstants.STORE_FILE_REF_COUNT_THRESHOLD,
HConstants.DEFAULT_STORE_FILE_REF_COUNT_THRESHOLD);
if (maxStoreFileRefCount > 0) {
this.regionsRecoveryChore = new RegionsRecoveryChore(this, conf, this);
getChoreService().scheduleChore(this.regionsRecoveryChore);
} else {
LOG.info("Reopening regions with very high storeFileRefCount is disabled. " +
"Provide threshold value > 0 for {} to enable it.",
HConstants.STORE_FILE_REF_COUNT_THRESHOLD);
}
replicationBarrierCleaner = new ReplicationBarrierCleaner(conf, this, getConnection(),
replicationPeerManager);
getChoreService().scheduleChore(replicationBarrierCleaner);
@ -1631,6 +1647,7 @@ public class HMaster extends HRegionServer implements MasterServices {
choreService.cancelChore(this.replicationBarrierCleaner);
choreService.cancelChore(this.snapshotCleanerChore);
choreService.cancelChore(this.hbckChore);
choreService.cancelChore(this.regionsRecoveryChore);
}
}
@ -3732,6 +3749,38 @@ public class HMaster extends HRegionServer implements MasterServices {
}
}
/**
 * Submits a {@link ReopenTableRegionsProcedure} for the given regions of a table.
 *
 * @param tableName the table the regions belong to
 * @param regionNames the region names of the regions to reopen
 * @param nonceGroup identifier for the source of the request, a client or process
 * @param nonce a unique identifier for this operation from the client or process identified by
 *   <code>nonceGroup</code> (the source must ensure each operation gets a unique id)
 * @return the procedure id
 * @throws IOException if reopening region fails while running procedure
 */
long reopenRegions(final TableName tableName, final List<byte[]> regionNames,
    final long nonceGroup, final long nonce) throws IOException {
  final MasterProcedureUtil.NonceProcedureRunnable reopenTask =
      new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {

        @Override
        protected void run() throws IOException {
          submitProcedure(new ReopenTableRegionsProcedure(tableName, regionNames));
        }

        @Override
        protected String getDescription() {
          return "ReopenTableRegionsProcedure";
        }
      };
  return MasterProcedureUtil.submitProcedure(reopenTask);
}
@Override
public ReplicationPeerManager getReplicationPeerManager() {
return replicationPeerManager;

View File

@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.PerClientRandomNonceGenerator;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
/**
 * Chore that, every time it runs, looks for regions whose max store file reference count
 * exceeds the configured threshold and asks the master to reopen them.
 */
@InterfaceAudience.Private
public class RegionsRecoveryChore extends ScheduledChore {

  private static final Logger LOG = LoggerFactory.getLogger(RegionsRecoveryChore.class);

  private static final String REGIONS_RECOVERY_CHORE_NAME = "RegionsRecoveryChore";

  private static final String REGIONS_RECOVERY_INTERVAL =
      "hbase.master.regions.recovery.check.interval";

  // Default chore period: 20 minutes, expressed in milliseconds.
  private static final int DEFAULT_REGIONS_RECOVERY_INTERVAL = 1200 * 1000;

  private static final String ERROR_REOPEN_REGIONS_MSG =
      "Error reopening regions with high storeRefCount. ";

  private static final PerClientRandomNonceGenerator NONCE_GENERATOR =
      PerClientRandomNonceGenerator.get();

  private final HMaster hMaster;
  private final int storeFileRefCountThreshold;

  /**
   * Construct RegionsRecoveryChore with provided params.
   *
   * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
   * @param configuration The configuration params to be used; supplies both the chore interval
   *   and the store file ref count threshold
   * @param hMaster HMaster instance used to read cluster metrics and to submit region reopens
   */
  RegionsRecoveryChore(final Stoppable stopper, final Configuration configuration,
      final HMaster hMaster) {
    super(REGIONS_RECOVERY_CHORE_NAME, stopper, configuration.getInt(REGIONS_RECOVERY_INTERVAL,
        DEFAULT_REGIONS_RECOVERY_INTERVAL));
    this.hMaster = hMaster;
    this.storeFileRefCountThreshold = configuration.getInt(
        HConstants.STORE_FILE_REF_COUNT_THRESHOLD,
        HConstants.DEFAULT_STORE_FILE_REF_COUNT_THRESHOLD);
  }

  /**
   * Collects the regions above the threshold, grouped by table, and submits one reopen request
   * per table. A failure for one table is logged and does not prevent the remaining tables
   * from being processed.
   */
  @Override
  protected void chore() {
    LOG.trace(
        "Starting up Regions Recovery chore for reopening regions based on storeFileRefCount...");
    try {
      // only if storeFileRefCountThreshold > 0, consider the feature turned on
      if (storeFileRefCountThreshold > 0) {
        final ClusterMetrics clusterMetrics = hMaster.getClusterMetrics();
        final Map<ServerName, ServerMetrics> serverMetricsMap =
            clusterMetrics.getLiveServerMetrics();
        final Map<TableName, List<byte[]>> tableToReopenRegionsMap =
            getTableToRegionsByRefCount(serverMetricsMap);
        if (MapUtils.isNotEmpty(tableToReopenRegionsMap)) {
          for (Map.Entry<TableName, List<byte[]>> entry : tableToReopenRegionsMap.entrySet()) {
            reopenRegionsOfTable(entry.getKey(), entry.getValue());
          }
        }
      } else {
        LOG.debug("Reopening regions with very high storeFileRefCount is disabled. " +
            "Provide threshold value > 0 for {} to enable it.",
            HConstants.STORE_FILE_REF_COUNT_THRESHOLD);
      }
    } catch (Exception e) {
      LOG.error("Error while reopening regions based on storeRefCount threshold", e);
    }
    LOG.trace(
        "Exiting Regions Recovery chore for reopening regions based on storeFileRefCount...");
  }

  /** Submits the reopen request for one table, logging (not propagating) an IOException. */
  private void reopenRegionsOfTable(final TableName tableName, final List<byte[]> regionNames) {
    try {
      LOG.warn("Reopening regions due to high storeFileRefCount. " +
          "TableName: {} , noOfRegions: {}", tableName, regionNames.size());
      hMaster.reopenRegions(tableName, regionNames, NONCE_GENERATOR.getNonceGroup(),
          NONCE_GENERATOR.newNonce());
    } catch (IOException e) {
      // Region names are raw byte arrays and do not print usefully, so report the count.
      LOG.error("{} tableName: {}, noOfRegions: {}", ERROR_REOPEN_REGIONS_MSG,
          tableName, regionNames.size(), e);
    }
  }

  /**
   * Scans the metrics of every live server and groups, by table, the names of the regions whose
   * max store file ref count is above the threshold.
   */
  private Map<TableName, List<byte[]>> getTableToRegionsByRefCount(
      final Map<ServerName, ServerMetrics> serverMetricsMap) {
    final Map<TableName, List<byte[]>> tableToReopenRegionsMap = new HashMap<>();
    for (ServerMetrics serverMetrics : serverMetricsMap.values()) {
      Map<byte[], RegionMetrics> regionMetricsMap = serverMetrics.getRegionMetrics();
      for (RegionMetrics regionMetrics : regionMetricsMap.values()) {
        // For each region, each store file can have different ref counts
        // We need to find maximum of all such ref counts and if that max count
        // is beyond a threshold value, we should reopen the region.
        // Here, we take max ref count of all store files and not the cumulative
        // count of all store files
        final int maxStoreFileRefCount = regionMetrics.getMaxStoreFileRefCount();
        if (maxStoreFileRefCount > storeFileRefCountThreshold) {
          final byte[] regionName = regionMetrics.getRegionName();
          prepareTableToReopenRegionsMap(tableToReopenRegionsMap, regionName,
              maxStoreFileRefCount);
        }
      }
    }
    return tableToReopenRegionsMap;
  }

  /**
   * Adds the region to the per-table reopen map, unless it belongs to the meta table or the
   * assignment manager has no RegionInfo for it.
   */
  private void prepareTableToReopenRegionsMap(
      final Map<TableName, List<byte[]>> tableToReopenRegionsMap,
      final byte[] regionName, final int regionStoreRefCount) {
    final RegionInfo regionInfo = hMaster.getAssignmentManager().getRegionInfo(regionName);
    if (regionInfo == null) {
      // Without this guard a single unknown region would raise an NPE and abort the whole run.
      LOG.warn("No RegionInfo found for a region reported with storeFileRefCount {}, "
          + "skipping it for reopen", regionStoreRefCount);
      return;
    }
    final TableName tableName = regionInfo.getTable();
    if (TableName.isMetaTableName(tableName)) {
      // Do not reopen regions of meta table even if it has
      // high store file reference count
      return;
    }
    LOG.warn("Region {} for Table {} has high storeFileRefCount {}, considering it for reopen..",
        regionInfo.getRegionNameAsString(), tableName, regionStoreRefCount);
    tableToReopenRegionsMap.computeIfAbsent(tableName, key -> new ArrayList<>()).add(regionName);
  }
}

View File

@ -18,9 +18,11 @@
package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
@ -29,11 +31,14 @@ import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReopenTableRegionsState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReopenTableRegionsStateData;
@ -50,15 +55,27 @@ public class ReopenTableRegionsProcedure
private TableName tableName;
// Specify specific regions of a table to reopen.
// if specified null, all regions of the table will be reopened.
private final List<byte[]> regionNames;
private List<HRegionLocation> regions = Collections.emptyList();
private RetryCounter retryCounter;
/** Creates a procedure with no table set and no specific regions selected. */
public ReopenTableRegionsProcedure() {
  this.regionNames = null;
}

/**
 * Creates a procedure for the given table with no specific regions selected.
 *
 * @param tableName the table whose regions are to be reopened
 */
public ReopenTableRegionsProcedure(TableName tableName) {
  this(tableName, null);
}

/**
 * Creates a procedure for the given table, restricted to the given regions.
 *
 * @param tableName the table whose regions are to be reopened
 * @param regionNames names of the regions to reopen; the list is stored as passed, not copied
 */
public ReopenTableRegionsProcedure(final TableName tableName,
    final List<byte[]> regionNames) {
  this.regionNames = regionNames;
  this.tableName = tableName;
}
@Override
@ -92,8 +109,9 @@ public class ReopenTableRegionsProcedure
LOG.info("Table {} is disabled, give up reopening its regions", tableName);
return Flow.NO_MORE_STATE;
}
regions =
env.getAssignmentManager().getRegionStates().getRegionsOfTableForReopen(tableName);
List<HRegionLocation> tableRegions = env.getAssignmentManager()
.getRegionStates().getRegionsOfTableForReopen(tableName);
regions = getRegionLocationsForReopen(tableRegions);
setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
return Flow.HAS_MORE_STATE;
case REOPEN_TABLE_REGIONS_REOPEN_REGIONS:
@ -149,6 +167,26 @@ public class ReopenTableRegionsProcedure
}
}
/**
 * Narrows the table's reopen candidates down to the explicitly requested regions.
 *
 * @param tableRegionsForReopen reopen candidates of the table
 * @return the candidates themselves when no region names were requested or there are no
 *   candidates; otherwise a new list holding, for each requested name in order, the first
 *   candidate whose region name matches
 */
private List<HRegionLocation> getRegionLocationsForReopen(
    List<HRegionLocation> tableRegionsForReopen) {
  // Nothing to filter by, or nothing to filter: hand the candidates back untouched.
  if (CollectionUtils.isEmpty(regionNames) || CollectionUtils.isEmpty(tableRegionsForReopen)) {
    return tableRegionsForReopen;
  }
  final List<HRegionLocation> selected = new ArrayList<>();
  for (byte[] wanted : regionNames) {
    tableRegionsForReopen.stream()
        .filter(location -> Bytes.equals(wanted, location.getRegion().getRegionName()))
        .findFirst()
        .ifPresent(selected::add);
  }
  return selected;
}
/**
* At end of timeout, wake ourselves up so we run again.
*/

View File

@ -1655,6 +1655,8 @@ public class HRegionServer extends HasThread implements
byte[] name = r.getRegionInfo().getRegionName();
int stores = 0;
int storefiles = 0;
int storeRefCount = 0;
int maxStoreFileRefCount = 0;
int storeUncompressedSizeMB = 0;
int storefileSizeMB = 0;
int memstoreSizeMB = (int) (r.getMemStoreDataSize() / 1024 / 1024);
@ -1668,6 +1670,10 @@ public class HRegionServer extends HasThread implements
stores += storeList.size();
for (HStore store : storeList) {
storefiles += store.getStorefilesCount();
int currentStoreRefCount = store.getStoreRefCount();
storeRefCount += currentStoreRefCount;
int currentMaxStoreFileRefCount = store.getMaxStoreFileRefCount();
maxStoreFileRefCount = Math.max(maxStoreFileRefCount, currentMaxStoreFileRefCount);
storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed() / 1024 / 1024);
storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
//TODO: storefileIndexSizeKB is same with rootLevelIndexSizeKB?
@ -1695,6 +1701,8 @@ public class HRegionServer extends HasThread implements
regionLoadBldr.setRegionSpecifier(regionSpecifier.build())
.setStores(stores)
.setStorefiles(storefiles)
.setStoreRefCount(storeRefCount)
.setMaxStoreFileRefCount(maxStoreFileRefCount)
.setStoreUncompressedSizeMB(storeUncompressedSizeMB)
.setStorefileSizeMB(storefileSizeMB)
.setMemStoreSizeMB(memstoreSizeMB)

View File

@ -33,6 +33,7 @@ import java.util.Map.Entry;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.Callable;
@ -2780,4 +2781,20 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
.filter(sf -> sf.getReader() != null).filter(HStoreFile::isHFile)
.mapToInt(HStoreFile::getRefCount).sum();
}
/**
 * @return the maximum ref count among the store files of this HStore that have a reader
 *   and are HFiles; 0 when there is no such store file
 */
public int getMaxStoreFileRefCount() {
  final OptionalInt highestRefCount = this.storeEngine.getStoreFileManager().getStorefiles()
      .stream()
      .filter(sf -> sf.getReader() != null && sf.isHFile())
      .mapToInt(HStoreFile::getRefCount)
      .max();
  return highestRefCount.orElse(0);
}
}

View File

@ -49,6 +49,7 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
private Runnable runnable;
private long numStoreFiles;
private long storeRefCount;
private long maxStoreFileRefCount;
private long memstoreSize;
private long storeFileSize;
private long maxStoreFileAge;
@ -125,6 +126,11 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
return storeRefCount;
}
/** {@inheritDoc} */
@Override
public long getMaxStoreFileRefCount() {
  return this.maxStoreFileRefCount;
}
@Override
public long getReadRequestCount() {
return this.region.getReadRequestsCount();
@ -233,6 +239,7 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
public void run() {
long tempNumStoreFiles = 0;
int tempStoreRefCount = 0;
int tempMaxStoreFileRefCount = 0;
long tempMemstoreSize = 0;
long tempStoreFileSize = 0;
long tempMaxStoreFileAge = 0;
@ -240,13 +247,16 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
long tempNumReferenceFiles = 0;
long tempMaxCompactionQueueSize = 0;
long tempMaxFlushQueueSize = 0;
long avgAgeNumerator = 0;
long numHFiles = 0;
if (region.stores != null) {
for (HStore store : region.stores.values()) {
tempNumStoreFiles += store.getStorefilesCount();
tempStoreRefCount += store.getStoreRefCount();
int currentStoreRefCount = store.getStoreRefCount();
tempStoreRefCount += currentStoreRefCount;
int currentMaxStoreFileRefCount = store.getMaxStoreFileRefCount();
tempMaxStoreFileRefCount = Math.max(tempMaxStoreFileRefCount,
currentMaxStoreFileRefCount);
tempMemstoreSize += store.getMemStoreSize().getDataSize();
tempStoreFileSize += store.getStorefilesSize();
OptionalLong storeMaxStoreFileAge = store.getMaxStoreFileAge();
@ -274,6 +284,7 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
numStoreFiles = tempNumStoreFiles;
storeRefCount = tempStoreRefCount;
maxStoreFileRefCount = tempMaxStoreFileRefCount;
memstoreSize = tempMemstoreSize;
storeFileSize = tempStoreFileSize;
maxStoreFileAge = tempMaxStoreFileAge;

View File

@ -0,0 +1,613 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Size;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionStatesCount;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.replication.ReplicationLoadSink;
import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test for RegionsRecoveryChore
*/
@Category({MasterTests.class, SmallTests.class})
public class TestRegionsRecoveryChore {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegionsRecoveryChore.class);
private static final Logger LOG = LoggerFactory.getLogger(TestRegionsRecoveryChore.class);
private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility();
private static final String UTF_8_CHARSET = StandardCharsets.UTF_8.name();
private HMaster hMaster;
private AssignmentManager assignmentManager;
private RegionsRecoveryChore regionsRecoveryChore;
private static int regionNo;
public static final byte[][] REGION_NAME_LIST = new byte[][]{
new byte[]{114, 101, 103, 105, 111, 110, 50, 49, 95, 51},
new byte[]{114, 101, 103, 105, 111, 110, 50, 53, 95, 51},
new byte[]{114, 101, 103, 105, 111, 110, 50, 54, 95, 52},
new byte[]{114, 101, 103, 105, 111, 110, 51, 50, 95, 53},
new byte[]{114, 101, 103, 105, 111, 110, 51, 49, 95, 52},
new byte[]{114, 101, 103, 105, 111, 110, 51, 48, 95, 51},
new byte[]{114, 101, 103, 105, 111, 110, 50, 48, 95, 50},
new byte[]{114, 101, 103, 105, 111, 110, 50, 52, 95, 50},
new byte[]{114, 101, 103, 105, 111, 110, 50, 57, 95, 50},
new byte[]{114, 101, 103, 105, 111, 110, 51, 53, 95, 50},
new byte[]{114, 101, 103, 105, 111, 110, 49, 48, 56, 95, 49, 49}
};
/**
 * @return the testing utility's configuration with the recovery chore check interval set to 100
 */
private Configuration getCustomConf() {
  final Configuration customConf = HBASE_TESTING_UTILITY.getConfiguration();
  customConf.setInt("hbase.master.regions.recovery.check.interval", 100);
  return customConf;
}
/** Creates fresh mocks of the master and its assignment manager before each test. */
@Before
public void setUp() throws Exception {
  hMaster = Mockito.mock(HMaster.class);
  assignmentManager = Mockito.mock(AssignmentManager.class);
}
/** Fails the test if either mock received an interaction the test did not verify. */
@After
public void tearDown() throws Exception {
  Mockito.verifyNoMoreInteractions(hMaster);
  Mockito.verifyNoMoreInteractions(assignmentManager);
}
/**
 * Runs the chore once with a store file ref count threshold of 300 and verifies the reopen
 * requests and region lookups it performs against the mocked master.
 */
@Test
public void testRegionReopensWithStoreRefConfig() throws Exception {
  regionNo = 0;
  ClusterMetrics clusterMetrics = TestRegionsRecoveryChore.getClusterMetrics(4);
  final Map<ServerName, ServerMetrics> serverMetricsMap =
      clusterMetrics.getLiveServerMetrics();
  LOG.debug("All Region Names with refCount....");
  for (ServerMetrics serverMetrics : serverMetricsMap.values()) {
    Map<byte[], RegionMetrics> regionMetricsMap = serverMetrics.getRegionMetrics();
    for (RegionMetrics regionMetrics : regionMetricsMap.values()) {
      // Decode with an explicit charset so the output does not depend on the platform default.
      LOG.debug("name: {} refCount: {}",
          new String(regionMetrics.getRegionName(), StandardCharsets.UTF_8),
          regionMetrics.getStoreRefCount());
    }
  }
  Mockito.when(hMaster.getClusterMetrics()).thenReturn(clusterMetrics);
  Mockito.when(hMaster.getAssignmentManager()).thenReturn(assignmentManager);
  for (byte[] regionName : REGION_NAME_LIST) {
    Mockito.when(assignmentManager.getRegionInfo(regionName))
        .thenReturn(TestRegionsRecoveryChore.getRegionInfo(regionName));
  }
  Stoppable stoppable = new StoppableImplementation();
  Configuration configuration = getCustomConf();
  configuration.setInt("hbase.regions.recovery.store.file.ref.count", 300);
  regionsRecoveryChore = new RegionsRecoveryChore(stoppable, configuration, hMaster);
  regionsRecoveryChore.chore();
  // Verify that we need to reopen regions of 2 tables
  Mockito.verify(hMaster, Mockito.times(2)).reopenRegions(Mockito.any(), Mockito.anyList(),
      Mockito.anyLong(), Mockito.anyLong());
  Mockito.verify(hMaster, Mockito.times(1)).getClusterMetrics();
  // Verify that we need to reopen total 3 regions that have refCount > 300
  Mockito.verify(hMaster, Mockito.times(3)).getAssignmentManager();
  Mockito.verify(assignmentManager, Mockito.times(3))
      .getRegionInfo(Mockito.any());
}
/**
 * Runs the chore once with the store file ref count threshold set to 400
 * against a 4-server stub cluster and verifies the resulting interactions
 * with the mocked master and assignment manager.
 */
@Test
public void testRegionReopensWithLessThreshold() throws Exception {
  regionNo = 0;
  ClusterMetrics clusterMetrics = TestRegionsRecoveryChore.getClusterMetrics(4);
  final Map<ServerName, ServerMetrics> serverMetricsMap =
    clusterMetrics.getLiveServerMetrics();
  LOG.debug("All Region Names with refCount....");
  // This walk is not side-effect free: every getRegionMetrics() call on the
  // stub builds new region names from the static regionNo counter and
  // advances it. Region metrics built afterwards are therefore numbered
  // after the ones logged here, so do not drop this loop without
  // revisiting REGION_NAME_LIST.
  for (ServerMetrics serverMetrics : serverMetricsMap.values()) {
    Map<byte[], RegionMetrics> regionMetricsMap = serverMetrics.getRegionMetrics();
    for (RegionMetrics regionMetrics : regionMetricsMap.values()) {
      // Decode explicitly as UTF-8 rather than with the platform default
      // charset, matching how getRegionInfo() decodes region names.
      LOG.debug("name: " + new String(regionMetrics.getRegionName(),
        java.nio.charset.StandardCharsets.UTF_8) + " refCount: " +
        regionMetrics.getStoreRefCount());
    }
  }
  Mockito.when(hMaster.getClusterMetrics()).thenReturn(clusterMetrics);
  Mockito.when(hMaster.getAssignmentManager()).thenReturn(assignmentManager);
  for (byte[] regionName : REGION_NAME_LIST) {
    Mockito.when(assignmentManager.getRegionInfo(regionName))
      .thenReturn(TestRegionsRecoveryChore.getRegionInfo(regionName));
  }
  Stoppable stoppable = new StoppableImplementation();
  Configuration configuration = getCustomConf();
  configuration.setInt("hbase.regions.recovery.store.file.ref.count", 400);
  regionsRecoveryChore = new RegionsRecoveryChore(stoppable, configuration, hMaster);
  regionsRecoveryChore.chore();
  // Verify that we need to reopen regions of only 1 table
  Mockito.verify(hMaster, Mockito.times(1)).reopenRegions(Mockito.any(), Mockito.anyList(),
    Mockito.anyLong(), Mockito.anyLong());
  Mockito.verify(hMaster, Mockito.times(1)).getClusterMetrics();
  // Verify that we need to reopen only 1 region with refCount > 400
  Mockito.verify(hMaster, Mockito.times(1)).getAssignmentManager();
  Mockito.verify(assignmentManager, Mockito.times(1))
    .getRegionInfo(Mockito.any());
}
/**
 * Runs the chore once with the store file ref count threshold unset
 * against a 10-server stub cluster and verifies that no region reopen and
 * no assignment manager lookup happens.
 */
@Test
public void testRegionReopensWithoutStoreRefConfig() throws Exception {
  regionNo = 0;
  ClusterMetrics clusterMetrics = TestRegionsRecoveryChore.getClusterMetrics(10);
  final Map<ServerName, ServerMetrics> serverMetricsMap =
    clusterMetrics.getLiveServerMetrics();
  LOG.debug("All Region Names with refCount....");
  // This walk is not side-effect free: every getRegionMetrics() call on the
  // stub builds new region names from the static regionNo counter and
  // advances it.
  for (ServerMetrics serverMetrics : serverMetricsMap.values()) {
    Map<byte[], RegionMetrics> regionMetricsMap = serverMetrics.getRegionMetrics();
    for (RegionMetrics regionMetrics : regionMetricsMap.values()) {
      // Decode explicitly as UTF-8 rather than with the platform default
      // charset, matching how getRegionInfo() decodes region names.
      LOG.debug("name: " + new String(regionMetrics.getRegionName(),
        java.nio.charset.StandardCharsets.UTF_8) + " refCount: " +
        regionMetrics.getStoreRefCount());
    }
  }
  Mockito.when(hMaster.getClusterMetrics()).thenReturn(clusterMetrics);
  Mockito.when(hMaster.getAssignmentManager()).thenReturn(assignmentManager);
  for (byte[] regionName : REGION_NAME_LIST) {
    Mockito.when(assignmentManager.getRegionInfo(regionName))
      .thenReturn(TestRegionsRecoveryChore.getRegionInfo(regionName));
  }
  Stoppable stoppable = new StoppableImplementation();
  Configuration configuration = getCustomConf();
  // Clear the threshold explicitly so the chore runs with its default.
  configuration.unset("hbase.regions.recovery.store.file.ref.count");
  regionsRecoveryChore = new RegionsRecoveryChore(stoppable, configuration, hMaster);
  regionsRecoveryChore.chore();
  // Verify that by default the feature is turned off so no regions
  // should be reopened
  Mockito.verify(hMaster, Mockito.never()).reopenRegions(Mockito.any(), Mockito.anyList(),
    Mockito.anyLong(), Mockito.anyLong());
  // default maxStoreFileRefCount is -1 (no regions to be reopened using AM)
  Mockito.verify(hMaster, Mockito.never()).getAssignmentManager();
  Mockito.verify(assignmentManager, Mockito.never())
    .getRegionInfo(Mockito.any());
}
/**
 * Builds a stub {@link ClusterMetrics} that reports {@code noOfLiveServer}
 * live servers named {@code rs_0 .. rs_(n-1)}; server {@code i} is backed by
 * a stub {@link ServerMetrics} created with {@code i + 3} regions.
 * Every other accessor returns {@code null} or {@code 0}.
 *
 * @param noOfLiveServer number of live servers the stub should expose
 * @return the stub cluster metrics
 */
private static ClusterMetrics getClusterMetrics(int noOfLiveServer) {
  return new ClusterMetrics() {
    // --- the only accessor that carries data ---
    @Override
    public Map<ServerName, ServerMetrics> getLiveServerMetrics() {
      // A new map with new ServerMetrics stubs is built on every call.
      // Keep the plain default-capacity HashMap: callers iterate it, and
      // the region numbering handed out downstream follows that order.
      final Map<ServerName, ServerMetrics> metricsByServer = new HashMap<>();
      int serverIdx = 0;
      while (serverIdx < noOfLiveServer) {
        metricsByServer.put(ServerName.valueOf("rs_" + serverIdx, 16010, 12345),
          TestRegionsRecoveryChore.getServerMetrics(serverIdx + 3));
        serverIdx++;
      }
      return metricsByServer;
    }

    // --- placeholders: nothing below returns real data ---
    @Nullable
    @Override
    public String getHBaseVersion() {
      return null;
    }

    @Nullable
    @Override
    public String getClusterId() {
      return null;
    }

    @Nullable
    @Override
    public ServerName getMasterName() {
      return null;
    }

    @Nullable
    @Override
    public Boolean getBalancerOn() {
      return null;
    }

    @Override
    public int getMasterInfoPort() {
      return 0;
    }

    @Override
    public List<ServerName> getDeadServerNames() {
      return null;
    }

    @Override
    public List<ServerName> getBackupMasterNames() {
      return null;
    }

    @Override
    public List<ServerName> getServersName() {
      return null;
    }

    @Override
    public List<RegionState> getRegionStatesInTransition() {
      return null;
    }

    @Override
    public List<String> getMasterCoprocessorNames() {
      return null;
    }

    @Override
    public Map<TableName, RegionStatesCount> getTableRegionStatesCount() {
      return null;
    }
  };
}
/**
 * Builds a stub {@link ServerMetrics} whose {@code getRegionMetrics()}
 * reports {@code noOfRegions} regions. Every other accessor returns
 * {@code null} or {@code 0}.
 *
 * @param noOfRegions number of regions the stub server should report
 * @return the stub server metrics
 */
private static ServerMetrics getServerMetrics(int noOfRegions) {
  return new ServerMetrics() {
    // --- the only accessor that carries data ---
    @Override
    public Map<byte[], RegionMetrics> getRegionMetrics() {
      // Rebuilt on every call. Region i is named "region<regionNo>_<i>"
      // with a ref count of 100 * i, and the static regionNo counter is
      // bumped once per region, so repeated calls yield new names.
      final Map<byte[], RegionMetrics> metricsByRegion = new HashMap<>();
      for (int regionIdx = 0; regionIdx < noOfRegions; regionIdx++) {
        final byte[] name = Bytes.toBytes("region" + regionNo + "_" + regionIdx);
        final RegionMetrics metrics =
          TestRegionsRecoveryChore.getRegionMetrics(name, 100 * regionIdx);
        metricsByRegion.put(name, metrics);
        ++regionNo;
      }
      return metricsByRegion;
    }

    // --- placeholders: nothing below returns real data ---
    @Override
    public ServerName getServerName() {
      return null;
    }

    @Override
    public int getInfoServerPort() {
      return 0;
    }

    @Override
    public long getRequestCount() {
      return 0;
    }

    @Override
    public long getRequestCountPerSecond() {
      return 0;
    }

    @Override
    public Size getUsedHeapSize() {
      return null;
    }

    @Override
    public Size getMaxHeapSize() {
      return null;
    }

    @Override
    public List<ReplicationLoadSource> getReplicationLoadSourceList() {
      return null;
    }

    @Override
    public Map<String, List<ReplicationLoadSource>> getReplicationLoadSourceMap() {
      return null;
    }

    @Nullable
    @Override
    public ReplicationLoadSink getReplicationLoadSink() {
      return null;
    }

    @Override
    public Set<String> getCoprocessorNames() {
      return null;
    }

    @Override
    public long getReportTimestamp() {
      return 0;
    }

    @Override
    public long getLastReportTimestamp() {
      return 0;
    }
  };
}
/**
 * Builds a stub {@link RegionMetrics} for one region. Only the region name
 * and the two ref count accessors carry data; both ref count accessors
 * return {@code storeRefCount}. Every other accessor returns {@code null}
 * or {@code 0}.
 *
 * @param regionName region name the stub reports
 * @param storeRefCount value reported as both the store ref count and the
 *   max store file ref count
 * @return the stub region metrics
 */
private static RegionMetrics getRegionMetrics(byte[] regionName, int storeRefCount) {
  return new RegionMetrics() {
    // --- accessors that carry data ---
    @Override
    public byte[] getRegionName() {
      return regionName;
    }

    @Override
    public int getStoreRefCount() {
      return storeRefCount;
    }

    @Override
    public int getMaxStoreFileRefCount() {
      return storeRefCount;
    }

    // --- placeholders: counts ---
    @Override
    public int getStoreCount() {
      return 0;
    }

    @Override
    public int getStoreFileCount() {
      return 0;
    }

    @Override
    public long getReadRequestCount() {
      return 0;
    }

    @Override
    public long getWriteRequestCount() {
      return 0;
    }

    @Override
    public long getCpRequestCount() {
      return 0;
    }

    @Override
    public long getFilteredReadRequestCount() {
      return 0;
    }

    @Override
    public long getCompactingCellCount() {
      return 0;
    }

    @Override
    public long getCompactedCellCount() {
      return 0;
    }

    @Override
    public long getCompletedSequenceId() {
      return 0;
    }

    @Override
    public long getLastMajorCompactionTimestamp() {
      return 0;
    }

    @Override
    public float getDataLocality() {
      return 0;
    }

    // --- placeholders: sizes and maps ---
    @Override
    public Size getStoreFileSize() {
      return null;
    }

    @Override
    public Size getMemStoreSize() {
      return null;
    }

    @Override
    public Size getStoreFileIndexSize() {
      return null;
    }

    @Override
    public Size getStoreFileRootLevelIndexSize() {
      return null;
    }

    @Override
    public Size getStoreFileUncompressedDataIndexSize() {
      return null;
    }

    @Override
    public Size getBloomFilterSize() {
      return null;
    }

    @Override
    public Size getUncompressedStoreFileSize() {
      return null;
    }

    @Override
    public Map<byte[], Long> getStoreSequenceId() {
      return null;
    }
  };
}
/**
 * Builds a stub {@link RegionInfo} for the given region name bytes. Only
 * {@code getRegionNameAsString()} and {@code getTable()} derive their result
 * from the name; the remaining accessors return fixed placeholder values.
 *
 * @param regionNameBytes encoded region name, expected to look like
 *   {@code <prefix>_<number>}
 * @return the stub region info
 */
private static RegionInfo getRegionInfo(byte[] regionNameBytes) {
  return new RegionInfo() {
    /** Decodes the name with UTF_8_CHARSET; yields "" if the charset is unsupported. */
    private String decodedNameOrEmpty() {
      try {
        return new String(regionNameBytes, UTF_8_CHARSET);
      } catch (UnsupportedEncodingException e) {
        return "";
      }
    }

    // --- accessors derived from the region name ---
    @Override
    public String getRegionNameAsString() {
      return decodedNameOrEmpty();
    }

    @Override
    public TableName getTable() {
      // The number after the first '_' picks one of table_0 .. table_2.
      final String decoded = decodedNameOrEmpty();
      final int suffix = Integer.parseInt(decoded.split("_")[1]);
      return TableName.valueOf("table_" + suffix % 3);
    }

    // --- fixed placeholders ---
    @Override
    public String getShortNameToLog() {
      return null;
    }

    @Override
    public String getEncodedName() {
      return null;
    }

    @Override
    public long getRegionId() {
      return 0;
    }

    @Override
    public int getReplicaId() {
      return 0;
    }

    @Override
    public byte[] getRegionName() {
      return new byte[0];
    }

    @Override
    public byte[] getEncodedNameAsBytes() {
      return new byte[0];
    }

    @Override
    public byte[] getStartKey() {
      return new byte[0];
    }

    @Override
    public byte[] getEndKey() {
      return new byte[0];
    }

    @Override
    public boolean isSplit() {
      return false;
    }

    @Override
    public boolean isSplitParent() {
      return false;
    }

    @Override
    public boolean isOffline() {
      return false;
    }

    @Override
    public boolean isMetaRegion() {
      return false;
    }

    @Override
    public boolean containsRange(byte[] rangeStartKey, byte[] rangeEndKey) {
      return false;
    }

    @Override
    public boolean containsRow(byte[] row) {
      return false;
    }
  };
}
/**
 * Minimal {@link Stoppable} for tests: it only remembers whether
 * {@code stop} has been called. The reason passed to {@code stop} is ignored.
 */
private static class StoppableImplementation implements Stoppable {
  /** Flips to true on the first stop request and never goes back. */
  private volatile boolean stopRequested;

  @Override
  public boolean isStopped() {
    return stopRequested;
  }

  @Override
  public void stop(String why) {
    stopRequested = true;
  }
}
}

View File

@ -65,6 +65,11 @@ public class MetricsRegionWrapperStub implements MetricsRegionWrapper {
return 0;
}
/** Stub value: always reports 0 as the max store file ref count. */
@Override
public long getMaxStoreFileRefCount() {
return 0;
}
@Override
public long getMemStoreSize() {
return 103;

View File

@ -2161,3 +2161,43 @@ The percent of region server RPC threads failed to abort RS.
+
.Default
`0`
[[hbase.master.regions.recovery.check.interval]]
*`hbase.master.regions.recovery.check.interval`*::
+
.Description
Regions Recovery Chore interval in milliseconds.
The chore runs at this interval, finds all regions whose
max store file ref count exceeds the configured threshold
(`hbase.regions.recovery.store.file.ref.count`), and reopens them.
+
.Default
`1200000`
[[hbase.regions.recovery.store.file.ref.count]]
*`hbase.regions.recovery.store.file.ref.count`*::
+
.Description
A very large ref count on a store file indicates
a reference leak on that file. Such files
cannot be removed even after they are invalidated
via compaction. The only way to recover in such a
scenario is to reopen the region, which
releases all resources, like the refcount, leases, etc.
This config represents the store file ref count threshold
considered for reopening regions.
Any region with a store file ref count > this value
would be eligible for reopening by the master.
The default value -1 indicates this feature is turned off.
Only a positive integer value should be provided to enable
this feature.
+
.Default
`-1`

View File

@ -3618,3 +3618,26 @@ have auto normalization turned on …..…..….
server=hbase-test-rc-5.openstacklocal,16020,1469419333913}
2016-07-26 07:39:46,246 DEBUG [AM.ZK.Worker-pool2-t278] master.RegionStates: Onlined d6b5625df331cfec84dce4f1122c567f on hbase-test-rc-5.openstacklocal,16020,1469419333913 {ENCODED => d6b5625df331cfec84dce4f1122c567f, NAME => 'table_h2osxu3wat,154717,1469518785661.d6b5625df331cfec84dce4f1122c567f.', STARTKEY => '154717', ENDKEY => '3'}
----
[[auto_reopen_regions]]
== Auto Region Reopen
We can leak store reader references if a coprocessor or core function somehow
opens a scanner, or wraps one, and then does not take care to call close on the
scanner or the wrapped instance. Leaked store files cannot be removed even
after they are invalidated via compaction.
A reasonable mitigation for a reader reference
leak would be a fast reopen of the region on the same server.
This will release all resources, like the refcount, leases, etc.
The clients should gracefully ride over this like any other region in
transition.
By default, this auto-reopen feature is disabled.
To enable it, set the config
`hbase.regions.recovery.store.file.ref.count` to a high ref count threshold.
Please refer to config descriptions for
`hbase.master.regions.recovery.check.interval` and
`hbase.regions.recovery.store.file.ref.count`.