[jira] [HBASE-5332] Deterministic Compaction Jitter

Summary: Change the jitter in major compactions to be deterministic, so that
reboots don't cause a time-based compaction storm. The implementation seeds a
random number generator with HDFS data (the name of the store's first
storefile), so the computed jitter survives restarts.

Test Plan:
 - mvn test -Dtest=TestCompaction,TestCompactSelection,TestHeapSize

Reviewers: JIRA, Kannan, aaiyer, stack

Reviewed By: Kannan

Differential Revision: https://reviews.facebook.net/D1785

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1292495 13f79535-47bb-0310-9956-ffa450edef68
parent 7b5ec59eda
commit c3575d2eb5
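The idea in one self-contained sketch (illustrative only: the class, method
signature, and seed strings below are not part of the patch; in the actual
change this logic lives in Store.getNextMajorCompactTime(), and the seed is
the name of the store's first storefile):

import java.util.Random;

public class DeterministicJitterSketch {

  // Returns period +/- (period * jitterPct), derived from a stable seed.
  static long nextMajorCompactTime(long period, float jitterPct, String seed) {
    if (seed == null) {
      return 0; // no storefiles == no major compaction
    }
    long jitter = Math.round(period * jitterPct);
    // Seeding Random with a stable string yields the same nextDouble()
    // on every call and across restarts, unlike Math.random().
    double curRand = new Random(seed.hashCode()).nextDouble();
    return period + jitter - Math.round(2L * jitter * curRand);
  }

  public static void main(String[] args) {
    long period = 1000L * 60 * 60 * 24; // default 24h compaction period
    // Same seed => identical compaction time on every call (restart-safe).
    System.out.println(nextMajorCompactTime(period, 0.20f, "store-a-hfile"));
    System.out.println(nextMajorCompactTime(period, 0.20f, "store-a-hfile"));
    // Different stores still spread their major compactions apart.
    System.out.println(nextMajorCompactTime(period, 0.20f, "store-b-hfile"));
  }
}

Because each store hashes a different storefile name, stores still jitter
relative to each other; they just stop re-rolling that jitter on reboot.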
Store.java

@@ -26,6 +26,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.NavigableSet;
+import java.util.Random;
 import java.util.SortedSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
@@ -115,7 +116,6 @@ public class Store extends SchemaConfigured implements HeapSize {
   final CacheConfig cacheConf;
   // ttl in milliseconds.
   private long ttl;
-  long majorCompactionTime;
   private final int minFilesToCompact;
   private final int maxFilesToCompact;
   private final long minCompactSize;
@@ -229,8 +229,6 @@ public class Store extends SchemaConfigured implements HeapSize {
     this.blockingStoreFileCount =
       conf.getInt("hbase.hstore.blockingStoreFiles", 7);
 
-    this.majorCompactionTime = getNextMajorCompactTime();
-
     this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10);
     this.minCompactSize = conf.getLong("hbase.hstore.compaction.min.size",
       this.region.memstoreFlushSize);
@@ -1091,14 +1089,14 @@ public class Store extends SchemaConfigured implements HeapSize {
    */
   private boolean isMajorCompaction(final List<StoreFile> filesToCompact) throws IOException {
     boolean result = false;
-    if (filesToCompact == null || filesToCompact.isEmpty() ||
-        majorCompactionTime == 0) {
+    long mcTime = getNextMajorCompactTime();
+    if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
       return result;
     }
     // TODO: Use better method for determining stamp of last major (HBASE-2990)
     long lowTimestamp = getLowestTimestamp(filesToCompact);
     long now = System.currentTimeMillis();
-    if (lowTimestamp > 0l && lowTimestamp < (now - this.majorCompactionTime)) {
+    if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) {
       // Major compaction time has elapsed.
       if (filesToCompact.size() == 1) {
         // Single file
@@ -1146,7 +1144,15 @@ public class Store extends SchemaConfigured implements HeapSize {
           0.20F);
       if (jitterPct > 0) {
         long jitter = Math.round(ret * jitterPct);
-        ret += jitter - Math.round(2L * jitter * Math.random());
+        // deterministic jitter avoids a major compaction storm on restart
+        ImmutableList<StoreFile> snapshot = storefiles;
+        if (snapshot != null && !snapshot.isEmpty()) {
+          String seed = snapshot.get(0).getPath().getName();
+          double curRand = new Random(seed.hashCode()).nextDouble();
+          ret += jitter - Math.round(2L * jitter * curRand);
+        } else {
+          ret = 0; // no storefiles == no major compaction
+        }
       }
     }
     return ret;
@@ -1210,7 +1216,6 @@ public class Store extends SchemaConfigured implements HeapSize {
     if (isMajor) {
       // since we're enqueuing a major, update the compaction wait interval
       this.forceMajor = false;
-      this.majorCompactionTime = getNextMajorCompactTime();
     }
 
     // everything went better than expected. create a compaction request
@@ -2180,7 +2185,7 @@ public class Store extends SchemaConfigured implements HeapSize {
 
   public static final long FIXED_OVERHEAD =
       ClassSize.align(SchemaConfigured.SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE +
-          + (19 * ClassSize.REFERENCE) + (7 * Bytes.SIZEOF_LONG)
+          + (19 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
           + (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
TestCompactSelection.java

@@ -215,12 +215,17 @@ public class TestCompactSelection extends TestCase {
     // if not, it creates a 'snowball effect' when files >> maxCompactSize:
     // the last file in compaction is the aggregate of all previous compactions
     compactEquals(sfCreate(100,50,23,12,12), true, 23, 12, 12);
-    // trigger an aged major compaction
-    store.majorCompactionTime = 1;
-    compactEquals(sfCreate(50,25,12,12), 50, 25, 12, 12);
-    // major sure exceeding maxCompactSize also downgrades aged minors
-    store.majorCompactionTime = 1;
-    compactEquals(sfCreate(100,50,23,12,12), 23, 12, 12);
+    conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 1);
+    conf.setFloat("hbase.hregion.majorcompaction.jitter", 0);
+    try {
+      // trigger an aged major compaction
+      compactEquals(sfCreate(50,25,12,12), 50, 25, 12, 12);
+      // major sure exceeding maxCompactSize also downgrades aged minors
+      compactEquals(sfCreate(100,50,23,12,12), 23, 12, 12);
+    } finally {
+      conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
+      conf.setFloat("hbase.hregion.majorcompaction.jitter", 0.20F);
+    }
 
     /* REFERENCES == file is from a region that was split */
     // treat storefiles that have references like a major compaction
TestCompaction.java

@@ -292,6 +292,51 @@ public class TestCompaction extends HBaseTestCase {
     assertEquals("Should not see anything after TTL has expired", 0, count);
   }
 
+  public void testTimeBasedMajorCompaction() throws Exception {
+    // create 2 storefiles and force a major compaction to reset the time
+    int delay = 10 * 1000; // 10 sec
+    float jitterPct = 0.20f; // 20%
+    conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, delay);
+    conf.setFloat("hbase.hregion.majorcompaction.jitter", jitterPct);
+
+    Store s = r.getStore(COLUMN_FAMILY);
+    try {
+      createStoreFile(r);
+      createStoreFile(r);
+      r.compactStores(true);
+
+      // add one more file & verify that a regular compaction won't work
+      createStoreFile(r);
+      r.compactStores(false);
+      assertEquals(2, s.getStorefilesCount());
+
+      // ensure that major compaction time is deterministic
+      long mcTime = s.getNextMajorCompactTime();
+      for (int i = 0; i < 10; ++i) {
+        assertEquals(mcTime, s.getNextMajorCompactTime());
+      }
+
+      // ensure that the major compaction time is within the variance
+      long jitter = Math.round(delay * jitterPct);
+      assertTrue(delay - jitter <= mcTime && mcTime <= delay + jitter);
+
+      // wait until the time-based compaction interval
+      Thread.sleep(mcTime);
+
+      // trigger a compaction request and ensure that it's upgraded to major
+      r.compactStores(false);
+      assertEquals(1, s.getStorefilesCount());
+    } finally {
+      // reset the timed compaction settings
+      conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
+      conf.setFloat("hbase.hregion.majorcompaction.jitter", 0.20F);
+      // run a major to reset the cache
+      createStoreFile(r);
+      r.compactStores(true);
+      assertEquals(1, s.getStorefilesCount());
+    }
+  }
+
   public void testMinorCompactionWithDeleteRow() throws Exception {
     Delete deleteRow = new Delete(secondRowBytes);
     testMinorCompactionWithDelete(deleteRow);