HBASE-5611 Replayed edits from regions that failed to open during recovery aren't removed from the global MemStore size (Jieshan)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1331681 13f79535-47bb-0310-9956-ffa450edef68
parent 6f6b264efc
commit b695b64dfa
@@ -312,6 +312,7 @@ public class HRegion implements HeapSize { // , Writable{
   final long rowProcessorTimeout;
   private volatile long lastFlushTime;
   final RegionServerServices rsServices;
+  private RegionServerAccounting rsAccounting;
   private List<Pair<Long, Long>> recentFlushes = new ArrayList<Pair<Long,Long>>();
   private long blockingMemStoreSize;
   final long threadWakeFrequency;
@@ -443,9 +444,10 @@ public class HRegion implements HeapSize { // , Writable{
     this.rowProcessorTimeout = conf.getLong(
         "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
 
-    // don't initialize coprocessors if not running within a regionserver
-    // TODO: revisit if coprocessors should load in other cases
     if (rsServices != null) {
+      this.rsAccounting = this.rsServices.getRegionServerAccounting();
+      // don't initialize coprocessors if not running within a regionserver
+      // TODO: revisit if coprocessors should load in other cases
       this.coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf);
     }
     if (LOG.isDebugEnabled()) {
@@ -702,13 +704,8 @@ public class HRegion implements HeapSize { // , Writable{
    * @return the size of memstore in this region
    */
   public long addAndGetGlobalMemstoreSize(long memStoreSize) {
-    if (this.rsServices != null) {
-      RegionServerAccounting rsAccounting =
-          this.rsServices.getRegionServerAccounting();
-
-      if (rsAccounting != null) {
-        rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
-      }
-    }
+    if (this.rsAccounting != null) {
+      rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
+    }
     return this.memstoreSize.getAndAdd(memStoreSize);
   }
@@ -2708,6 +2705,11 @@ public class HRegion implements HeapSize { // , Writable{
           throw e;
         }
       }
+      // The edits size added into rsAccounting during this replaying will not
+      // be required any more. So just clear it.
+      if (this.rsAccounting != null) {
+        this.rsAccounting.clearRegionReplayEditsSize(this.regionInfo.getRegionName());
+      }
     }
     if (seqid > minSeqId) {
       // Then we added some edits to memory. Flush and cleanup split edit files.
@@ -2889,7 +2891,11 @@ public class HRegion implements HeapSize { // , Writable{
    * @return True if we should flush.
    */
   protected boolean restoreEdit(final Store s, final KeyValue kv) {
-    return isFlushSize(this.addAndGetGlobalMemstoreSize(s.add(kv)));
+    long kvSize = s.add(kv);
+    if (this.rsAccounting != null) {
+      rsAccounting.addAndGetRegionReplayEditsSize(this.regionInfo.getRegionName(), kvSize);
+    }
+    return isFlushSize(this.addAndGetGlobalMemstoreSize(kvSize));
   }
 
   /*
@@ -4775,7 +4781,7 @@ public class HRegion implements HeapSize { // , Writable{
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT +
       ClassSize.ARRAY +
-      33 * ClassSize.REFERENCE + Bytes.SIZEOF_INT +
+      34 * ClassSize.REFERENCE + Bytes.SIZEOF_INT +
       (6 * Bytes.SIZEOF_LONG) +
       Bytes.SIZEOF_BOOLEAN);
@@ -19,9 +19,12 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
 
 /**
  * RegionServerAccounting keeps record of some basic real time information about
@@ -32,6 +35,11 @@ public class RegionServerAccounting {
 
   private final AtomicLong atomicGlobalMemstoreSize = new AtomicLong(0);
 
+  // Store the edits size during replaying HLog. Use this to roll back the
+  // global memstore size once a region opening failed.
+  private final ConcurrentMap<byte[], AtomicLong> replayEditsPerRegion =
+    new ConcurrentSkipListMap<byte[], AtomicLong>(Bytes.BYTES_COMPARATOR);
+
   /**
    * @return the global Memstore size in the RegionServer
    */
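A note on the map choice above: byte[] keys cannot go into a hash map, because Java arrays compare by identity, so two equal region names held in different array instances would never match. The patch instead keys the replay tally with a ConcurrentSkipListMap ordered by Bytes.BYTES_COMPARATOR, which compares array contents. A small standalone sketch of the difference (plain Java, names and values invented for illustration):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

import org.apache.hadoop.hbase.util.Bytes;

public class ByteArrayKeySketch {
  public static void main(String[] args) {
    byte[] name1 = Bytes.toBytes("regionA");
    byte[] name2 = Bytes.toBytes("regionA");   // same contents, different array instance

    Map<byte[], Long> hashMap = new HashMap<byte[], Long>();
    hashMap.put(name1, 1L);
    System.out.println(hashMap.get(name2));    // null: identity-based lookup misses

    Map<byte[], Long> skipList =
        new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
    skipList.put(name1, 1L);
    System.out.println(skipList.get(name2));   // 1: contents-based comparison matches
  }
}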
@@ -48,4 +56,46 @@ public class RegionServerAccounting {
     return atomicGlobalMemstoreSize.addAndGet(memStoreSize);
   }
 
+  /***
+   * Add memStoreSize to replayEditsPerRegion.
+   *
+   * @param regionName region name.
+   * @param memStoreSize the Memstore size will be added to replayEditsPerRegion.
+   * @return the replay edits size for the region.
+   */
+  public long addAndGetRegionReplayEditsSize(byte[] regionName, long memStoreSize) {
+    AtomicLong replayEdistsSize = replayEditsPerRegion.get(regionName);
+    if (replayEdistsSize == null) {
+      replayEdistsSize = new AtomicLong(0);
+      replayEditsPerRegion.put(regionName, replayEdistsSize);
+    }
+    return replayEdistsSize.addAndGet(memStoreSize);
+  }
+
+  /**
+   * Roll back the global MemStore size for a specified region when this region
+   * can't be opened.
+   *
+   * @param regionName the region which could not open.
+   * @return the global Memstore size in the RegionServer
+   */
+  public long rollbackRegionReplayEditsSize(byte[] regionName) {
+    AtomicLong replayEditsSize = replayEditsPerRegion.get(regionName);
+    long editsSizeLong = 0L;
+    if (replayEditsSize != null) {
+      editsSizeLong = -replayEditsSize.get();
+      clearRegionReplayEditsSize(regionName);
+    }
+    return addAndGetGlobalMemstoreSize(editsSizeLong);
+  }
+
+  /**
+   * Clear a region from replayEditsPerRegion.
+   *
+   * @param regionName region name.
+   */
+  public void clearRegionReplayEditsSize(byte[] regionName) {
+    replayEditsPerRegion.remove(regionName);
+  }
+
 }
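The three new methods above form a small bookkeeping protocol around log replay. The following is a minimal, hypothetical usage sketch, not part of the patch: the class and method names come from the diff, while the region name, byte counts, and the use of the class's default constructor are assumptions made for illustration.

import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
import org.apache.hadoop.hbase.util.Bytes;

public class ReplayAccountingSketch {
  public static void main(String[] args) {
    RegionServerAccounting accounting = new RegionServerAccounting();
    byte[] regionName = Bytes.toBytes("exampleRegion");   // hypothetical region name

    // While edits are replayed, each restored KeyValue's heap size is added both
    // to the global memstore size and to the per-region replay tally (this mirrors
    // what HRegion.restoreEdit() now does).
    accounting.addAndGetGlobalMemstoreSize(1024);
    accounting.addAndGetRegionReplayEditsSize(regionName, 1024);

    // Failure path: if the region never opens, the tally is subtracted from the
    // global memstore size and the per-region entry is removed.
    long afterRollback = accounting.rollbackRegionReplayEditsSize(regionName);
    System.out.println("global size after failed open = " + afterRollback);   // 0

    // Success path: when the region does open, only the bookkeeping entry is
    // dropped; the replayed edits stay counted until they are flushed.
    accounting.addAndGetGlobalMemstoreSize(2048);
    accounting.addAndGetRegionReplayEditsSize(regionName, 2048);
    accounting.clearRegionReplayEditsSize(regionName);
  }
}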
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
@@ -345,8 +346,17 @@ public class OpenRegionHandler extends EventHandler {
       // We failed open. Our caller will see the 'null' return value
       // and transition the node back to FAILED_OPEN. If that fails,
       // we rely on the Timeout Monitor in the master to reassign.
-      LOG.error("Failed open of region=" +
-        this.regionInfo.getRegionNameAsString(), t);
+      LOG.error(
+          "Failed open of region=" + this.regionInfo.getRegionNameAsString()
+              + ", starting to roll back the global memstore size.", t);
+      // Decrease the global memstore size.
+      if (this.rsServices != null) {
+        RegionServerAccounting rsAccounting =
+            this.rsServices.getRegionServerAccounting();
+        if (rsAccounting != null) {
+          rsAccounting.rollbackRegionReplayEditsSize(this.regionInfo.getRegionName());
+        }
+      }
     }
     return region;
   }
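The rollback is hooked into the handler's existing failed-open catch block, so whatever was replayed into the memstore before the failure is backed out of the global size before the master reassigns the region. A hypothetical JUnit-style check, not included in this commit (test class name and sizes are invented), of the invariant that failure path restores:

import static org.junit.Assert.assertEquals;

import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

public class TestReplayRollbackSketch {
  @Test
  public void failedOpenRemovesReplayedEditsFromGlobalSize() {
    RegionServerAccounting accounting = new RegionServerAccounting();
    byte[] regionName = Bytes.toBytes("failedRegion");   // invented region name

    // Edits replayed while the region was opening.
    accounting.addAndGetGlobalMemstoreSize(2048);
    accounting.addAndGetRegionReplayEditsSize(regionName, 2048);

    // What OpenRegionHandler now does when the open ultimately fails.
    long globalSize = accounting.rollbackRegionReplayEditsSize(regionName);
    assertEquals(0L, globalSize);
  }
}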