HBASE-5930. Limits the amount of time an edit can live in the memstore.

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1475872 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Devaraj Das 2013-04-25 17:50:17 +00:00
parent 0c4416a570
commit b65d39bc9d
10 changed files with 148 additions and 23 deletions

View File

@ -342,6 +342,14 @@
hbase.server.thread.wakefrequency milliseconds. hbase.server.thread.wakefrequency milliseconds.
</description> </description>
</property> </property>
<property>
<name>hbase.regionserver.optionalcacheflushinterval</name>
<value>3600000</value>
<description>
Amount of time to wait since the last time a region was flushed before
invoking an optional cache flush. Default 1 hour.
</description>
</property>
<property> <property>
<name>hbase.hregion.memstore.flush.size</name> <name>hbase.hregion.memstore.flush.size</name>
<value>134217728</value> <value>134217728</value>

View File

@ -32,4 +32,11 @@ public interface FlushRequester {
* @param region the HRegion requesting the cache flush * @param region the HRegion requesting the cache flush
*/ */
void requestFlush(HRegion region); void requestFlush(HRegion region);
/**
* Tell the listener the cache needs to be flushed after a delay
*
* @param region the HRegion requesting the cache flush
* @param delay after how much time should the flush happen
*/
void requestDelayedFlush(HRegion region, long delay);
} }

View File

