HBASE-24164 Retain the ReadRequests and WriteRequests of region on we… (#1500)
Signed-off-by: stack <stack@apache.org>
This commit is contained in:
parent
60f6f79e3c
commit
87b8bdf617
|
@ -151,6 +151,11 @@ public class TransitRegionStateProcedure
|
|||
this.forceNewPlan = forceNewPlan;
|
||||
this.type = type;
|
||||
setInitalAndLastState();
|
||||
|
||||
// when do reopen TRSP, let the rs know the targetServer so it can keep some info on close
|
||||
if (type == TransitionType.REOPEN) {
|
||||
this.assignCandidate = getRegionStateNode(env).getRegionLocation();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -872,6 +872,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
this.maxCellSize = conf.getLong(HBASE_MAX_CELL_SIZE_KEY, DEFAULT_MAX_CELL_SIZE);
|
||||
this.miniBatchSize = conf.getInt(HBASE_REGIONSERVER_MINIBATCH_SIZE,
|
||||
DEFAULT_HBASE_REGIONSERVER_MINIBATCH_SIZE);
|
||||
|
||||
// recover the metrics of read and write requests count if they were retained
|
||||
if (rsServices != null && rsServices.getRegionServerAccounting() != null) {
|
||||
Pair<Long, Long> retainedRWRequestsCnt = rsServices.getRegionServerAccounting()
|
||||
.getRetainedRegionRWRequestsCnt().get(getRegionInfo().getEncodedName());
|
||||
if (retainedRWRequestsCnt != null) {
|
||||
this.setReadRequestsCount(retainedRWRequestsCnt.getFirst());
|
||||
this.setWriteRequestsCount(retainedRWRequestsCnt.getSecond());
|
||||
// remove them since won't use again
|
||||
rsServices.getRegionServerAccounting().getRetainedRegionRWRequestsCnt()
|
||||
.remove(getRegionInfo().getEncodedName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void setHTableSpecificConf() {
|
||||
|
@ -8942,4 +8955,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setReadRequestsCount(long readRequestsCount) {
|
||||
this.readRequestsCount.add(readRequestsCount);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setWriteRequestsCount(long writeRequestsCount) {
|
||||
this.writeRequestsCount.add(writeRequestsCount);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3291,7 +3291,12 @@ public class HRegionServer extends HasThread implements
|
|||
closeSeqNum = r.getOpenSeqNum();
|
||||
if (closeSeqNum == HConstants.NO_SEQNUM) closeSeqNum = 0;
|
||||
}
|
||||
addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
|
||||
boolean selfMove = ServerName.isSameAddress(destination, this.getServerName());
|
||||
addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum, selfMove);
|
||||
if (selfMove) {
|
||||
this.regionServerAccounting.getRetainedRegionRWRequestsCnt().put(r.getRegionInfo().getEncodedName()
|
||||
, new Pair<>(r.getReadRequestsCount(), r.getWriteRequestsCount()));
|
||||
}
|
||||
}
|
||||
this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
|
||||
return toReturn != null;
|
||||
|
@ -3453,8 +3458,8 @@ public class HRegionServer extends HasThread implements
|
|||
*/
|
||||
private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
|
||||
|
||||
private void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum) {
|
||||
if (ServerName.isSameAddress(destination, this.getServerName())) {
|
||||
private void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum, boolean selfMove) {
|
||||
if (selfMove) {
|
||||
LOG.warn("Not adding moved region record: " + encodedName + " to self.");
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.lang.management.MemoryType;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.atomic.LongAdder;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -47,6 +49,11 @@ public class RegionServerAccounting {
|
|||
private long globalOnHeapMemstoreLimit;
|
||||
private long globalOnHeapMemstoreLimitLowMark;
|
||||
|
||||
// encoded region name -> Pair -> read count as first, write count as second.
|
||||
// when region close and target rs is the current server, we will put an entry,
|
||||
// and will remove it when reigon open after recover them.
|
||||
private ConcurrentMap<String, Pair<Long, Long>> retainedRegionRWRequestsCnt;
|
||||
|
||||
public RegionServerAccounting(Configuration conf) {
|
||||
Pair<Long, MemoryType> globalMemstoreSizePair = MemorySizeUtil.getGlobalMemStoreSize(conf);
|
||||
this.globalMemStoreLimit = globalMemstoreSizePair.getFirst();
|
||||
|
@ -67,6 +74,7 @@ public class RegionServerAccounting {
|
|||
this.globalOnHeapMemstoreLimit = MemorySizeUtil.getOnheapGlobalMemStoreSize(conf);
|
||||
this.globalOnHeapMemstoreLimitLowMark =
|
||||
(long) (this.globalOnHeapMemstoreLimit * this.globalMemStoreLimitLowMarkPercent);
|
||||
this.retainedRegionRWRequestsCnt = new ConcurrentHashMap<>();
|
||||
}
|
||||
|
||||
long getGlobalMemStoreLimit() {
|
||||
|
@ -123,6 +131,13 @@ public class RegionServerAccounting {
|
|||
return this.globalMemStoreOffHeapSize.sum();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the retained metrics of region's read and write requests count
|
||||
*/
|
||||
protected ConcurrentMap<String, Pair<Long, Long>> getRetainedRegionRWRequestsCnt() {
|
||||
return this.retainedRegionRWRequestsCnt;
|
||||
}
|
||||
|
||||
void incGlobalMemStoreSize(MemStoreSize mss) {
|
||||
incGlobalMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.master.assignment;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
|
@ -130,6 +131,8 @@ public class TestTransitRegionStateProcedure {
|
|||
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor().getEnvironment();
|
||||
HRegionServer rs = UTIL.getRSForFirstRegionInTable(tableName);
|
||||
HRegion region = rs.getRegions(tableName).get(0);
|
||||
region.setReadRequestsCount(1);
|
||||
region.setWriteRequestsCount(2);
|
||||
long openSeqNum = region.getOpenSeqNum();
|
||||
TransitRegionStateProcedure proc =
|
||||
TransitRegionStateProcedure.reopen(env, region.getRegionInfo());
|
||||
|
@ -139,6 +142,8 @@ public class TestTransitRegionStateProcedure {
|
|||
long openSeqNum2 = region2.getOpenSeqNum();
|
||||
// confirm that the region is successfully opened
|
||||
assertTrue(openSeqNum2 > openSeqNum);
|
||||
assertEquals(1, region2.getReadRequestsCount());
|
||||
assertEquals(2, region2.getWriteRequestsCount());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue