HBASE-12562 Handling memory pressure for secondary region replicas

This commit is contained in:
Enis Soztutar 2015-03-06 14:32:05 -08:00
parent ec1eff9b69
commit 4ac42a2f56
5 changed files with 331 additions and 38 deletions

View File

@ -476,6 +476,9 @@ public class FileLink {
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
// Assumes that the ordering of locations between objects are the same. This is true for the
// current subclasses already (HFileLink, WALLink). Otherwise, we may have to sort the locations
// or keep them presorted

View File

@ -4469,7 +4469,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @param flush the flush descriptor
* @throws IOException
*/
private void dropMemstoreContentsForSeqId(long seqId, Store store) throws IOException {
private long dropMemstoreContentsForSeqId(long seqId, Store store) throws IOException {
long totalFreedSize = 0;
this.updatesLock.writeLock().lock();
try {
mvcc.waitForPreviousTransactionsComplete();
@ -4483,10 +4484,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// Prepare flush (take a snapshot) and then abort (drop the snapshot)
if (store == null ) {
for (Store s : stores.values()) {
dropStoreMemstoreContentsForSeqId(s, currentSeqId);
totalFreedSize += doDropStoreMemstoreContentsForSeqId(s, currentSeqId);
}
} else {
dropStoreMemstoreContentsForSeqId(store, currentSeqId);
totalFreedSize += doDropStoreMemstoreContentsForSeqId(store, currentSeqId);
}
} else {
LOG.info(getRegionInfo().getEncodedName() + " : "
@ -4496,13 +4497,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
} finally {
this.updatesLock.writeLock().unlock();
}
return totalFreedSize;
}
private void dropStoreMemstoreContentsForSeqId(Store s, long currentSeqId) throws IOException {
this.addAndGetGlobalMemstoreSize(-s.getFlushableSize());
private long doDropStoreMemstoreContentsForSeqId(Store s, long currentSeqId) throws IOException {
long snapshotSize = s.getFlushableSize();
this.addAndGetGlobalMemstoreSize(-snapshotSize);
StoreFlushContext ctx = s.createFlushContext(currentSeqId);
ctx.prepare();
ctx.abort();
return snapshotSize;
}
private void replayWALFlushAbortMarker(FlushDescriptor flush) {
@ -4623,27 +4627,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// if all stores ended up dropping their snapshots, we can safely drop the
// prepareFlushResult
if (writestate.flushing) {
boolean canDrop = true;
for (Entry<byte[], StoreFlushContext> entry
: prepareFlushResult.storeFlushCtxs.entrySet()) {
Store store = getStore(entry.getKey());
if (store == null) {
continue;
}
if (store.getSnapshotSize() > 0) {
canDrop = false;
}
}
// this means that all the stores in the region has finished flushing, but the WAL marker
// may not have been written or we did not receive it yet.
if (canDrop) {
writestate.flushing = false;
this.prepareFlushResult = null;
}
}
dropPrepareFlushIfPossible();
// advance the mvcc read point so that the new flushed file is visible.
// there may be some in-flight transactions, but they won't be made visible since they are
@ -4745,6 +4729,126 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
}
/**
* If all stores ended up dropping their snapshots, we can safely drop the prepareFlushResult
*/
private void dropPrepareFlushIfPossible() {
if (writestate.flushing) {
boolean canDrop = true;
if (prepareFlushResult.storeFlushCtxs != null) {
for (Entry<byte[], StoreFlushContext> entry
: prepareFlushResult.storeFlushCtxs.entrySet()) {
Store store = getStore(entry.getKey());
if (store == null) {
continue;
}
if (store.getSnapshotSize() > 0) {
canDrop = false;
break;
}
}
}
// this means that all the stores in the region has finished flushing, but the WAL marker
// may not have been written or we did not receive it yet.
if (canDrop) {
writestate.flushing = false;
this.prepareFlushResult = null;
}
}
}
/**
* Checks the underlying store files, and opens the files that have not
* been opened, and removes the store file readers for store files no longer
* available. Mainly used by secondary region replicas to keep up to date with
* the primary region files or open new flushed files and drop their memstore snapshots in case
* of memory pressure.
* @throws IOException
*/
boolean refreshStoreFiles() throws IOException {
if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
return false; // if primary nothing to do
}
if (LOG.isDebugEnabled()) {
LOG.debug(getRegionInfo().getEncodedName() + " : "
+ "Refreshing store files to see whether we can free up memstore");
}
long totalFreedSize = 0;
long smallestSeqIdInStores = Long.MAX_VALUE;
startRegionOperation(); // obtain region close lock
try {
synchronized (writestate) {
for (Store store : getStores().values()) {
// TODO: some stores might see new data from flush, while others do not which
// MIGHT break atomic edits across column families.
long maxSeqIdBefore = store.getMaxSequenceId();
// refresh the store files. This is similar to observing a region open wal marker.
store.refreshStoreFiles();
long storeSeqId = store.getMaxSequenceId();
if (storeSeqId < smallestSeqIdInStores) {
smallestSeqIdInStores = storeSeqId;
}
// see whether we can drop the memstore or the snapshot
if (storeSeqId > maxSeqIdBefore) {
if (writestate.flushing) {
// only drop memstore snapshots if they are smaller than last flush for the store
if (this.prepareFlushResult.flushOpSeqId <= storeSeqId) {
StoreFlushContext ctx = this.prepareFlushResult.storeFlushCtxs == null ?
null : this.prepareFlushResult.storeFlushCtxs.get(store.getFamily().getName());
if (ctx != null) {
long snapshotSize = store.getFlushableSize();
ctx.abort();
this.addAndGetGlobalMemstoreSize(-snapshotSize);
this.prepareFlushResult.storeFlushCtxs.remove(store.getFamily().getName());
totalFreedSize += snapshotSize;
}
}
}
// Drop the memstore contents if they are now smaller than the latest seen flushed file
totalFreedSize += dropMemstoreContentsForSeqId(storeSeqId, store);
}
}
// if all stores ended up dropping their snapshots, we can safely drop the
// prepareFlushResult
dropPrepareFlushIfPossible();
// advance the mvcc read point so that the new flushed files are visible.
// there may be some in-flight transactions, but they won't be made visible since they are
// either greater than flush seq number or they were already picked up via flush.
for (Store s : getStores().values()) {
getMVCC().advanceMemstoreReadPointIfNeeded(s.getMaxMemstoreTS());
}
// smallestSeqIdInStores is the seqId that we have a corresponding hfile for. We can safely
// skip all edits that are to be replayed in the future with that has a smaller seqId
// than this. We are updating lastReplayedOpenRegionSeqId so that we can skip all edits
// that we have picked the flush files for
if (this.lastReplayedOpenRegionSeqId < smallestSeqIdInStores) {
this.lastReplayedOpenRegionSeqId = smallestSeqIdInStores;
}
}
// C. Finally notify anyone waiting on memstore to clear:
// e.g. checkResources().
synchronized (this) {
notifyAll(); // FindBugs NN_NAKED_NOTIFY
}
return totalFreedSize > 0;
} finally {
closeRegionOperation();
}
}
/** Checks whether the given regionName is either equal to our region, or that
* the regionName is the primary region to our corresponding range for the secondary replica.
*/

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.util.StringUtils.humanReadableInt;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.management.ManagementFactory;
@ -44,13 +45,16 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Counter;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
@ -70,6 +74,7 @@ import com.google.common.base.Preconditions;
class MemStoreFlusher implements FlushRequester {
static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
private Configuration conf;
// These two data members go together. Any entry in the one must have
// a corresponding entry in the other.
private final BlockingQueue<FlushQueueEntry> flushQueue =
@ -100,6 +105,7 @@ class MemStoreFlusher implements FlushRequester {
public MemStoreFlusher(final Configuration conf,
final HRegionServer server) {
super();
this.conf = conf;
this.server = server;
this.threadWakeFrequency =
conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
@ -138,6 +144,9 @@ class MemStoreFlusher implements FlushRequester {
Set<HRegion> excludedRegions = new HashSet<HRegion>();
double secondaryMultiplier
= ServerRegionReplicaUtil.getRegionReplicaStoreFileRefreshMultiplier(conf);
boolean flushedOne = false;
while (!flushedOne) {
// Find the biggest region that doesn't have too many storefiles
@ -147,8 +156,11 @@ class MemStoreFlusher implements FlushRequester {
// Find the biggest region, total, even if it might have too many flushes.
HRegion bestAnyRegion = getBiggestMemstoreRegion(
regionsBySize, excludedRegions, false);
// Find the biggest region that is a secondary region
HRegion bestRegionReplica = getBiggestMemstoreOfRegionReplica(regionsBySize,
excludedRegions);
if (bestAnyRegion == null) {
if (bestAnyRegion == null && bestRegionReplica == null) {
LOG.error("Above memory mark but there are no flushable regions!");
return false;
}
@ -177,20 +189,39 @@ class MemStoreFlusher implements FlushRequester {
}
}
Preconditions.checkState(regionToFlush.memstoreSize.get() > 0);
Preconditions.checkState(
(regionToFlush != null && regionToFlush.memstoreSize.get() > 0) ||
(bestRegionReplica != null && bestRegionReplica.memstoreSize.get() > 0));
if (regionToFlush == null ||
(bestRegionReplica != null &&
ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf) &&
(bestRegionReplica.memstoreSize.get()
> secondaryMultiplier * regionToFlush.memstoreSize.get()))) {
LOG.info("Refreshing storefiles of region " + regionToFlush +
" due to global heap pressure. memstore size=" + StringUtils.humanReadableInt(
server.getRegionServerAccounting().getGlobalMemstoreSize()));
flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica);
if (!flushedOne) {
LOG.info("Excluding secondary region " + regionToFlush +
" - trying to find a different region to refresh files.");
excludedRegions.add(bestRegionReplica);
}
} else {
LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. "
+ "Total Memstore size="
+ humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize())
+ ", Region memstore size="
+ humanReadableInt(regionToFlush.memstoreSize.get()));
flushedOne = flushRegion(regionToFlush, true, true);
if (!flushedOne) {
LOG.info("Excluding unflushable region " + regionToFlush +
" - trying to find a different region to flush.");
excludedRegions.add(regionToFlush);
}
}
}
return true;
}
@ -281,6 +312,33 @@ class MemStoreFlusher implements FlushRequester {
return null;
}
private HRegion getBiggestMemstoreOfRegionReplica(SortedMap<Long, HRegion> regionsBySize,
Set<HRegion> excludedRegions) {
synchronized (regionsInQueue) {
for (HRegion region : regionsBySize.values()) {
if (excludedRegions.contains(region)) {
continue;
}
if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
continue;
}
return region;
}
}
return null;
}
private boolean refreshStoreFilesAndReclaimMemory(HRegion region) {
try {
return region.refreshStoreFiles();
} catch (IOException e) {
LOG.warn("Refreshing store files failed with exception", e);
}
return false;
}
/**
* Return true if global memory usage is above the high watermark
*/

View File

@ -68,6 +68,25 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
= "hbase.region.replica.wait.for.primary.flush";
private static final boolean DEFAULT_REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH = true;
/**
* Enables or disables refreshing store files of secondary region replicas when the memory is
* above the global memstore lower limit. Refreshing the store files means that we will do a file
* list of the primary regions store files, and pick up new files. Also depending on the store
* files, we can drop some memstore contents which will free up memory.
*/
public static final String REGION_REPLICA_STORE_FILE_REFRESH
= "hbase.region.replica.storefile.refresh";
private static final boolean DEFAULT_REGION_REPLICA_STORE_FILE_REFRESH = true;
/**
* The multiplier to use when we want to refresh a secondary region instead of flushing a primary
* region. Default value assumes that for doing the file refresh, the biggest secondary should be
* 4 times bigger than the biggest primary.
*/
public static final String REGION_REPLICA_STORE_FILE_REFRESH_MEMSTORE_MULTIPLIER
= "hbase.region.replica.storefile.refresh.memstore.multiplier";
private static final double DEFAULT_REGION_REPLICA_STORE_FILE_REFRESH_MEMSTORE_MULTIPLIER = 4;
/**
* Returns the regionInfo object to use for interacting with the file system.
* @return An HRegionInfo object to interact with the filesystem
@ -163,6 +182,16 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
DEFAULT_REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH);
}
public static boolean isRegionReplicaStoreFileRefreshEnabled(Configuration conf) {
return conf.getBoolean(REGION_REPLICA_STORE_FILE_REFRESH,
DEFAULT_REGION_REPLICA_STORE_FILE_REFRESH);
}
public static double getRegionReplicaStoreFileRefreshMultiplier(Configuration conf) {
return conf.getDouble(REGION_REPLICA_STORE_FILE_REFRESH_MEMSTORE_MULTIPLIER,
DEFAULT_REGION_REPLICA_STORE_FILE_REFRESH_MEMSTORE_MULTIPLIER);
}
/**
* Return the peer id used for replicating to secondary region replicas
*/

View File

@ -76,6 +76,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
@ -1326,6 +1327,104 @@ public class TestHRegionReplayEvents {
secondaryRegion.get(new Get(Bytes.toBytes(0)));
}
@Test
public void testRefreshStoreFiles() throws IOException {
assertEquals(0, primaryRegion.getStoreFileList(families).size());
assertEquals(0, secondaryRegion.getStoreFileList(families).size());
// Test case 1: refresh with an empty region
secondaryRegion.refreshStoreFiles();
assertEquals(0, secondaryRegion.getStoreFileList(families).size());
// do one flush
putDataWithFlushes(primaryRegion, 100, 100, 0);
int numRows = 100;
// refresh the store file list, and ensure that the files are picked up.
secondaryRegion.refreshStoreFiles();
assertPathListsEqual(primaryRegion.getStoreFileList(families),
secondaryRegion.getStoreFileList(families));
assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());
LOG.info("-- Verifying edits from secondary");
verifyData(secondaryRegion, 0, numRows, cq, families);
// Test case 2: 3 some more flushes
putDataWithFlushes(primaryRegion, 100, 300, 0);
numRows = 300;
// refresh the store file list, and ensure that the files are picked up.
secondaryRegion.refreshStoreFiles();
assertPathListsEqual(primaryRegion.getStoreFileList(families),
secondaryRegion.getStoreFileList(families));
assertEquals(families.length * 4, secondaryRegion.getStoreFileList(families).size());
LOG.info("-- Verifying edits from secondary");
verifyData(secondaryRegion, 0, numRows, cq, families);
if (FSUtils.WINDOWS) {
// compaction cannot move files while they are open in secondary on windows. Skip remaining.
return;
}
// Test case 3: compact primary files
primaryRegion.compactStores();
secondaryRegion.refreshStoreFiles();
assertPathListsEqual(primaryRegion.getStoreFileList(families),
secondaryRegion.getStoreFileList(families));
assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());
LOG.info("-- Verifying edits from secondary");
verifyData(secondaryRegion, 0, numRows, cq, families);
LOG.info("-- Replaying edits in secondary");
// Test case 4: replay some edits, ensure that memstore is dropped.
assertTrue(secondaryRegion.getMemstoreSize().get() == 0);
putDataWithFlushes(primaryRegion, 400, 400, 0);
numRows = 400;
reader = createWALReaderForPrimary();
while (true) {
WAL.Entry entry = reader.next();
if (entry == null) {
break;
}
FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
if (flush != null) {
// do not replay flush
} else {
replayEdit(secondaryRegion, entry);
}
}
assertTrue(secondaryRegion.getMemstoreSize().get() > 0);
secondaryRegion.refreshStoreFiles();
assertTrue(secondaryRegion.getMemstoreSize().get() == 0);
LOG.info("-- Verifying edits from primary");
verifyData(primaryRegion, 0, numRows, cq, families);
LOG.info("-- Verifying edits from secondary");
verifyData(secondaryRegion, 0, numRows, cq, families);
}
/**
* Paths can be qualified or not. This does the assertion using String->Path conversion.
*/
private void assertPathListsEqual(List<String> list1, List<String> list2) {
List<Path> l1 = new ArrayList<>(list1.size());
for (String path : list1) {
l1.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
}
List<Path> l2 = new ArrayList<>(list2.size());
for (String path : list2) {
l2.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
}
assertEquals(l1, l2);
}
private void disableReads(HRegion region) {
region.setReadsEnabled(false);
try {