HBASE-22460 : Reopen regions with very high Store Ref Counts (#750)
Signed-off-by Anoop Sam John <anoopsamjohn@apache.org>
This commit is contained in:
parent
d9c36e0dcf
commit
33e8156ebc
|
@ -365,6 +365,11 @@ public class RegionLoad implements RegionMetrics {
|
|||
return metrics.getStoreRefCount();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMaxStoreFileRefCount() {
|
||||
return metrics.getMaxStoreFileRefCount();
|
||||
}
|
||||
|
||||
/**
|
||||
* @see java.lang.Object#toString()
|
||||
*/
|
||||
|
|
|
@ -148,4 +148,10 @@ public interface RegionMetrics {
|
|||
* @return the reference count for the stores of this region
|
||||
*/
|
||||
int getStoreRefCount();
|
||||
|
||||
/**
|
||||
* @return the max reference count for any store file among all stores files
|
||||
* of this region
|
||||
*/
|
||||
int getMaxStoreFileRefCount();
|
||||
}
|
||||
|
|
|
@ -65,6 +65,7 @@ public final class RegionMetricsBuilder {
|
|||
.setStoreCount(regionLoadPB.getStores())
|
||||
.setStoreFileCount(regionLoadPB.getStorefiles())
|
||||
.setStoreRefCount(regionLoadPB.getStoreRefCount())
|
||||
.setMaxStoreFileRefCount(regionLoadPB.getMaxStoreFileRefCount())
|
||||
.setStoreFileSize(new Size(regionLoadPB.getStorefileSizeMB(), Size.Unit.MEGABYTE))
|
||||
.setStoreSequenceIds(regionLoadPB.getStoreCompleteSequenceIdList().stream()
|
||||
.collect(Collectors.toMap(
|
||||
|
@ -111,6 +112,7 @@ public final class RegionMetricsBuilder {
|
|||
.setStores(regionMetrics.getStoreCount())
|
||||
.setStorefiles(regionMetrics.getStoreFileCount())
|
||||
.setStoreRefCount(regionMetrics.getStoreRefCount())
|
||||
.setMaxStoreFileRefCount(regionMetrics.getMaxStoreFileRefCount())
|
||||
.setStorefileSizeMB((int) regionMetrics.getStoreFileSize().get(Size.Unit.MEGABYTE))
|
||||
.addAllStoreCompleteSequenceId(toStoreSequenceId(regionMetrics.getStoreSequenceId()))
|
||||
.setStoreUncompressedSizeMB(
|
||||
|
@ -126,6 +128,7 @@ public final class RegionMetricsBuilder {
|
|||
private int storeCount;
|
||||
private int storeFileCount;
|
||||
private int storeRefCount;
|
||||
private int maxStoreFileRefCount;
|
||||
private long compactingCellCount;
|
||||
private long compactedCellCount;
|
||||
private Size storeFileSize = Size.ZERO;
|
||||
|
@ -158,6 +161,10 @@ public final class RegionMetricsBuilder {
|
|||
this.storeRefCount = value;
|
||||
return this;
|
||||
}
|
||||
public RegionMetricsBuilder setMaxStoreFileRefCount(int value) {
|
||||
this.maxStoreFileRefCount = value;
|
||||
return this;
|
||||
}
|
||||
public RegionMetricsBuilder setCompactingCellCount(long value) {
|
||||
this.compactingCellCount = value;
|
||||
return this;
|
||||
|
@ -228,6 +235,7 @@ public final class RegionMetricsBuilder {
|
|||
storeCount,
|
||||
storeFileCount,
|
||||
storeRefCount,
|
||||
maxStoreFileRefCount,
|
||||
compactingCellCount,
|
||||
compactedCellCount,
|
||||
storeFileSize,
|
||||
|
@ -251,6 +259,7 @@ public final class RegionMetricsBuilder {
|
|||
private final int storeCount;
|
||||
private final int storeFileCount;
|
||||
private final int storeRefCount;
|
||||
private final int maxStoreFileRefCount;
|
||||
private final long compactingCellCount;
|
||||
private final long compactedCellCount;
|
||||
private final Size storeFileSize;
|
||||
|
@ -271,6 +280,7 @@ public final class RegionMetricsBuilder {
|
|||
int storeCount,
|
||||
int storeFileCount,
|
||||
int storeRefCount,
|
||||
int maxStoreFileRefCount,
|
||||
final long compactingCellCount,
|
||||
long compactedCellCount,
|
||||
Size storeFileSize,
|
||||
|
@ -291,6 +301,7 @@ public final class RegionMetricsBuilder {
|
|||
this.storeCount = storeCount;
|
||||
this.storeFileCount = storeFileCount;
|
||||
this.storeRefCount = storeRefCount;
|
||||
this.maxStoreFileRefCount = maxStoreFileRefCount;
|
||||
this.compactingCellCount = compactingCellCount;
|
||||
this.compactedCellCount = compactedCellCount;
|
||||
this.storeFileSize = Preconditions.checkNotNull(storeFileSize);
|
||||
|
@ -329,6 +340,11 @@ public final class RegionMetricsBuilder {
|
|||
return storeRefCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMaxStoreFileRefCount() {
|
||||
return maxStoreFileRefCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Size getStoreFileSize() {
|
||||
return storeFileSize;
|
||||
|
@ -417,6 +433,8 @@ public final class RegionMetricsBuilder {
|
|||
this.getStoreFileCount());
|
||||
Strings.appendKeyValue(sb, "storeRefCount",
|
||||
this.getStoreRefCount());
|
||||
Strings.appendKeyValue(sb, "maxStoreFileRefCount",
|
||||
this.getMaxStoreFileRefCount());
|
||||
Strings.appendKeyValue(sb, "uncompressedStoreFileSize",
|
||||
this.getUncompressedStoreFileSize());
|
||||
Strings.appendKeyValue(sb, "lastMajorCompactionTimestamp",
|
||||
|
|
|
@ -343,6 +343,8 @@ public final class ServerMetricsBuilder {
|
|||
public String toString() {
|
||||
int storeCount = 0;
|
||||
int storeFileCount = 0;
|
||||
int storeRefCount = 0;
|
||||
int maxStoreFileRefCount = 0;
|
||||
long uncompressedStoreFileSizeMB = 0;
|
||||
long storeFileSizeMB = 0;
|
||||
long memStoreSizeMB = 0;
|
||||
|
@ -357,6 +359,9 @@ public final class ServerMetricsBuilder {
|
|||
for (RegionMetrics r : getRegionMetrics().values()) {
|
||||
storeCount += r.getStoreCount();
|
||||
storeFileCount += r.getStoreFileCount();
|
||||
storeRefCount += r.getStoreRefCount();
|
||||
int currentMaxStoreFileRefCount = r.getMaxStoreFileRefCount();
|
||||
maxStoreFileRefCount = Math.max(maxStoreFileRefCount, currentMaxStoreFileRefCount);
|
||||
uncompressedStoreFileSizeMB += r.getUncompressedStoreFileSize().get(Size.Unit.MEGABYTE);
|
||||
storeFileSizeMB += r.getStoreFileSize().get(Size.Unit.MEGABYTE);
|
||||
memStoreSizeMB += r.getMemStoreSize().get(Size.Unit.MEGABYTE);
|
||||
|
@ -377,6 +382,8 @@ public final class ServerMetricsBuilder {
|
|||
Strings.appendKeyValue(sb, "maxHeapMB", getMaxHeapSize());
|
||||
Strings.appendKeyValue(sb, "numberOfStores", storeCount);
|
||||
Strings.appendKeyValue(sb, "numberOfStorefiles", storeFileCount);
|
||||
Strings.appendKeyValue(sb, "storeRefCount", storeRefCount);
|
||||
Strings.appendKeyValue(sb, "maxStoreFileRefCount", maxStoreFileRefCount);
|
||||
Strings.appendKeyValue(sb, "storefileUncompressedSizeMB", uncompressedStoreFileSizeMB);
|
||||
Strings.appendKeyValue(sb, "storefileSizeMB", storeFileSizeMB);
|
||||
if (uncompressedStoreFileSizeMB != 0) {
|
||||
|
|
|
@ -1501,6 +1501,13 @@ public final class HConstants {
|
|||
// User defined Default TTL config key
|
||||
public static final String DEFAULT_SNAPSHOT_TTL_CONFIG_KEY = "hbase.master.snapshot.ttl";
|
||||
|
||||
// Regions Recovery based on high storeFileRefCount threshold value
|
||||
public static final String STORE_FILE_REF_COUNT_THRESHOLD =
|
||||
"hbase.regions.recovery.store.file.ref.count";
|
||||
|
||||
// default -1 indicates there is no threshold on high storeRefCount
|
||||
public static final int DEFAULT_STORE_FILE_REF_COUNT_THRESHOLD = -1;
|
||||
|
||||
/**
|
||||
* Configurations for master executor services.
|
||||
*/
|
||||
|
|
|
@ -1877,4 +1877,33 @@ possible configurations would overwhelm and obscure the important.
|
|||
automatically deleted until it is manually deleted
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.master.regions.recovery.check.interval</name>
|
||||
<value>1200000</value>
|
||||
<description>
|
||||
Regions Recovery Chore interval in milliseconds.
|
||||
This chore keeps running at this interval to
|
||||
find all regions with configurable max store file ref count
|
||||
and reopens them.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.regions.recovery.store.file.ref.count</name>
|
||||
<value>-1</value>
|
||||
<description>
|
||||
Very large ref count on a file indicates
|
||||
that it is a ref leak on that object. Such files
|
||||
can not be removed even after it is invalidated
|
||||
via compaction. Only way to recover in such
|
||||
scenario is to reopen the region which can
|
||||
release all resources, like the refcount, leases, etc.
|
||||
This config represents Store files Ref Count threshold
|
||||
value considered for reopening regions.
|
||||
Any region with store files ref count > this value
|
||||
would be eligible for reopening by master.
|
||||
Default value -1 indicates this feature is turned off.
|
||||
Only positive integer value should be provided to enable
|
||||
this feature.
|
||||
</description>
|
||||
</property>
|
||||
</configuration>
|
||||
|
|
|
@ -233,6 +233,7 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSo
|
|||
String STOREFILE_COUNT_DESC = "Number of Store Files";
|
||||
String STORE_REF_COUNT = "storeRefCount";
|
||||
String STORE_REF_COUNT_DESC = "Store reference count";
|
||||
String MAX_STORE_FILE_REF_COUNT = "maxStoreFileRefCount";
|
||||
String MEMSTORE_SIZE = "memStoreSize";
|
||||
String MEMSTORE_SIZE_DESC = "Size of the memstore";
|
||||
String STOREFILE_SIZE = "storeFileSize";
|
||||
|
|
|
@ -157,4 +157,10 @@ public interface MetricsRegionWrapper {
|
|||
* @return the number of references active on the store
|
||||
*/
|
||||
long getStoreRefCount();
|
||||
|
||||
/**
|
||||
* @return the max number of references active on any store file among
|
||||
* all store files that belong to this region
|
||||
*/
|
||||
long getMaxStoreFileRefCount();
|
||||
}
|
||||
|
|
|
@ -217,6 +217,10 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource {
|
|||
regionNamePrefix + MetricsRegionServerSource.STORE_REF_COUNT,
|
||||
MetricsRegionServerSource.STORE_REF_COUNT),
|
||||
this.regionWrapper.getStoreRefCount());
|
||||
mrb.addGauge(Interns.info(
|
||||
regionNamePrefix + MetricsRegionServerSource.MAX_STORE_FILE_REF_COUNT,
|
||||
MetricsRegionServerSource.MAX_STORE_FILE_REF_COUNT),
|
||||
this.regionWrapper.getMaxStoreFileRefCount());
|
||||
mrb.addGauge(Interns.info(
|
||||
regionNamePrefix + MetricsRegionServerSource.MEMSTORE_SIZE,
|
||||
MetricsRegionServerSource.MEMSTORE_SIZE_DESC),
|
||||
|
|
|
@ -99,6 +99,11 @@ public class TestMetricsRegionSourceImpl {
|
|||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMaxStoreFileRefCount() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMemStoreSize() {
|
||||
return 0;
|
||||
|
|
|
@ -149,6 +149,12 @@ message RegionLoad {
|
|||
|
||||
/** the number of references active on the store */
|
||||
optional int32 store_ref_count = 21 [default = 0];
|
||||
|
||||
/**
|
||||
* The max number of references active on single store file among all store files
|
||||
* that belong to given region
|
||||
*/
|
||||
optional int32 max_store_file_ref_count = 22 [default = 0];
|
||||
}
|
||||
|
||||
/* Server-level protobufs */
|
||||
|
|
|
@ -145,6 +145,12 @@ message RegionLoad {
|
|||
|
||||
/** the number of references active on the store */
|
||||
optional int32 store_ref_count = 21 [default = 0];
|
||||
|
||||
/**
|
||||
* The max number of references active on single store file among all store files
|
||||
* that belong to given region
|
||||
*/
|
||||
optional int32 max_store_file_ref_count = 22 [default = 0];
|
||||
}
|
||||
|
||||
/* Server-level protobufs */
|
||||
|
|
|
@ -136,6 +136,7 @@ import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
|
|||
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
|
||||
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
|
||||
import org.apache.hadoop.hbase.master.procedure.RecoverMetaProcedure;
|
||||
import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
|
||||
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
|
||||
import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
|
||||
import org.apache.hadoop.hbase.master.replication.AddPeerProcedure;
|
||||
|
@ -414,6 +415,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
// monitor for distributed procedures
|
||||
private MasterProcedureManagerHost mpmHost;
|
||||
|
||||
private RegionsRecoveryChore regionsRecoveryChore = null;
|
||||
// it is assigned after 'initialized' guard set to true, so should be volatile
|
||||
private volatile MasterQuotaManager quotaManager;
|
||||
private SpaceQuotaSnapshotNotifier spaceQuotaSnapshotNotifier;
|
||||
|
@ -1441,6 +1443,20 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
getMasterFileSystem().getFileSystem(), archiveDir, cleanerPool, params);
|
||||
getChoreService().scheduleChore(hfileCleaner);
|
||||
|
||||
// Regions Reopen based on very high storeFileRefCount is considered enabled
|
||||
// only if hbase.regions.recovery.store.file.ref.count has value > 0
|
||||
final int maxStoreFileRefCount = conf.getInt(
|
||||
HConstants.STORE_FILE_REF_COUNT_THRESHOLD,
|
||||
HConstants.DEFAULT_STORE_FILE_REF_COUNT_THRESHOLD);
|
||||
if (maxStoreFileRefCount > 0) {
|
||||
this.regionsRecoveryChore = new RegionsRecoveryChore(this, conf, this);
|
||||
getChoreService().scheduleChore(this.regionsRecoveryChore);
|
||||
} else {
|
||||
LOG.info("Reopening regions with very high storeFileRefCount is disabled. " +
|
||||
"Provide threshold value > 0 for {} to enable it.",
|
||||
HConstants.STORE_FILE_REF_COUNT_THRESHOLD);
|
||||
}
|
||||
|
||||
replicationBarrierCleaner = new ReplicationBarrierCleaner(conf, this, getConnection(),
|
||||
replicationPeerManager);
|
||||
getChoreService().scheduleChore(replicationBarrierCleaner);
|
||||
|
@ -1608,6 +1624,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
choreService.cancelChore(this.replicationBarrierCleaner);
|
||||
choreService.cancelChore(this.snapshotCleanerChore);
|
||||
choreService.cancelChore(this.hbckChore);
|
||||
choreService.cancelChore(this.regionsRecoveryChore);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3689,6 +3706,38 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reopen regions provided in the argument
|
||||
*
|
||||
* @param tableName The current table name
|
||||
* @param regionNames The region names of the regions to reopen
|
||||
* @param nonceGroup Identifier for the source of the request, a client or process
|
||||
* @param nonce A unique identifier for this operation from the client or process identified by
|
||||
* <code>nonceGroup</code> (the source must ensure each operation gets a unique id).
|
||||
* @return procedure Id
|
||||
* @throws IOException if reopening region fails while running procedure
|
||||
*/
|
||||
long reopenRegions(final TableName tableName, final List<byte[]> regionNames,
|
||||
final long nonceGroup, final long nonce)
|
||||
throws IOException {
|
||||
|
||||
return MasterProcedureUtil
|
||||
.submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
|
||||
|
||||
@Override
|
||||
protected void run() throws IOException {
|
||||
submitProcedure(new ReopenTableRegionsProcedure(tableName, regionNames));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getDescription() {
|
||||
return "ReopenTableRegionsProcedure";
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReplicationPeerManager getReplicationPeerManager() {
|
||||
return replicationPeerManager;
|
||||
|
|
|
@ -0,0 +1,174 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.ClusterMetrics;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.RegionMetrics;
|
||||
import org.apache.hadoop.hbase.ScheduledChore;
|
||||
import org.apache.hadoop.hbase.ServerMetrics;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.Stoppable;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.PerClientRandomNonceGenerator;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
|
||||
|
||||
/**
|
||||
* This chore, every time it runs, will try to recover regions with high store ref count
|
||||
* by reopening them
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class RegionsRecoveryChore extends ScheduledChore {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(RegionsRecoveryChore.class);
|
||||
|
||||
private static final String REGIONS_RECOVERY_CHORE_NAME = "RegionsRecoveryChore";
|
||||
|
||||
private static final String REGIONS_RECOVERY_INTERVAL =
|
||||
"hbase.master.regions.recovery.check.interval";
|
||||
|
||||
private static final int DEFAULT_REGIONS_RECOVERY_INTERVAL = 1200 * 1000; // Default 20 min ?
|
||||
|
||||
private static final String ERROR_REOPEN_REIONS_MSG =
|
||||
"Error reopening regions with high storeRefCount. ";
|
||||
|
||||
private final HMaster hMaster;
|
||||
private final int storeFileRefCountThreshold;
|
||||
|
||||
private static final PerClientRandomNonceGenerator NONCE_GENERATOR =
|
||||
PerClientRandomNonceGenerator.get();
|
||||
|
||||
/**
|
||||
* Construct RegionsRecoveryChore with provided params
|
||||
*
|
||||
* @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
|
||||
* @param configuration The configuration params to be used
|
||||
* @param hMaster HMaster instance to initiate RegionTableRegions
|
||||
*/
|
||||
RegionsRecoveryChore(final Stoppable stopper, final Configuration configuration,
|
||||
final HMaster hMaster) {
|
||||
|
||||
super(REGIONS_RECOVERY_CHORE_NAME, stopper, configuration.getInt(REGIONS_RECOVERY_INTERVAL,
|
||||
DEFAULT_REGIONS_RECOVERY_INTERVAL));
|
||||
this.hMaster = hMaster;
|
||||
this.storeFileRefCountThreshold = configuration.getInt(
|
||||
HConstants.STORE_FILE_REF_COUNT_THRESHOLD,
|
||||
HConstants.DEFAULT_STORE_FILE_REF_COUNT_THRESHOLD);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void chore() {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(
|
||||
"Starting up Regions Recovery chore for reopening regions based on storeFileRefCount...");
|
||||
}
|
||||
try {
|
||||
// only if storeFileRefCountThreshold > 0, consider the feature turned on
|
||||
if (storeFileRefCountThreshold > 0) {
|
||||
final ClusterMetrics clusterMetrics = hMaster.getClusterMetrics();
|
||||
final Map<ServerName, ServerMetrics> serverMetricsMap =
|
||||
clusterMetrics.getLiveServerMetrics();
|
||||
final Map<TableName, List<byte[]>> tableToReopenRegionsMap =
|
||||
getTableToRegionsByRefCount(serverMetricsMap);
|
||||
if (MapUtils.isNotEmpty(tableToReopenRegionsMap)) {
|
||||
tableToReopenRegionsMap.forEach((tableName, regionNames) -> {
|
||||
try {
|
||||
LOG.warn("Reopening regions due to high storeFileRefCount. " +
|
||||
"TableName: {} , noOfRegions: {}", tableName, regionNames.size());
|
||||
hMaster.reopenRegions(tableName, regionNames, NONCE_GENERATOR.getNonceGroup(),
|
||||
NONCE_GENERATOR.newNonce());
|
||||
} catch (IOException e) {
|
||||
LOG.error("{} tableName: {}, regionNames: {}", ERROR_REOPEN_REIONS_MSG,
|
||||
tableName, regionNames, e);
|
||||
}
|
||||
});
|
||||
}
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Reopening regions with very high storeFileRefCount is disabled. " +
|
||||
"Provide threshold value > 0 for {} to enable it.",
|
||||
HConstants.STORE_FILE_REF_COUNT_THRESHOLD);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error while reopening regions based on storeRefCount threshold", e);
|
||||
}
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(
|
||||
"Exiting Regions Recovery chore for reopening regions based on storeFileRefCount...");
|
||||
}
|
||||
}
|
||||
|
||||
private Map<TableName, List<byte[]>> getTableToRegionsByRefCount(
|
||||
final Map<ServerName, ServerMetrics> serverMetricsMap) {
|
||||
|
||||
final Map<TableName, List<byte[]>> tableToReopenRegionsMap = new HashMap<>();
|
||||
for (ServerMetrics serverMetrics : serverMetricsMap.values()) {
|
||||
Map<byte[], RegionMetrics> regionMetricsMap = serverMetrics.getRegionMetrics();
|
||||
for (RegionMetrics regionMetrics : regionMetricsMap.values()) {
|
||||
// For each region, each store file can have different ref counts
|
||||
// We need to find maximum of all such ref counts and if that max count
|
||||
// is beyond a threshold value, we should reopen the region.
|
||||
// Here, we take max ref count of all store files and not the cumulative
|
||||
// count of all store files
|
||||
final int maxStoreFileRefCount = regionMetrics.getMaxStoreFileRefCount();
|
||||
|
||||
if (maxStoreFileRefCount > storeFileRefCountThreshold) {
|
||||
final byte[] regionName = regionMetrics.getRegionName();
|
||||
prepareTableToReopenRegionsMap(tableToReopenRegionsMap, regionName,
|
||||
maxStoreFileRefCount);
|
||||
}
|
||||
}
|
||||
}
|
||||
return tableToReopenRegionsMap;
|
||||
|
||||
}
|
||||
|
||||
private void prepareTableToReopenRegionsMap(
|
||||
final Map<TableName, List<byte[]>> tableToReopenRegionsMap,
|
||||
final byte[] regionName, final int regionStoreRefCount) {
|
||||
|
||||
final RegionInfo regionInfo = hMaster.getAssignmentManager().getRegionInfo(regionName);
|
||||
final TableName tableName = regionInfo.getTable();
|
||||
if (TableName.isMetaTableName(tableName)) {
|
||||
// Do not reopen regions of meta table even if it has
|
||||
// high store file reference count
|
||||
return;
|
||||
}
|
||||
LOG.warn("Region {} for Table {} has high storeFileRefCount {}, considering it for reopen..",
|
||||
regionInfo.getRegionNameAsString(), tableName, regionStoreRefCount);
|
||||
tableToReopenRegionsMap.putIfAbsent(tableName, new ArrayList<>());
|
||||
tableToReopenRegionsMap.get(tableName).add(regionName);
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -18,9 +18,11 @@
|
|||
package org.apache.hadoop.hbase.master.procedure;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
|
||||
|
@ -29,11 +31,14 @@ import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
|||
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.RetryCounter;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReopenTableRegionsState;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReopenTableRegionsStateData;
|
||||
|
@ -50,15 +55,27 @@ public class ReopenTableRegionsProcedure
|
|||
|
||||
private TableName tableName;
|
||||
|
||||
// Specify specific regions of a table to reopen.
|
||||
// if specified null, all regions of the table will be reopened.
|
||||
private final List<byte[]> regionNames;
|
||||
|
||||
private List<HRegionLocation> regions = Collections.emptyList();
|
||||
|
||||
private RetryCounter retryCounter;
|
||||
|
||||
public ReopenTableRegionsProcedure() {
|
||||
regionNames = null;
|
||||
}
|
||||
|
||||
public ReopenTableRegionsProcedure(TableName tableName) {
|
||||
this.tableName = tableName;
|
||||
this.regionNames = null;
|
||||
}
|
||||
|
||||
public ReopenTableRegionsProcedure(final TableName tableName,
|
||||
final List<byte[]> regionNames) {
|
||||
this.tableName = tableName;
|
||||
this.regionNames = regionNames;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -92,8 +109,9 @@ public class ReopenTableRegionsProcedure
|
|||
LOG.info("Table {} is disabled, give up reopening its regions", tableName);
|
||||
return Flow.NO_MORE_STATE;
|
||||
}
|
||||
regions =
|
||||
env.getAssignmentManager().getRegionStates().getRegionsOfTableForReopen(tableName);
|
||||
List<HRegionLocation> tableRegions = env.getAssignmentManager()
|
||||
.getRegionStates().getRegionsOfTableForReopen(tableName);
|
||||
regions = getRegionLocationsForReopen(tableRegions);
|
||||
setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
|
||||
return Flow.HAS_MORE_STATE;
|
||||
case REOPEN_TABLE_REGIONS_REOPEN_REGIONS:
|
||||
|
@ -149,6 +167,26 @@ public class ReopenTableRegionsProcedure
|
|||
}
|
||||
}
|
||||
|
||||
private List<HRegionLocation> getRegionLocationsForReopen(
|
||||
List<HRegionLocation> tableRegionsForReopen) {
|
||||
|
||||
List<HRegionLocation> regionsToReopen = new ArrayList<>();
|
||||
if (CollectionUtils.isNotEmpty(regionNames) &&
|
||||
CollectionUtils.isNotEmpty(tableRegionsForReopen)) {
|
||||
for (byte[] regionName : regionNames) {
|
||||
for (HRegionLocation hRegionLocation : tableRegionsForReopen) {
|
||||
if (Bytes.equals(regionName, hRegionLocation.getRegion().getRegionName())) {
|
||||
regionsToReopen.add(hRegionLocation);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
regionsToReopen = tableRegionsForReopen;
|
||||
}
|
||||
return regionsToReopen;
|
||||
}
|
||||
|
||||
/**
|
||||
* At end of timeout, wake ourselves up so we run again.
|
||||
*/
|
||||
|
|
|
@ -1650,6 +1650,8 @@ public class HRegionServer extends HasThread implements
|
|||
byte[] name = r.getRegionInfo().getRegionName();
|
||||
int stores = 0;
|
||||
int storefiles = 0;
|
||||
int storeRefCount = 0;
|
||||
int maxStoreFileRefCount = 0;
|
||||
int storeUncompressedSizeMB = 0;
|
||||
int storefileSizeMB = 0;
|
||||
int memstoreSizeMB = (int) (r.getMemStoreDataSize() / 1024 / 1024);
|
||||
|
@ -1663,6 +1665,10 @@ public class HRegionServer extends HasThread implements
|
|||
stores += storeList.size();
|
||||
for (HStore store : storeList) {
|
||||
storefiles += store.getStorefilesCount();
|
||||
int currentStoreRefCount = store.getStoreRefCount();
|
||||
storeRefCount += currentStoreRefCount;
|
||||
int currentMaxStoreFileRefCount = store.getMaxStoreFileRefCount();
|
||||
maxStoreFileRefCount = Math.max(maxStoreFileRefCount, currentMaxStoreFileRefCount);
|
||||
storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed() / 1024 / 1024);
|
||||
storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
|
||||
//TODO: storefileIndexSizeKB is same with rootLevelIndexSizeKB?
|
||||
|
@ -1690,6 +1696,8 @@ public class HRegionServer extends HasThread implements
|
|||
regionLoadBldr.setRegionSpecifier(regionSpecifier.build())
|
||||
.setStores(stores)
|
||||
.setStorefiles(storefiles)
|
||||
.setStoreRefCount(storeRefCount)
|
||||
.setMaxStoreFileRefCount(maxStoreFileRefCount)
|
||||
.setStoreUncompressedSizeMB(storeUncompressedSizeMB)
|
||||
.setStorefileSizeMB(storefileSizeMB)
|
||||
.setMemStoreSizeMB(memstoreSizeMB)
|
||||
|
|
|
@ -32,6 +32,7 @@ import java.util.Map;
|
|||
import java.util.NavigableSet;
|
||||
import java.util.Optional;
|
||||
import java.util.OptionalDouble;
|
||||
import java.util.OptionalInt;
|
||||
import java.util.OptionalLong;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
|
@ -2672,4 +2673,20 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
|||
.filter(sf -> sf.getReader() != null).filter(HStoreFile::isHFile)
|
||||
.mapToInt(HStoreFile::getRefCount).sum();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return get maximum ref count of storeFile among all HStore Files
|
||||
* for the HStore
|
||||
*/
|
||||
public int getMaxStoreFileRefCount() {
|
||||
OptionalInt maxStoreFileRefCount = this.storeEngine.getStoreFileManager()
|
||||
.getStorefiles()
|
||||
.stream()
|
||||
.filter(sf -> sf.getReader() != null)
|
||||
.filter(HStoreFile::isHFile)
|
||||
.mapToInt(HStoreFile::getRefCount)
|
||||
.max();
|
||||
return maxStoreFileRefCount.isPresent() ? maxStoreFileRefCount.getAsInt() : 0;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -49,6 +49,7 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
|
|||
private Runnable runnable;
|
||||
private long numStoreFiles;
|
||||
private long storeRefCount;
|
||||
private long maxStoreFileRefCount;
|
||||
private long memstoreSize;
|
||||
private long storeFileSize;
|
||||
private long maxStoreFileAge;
|
||||
|
@ -125,6 +126,11 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
|
|||
return storeRefCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMaxStoreFileRefCount() {
|
||||
return maxStoreFileRefCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getReadRequestCount() {
|
||||
return this.region.getReadRequestsCount();
|
||||
|
@ -228,6 +234,7 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
|
|||
public void run() {
|
||||
long tempNumStoreFiles = 0;
|
||||
int tempStoreRefCount = 0;
|
||||
int tempMaxStoreFileRefCount = 0;
|
||||
long tempMemstoreSize = 0;
|
||||
long tempStoreFileSize = 0;
|
||||
long tempMaxStoreFileAge = 0;
|
||||
|
@ -235,13 +242,16 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
|
|||
long tempNumReferenceFiles = 0;
|
||||
long tempMaxCompactionQueueSize = 0;
|
||||
long tempMaxFlushQueueSize = 0;
|
||||
|
||||
long avgAgeNumerator = 0;
|
||||
long numHFiles = 0;
|
||||
if (region.stores != null) {
|
||||
for (HStore store : region.stores.values()) {
|
||||
tempNumStoreFiles += store.getStorefilesCount();
|
||||
tempStoreRefCount += store.getStoreRefCount();
|
||||
int currentStoreRefCount = store.getStoreRefCount();
|
||||
tempStoreRefCount += currentStoreRefCount;
|
||||
int currentMaxStoreFileRefCount = store.getMaxStoreFileRefCount();
|
||||
tempMaxStoreFileRefCount = Math.max(tempMaxStoreFileRefCount,
|
||||
currentMaxStoreFileRefCount);
|
||||
tempMemstoreSize += store.getMemStoreSize().getDataSize();
|
||||
tempStoreFileSize += store.getStorefilesSize();
|
||||
OptionalLong storeMaxStoreFileAge = store.getMaxStoreFileAge();
|
||||
|
@ -269,6 +279,7 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
|
|||
|
||||
numStoreFiles = tempNumStoreFiles;
|
||||
storeRefCount = tempStoreRefCount;
|
||||
maxStoreFileRefCount = tempMaxStoreFileRefCount;
|
||||
memstoreSize = tempMemstoreSize;
|
||||
storeFileSize = tempStoreFileSize;
|
||||
maxStoreFileAge = tempMaxStoreFileAge;
|
||||
|
|
|
@ -0,0 +1,608 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import edu.umd.cs.findbugs.annotations.Nullable;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.ClusterMetrics;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.RegionMetrics;
|
||||
import org.apache.hadoop.hbase.ServerMetrics;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.Size;
|
||||
import org.apache.hadoop.hbase.Stoppable;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionStatesCount;
|
||||
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationLoadSink;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.Mockito;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Test for RegionsRecoveryChore
|
||||
*/
|
||||
@Category({MasterTests.class, SmallTests.class})
|
||||
public class TestRegionsRecoveryChore {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestRegionsRecoveryChore.class);
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestRegionsRecoveryChore.class);
|
||||
|
||||
private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility();
|
||||
|
||||
private static final String UTF_8_CHARSET = StandardCharsets.UTF_8.name();
|
||||
|
||||
private HMaster hMaster;
|
||||
|
||||
private AssignmentManager assignmentManager;
|
||||
|
||||
private RegionsRecoveryChore regionsRecoveryChore;
|
||||
|
||||
private static int regionNo;
|
||||
public static final byte[][] REGION_NAME_LIST = new byte[][]{
|
||||
new byte[]{114, 101, 103, 105, 111, 110, 50, 49, 95, 51},
|
||||
new byte[]{114, 101, 103, 105, 111, 110, 50, 53, 95, 51},
|
||||
new byte[]{114, 101, 103, 105, 111, 110, 50, 54, 95, 52},
|
||||
new byte[]{114, 101, 103, 105, 111, 110, 51, 50, 95, 53},
|
||||
new byte[]{114, 101, 103, 105, 111, 110, 51, 49, 95, 52},
|
||||
new byte[]{114, 101, 103, 105, 111, 110, 51, 48, 95, 51},
|
||||
new byte[]{114, 101, 103, 105, 111, 110, 50, 48, 95, 50},
|
||||
new byte[]{114, 101, 103, 105, 111, 110, 50, 52, 95, 50},
|
||||
new byte[]{114, 101, 103, 105, 111, 110, 50, 57, 95, 50},
|
||||
new byte[]{114, 101, 103, 105, 111, 110, 51, 53, 95, 50},
|
||||
new byte[]{114, 101, 103, 105, 111, 110, 49, 48, 56, 95, 49, 49}
|
||||
};
|
||||
|
||||
private Configuration getCustomConf() {
|
||||
Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
|
||||
conf.setInt("hbase.master.regions.recovery.check.interval", 100);
|
||||
return conf;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
this.hMaster = Mockito.mock(HMaster.class);
|
||||
this.assignmentManager = Mockito.mock(AssignmentManager.class);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
Mockito.verifyNoMoreInteractions(this.hMaster);
|
||||
Mockito.verifyNoMoreInteractions(this.assignmentManager);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRegionReopensWithStoreRefConfig() throws Exception {
|
||||
regionNo = 0;
|
||||
ClusterMetrics clusterMetrics = TestRegionsRecoveryChore.getClusterMetrics(4);
|
||||
final Map<ServerName, ServerMetrics> serverMetricsMap =
|
||||
clusterMetrics.getLiveServerMetrics();
|
||||
LOG.debug("All Region Names with refCount....");
|
||||
for (ServerMetrics serverMetrics : serverMetricsMap.values()) {
|
||||
Map<byte[], RegionMetrics> regionMetricsMap = serverMetrics.getRegionMetrics();
|
||||
for (RegionMetrics regionMetrics : regionMetricsMap.values()) {
|
||||
LOG.debug("name: " + new String(regionMetrics.getRegionName()) + " refCount: " +
|
||||
regionMetrics.getStoreRefCount());
|
||||
}
|
||||
}
|
||||
Mockito.when(hMaster.getClusterMetrics()).thenReturn(clusterMetrics);
|
||||
Mockito.when(hMaster.getAssignmentManager()).thenReturn(assignmentManager);
|
||||
for (byte[] regionName : REGION_NAME_LIST) {
|
||||
Mockito.when(assignmentManager.getRegionInfo(regionName))
|
||||
.thenReturn(TestRegionsRecoveryChore.getRegionInfo(regionName));
|
||||
}
|
||||
Stoppable stoppable = new StoppableImplementation();
|
||||
Configuration configuration = getCustomConf();
|
||||
configuration.setInt("hbase.regions.recovery.store.file.ref.count", 300);
|
||||
regionsRecoveryChore = new RegionsRecoveryChore(stoppable, configuration, hMaster);
|
||||
regionsRecoveryChore.chore();
|
||||
|
||||
// Verify that we need to reopen regions of 2 tables
|
||||
Mockito.verify(hMaster, Mockito.times(2)).reopenRegions(Mockito.any(), Mockito.anyList(),
|
||||
Mockito.anyLong(), Mockito.anyLong());
|
||||
Mockito.verify(hMaster, Mockito.times(1)).getClusterMetrics();
|
||||
|
||||
// Verify that we need to reopen total 3 regions that have refCount > 300
|
||||
Mockito.verify(hMaster, Mockito.times(3)).getAssignmentManager();
|
||||
Mockito.verify(assignmentManager, Mockito.times(3))
|
||||
.getRegionInfo(Mockito.any());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRegionReopensWithLessThreshold() throws Exception {
|
||||
regionNo = 0;
|
||||
ClusterMetrics clusterMetrics = TestRegionsRecoveryChore.getClusterMetrics(4);
|
||||
final Map<ServerName, ServerMetrics> serverMetricsMap =
|
||||
clusterMetrics.getLiveServerMetrics();
|
||||
LOG.debug("All Region Names with refCount....");
|
||||
for (ServerMetrics serverMetrics : serverMetricsMap.values()) {
|
||||
Map<byte[], RegionMetrics> regionMetricsMap = serverMetrics.getRegionMetrics();
|
||||
for (RegionMetrics regionMetrics : regionMetricsMap.values()) {
|
||||
LOG.debug("name: " + new String(regionMetrics.getRegionName()) + " refCount: " +
|
||||
regionMetrics.getStoreRefCount());
|
||||
}
|
||||
}
|
||||
Mockito.when(hMaster.getClusterMetrics()).thenReturn(clusterMetrics);
|
||||
Mockito.when(hMaster.getAssignmentManager()).thenReturn(assignmentManager);
|
||||
for (byte[] regionName : REGION_NAME_LIST) {
|
||||
Mockito.when(assignmentManager.getRegionInfo(regionName))
|
||||
.thenReturn(TestRegionsRecoveryChore.getRegionInfo(regionName));
|
||||
}
|
||||
Stoppable stoppable = new StoppableImplementation();
|
||||
Configuration configuration = getCustomConf();
|
||||
configuration.setInt("hbase.regions.recovery.store.file.ref.count", 400);
|
||||
regionsRecoveryChore = new RegionsRecoveryChore(stoppable, configuration, hMaster);
|
||||
regionsRecoveryChore.chore();
|
||||
|
||||
// Verify that we need to reopen regions of only 1 table
|
||||
Mockito.verify(hMaster, Mockito.times(1)).reopenRegions(Mockito.any(), Mockito.anyList(),
|
||||
Mockito.anyLong(), Mockito.anyLong());
|
||||
Mockito.verify(hMaster, Mockito.times(1)).getClusterMetrics();
|
||||
|
||||
// Verify that we need to reopen only 1 region with refCount > 400
|
||||
Mockito.verify(hMaster, Mockito.times(1)).getAssignmentManager();
|
||||
Mockito.verify(assignmentManager, Mockito.times(1))
|
||||
.getRegionInfo(Mockito.any());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRegionReopensWithoutStoreRefConfig() throws Exception {
|
||||
regionNo = 0;
|
||||
ClusterMetrics clusterMetrics = TestRegionsRecoveryChore.getClusterMetrics(10);
|
||||
final Map<ServerName, ServerMetrics> serverMetricsMap =
|
||||
clusterMetrics.getLiveServerMetrics();
|
||||
LOG.debug("All Region Names with refCount....");
|
||||
for (ServerMetrics serverMetrics : serverMetricsMap.values()) {
|
||||
Map<byte[], RegionMetrics> regionMetricsMap = serverMetrics.getRegionMetrics();
|
||||
for (RegionMetrics regionMetrics : regionMetricsMap.values()) {
|
||||
LOG.debug("name: " + new String(regionMetrics.getRegionName()) + " refCount: " +
|
||||
regionMetrics.getStoreRefCount());
|
||||
}
|
||||
}
|
||||
Mockito.when(hMaster.getClusterMetrics()).thenReturn(clusterMetrics);
|
||||
Mockito.when(hMaster.getAssignmentManager()).thenReturn(assignmentManager);
|
||||
for (byte[] regionName : REGION_NAME_LIST) {
|
||||
Mockito.when(assignmentManager.getRegionInfo(regionName))
|
||||
.thenReturn(TestRegionsRecoveryChore.getRegionInfo(regionName));
|
||||
}
|
||||
Stoppable stoppable = new StoppableImplementation();
|
||||
Configuration configuration = getCustomConf();
|
||||
configuration.unset("hbase.regions.recovery.store.file.ref.count");
|
||||
regionsRecoveryChore = new RegionsRecoveryChore(stoppable, configuration, hMaster);
|
||||
regionsRecoveryChore.chore();
|
||||
|
||||
// Verify that by default the feature is turned off so no regions
|
||||
// should be reopened
|
||||
Mockito.verify(hMaster, Mockito.times(0)).reopenRegions(Mockito.any(), Mockito.anyList(),
|
||||
Mockito.anyLong(), Mockito.anyLong());
|
||||
|
||||
// default maxStoreFileRefCount is -1 (no regions to be reopened using AM)
|
||||
Mockito.verify(hMaster, Mockito.times(0)).getAssignmentManager();
|
||||
Mockito.verify(assignmentManager, Mockito.times(0))
|
||||
.getRegionInfo(Mockito.any());
|
||||
}
|
||||
|
||||
private static ClusterMetrics getClusterMetrics(int noOfLiveServer) {
|
||||
ClusterMetrics clusterMetrics = new ClusterMetrics() {
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public String getHBaseVersion() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ServerName> getDeadServerNames() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<ServerName, ServerMetrics> getLiveServerMetrics() {
|
||||
Map<ServerName, ServerMetrics> liveServerMetrics = new HashMap<>();
|
||||
for (int i = 0; i < noOfLiveServer; i++) {
|
||||
ServerName serverName = ServerName.valueOf("rs_" + i, 16010, 12345);
|
||||
liveServerMetrics.put(serverName, TestRegionsRecoveryChore.getServerMetrics(i + 3));
|
||||
}
|
||||
return liveServerMetrics;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public ServerName getMasterName() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ServerName> getBackupMasterNames() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<RegionState> getRegionStatesInTransition() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public String getClusterId() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getMasterCoprocessorNames() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public Boolean getBalancerOn() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMasterInfoPort() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ServerName> getServersName() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<TableName, RegionStatesCount> getTableRegionStatesCount() {
|
||||
return null;
|
||||
}
|
||||
|
||||
};
|
||||
return clusterMetrics;
|
||||
}
|
||||
|
||||
private static ServerMetrics getServerMetrics(int noOfRegions) {
|
||||
ServerMetrics serverMetrics = new ServerMetrics() {
|
||||
|
||||
@Override
|
||||
public ServerName getServerName() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getRequestCountPerSecond() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getRequestCount() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Size getUsedHeapSize() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Size getMaxHeapSize() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getInfoServerPort() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ReplicationLoadSource> getReplicationLoadSourceList() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, List<ReplicationLoadSource>> getReplicationLoadSourceMap() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public ReplicationLoadSink getReplicationLoadSink() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<byte[], RegionMetrics> getRegionMetrics() {
|
||||
Map<byte[], RegionMetrics> regionMetricsMap = new HashMap<>();
|
||||
for (int i = 0; i < noOfRegions; i++) {
|
||||
byte[] regionName = Bytes.toBytes("region" + regionNo + "_" + i);
|
||||
regionMetricsMap.put(regionName,
|
||||
TestRegionsRecoveryChore.getRegionMetrics(regionName, 100 * i));
|
||||
++regionNo;
|
||||
}
|
||||
return regionMetricsMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getCoprocessorNames() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getReportTimestamp() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLastReportTimestamp() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
};
|
||||
return serverMetrics;
|
||||
}
|
||||
|
||||
private static RegionMetrics getRegionMetrics(byte[] regionName, int storeRefCount) {
|
||||
RegionMetrics regionMetrics = new RegionMetrics() {
|
||||
|
||||
@Override
|
||||
public byte[] getRegionName() {
|
||||
return regionName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getStoreCount() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getStoreFileCount() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Size getStoreFileSize() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Size getMemStoreSize() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getReadRequestCount() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getWriteRequestCount() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getFilteredReadRequestCount() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Size getStoreFileIndexSize() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Size getStoreFileRootLevelIndexSize() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Size getStoreFileUncompressedDataIndexSize() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Size getBloomFilterSize() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getCompactingCellCount() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getCompactedCellCount() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getCompletedSequenceId() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<byte[], Long> getStoreSequenceId() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Size getUncompressedStoreFileSize() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public float getDataLocality() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLastMajorCompactionTimestamp() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getStoreRefCount() {
|
||||
return storeRefCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMaxStoreFileRefCount() {
|
||||
return storeRefCount;
|
||||
}
|
||||
|
||||
};
|
||||
return regionMetrics;
|
||||
}
|
||||
|
||||
private static RegionInfo getRegionInfo(byte[] regionNameBytes) {
|
||||
RegionInfo regionInfo = new RegionInfo() {
|
||||
|
||||
@Override
|
||||
public String getShortNameToLog() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getRegionId() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getRegionName() {
|
||||
return new byte[0];
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getRegionNameAsString() {
|
||||
try {
|
||||
return new String(regionNameBytes, UTF_8_CHARSET);
|
||||
} catch (UnsupportedEncodingException e) {
|
||||
return "";
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getEncodedName() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getEncodedNameAsBytes() {
|
||||
return new byte[0];
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getStartKey() {
|
||||
return new byte[0];
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getEndKey() {
|
||||
return new byte[0];
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableName getTable() {
|
||||
String regionName;
|
||||
try {
|
||||
regionName = new String(regionNameBytes, UTF_8_CHARSET);
|
||||
} catch (UnsupportedEncodingException e) {
|
||||
regionName = "";
|
||||
}
|
||||
int regionNo = Integer.parseInt(regionName.split("_")[1]);
|
||||
TableName tableName = TableName.valueOf("table_" + regionNo % 3);
|
||||
return tableName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getReplicaId() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isSplit() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOffline() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isSplitParent() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isMetaRegion() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean containsRange(byte[] rangeStartKey, byte[] rangeEndKey) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean containsRow(byte[] row) {
|
||||
return false;
|
||||
}
|
||||
|
||||
};
|
||||
return regionInfo;
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple helper class that just keeps track of whether or not its stopped.
|
||||
*/
|
||||
private static class StoppableImplementation implements Stoppable {
|
||||
|
||||
private volatile boolean stop = false;
|
||||
|
||||
@Override
|
||||
public void stop(String why) {
|
||||
this.stop = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isStopped() {
|
||||
return this.stop;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -65,6 +65,11 @@ public class MetricsRegionWrapperStub implements MetricsRegionWrapper {
|
|||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMaxStoreFileRefCount() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMemStoreSize() {
|
||||
return 103;
|
||||
|
|
|
@ -2027,6 +2027,46 @@ A comma-separated list of
|
|||
`0`
|
||||
|
||||
|
||||
[[hbase.master.regions.recovery.check.interval]]
|
||||
*`hbase.master.regions.recovery.check.interval`*::
|
||||
+
|
||||
.Description
|
||||
|
||||
Regions Recovery Chore interval in milliseconds.
|
||||
This chore keeps running at this interval to
|
||||
find all regions with configurable max store file ref count
|
||||
and reopens them.
|
||||
|
||||
+
|
||||
.Default
|
||||
`1200000`
|
||||
|
||||
|
||||
[[hbase.regions.recovery.store.file.ref.count]]
|
||||
*`hbase.regions.recovery.store.file.ref.count`*::
|
||||
+
|
||||
.Description
|
||||
|
||||
Very large ref count on a file indicates
|
||||
that it is a ref leak on that object. Such files
|
||||
can not be removed even after it is invalidated
|
||||
via compaction. Only way to recover in such
|
||||
scenario is to reopen the region which can
|
||||
release all resources, like the refcount, leases, etc.
|
||||
This config represents Store files Ref Count threshold
|
||||
value considered for reopening regions.
|
||||
Any region with store files ref count > this value
|
||||
would be eligible for reopening by master.
|
||||
Default value -1 indicates this feature is turned off.
|
||||
Only positive integer value should be provided to enable
|
||||
this feature.
|
||||
|
||||
+
|
||||
.Default
|
||||
`-1`
|
||||
|
||||
|
||||
|
||||
[[hbase.region.replica.replication.enabled]]
|
||||
*`hbase.region.replica.replication.enabled`*::
|
||||
+
|
||||
|
|
|
@ -3491,3 +3491,25 @@ have auto normalization turned on …..…..….
|
|||
server=hbase-test-rc-5.openstacklocal,16020,1469419333913}
|
||||
2016-07-26 07:39:46,246 DEBUG [AM.ZK.Worker-pool2-t278] master.RegionStates: Onlined d6b5625df331cfec84dce4f1122c567f on hbase-test-rc-5.openstacklocal,16020,1469419333913 {ENCODED => d6b5625df331cfec84dce4f1122c567f, NAME => 'table_h2osxu3wat,154717,1469518785661.d6b5625df331cfec84dce4f1122c567f.', STARTKEY => '154717', ENDKEY => '3'}
|
||||
----
|
||||
|
||||
|
||||
[[auto_reopen_regions]]
|
||||
== Auto Region Reopen
|
||||
|
||||
We can leak store reader references if a coprocessor or core function somehow
|
||||
opens a scanner, or wraps one, and then does not take care to call close on the
|
||||
scanner or the wrapped instance. Leaked store files can not be removed even
|
||||
after it is invalidated via compaction.
|
||||
A reasonable mitigation for a reader reference
|
||||
leak would be a fast reopen of the region on the same server.
|
||||
This will release all resources, like the refcount, leases, etc.
|
||||
The clients should gracefully ride over this like any other region in
|
||||
transition.
|
||||
By default this auto reopen of region feature would be disabled.
|
||||
To enabled it, please provide high ref count value for config
|
||||
`hbase.regions.recovery.store.file.ref.count`.
|
||||
|
||||
Please refer to config descriptions for
|
||||
`hbase.master.regions.recovery.check.interval` and
|
||||
`hbase.regions.recovery.store.file.ref.count`.
|
||||
|
||||
|
|
Loading…
Reference in New Issue