HBASE-5930. Added unit tests and configuration value for disabling the automatic flush
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1482161 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
429af3c930
commit
8e0202945b
|
@ -346,8 +346,8 @@
|
|||
<name>hbase.regionserver.optionalcacheflushinterval</name>
|
||||
<value>3600000</value>
|
||||
<description>
|
||||
Amount of time to wait since the last time a region was flushed before
|
||||
invoking an optional cache flush. Default 1 hour.
|
||||
Maximum amount of time an edit lives in memory before being automatically flushed.
|
||||
Default 1 hour. Set it to 0 to disable automatic flushing.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
|
|
|
@ -1336,6 +1336,9 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
* Should the memstore be flushed now
|
||||
*/
|
||||
boolean shouldFlush() {
|
||||
if (flushCheckInterval <= 0) { //disabled
|
||||
return false;
|
||||
}
|
||||
long now = EnvironmentEdgeManager.currentTimeMillis();
|
||||
//if we flushed in the recent past, we don't need to do again now
|
||||
if ((now - getLastFlushTime() < flushCheckInterval)) {
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.rmi.UnexpectedException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
@ -36,6 +37,8 @@ import org.apache.hadoop.hbase.*;
|
|||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.regionserver.ScanInfo;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdge;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.collect.Iterables;
|
||||
|
@ -877,6 +880,100 @@ public class TestMemStore extends TestCase {
|
|||
//this.memstore = null;
|
||||
}
|
||||
|
||||
////////////////////////////////////
|
||||
// Test for periodic memstore flushes
|
||||
// based on time of oldest edit
|
||||
////////////////////////////////////
|
||||
|
||||
/**
|
||||
* Tests that the timeOfOldestEdit is updated correctly for the
|
||||
* various edit operations in memstore.
|
||||
* @throws Exception
|
||||
*/
|
||||
public void testUpdateToTimeOfOldestEdit() throws Exception {
|
||||
try {
|
||||
EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest();
|
||||
EnvironmentEdgeManager.injectEdge(edge);
|
||||
MemStore memstore = new MemStore();
|
||||
long t = memstore.timeOfOldestEdit();
|
||||
assertEquals(t, Long.MAX_VALUE);
|
||||
|
||||
// test the case that the timeOfOldestEdit is updated after a KV add
|
||||
memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, "v"));
|
||||
t = memstore.timeOfOldestEdit();
|
||||
assertTrue(t == 1234);
|
||||
// snapshot() will reset timeOfOldestEdit. The method will also assert the
|
||||
// value is reset to Long.MAX_VALUE
|
||||
t = runSnapshot(memstore);
|
||||
|
||||
// test the case that the timeOfOldestEdit is updated after a KV delete
|
||||
memstore.delete(KeyValueTestUtil.create("r", "f", "q", 100, "v"));
|
||||
t = memstore.timeOfOldestEdit();
|
||||
assertTrue(t == 1234);
|
||||
t = runSnapshot(memstore);
|
||||
|
||||
// test the case that the timeOfOldestEdit is updated after a KV upsert
|
||||
List<Cell> l = new ArrayList<Cell>();
|
||||
KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
|
||||
kv1.setMvccVersion(100);
|
||||
l.add(kv1);
|
||||
memstore.upsert(l, 1000);
|
||||
t = memstore.timeOfOldestEdit();
|
||||
assertTrue(t == 1234);
|
||||
} finally {
|
||||
EnvironmentEdgeManager.reset();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the HRegion.shouldFlush method - adds an edit in the memstore
|
||||
* and checks that shouldFlush returns true, and another where it disables
|
||||
* the periodic flush functionality and tests whether shouldFlush returns
|
||||
* false.
|
||||
* @throws Exception
|
||||
*/
|
||||
public void testShouldFlush() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000);
|
||||
checkShouldFlush(conf, true);
|
||||
// test disable flush
|
||||
conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 0);
|
||||
checkShouldFlush(conf, false);
|
||||
}
|
||||
|
||||
private void checkShouldFlush(Configuration conf, boolean expected) throws Exception {
|
||||
try {
|
||||
EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest();
|
||||
EnvironmentEdgeManager.injectEdge(edge);
|
||||
HBaseTestingUtility hbaseUtility = new HBaseTestingUtility(conf);
|
||||
HRegion region = hbaseUtility.createTestRegion("foobar", new HColumnDescriptor("foo"));
|
||||
|
||||
Map<byte[], Store> stores = region.getStores();
|
||||
assertTrue(stores.size() == 1);
|
||||
|
||||
Store s = stores.entrySet().iterator().next().getValue();
|
||||
edge.setCurrentTimeMillis(1234);
|
||||
s.add(KeyValueTestUtil.create("r", "f", "q", 100, "v"));
|
||||
edge.setCurrentTimeMillis(1234 + 100);
|
||||
assertTrue(region.shouldFlush() == false);
|
||||
edge.setCurrentTimeMillis(1234 + 10000);
|
||||
assertTrue(region.shouldFlush() == expected);
|
||||
} finally {
|
||||
EnvironmentEdgeManager.reset();
|
||||
}
|
||||
}
|
||||
|
||||
private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge {
|
||||
long t = 1234;
|
||||
@Override
|
||||
public long currentTimeMillis() {
|
||||
return t;
|
||||
}
|
||||
public void setCurrentTimeMillis(long t) {
|
||||
this.t = t;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds {@link #ROW_COUNT} rows and {@link #QUALIFIER_COUNT}
|
||||
* @param hmc Instance to add rows to.
|
||||
|
@ -906,14 +1003,17 @@ public class TestMemStore extends TestCase {
|
|||
return ROW_COUNT;
|
||||
}
|
||||
|
||||
private void runSnapshot(final MemStore hmc) throws UnexpectedException {
|
||||
private long runSnapshot(final MemStore hmc) throws UnexpectedException {
|
||||
// Save off old state.
|
||||
int oldHistorySize = hmc.getSnapshot().size();
|
||||
hmc.snapshot();
|
||||
KeyValueSkipListSet ss = hmc.getSnapshot();
|
||||
// Make some assertions about what just happened.
|
||||
assertTrue("History size has not increased", oldHistorySize < ss.size());
|
||||
long t = memstore.timeOfOldestEdit();
|
||||
assertTrue("Time of oldest edit is not Long.MAX_VALUE", t == Long.MAX_VALUE);
|
||||
hmc.clearSnapshot(ss);
|
||||
return t;
|
||||
}
|
||||
|
||||
private void isExpectedRowWithoutTimestamps(final int rowIndex,
|
||||
|
|
Loading…
Reference in New Issue