@ -347,6 +347,7 @@ public class HRegion implements HeapSize { // , Writable{
final RegionServerServices rsServices; final RegionServerServices rsServices;
private RegionServerAccounting rsAccounting; private RegionServerAccounting rsAccounting;
private List<Pair<Long, Long>> recentFlushes = new ArrayList<Pair<Long,Long>>(); private List<Pair<Long, Long>> recentFlushes = new ArrayList<Pair<Long,Long>>();
private long flushCheckInterval;
private long blockingMemStoreSize; private long blockingMemStoreSize;
final long threadWakeFrequency; final long threadWakeFrequency;
// Used to guard closes // Used to guard closes
@ -438,6 +439,8 @@ public class HRegion implements HeapSize { // , Writable{
.add(confParam) .add(confParam)
.addStringMap(htd.getConfiguration()) .addStringMap(htd.getConfiguration())
.addWritableMap(htd.getValues()); .addWritableMap(htd.getValues());
this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL,
DEFAULT_CACHE_FLUSH_INTERVAL);
this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration", this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
DEFAULT_ROWLOCK_WAIT_DURATION); DEFAULT_ROWLOCK_WAIT_DURATION);
@ -835,6 +838,12 @@ public class HRegion implements HeapSize { // , Writable{
private final Object closeLock = new Object(); private final Object closeLock = new Object();
/** Conf key for the periodic flush interval */
public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL =
"hbase.regionserver.optionalcacheflushinterval";
/** Default interval for the memstore flush */
public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000;
/** /**
* Close down this HRegion. Flush the cache unless abort parameter is true, * Close down this HRegion. Flush the cache unless abort parameter is true,
* Shut down each HStore, don't service any more calls. * Shut down each HStore, don't service any more calls.
@ -1322,6 +1331,26 @@ public class HRegion implements HeapSize { // , Writable{
} }
} }
/**
* Should the memstore be flushed now
*/
boolean shouldFlush() {
long now = EnvironmentEdgeManager.currentTimeMillis();
//if we flushed in the recent past, we don't need to do again now
if ((now - getLastFlushTime() < flushCheckInterval)) {
return false;
}
//since we didn't flush in the recent past, flush now if certain conditions
//are met. Return true on first such memstore hit.
for (Store s : this.getStores().values()) {
if (s.timeOfOldestEdit() < now - flushCheckInterval) {
// we have an old enough edit in the memstore, flush
return true;
}
}
return false;
}
/** /**
* Flush the memstore. * Flush the memstore.
* *
@ -4898,7 +4927,7 @@ public class HRegion implements HeapSize { // , Writable{
ClassSize.OBJECT + ClassSize.OBJECT +
ClassSize.ARRAY + ClassSize.ARRAY +
38 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + 38 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
(10 * Bytes.SIZEOF_LONG) + (11 * Bytes.SIZEOF_LONG) +
Bytes.SIZEOF_BOOLEAN); Bytes.SIZEOF_BOOLEAN);
public static final long DEEP_OVERHEAD = FIXED_OVERHEAD + public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +

View File

@ -235,7 +235,7 @@ public class HRegionServer implements ClientProtocol,
public static final Log LOG = LogFactory.getLog(HRegionServer.class); public static final Log LOG = LogFactory.getLog(HRegionServer.class);
private final Random rand = new Random(); private final Random rand;
/* /*
* Strings to be used in forming the exception message for * Strings to be used in forming the exception message for
@ -357,6 +357,11 @@ public class HRegionServer implements ClientProtocol,
*/ */
Chore compactionChecker; Chore compactionChecker;
/*
* Check for flushes
*/
Chore periodicFlusher;
// HLog and HLog roller. log is protected rather than private to avoid // HLog and HLog roller. log is protected rather than private to avoid
// eclipse warning when accessed by inner classes // eclipse warning when accessed by inner classes
protected volatile HLog hlog; protected volatile HLog hlog;
@ -502,6 +507,7 @@ public class HRegionServer implements ClientProtocol,
throw new IllegalArgumentException("Failed resolve of " + initialIsa); throw new IllegalArgumentException("Failed resolve of " + initialIsa);
} }
this.rand = new Random(initialIsa.hashCode());
this.rpcServer = HBaseServerRPC.getServer(AdminProtocol.class, this, this.rpcServer = HBaseServerRPC.getServer(AdminProtocol.class, this,
new Class<?>[]{ClientProtocol.class, new Class<?>[]{ClientProtocol.class,
AdminProtocol.class, HBaseRPCErrorHandler.class, AdminProtocol.class, HBaseRPCErrorHandler.class,
@ -682,6 +688,7 @@ public class HRegionServer implements ClientProtocol,
".multiplier", 1000); ".multiplier", 1000);
this.compactionChecker = new CompactionChecker(this, this.compactionChecker = new CompactionChecker(this,
this.threadWakeFrequency * multiplier, this); this.threadWakeFrequency * multiplier, this);
this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this);
// Health checker thread. // Health checker thread.
int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ, int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
HConstants.DEFAULT_THREAD_WAKE_FREQUENCY); HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
@ -1274,6 +1281,36 @@ public class HRegionServer implements ClientProtocol,
} }
} }
class PeriodicMemstoreFlusher extends Chore {
final HRegionServer server;
final static int RANGE_OF_DELAY = 20000; //millisec
final static int MIN_DELAY_TIME = 3000; //millisec
public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
super(server.getServerName() + "-MemstoreFlusherChore", cacheFlushInterval, server);
this.server = server;
}
@Override
protected void chore() {
for (HRegion r : this.server.onlineRegions.values()) {
if (r == null)
continue;
if (r.shouldFlush()) {
FlushRequester requester = server.getFlushRequester();
if (requester != null) {
long randomDelay = rand.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;
LOG.info(getName() + " requesting flush for region " + r.getRegionNameAsString() +
" after a delay of " + randomDelay);
//Throttle the flushes by putting a delay. If we don't throttle, and there
//is a balanced write-load on the regions in a table, we might end up
//overwhelming the filesystem with too many flushes at once.
requester.requestDelayedFlush(r, randomDelay);
}
}
}
}
}
/** /**
* Report the status of the server. A server is online once all the startup is * Report the status of the server. A server is online once all the startup is
* completed (setting up filesystem, starting service threads, etc.). This * completed (setting up filesystem, starting service threads, etc.). This
@ -1419,6 +1456,8 @@ public class HRegionServer implements ClientProtocol,
this.cacheFlusher.start(uncaughtExceptionHandler); this.cacheFlusher.start(uncaughtExceptionHandler);
Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), n + Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), n +
".compactionChecker", uncaughtExceptionHandler); ".compactionChecker", uncaughtExceptionHandler);
Threads.setDaemonThreadRunning(this.periodicFlusher.getThread(), n +
".periodicFlusher", uncaughtExceptionHandler);
if (this.healthCheckChore != null) { if (this.healthCheckChore != null) {
Threads Threads
.setDaemonThreadRunning(this.healthCheckChore.getThread(), n + ".healthChecker", .setDaemonThreadRunning(this.healthCheckChore.getThread(), n + ".healthChecker",
@ -1499,7 +1538,8 @@ public class HRegionServer implements ClientProtocol,
// Verify that all threads are alive // Verify that all threads are alive
if (!(leases.isAlive() if (!(leases.isAlive()
&& cacheFlusher.isAlive() && hlogRoller.isAlive() && cacheFlusher.isAlive() && hlogRoller.isAlive()
&& this.compactionChecker.isAlive())) { && this.compactionChecker.isAlive())
&& this.periodicFlusher.isAlive()) {
stop("One or more threads are no longer alive -- stop"); stop("One or more threads are no longer alive -- stop");
return false; return false;
} }
@ -1662,6 +1702,7 @@ public class HRegionServer implements ClientProtocol,
*/ */
protected void join() { protected void join() {
Threads.shutdown(this.compactionChecker.getThread()); Threads.shutdown(this.compactionChecker.getThread());
Threads.shutdown(this.periodicFlusher.getThread());
this.cacheFlusher.join(); this.cacheFlusher.join();
if (this.healthCheckChore != null) { if (this.healthCheckChore != null) {
Threads.shutdown(this.healthCheckChore.getThread()); Threads.shutdown(this.healthCheckChore.getThread());

View File

@ -446,6 +446,11 @@ public class HStore implements Store {
} }
} }
@Override
public long timeOfOldestEdit() {
return memstore.timeOfOldestEdit();
}
/** /**
* Adds a value to the memstore * Adds a value to the memstore
* *

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation; import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/** /**
* The MemStore holds in-memory modifications to the Store. Modifications * The MemStore holds in-memory modifications to the Store. Modifications
@ -90,6 +91,9 @@ public class MemStore implements HeapSize {
// Used to track own heapSize // Used to track own heapSize
final AtomicLong size; final AtomicLong size;
// Used to track when to flush
volatile long timeOfOldestEdit = Long.MAX_VALUE;
TimeRangeTracker timeRangeTracker; TimeRangeTracker timeRangeTracker;
TimeRangeTracker snapshotTimeRangeTracker; TimeRangeTracker snapshotTimeRangeTracker;
@ -166,6 +170,7 @@ public class MemStore implements HeapSize {
if (allocator != null) { if (allocator != null) {
this.allocator = new MemStoreLAB(conf, chunkPool); this.allocator = new MemStoreLAB(conf, chunkPool);
} }
timeOfOldestEdit = Long.MAX_VALUE;
} }
} }
} finally { } finally {
@ -233,6 +238,28 @@ public class MemStore implements HeapSize {
} }
} }
long timeOfOldestEdit() {
return timeOfOldestEdit;
}
private boolean addToKVSet(KeyValue e) {
boolean b = this.kvset.add(e);
setOldestEditTimeToNow();
return b;
}
private boolean removeFromKVSet(KeyValue e) {
boolean b = this.kvset.remove(e);
setOldestEditTimeToNow();
return b;
}
void setOldestEditTimeToNow() {
if (timeOfOldestEdit == Long.MAX_VALUE) {
timeOfOldestEdit = EnvironmentEdgeManager.currentTimeMillis();
}
}
/** /**
* Internal version of add() that doesn't clone KVs with the * Internal version of add() that doesn't clone KVs with the
* allocator, and doesn't take the lock. * allocator, and doesn't take the lock.
@ -240,7 +267,7 @@ public class MemStore implements HeapSize {
* Callers should ensure they already have the read lock taken * Callers should ensure they already have the read lock taken
*/ */
private long internalAdd(final KeyValue toAdd) { private long internalAdd(final KeyValue toAdd) {
long s = heapSizeChange(toAdd, this.kvset.add(toAdd)); long s = heapSizeChange(toAdd, addToKVSet(toAdd));
timeRangeTracker.includeTimestamp(toAdd); timeRangeTracker.includeTimestamp(toAdd);
this.size.addAndGet(s); this.size.addAndGet(s);
return s; return s;
@ -288,7 +315,7 @@ public class MemStore implements HeapSize {
// If the key is in the memstore, delete it. Update this.size. // If the key is in the memstore, delete it. Update this.size.
found = this.kvset.get(kv); found = this.kvset.get(kv);
if (found != null && found.getMemstoreTS() == kv.getMemstoreTS()) { if (found != null && found.getMemstoreTS() == kv.getMemstoreTS()) {
this.kvset.remove(kv); removeFromKVSet(kv);
long s = heapSizeChange(kv, true); long s = heapSizeChange(kv, true);
this.size.addAndGet(-s); this.size.addAndGet(-s);
} }
@ -307,7 +334,7 @@ public class MemStore implements HeapSize {
this.lock.readLock().lock(); this.lock.readLock().lock();
try { try {
KeyValue toAdd = maybeCloneWithAllocator(delete); KeyValue toAdd = maybeCloneWithAllocator(delete);
s += heapSizeChange(toAdd, this.kvset.add(toAdd)); s += heapSizeChange(toAdd, addToKVSet(toAdd));
timeRangeTracker.includeTimestamp(toAdd); timeRangeTracker.includeTimestamp(toAdd);
} finally { } finally {
this.lock.readLock().unlock(); this.lock.readLock().unlock();
@ -606,6 +633,7 @@ public class MemStore implements HeapSize {
addedSize -= delta; addedSize -= delta;
this.size.addAndGet(-delta); this.size.addAndGet(-delta);
it.remove(); it.remove();
setOldestEditTimeToNow();
} else { } else {
versionsVisible++; versionsVisible++;
} }
@ -941,7 +969,7 @@ public class MemStore implements HeapSize {
} }
public final static long FIXED_OVERHEAD = ClassSize.align( public final static long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT + (13 * ClassSize.REFERENCE)); ClassSize.OBJECT + (13 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG + ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG +

View File

@ -326,6 +326,18 @@ class MemStoreFlusher implements FlushRequester {
} }
} }
public void requestDelayedFlush(HRegion r, long delay) {
synchronized (regionsInQueue) {
if (!regionsInQueue.containsKey(r)) {
// This entry has some delay
FlushRegionEntry fqe = new FlushRegionEntry(r);
fqe.requeue(delay);
this.regionsInQueue.put(r, fqe);
this.flushQueue.add(fqe);
}
}
}
public int getFlushQueueSize() { public int getFlushQueueSize() {
return flushQueue.size(); return flushQueue.size();
} }

View File

@ -116,6 +116,11 @@ public interface Store extends HeapSize, StoreConfigInformation {
*/ */
public long add(KeyValue kv); public long add(KeyValue kv);
/**
* When was the last edit done in the memstore
*/
long timeOfOldestEdit();
/** /**
* Removes a kv from the memstore. The KeyValue is removed only if its key & memstoreTS match the * Removes a kv from the memstore. The KeyValue is removed only if its key & memstoreTS match the
* key & memstoreTS value of the kv parameter. * key & memstoreTS value of the kv parameter.

View File

@ -872,6 +872,12 @@ public class TestWALReplay {
throw new RuntimeException("Exception flushing", e); throw new RuntimeException("Exception flushing", e);
} }
} }
@Override
public void requestDelayedFlush(HRegion region, long when) {
// TODO Auto-generated method stub
}
} }
private void addWALEdits (final byte [] tableName, final HRegionInfo hri, private void addWALEdits (final byte [] tableName, final HRegionInfo hri,

View File

@ -1,19 +1,3 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.util; package org.apache.hadoop.hbase.util;
import java.io.IOException; import java.io.IOException;