HBASE-18133 Decrease quota reaction latency by HBase
Certain operations in HBase are known to directly affect the utilization of tables on HDFS. When these actions occur, we can circumvent the normal path and notify the Master directly. This results in a much faster response to changes in HDFS usage. This requires FS scanning by the RS to be decoupled from the reporting of sizes to the Master. An API inside each RS is made so that any operation can hook into this call in the face of other operations (e.g. compaction, flush, bulk load). Signed-off-by: Ted Yu <yuzhihong@gmail.com>
This commit is contained in:
parent
393ab302ab
commit
bdedcc5631
|
@ -28,6 +28,14 @@ public interface MetricsRegionServerQuotaSource extends BaseSource {
|
|||
String METRICS_DESCRIPTION = "Metrics about HBase RegionServer Quotas";
|
||||
String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME;
|
||||
|
||||
String NUM_TABLES_IN_VIOLATION_NAME = "numTablesInViolation";
|
||||
String NUM_SPACE_SNAPSHOTS_RECEIVED_NAME = "numSpaceSnapshotsReceived";
|
||||
String FILE_SYSTEM_UTILIZATION_CHORE_TIME = "fileSystemUtilizationChoreTime";
|
||||
String SPACE_QUOTA_REFRESHER_CHORE_TIME = "spaceQuotaRefresherChoreTime";
|
||||
|
||||
String NUM_REGION_SIZE_REPORT_NAME = "numRegionSizeReports";
|
||||
String REGION_SIZE_REPORTING_CHORE_TIME_NAME = "regionSizeReportingChoreTime";
|
||||
|
||||
/**
|
||||
* Updates the metric tracking how many tables this RegionServer has marked as in violation
|
||||
* of their space quota.
|
||||
|
@ -57,4 +65,20 @@ public interface MetricsRegionServerQuotaSource extends BaseSource {
|
|||
* @param time The execution time of the chore in milliseconds.
|
||||
*/
|
||||
void incrementSpaceQuotaRefresherChoreTime(long time);
|
||||
|
||||
/**
|
||||
* Updates the metric tracking how many region size reports were sent from this RegionServer to
|
||||
* the Master. These reports contain information on the size of each Region hosted locally.
|
||||
*
|
||||
* @param numReportsSent The number of region size reports sent
|
||||
*/
|
||||
void incrementNumRegionSizeReportsSent(long numReportsSent);
|
||||
|
||||
/**
|
||||
* Updates the metric tracking how much time was spent sending region size reports to the Master
|
||||
* by the RegionSizeReportingChore.
|
||||
*
|
||||
* @param time The execution time in milliseconds.
|
||||
*/
|
||||
void incrementRegionSizeReportingChoreTime(long time);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,86 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to you under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
|
||||
import org.apache.hadoop.hbase.metrics.Counter;
|
||||
import org.apache.hadoop.hbase.metrics.Meter;
|
||||
import org.apache.hadoop.hbase.metrics.Timer;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Implementation of {@link MetricsRegionServerQuotaSource}.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class MetricsRegionServerQuotaSourceImpl extends BaseSourceImpl implements
|
||||
MetricsRegionServerQuotaSource {
|
||||
|
||||
private final Meter tablesInViolationCounter;
|
||||
private final Meter spaceQuotaSnapshotsReceived;
|
||||
private final Timer fileSystemUtilizationChoreTimer;
|
||||
private final Timer spaceQuotaRefresherChoreTimer;
|
||||
private final Counter regionSizeReportCounter;
|
||||
private final Timer regionSizeReportingChoreTimer;
|
||||
|
||||
public MetricsRegionServerQuotaSourceImpl() {
|
||||
this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT);
|
||||
}
|
||||
|
||||
public MetricsRegionServerQuotaSourceImpl(String metricsName, String metricsDescription,
|
||||
String metricsContext, String metricsJmxContext) {
|
||||
super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
|
||||
|
||||
tablesInViolationCounter = this.registry.meter(NUM_TABLES_IN_VIOLATION_NAME);
|
||||
spaceQuotaSnapshotsReceived = this.registry.meter(NUM_SPACE_SNAPSHOTS_RECEIVED_NAME);
|
||||
fileSystemUtilizationChoreTimer = this.registry.timer(FILE_SYSTEM_UTILIZATION_CHORE_TIME);
|
||||
spaceQuotaRefresherChoreTimer = this.registry.timer(SPACE_QUOTA_REFRESHER_CHORE_TIME);
|
||||
regionSizeReportCounter = this.registry.counter(NUM_REGION_SIZE_REPORT_NAME);
|
||||
regionSizeReportingChoreTimer = registry.timer(REGION_SIZE_REPORTING_CHORE_TIME_NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateNumTablesInSpaceQuotaViolation(long tablesInViolation) {
|
||||
this.tablesInViolationCounter.mark(tablesInViolation);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateNumTableSpaceQuotaSnapshots(long numSnapshots) {
|
||||
this.spaceQuotaSnapshotsReceived.mark(numSnapshots);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incrementSpaceQuotaFileSystemScannerChoreTime(long time) {
|
||||
this.fileSystemUtilizationChoreTimer.updateMillis(time);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incrementSpaceQuotaRefresherChoreTime(long time) {
|
||||
this.spaceQuotaRefresherChoreTimer.updateMillis(time);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incrementNumRegionSizeReportsSent(long numReportsSent) {
|
||||
regionSizeReportCounter.increment(numReportsSent);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incrementRegionSizeReportingChoreTime(long time) {
|
||||
regionSizeReportingChoreTimer.update(time, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
org.apache.hadoop.hbase.regionserver.MetricsRegionServerQuotaSourceImpl
|
|
@ -16,10 +16,8 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.quotas;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -36,7 +34,9 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* A chore which computes the size of each {@link HRegion} on the FileSystem hosted by the given {@link HRegionServer}.
|
||||
* A chore which computes the size of each {@link HRegion} on the FileSystem hosted by the given
|
||||
* {@link HRegionServer}. The results of this computation are stored in the
|
||||
* {@link RegionServerSpaceQuotaManager}'s {@link RegionSizeStore} object.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class FileSystemUtilizationChore extends ScheduledChore {
|
||||
|
@ -53,9 +53,6 @@ public class FileSystemUtilizationChore extends ScheduledChore {
|
|||
static final String FS_UTILIZATION_MAX_ITERATION_DURATION_KEY = "hbase.regionserver.quotas.fs.utilization.chore.max.iteration.millis";
|
||||
static final long FS_UTILIZATION_MAX_ITERATION_DURATION_DEFAULT = 5000L;
|
||||
|
||||
private int numberOfCyclesToSkip = 0, prevNumberOfCyclesToSkip = 0;
|
||||
private static final int CYCLE_UPPER_BOUND = 32;
|
||||
|
||||
private final HRegionServer rs;
|
||||
private final long maxIterationMillis;
|
||||
private Iterator<Region> leftoverRegions;
|
||||
|
@ -70,11 +67,7 @@ public class FileSystemUtilizationChore extends ScheduledChore {
|
|||
|
||||
@Override
|
||||
protected void chore() {
|
||||
if (numberOfCyclesToSkip > 0) {
|
||||
numberOfCyclesToSkip--;
|
||||
return;
|
||||
}
|
||||
final Map<RegionInfo, Long> onlineRegionSizes = new HashMap<>();
|
||||
final RegionSizeStore regionSizeStore = getRegionSizeStore();
|
||||
final Set<Region> onlineRegions = new HashSet<>(rs.getRegions());
|
||||
// Process the regions from the last run if we have any. If we are somehow having difficulty
|
||||
// processing the Regions, we want to avoid creating a backlog in memory of Region objs.
|
||||
|
@ -100,7 +93,7 @@ public class FileSystemUtilizationChore extends ScheduledChore {
|
|||
long timeRunning = EnvironmentEdgeManager.currentTime() - start;
|
||||
if (timeRunning > maxIterationMillis) {
|
||||
LOG.debug("Preempting execution of FileSystemUtilizationChore because it exceeds the"
|
||||
+ " maximum iteration configuration value. Will process remaining iterators"
|
||||
+ " maximum iteration configuration value. Will process remaining Regions"
|
||||
+ " on a subsequent invocation.");
|
||||
setLeftoverRegions(iterator);
|
||||
break;
|
||||
|
@ -124,7 +117,7 @@ public class FileSystemUtilizationChore extends ScheduledChore {
|
|||
continue;
|
||||
}
|
||||
final long sizeInBytes = computeSize(region);
|
||||
onlineRegionSizes.put(region.getRegionInfo(), sizeInBytes);
|
||||
regionSizeStore.put(region.getRegionInfo(), sizeInBytes);
|
||||
regionSizesCalculated++;
|
||||
}
|
||||
if (LOG.isTraceEnabled()) {
|
||||
|
@ -133,14 +126,6 @@ public class FileSystemUtilizationChore extends ScheduledChore {
|
|||
+ skippedSplitParents + " regions due to being the parent of a split, and"
|
||||
+ skippedRegionReplicas + " regions due to being region replicas.");
|
||||
}
|
||||
if (!reportRegionSizesToMaster(onlineRegionSizes)) {
|
||||
// backoff reporting
|
||||
numberOfCyclesToSkip = prevNumberOfCyclesToSkip > 0 ? 2 * prevNumberOfCyclesToSkip : 1;
|
||||
if (numberOfCyclesToSkip > CYCLE_UPPER_BOUND) {
|
||||
numberOfCyclesToSkip = CYCLE_UPPER_BOUND;
|
||||
}
|
||||
prevNumberOfCyclesToSkip = numberOfCyclesToSkip;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -176,15 +161,9 @@ public class FileSystemUtilizationChore extends ScheduledChore {
|
|||
return regionSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reports the computed region sizes to the currently active Master.
|
||||
*
|
||||
* @param onlineRegionSizes The computed region sizes to report.
|
||||
* @return {@code false} if FileSystemUtilizationChore should pause reporting to master,
|
||||
* {@code true} otherwise.
|
||||
*/
|
||||
boolean reportRegionSizesToMaster(Map<RegionInfo,Long> onlineRegionSizes) {
|
||||
return this.rs.reportRegionSizesForQuotas(onlineRegionSizes);
|
||||
// VisibleForTesting
|
||||
RegionSizeStore getRegionSizeStore() {
|
||||
return rs.getRegionServerSpaceQuotaManager().getRegionSizeStore();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -0,0 +1,76 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to you under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.quotas;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* A {@link RegionSizeStore} implementation that stores nothing.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public final class NoOpRegionSizeStore implements RegionSizeStore {
|
||||
private static final NoOpRegionSizeStore INSTANCE = new NoOpRegionSizeStore();
|
||||
|
||||
private NoOpRegionSizeStore() {}
|
||||
|
||||
public static NoOpRegionSizeStore getInstance() {
|
||||
return INSTANCE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<Entry<RegionInfo,RegionSize>> iterator() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long heapSize() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RegionSize getRegionSize(RegionInfo regionInfo) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void put(RegionInfo regionInfo, long size) {}
|
||||
|
||||
@Override
|
||||
public void incrementRegionSize(RegionInfo regionInfo, long delta) {}
|
||||
|
||||
@Override
|
||||
public RegionSize remove(RegionInfo regionInfo) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEmpty() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear() {}
|
||||
}
|
|
@ -55,6 +55,8 @@ public class RegionServerSpaceQuotaManager {
|
|||
private boolean started = false;
|
||||
private final ConcurrentHashMap<TableName,SpaceViolationPolicyEnforcement> enforcedPolicies;
|
||||
private SpaceViolationPolicyEnforcementFactory factory;
|
||||
private RegionSizeStore regionSizeStore;
|
||||
private RegionSizeReportingChore regionSizeReporter;
|
||||
|
||||
public RegionServerSpaceQuotaManager(RegionServerServices rsServices) {
|
||||
this(rsServices, SpaceViolationPolicyEnforcementFactory.getInstance());
|
||||
|
@ -67,6 +69,8 @@ public class RegionServerSpaceQuotaManager {
|
|||
this.factory = factory;
|
||||
this.enforcedPolicies = new ConcurrentHashMap<>();
|
||||
this.currentQuotaSnapshots = new AtomicReference<>(new HashMap<>());
|
||||
// Initialize the size store to not track anything -- create the real one if we're start()'ed
|
||||
this.regionSizeStore = NoOpRegionSizeStore.getInstance();
|
||||
}
|
||||
|
||||
public synchronized void start() throws IOException {
|
||||
|
@ -79,8 +83,13 @@ public class RegionServerSpaceQuotaManager {
|
|||
LOG.warn("RegionServerSpaceQuotaManager has already been started!");
|
||||
return;
|
||||
}
|
||||
// Start the chores
|
||||
this.spaceQuotaRefresher = new SpaceQuotaRefresherChore(this, rsServices.getClusterConnection());
|
||||
rsServices.getChoreService().scheduleChore(spaceQuotaRefresher);
|
||||
this.regionSizeReporter = new RegionSizeReportingChore(rsServices);
|
||||
rsServices.getChoreService().scheduleChore(regionSizeReporter);
|
||||
// Instantiate the real RegionSizeStore
|
||||
this.regionSizeStore = RegionSizeStoreFactory.getInstance().createStore();
|
||||
started = true;
|
||||
}
|
||||
|
||||
|
@ -89,6 +98,10 @@ public class RegionServerSpaceQuotaManager {
|
|||
spaceQuotaRefresher.cancel();
|
||||
spaceQuotaRefresher = null;
|
||||
}
|
||||
if (regionSizeReporter != null) {
|
||||
regionSizeReporter.cancel();
|
||||
regionSizeReporter = null;
|
||||
}
|
||||
started = false;
|
||||
}
|
||||
|
||||
|
@ -211,6 +224,15 @@ public class RegionServerSpaceQuotaManager {
|
|||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the {@link RegionSizeStore} tracking filesystem utilization by each region.
|
||||
*
|
||||
* @return A {@link RegionSizeStore} implementation.
|
||||
*/
|
||||
public RegionSizeStore getRegionSizeStore() {
|
||||
return regionSizeStore;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the collection of tables which have quota violation policies enforced on
|
||||
* this RegionServer.
|
||||
|
|
|
@ -0,0 +1,50 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to you under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.quotas;
|
||||
|
||||
import org.apache.hadoop.hbase.io.HeapSize;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Interface that encapsulates optionally sending a Region's size to the master.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface RegionSize extends HeapSize {
|
||||
|
||||
/**
|
||||
* Updates the size of the Region.
|
||||
*
|
||||
* @param newSize the new size of the Region
|
||||
* @return {@code this}
|
||||
*/
|
||||
RegionSize setSize(long newSize);
|
||||
|
||||
/**
|
||||
* Atomically adds the provided {@code delta} to the region size.
|
||||
*
|
||||
* @param delta The change in size in bytes of the region.
|
||||
* @return {@code this}
|
||||
*/
|
||||
RegionSize incrementSize(long delta);
|
||||
|
||||
/**
|
||||
* Returns the size of the region.
|
||||
*
|
||||
* @return The size in bytes.
|
||||
*/
|
||||
long getSize();
|
||||
}
|
|
@ -0,0 +1,70 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to you under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.quotas;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.hadoop.hbase.util.ClassSize;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* An object encapsulating a Region's size and whether it's been reported to the master since
|
||||
* the value last changed.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class RegionSizeImpl implements RegionSize {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(RegionSizeImpl.class);
|
||||
private static final long HEAP_SIZE = ClassSize.OBJECT + ClassSize.ATOMIC_LONG +
|
||||
ClassSize.REFERENCE;
|
||||
private final AtomicLong size;
|
||||
|
||||
public RegionSizeImpl(long initialSize) {
|
||||
// A region can never be negative in size. We can prevent this from being a larger problem, but
|
||||
// we will need to leave ourselves a note to figure out how we got here.
|
||||
if (initialSize < 0L && LOG.isTraceEnabled()) {
|
||||
LOG.trace("Nonsensical negative Region size being constructed, this is likely an error",
|
||||
new Exception());
|
||||
}
|
||||
this.size = new AtomicLong(initialSize < 0L ? 0L : initialSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long heapSize() {
|
||||
return HEAP_SIZE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RegionSizeImpl setSize(long newSize) {
|
||||
// Set the new size before advertising that we need to tell the master about it. Worst case
|
||||
// we have to wait for the next period to report it.
|
||||
size.set(newSize);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RegionSizeImpl incrementSize(long delta) {
|
||||
size.addAndGet(delta);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSize() {
|
||||
return size.get();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,156 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to you under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.quotas;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.ScheduledChore;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.Region;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* A Chore which sends the region size reports on this RegionServer to the Master.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class RegionSizeReportingChore extends ScheduledChore {
|
||||
private static final Log LOG = LogFactory.getLog(RegionSizeReportingChore.class);
|
||||
|
||||
static final String REGION_SIZE_REPORTING_CHORE_PERIOD_KEY =
|
||||
"hbase.regionserver.quotas.region.size.reporting.chore.period";
|
||||
static final int REGION_SIZE_REPORTING_CHORE_PERIOD_DEFAULT = 1000 * 60;
|
||||
|
||||
static final String REGION_SIZE_REPORTING_CHORE_DELAY_KEY =
|
||||
"hbase.regionserver.quotas.region.size.reporting.chore.delay";
|
||||
static final long REGION_SIZE_REPORTING_CHORE_DELAY_DEFAULT = 1000 * 30;
|
||||
|
||||
static final String REGION_SIZE_REPORTING_CHORE_TIMEUNIT_KEY =
|
||||
"hbase.regionserver.quotas.region.size.reporting.chore.timeunit";
|
||||
static final String REGION_SIZE_REPORTING_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();
|
||||
|
||||
private final RegionServerServices rsServices;
|
||||
private final MetricsRegionServer metrics;
|
||||
|
||||
public RegionSizeReportingChore(RegionServerServices rsServices) {
|
||||
super(
|
||||
RegionSizeReportingChore.class.getSimpleName(), rsServices,
|
||||
getPeriod(rsServices.getConfiguration()), getInitialDelay(rsServices.getConfiguration()),
|
||||
getTimeUnit(rsServices.getConfiguration()));
|
||||
this.rsServices = rsServices;
|
||||
this.metrics = rsServices.getMetrics();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void chore() {
|
||||
final long start = System.nanoTime();
|
||||
try {
|
||||
_chore();
|
||||
} finally {
|
||||
if (metrics != null) {
|
||||
metrics.incrementRegionSizeReportingChoreTime(
|
||||
TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void _chore() {
|
||||
final RegionServerSpaceQuotaManager quotaManager =
|
||||
rsServices.getRegionServerSpaceQuotaManager();
|
||||
// Get the HRegionInfo for each online region
|
||||
HashSet<RegionInfo> onlineRegionInfos = getOnlineRegionInfos(rsServices.getRegions());
|
||||
RegionSizeStore store = quotaManager.getRegionSizeStore();
|
||||
// Remove all sizes for non-online regions
|
||||
removeNonOnlineRegions(store, onlineRegionInfos);
|
||||
rsServices.reportRegionSizesForQuotas(store);
|
||||
}
|
||||
|
||||
HashSet<RegionInfo> getOnlineRegionInfos(List<? extends Region> onlineRegions) {
|
||||
HashSet<RegionInfo> regionInfos = new HashSet<>();
|
||||
onlineRegions.forEach((region) -> regionInfos.add(region.getRegionInfo()));
|
||||
return regionInfos;
|
||||
}
|
||||
|
||||
void removeNonOnlineRegions(RegionSizeStore store, Set<RegionInfo> onlineRegions) {
|
||||
// We have to remove regions which are no longer online from the store, otherwise they will
|
||||
// continue to be sent to the Master which will prevent size report expiration.
|
||||
if (onlineRegions.isEmpty()) {
|
||||
// Easy-case, no online regions means no size reports
|
||||
store.clear();
|
||||
return;
|
||||
}
|
||||
|
||||
Iterator<Entry<RegionInfo,RegionSize>> iter = store.iterator();
|
||||
int numEntriesRemoved = 0;
|
||||
while (iter.hasNext()) {
|
||||
Entry<RegionInfo,RegionSize> entry = iter.next();
|
||||
RegionInfo regionInfo = entry.getKey();
|
||||
if (!onlineRegions.contains(regionInfo)) {
|
||||
numEntriesRemoved++;
|
||||
iter.remove();
|
||||
}
|
||||
}
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Removed " + numEntriesRemoved + " region sizes before reporting to Master "
|
||||
+ "because they are for non-online regions.");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts the period for the chore from the configuration.
|
||||
*
|
||||
* @param conf The configuration object.
|
||||
* @return The configured chore period or the default value.
|
||||
*/
|
||||
static int getPeriod(Configuration conf) {
|
||||
return conf.getInt(
|
||||
REGION_SIZE_REPORTING_CHORE_PERIOD_KEY, REGION_SIZE_REPORTING_CHORE_PERIOD_DEFAULT);
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts the initial delay for the chore from the configuration.
|
||||
*
|
||||
* @param conf The configuration object.
|
||||
* @return The configured chore initial delay or the default value.
|
||||
*/
|
||||
static long getInitialDelay(Configuration conf) {
|
||||
return conf.getLong(
|
||||
REGION_SIZE_REPORTING_CHORE_DELAY_KEY, REGION_SIZE_REPORTING_CHORE_DELAY_DEFAULT);
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts the time unit for the chore period and initial delay from the configuration. The
|
||||
* configuration value for {@link #REGION_SIZE_REPORTING_CHORE_TIMEUNIT_KEY} must correspond to a
|
||||
* {@link TimeUnit} value.
|
||||
*
|
||||
* @param conf The configuration object.
|
||||
* @return The configured time unit for the chore period and initial delay or the default value.
|
||||
*/
|
||||
static TimeUnit getTimeUnit(Configuration conf) {
|
||||
return TimeUnit.valueOf(conf.get(REGION_SIZE_REPORTING_CHORE_TIMEUNIT_KEY,
|
||||
REGION_SIZE_REPORTING_CHORE_TIMEUNIT_DEFAULT));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,82 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to you under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.quotas;
|
||||
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.io.HeapSize;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* An interface for concurrently storing and updating the size of a Region.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface RegionSizeStore extends Iterable<Entry<RegionInfo,RegionSize>>, HeapSize {
|
||||
|
||||
/**
|
||||
* Returns the size for the give region if one exists. If no size exists, {@code null} is
|
||||
* returned.
|
||||
*
|
||||
* @param regionInfo The region whose size is being fetched.
|
||||
* @return The size in bytes of the region or null if no size is stored.
|
||||
*/
|
||||
RegionSize getRegionSize(RegionInfo regionInfo);
|
||||
|
||||
/**
|
||||
* Atomically sets the given {@code size} for a region.
|
||||
*
|
||||
* @param regionInfo An identifier for a region.
|
||||
* @param size The size in bytes of the region.
|
||||
*/
|
||||
void put(RegionInfo regionInfo, long size);
|
||||
|
||||
/**
|
||||
* Atomically alter the size of a region.
|
||||
*
|
||||
* @param regionInfo The region to update.
|
||||
* @param delta The change in size for the region, positive or negative.
|
||||
*/
|
||||
void incrementRegionSize(RegionInfo regionInfo, long delta);
|
||||
|
||||
/**
|
||||
* Removes the mapping for the given key, returning the value if one exists in the store.
|
||||
*
|
||||
* @param regionInfo The key to remove from the store
|
||||
* @return The value removed from the store if one exists, otherwise null.
|
||||
*/
|
||||
RegionSize remove(RegionInfo regionInfo);
|
||||
|
||||
/**
|
||||
* Returns the number of entries in the store.
|
||||
*
|
||||
* @return The number of entries in the store.
|
||||
*/
|
||||
int size();
|
||||
|
||||
/**
|
||||
* Returns if the store is empty.
|
||||
*
|
||||
* @return true if there are no entries in the store, otherwise false.
|
||||
*/
|
||||
boolean isEmpty();
|
||||
|
||||
/**
|
||||
* Removes all entries from the store.
|
||||
*/
|
||||
void clear();
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to you under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.quotas;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* A factory class for creating implementations of {@link RegionSizeStore}.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public final class RegionSizeStoreFactory {
|
||||
private static final RegionSizeStoreFactory INSTANCE = new RegionSizeStoreFactory();
|
||||
|
||||
private RegionSizeStoreFactory() {}
|
||||
|
||||
public static RegionSizeStoreFactory getInstance() {
|
||||
return INSTANCE;
|
||||
}
|
||||
|
||||
public RegionSizeStore createStore() {
|
||||
// Presently, there is only one implementation.
|
||||
return new RegionSizeStoreImpl();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,105 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to you under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.quotas;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ClassSize;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* A {@link RegionSizeStore} implementation backed by a ConcurrentHashMap. We expected similar
|
||||
* amounts of reads and writes to the "store", so using a RWLock is not going to provide any
|
||||
* exceptional gains.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class RegionSizeStoreImpl implements RegionSizeStore {
|
||||
private static final Log LOG = LogFactory.getLog(RegionSizeStoreImpl.class);
|
||||
private static final long sizeOfEntry = ClassSize.align(
|
||||
ClassSize.CONCURRENT_HASHMAP_ENTRY
|
||||
+ ClassSize.OBJECT + Bytes.SIZEOF_LONG
|
||||
// TODO Have RegionInfo implement HeapSize. 100B is an approximation based on a heapdump.
|
||||
+ ClassSize.OBJECT + 100);
|
||||
private final ConcurrentHashMap<RegionInfo,RegionSize> store;
|
||||
|
||||
public RegionSizeStoreImpl() {
|
||||
store = new ConcurrentHashMap<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<Entry<RegionInfo,RegionSize>> iterator() {
|
||||
return store.entrySet().iterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public RegionSize getRegionSize(RegionInfo regionInfo) {
|
||||
return store.get(regionInfo);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void put(RegionInfo regionInfo, long size) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Setting space quota size for " + regionInfo + " to " + size);
|
||||
}
|
||||
// Atomic. Either sets the new size for the first time, or replaces the existing value.
|
||||
store.compute(regionInfo,
|
||||
(key,value) -> value == null ? new RegionSizeImpl(size) : value.setSize(size));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incrementRegionSize(RegionInfo regionInfo, long delta) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Updating space quota size for " + regionInfo + " with a delta of " + delta);
|
||||
}
|
||||
// Atomic. Recomputes the stored value with the delta if there is one, otherwise use the delta.
|
||||
store.compute(regionInfo,
|
||||
(key,value) -> value == null ? new RegionSizeImpl(delta) : value.incrementSize(delta));
|
||||
}
|
||||
|
||||
@Override
|
||||
public RegionSize remove(RegionInfo regionInfo) {
|
||||
return store.remove(regionInfo);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long heapSize() {
|
||||
// Will have to iterate over each element if RegionInfo implements HeapSize, for now it's just
|
||||
// a simple calculation.
|
||||
return sizeOfEntry * store.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size() {
|
||||
return store.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEmpty() {
|
||||
return store.isEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
store.clear();
|
||||
}
|
||||
}
|
|
@ -80,12 +80,13 @@ public interface SpaceViolationPolicyEnforcement {
|
|||
boolean shouldCheckBulkLoads();
|
||||
|
||||
/**
|
||||
* Checks the file at the given path against <code>this</code> policy and the current
|
||||
* {@link SpaceQuotaSnapshot}. If the file would violate the policy, a
|
||||
* Computes the size of the file(s) at the given path against <code>this</code> policy and the
|
||||
* current {@link SpaceQuotaSnapshot}. If the file would violate the policy, a
|
||||
* {@link SpaceLimitingException} will be thrown.
|
||||
*
|
||||
* @param paths The paths in HDFS to files to be bulk loaded.
|
||||
* @return The size, in bytes, of the files that would be loaded.
|
||||
*/
|
||||
void checkBulkLoad(FileSystem fs, List<String> paths) throws SpaceLimitingException;
|
||||
long computeBulkLoadSize(FileSystem fs, List<String> paths) throws SpaceLimitingException;
|
||||
|
||||
}
|
||||
|
|
|
@ -16,14 +16,19 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.quotas.policies;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.quotas.SpaceLimitingException;
|
||||
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
|
||||
import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
|
||||
/**
|
||||
* Abstract implementation for {@link SpaceViolationPolicyEnforcement}.
|
||||
|
@ -74,4 +79,27 @@ public abstract class AbstractViolationPolicyEnforcement
|
|||
public boolean areCompactionsDisabled() {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Computes the size of a single file on the filesystem. If the size cannot be computed for some
|
||||
* reason, a {@link SpaceLimitingException} is thrown, as the file may violate a quota. If the
|
||||
* provided path does not reference a file, an {@link IllegalArgumentException} is thrown.
|
||||
*
|
||||
* @param fs The FileSystem which the path refers to a file upon
|
||||
* @param path The path on the {@code fs} to a file whose size is being checked
|
||||
* @return The size in bytes of the file
|
||||
*/
|
||||
long getFileSize(FileSystem fs, String path) throws SpaceLimitingException {
|
||||
final FileStatus status;
|
||||
try {
|
||||
status = fs.getFileStatus(new Path(Objects.requireNonNull(path)));
|
||||
} catch (IOException e) {
|
||||
throw new SpaceLimitingException(
|
||||
getPolicyName(), "Could not verify length of file to bulk load: " + path, e);
|
||||
}
|
||||
if (!status.isFile()) {
|
||||
throw new IllegalArgumentException(path + " is not a file.");
|
||||
}
|
||||
return status.getLen();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,11 +18,8 @@ package org.apache.hadoop.hbase.quotas.policies;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.Mutation;
|
||||
import org.apache.hadoop.hbase.quotas.SpaceLimitingException;
|
||||
|
@ -58,33 +55,22 @@ public class DefaultViolationPolicyEnforcement extends AbstractViolationPolicyEn
|
|||
}
|
||||
|
||||
@Override
|
||||
public void checkBulkLoad(FileSystem fs, List<String> paths) throws SpaceLimitingException {
|
||||
public long computeBulkLoadSize(FileSystem fs, List<String> paths) throws SpaceLimitingException {
|
||||
// Compute the amount of space that could be used to save some arithmetic in the for-loop
|
||||
final long sizeAvailableForBulkLoads = quotaSnapshot.getLimit() - quotaSnapshot.getUsage();
|
||||
long size = 0L;
|
||||
for (String path : paths) {
|
||||
size += addSingleFile(fs, path);
|
||||
try {
|
||||
size += getFileSize(fs, path);
|
||||
} catch (IOException e) {
|
||||
throw new SpaceLimitingException(
|
||||
getPolicyName(), "Colud not verify length of file to bulk load: " + path, e);
|
||||
}
|
||||
if (size > sizeAvailableForBulkLoads) {
|
||||
break;
|
||||
throw new SpaceLimitingException(getPolicyName(), "Bulk load of " + paths
|
||||
+ " is disallowed because the file(s) exceed the limits of a space quota.");
|
||||
}
|
||||
}
|
||||
if (size > sizeAvailableForBulkLoads) {
|
||||
throw new SpaceLimitingException(getPolicyName(), "Bulk load of " + paths
|
||||
+ " is disallowed because the file(s) exceed the limits of a space quota.");
|
||||
}
|
||||
}
|
||||
|
||||
private long addSingleFile(FileSystem fs, String path) throws SpaceLimitingException {
|
||||
final FileStatus status;
|
||||
try {
|
||||
status = fs.getFileStatus(new Path(Objects.requireNonNull(path)));
|
||||
} catch (IOException e) {
|
||||
throw new SpaceLimitingException(
|
||||
getPolicyName(), "Could not verify length of file to bulk load", e);
|
||||
}
|
||||
if (!status.isFile()) {
|
||||
throw new IllegalArgumentException(path + " is not a file.");
|
||||
}
|
||||
return status.getLen();
|
||||
return size;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -45,7 +45,13 @@ public class MissingSnapshotViolationPolicyEnforcement extends AbstractViolation
|
|||
}
|
||||
|
||||
@Override
|
||||
public void checkBulkLoad(FileSystem fs, List<String> paths) {}
|
||||
public long computeBulkLoadSize(FileSystem fs, List<String> paths) throws SpaceLimitingException {
|
||||
long size = 0;
|
||||
for (String path : paths) {
|
||||
size += getFileSize(fs, path);
|
||||
}
|
||||
return size;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void enable() throws IOException {}
|
||||
|
|
|
@ -138,6 +138,7 @@ import org.apache.hadoop.hbase.ipc.RpcCall;
|
|||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
|
||||
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
|
||||
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
|
||||
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
|
||||
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
|
||||
|
@ -2645,6 +2646,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
// Set down the memstore size by amount of flush.
|
||||
this.decrMemStoreSize(prepareResult.totalFlushableSize);
|
||||
|
||||
// Increase the size of this Region for the purposes of quota. Noop if quotas are disabled.
|
||||
// During startup, quota manager may not be initialized yet.
|
||||
if (rsServices != null) {
|
||||
RegionServerSpaceQuotaManager quotaManager = rsServices.getRegionServerSpaceQuotaManager();
|
||||
if (quotaManager != null) {
|
||||
quotaManager.getRegionSizeStore().incrementRegionSize(
|
||||
this.getRegionInfo(), flushedOutputFileSize);
|
||||
}
|
||||
}
|
||||
|
||||
if (wal != null) {
|
||||
// write flush marker to WAL. If fail, we should throw DroppedSnapshotException
|
||||
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
|
||||
|
|
|
@ -120,6 +120,8 @@ import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
|
|||
import org.apache.hadoop.hbase.quotas.QuotaUtil;
|
||||
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
|
||||
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
|
||||
import org.apache.hadoop.hbase.quotas.RegionSize;
|
||||
import org.apache.hadoop.hbase.quotas.RegionSizeStore;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
|
||||
|
@ -1209,10 +1211,10 @@ public class HRegionServer extends HasThread implements
|
|||
/**
|
||||
* Reports the given map of Regions and their size on the filesystem to the active Master.
|
||||
*
|
||||
* @param onlineRegionSizes A map of region info to size in bytes
|
||||
* @param regionSizeStore The store containing region sizes
|
||||
* @return false if FileSystemUtilizationChore should pause reporting to master. true otherwise
|
||||
*/
|
||||
public boolean reportRegionSizesForQuotas(final Map<RegionInfo, Long> onlineRegionSizes) {
|
||||
public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) {
|
||||
RegionServerStatusService.BlockingInterface rss = rssStub;
|
||||
if (rss == null) {
|
||||
// the current server could be stopping.
|
||||
|
@ -1220,9 +1222,7 @@ public class HRegionServer extends HasThread implements
|
|||
return true;
|
||||
}
|
||||
try {
|
||||
RegionSpaceUseReportRequest request = buildRegionSpaceUseReportRequest(
|
||||
Objects.requireNonNull(onlineRegionSizes));
|
||||
rss.reportRegionSpaceUse(null, request);
|
||||
buildReportAndSend(rss, regionSizeStore);
|
||||
} catch (ServiceException se) {
|
||||
IOException ioe = ProtobufUtil.getRemoteException(se);
|
||||
if (ioe instanceof PleaseHoldException) {
|
||||
|
@ -1250,16 +1250,34 @@ public class HRegionServer extends HasThread implements
|
|||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds the region size report and sends it to the master. Upon successful sending of the
|
||||
* report, the region sizes that were sent are marked as sent.
|
||||
*
|
||||
* @param rss The stub to send to the Master
|
||||
* @param regionSizeStore The store containing region sizes
|
||||
*/
|
||||
void buildReportAndSend(RegionServerStatusService.BlockingInterface rss,
|
||||
RegionSizeStore regionSizeStore) throws ServiceException {
|
||||
RegionSpaceUseReportRequest request =
|
||||
buildRegionSpaceUseReportRequest(Objects.requireNonNull(regionSizeStore));
|
||||
rss.reportRegionSpaceUse(null, request);
|
||||
// Record the number of size reports sent
|
||||
if (metricsRegionServer != null) {
|
||||
metricsRegionServer.incrementNumRegionSizeReportsSent(regionSizeStore.size());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map.
|
||||
*
|
||||
* @param regionSizes Map of region info to size in bytes.
|
||||
* @param regionSizeStore The size in bytes of regions
|
||||
* @return The corresponding protocol buffer message.
|
||||
*/
|
||||
RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(Map<RegionInfo,Long> regionSizes) {
|
||||
RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) {
|
||||
RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder();
|
||||
for (Entry<RegionInfo, Long> entry : Objects.requireNonNull(regionSizes).entrySet()) {
|
||||
request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue()));
|
||||
for (Entry<RegionInfo, RegionSize> entry : regionSizes) {
|
||||
request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue().getSize()));
|
||||
}
|
||||
return request.build();
|
||||
}
|
||||
|
|
|
@ -81,6 +81,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
|
|||
import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
|
||||
import org.apache.hadoop.hbase.log.HBaseMarkers;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||
import org.apache.hadoop.hbase.quotas.RegionSizeStore;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
|
||||
|
@ -1455,11 +1456,50 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
|||
synchronized (filesCompacting) {
|
||||
filesCompacting.removeAll(compactedFiles);
|
||||
}
|
||||
|
||||
// These may be null when the RS is shutting down. The space quota Chores will fix the Region
|
||||
// sizes later so it's not super-critical if we miss these.
|
||||
RegionServerServices rsServices = region.getRegionServerServices();
|
||||
if (rsServices != null && rsServices.getRegionServerSpaceQuotaManager() != null) {
|
||||
updateSpaceQuotaAfterFileReplacement(
|
||||
rsServices.getRegionServerSpaceQuotaManager().getRegionSizeStore(), getRegionInfo(),
|
||||
compactedFiles, result);
|
||||
}
|
||||
} finally {
|
||||
this.lock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates the space quota usage for this region, removing the size for files compacted away
|
||||
* and adding in the size for new files.
|
||||
*
|
||||
* @param sizeStore The object tracking changes in region size for space quotas.
|
||||
* @param regionInfo The identifier for the region whose size is being updated.
|
||||
* @param oldFiles Files removed from this store's region.
|
||||
* @param newFiles Files added to this store's region.
|
||||
*/
|
||||
void updateSpaceQuotaAfterFileReplacement(
|
||||
RegionSizeStore sizeStore, RegionInfo regionInfo, Collection<HStoreFile> oldFiles,
|
||||
Collection<HStoreFile> newFiles) {
|
||||
long delta = 0;
|
||||
if (oldFiles != null) {
|
||||
for (HStoreFile compactedFile : oldFiles) {
|
||||
if (compactedFile.isHFile()) {
|
||||
delta -= compactedFile.getReader().length();
|
||||
}
|
||||
}
|
||||
}
|
||||
if (newFiles != null) {
|
||||
for (HStoreFile newFile : newFiles) {
|
||||
if (newFile.isHFile()) {
|
||||
delta += newFile.getReader().length();
|
||||
}
|
||||
}
|
||||
}
|
||||
sizeStore.incrementRegionSize(regionInfo, delta);
|
||||
}
|
||||
|
||||
/**
|
||||
* Log a very elaborate compaction completion message.
|
||||
* @param cr Request.
|
||||
|
|
|
@ -46,6 +46,7 @@ public class MetricsRegionServer {
|
|||
private MetricsRegionServerSource serverSource;
|
||||
private MetricsRegionServerWrapper regionServerWrapper;
|
||||
private RegionServerTableMetrics tableMetrics;
|
||||
private MetricsRegionServerQuotaSource quotaSource;
|
||||
|
||||
private MetricRegistry metricRegistry;
|
||||
private Timer bulkLoadTimer;
|
||||
|
@ -62,6 +63,8 @@ public class MetricsRegionServer {
|
|||
|
||||
// create and use metrics from the new hbase-metrics based registry.
|
||||
bulkLoadTimer = metricRegistry.timer("Bulkload");
|
||||
|
||||
quotaSource = CompatibilitySingletonFactory.getInstance(MetricsRegionServerQuotaSource.class);
|
||||
}
|
||||
|
||||
MetricsRegionServer(MetricsRegionServerWrapper regionServerWrapper,
|
||||
|
@ -211,4 +214,18 @@ public class MetricsRegionServer {
|
|||
public void updateBulkLoad(long millis) {
|
||||
this.bulkLoadTimer.updateMillis(millis);
|
||||
}
|
||||
|
||||
/**
|
||||
* @see MetricsRegionServerQuotaSource#incrementNumRegionSizeReportsSent(long)
|
||||
*/
|
||||
public void incrementNumRegionSizeReportsSent(long numReportsSent) {
|
||||
quotaSource.incrementNumRegionSizeReportsSent(numReportsSent);
|
||||
}
|
||||
|
||||
/**
|
||||
* @see MetricsRegionServerQuotaSource#incrementRegionSizeReportingChoreTime(long)
|
||||
*/
|
||||
public void incrementRegionSizeReportingChoreTime(long time) {
|
||||
quotaSource.incrementRegionSizeReportingChoreTime(time);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2279,9 +2279,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
requestCount.increment();
|
||||
HRegion region = getRegion(request.getRegion());
|
||||
Map<byte[], List<Path>> map = null;
|
||||
final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration());
|
||||
long sizeToBeLoaded = -1;
|
||||
|
||||
// Check to see if this bulk load would exceed the space quota for this table
|
||||
if (QuotaUtil.isQuotaEnabled(getConfiguration())) {
|
||||
if (spaceQuotaEnabled) {
|
||||
ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements();
|
||||
SpaceViolationPolicyEnforcement enforcement = activeSpaceQuotas.getPolicyEnforcement(
|
||||
region);
|
||||
|
@ -2292,7 +2294,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
filePaths.add(familyPath.getPath());
|
||||
}
|
||||
// Check if the batch of files exceeds the current quota
|
||||
enforcement.checkBulkLoad(regionServer.getFileSystem(), filePaths);
|
||||
sizeToBeLoaded = enforcement.computeBulkLoadSize(regionServer.getFileSystem(), filePaths);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2318,6 +2320,19 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
}
|
||||
BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
|
||||
builder.setLoaded(map != null);
|
||||
if (map != null) {
|
||||
// Treat any negative size as a flag to "ignore" updating the region size as that is
|
||||
// not possible to occur in real life (cannot bulk load a file with negative size)
|
||||
if (spaceQuotaEnabled && sizeToBeLoaded > 0) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Incrementing space use of " + region.getRegionInfo() + " by "
|
||||
+ sizeToBeLoaded + " bytes");
|
||||
}
|
||||
// Inform space quotas of the new files for this region
|
||||
getSpaceQuotaManager().getRegionSizeStore().incrementRegionSize(
|
||||
region.getRegionInfo(), sizeToBeLoaded);
|
||||
}
|
||||
}
|
||||
return builder.build();
|
||||
} catch (IOException ie) {
|
||||
throw new ServiceException(ie);
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.executor.ExecutorService;
|
|||
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
|
||||
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
|
||||
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
|
||||
import org.apache.hadoop.hbase.quotas.RegionSizeStore;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
|
||||
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
|
||||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
|
@ -234,4 +235,12 @@ public interface RegionServerServices extends Server, MutableOnlineRegions, Favo
|
|||
* See HBASE-17712 for more details.
|
||||
*/
|
||||
void unassign(byte[] regionName) throws IOException;
|
||||
|
||||
/**
|
||||
* Reports the provided Region sizes hosted by this RegionServer to the active Master.
|
||||
*
|
||||
* @param sizeStore The sizes for Regions locally hosted.
|
||||
* @return {@code false} if reporting should be temporarily paused, {@code true} otherwise.
|
||||
*/
|
||||
boolean reportRegionSizesForQuotas(RegionSizeStore sizeStore);
|
||||
}
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.fs.HFileSystem;
|
|||
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
|
||||
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
|
||||
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
|
||||
import org.apache.hadoop.hbase.quotas.RegionSizeStore;
|
||||
import org.apache.hadoop.hbase.regionserver.FlushRequester;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
|
||||
|
@ -334,4 +335,9 @@ public class MockRegionServerServices implements RegionServerServices {
|
|||
public Connection createConnection(Configuration conf) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean reportRegionSizesForQuotas(RegionSizeStore sizeStore) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
|||
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
|
||||
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
|
||||
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
|
||||
import org.apache.hadoop.hbase.quotas.RegionSizeStore;
|
||||
import org.apache.hadoop.hbase.regionserver.FlushRequester;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
|
||||
|
@ -678,4 +679,9 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
|
|||
public Connection createConnection(Configuration conf) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean reportRegionSizesForQuotas(RegionSizeStore sizeStore) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,21 +29,30 @@ import java.util.Set;
|
|||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Waiter.Predicate;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.ClientServiceCallable;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.HStore;
|
||||
import org.apache.hadoop.hbase.regionserver.HStoreFile;
|
||||
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.junit.rules.TestName;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -87,6 +96,8 @@ public class SpaceQuotaHelperForTests {
|
|||
conf.setInt(SpaceQuotaRefresherChore.POLICY_REFRESHER_CHORE_PERIOD_KEY, 1000);
|
||||
conf.setInt(SnapshotQuotaObserverChore.SNAPSHOT_QUOTA_CHORE_DELAY_KEY, 1000);
|
||||
conf.setInt(SnapshotQuotaObserverChore.SNAPSHOT_QUOTA_CHORE_PERIOD_KEY, 1000);
|
||||
conf.setInt(RegionSizeReportingChore.REGION_SIZE_REPORTING_CHORE_PERIOD_KEY, 1000);
|
||||
conf.setInt(RegionSizeReportingChore.REGION_SIZE_REPORTING_CHORE_DELAY_KEY, 1000);
|
||||
// The period at which we check for compacted files that should be deleted from HDFS
|
||||
conf.setInt("hbase.hfile.compaction.discharger.interval", 5 * 1000);
|
||||
conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
|
||||
|
@ -116,10 +127,7 @@ public class SpaceQuotaHelperForTests {
|
|||
void removeAllQuotas(Connection conn) throws IOException, InterruptedException {
|
||||
// Wait for the quota table to be created
|
||||
if (!conn.getAdmin().tableExists(QuotaUtil.QUOTA_TABLE_NAME)) {
|
||||
do {
|
||||
LOG.debug("Quota table does not yet exist");
|
||||
Thread.sleep(1000);
|
||||
} while (!conn.getAdmin().tableExists(QuotaUtil.QUOTA_TABLE_NAME));
|
||||
waitForQuotaTable(conn);
|
||||
} else {
|
||||
// Or, clean up any quotas from previous test runs.
|
||||
QuotaRetriever scanner = QuotaRetriever.open(conn.getConfiguration());
|
||||
|
@ -159,14 +167,14 @@ public class SpaceQuotaHelperForTests {
|
|||
/**
|
||||
* Waits 30seconds for the HBase quota table to exist.
|
||||
*/
|
||||
void waitForQuotaTable(Connection conn) throws IOException {
|
||||
public void waitForQuotaTable(Connection conn) throws IOException {
|
||||
waitForQuotaTable(conn, 30_000);
|
||||
}
|
||||
|
||||
/**
|
||||
* Waits {@code timeout} milliseconds for the HBase quota table to exist.
|
||||
*/
|
||||
void waitForQuotaTable(Connection conn, long timeout) throws IOException {
|
||||
public void waitForQuotaTable(Connection conn, long timeout) throws IOException {
|
||||
testUtil.waitFor(timeout, 1000, new Predicate<IOException>() {
|
||||
@Override
|
||||
public boolean evaluate() throws IOException {
|
||||
|
@ -316,8 +324,8 @@ public class SpaceQuotaHelperForTests {
|
|||
}
|
||||
|
||||
// Create the table
|
||||
HTableDescriptor tableDesc = new HTableDescriptor(tn);
|
||||
tableDesc.addFamily(new HColumnDescriptor(F1));
|
||||
TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tn)
|
||||
.addColumnFamily(ColumnFamilyDescriptorBuilder.of(F1)).build();
|
||||
if (numRegions == 1) {
|
||||
admin.createTable(tableDesc);
|
||||
} else {
|
||||
|
@ -338,8 +346,8 @@ public class SpaceQuotaHelperForTests {
|
|||
}
|
||||
|
||||
// Create the table
|
||||
HTableDescriptor tableDesc = new HTableDescriptor(tn);
|
||||
tableDesc.addFamily(new HColumnDescriptor(F1));
|
||||
TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tn)
|
||||
.addColumnFamily(ColumnFamilyDescriptorBuilder.of(F1)).build();
|
||||
|
||||
admin.createTable(tableDesc);
|
||||
return tn;
|
||||
|
@ -364,6 +372,44 @@ public class SpaceQuotaHelperForTests {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Bulk-loads a number of files with a number of rows to the given table.
|
||||
*/
|
||||
ClientServiceCallable<Boolean> generateFileToLoad(
|
||||
TableName tn, int numFiles, int numRowsPerFile) throws Exception {
|
||||
Connection conn = testUtil.getConnection();
|
||||
FileSystem fs = testUtil.getTestFileSystem();
|
||||
Configuration conf = testUtil.getConfiguration();
|
||||
Path baseDir = new Path(fs.getHomeDirectory(), testName.getMethodName() + "_files");
|
||||
fs.mkdirs(baseDir);
|
||||
final List<Pair<byte[], String>> famPaths = new ArrayList<>();
|
||||
for (int i = 1; i <= numFiles; i++) {
|
||||
Path hfile = new Path(baseDir, "file" + i);
|
||||
TestHRegionServerBulkLoad.createHFile(
|
||||
fs, hfile, Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("my"),
|
||||
Bytes.toBytes("file"), numRowsPerFile);
|
||||
famPaths.add(new Pair<>(Bytes.toBytes(SpaceQuotaHelperForTests.F1), hfile.toString()));
|
||||
}
|
||||
|
||||
// bulk load HFiles
|
||||
Table table = conn.getTable(tn);
|
||||
final String bulkToken = new SecureBulkLoadClient(conf, table).prepareBulkLoad(conn);
|
||||
return new ClientServiceCallable<Boolean>(
|
||||
conn, tn, Bytes.toBytes("row"), new RpcControllerFactory(conf).newController(),
|
||||
HConstants.PRIORITY_UNSET) {
|
||||
@Override
|
||||
public Boolean rpcCall() throws Exception {
|
||||
SecureBulkLoadClient secureClient = null;
|
||||
byte[] regionName = getLocation().getRegion().getRegionName();
|
||||
try (Table table = conn.getTable(getTableName())) {
|
||||
secureClient = new SecureBulkLoadClient(conf, table);
|
||||
return secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
|
||||
true, null, bulkToken);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Abstraction to simplify the case where a test needs to verify a certain state
|
||||
* on a {@code SpaceQuotaSnapshot}. This class fails-fast when there is no such
|
||||
|
|
|
@ -57,7 +57,6 @@ public class TestFileSystemUtilizationChore {
|
|||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestFileSystemUtilizationChore.class);
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testNoOnlineRegions() {
|
||||
// One region with a store size of one.
|
||||
|
@ -67,14 +66,13 @@ public class TestFileSystemUtilizationChore {
|
|||
final FileSystemUtilizationChore chore = new FileSystemUtilizationChore(rs);
|
||||
doAnswer(new ExpectedRegionSizeSummationAnswer(sum(regionSizes)))
|
||||
.when(rs)
|
||||
.reportRegionSizesForQuotas((Map<RegionInfo,Long>) any());
|
||||
.reportRegionSizesForQuotas(any(RegionSizeStore.class));
|
||||
|
||||
final Region region = mockRegionWithSize(regionSizes);
|
||||
Mockito.doReturn(Arrays.asList(region)).when(rs).getRegions();
|
||||
chore.chore();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testRegionSizes() {
|
||||
// One region with a store size of one.
|
||||
|
@ -84,14 +82,13 @@ public class TestFileSystemUtilizationChore {
|
|||
final FileSystemUtilizationChore chore = new FileSystemUtilizationChore(rs);
|
||||
doAnswer(new ExpectedRegionSizeSummationAnswer(sum(regionSizes)))
|
||||
.when(rs)
|
||||
.reportRegionSizesForQuotas((Map<RegionInfo,Long>) any());
|
||||
.reportRegionSizesForQuotas(any(RegionSizeStore.class));
|
||||
|
||||
final Region region = mockRegionWithSize(regionSizes);
|
||||
Mockito.doReturn(Arrays.asList(region)).when(rs).getRegions();
|
||||
chore.chore();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testMultipleRegionSizes() {
|
||||
final Configuration conf = getDefaultHBaseConfiguration();
|
||||
|
@ -108,7 +105,7 @@ public class TestFileSystemUtilizationChore {
|
|||
final FileSystemUtilizationChore chore = new FileSystemUtilizationChore(rs);
|
||||
doAnswer(new ExpectedRegionSizeSummationAnswer(sum(Arrays.asList(r1Sum, r2Sum, r3Sum))))
|
||||
.when(rs)
|
||||
.reportRegionSizesForQuotas((Map<RegionInfo,Long>) any());
|
||||
.reportRegionSizesForQuotas(any(RegionSizeStore.class));
|
||||
|
||||
final Region r1 = mockRegionWithSize(r1Sizes);
|
||||
final Region r2 = mockRegionWithSize(r2Sizes);
|
||||
|
@ -151,7 +148,6 @@ public class TestFileSystemUtilizationChore {
|
|||
assertEquals(timeUnit, chore.getTimeUnit());
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testProcessingLeftoverRegions() {
|
||||
final Configuration conf = getDefaultHBaseConfiguration();
|
||||
|
@ -173,7 +169,7 @@ public class TestFileSystemUtilizationChore {
|
|||
};
|
||||
doAnswer(new ExpectedRegionSizeSummationAnswer(sum(Arrays.asList(leftover1Sum, leftover2Sum))))
|
||||
.when(rs)
|
||||
.reportRegionSizesForQuotas((Map<RegionInfo,Long>) any());
|
||||
.reportRegionSizesForQuotas(any(RegionSizeStore.class));
|
||||
|
||||
// We shouldn't compute all of these region sizes, just the leftovers
|
||||
final Region r1 = mockRegionWithSize(Arrays.asList(1024L, 2048L));
|
||||
|
@ -184,7 +180,6 @@ public class TestFileSystemUtilizationChore {
|
|||
chore.chore();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testProcessingNowOfflineLeftoversAreIgnored() {
|
||||
final Configuration conf = getDefaultHBaseConfiguration();
|
||||
|
@ -205,7 +200,7 @@ public class TestFileSystemUtilizationChore {
|
|||
};
|
||||
doAnswer(new ExpectedRegionSizeSummationAnswer(sum(Arrays.asList(leftover1Sum))))
|
||||
.when(rs)
|
||||
.reportRegionSizesForQuotas((Map<RegionInfo,Long>) any());
|
||||
.reportRegionSizesForQuotas(any(RegionSizeStore.class));
|
||||
|
||||
// We shouldn't compute all of these region sizes, just the leftovers
|
||||
final Region r1 = mockRegionWithSize(Arrays.asList(1024L, 2048L));
|
||||
|
@ -217,7 +212,6 @@ public class TestFileSystemUtilizationChore {
|
|||
chore.chore();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testIgnoreSplitParents() {
|
||||
final Configuration conf = getDefaultHBaseConfiguration();
|
||||
|
@ -231,7 +225,7 @@ public class TestFileSystemUtilizationChore {
|
|||
final FileSystemUtilizationChore chore = new FileSystemUtilizationChore(rs);
|
||||
doAnswer(new ExpectedRegionSizeSummationAnswer(sum(Arrays.asList(r1Sum))))
|
||||
.when(rs)
|
||||
.reportRegionSizesForQuotas((Map<RegionInfo,Long>) any());
|
||||
.reportRegionSizesForQuotas(any(RegionSizeStore.class));
|
||||
|
||||
final Region r1 = mockRegionWithSize(r1Sizes);
|
||||
final Region r2 = mockSplitParentRegionWithSize(r2Sizes);
|
||||
|
@ -239,7 +233,6 @@ public class TestFileSystemUtilizationChore {
|
|||
chore.chore();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testIgnoreRegionReplicas() {
|
||||
final Configuration conf = getDefaultHBaseConfiguration();
|
||||
|
@ -253,7 +246,7 @@ public class TestFileSystemUtilizationChore {
|
|||
final FileSystemUtilizationChore chore = new FileSystemUtilizationChore(rs);
|
||||
doAnswer(new ExpectedRegionSizeSummationAnswer(r1Sum))
|
||||
.when(rs)
|
||||
.reportRegionSizesForQuotas((Map<RegionInfo,Long>) any());
|
||||
.reportRegionSizesForQuotas(any(RegionSizeStore.class));
|
||||
|
||||
final Region r1 = mockRegionWithSize(r1Sizes);
|
||||
final Region r2 = mockRegionReplicaWithSize(r2Sizes);
|
||||
|
@ -261,7 +254,6 @@ public class TestFileSystemUtilizationChore {
|
|||
chore.chore();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testNonHFilesAreIgnored() {
|
||||
final Configuration conf = getDefaultHBaseConfiguration();
|
||||
|
@ -280,7 +272,7 @@ public class TestFileSystemUtilizationChore {
|
|||
final FileSystemUtilizationChore chore = new FileSystemUtilizationChore(rs);
|
||||
doAnswer(new ExpectedRegionSizeSummationAnswer(
|
||||
sum(Arrays.asList(r1HFileSizeSum, r2HFileSizeSum))))
|
||||
.when(rs).reportRegionSizesForQuotas((Map<RegionInfo,Long>) any());
|
||||
.when(rs).reportRegionSizesForQuotas(any(RegionSizeStore.class));
|
||||
|
||||
final Region r1 = mockRegionWithHFileLinks(r1StoreFileSizes, r1HFileSizes);
|
||||
final Region r2 = mockRegionWithHFileLinks(r2StoreFileSizes, r2HFileSizes);
|
||||
|
@ -302,7 +294,10 @@ public class TestFileSystemUtilizationChore {
|
|||
*/
|
||||
private HRegionServer mockRegionServer(Configuration conf) {
|
||||
final HRegionServer rs = mock(HRegionServer.class);
|
||||
final RegionServerSpaceQuotaManager quotaManager = mock(RegionServerSpaceQuotaManager.class);
|
||||
when(rs.getConfiguration()).thenReturn(conf);
|
||||
when(rs.getRegionServerSpaceQuotaManager()).thenReturn(quotaManager);
|
||||
when(quotaManager.getRegionSizeStore()).thenReturn(new RegionSizeStoreImpl());
|
||||
return rs;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,228 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to you under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.quotas;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.ClientServiceCallable;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
|
||||
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
|
||||
import org.apache.hadoop.hbase.quotas.SpaceQuotaHelperForTests.SpaceQuotaSnapshotPredicate;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
|
||||
|
||||
@Category({MediumTests.class})
|
||||
public class TestLowLatencySpaceQuotas {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestLowLatencySpaceQuotas.class);
|
||||
|
||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
// Global for all tests in the class
|
||||
private static final AtomicLong COUNTER = new AtomicLong(0);
|
||||
|
||||
@Rule
|
||||
public TestName testName = new TestName();
|
||||
private SpaceQuotaHelperForTests helper;
|
||||
private Connection conn;
|
||||
private Admin admin;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() throws Exception {
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
// The default 1s period for QuotaObserverChore is good.
|
||||
SpaceQuotaHelperForTests.updateConfigForQuotas(conf);
|
||||
// Set the period to read region size from HDFS to be very long
|
||||
conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 1000 * 120);
|
||||
TEST_UTIL.startMiniCluster(1);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void removeAllQuotas() throws Exception {
|
||||
helper = new SpaceQuotaHelperForTests(TEST_UTIL, testName, COUNTER);
|
||||
conn = TEST_UTIL.getConnection();
|
||||
admin = TEST_UTIL.getAdmin();
|
||||
helper.waitForQuotaTable(conn);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFlushes() throws Exception {
|
||||
TableName tn = helper.createTableWithRegions(1);
|
||||
// Set a quota
|
||||
QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(
|
||||
tn, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
|
||||
admin.setQuota(settings);
|
||||
|
||||
// Write some data
|
||||
final long initialSize = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
|
||||
helper.writeData(tn, initialSize);
|
||||
|
||||
// Make sure a flush happened
|
||||
admin.flush(tn);
|
||||
|
||||
// We should be able to observe the system recording an increase in size (even
|
||||
// though we know the filesystem scanning did not happen).
|
||||
TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
|
||||
@Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
|
||||
return snapshot.getUsage() >= initialSize;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMajorCompaction() throws Exception {
|
||||
TableName tn = helper.createTableWithRegions(1);
|
||||
// Set a quota
|
||||
QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(
|
||||
tn, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
|
||||
admin.setQuota(settings);
|
||||
|
||||
// Write some data and flush it to disk.
|
||||
final long sizePerBatch = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
|
||||
helper.writeData(tn, sizePerBatch);
|
||||
admin.flush(tn);
|
||||
|
||||
// Write the same data again, flushing it to a second file
|
||||
helper.writeData(tn, sizePerBatch);
|
||||
admin.flush(tn);
|
||||
|
||||
// After two flushes, both hfiles would contain similar data. We should see 2x the data.
|
||||
TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
|
||||
@Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
|
||||
return snapshot.getUsage() >= 2L * sizePerBatch;
|
||||
}
|
||||
});
|
||||
|
||||
// Rewrite the two files into one.
|
||||
admin.majorCompact(tn);
|
||||
|
||||
// After we major compact the table, we should notice quickly that the amount of data in the
|
||||
// table is much closer to reality (the duplicate entries across the two files are removed).
|
||||
TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
|
||||
@Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
|
||||
return snapshot.getUsage() >= sizePerBatch && snapshot.getUsage() <= 2L * sizePerBatch;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMinorCompaction() throws Exception {
|
||||
TableName tn = helper.createTableWithRegions(1);
|
||||
// Set a quota
|
||||
QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(
|
||||
tn, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
|
||||
admin.setQuota(settings);
|
||||
|
||||
// Write some data and flush it to disk.
|
||||
final long sizePerBatch = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
|
||||
final long numBatches = 6;
|
||||
for (long i = 0; i < numBatches; i++) {
|
||||
helper.writeData(tn, sizePerBatch);
|
||||
admin.flush(tn);
|
||||
}
|
||||
|
||||
HRegion region = Iterables.getOnlyElement(TEST_UTIL.getHBaseCluster().getRegions(tn));
|
||||
long numFiles = getNumHFilesForRegion(region);
|
||||
assertEquals(numBatches, numFiles);
|
||||
|
||||
// After two flushes, both hfiles would contain similar data. We should see 2x the data.
|
||||
TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
|
||||
@Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
|
||||
return snapshot.getUsage() >= numFiles * sizePerBatch;
|
||||
}
|
||||
});
|
||||
|
||||
// Rewrite some files into fewer
|
||||
TEST_UTIL.compact(tn, false);
|
||||
long numFilesAfterMinorCompaction = getNumHFilesForRegion(region);
|
||||
|
||||
// After we major compact the table, we should notice quickly that the amount of data in the
|
||||
// table is much closer to reality (the duplicate entries across the two files are removed).
|
||||
TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
|
||||
@Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
|
||||
return snapshot.getUsage() >= numFilesAfterMinorCompaction * sizePerBatch &&
|
||||
snapshot.getUsage() <= (numFilesAfterMinorCompaction + 1) * sizePerBatch;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private long getNumHFilesForRegion(HRegion region) {
|
||||
return region.getStores().stream().mapToLong((s) -> s.getNumHFiles()).sum();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBulkLoading() throws Exception {
|
||||
TableName tn = helper.createTableWithRegions(1);
|
||||
// Set a quota
|
||||
QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(
|
||||
tn, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
|
||||
admin.setQuota(settings);
|
||||
|
||||
ClientServiceCallable<Boolean> callable = helper.generateFileToLoad(tn, 3, 550);
|
||||
// Make sure the files are about as long as we expect
|
||||
FileSystem fs = TEST_UTIL.getTestFileSystem();
|
||||
FileStatus[] files = fs.listStatus(
|
||||
new Path(fs.getHomeDirectory(), testName.getMethodName() + "_files"));
|
||||
long totalSize = 0;
|
||||
for (FileStatus file : files) {
|
||||
assertTrue(
|
||||
"Expected the file, " + file.getPath() + ", length to be larger than 25KB, but was "
|
||||
+ file.getLen(),
|
||||
file.getLen() > 25 * SpaceQuotaHelperForTests.ONE_KILOBYTE);
|
||||
totalSize += file.getLen();
|
||||
}
|
||||
|
||||
RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration());
|
||||
RpcRetryingCaller<Boolean> caller = factory.<Boolean> newCaller();
|
||||
assertTrue("The bulk load failed", caller.callWithRetries(callable, Integer.MAX_VALUE));
|
||||
|
||||
final long finalTotalSize = totalSize;
|
||||
TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
|
||||
@Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
|
||||
return snapshot.getUsage() >= finalTotalSize;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
|
@ -29,17 +29,18 @@ import java.util.Map.Entry;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -86,8 +87,8 @@ public class TestQuotaObserverChoreRegionReports {
|
|||
@Test
|
||||
public void testReportExpiration() throws Exception {
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
// Send reports every 30 seconds
|
||||
conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 25000);
|
||||
// Send reports every 25 seconds
|
||||
conf.setInt(RegionSizeReportingChore.REGION_SIZE_REPORTING_CHORE_PERIOD_KEY, 25000);
|
||||
// Expire the reports after 5 seconds
|
||||
conf.setInt(QuotaObserverChore.REGION_REPORT_RETENTION_DURATION_KEY, 5000);
|
||||
TEST_UTIL.startMiniCluster(1);
|
||||
|
@ -103,8 +104,8 @@ public class TestQuotaObserverChoreRegionReports {
|
|||
|
||||
// Create a table
|
||||
final TableName tn = TableName.valueOf("reportExpiration");
|
||||
HTableDescriptor tableDesc = new HTableDescriptor(tn);
|
||||
tableDesc.addFamily(new HColumnDescriptor(FAM1));
|
||||
TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tn).addColumnFamily(
|
||||
ColumnFamilyDescriptorBuilder.of(FAM1)).build();
|
||||
TEST_UTIL.getAdmin().createTable(tableDesc);
|
||||
|
||||
// No reports right after we created this table.
|
||||
|
@ -148,8 +149,8 @@ public class TestQuotaObserverChoreRegionReports {
|
|||
|
||||
// Create a table
|
||||
final TableName tn = TableName.valueOf("quotaAcceptanceWithoutReports");
|
||||
HTableDescriptor tableDesc = new HTableDescriptor(tn);
|
||||
tableDesc.addFamily(new HColumnDescriptor(FAM1));
|
||||
TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tn).addColumnFamily(
|
||||
ColumnFamilyDescriptorBuilder.of(FAM1)).build();
|
||||
TEST_UTIL.getAdmin().createTable(tableDesc);
|
||||
|
||||
// Set a quota
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to you under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.quotas;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category({SmallTests.class})
|
||||
public class TestRegionSizeImpl {
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestRegionSizeImpl.class);
|
||||
|
||||
@Test
|
||||
public void testReportingWithSizeChanges() {
|
||||
long currentSize = 1024L;
|
||||
RegionSizeImpl size = new RegionSizeImpl(currentSize);
|
||||
|
||||
assertEquals(currentSize, size.getSize());
|
||||
|
||||
currentSize *= 2L;
|
||||
size.setSize(currentSize);
|
||||
assertEquals(currentSize, size.getSize());
|
||||
|
||||
long delta = 512L;
|
||||
currentSize += delta;
|
||||
size.incrementSize(delta);
|
||||
assertEquals(currentSize, size.getSize());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,127 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to you under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.quotas;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category({SmallTests.class})
|
||||
public class TestRegionSizeReportingChore {
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestRegionSizeReportingChore.class);
|
||||
|
||||
@Test
|
||||
public void testDefaultConfigurationProperties() {
|
||||
final Configuration conf = getDefaultHBaseConfiguration();
|
||||
final HRegionServer rs = mockRegionServer(conf);
|
||||
RegionSizeReportingChore chore = new RegionSizeReportingChore(rs);
|
||||
assertEquals(
|
||||
RegionSizeReportingChore.REGION_SIZE_REPORTING_CHORE_DELAY_DEFAULT,
|
||||
chore.getInitialDelay());
|
||||
assertEquals(
|
||||
RegionSizeReportingChore.REGION_SIZE_REPORTING_CHORE_PERIOD_DEFAULT, chore.getPeriod());
|
||||
assertEquals(
|
||||
TimeUnit.valueOf(RegionSizeReportingChore.REGION_SIZE_REPORTING_CHORE_TIMEUNIT_DEFAULT),
|
||||
chore.getTimeUnit());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNonDefaultConfigurationProperties() {
|
||||
final Configuration conf = getDefaultHBaseConfiguration();
|
||||
final HRegionServer rs = mockRegionServer(conf);
|
||||
final int period = RegionSizeReportingChore.REGION_SIZE_REPORTING_CHORE_PERIOD_DEFAULT + 1;
|
||||
final long delay = RegionSizeReportingChore.REGION_SIZE_REPORTING_CHORE_DELAY_DEFAULT + 1L;
|
||||
final String timeUnit = TimeUnit.SECONDS.name();
|
||||
conf.setInt(RegionSizeReportingChore.REGION_SIZE_REPORTING_CHORE_PERIOD_KEY, period);
|
||||
conf.setLong(RegionSizeReportingChore.REGION_SIZE_REPORTING_CHORE_DELAY_KEY, delay);
|
||||
conf.set(RegionSizeReportingChore.REGION_SIZE_REPORTING_CHORE_TIMEUNIT_KEY, timeUnit);
|
||||
RegionSizeReportingChore chore = new RegionSizeReportingChore(rs);
|
||||
assertEquals(delay, chore.getInitialDelay());
|
||||
assertEquals(period, chore.getPeriod());
|
||||
assertEquals(TimeUnit.valueOf(timeUnit), chore.getTimeUnit());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemovableOfNonOnlineRegions() {
|
||||
final Configuration conf = getDefaultHBaseConfiguration();
|
||||
final HRegionServer rs = mockRegionServer(conf);
|
||||
RegionSizeReportingChore chore = new RegionSizeReportingChore(rs);
|
||||
|
||||
RegionInfo infoA = RegionInfoBuilder.newBuilder(TableName.valueOf("T1"))
|
||||
.setStartKey(Bytes.toBytes("a")).setEndKey(Bytes.toBytes("b")).build();
|
||||
RegionInfo infoB = RegionInfoBuilder.newBuilder(TableName.valueOf("T1"))
|
||||
.setStartKey(Bytes.toBytes("b")).setEndKey(Bytes.toBytes("d")).build();
|
||||
RegionInfo infoC = RegionInfoBuilder.newBuilder(TableName.valueOf("T1"))
|
||||
.setStartKey(Bytes.toBytes("c")).setEndKey(Bytes.toBytes("d")).build();
|
||||
|
||||
RegionSizeStore store = new RegionSizeStoreImpl();
|
||||
store.put(infoA, 1024L);
|
||||
store.put(infoB, 1024L);
|
||||
store.put(infoC, 1024L);
|
||||
|
||||
// If there are no online regions, all entries should be removed.
|
||||
chore.removeNonOnlineRegions(store, Collections.<RegionInfo> emptySet());
|
||||
assertTrue(store.isEmpty());
|
||||
|
||||
store.put(infoA, 1024L);
|
||||
store.put(infoB, 1024L);
|
||||
store.put(infoC, 1024L);
|
||||
|
||||
// Remove a single region
|
||||
chore.removeNonOnlineRegions(store, new HashSet<>(Arrays.asList(infoA, infoC)));
|
||||
assertEquals(2, store.size());
|
||||
assertNotNull(store.getRegionSize(infoA));
|
||||
assertNotNull(store.getRegionSize(infoC));
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an HBase Configuration object for the default values.
|
||||
*/
|
||||
private Configuration getDefaultHBaseConfiguration() {
|
||||
final Configuration conf = HBaseConfiguration.create();
|
||||
conf.addResource("hbase-default.xml");
|
||||
return conf;
|
||||
}
|
||||
|
||||
private HRegionServer mockRegionServer(Configuration conf) {
|
||||
HRegionServer rs = mock(HRegionServer.class);
|
||||
when(rs.getConfiguration()).thenReturn(conf);
|
||||
return rs;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,101 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to you under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.quotas;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category({SmallTests.class})
|
||||
public class TestRegionSizeStoreImpl {
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestRegionSizeStoreImpl.class);
|
||||
|
||||
private static final RegionInfo INFOA = RegionInfoBuilder.newBuilder(TableName.valueOf("TEST"))
|
||||
.setStartKey(Bytes.toBytes("a")).setEndKey(Bytes.toBytes("b")).build();
|
||||
private static final RegionInfo INFOB = RegionInfoBuilder.newBuilder(TableName.valueOf("TEST"))
|
||||
.setStartKey(Bytes.toBytes("b")).setEndKey(Bytes.toBytes("c")).build();
|
||||
|
||||
@Test
|
||||
public void testSizeUpdates() {
|
||||
RegionSizeStore store = new RegionSizeStoreImpl();
|
||||
assertTrue(store.isEmpty());
|
||||
assertEquals(0, store.size());
|
||||
|
||||
store.put(INFOA, 1024L);
|
||||
|
||||
assertFalse(store.isEmpty());
|
||||
assertEquals(1, store.size());
|
||||
assertEquals(1024L, store.getRegionSize(INFOA).getSize());
|
||||
|
||||
store.put(INFOA, 2048L);
|
||||
assertEquals(1, store.size());
|
||||
assertEquals(2048L, store.getRegionSize(INFOA).getSize());
|
||||
|
||||
store.incrementRegionSize(INFOA, 512L);
|
||||
assertEquals(1, store.size());
|
||||
assertEquals(2048L + 512L, store.getRegionSize(INFOA).getSize());
|
||||
|
||||
store.remove(INFOA);
|
||||
assertTrue(store.isEmpty());
|
||||
assertEquals(0, store.size());
|
||||
|
||||
store.put(INFOA, 64L);
|
||||
store.put(INFOB, 128L);
|
||||
|
||||
assertEquals(2, store.size());
|
||||
Map<RegionInfo,RegionSize> records = new HashMap<>();
|
||||
for (Entry<RegionInfo,RegionSize> entry : store) {
|
||||
records.put(entry.getKey(), entry.getValue());
|
||||
}
|
||||
|
||||
assertEquals(64L, records.remove(INFOA).getSize());
|
||||
assertEquals(128L, records.remove(INFOB).getSize());
|
||||
assertTrue(records.isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNegativeDeltaForMissingRegion() {
|
||||
RegionSizeStore store = new RegionSizeStoreImpl();
|
||||
|
||||
assertNull(store.getRegionSize(INFOA));
|
||||
|
||||
// We shouldn't allow a negative size to enter the RegionSizeStore. Getting a negative size
|
||||
// like this shouldn't be possible, but we can prevent the bad state from propagating and
|
||||
// getting worse.
|
||||
store.incrementRegionSize(INFOA, -5);
|
||||
assertNotNull(store.getRegionSize(INFOA));
|
||||
assertEquals(0, store.getRegionSize(INFOA).getSize());
|
||||
}
|
||||
}
|
|
@ -23,9 +23,7 @@ import static org.junit.Assert.assertNull;
|
|||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
@ -36,7 +34,6 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.Append;
|
||||
|
@ -52,17 +49,13 @@ import org.apache.hadoop.hbase.client.ResultScanner;
|
|||
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
|
||||
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.quotas.policies.DefaultViolationPolicyEnforcement;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
|
||||
import org.apache.hadoop.hbase.security.AccessDeniedException;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
|
@ -247,9 +240,9 @@ public class TestSpaceQuotas {
|
|||
TableName tableName = writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, p);
|
||||
|
||||
// The table is now in violation. Try to do a bulk load
|
||||
ClientServiceCallable<Void> callable = generateFileToLoad(tableName, 1, 50);
|
||||
ClientServiceCallable<Boolean> callable = helper.generateFileToLoad(tableName, 1, 50);
|
||||
RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration());
|
||||
RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
|
||||
RpcRetryingCaller<Boolean> caller = factory.newCaller();
|
||||
try {
|
||||
caller.callWithRetries(callable, Integer.MAX_VALUE);
|
||||
fail("Expected the bulk load call to fail!");
|
||||
|
@ -298,7 +291,7 @@ public class TestSpaceQuotas {
|
|||
enforcement instanceof DefaultViolationPolicyEnforcement);
|
||||
|
||||
// Should generate two files, each of which is over 25KB each
|
||||
ClientServiceCallable<Void> callable = generateFileToLoad(tn, 2, 500);
|
||||
ClientServiceCallable<Boolean> callable = helper.generateFileToLoad(tn, 2, 525);
|
||||
FileSystem fs = TEST_UTIL.getTestFileSystem();
|
||||
FileStatus[] files = fs.listStatus(
|
||||
new Path(fs.getHomeDirectory(), testName.getMethodName() + "_files"));
|
||||
|
@ -311,7 +304,7 @@ public class TestSpaceQuotas {
|
|||
}
|
||||
|
||||
RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration());
|
||||
RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
|
||||
RpcRetryingCaller<Boolean> caller = factory.newCaller();
|
||||
try {
|
||||
caller.callWithRetries(callable, Integer.MAX_VALUE);
|
||||
fail("Expected the bulk load call to fail!");
|
||||
|
@ -432,39 +425,4 @@ public class TestSpaceQuotas {
|
|||
assertTrue(
|
||||
"Expected to see an exception writing data to a table exceeding its quota", sawError);
|
||||
}
|
||||
|
||||
private ClientServiceCallable<Void> generateFileToLoad(
|
||||
TableName tn, int numFiles, int numRowsPerFile) throws Exception {
|
||||
Connection conn = TEST_UTIL.getConnection();
|
||||
FileSystem fs = TEST_UTIL.getTestFileSystem();
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
Path baseDir = new Path(fs.getHomeDirectory(), testName.getMethodName() + "_files");
|
||||
fs.mkdirs(baseDir);
|
||||
final List<Pair<byte[], String>> famPaths = new ArrayList<Pair<byte[], String>>();
|
||||
for (int i = 1; i <= numFiles; i++) {
|
||||
Path hfile = new Path(baseDir, "file" + i);
|
||||
TestHRegionServerBulkLoad.createHFile(
|
||||
fs, hfile, Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"),
|
||||
Bytes.toBytes("reject"), numRowsPerFile);
|
||||
famPaths.add(new Pair<>(Bytes.toBytes(SpaceQuotaHelperForTests.F1), hfile.toString()));
|
||||
}
|
||||
|
||||
// bulk load HFiles
|
||||
Table table = conn.getTable(tn);
|
||||
final String bulkToken = new SecureBulkLoadClient(conf, table).prepareBulkLoad(conn);
|
||||
return new ClientServiceCallable<Void>(conn,
|
||||
tn, Bytes.toBytes("row"), new RpcControllerFactory(conf).newController(), HConstants.PRIORITY_UNSET) {
|
||||
@Override
|
||||
public Void rpcCall() throws Exception {
|
||||
SecureBulkLoadClient secureClient = null;
|
||||
byte[] regionName = getLocation().getRegionInfo().getRegionName();
|
||||
try (Table table = conn.getTable(getTableName())) {
|
||||
secureClient = new SecureBulkLoadClient(conf, table);
|
||||
secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
|
||||
true, null, bulkToken);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -78,7 +78,7 @@ public class TestBulkLoadCheckingViolationPolicyEnforcement {
|
|||
|
||||
policy.initialize(rss, tableName, snapshot);
|
||||
|
||||
policy.checkBulkLoad(fs, paths);
|
||||
policy.computeBulkLoadSize(fs, paths);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
|
@ -97,7 +97,7 @@ public class TestBulkLoadCheckingViolationPolicyEnforcement {
|
|||
policy.initialize(rss, tableName, snapshot);
|
||||
|
||||
// If the file to bulk load isn't a file, this should throw an exception
|
||||
policy.checkBulkLoad(fs, paths);
|
||||
policy.computeBulkLoadSize(fs, paths);
|
||||
}
|
||||
|
||||
@Test(expected = SpaceLimitingException.class)
|
||||
|
@ -120,7 +120,7 @@ public class TestBulkLoadCheckingViolationPolicyEnforcement {
|
|||
|
||||
policy.initialize(rss, tableName, snapshot);
|
||||
|
||||
policy.checkBulkLoad(fs, paths);
|
||||
policy.computeBulkLoadSize(fs, paths);
|
||||
}
|
||||
|
||||
@Test(expected = SpaceLimitingException.class)
|
||||
|
@ -143,6 +143,6 @@ public class TestBulkLoadCheckingViolationPolicyEnforcement {
|
|||
|
||||
policy.initialize(rss, tableName, snapshot);
|
||||
|
||||
policy.checkBulkLoad(fs, paths);
|
||||
policy.computeBulkLoadSize(fs, paths);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -87,6 +87,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
|
|||
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||
import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
|
||||
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
|
||||
|
@ -1658,6 +1659,56 @@ public class TestHStore {
|
|||
assertFalse(heap.equals(heap2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSpaceQuotaChangeAfterReplacement() throws IOException {
|
||||
final TableName tn = TableName.valueOf(name.getMethodName());
|
||||
init(name.getMethodName());
|
||||
|
||||
RegionSizeStoreImpl sizeStore = new RegionSizeStoreImpl();
|
||||
|
||||
HStoreFile sf1 = mockStoreFileWithLength(1024L);
|
||||
HStoreFile sf2 = mockStoreFileWithLength(2048L);
|
||||
HStoreFile sf3 = mockStoreFileWithLength(4096L);
|
||||
HStoreFile sf4 = mockStoreFileWithLength(8192L);
|
||||
|
||||
RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("a"))
|
||||
.setEndKey(Bytes.toBytes("b")).build();
|
||||
|
||||
// Compacting two files down to one, reducing size
|
||||
sizeStore.put(regionInfo, 1024L + 4096L);
|
||||
store.updateSpaceQuotaAfterFileReplacement(
|
||||
sizeStore, regionInfo, Arrays.asList(sf1, sf3), Arrays.asList(sf2));
|
||||
|
||||
assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());
|
||||
|
||||
// The same file length in and out should have no change
|
||||
store.updateSpaceQuotaAfterFileReplacement(
|
||||
sizeStore, regionInfo, Arrays.asList(sf2), Arrays.asList(sf2));
|
||||
|
||||
assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());
|
||||
|
||||
// Increase the total size used
|
||||
store.updateSpaceQuotaAfterFileReplacement(
|
||||
sizeStore, regionInfo, Arrays.asList(sf2), Arrays.asList(sf3));
|
||||
|
||||
assertEquals(4096L, sizeStore.getRegionSize(regionInfo).getSize());
|
||||
|
||||
RegionInfo regionInfo2 = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("b"))
|
||||
.setEndKey(Bytes.toBytes("c")).build();
|
||||
store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo2, null, Arrays.asList(sf4));
|
||||
|
||||
assertEquals(8192L, sizeStore.getRegionSize(regionInfo2).getSize());
|
||||
}
|
||||
|
||||
private HStoreFile mockStoreFileWithLength(long length) {
|
||||
HStoreFile sf = mock(HStoreFile.class);
|
||||
StoreFileReader sfr = mock(StoreFileReader.class);
|
||||
when(sf.isHFile()).thenReturn(true);
|
||||
when(sf.getReader()).thenReturn(sfr);
|
||||
when(sfr.length()).thenReturn(length);
|
||||
return sf;
|
||||
}
|
||||
|
||||
private static class MyThread extends Thread {
|
||||
private StoreScanner scanner;
|
||||
private KeyValueHeap heap;
|
||||
|
|
|
@ -25,12 +25,13 @@ import static org.mockito.Matchers.anyLong;
|
|||
import static org.mockito.Mockito.doCallRealMethod;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
import org.apache.hadoop.hbase.quotas.RegionSize;
|
||||
import org.apache.hadoop.hbase.quotas.RegionSizeStore;
|
||||
import org.apache.hadoop.hbase.quotas.RegionSizeStoreFactory;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.ClassRule;
|
||||
|
@ -68,52 +69,24 @@ public class TestRegionServerRegionSpaceUseReport {
|
|||
.setStartKey(Bytes.toBytes("c"))
|
||||
.setEndKey(Bytes.toBytes("d"))
|
||||
.build();
|
||||
Map<RegionInfo,Long> sizes = new HashMap<>();
|
||||
sizes.put(hri1, 1024L * 1024L);
|
||||
sizes.put(hri2, 1024L * 1024L * 8L);
|
||||
sizes.put(hri3, 1024L * 1024L * 32L);
|
||||
RegionSizeStore store = RegionSizeStoreFactory.getInstance().createStore();
|
||||
store.put(hri1, 1024L * 1024L);
|
||||
store.put(hri2, 1024L * 1024L * 8L);
|
||||
store.put(hri3, 1024L * 1024L * 32L);
|
||||
|
||||
// Call the real method to convert the map into a protobuf
|
||||
HRegionServer rs = mock(HRegionServer.class);
|
||||
doCallRealMethod().when(rs).buildRegionSpaceUseReportRequest(any());
|
||||
doCallRealMethod().when(rs).buildRegionSpaceUseReportRequest(any(RegionSizeStore.class));
|
||||
doCallRealMethod().when(rs).convertRegionSize(any(), anyLong());
|
||||
|
||||
RegionSpaceUseReportRequest requests = rs.buildRegionSpaceUseReportRequest(sizes);
|
||||
assertEquals(sizes.size(), requests.getSpaceUseCount());
|
||||
RegionSpaceUseReportRequest requests = rs.buildRegionSpaceUseReportRequest(store);
|
||||
assertEquals(store.size(), requests.getSpaceUseCount());
|
||||
for (RegionSpaceUse spaceUse : requests.getSpaceUseList()) {
|
||||
RegionInfo hri = ProtobufUtil.toRegionInfo(spaceUse.getRegionInfo());
|
||||
Long expectedSize = sizes.remove(hri);
|
||||
RegionSize expectedSize = store.remove(hri);
|
||||
assertNotNull("Could not find size for HRI: " + hri, expectedSize);
|
||||
assertEquals(expectedSize.longValue(), spaceUse.getRegionSize());
|
||||
assertEquals(expectedSize.getSize(), spaceUse.getRegionSize());
|
||||
}
|
||||
assertTrue("Should not have any space use entries left: " + sizes, sizes.isEmpty());
|
||||
}
|
||||
|
||||
@Test(expected = NullPointerException.class)
|
||||
public void testNullMap() {
|
||||
// Call the real method to convert the map into a protobuf
|
||||
HRegionServer rs = mock(HRegionServer.class);
|
||||
doCallRealMethod().when(rs).buildRegionSpaceUseReportRequest(any());
|
||||
doCallRealMethod().when(rs).convertRegionSize(any(), anyLong());
|
||||
|
||||
rs.buildRegionSpaceUseReportRequest(null);
|
||||
}
|
||||
|
||||
@Test(expected = NullPointerException.class)
|
||||
public void testMalformedMap() {
|
||||
TableName tn = TableName.valueOf("table1");
|
||||
RegionInfo hri1 = RegionInfoBuilder.newBuilder(tn)
|
||||
.setStartKey(Bytes.toBytes("a"))
|
||||
.setEndKey(Bytes.toBytes("b"))
|
||||
.build();
|
||||
Map<RegionInfo,Long> sizes = new HashMap<>();
|
||||
sizes.put(hri1, null);
|
||||
|
||||
// Call the real method to convert the map into a protobuf
|
||||
HRegionServer rs = mock(HRegionServer.class);
|
||||
doCallRealMethod().when(rs).buildRegionSpaceUseReportRequest(any());
|
||||
doCallRealMethod().when(rs).convertRegionSize(any(), anyLong());
|
||||
|
||||
rs.buildRegionSpaceUseReportRequest(sizes);
|
||||
assertTrue("Should not have any space use entries left: " + store, store.isEmpty());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue