HBASE-10665 TestCompaction and TestCompactionWithCoprocessor run too long

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1574775 13f79535-47bb-0310-9956-ffa450edef68
Andrew Kyle Purtell 2014-03-06 05:51:04 +00:00
parent 59f57d9158
commit 9265f6f762
5 changed files with 767 additions and 505 deletions
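The change splits the old TestCompaction into TestCompaction (compaction framework and common functions), TestMajorCompaction, and TestMinorCompaction, and adds writeToWAL overloads to HBaseTestCase.addContent() so that test data can be loaded with Durability.SKIP_WAL instead of going through the WAL. A minimal usage sketch of the new overload (a fragment only; region and COLUMN_FAMILY come from the test's setup, as in the diffs below):

    // Load rows 'aaa', 'aab', ... into the region's memstore without WAL writes.
    // The trailing 'false' is the new writeToWAL flag; addContent() sets
    // Durability.SKIP_WAL (rather than Durability.USE_DEFAULT) on each Put.
    HRegionIncommon loader = new HRegionIncommon(region);
    HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY), false);
    loader.flushcache();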

HBaseTestCase.java View File

@@ -31,7 +31,9 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
@@ -216,10 +218,12 @@ public abstract class HBaseTestCase extends TestCase {
* @param r
* @param columnFamily
* @param column
* @param writeToWAL
* @throws IOException
* @return count of what we added.
*/
public static long addContent(final HRegion r, final byte [] columnFamily, final byte[] column)
public static long addContent(final HRegion r, final byte [] columnFamily, final byte[] column,
boolean writeToWAL)
throws IOException {
byte [] startKey = r.getRegionInfo().getStartKey();
byte [] endKey = r.getRegionInfo().getEndKey();
@@ -228,7 +232,12 @@ public abstract class HBaseTestCase extends TestCase {
startKeyBytes = START_KEY_BYTES;
}
return addContent(new HRegionIncommon(r), Bytes.toString(columnFamily), Bytes.toString(column),
startKeyBytes, endKey, -1);
startKeyBytes, endKey, -1, writeToWAL);
}
public static long addContent(final HRegion r, final byte [] columnFamily, final byte[] column)
throws IOException {
return addContent(r, columnFamily, column, true);
}
/**
@@ -237,12 +246,18 @@ public abstract class HBaseTestCase extends TestCase {
* Adds data of the form 'aaa', 'aab', etc. where key and value are the same.
* @param r
* @param columnFamily
* @param writeToWAL
* @throws IOException
* @return count of what we added.
*/
public static long addContent(final HRegion r, final byte [] columnFamily, boolean writeToWAL)
throws IOException {
return addContent(r, columnFamily, null, writeToWAL);
}
public static long addContent(final HRegion r, final byte [] columnFamily)
throws IOException {
return addContent(r, columnFamily, null);
return addContent(r, columnFamily, null, true);
}
/**
@@ -251,17 +266,28 @@ public abstract class HBaseTestCase extends TestCase {
* Adds data of the form 'aaa', 'aab', etc. where key and value are the same.
* @param updater An instance of {@link Incommon}.
* @param columnFamily
* @param writeToWAL
* @throws IOException
* @return count of what we added.
*/
public static long addContent(final Incommon updater,
final String columnFamily, final boolean writeToWAL) throws IOException {
return addContent(updater, columnFamily, START_KEY_BYTES, null, writeToWAL);
}
public static long addContent(final Incommon updater,
final String columnFamily) throws IOException {
return addContent(updater, columnFamily, START_KEY_BYTES, null);
return addContent(updater, columnFamily, START_KEY_BYTES, null, true);
}
public static long addContent(final Incommon updater, final String family,
final String column, final boolean writeToWAL) throws IOException {
return addContent(updater, family, column, START_KEY_BYTES, null, writeToWAL);
}
public static long addContent(final Incommon updater, final String family,
final String column) throws IOException {
return addContent(updater, family, column, START_KEY_BYTES, null);
return addContent(updater, family, column, START_KEY_BYTES, null, true);
}
/**
@@ -272,19 +298,26 @@ public abstract class HBaseTestCase extends TestCase {
* @param columnFamily
* @param startKeyBytes Where to start the rows inserted
* @param endKey Where to stop inserting rows.
* @param writeToWAL
* @return count of what we added.
* @throws IOException
*/
public static long addContent(final Incommon updater, final String columnFamily,
final byte [] startKeyBytes, final byte [] endKey)
final byte [] startKeyBytes, final byte [] endKey, final boolean writeToWAL)
throws IOException {
return addContent(updater, columnFamily, null, startKeyBytes, endKey, -1);
return addContent(updater, columnFamily, null, startKeyBytes, endKey, -1, writeToWAL);
}
public static long addContent(final Incommon updater, final String family,
final String column, final byte [] startKeyBytes,
final byte [] endKey) throws IOException {
return addContent(updater, family, column, startKeyBytes, endKey, -1);
final byte [] endKey,
final boolean writeToWAL) throws IOException {
return addContent(updater, family, column, startKeyBytes, endKey, -1, writeToWAL);
}
public static long addContent(final Incommon updater, final String family, String column,
final byte [] startKeyBytes, final byte [] endKey) throws IOException {
return addContent(updater, family, column, startKeyBytes, endKey, -1, true);
}
/**
@@ -296,13 +329,15 @@ public abstract class HBaseTestCase extends TestCase {
* @param startKeyBytes Where to start the rows inserted
* @param endKey Where to stop inserting rows.
* @param ts Timestamp to write the content with.
* @param writeToWAL
* @return count of what we added.
* @throws IOException
*/
public static long addContent(final Incommon updater,
final String columnFamily,
final String column,
final byte [] startKeyBytes, final byte [] endKey, final long ts)
final byte [] startKeyBytes, final byte [] endKey, final long ts,
final boolean writeToWAL)
throws IOException {
long count = 0;
// Add rows of three characters. The first character starts with the
@@ -348,6 +383,7 @@ public abstract class HBaseTestCase extends TestCase {
} else {
put.add(split[0], split[1], t);
}
put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
updater.put(put);
count++;
} catch (RuntimeException ex) {
@@ -373,6 +409,13 @@ public abstract class HBaseTestCase extends TestCase {
return count;
}
public static long addContent(final Incommon updater,
final String columnFamily,
final String column,
final byte [] startKeyBytes, final byte [] endKey, final long ts) throws IOException {
return addContent(updater, columnFamily, column, startKeyBytes, endKey, ts, true);
}
/**
* Implementors can flushcache.
*/

TestCompaction.java View File

@@ -21,10 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
@@ -37,10 +34,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
@@ -50,8 +44,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon;
@@ -61,19 +53,11 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@@ -91,25 +75,22 @@ import org.mockito.stubbing.Answer;
/**
* Test compactions
* Test compaction framework and common functions
*/
@Category(MediumTests.class)
public class TestCompaction {
@Rule public TestName name = new TestName();
static final Log LOG = LogFactory.getLog(TestCompaction.class.getName());
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility().createLocalHTU();
private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU();
protected Configuration conf = UTIL.getConfiguration();
private HRegion r = null;
private HTableDescriptor htd = null;
private Path compactionDir = null;
private Path regionCompactionDir = null;
private static final byte [] COLUMN_FAMILY = fam1;
private final byte [] STARTROW = Bytes.toBytes(START_KEY);
private static final byte [] COLUMN_FAMILY_TEXT = COLUMN_FAMILY;
private int compactionThreshold;
private byte[] firstRowBytes, secondRowBytes, thirdRowBytes;
final private byte[] col1, col2;
private byte[] secondRowBytes, thirdRowBytes;
private static final long MAX_FILES_TO_COMPACT = 10;
/** constructor */
@@ -121,14 +102,11 @@ public class TestCompaction {
conf.setInt("hbase.hregion.memstore.block.multiplier", 100);
compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
firstRowBytes = START_KEY_BYTES;
secondRowBytes = START_KEY_BYTES.clone();
// Increment the least significant character so we get to next row.
secondRowBytes[START_KEY_BYTES.length - 1]++;
thirdRowBytes = START_KEY_BYTES.clone();
thirdRowBytes[START_KEY_BYTES.length - 1] += 2;
col1 = Bytes.toBytes("column1");
col2 = Bytes.toBytes("column2");
}
@Before
@@ -144,384 +122,6 @@ public class TestCompaction {
hlog.closeAndDelete();
}
/**
* Test that on a major compaction, if all cells are expired or deleted, then
* we'll end up with no product. Make sure scanner over region returns
* right answer in this case - and that it just basically works.
* @throws IOException
*/
@Test
public void testMajorCompactingToNoOutput() throws IOException {
createStoreFile(r);
for (int i = 0; i < compactionThreshold; i++) {
createStoreFile(r);
}
// Now delete everything.
InternalScanner s = r.getScanner(new Scan());
do {
List<Cell> results = new ArrayList<Cell>();
boolean result = s.next(results);
r.delete(new Delete(CellUtil.cloneRow(results.get(0))));
if (!result) break;
} while(true);
s.close();
// Flush
r.flushcache();
// Major compact.
r.compactStores(true);
s = r.getScanner(new Scan());
int counter = 0;
do {
List<Cell> results = new ArrayList<Cell>();
boolean result = s.next(results);
if (!result) break;
counter++;
} while(true);
assertEquals(0, counter);
}
/**
* Run compaction and flushing memstore
* Assert deletes get cleaned up.
* @throws Exception
*/
@Test
public void testMajorCompaction() throws Exception {
majorCompaction();
}
@Test
public void testDataBlockEncodingInCacheOnly() throws Exception {
majorCompactionWithDataBlockEncoding(true);
}
@Test
public void testDataBlockEncodingEverywhere() throws Exception {
majorCompactionWithDataBlockEncoding(false);
}
public void majorCompactionWithDataBlockEncoding(boolean inCacheOnly)
throws Exception {
Map<HStore, HFileDataBlockEncoder> replaceBlockCache =
new HashMap<HStore, HFileDataBlockEncoder>();
for (Entry<byte[], Store> pair : r.getStores().entrySet()) {
HStore store = (HStore) pair.getValue();
HFileDataBlockEncoder blockEncoder = store.getDataBlockEncoder();
replaceBlockCache.put(store, blockEncoder);
final DataBlockEncoding inCache = DataBlockEncoding.PREFIX;
final DataBlockEncoding onDisk = inCacheOnly ? DataBlockEncoding.NONE :
inCache;
store.setDataBlockEncoderInTest(new HFileDataBlockEncoderImpl(onDisk));
}
majorCompaction();
// restore settings
for (Entry<HStore, HFileDataBlockEncoder> entry :
replaceBlockCache.entrySet()) {
entry.getKey().setDataBlockEncoderInTest(entry.getValue());
}
}
private void majorCompaction() throws Exception {
createStoreFile(r);
for (int i = 0; i < compactionThreshold; i++) {
createStoreFile(r);
}
// Add more content.
HBaseTestCase.addContent(new HRegionIncommon(r), Bytes.toString(COLUMN_FAMILY));
// Now there are about 5 versions of each column.
// Default is that there are only 3 (MAXVERSIONS) versions allowed per column.
//
// Assert == 3 when we ask for versions.
Result result = r.get(new Get(STARTROW).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100));
assertEquals(compactionThreshold, result.size());
// see if CompactionProgress is in place but null
for (Store store : this.r.stores.values()) {
assertNull(store.getCompactionProgress());
}
r.flushcache();
r.compactStores(true);
// see if CompactionProgress has done its thing on at least one store
int storeCount = 0;
for (Store store : this.r.stores.values()) {
CompactionProgress progress = store.getCompactionProgress();
if( progress != null ) {
++storeCount;
assertTrue(progress.currentCompactedKVs > 0);
assertTrue(progress.totalCompactingKVs > 0);
}
assertTrue(storeCount > 0);
}
// look at the second row
// Increment the least significant character so we get to next row.
byte [] secondRowBytes = START_KEY_BYTES.clone();
secondRowBytes[START_KEY_BYTES.length - 1]++;
// Always 3 versions if that is what max versions is.
result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).
setMaxVersions(100));
LOG.debug("Row " + Bytes.toStringBinary(secondRowBytes) + " after " +
"initial compaction: " + result);
assertEquals("Invalid number of versions of row "
+ Bytes.toStringBinary(secondRowBytes) + ".", compactionThreshold,
result.size());
// Now add deletes to memstore and then flush it.
// That will put us over
// the compaction threshold of 3 store files. Compacting these store files
// should result in a compacted store file that has no references to the
// deleted row.
LOG.debug("Adding deletes to memstore and flushing");
Delete delete = new Delete(secondRowBytes, System.currentTimeMillis());
byte [][] famAndQf = {COLUMN_FAMILY, null};
delete.deleteFamily(famAndQf[0]);
r.delete(delete);
// Assert deleted.
result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100));
assertTrue("Second row should have been deleted", result.isEmpty());
r.flushcache();
result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100));
assertTrue("Second row should have been deleted", result.isEmpty());
// Add a bit of data and flush. Start adding at 'bbb'.
createSmallerStoreFile(this.r);
r.flushcache();
// Assert that the second row is still deleted.
result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100));
assertTrue("Second row should still be deleted", result.isEmpty());
// Force major compaction.
r.compactStores(true);
assertEquals(r.getStore(COLUMN_FAMILY_TEXT).getStorefiles().size(), 1);
result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100));
assertTrue("Second row should still be deleted", result.isEmpty());
// Make sure the store files do have some 'aaa' keys in them -- exactly 3.
// Also, that compacted store files do not have any secondRowBytes because
// they were deleted.
verifyCounts(3,0);
// Multiple versions allowed for an entry, so the delete isn't enough
// Lower TTL and expire to ensure that all our entries have been wiped
final int ttl = 1000;
for (Store hstore : this.r.stores.values()) {
HStore store = ((HStore) hstore);
ScanInfo old = store.getScanInfo();
ScanInfo si = new ScanInfo(old.getFamily(),
old.getMinVersions(), old.getMaxVersions(), ttl,
old.getKeepDeletedCells(), 0, old.getComparator());
store.setScanInfo(si);
}
Thread.sleep(1000);
r.compactStores(true);
int count = count();
assertEquals("Should not see anything after TTL has expired", 0, count);
}
@Test
public void testTimeBasedMajorCompaction() throws Exception {
// create 2 storefiles and force a major compaction to reset the time
int delay = 10 * 1000; // 10 sec
float jitterPct = 0.20f; // 20%
conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, delay);
conf.setFloat("hbase.hregion.majorcompaction.jitter", jitterPct);
HStore s = ((HStore) r.getStore(COLUMN_FAMILY));
s.storeEngine.getCompactionPolicy().setConf(conf);
try {
createStoreFile(r);
createStoreFile(r);
r.compactStores(true);
// add one more file & verify that a regular compaction won't work
createStoreFile(r);
r.compactStores(false);
assertEquals(2, s.getStorefilesCount());
// ensure that major compaction time is deterministic
RatioBasedCompactionPolicy
c = (RatioBasedCompactionPolicy)s.storeEngine.getCompactionPolicy();
Collection<StoreFile> storeFiles = s.getStorefiles();
long mcTime = c.getNextMajorCompactTime(storeFiles);
for (int i = 0; i < 10; ++i) {
assertEquals(mcTime, c.getNextMajorCompactTime(storeFiles));
}
// ensure that the major compaction time is within the variance
long jitter = Math.round(delay * jitterPct);
assertTrue(delay - jitter <= mcTime && mcTime <= delay + jitter);
// wait until the time-based compaction interval
Thread.sleep(mcTime);
// trigger a compaction request and ensure that it's upgraded to major
r.compactStores(false);
assertEquals(1, s.getStorefilesCount());
} finally {
// reset the timed compaction settings
conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
conf.setFloat("hbase.hregion.majorcompaction.jitter", 0.20F);
// run a major to reset the cache
createStoreFile(r);
r.compactStores(true);
assertEquals(1, s.getStorefilesCount());
}
}
@Test
public void testMinorCompactionWithDeleteRow() throws Exception {
Delete deleteRow = new Delete(secondRowBytes);
testMinorCompactionWithDelete(deleteRow);
}
@Test
public void testMinorCompactionWithDeleteColumn1() throws Exception {
Delete dc = new Delete(secondRowBytes);
/* delete all timestamps in the column */
dc.deleteColumns(fam2, col2);
testMinorCompactionWithDelete(dc);
}
@Test
public void testMinorCompactionWithDeleteColumn2() throws Exception {
Delete dc = new Delete(secondRowBytes);
dc.deleteColumn(fam2, col2);
/* compactionThreshold is 3. The table has 4 versions: 0, 1, 2, and 3.
* we only delete the latest version. One might expect to see only
* versions 1 and 2. HBase differs, and gives us 0, 1 and 2.
* This is okay as well. Since there was no compaction done before the
* delete, version 0 seems to stay on.
*/
//testMinorCompactionWithDelete(dc, 2);
testMinorCompactionWithDelete(dc, 3);
}
@Test
public void testMinorCompactionWithDeleteColumnFamily() throws Exception {
Delete deleteCF = new Delete(secondRowBytes);
deleteCF.deleteFamily(fam2);
testMinorCompactionWithDelete(deleteCF);
}
@Test
public void testMinorCompactionWithDeleteVersion1() throws Exception {
Delete deleteVersion = new Delete(secondRowBytes);
deleteVersion.deleteColumns(fam2, col2, 2);
/* compactionThreshold is 3. The table has 4 versions: 0, 1, 2, and 3.
* We delete versions 0 ... 2. So, we still have one remaining.
*/
testMinorCompactionWithDelete(deleteVersion, 1);
}
@Test
public void testMinorCompactionWithDeleteVersion2() throws Exception {
Delete deleteVersion = new Delete(secondRowBytes);
deleteVersion.deleteColumn(fam2, col2, 1);
/*
* the table has 4 versions: 0, 1, 2, and 3.
* We delete 1.
* Should have 3 remaining.
*/
testMinorCompactionWithDelete(deleteVersion, 3);
}
/*
* A helper function to test the minor compaction algorithm. We check that
* the delete markers are left behind. Takes delete as an argument, which
* can be any delete (row, column, column family, etc.), that essentially
* deletes row2 and column2. row1 and column1 should be undeleted
*/
private void testMinorCompactionWithDelete(Delete delete) throws Exception {
testMinorCompactionWithDelete(delete, 0);
}
private void testMinorCompactionWithDelete(Delete delete, int expectedResultsAfterDelete) throws Exception {
HRegionIncommon loader = new HRegionIncommon(r);
for (int i = 0; i < compactionThreshold + 1; i++) {
HBaseTestCase.addContent(loader, Bytes.toString(fam1), Bytes.toString(col1), firstRowBytes, thirdRowBytes, i);
HBaseTestCase.addContent(loader, Bytes.toString(fam1), Bytes.toString(col2), firstRowBytes, thirdRowBytes, i);
HBaseTestCase.addContent(loader, Bytes.toString(fam2), Bytes.toString(col1), firstRowBytes, thirdRowBytes, i);
HBaseTestCase.addContent(loader, Bytes.toString(fam2), Bytes.toString(col2), firstRowBytes, thirdRowBytes, i);
r.flushcache();
}
Result result = r.get(new Get(firstRowBytes).addColumn(fam1, col1).setMaxVersions(100));
assertEquals(compactionThreshold, result.size());
result = r.get(new Get(secondRowBytes).addColumn(fam2, col2).setMaxVersions(100));
assertEquals(compactionThreshold, result.size());
// Now add deletes to memstore and then flush it. That will put us over
// the compaction threshold of 3 store files. Compacting these store files
// should result in a compacted store file that has no references to the
// deleted row.
r.delete(delete);
// Make sure that we have only deleted family2 from secondRowBytes
result = r.get(new Get(secondRowBytes).addColumn(fam2, col2).setMaxVersions(100));
assertEquals(expectedResultsAfterDelete, result.size());
// but we still have firstrow
result = r.get(new Get(firstRowBytes).addColumn(fam1, col1).setMaxVersions(100));
assertEquals(compactionThreshold, result.size());
r.flushcache();
// should not change anything.
// Let us check again
// Make sure that we have only deleted family2 from secondRowBytes
result = r.get(new Get(secondRowBytes).addColumn(fam2, col2).setMaxVersions(100));
assertEquals(expectedResultsAfterDelete, result.size());
// but we still have firstrow
result = r.get(new Get(firstRowBytes).addColumn(fam1, col1).setMaxVersions(100));
assertEquals(compactionThreshold, result.size());
// do a compaction
Store store2 = this.r.stores.get(fam2);
int numFiles1 = store2.getStorefiles().size();
assertTrue("Was expecting to see 4 store files", numFiles1 > compactionThreshold); // > 3
((HStore)store2).compactRecentForTestingAssumingDefaultPolicy(compactionThreshold); // = 3
int numFiles2 = store2.getStorefiles().size();
// Check that we did compact
assertTrue("Number of store files should go down", numFiles1 > numFiles2);
// Check that it was a minor compaction.
assertTrue("Was not supposed to be a major compaction", numFiles2 > 1);
// Make sure that we have only deleted family2 from secondRowBytes
result = r.get(new Get(secondRowBytes).addColumn(fam2, col2).setMaxVersions(100));
assertEquals(expectedResultsAfterDelete, result.size());
// but we still have firstrow
result = r.get(new Get(firstRowBytes).addColumn(fam1, col1).setMaxVersions(100));
assertEquals(compactionThreshold, result.size());
}
private void verifyCounts(int countRow1, int countRow2) throws Exception {
int count1 = 0;
int count2 = 0;
for (StoreFile f: this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) {
HFileScanner scanner = f.getReader().getScanner(false, false);
scanner.seekTo();
do {
byte [] row = scanner.getKeyValue().getRow();
if (Bytes.equals(row, STARTROW)) {
count1++;
} else if(Bytes.equals(row, secondRowBytes)) {
count2++;
}
} while(scanner.next());
}
assertEquals(countRow1,count1);
assertEquals(countRow2,count2);
}
/**
* Verify that you can stop a long-running compaction
* (used during RS shutdown)
@@ -546,7 +146,7 @@ public class TestCompaction {
for (int j = 0; j < jmax; j++) {
p.add(COLUMN_FAMILY, Bytes.toBytes(j), pad);
}
HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY));
HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY), false);
loader.put(p);
loader.flushcache();
}
@@ -623,14 +223,7 @@ public class TestCompaction {
private void createStoreFile(final HRegion region, String family) throws IOException {
HRegionIncommon loader = new HRegionIncommon(region);
HBaseTestCase.addContent(loader, family);
loader.flushcache();
}
private void createSmallerStoreFile(final HRegion region) throws IOException {
HRegionIncommon loader = new HRegionIncommon(region);
HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY), ("" +
"bbb").getBytes(), null);
HBaseTestCase.addContent(loader, family, false);
loader.flushcache();
}
@@ -644,8 +237,7 @@ public class TestCompaction {
Collection<StoreFile> storeFiles = store.getStorefiles();
DefaultCompactor tool = (DefaultCompactor)store.storeEngine.getCompactor();
List<Path> newFiles = tool.compactForTesting(storeFiles, false);
tool.compactForTesting(storeFiles, false);
// Now lets corrupt the compacted file.
FileSystem fs = store.getFileSystem();
@@ -671,45 +263,6 @@ public class TestCompaction {
"thrown while completing a corrupt file");
}
/**
* Test for HBASE-5920 - Test user requested major compactions always occurring
*/
@Test
public void testNonUserMajorCompactionRequest() throws Exception {
Store store = r.getStore(COLUMN_FAMILY);
createStoreFile(r);
for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
createStoreFile(r);
}
store.triggerMajorCompaction();
CompactionRequest request = store.requestCompaction(Store.NO_PRIORITY, null).getRequest();
assertNotNull("Expected to receive a compaction request", request);
assertEquals(
"System-requested major compaction should not occur if there are too many store files",
false,
request.isMajor());
}
/**
* Test for HBASE-5920
*/
@Test
public void testUserMajorCompactionRequest() throws IOException{
Store store = r.getStore(COLUMN_FAMILY);
createStoreFile(r);
for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
createStoreFile(r);
}
store.triggerMajorCompaction();
CompactionRequest request = store.requestCompaction(Store.PRIORITY_USER, null).getRequest();
assertNotNull("Expected to receive a compaction request", request);
assertEquals(
"User-requested major compaction should always occur, even if there are too many store files",
true,
request.isMajor());
}
/**
* Create a custom compaction request and be sure that we can track it through the queue, knowing
* when the compaction is completed.
@@ -775,47 +328,6 @@ public class TestCompaction {
thread.interruptIfNecessary();
}
/**
* Test that on a major compaction, if all cells are expired or deleted, then we'll end up with no
* product. Make sure scanner over region returns right answer in this case - and that it just
* basically works.
* @throws IOException
*/
public void testMajorCompactingToNoOutputWithReverseScan() throws IOException {
createStoreFile(r);
for (int i = 0; i < compactionThreshold; i++) {
createStoreFile(r);
}
// Now delete everything.
Scan scan = new Scan();
scan.setReversed(true);
InternalScanner s = r.getScanner(scan);
do {
List<Cell> results = new ArrayList<Cell>();
boolean result = s.next(results);
assertTrue(!results.isEmpty());
r.delete(new Delete(results.get(0).getRow()));
if (!result) break;
} while (true);
s.close();
// Flush
r.flushcache();
// Major compact.
r.compactStores(true);
scan = new Scan();
scan.setReversed(true);
s = r.getScanner(scan);
int counter = 0;
do {
List<Cell> results = new ArrayList<Cell>();
boolean result = s.next(results);
if (!result) break;
counter++;
} while (true);
s.close();
assertEquals(0, counter);
}
private class StoreMockMaker extends StatefulStoreMockMaker {
public ArrayList<StoreFile> compacting = new ArrayList<StoreFile>();
public ArrayList<StoreFile> notCompacting = new ArrayList<StoreFile>();

TestCompactionWithCoprocessor.java View File

@@ -22,7 +22,7 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.junit.experimental.categories.Category;
/**
* Make sure all compaction tests still pass with the preFlush and preCompact
* Make sure compaction tests still pass with the preFlush and preCompact
* overridden to implement the default behavior
*/
@Category(MediumTests.class)

TestMajorCompaction.java View File

@@ -0,0 +1,483 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
/**
* Test major compactions
*/
@Category(MediumTests.class)
public class TestMajorCompaction {
@Rule public TestName name = new TestName();
static final Log LOG = LogFactory.getLog(TestMajorCompaction.class.getName());
private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU();
protected Configuration conf = UTIL.getConfiguration();
private HRegion r = null;
private HTableDescriptor htd = null;
private static final byte [] COLUMN_FAMILY = fam1;
private final byte [] STARTROW = Bytes.toBytes(START_KEY);
private static final byte [] COLUMN_FAMILY_TEXT = COLUMN_FAMILY;
private int compactionThreshold;
private byte[] secondRowBytes, thirdRowBytes;
private static final long MAX_FILES_TO_COMPACT = 10;
/** constructor */
public TestMajorCompaction() {
super();
// Set cache flush size to 1MB
conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024*1024);
conf.setInt("hbase.hregion.memstore.block.multiplier", 100);
compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
secondRowBytes = START_KEY_BYTES.clone();
// Increment the least significant character so we get to next row.
secondRowBytes[START_KEY_BYTES.length - 1]++;
thirdRowBytes = START_KEY_BYTES.clone();
thirdRowBytes[START_KEY_BYTES.length - 1] += 2;
}
@Before
public void setUp() throws Exception {
this.htd = UTIL.createTableDescriptor(name.getMethodName());
this.r = UTIL.createLocalHRegion(htd, null, null);
}
@After
public void tearDown() throws Exception {
HLog hlog = r.getLog();
this.r.close();
hlog.closeAndDelete();
}
/**
* Test that on a major compaction, if all cells are expired or deleted, then
* we'll end up with no product. Make sure scanner over region returns
* right answer in this case - and that it just basically works.
* @throws IOException
*/
@Test
public void testMajorCompactingToNoOutput() throws IOException {
createStoreFile(r);
for (int i = 0; i < compactionThreshold; i++) {
createStoreFile(r);
}
// Now delete everything.
InternalScanner s = r.getScanner(new Scan());
do {
List<Cell> results = new ArrayList<Cell>();
boolean result = s.next(results);
r.delete(new Delete(CellUtil.cloneRow(results.get(0))));
if (!result) break;
} while(true);
s.close();
// Flush
r.flushcache();
// Major compact.
r.compactStores(true);
s = r.getScanner(new Scan());
int counter = 0;
do {
List<Cell> results = new ArrayList<Cell>();
boolean result = s.next(results);
if (!result) break;
counter++;
} while(true);
assertEquals(0, counter);
}
/**
* Run compaction and flushing memstore
* Assert deletes get cleaned up.
* @throws Exception
*/
@Test
public void testMajorCompaction() throws Exception {
majorCompaction();
}
@Test
public void testDataBlockEncodingInCacheOnly() throws Exception {
majorCompactionWithDataBlockEncoding(true);
}
@Test
public void testDataBlockEncodingEverywhere() throws Exception {
majorCompactionWithDataBlockEncoding(false);
}
public void majorCompactionWithDataBlockEncoding(boolean inCacheOnly)
throws Exception {
Map<HStore, HFileDataBlockEncoder> replaceBlockCache =
new HashMap<HStore, HFileDataBlockEncoder>();
for (Entry<byte[], Store> pair : r.getStores().entrySet()) {
HStore store = (HStore) pair.getValue();
HFileDataBlockEncoder blockEncoder = store.getDataBlockEncoder();
replaceBlockCache.put(store, blockEncoder);
final DataBlockEncoding inCache = DataBlockEncoding.PREFIX;
final DataBlockEncoding onDisk = inCacheOnly ? DataBlockEncoding.NONE :
inCache;
store.setDataBlockEncoderInTest(new HFileDataBlockEncoderImpl(onDisk));
}
majorCompaction();
// restore settings
for (Entry<HStore, HFileDataBlockEncoder> entry :
replaceBlockCache.entrySet()) {
entry.getKey().setDataBlockEncoderInTest(entry.getValue());
}
}
private void majorCompaction() throws Exception {
createStoreFile(r);
for (int i = 0; i < compactionThreshold; i++) {
createStoreFile(r);
}
// Add more content.
HBaseTestCase.addContent(new HRegionIncommon(r), Bytes.toString(COLUMN_FAMILY), false);
// Now there are about 5 versions of each column.
// Default is that there are only 3 (MAXVERSIONS) versions allowed per column.
//
// Assert == 3 when we ask for versions.
Result result = r.get(new Get(STARTROW).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100));
assertEquals(compactionThreshold, result.size());
// see if CompactionProgress is in place but null
for (Store store : this.r.stores.values()) {
assertNull(store.getCompactionProgress());
}
r.flushcache();
r.compactStores(true);
// see if CompactionProgress has done its thing on at least one store
int storeCount = 0;
for (Store store : this.r.stores.values()) {
CompactionProgress progress = store.getCompactionProgress();
if( progress != null ) {
++storeCount;
assertTrue(progress.currentCompactedKVs > 0);
assertTrue(progress.totalCompactingKVs > 0);
}
assertTrue(storeCount > 0);
}
// look at the second row
// Increment the least significant character so we get to next row.
byte [] secondRowBytes = START_KEY_BYTES.clone();
secondRowBytes[START_KEY_BYTES.length - 1]++;
// Always 3 versions if that is what max versions is.
result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).
setMaxVersions(100));
LOG.debug("Row " + Bytes.toStringBinary(secondRowBytes) + " after " +
"initial compaction: " + result);
assertEquals("Invalid number of versions of row "
+ Bytes.toStringBinary(secondRowBytes) + ".", compactionThreshold,
result.size());
// Now add deletes to memstore and then flush it.
// That will put us over
// the compaction threshold of 3 store files. Compacting these store files
// should result in a compacted store file that has no references to the
// deleted row.
LOG.debug("Adding deletes to memstore and flushing");
Delete delete = new Delete(secondRowBytes, System.currentTimeMillis());
byte [][] famAndQf = {COLUMN_FAMILY, null};
delete.deleteFamily(famAndQf[0]);
r.delete(delete);
// Assert deleted.
result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100));
assertTrue("Second row should have been deleted", result.isEmpty());
r.flushcache();
result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100));
assertTrue("Second row should have been deleted", result.isEmpty());
// Add a bit of data and flush. Start adding at 'bbb'.
createSmallerStoreFile(this.r);
r.flushcache();
// Assert that the second row is still deleted.
result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100));
assertTrue("Second row should still be deleted", result.isEmpty());
// Force major compaction.
r.compactStores(true);
assertEquals(r.getStore(COLUMN_FAMILY_TEXT).getStorefiles().size(), 1);
result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100));
assertTrue("Second row should still be deleted", result.isEmpty());
// Make sure the store files do have some 'aaa' keys in them -- exactly 3.
// Also, that compacted store files do not have any secondRowBytes because
// they were deleted.
verifyCounts(3,0);
// Multiple versions allowed for an entry, so the delete isn't enough
// Lower TTL and expire to ensure that all our entries have been wiped
final int ttl = 1000;
for (Store hstore : this.r.stores.values()) {
HStore store = ((HStore) hstore);
ScanInfo old = store.getScanInfo();
ScanInfo si = new ScanInfo(old.getFamily(),
old.getMinVersions(), old.getMaxVersions(), ttl,
old.getKeepDeletedCells(), 0, old.getComparator());
store.setScanInfo(si);
}
Thread.sleep(1000);
r.compactStores(true);
int count = count();
assertEquals("Should not see anything after TTL has expired", 0, count);
}
@Test
public void testTimeBasedMajorCompaction() throws Exception {
// create 2 storefiles and force a major compaction to reset the time
int delay = 10 * 1000; // 10 sec
float jitterPct = 0.20f; // 20%
conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, delay);
conf.setFloat("hbase.hregion.majorcompaction.jitter", jitterPct);
HStore s = ((HStore) r.getStore(COLUMN_FAMILY));
s.storeEngine.getCompactionPolicy().setConf(conf);
try {
createStoreFile(r);
createStoreFile(r);
r.compactStores(true);
// add one more file & verify that a regular compaction won't work
createStoreFile(r);
r.compactStores(false);
assertEquals(2, s.getStorefilesCount());
// ensure that major compaction time is deterministic
RatioBasedCompactionPolicy
c = (RatioBasedCompactionPolicy)s.storeEngine.getCompactionPolicy();
Collection<StoreFile> storeFiles = s.getStorefiles();
long mcTime = c.getNextMajorCompactTime(storeFiles);
for (int i = 0; i < 10; ++i) {
assertEquals(mcTime, c.getNextMajorCompactTime(storeFiles));
}
// ensure that the major compaction time is within the variance
long jitter = Math.round(delay * jitterPct);
assertTrue(delay - jitter <= mcTime && mcTime <= delay + jitter);
// wait until the time-based compaction interval
Thread.sleep(mcTime);
// trigger a compaction request and ensure that it's upgraded to major
r.compactStores(false);
assertEquals(1, s.getStorefilesCount());
} finally {
// reset the timed compaction settings
conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
conf.setFloat("hbase.hregion.majorcompaction.jitter", 0.20F);
// run a major to reset the cache
createStoreFile(r);
r.compactStores(true);
assertEquals(1, s.getStorefilesCount());
}
}
private void verifyCounts(int countRow1, int countRow2) throws Exception {
int count1 = 0;
int count2 = 0;
for (StoreFile f: this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) {
HFileScanner scanner = f.getReader().getScanner(false, false);
scanner.seekTo();
do {
byte [] row = scanner.getKeyValue().getRow();
if (Bytes.equals(row, STARTROW)) {
count1++;
} else if(Bytes.equals(row, secondRowBytes)) {
count2++;
}
} while(scanner.next());
}
assertEquals(countRow1,count1);
assertEquals(countRow2,count2);
}
private int count() throws IOException {
int count = 0;
for (StoreFile f: this.r.stores.
get(COLUMN_FAMILY_TEXT).getStorefiles()) {
HFileScanner scanner = f.getReader().getScanner(false, false);
if (!scanner.seekTo()) {
continue;
}
do {
count++;
} while(scanner.next());
}
return count;
}
private void createStoreFile(final HRegion region) throws IOException {
createStoreFile(region, Bytes.toString(COLUMN_FAMILY));
}
private void createStoreFile(final HRegion region, String family) throws IOException {
HRegionIncommon loader = new HRegionIncommon(region);
HBaseTestCase.addContent(loader, family, false);
loader.flushcache();
}
private void createSmallerStoreFile(final HRegion region) throws IOException {
HRegionIncommon loader = new HRegionIncommon(region);
HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY), ("" +
"bbb").getBytes(), null, false);
loader.flushcache();
}
/**
* Test for HBASE-5920 - Test user requested major compactions always occurring
*/
@Test
public void testNonUserMajorCompactionRequest() throws Exception {
Store store = r.getStore(COLUMN_FAMILY);
createStoreFile(r);
for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
createStoreFile(r);
}
store.triggerMajorCompaction();
CompactionRequest request = store.requestCompaction(Store.NO_PRIORITY, null).getRequest();
assertNotNull("Expected to receive a compaction request", request);
assertEquals(
"System-requested major compaction should not occur if there are too many store files",
false,
request.isMajor());
}
/**
* Test for HBASE-5920
*/
@Test
public void testUserMajorCompactionRequest() throws IOException{
Store store = r.getStore(COLUMN_FAMILY);
createStoreFile(r);
for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
createStoreFile(r);
}
store.triggerMajorCompaction();
CompactionRequest request = store.requestCompaction(Store.PRIORITY_USER, null).getRequest();
assertNotNull("Expected to receive a compaction request", request);
assertEquals(
"User-requested major compaction should always occur, even if there are too many store files",
true,
request.isMajor());
}
/**
* Test that on a major compaction, if all cells are expired or deleted, then we'll end up with no
* product. Make sure scanner over region returns right answer in this case - and that it just
* basically works.
* @throws IOException
*/
public void testMajorCompactingToNoOutputWithReverseScan() throws IOException {
createStoreFile(r);
for (int i = 0; i < compactionThreshold; i++) {
createStoreFile(r);
}
// Now delete everything.
Scan scan = new Scan();
scan.setReversed(true);
InternalScanner s = r.getScanner(scan);
do {
List<Cell> results = new ArrayList<Cell>();
boolean result = s.next(results);
assertTrue(!results.isEmpty());
r.delete(new Delete(results.get(0).getRow()));
if (!result) break;
} while (true);
s.close();
// Flush
r.flushcache();
// Major compact.
r.compactStores(true);
scan = new Scan();
scan.setReversed(true);
s = r.getScanner(scan);
int counter = 0;
do {
List<Cell> results = new ArrayList<Cell>();
boolean result = s.next(results);
if (!result) break;
counter++;
} while (true);
s.close();
assertEquals(0, counter);
}
}

TestMinorCompaction.java View File

@@ -0,0 +1,224 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
/**
* Test minor compactions
*/
@Category(MediumTests.class)
public class TestMinorCompaction {
@Rule public TestName name = new TestName();
static final Log LOG = LogFactory.getLog(TestMinorCompaction.class.getName());
private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU();
protected Configuration conf = UTIL.getConfiguration();
private HRegion r = null;
private HTableDescriptor htd = null;
private int compactionThreshold;
private byte[] firstRowBytes, secondRowBytes, thirdRowBytes;
final private byte[] col1, col2;
/** constructor */
public TestMinorCompaction() {
super();
// Set cache flush size to 1MB
conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024*1024);
conf.setInt("hbase.hregion.memstore.block.multiplier", 100);
compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
firstRowBytes = START_KEY_BYTES;
secondRowBytes = START_KEY_BYTES.clone();
// Increment the least significant character so we get to next row.
secondRowBytes[START_KEY_BYTES.length - 1]++;
thirdRowBytes = START_KEY_BYTES.clone();
thirdRowBytes[START_KEY_BYTES.length - 1] += 2;
col1 = Bytes.toBytes("column1");
col2 = Bytes.toBytes("column2");
}
@Before
public void setUp() throws Exception {
this.htd = UTIL.createTableDescriptor(name.getMethodName());
this.r = UTIL.createLocalHRegion(htd, null, null);
}
@After
public void tearDown() throws Exception {
HLog hlog = r.getLog();
this.r.close();
hlog.closeAndDelete();
}
@Test
public void testMinorCompactionWithDeleteRow() throws Exception {
Delete deleteRow = new Delete(secondRowBytes);
testMinorCompactionWithDelete(deleteRow);
}
@Test
public void testMinorCompactionWithDeleteColumn1() throws Exception {
Delete dc = new Delete(secondRowBytes);
/* delete all timestamps in the column */
dc.deleteColumns(fam2, col2);
testMinorCompactionWithDelete(dc);
}
@Test
public void testMinorCompactionWithDeleteColumn2() throws Exception {
Delete dc = new Delete(secondRowBytes);
dc.deleteColumn(fam2, col2);
/* compactionThreshold is 3. The table has 4 versions: 0, 1, 2, and 3.
* we only delete the latest version. One might expect to see only
* versions 1 and 2. HBase differs, and gives us 0, 1 and 2.
* This is okay as well. Since there was no compaction done before the
* delete, version 0 seems to stay on.
*/
testMinorCompactionWithDelete(dc, 3);
}
@Test
public void testMinorCompactionWithDeleteColumnFamily() throws Exception {
Delete deleteCF = new Delete(secondRowBytes);
deleteCF.deleteFamily(fam2);
testMinorCompactionWithDelete(deleteCF);
}
@Test
public void testMinorCompactionWithDeleteVersion1() throws Exception {
Delete deleteVersion = new Delete(secondRowBytes);
deleteVersion.deleteColumns(fam2, col2, 2);
/* compactionThreshold is 3. The table has 4 versions: 0, 1, 2, and 3.
* We delete versions 0 ... 2. So, we still have one remaining.
*/
testMinorCompactionWithDelete(deleteVersion, 1);
}
@Test
public void testMinorCompactionWithDeleteVersion2() throws Exception {
Delete deleteVersion = new Delete(secondRowBytes);
deleteVersion.deleteColumn(fam2, col2, 1);
/*
* the table has 4 versions: 0, 1, 2, and 3.
* We delete 1.
* Should have 3 remaining.
*/
testMinorCompactionWithDelete(deleteVersion, 3);
}
/*
* A helper function to test the minor compaction algorithm. We check that
* the delete markers are left behind. Takes delete as an argument, which
* can be any delete (row, column, column family, etc.), that essentially
* deletes row2 and column2. row1 and column1 should be undeleted
*/
private void testMinorCompactionWithDelete(Delete delete) throws Exception {
testMinorCompactionWithDelete(delete, 0);
}
private void testMinorCompactionWithDelete(Delete delete, int expectedResultsAfterDelete) throws Exception {
HRegionIncommon loader = new HRegionIncommon(r);
for (int i = 0; i < compactionThreshold + 1; i++) {
HBaseTestCase.addContent(loader, Bytes.toString(fam1), Bytes.toString(col1), firstRowBytes,
thirdRowBytes, i, false);
HBaseTestCase.addContent(loader, Bytes.toString(fam1), Bytes.toString(col2), firstRowBytes,
thirdRowBytes, i, false);
HBaseTestCase.addContent(loader, Bytes.toString(fam2), Bytes.toString(col1), firstRowBytes,
thirdRowBytes, i, false);
HBaseTestCase.addContent(loader, Bytes.toString(fam2), Bytes.toString(col2), firstRowBytes,
thirdRowBytes, i, false);
r.flushcache();
}
Result result = r.get(new Get(firstRowBytes).addColumn(fam1, col1).setMaxVersions(100));
assertEquals(compactionThreshold, result.size());
result = r.get(new Get(secondRowBytes).addColumn(fam2, col2).setMaxVersions(100));
assertEquals(compactionThreshold, result.size());
// Now add deletes to memstore and then flush it. That will put us over
// the compaction threshold of 3 store files. Compacting these store files
// should result in a compacted store file that has no references to the
// deleted row.
r.delete(delete);
// Make sure that we have only deleted family2 from secondRowBytes
result = r.get(new Get(secondRowBytes).addColumn(fam2, col2).setMaxVersions(100));
assertEquals(expectedResultsAfterDelete, result.size());
// but we still have firstrow
result = r.get(new Get(firstRowBytes).addColumn(fam1, col1).setMaxVersions(100));
assertEquals(compactionThreshold, result.size());
r.flushcache();
// should not change anything.
// Let us check again
// Make sure that we have only deleted family2 from secondRowBytes
result = r.get(new Get(secondRowBytes).addColumn(fam2, col2).setMaxVersions(100));
assertEquals(expectedResultsAfterDelete, result.size());
// but we still have firstrow
result = r.get(new Get(firstRowBytes).addColumn(fam1, col1).setMaxVersions(100));
assertEquals(compactionThreshold, result.size());
// do a compaction
Store store2 = this.r.stores.get(fam2);
int numFiles1 = store2.getStorefiles().size();
assertTrue("Was expecting to see 4 store files", numFiles1 > compactionThreshold); // > 3
((HStore)store2).compactRecentForTestingAssumingDefaultPolicy(compactionThreshold); // = 3
int numFiles2 = store2.getStorefiles().size();
// Check that we did compact
assertTrue("Number of store files should go down", numFiles1 > numFiles2);
// Check that it was a minor compaction.
assertTrue("Was not supposed to be a major compaction", numFiles2 > 1);
// Make sure that we have only deleted family2 from secondRowBytes
result = r.get(new Get(secondRowBytes).addColumn(fam2, col2).setMaxVersions(100));
assertEquals(expectedResultsAfterDelete, result.size());
// but we still have firstrow
result = r.get(new Get(firstRowBytes).addColumn(fam1, col1).setMaxVersions(100));
assertEquals(compactionThreshold, result.size());
}
}