HBASE-17000 Implement computation of online region sizes and report to the Master
Includes a trivial implementation of the Master-side collection, only enough to write a test that verifies RegionServer-side collection.
commit 6b334cd817
parent f74e051bce
File diff suppressed because it is too large
RegionServerStatus.proto:

@@ -141,6 +141,22 @@ message SplitTableRegionResponse {
  optional uint64 proc_id = 1;
}

message RegionSpaceUse {
  optional RegionInfo region = 1; // A region identifier
  optional uint64 size = 2; // The size in bytes of the region
}

/**
 * Reports filesystem usage for regions.
 */
message RegionSpaceUseReportRequest {
  repeated RegionSpaceUse space_use = 1;
}

message RegionSpaceUseReportResponse {
}

service RegionServerStatusService {
  /** Called when a region server first starts. */
  rpc RegionServerStartup(RegionServerStartupRequest)

@@ -182,4 +198,10 @@ service RegionServerStatusService {
   */
  rpc getProcedureResult(GetProcedureResultRequest)
    returns(GetProcedureResultResponse);

  /**
   * Reports Region filesystem space use
   */
  rpc ReportRegionSpaceUse(RegionSpaceUseReportRequest)
    returns(RegionSpaceUseReportResponse);
}
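For readers unfamiliar with the generated protobuf classes, here is a minimal sketch (not part of the patch) of how a single-region report could be assembled from the messages above. The helper name singleRegionReport is hypothetical; the builder calls follow the generated API that is used later in this diff.

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;

// Hypothetical helper, shown only to illustrate the generated builder API.
static RegionSpaceUseReportRequest singleRegionReport(HRegionInfo hri, long sizeInBytes) {
  RegionSpaceUse use = RegionSpaceUse.newBuilder()
      .setRegion(HRegionInfo.convert(hri)) // HRegionInfo -> protobuf RegionInfo
      .setSize(sizeInBytes)
      .build();
  return RegionSpaceUseReportRequest.newBuilder().addSpaceUse(use).build();
}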
MasterRpcServices.java:

@@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
import org.apache.hadoop.hbase.procedure2.LockInfo;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;

@@ -94,6 +95,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorResponse;

@@ -1901,4 +1905,19 @@ public class MasterRpcServices extends RSRpcServices
      throw new ServiceException(e);
    }
  }

  @Override
  public RegionSpaceUseReportResponse reportRegionSpaceUse(RpcController controller,
      RegionSpaceUseReportRequest request) throws ServiceException {
    try {
      master.checkInitialized();
      MasterQuotaManager quotaManager = this.master.getMasterQuotaManager();
      for (RegionSpaceUse report : request.getSpaceUseList()) {
        quotaManager.addRegionSize(HRegionInfo.convert(report.getRegion()), report.getSize());
      }
      return RegionSpaceUseReportResponse.newBuilder().build();
    } catch (Exception e) {
      throw new ServiceException(e);
    }
  }
}
FileSystemUtilizationChore.java (new file):

@@ -0,0 +1,205 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to you under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

/**
 * A chore which computes the size of each {@link HRegion} on the FileSystem hosted by the given
 * {@link HRegionServer}.
 */
@InterfaceAudience.Private
public class FileSystemUtilizationChore extends ScheduledChore {
  private static final Log LOG = LogFactory.getLog(FileSystemUtilizationChore.class);
  static final String FS_UTILIZATION_CHORE_PERIOD_KEY = "hbase.regionserver.quotas.fs.utilization.chore.period";
  static final int FS_UTILIZATION_CHORE_PERIOD_DEFAULT = 1000 * 60 * 5; // 5 minutes in millis

  static final String FS_UTILIZATION_CHORE_DELAY_KEY = "hbase.regionserver.quotas.fs.utilization.chore.delay";
  static final long FS_UTILIZATION_CHORE_DELAY_DEFAULT = 1000L * 60L; // 1 minute

  static final String FS_UTILIZATION_CHORE_TIMEUNIT_KEY = "hbase.regionserver.quotas.fs.utilization.chore.timeunit";
  static final String FS_UTILIZATION_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();

  static final String FS_UTILIZATION_MAX_ITERATION_DURATION_KEY = "hbase.regionserver.quotas.fs.utilization.chore.max.iteration.millis";
  static final long FS_UTILIZATION_MAX_ITERATION_DURATION_DEFAULT = 5000L;

  private final HRegionServer rs;
  private final long maxIterationMillis;
  private Iterator<Region> leftoverRegions;

  public FileSystemUtilizationChore(HRegionServer rs) {
    super(FileSystemUtilizationChore.class.getSimpleName(), rs, getPeriod(rs.getConfiguration()),
        getInitialDelay(rs.getConfiguration()), getTimeUnit(rs.getConfiguration()));
    this.rs = rs;
    this.maxIterationMillis = rs.getConfiguration().getLong(
        FS_UTILIZATION_MAX_ITERATION_DURATION_KEY, FS_UTILIZATION_MAX_ITERATION_DURATION_DEFAULT);
  }

  @Override
  protected void chore() {
    final Map<HRegionInfo,Long> onlineRegionSizes = new HashMap<>();
    final Set<Region> onlineRegions = new HashSet<>(rs.getOnlineRegions());
    // Process the regions from the last run if we have any. If we are somehow having difficulty
    // processing the Regions, we want to avoid creating a backlog in memory of Region objs.
    Iterator<Region> oldRegionsToProcess = getLeftoverRegions();
    final Iterator<Region> iterator;
    final boolean processingLeftovers;
    if (null == oldRegionsToProcess) {
      iterator = onlineRegions.iterator();
      processingLeftovers = false;
    } else {
      iterator = oldRegionsToProcess;
      processingLeftovers = true;
    }
    // Reset the leftoverRegions and let the loop re-assign if necessary.
    setLeftoverRegions(null);
    long regionSizesCalculated = 0L;
    long offlineRegionsSkipped = 0L;
    long skippedSplitParents = 0L;
    long skippedRegionReplicas = 0L;
    final long start = EnvironmentEdgeManager.currentTime();
    while (iterator.hasNext()) {
      // Make sure this chore doesn't hog the thread.
      long timeRunning = EnvironmentEdgeManager.currentTime() - start;
      if (timeRunning > maxIterationMillis) {
        LOG.debug("Preempting execution of FileSystemUtilizationChore because it exceeds the"
            + " maximum iteration configuration value. Will process remaining Regions"
            + " on a subsequent invocation.");
        setLeftoverRegions(iterator);
        break;
      }

      final Region region = iterator.next();
      // If we're processing leftover regions, the region may no longer be online.
      // If so, we can skip it.
      if (processingLeftovers && !onlineRegions.contains(region)) {
        offlineRegionsSkipped++;
        continue;
      }
      // Avoid computing the size of regions which are the parent of a split.
      if (region.getRegionInfo().isSplitParent()) {
        skippedSplitParents++;
        continue;
      }
      // Avoid computing the size of region replicas.
      if (HRegionInfo.DEFAULT_REPLICA_ID != region.getRegionInfo().getReplicaId()) {
        skippedRegionReplicas++;
        continue;
      }
      final long sizeInBytes = computeSize(region);
      onlineRegionSizes.put(region.getRegionInfo(), sizeInBytes);
      regionSizesCalculated++;
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("Computed the size of " + regionSizesCalculated + " Regions. Skipped computation"
          + " of " + offlineRegionsSkipped + " regions due to not being online on this RS, "
          + skippedSplitParents + " regions due to being the parent of a split, and "
          + skippedRegionReplicas + " regions due to being region replicas.");
    }
    reportRegionSizesToMaster(onlineRegionSizes);
  }

  /**
   * Returns an {@link Iterator} over the Regions which were skipped in the last invocation of the
   * chore.
   *
   * @return Regions from the previous invocation to process, or null.
   */
  Iterator<Region> getLeftoverRegions() {
    return leftoverRegions;
  }

  /**
   * Sets a new collection of Regions as leftovers.
   */
  void setLeftoverRegions(Iterator<Region> newLeftovers) {
    this.leftoverRegions = newLeftovers;
  }

  /**
   * Computes total FileSystem size for the given {@link Region}.
   *
   * @param r The region
   * @return The size, in bytes, of the Region.
   */
  long computeSize(Region r) {
    long regionSize = 0L;
    for (Store store : r.getStores()) {
      // StoreFile/StoreFileReaders are already instantiated with the file length cached.
      // Can avoid extra NN ops.
      regionSize += store.getStorefilesSize();
    }
    return regionSize;
  }

  /**
   * Reports the computed region sizes to the currently active Master.
   *
   * @param onlineRegionSizes The computed region sizes to report.
   */
  void reportRegionSizesToMaster(Map<HRegionInfo,Long> onlineRegionSizes) {
    this.rs.reportRegionSizesForQuotas(onlineRegionSizes);
  }

  /**
   * Extracts the period for the chore from the configuration.
   *
   * @param conf The configuration object.
   * @return The configured chore period or the default value.
   */
  static int getPeriod(Configuration conf) {
    return conf.getInt(FS_UTILIZATION_CHORE_PERIOD_KEY, FS_UTILIZATION_CHORE_PERIOD_DEFAULT);
  }

  /**
   * Extracts the initial delay for the chore from the configuration.
   *
   * @param conf The configuration object.
   * @return The configured chore initial delay or the default value.
   */
  static long getInitialDelay(Configuration conf) {
    return conf.getLong(FS_UTILIZATION_CHORE_DELAY_KEY, FS_UTILIZATION_CHORE_DELAY_DEFAULT);
  }

  /**
   * Extracts the time unit for the chore period and initial delay from the configuration. The
   * configuration value for {@link #FS_UTILIZATION_CHORE_TIMEUNIT_KEY} must correspond to a
   * {@link TimeUnit} value.
   *
   * @param conf The configuration object.
   * @return The configured time unit for the chore period and initial delay or the default value.
   */
  static TimeUnit getTimeUnit(Configuration conf) {
    return TimeUnit.valueOf(conf.get(FS_UTILIZATION_CHORE_TIMEUNIT_KEY,
        FS_UTILIZATION_CHORE_TIMEUNIT_DEFAULT));
  }
}
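The keys above make the chore's cadence tunable. Below is a minimal sketch, not part of the patch, of how the defaults could be overridden; the class name is hypothetical and the values are illustrative, while the keys and the Configuration accessors match what the chore reads above.

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Hypothetical tuning example; the keys and defaults come from the chore above.
public class FsUtilizationChoreTuning {
  static Configuration tunedConf() {
    Configuration conf = HBaseConfiguration.create();
    // Interpret the period and delay in seconds instead of the default MILLISECONDS.
    conf.set("hbase.regionserver.quotas.fs.utilization.chore.timeunit", TimeUnit.SECONDS.name());
    // Recompute region sizes every 10 minutes (600 seconds)...
    conf.setInt("hbase.regionserver.quotas.fs.utilization.chore.period", 600);
    // ...starting 30 seconds after the RegionServer comes up.
    conf.setLong("hbase.regionserver.quotas.fs.utilization.chore.delay", 30L);
    // The per-run time budget is always read in milliseconds (a plain getLong),
    // independent of the configured time unit.
    conf.setLong("hbase.regionserver.quotas.fs.utilization.chore.max.iteration.millis", 2000L);
    return conf;
  }
}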
MasterQuotaManager.java:

@@ -19,7 +19,10 @@
package org.apache.hadoop.hbase.quotas;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

@@ -62,6 +65,7 @@ public class MasterQuotaManager implements RegionStateListener {
  private NamedLock<String> userLocks;
  private boolean enabled = false;
  private NamespaceAuditor namespaceQuotaManager;
  private ConcurrentHashMap<HRegionInfo, Long> regionSizes;

  public MasterQuotaManager(final MasterServices masterServices) {
    this.masterServices = masterServices;

@@ -85,6 +89,7 @@ public class MasterQuotaManager implements RegionStateListener {
    namespaceLocks = new NamedLock<>();
    tableLocks = new NamedLock<>();
    userLocks = new NamedLock<>();
    regionSizes = new ConcurrentHashMap<>();

    namespaceQuotaManager = new NamespaceAuditor(masterServices);
    namespaceQuotaManager.start();

@@ -515,5 +520,15 @@ public class MasterQuotaManager implements RegionStateListener {
      this.namespaceQuotaManager.removeRegionFromNamespaceUsage(hri);
    }
  }

  public void addRegionSize(HRegionInfo hri, long size) {
    // TODO Make proper API
    regionSizes.put(hri, size);
  }

  public Map<HRegionInfo, Long> snapshotRegionSizes() {
    // TODO Make proper API
    return new HashMap<>(regionSizes);
  }
}
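Since snapshotRegionSizes() hands back a point-in-time copy of the ConcurrentHashMap, callers can aggregate it freely without holding any locks. A small sketch of one such aggregation, assuming a per-table rollup that is illustrative and not part of this patch:

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;

// Hypothetical helper class; only HRegionInfo.getTable() is relied upon.
public class RegionSizeRollup {
  /** Sums the reported region sizes by table from a MasterQuotaManager snapshot. */
  static Map<TableName, Long> sizeByTable(Map<HRegionInfo, Long> regionSizes) {
    Map<TableName, Long> byTable = new HashMap<>();
    for (Map.Entry<HRegionInfo, Long> entry : regionSizes.entrySet()) {
      TableName tn = entry.getKey().getTable();
      Long current = byTable.get(tn);
      byTable.put(tn, (current == null ? 0L : current) + entry.getValue());
    }
    return byTable;
  }
}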
HRegionServer.java:

@@ -36,6 +36,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;

@@ -72,6 +73,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HealthCheckChore;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;

@@ -115,6 +117,7 @@ import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.mob.MobCacheConfig;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;

@@ -150,12 +153,15 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpeci
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;

@@ -510,6 +516,8 @@ public class HRegionServer extends HasThread implements

  protected SecureBulkLoadManager secureBulkLoadManager;

  protected FileSystemUtilizationChore fsUtilizationChore;

  /**
   * Starts a HRegionServer at the default location.
   */

@@ -921,6 +929,8 @@ public class HRegionServer extends HasThread implements
      // Setup the Quota Manager
      rsQuotaManager = new RegionServerQuotaManager(this);

      this.fsUtilizationChore = new FileSystemUtilizationChore(this);

      // Setup RPC client for master communication
      rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
          rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics());

@@ -1234,6 +1244,66 @@ public class HRegionServer extends HasThread implements
    }
  }

  /**
   * Reports the given map of Regions and their size on the filesystem to the active Master.
   *
   * @param onlineRegionSizes A map of region info to size in bytes
   */
  public void reportRegionSizesForQuotas(final Map<HRegionInfo, Long> onlineRegionSizes) {
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null) {
      // the current server could be stopping.
      LOG.trace("Skipping Region size report to HMaster as stub is null");
      return;
    }
    try {
      RegionSpaceUseReportRequest request = buildRegionSpaceUseReportRequest(
          Objects.requireNonNull(onlineRegionSizes));
      rss.reportRegionSpaceUse(null, request);
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof PleaseHoldException) {
        LOG.trace("Failed to report region sizes to Master because it is initializing."
            + " This will be retried.", ioe);
        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
        return;
      }
      LOG.debug("Failed to report region sizes to Master. This will be retried.", ioe);
      if (rssStub == rss) {
        rssStub = null;
      }
      createRegionServerStatusStub(true);
    }
  }

  /**
   * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map.
   *
   * @param regionSizes Map of region info to size in bytes.
   * @return The corresponding protocol buffer message.
   */
  RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(Map<HRegionInfo,Long> regionSizes) {
    RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder();
    for (Entry<HRegionInfo, Long> entry : Objects.requireNonNull(regionSizes).entrySet()) {
      request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue()));
    }
    return request.build();
  }

  /**
   * Converts a pair of {@link HRegionInfo} and {@code long} into a {@link RegionSpaceUse}
   * protobuf message.
   *
   * @param regionInfo The HRegionInfo
   * @param sizeInBytes The size in bytes of the Region
   * @return The protocol buffer
   */
  RegionSpaceUse convertRegionSize(HRegionInfo regionInfo, Long sizeInBytes) {
    return RegionSpaceUse.newBuilder()
        .setRegion(HRegionInfo.convert(Objects.requireNonNull(regionInfo)))
        .setSize(Objects.requireNonNull(sizeInBytes))
        .build();
  }

  ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
      throws IOException {
    // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests

@@ -1816,6 +1886,7 @@ public class HRegionServer extends HasThread implements
    if (this.nonceManagerChore != null) choreService.scheduleChore(nonceManagerChore);
    if (this.storefileRefresher != null) choreService.scheduleChore(storefileRefresher);
    if (this.movedRegionsCleaner != null) choreService.scheduleChore(movedRegionsCleaner);
    if (this.fsUtilizationChore != null) choreService.scheduleChore(fsUtilizationChore);

    // Leases is not a Thread. Internally it runs a daemon thread. If it gets
    // an unhandled exception, it will just exit.

@@ -2325,6 +2396,7 @@ public class HRegionServer extends HasThread implements
    if (this.healthCheckChore != null) healthCheckChore.cancel(true);
    if (this.storefileRefresher != null) storefileRefresher.cancel(true);
    if (this.movedRegionsCleaner != null) movedRegionsCleaner.cancel(true);
    if (this.fsUtilizationChore != null) fsUtilizationChore.cancel(true);

    if (this.cacheFlusher != null) {
      this.cacheFlusher.join();
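Putting the two sides together: the RegionServer packs the size map with the methods above, and the Master-side handler shown earlier in MasterRpcServices unpacks it. A condensed sketch of that round trip follows; it is illustrative only, with rs, quotaManager, and hri standing in for the real instances (and it elides the RPC hop and the package-private visibility of the builder).

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;

// Hypothetical illustration of the report round trip.
static void roundTrip(HRegionServer rs, MasterQuotaManager quotaManager, HRegionInfo hri) {
  Map<HRegionInfo, Long> sizes = new HashMap<>();
  sizes.put(hri, 1024L * 1024L); // an illustrative one-megabyte region

  // RegionServer side: pack the map into the protobuf request.
  RegionSpaceUseReportRequest request = rs.buildRegionSpaceUseReportRequest(sizes);

  // Master side: unpack each entry and record it with the quota manager.
  for (RegionSpaceUse report : request.getSpaceUseList()) {
    quotaManager.addRegionSize(HRegionInfo.convert(report.getRegion()), report.getSize());
  }
}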
TestFileSystemUtilizationChore.java (new file):

@@ -0,0 +1,357 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to you under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Test class for {@link FileSystemUtilizationChore}.
 */
@Category(SmallTests.class)
public class TestFileSystemUtilizationChore {

  @SuppressWarnings("unchecked")
  @Test
  public void testNoOnlineRegions() {
    // A region with no stores, so the report should sum to zero.
    final List<Long> regionSizes = Collections.emptyList();
    final Configuration conf = getDefaultHBaseConfiguration();
    final HRegionServer rs = mockRegionServer(conf);
    final FileSystemUtilizationChore chore = new FileSystemUtilizationChore(rs);
    doAnswer(new ExpectedRegionSizeSummationAnswer(sum(regionSizes)))
        .when(rs)
        .reportRegionSizesForQuotas((Map<HRegionInfo,Long>) any(Map.class));

    final Region region = mockRegionWithSize(regionSizes);
    when(rs.getOnlineRegions()).thenReturn(Arrays.asList(region));
    chore.chore();
  }

  @SuppressWarnings("unchecked")
  @Test
  public void testRegionSizes() {
    // One region with a single store of 1024 bytes.
    final List<Long> regionSizes = Arrays.asList(1024L);
    final Configuration conf = getDefaultHBaseConfiguration();
    final HRegionServer rs = mockRegionServer(conf);
    final FileSystemUtilizationChore chore = new FileSystemUtilizationChore(rs);
    doAnswer(new ExpectedRegionSizeSummationAnswer(sum(regionSizes)))
        .when(rs)
        .reportRegionSizesForQuotas((Map<HRegionInfo,Long>) any(Map.class));

    final Region region = mockRegionWithSize(regionSizes);
    when(rs.getOnlineRegions()).thenReturn(Arrays.asList(region));
    chore.chore();
  }

  @SuppressWarnings("unchecked")
  @Test
  public void testMultipleRegionSizes() {
    final Configuration conf = getDefaultHBaseConfiguration();
    final HRegionServer rs = mockRegionServer(conf);

    // Three regions with multiple store sizes
    final List<Long> r1Sizes = Arrays.asList(1024L, 2048L);
    final long r1Sum = sum(r1Sizes);
    final List<Long> r2Sizes = Arrays.asList(1024L * 1024L);
    final long r2Sum = sum(r2Sizes);
    final List<Long> r3Sizes = Arrays.asList(10L * 1024L * 1024L);
    final long r3Sum = sum(r3Sizes);

    final FileSystemUtilizationChore chore = new FileSystemUtilizationChore(rs);
    doAnswer(new ExpectedRegionSizeSummationAnswer(sum(Arrays.asList(r1Sum, r2Sum, r3Sum))))
        .when(rs)
        .reportRegionSizesForQuotas((Map<HRegionInfo,Long>) any(Map.class));

    final Region r1 = mockRegionWithSize(r1Sizes);
    final Region r2 = mockRegionWithSize(r2Sizes);
    final Region r3 = mockRegionWithSize(r3Sizes);
    when(rs.getOnlineRegions()).thenReturn(Arrays.asList(r1, r2, r3));
    chore.chore();
  }

  @Test
  public void testDefaultConfigurationProperties() {
    final Configuration conf = getDefaultHBaseConfiguration();
    final HRegionServer rs = mockRegionServer(conf);
    final FileSystemUtilizationChore chore = new FileSystemUtilizationChore(rs);
    // Verify that the expected default values are actually represented.
    assertEquals(
        FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_DEFAULT, chore.getPeriod());
    assertEquals(
        FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_DEFAULT, chore.getInitialDelay());
    assertEquals(
        TimeUnit.valueOf(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_TIMEUNIT_DEFAULT),
        chore.getTimeUnit());
  }

  @Test
  public void testNonDefaultConfigurationProperties() {
    final Configuration conf = getDefaultHBaseConfiguration();
    // Override the default values
    final int period = 60 * 10;
    final long delay = 30L;
    final TimeUnit timeUnit = TimeUnit.SECONDS;
    conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, period);
    conf.setLong(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, delay);
    conf.set(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_TIMEUNIT_KEY, timeUnit.name());

    // Verify that the chore reports these non-default values
    final HRegionServer rs = mockRegionServer(conf);
    final FileSystemUtilizationChore chore = new FileSystemUtilizationChore(rs);
    assertEquals(period, chore.getPeriod());
    assertEquals(delay, chore.getInitialDelay());
    assertEquals(timeUnit, chore.getTimeUnit());
  }

  @Test
  public void testProcessingLeftoverRegions() {
    final Configuration conf = getDefaultHBaseConfiguration();
    final HRegionServer rs = mockRegionServer(conf);

    // Some leftover regions from a previous chore()
    final List<Long> leftover1Sizes = Arrays.asList(1024L, 4096L);
    final long leftover1Sum = sum(leftover1Sizes);
    final List<Long> leftover2Sizes = Arrays.asList(2048L);
    final long leftover2Sum = sum(leftover2Sizes);

    final Region lr1 = mockRegionWithSize(leftover1Sizes);
    final Region lr2 = mockRegionWithSize(leftover2Sizes);
    final FileSystemUtilizationChore chore = new FileSystemUtilizationChore(rs) {
      @Override
      Iterator<Region> getLeftoverRegions() {
        return Arrays.asList(lr1, lr2).iterator();
      }
    };
    doAnswer(new ExpectedRegionSizeSummationAnswer(sum(Arrays.asList(leftover1Sum, leftover2Sum))))
        .when(rs)
        .reportRegionSizesForQuotas((Map<HRegionInfo,Long>) any(Map.class));

    // We shouldn't compute all of these region sizes, just the leftovers
    final Region r1 = mockRegionWithSize(Arrays.asList(1024L, 2048L));
    final Region r2 = mockRegionWithSize(Arrays.asList(1024L * 1024L));
    final Region r3 = mockRegionWithSize(Arrays.asList(10L * 1024L * 1024L));
    when(rs.getOnlineRegions()).thenReturn(Arrays.asList(r1, r2, r3, lr1, lr2));

    chore.chore();
  }

  @Test
  public void testProcessingNowOfflineLeftoversAreIgnored() {
    final Configuration conf = getDefaultHBaseConfiguration();
    final HRegionServer rs = mockRegionServer(conf);

    // Some leftover regions from a previous chore()
    final List<Long> leftover1Sizes = Arrays.asList(1024L, 4096L);
    final long leftover1Sum = sum(leftover1Sizes);
    final List<Long> leftover2Sizes = Arrays.asList(2048L);
    final long leftover2Sum = sum(leftover2Sizes);

    final Region lr1 = mockRegionWithSize(leftover1Sizes);
    final Region lr2 = mockRegionWithSize(leftover2Sizes);
    final FileSystemUtilizationChore chore = new FileSystemUtilizationChore(rs) {
      @Override
      Iterator<Region> getLeftoverRegions() {
        return Arrays.asList(lr1, lr2).iterator();
      }
    };
    doAnswer(new ExpectedRegionSizeSummationAnswer(sum(Arrays.asList(leftover1Sum))))
        .when(rs)
        .reportRegionSizesForQuotas((Map<HRegionInfo,Long>) any(Map.class));

    // We shouldn't compute all of these region sizes, just the leftovers
    final Region r1 = mockRegionWithSize(Arrays.asList(1024L, 2048L));
    final Region r2 = mockRegionWithSize(Arrays.asList(1024L * 1024L));
    final Region r3 = mockRegionWithSize(Arrays.asList(10L * 1024L * 1024L));
    // lr2 is no longer online, so it should be ignored
    when(rs.getOnlineRegions()).thenReturn(Arrays.asList(r1, r2, r3, lr1));

    chore.chore();
  }

  @SuppressWarnings("unchecked")
  @Test
  public void testIgnoreSplitParents() {
    final Configuration conf = getDefaultHBaseConfiguration();
    final HRegionServer rs = mockRegionServer(conf);

    // Two regions: a normal region and a split parent which should be ignored
    final List<Long> r1Sizes = Arrays.asList(1024L, 2048L);
    final long r1Sum = sum(r1Sizes);
    final List<Long> r2Sizes = Arrays.asList(1024L * 1024L);

    final FileSystemUtilizationChore chore = new FileSystemUtilizationChore(rs);
    doAnswer(new ExpectedRegionSizeSummationAnswer(sum(Arrays.asList(r1Sum))))
        .when(rs)
        .reportRegionSizesForQuotas((Map<HRegionInfo,Long>) any(Map.class));

    final Region r1 = mockRegionWithSize(r1Sizes);
    final Region r2 = mockSplitParentRegionWithSize(r2Sizes);
    when(rs.getOnlineRegions()).thenReturn(Arrays.asList(r1, r2));
    chore.chore();
  }

  @SuppressWarnings("unchecked")
  @Test
  public void testIgnoreRegionReplicas() {
    final Configuration conf = getDefaultHBaseConfiguration();
    final HRegionServer rs = mockRegionServer(conf);

    // Two regions: a normal region and a region replica which should be ignored
    final List<Long> r1Sizes = Arrays.asList(1024L, 2048L);
    final long r1Sum = sum(r1Sizes);
    final List<Long> r2Sizes = Arrays.asList(1024L * 1024L);

    final FileSystemUtilizationChore chore = new FileSystemUtilizationChore(rs);
    doAnswer(new ExpectedRegionSizeSummationAnswer(sum(Arrays.asList(r1Sum))))
        .when(rs)
        .reportRegionSizesForQuotas((Map<HRegionInfo,Long>) any(Map.class));

    final Region r1 = mockRegionWithSize(r1Sizes);
    final Region r2 = mockRegionReplicaWithSize(r2Sizes);
    when(rs.getOnlineRegions()).thenReturn(Arrays.asList(r1, r2));
    chore.chore();
  }

  /**
   * Creates an HBase Configuration object for the default values.
   */
  private Configuration getDefaultHBaseConfiguration() {
    final Configuration conf = HBaseConfiguration.create();
    conf.addResource("hbase-default.xml");
    return conf;
  }

  /**
   * Creates a mocked HRegionServer using the given Configuration.
   */
  private HRegionServer mockRegionServer(Configuration conf) {
    final HRegionServer rs = mock(HRegionServer.class);
    when(rs.getConfiguration()).thenReturn(conf);
    return rs;
  }

  /**
   * Sums the collection of non-null numbers.
   */
  private long sum(Collection<Long> values) {
    long sum = 0L;
    for (Long value : values) {
      assertNotNull(value);
      sum += value;
    }
    return sum;
  }

  /**
   * Creates a region with a number of Stores equal to the length of {@code storeSizes}. Each
   * {@link Store} will have a reported size corresponding to the element in {@code storeSizes}.
   *
   * @param storeSizes A list of sizes for each Store.
   * @return A mocked Region.
   */
  private Region mockRegionWithSize(Collection<Long> storeSizes) {
    final Region r = mock(Region.class);
    final HRegionInfo info = mock(HRegionInfo.class);
    when(r.getRegionInfo()).thenReturn(info);
    List<Store> stores = new ArrayList<>();
    when(r.getStores()).thenReturn(stores);
    for (Long storeSize : storeSizes) {
      final Store s = mock(Store.class);
      stores.add(s);
      when(s.getStorefilesSize()).thenReturn(storeSize);
    }
    return r;
  }

  /**
   * Creates a region which is the parent of a split.
   *
   * @param storeSizes A list of sizes for each Store.
   * @return A mocked Region.
   */
  private Region mockSplitParentRegionWithSize(Collection<Long> storeSizes) {
    final Region r = mockRegionWithSize(storeSizes);
    final HRegionInfo info = r.getRegionInfo();
    when(info.isSplitParent()).thenReturn(true);
    return r;
  }

  /**
   * Creates a region which has a replicaId of <code>1</code>.
   *
   * @param storeSizes A list of sizes for each Store.
   * @return A mocked Region.
   */
  private Region mockRegionReplicaWithSize(Collection<Long> storeSizes) {
    final Region r = mockRegionWithSize(storeSizes);
    final HRegionInfo info = r.getRegionInfo();
    when(info.getReplicaId()).thenReturn(1);
    return r;
  }

  /**
   * An Answer implementation which verifies the sum of the Region sizes to report is as expected.
   */
  private static class ExpectedRegionSizeSummationAnswer implements Answer<Void> {
    private final long expectedSize;

    public ExpectedRegionSizeSummationAnswer(long expectedSize) {
      this.expectedSize = expectedSize;
    }

    @Override
    public Void answer(InvocationOnMock invocation) throws Throwable {
      Object[] args = invocation.getArguments();
      assertEquals(1, args.length);
      @SuppressWarnings("unchecked")
      Map<HRegionInfo,Long> regionSizes = (Map<HRegionInfo,Long>) args[0];
      long sum = 0L;
      for (Long regionSize : regionSizes.values()) {
        sum += regionSize;
      }
      assertEquals(expectedSize, sum);
      return null;
    }
  }
}
TestRegionSizeUse.java (new file):

@@ -0,0 +1,194 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to you under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

/**
 * Test class which verifies that region sizes are reported to the master.
 */
@Category(MediumTests.class)
public class TestRegionSizeUse {
  private static final Log LOG = LogFactory.getLog(TestRegionSizeUse.class);
  private static final int SIZE_PER_VALUE = 256;
  private static final int NUM_SPLITS = 10;
  private static final String F1 = "f1";
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  private MiniHBaseCluster cluster;

  @Rule
  public TestName testName = new TestName();

  @Before
  public void setUp() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, 1000);
    conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 1000);
    conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
    cluster = TEST_UTIL.startMiniCluster(2);
  }

  @After
  public void tearDown() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @Test
  public void testBasicRegionSizeReports() throws Exception {
    final long bytesWritten = 5L * 1024L * 1024L; // 5MB
    final TableName tn = writeData(bytesWritten);
    LOG.debug("Data was written to HBase");
    final Admin admin = TEST_UTIL.getAdmin();
    // Push the data to disk.
    admin.flush(tn);
    LOG.debug("Data flushed to disk");
    // Get the final region distribution
    final List<HRegionInfo> regions = TEST_UTIL.getAdmin().getTableRegions(tn);

    HMaster master = cluster.getMaster();
    MasterQuotaManager quotaManager = master.getMasterQuotaManager();
    Map<HRegionInfo,Long> regionSizes = quotaManager.snapshotRegionSizes();
    // Wait until we get all of the region reports for our table
    // The table may split, so make sure we have at least as many as expected right after we
    // finished writing the data.
    int observedRegions = numRegionsForTable(tn, regionSizes);
    while (observedRegions < regions.size()) {
      LOG.debug("Expecting more regions. Saw " + observedRegions
          + " region sizes reported, expected at least " + regions.size());
      Thread.sleep(1000);
      regionSizes = quotaManager.snapshotRegionSizes();
      observedRegions = numRegionsForTable(tn, regionSizes);
    }

    LOG.debug("Observed region sizes by the HMaster: " + regionSizes);
    long totalRegionSize = 0L;
    for (Long regionSize : regionSizes.values()) {
      totalRegionSize += regionSize;
    }
    assertTrue("Expected region size report to exceed " + bytesWritten + ", but was "
        + totalRegionSize + ". RegionSizes=" + regionSizes, bytesWritten < totalRegionSize);
  }

  /**
   * Writes at least {@code sizeInBytes} bytes of data to HBase and returns the TableName used.
   *
   * @param sizeInBytes The amount of data to write in bytes.
   * @return The table the data was written to
   */
  private TableName writeData(long sizeInBytes) throws IOException {
    final Connection conn = TEST_UTIL.getConnection();
    final Admin admin = TEST_UTIL.getAdmin();
    final TableName tn = TableName.valueOf(testName.getMethodName());

    // Delete the old table
    if (admin.tableExists(tn)) {
      admin.disableTable(tn);
      admin.deleteTable(tn);
    }

    // Create the table
    HTableDescriptor tableDesc = new HTableDescriptor(tn);
    tableDesc.addFamily(new HColumnDescriptor(F1));
    admin.createTable(tableDesc, Bytes.toBytes("1"), Bytes.toBytes("9"), NUM_SPLITS);

    final Table table = conn.getTable(tn);
    try {
      List<Put> updates = new ArrayList<>();
      long bytesToWrite = sizeInBytes;
      long rowKeyId = 0L;
      final StringBuilder sb = new StringBuilder();
      final Random r = new Random();
      while (bytesToWrite > 0L) {
        sb.setLength(0);
        sb.append(Long.toString(rowKeyId));
        // Use the reverse counter as the rowKey to get even spread across all regions
        Put p = new Put(Bytes.toBytes(sb.reverse().toString()));
        byte[] value = new byte[SIZE_PER_VALUE];
        r.nextBytes(value);
        p.addColumn(Bytes.toBytes(F1), Bytes.toBytes("q1"), value);
        updates.add(p);

        // Write batches of 50 updates at a time
        if (updates.size() > 50) {
          table.put(updates);
          updates.clear();
        }

        // Just count the value size, ignore the size of rowkey + column
        bytesToWrite -= SIZE_PER_VALUE;
        rowKeyId++;
      }

      // Write the final batch
      if (!updates.isEmpty()) {
        table.put(updates);
      }

      return tn;
    } finally {
      table.close();
    }
  }

  /**
   * Computes the number of regions for the given table that have a positive size.
   *
   * @param tn The TableName in question
   * @param regions A collection of region sizes
   * @return The number of regions for the given table.
   */
  private int numRegionsForTable(TableName tn, Map<HRegionInfo,Long> regions) {
    int sum = 0;
    for (Entry<HRegionInfo,Long> entry : regions.entrySet()) {
      if (tn.equals(entry.getKey().getTable()) && 0 < entry.getValue()) {
        sum++;
      }
    }
    return sum;
  }
}
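The reversed-counter row key in writeData deserves a quick illustration. Because the table is pre-split on the single-character boundaries "1" through "9", reversing the counter puts its fastest-changing digit first, so consecutive ids land in different regions. A tiny sketch of the transformation (the helper name and values are illustrative):

// 1203 -> "3021", 1204 -> "4021", 1205 -> "5021": the leading digit cycles
// 0-9, spreading consecutive writes across the "1".."9" split points.
static String rowKeyFor(long rowKeyId) {
  return new StringBuilder(Long.toString(rowKeyId)).reverse().toString();
}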
TestRegionServerRegionSpaceUseReport.java (new file):

@@ -0,0 +1,99 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to you under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.mock;

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Test class for isolated (non-cluster) tests surrounding the report
 * of Region space use to the Master by RegionServers.
 */
@Category(SmallTests.class)
public class TestRegionServerRegionSpaceUseReport {

  @Test
  public void testConversion() {
    TableName tn = TableName.valueOf("table1");
    HRegionInfo hri1 = new HRegionInfo(tn, Bytes.toBytes("a"), Bytes.toBytes("b"));
    HRegionInfo hri2 = new HRegionInfo(tn, Bytes.toBytes("b"), Bytes.toBytes("c"));
    HRegionInfo hri3 = new HRegionInfo(tn, Bytes.toBytes("c"), Bytes.toBytes("d"));
    Map<HRegionInfo,Long> sizes = new HashMap<>();
    sizes.put(hri1, 1024L * 1024L);
    sizes.put(hri2, 1024L * 1024L * 8L);
    sizes.put(hri3, 1024L * 1024L * 32L);

    // Call the real method to convert the map into a protobuf
    HRegionServer rs = mock(HRegionServer.class);
    doCallRealMethod().when(rs).buildRegionSpaceUseReportRequest(any(Map.class));
    doCallRealMethod().when(rs).convertRegionSize(any(HRegionInfo.class), anyLong());

    RegionSpaceUseReportRequest requests = rs.buildRegionSpaceUseReportRequest(sizes);
    assertEquals(sizes.size(), requests.getSpaceUseCount());
    for (RegionSpaceUse spaceUse : requests.getSpaceUseList()) {
      RegionInfo ri = spaceUse.getRegion();
      HRegionInfo hri = HRegionInfo.convert(ri);
      Long expectedSize = sizes.remove(hri);
      assertNotNull("Could not find size for HRI: " + hri, expectedSize);
      assertEquals(expectedSize.longValue(), spaceUse.getSize());
    }
    assertTrue("Should not have any space use entries left: " + sizes, sizes.isEmpty());
  }

  @Test(expected = NullPointerException.class)
  public void testNullMap() {
    // Call the real method to convert the map into a protobuf
    HRegionServer rs = mock(HRegionServer.class);
    doCallRealMethod().when(rs).buildRegionSpaceUseReportRequest(any(Map.class));
    doCallRealMethod().when(rs).convertRegionSize(any(HRegionInfo.class), anyLong());

    rs.buildRegionSpaceUseReportRequest(null);
  }

  @Test(expected = NullPointerException.class)
  public void testMalformedMap() {
    TableName tn = TableName.valueOf("table1");
    HRegionInfo hri1 = new HRegionInfo(tn, Bytes.toBytes("a"), Bytes.toBytes("b"));
    Map<HRegionInfo,Long> sizes = new HashMap<>();
    sizes.put(hri1, null);

    // Call the real method to convert the map into a protobuf
    HRegionServer rs = mock(HRegionServer.class);
    doCallRealMethod().when(rs).buildRegionSpaceUseReportRequest(any(Map.class));
    doCallRealMethod().when(rs).convertRegionSize(any(HRegionInfo.class), anyLong());

    rs.buildRegionSpaceUseReportRequest(sizes);
  }
}