HBASE-20196 Maintain all regions with same size in memstore flusher
This commit is contained in:
parent
67f013430c
commit
df5de33a02
|
@ -2759,14 +2759,20 @@ public class HRegionServer extends HasThread implements
|
|||
configurationManager.registerObserver(region);
|
||||
}
|
||||
|
||||
private void addRegion(SortedMap<Long, Collection<HRegion>> sortedRegions, HRegion region,
|
||||
long size) {
|
||||
if (!sortedRegions.containsKey(size)) {
|
||||
sortedRegions.put(size, new ArrayList<>());
|
||||
}
|
||||
sortedRegions.get(size).add(region);
|
||||
}
|
||||
/**
|
||||
* @return A new Map of online regions sorted by region off-heap size with the first entry being
|
||||
* the biggest. If two regions are the same size, then the last one found wins; i.e. this
|
||||
* method may NOT return all regions.
|
||||
* the biggest.
|
||||
*/
|
||||
SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedByOffHeapSize() {
|
||||
SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOffHeapSize() {
|
||||
// we'll sort the regions in reverse
|
||||
SortedMap<Long, HRegion> sortedRegions = new TreeMap<>(
|
||||
SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(
|
||||
new Comparator<Long>() {
|
||||
@Override
|
||||
public int compare(Long a, Long b) {
|
||||
|
@ -2775,19 +2781,18 @@ public class HRegionServer extends HasThread implements
|
|||
});
|
||||
// Copy over all regions. Regions are sorted by size with biggest first.
|
||||
for (HRegion region : this.onlineRegions.values()) {
|
||||
sortedRegions.put(region.getMemStoreOffHeapSize(), region);
|
||||
addRegion(sortedRegions, region, region.getMemStoreOffHeapSize());
|
||||
}
|
||||
return sortedRegions;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return A new Map of online regions sorted by region heap size with the first entry being the
|
||||
* biggest. If two regions are the same size, then the last one found wins; i.e. this method
|
||||
* may NOT return all regions.
|
||||
* biggest.
|
||||
*/
|
||||
SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedByOnHeapSize() {
|
||||
SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOnHeapSize() {
|
||||
// we'll sort the regions in reverse
|
||||
SortedMap<Long, HRegion> sortedRegions = new TreeMap<>(
|
||||
SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(
|
||||
new Comparator<Long>() {
|
||||
@Override
|
||||
public int compare(Long a, Long b) {
|
||||
|
@ -2796,7 +2801,7 @@ public class HRegionServer extends HasThread implements
|
|||
});
|
||||
// Copy over all regions. Regions are sorted by size with biggest first.
|
||||
for (HRegion region : this.onlineRegions.values()) {
|
||||
sortedRegions.put(region.getMemStoreHeapSize(), region);
|
||||
addRegion(sortedRegions, region, region.getMemStoreHeapSize());
|
||||
}
|
||||
return sortedRegions;
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
import java.io.IOException;
|
||||
import java.lang.Thread.UncaughtExceptionHandler;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.ConcurrentModificationException;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
|
@ -155,7 +156,7 @@ class MemStoreFlusher implements FlushRequester {
|
|||
* @return true if successful
|
||||
*/
|
||||
private boolean flushOneForGlobalPressure() {
|
||||
SortedMap<Long, HRegion> regionsBySize = null;
|
||||
SortedMap<Long, Collection<HRegion>> regionsBySize = null;
|
||||
switch(flushType) {
|
||||
case ABOVE_OFFHEAP_HIGHER_MARK:
|
||||
case ABOVE_OFFHEAP_LOWER_MARK:
|
||||
|
@ -387,41 +388,45 @@ class MemStoreFlusher implements FlushRequester {
|
|||
}
|
||||
|
||||
private HRegion getBiggestMemStoreRegion(
|
||||
SortedMap<Long, HRegion> regionsBySize,
|
||||
SortedMap<Long, Collection<HRegion>> regionsBySize,
|
||||
Set<HRegion> excludedRegions,
|
||||
boolean checkStoreFileCount) {
|
||||
synchronized (regionsInQueue) {
|
||||
for (HRegion region : regionsBySize.values()) {
|
||||
if (excludedRegions.contains(region)) {
|
||||
continue;
|
||||
}
|
||||
for (Map.Entry<Long, Collection<HRegion>> entry : regionsBySize.entrySet()) {
|
||||
for (HRegion region : entry.getValue()) {
|
||||
if (excludedRegions.contains(region)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (region.writestate.flushing || !region.writestate.writesEnabled) {
|
||||
continue;
|
||||
}
|
||||
if (region.writestate.flushing || !region.writestate.writesEnabled) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (checkStoreFileCount && isTooManyStoreFiles(region)) {
|
||||
continue;
|
||||
if (checkStoreFileCount && isTooManyStoreFiles(region)) {
|
||||
continue;
|
||||
}
|
||||
return region;
|
||||
}
|
||||
return region;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private HRegion getBiggestMemStoreOfRegionReplica(SortedMap<Long, HRegion> regionsBySize,
|
||||
private HRegion getBiggestMemStoreOfRegionReplica(
|
||||
SortedMap<Long, Collection<HRegion>> regionsBySize,
|
||||
Set<HRegion> excludedRegions) {
|
||||
synchronized (regionsInQueue) {
|
||||
for (HRegion region : regionsBySize.values()) {
|
||||
if (excludedRegions.contains(region)) {
|
||||
continue;
|
||||
}
|
||||
for (Map.Entry<Long, Collection<HRegion>> entry : regionsBySize.entrySet()) {
|
||||
for (HRegion region : entry.getValue()) {
|
||||
if (excludedRegions.contains(region)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
|
||||
continue;
|
||||
if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
|
||||
continue;
|
||||
}
|
||||
return region;
|
||||
}
|
||||
|
||||
return region;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
|
|
Loading…
Reference in New Issue