HBASE-26069 Remove HStore.compactRecentForTestingAssumingDefaultPolic… (#3462)
Signed-off-by: Yulin Niu <niuyulin@apache.org>
This commit is contained in:
parent
29cd782d25
commit
e65fc9226f
|
@ -1,5 +1,4 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -53,7 +52,6 @@ import java.util.function.Predicate;
|
|||
import java.util.function.ToLongFunction;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.LongStream;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -89,7 +87,6 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
|
|||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
|
||||
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
|
||||
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
|
||||
|
@ -1747,64 +1744,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method tries to compact N recent files for testing.
|
||||
* Note that because compacting "recent" files only makes sense for some policies,
|
||||
* e.g. the default one, it assumes default policy is used. It doesn't use policy,
|
||||
* but instead makes a compaction candidate list by itself.
|
||||
* @param N Number of files.
|
||||
*/
|
||||
public void compactRecentForTestingAssumingDefaultPolicy(int N) throws IOException {
|
||||
List<HStoreFile> filesToCompact;
|
||||
boolean isMajor;
|
||||
|
||||
this.lock.readLock().lock();
|
||||
try {
|
||||
synchronized (filesCompacting) {
|
||||
filesToCompact = Lists.newArrayList(storeEngine.getStoreFileManager().getStorefiles());
|
||||
if (!filesCompacting.isEmpty()) {
|
||||
// exclude all files older than the newest file we're currently
|
||||
// compacting. this allows us to preserve contiguity (HBASE-2856)
|
||||
HStoreFile last = filesCompacting.get(filesCompacting.size() - 1);
|
||||
int idx = filesToCompact.indexOf(last);
|
||||
Preconditions.checkArgument(idx != -1);
|
||||
filesToCompact.subList(0, idx + 1).clear();
|
||||
}
|
||||
int count = filesToCompact.size();
|
||||
if (N > count) {
|
||||
throw new RuntimeException("Not enough files");
|
||||
}
|
||||
|
||||
filesToCompact = filesToCompact.subList(count - N, count);
|
||||
isMajor = (filesToCompact.size() == storeEngine.getStoreFileManager().getStorefileCount());
|
||||
filesCompacting.addAll(filesToCompact);
|
||||
Collections.sort(filesCompacting, storeEngine.getStoreFileManager()
|
||||
.getStoreFileComparator());
|
||||
}
|
||||
} finally {
|
||||
this.lock.readLock().unlock();
|
||||
}
|
||||
|
||||
try {
|
||||
// Ready to go. Have list of files to compact.
|
||||
List<Path> newFiles = ((DefaultCompactor)this.storeEngine.getCompactor())
|
||||
.compactForTesting(filesToCompact, isMajor);
|
||||
for (Path newFile: newFiles) {
|
||||
// Move the compaction into place.
|
||||
HStoreFile sf = moveFileIntoPlace(newFile);
|
||||
if (this.getCoprocessorHost() != null) {
|
||||
this.getCoprocessorHost().postCompact(this, sf, null, null, null);
|
||||
}
|
||||
replaceStoreFiles(filesToCompact, Collections.singletonList(sf));
|
||||
refreshStoreSizeAndTotalBytes();
|
||||
}
|
||||
} finally {
|
||||
synchronized (filesCompacting) {
|
||||
filesCompacting.removeAll(filesToCompact);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasReferences() {
|
||||
// Grab the read lock here, because we need to ensure that: only when the atomic
|
||||
|
|
|
@ -18,21 +18,18 @@
|
|||
package org.apache.hadoop.hbase.regionserver.compactions;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.regionserver.HStore;
|
||||
import org.apache.hadoop.hbase.regionserver.HStoreFile;
|
||||
import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
|
||||
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
|
||||
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||
|
||||
/**
|
||||
|
@ -65,23 +62,6 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
|
|||
return compact(request, defaultScannerFactory, writerFactory, throughputController, user);
|
||||
}
|
||||
|
||||
/**
|
||||
* Compact a list of files for testing. Creates a fake {@link CompactionRequestImpl} to pass to
|
||||
* {@link #compact(CompactionRequestImpl, ThroughputController, User)};
|
||||
* @param filesToCompact the files to compact. These are used as the compactionSelection for the
|
||||
* generated {@link CompactionRequestImpl}.
|
||||
* @param isMajor true to major compact (prune all deletes, max versions, etc)
|
||||
* @return Product of compaction or an empty list if all cells expired or deleted and nothing \
|
||||
* made it through the compaction.
|
||||
* @throws IOException
|
||||
*/
|
||||
public List<Path> compactForTesting(Collection<HStoreFile> filesToCompact, boolean isMajor)
|
||||
throws IOException {
|
||||
CompactionRequestImpl cr = new CompactionRequestImpl(filesToCompact);
|
||||
cr.setIsMajor(isMajor, isMajor);
|
||||
return compact(cr, NoLimitThroughputController.INSTANCE, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
|
||||
CompactionRequestImpl request) throws IOException {
|
||||
|
|
|
@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.client.Scan;
|
|||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.HStore;
|
||||
import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
||||
import org.apache.hadoop.hbase.testclassification.IOTests;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
|
@ -82,25 +81,18 @@ public class TestScannerSelectionUsingTTL {
|
|||
|
||||
public final int numFreshFiles, totalNumFiles;
|
||||
|
||||
/** Whether we are specifying the exact files to compact */
|
||||
private final boolean explicitCompaction;
|
||||
|
||||
@Parameters
|
||||
public static Collection<Object[]> parameters() {
|
||||
List<Object[]> params = new ArrayList<>();
|
||||
for (int numFreshFiles = 1; numFreshFiles <= 3; ++numFreshFiles) {
|
||||
for (boolean explicitCompaction : new boolean[] { false, true }) {
|
||||
params.add(new Object[] { numFreshFiles, explicitCompaction });
|
||||
}
|
||||
params.add(new Object[] { numFreshFiles });
|
||||
}
|
||||
return params;
|
||||
}
|
||||
|
||||
public TestScannerSelectionUsingTTL(int numFreshFiles,
|
||||
boolean explicitCompaction) {
|
||||
public TestScannerSelectionUsingTTL(int numFreshFiles) {
|
||||
this.numFreshFiles = numFreshFiles;
|
||||
this.totalNumFiles = numFreshFiles + NUM_EXPIRED_FILES;
|
||||
this.explicitCompaction = explicitCompaction;
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -152,13 +144,7 @@ public class TestScannerSelectionUsingTTL {
|
|||
Set<String> accessedFiles = cache.getCachedFileNamesForTest();
|
||||
LOG.debug("Files accessed during scan: " + accessedFiles);
|
||||
|
||||
// Exercise both compaction codepaths.
|
||||
if (explicitCompaction) {
|
||||
HStore store = region.getStore(FAMILY_BYTES);
|
||||
store.compactRecentForTestingAssumingDefaultPolicy(totalNumFiles);
|
||||
} else {
|
||||
region.compact(false);
|
||||
}
|
||||
region.compact(false);
|
||||
|
||||
HBaseTestingUtility.closeRegionAndWAL(region);
|
||||
}
|
||||
|
|
|
@ -85,6 +85,8 @@ import org.junit.rules.TestName;
|
|||
import org.mockito.Mockito;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Test compaction framework and common functions
|
||||
|
@ -94,9 +96,12 @@ public class TestCompaction {
|
|||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestCompaction.class);
|
||||
HBaseClassTestRule.forClass(TestCompaction.class);
|
||||
|
||||
@Rule public TestName name = new TestName();
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestCompaction.class);
|
||||
|
||||
@Rule
|
||||
public TestName name = new TestName();
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
protected Configuration conf = UTIL.getConfiguration();
|
||||
|
||||
|
@ -154,7 +159,6 @@ public class TestCompaction {
|
|||
/**
|
||||
* Verify that you can stop a long-running compaction
|
||||
* (used during RS shutdown)
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testInterruptCompactionBySize() throws Exception {
|
||||
|
@ -180,7 +184,7 @@ public class TestCompaction {
|
|||
}
|
||||
|
||||
HRegion spyR = spy(r);
|
||||
doAnswer(new Answer() {
|
||||
doAnswer(new Answer<Object>() {
|
||||
@Override
|
||||
public Object answer(InvocationOnMock invocation) throws Throwable {
|
||||
r.writestate.writesEnabled = false;
|
||||
|
@ -256,7 +260,7 @@ public class TestCompaction {
|
|||
}
|
||||
|
||||
HRegion spyR = spy(r);
|
||||
doAnswer(new Answer() {
|
||||
doAnswer(new Answer<Object>() {
|
||||
@Override
|
||||
public Object answer(InvocationOnMock invocation) throws Throwable {
|
||||
r.writestate.writesEnabled = false;
|
||||
|
@ -311,15 +315,14 @@ public class TestCompaction {
|
|||
|
||||
private int count() throws IOException {
|
||||
int count = 0;
|
||||
for (HStoreFile f: this.r.stores.
|
||||
get(COLUMN_FAMILY_TEXT).getStorefiles()) {
|
||||
for (HStoreFile f : this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) {
|
||||
HFileScanner scanner = f.getReader().getScanner(false, false);
|
||||
if (!scanner.seekTo()) {
|
||||
continue;
|
||||
}
|
||||
do {
|
||||
count++;
|
||||
} while(scanner.next());
|
||||
} while (scanner.next());
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
@ -344,7 +347,8 @@ public class TestCompaction {
|
|||
|
||||
Collection<HStoreFile> storeFiles = store.getStorefiles();
|
||||
DefaultCompactor tool = (DefaultCompactor)store.storeEngine.getCompactor();
|
||||
tool.compactForTesting(storeFiles, false);
|
||||
CompactionRequestImpl request = new CompactionRequestImpl(storeFiles);
|
||||
tool.compact(request, NoLimitThroughputController.INSTANCE, null);
|
||||
|
||||
// Now lets corrupt the compacted file.
|
||||
FileSystem fs = store.getFileSystem();
|
||||
|
@ -363,7 +367,7 @@ public class TestCompaction {
|
|||
// in the 'tmp' directory;
|
||||
assertTrue(fs.exists(origPath));
|
||||
assertFalse(fs.exists(dstPath));
|
||||
System.out.println("testCompactionWithCorruptResult Passed");
|
||||
LOG.info("testCompactionWithCorruptResult Passed");
|
||||
return;
|
||||
}
|
||||
fail("testCompactionWithCorruptResult failed since no exception was" +
|
||||
|
@ -418,28 +422,27 @@ public class TestCompaction {
|
|||
Mockito.when(mockRegion.checkSplit()).
|
||||
thenThrow(new RuntimeException("Thrown intentionally by test!"));
|
||||
|
||||
MetricsRegionWrapper metricsWrapper = new MetricsRegionWrapperImpl(r);
|
||||
try (MetricsRegionWrapperImpl metricsWrapper = new MetricsRegionWrapperImpl(r)) {
|
||||
|
||||
long preCompletedCount = metricsWrapper.getNumCompactionsCompleted();
|
||||
long preFailedCount = metricsWrapper.getNumCompactionsFailed();
|
||||
long preCompletedCount = metricsWrapper.getNumCompactionsCompleted();
|
||||
long preFailedCount = metricsWrapper.getNumCompactionsFailed();
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
Tracker tracker = new Tracker(latch);
|
||||
thread.requestCompaction(mockRegion, store, "test custom comapction", PRIORITY_USER,
|
||||
tracker, null);
|
||||
// wait for the latch to complete.
|
||||
latch.await(120, TimeUnit.SECONDS);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
Tracker tracker = new Tracker(latch);
|
||||
thread.requestCompaction(mockRegion, store, "test custom comapction", PRIORITY_USER, tracker,
|
||||
null);
|
||||
// wait for the latch to complete.
|
||||
latch.await(120, TimeUnit.SECONDS);
|
||||
|
||||
// compaction should have completed and been marked as failed due to error in split request
|
||||
long postCompletedCount = metricsWrapper.getNumCompactionsCompleted();
|
||||
long postFailedCount = metricsWrapper.getNumCompactionsFailed();
|
||||
// compaction should have completed and been marked as failed due to error in split request
|
||||
long postCompletedCount = metricsWrapper.getNumCompactionsCompleted();
|
||||
long postFailedCount = metricsWrapper.getNumCompactionsFailed();
|
||||
|
||||
assertTrue("Completed count should have increased (pre=" + preCompletedCount +
|
||||
", post="+postCompletedCount+")",
|
||||
postCompletedCount > preCompletedCount);
|
||||
assertTrue("Failed count should have increased (pre=" + preFailedCount +
|
||||
", post=" + postFailedCount + ")",
|
||||
postFailedCount > preFailedCount);
|
||||
assertTrue("Completed count should have increased (pre=" + preCompletedCount + ", post=" +
|
||||
postCompletedCount + ")", postCompletedCount > preCompletedCount);
|
||||
assertTrue("Failed count should have increased (pre=" + preFailedCount + ", post=" +
|
||||
postFailedCount + ")", postFailedCount > preFailedCount);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -23,6 +23,11 @@ import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
|
|||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
|
@ -35,12 +40,17 @@ import org.apache.hadoop.hbase.client.Get;
|
|||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
|
||||
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
|
||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
@ -57,34 +67,51 @@ public class TestMinorCompaction {
|
|||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestMinorCompaction.class);
|
||||
|
||||
@Rule public TestName name = new TestName();
|
||||
@Rule
|
||||
public TestName name = new TestName();
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
protected Configuration conf = UTIL.getConfiguration();
|
||||
private static Configuration CONF = UTIL.getConfiguration();
|
||||
|
||||
private HRegion r = null;
|
||||
private TableDescriptor htd = null;
|
||||
private int compactionThreshold;
|
||||
private byte[] firstRowBytes, secondRowBytes, thirdRowBytes;
|
||||
final private byte[] col1, col2;
|
||||
private static int COMPACTION_THRESHOLD;
|
||||
private static byte[] FIRST_ROW_BYTES, SECOND_ROW_BYTES, THIRD_ROW_BYTES;
|
||||
private static byte[] COL1, COL2;
|
||||
|
||||
/** constructor */
|
||||
public TestMinorCompaction() {
|
||||
super();
|
||||
public static final class MyCompactionPolicy extends RatioBasedCompactionPolicy {
|
||||
|
||||
public MyCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) {
|
||||
super(conf, storeConfigInfo);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompactionRequestImpl selectCompaction(Collection<HStoreFile> candidateFiles,
|
||||
List<HStoreFile> filesCompacting, boolean isUserCompaction, boolean mayUseOffPeak,
|
||||
boolean forceMajor) throws IOException {
|
||||
return new CompactionRequestImpl(
|
||||
candidateFiles.stream().filter(f -> !filesCompacting.contains(f))
|
||||
.limit(COMPACTION_THRESHOLD).collect(Collectors.toList()));
|
||||
}
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() {
|
||||
// Set cache flush size to 1MB
|
||||
conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024*1024);
|
||||
conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100);
|
||||
compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
|
||||
CONF.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
|
||||
CONF.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100);
|
||||
COMPACTION_THRESHOLD = CONF.getInt("hbase.hstore.compactionThreshold", 3);
|
||||
CONF.setClass(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY, MyCompactionPolicy.class,
|
||||
RatioBasedCompactionPolicy.class);
|
||||
|
||||
firstRowBytes = START_KEY_BYTES;
|
||||
secondRowBytes = START_KEY_BYTES.clone();
|
||||
FIRST_ROW_BYTES = START_KEY_BYTES;
|
||||
SECOND_ROW_BYTES = START_KEY_BYTES.clone();
|
||||
// Increment the least significant character so we get to next row.
|
||||
secondRowBytes[START_KEY_BYTES.length - 1]++;
|
||||
thirdRowBytes = START_KEY_BYTES.clone();
|
||||
thirdRowBytes[START_KEY_BYTES.length - 1] =
|
||||
(byte) (thirdRowBytes[START_KEY_BYTES.length - 1] + 2);
|
||||
col1 = Bytes.toBytes("column1");
|
||||
col2 = Bytes.toBytes("column2");
|
||||
SECOND_ROW_BYTES[START_KEY_BYTES.length - 1]++;
|
||||
THIRD_ROW_BYTES = START_KEY_BYTES.clone();
|
||||
THIRD_ROW_BYTES[START_KEY_BYTES.length - 1] =
|
||||
(byte) (THIRD_ROW_BYTES[START_KEY_BYTES.length - 1] + 2);
|
||||
COL1 = Bytes.toBytes("column1");
|
||||
COL2 = Bytes.toBytes("column2");
|
||||
}
|
||||
|
||||
@Before
|
||||
|
@ -97,29 +124,29 @@ public class TestMinorCompaction {
|
|||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
WAL wal = ((HRegion)r).getWAL();
|
||||
((HRegion)r).close();
|
||||
WAL wal = ((HRegion) r).getWAL();
|
||||
((HRegion) r).close();
|
||||
wal.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMinorCompactionWithDeleteRow() throws Exception {
|
||||
Delete deleteRow = new Delete(secondRowBytes);
|
||||
Delete deleteRow = new Delete(SECOND_ROW_BYTES);
|
||||
testMinorCompactionWithDelete(deleteRow);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMinorCompactionWithDeleteColumn1() throws Exception {
|
||||
Delete dc = new Delete(secondRowBytes);
|
||||
Delete dc = new Delete(SECOND_ROW_BYTES);
|
||||
/* delete all timestamps in the column */
|
||||
dc.addColumns(fam2, col2);
|
||||
dc.addColumns(fam2, COL2);
|
||||
testMinorCompactionWithDelete(dc);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMinorCompactionWithDeleteColumn2() throws Exception {
|
||||
Delete dc = new Delete(secondRowBytes);
|
||||
dc.addColumn(fam2, col2);
|
||||
Delete dc = new Delete(SECOND_ROW_BYTES);
|
||||
dc.addColumn(fam2, COL2);
|
||||
/* compactionThreshold is 3. The table has 4 versions: 0, 1, 2, and 3.
|
||||
* we only delete the latest version. One might expect to see only
|
||||
* versions 1 and 2. HBase differs, and gives us 0, 1 and 2.
|
||||
|
@ -131,15 +158,15 @@ public class TestMinorCompaction {
|
|||
|
||||
@Test
|
||||
public void testMinorCompactionWithDeleteColumnFamily() throws Exception {
|
||||
Delete deleteCF = new Delete(secondRowBytes);
|
||||
Delete deleteCF = new Delete(SECOND_ROW_BYTES);
|
||||
deleteCF.addFamily(fam2);
|
||||
testMinorCompactionWithDelete(deleteCF);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMinorCompactionWithDeleteVersion1() throws Exception {
|
||||
Delete deleteVersion = new Delete(secondRowBytes);
|
||||
deleteVersion.addColumns(fam2, col2, 2);
|
||||
Delete deleteVersion = new Delete(SECOND_ROW_BYTES);
|
||||
deleteVersion.addColumns(fam2, COL2, 2);
|
||||
/* compactionThreshold is 3. The table has 4 versions: 0, 1, 2, and 3.
|
||||
* We delete versions 0 ... 2. So, we still have one remaining.
|
||||
*/
|
||||
|
@ -148,8 +175,8 @@ public class TestMinorCompaction {
|
|||
|
||||
@Test
|
||||
public void testMinorCompactionWithDeleteVersion2() throws Exception {
|
||||
Delete deleteVersion = new Delete(secondRowBytes);
|
||||
deleteVersion.addColumn(fam2, col2, 1);
|
||||
Delete deleteVersion = new Delete(SECOND_ROW_BYTES);
|
||||
deleteVersion.addColumn(fam2, COL2, 1);
|
||||
/*
|
||||
* the table has 4 versions: 0, 1, 2, and 3.
|
||||
* We delete 1.
|
||||
|
@ -171,22 +198,22 @@ public class TestMinorCompaction {
|
|||
private void testMinorCompactionWithDelete(Delete delete, int expectedResultsAfterDelete)
|
||||
throws Exception {
|
||||
Table loader = new RegionAsTable(r);
|
||||
for (int i = 0; i < compactionThreshold + 1; i++) {
|
||||
HTestConst.addContent(loader, Bytes.toString(fam1), Bytes.toString(col1), firstRowBytes,
|
||||
thirdRowBytes, i);
|
||||
HTestConst.addContent(loader, Bytes.toString(fam1), Bytes.toString(col2), firstRowBytes,
|
||||
thirdRowBytes, i);
|
||||
HTestConst.addContent(loader, Bytes.toString(fam2), Bytes.toString(col1), firstRowBytes,
|
||||
thirdRowBytes, i);
|
||||
HTestConst.addContent(loader, Bytes.toString(fam2), Bytes.toString(col2), firstRowBytes,
|
||||
thirdRowBytes, i);
|
||||
for (int i = 0; i < COMPACTION_THRESHOLD + 1; i++) {
|
||||
HTestConst.addContent(loader, Bytes.toString(fam1), Bytes.toString(COL1), FIRST_ROW_BYTES,
|
||||
THIRD_ROW_BYTES, i);
|
||||
HTestConst.addContent(loader, Bytes.toString(fam1), Bytes.toString(COL2), FIRST_ROW_BYTES,
|
||||
THIRD_ROW_BYTES, i);
|
||||
HTestConst.addContent(loader, Bytes.toString(fam2), Bytes.toString(COL1), FIRST_ROW_BYTES,
|
||||
THIRD_ROW_BYTES, i);
|
||||
HTestConst.addContent(loader, Bytes.toString(fam2), Bytes.toString(COL2), FIRST_ROW_BYTES,
|
||||
THIRD_ROW_BYTES, i);
|
||||
r.flush(true);
|
||||
}
|
||||
|
||||
Result result = r.get(new Get(firstRowBytes).addColumn(fam1, col1).readVersions(100));
|
||||
assertEquals(compactionThreshold, result.size());
|
||||
result = r.get(new Get(secondRowBytes).addColumn(fam2, col2).readVersions(100));
|
||||
assertEquals(compactionThreshold, result.size());
|
||||
Result result = r.get(new Get(FIRST_ROW_BYTES).addColumn(fam1, COL1).readVersions(100));
|
||||
assertEquals(COMPACTION_THRESHOLD, result.size());
|
||||
result = r.get(new Get(SECOND_ROW_BYTES).addColumn(fam2, COL2).readVersions(100));
|
||||
assertEquals(COMPACTION_THRESHOLD, result.size());
|
||||
|
||||
// Now add deletes to memstore and then flush it. That will put us over
|
||||
// the compaction threshold of 3 store files. Compacting these store files
|
||||
|
@ -195,28 +222,30 @@ public class TestMinorCompaction {
|
|||
r.delete(delete);
|
||||
|
||||
// Make sure that we have only deleted family2 from secondRowBytes
|
||||
result = r.get(new Get(secondRowBytes).addColumn(fam2, col2).readVersions(100));
|
||||
result = r.get(new Get(SECOND_ROW_BYTES).addColumn(fam2, COL2).readVersions(100));
|
||||
assertEquals(expectedResultsAfterDelete, result.size());
|
||||
// but we still have firstrow
|
||||
result = r.get(new Get(firstRowBytes).addColumn(fam1, col1).readVersions(100));
|
||||
assertEquals(compactionThreshold, result.size());
|
||||
result = r.get(new Get(FIRST_ROW_BYTES).addColumn(fam1, COL1).readVersions(100));
|
||||
assertEquals(COMPACTION_THRESHOLD, result.size());
|
||||
|
||||
r.flush(true);
|
||||
// should not change anything.
|
||||
// Let us check again
|
||||
|
||||
// Make sure that we have only deleted family2 from secondRowBytes
|
||||
result = r.get(new Get(secondRowBytes).addColumn(fam2, col2).readVersions(100));
|
||||
result = r.get(new Get(SECOND_ROW_BYTES).addColumn(fam2, COL2).readVersions(100));
|
||||
assertEquals(expectedResultsAfterDelete, result.size());
|
||||
// but we still have firstrow
|
||||
result = r.get(new Get(firstRowBytes).addColumn(fam1, col1).readVersions(100));
|
||||
assertEquals(compactionThreshold, result.size());
|
||||
result = r.get(new Get(FIRST_ROW_BYTES).addColumn(fam1, COL1).readVersions(100));
|
||||
assertEquals(COMPACTION_THRESHOLD, result.size());
|
||||
|
||||
// do a compaction
|
||||
HStore store2 = r.getStore(fam2);
|
||||
int numFiles1 = store2.getStorefiles().size();
|
||||
assertTrue("Was expecting to see 4 store files", numFiles1 > compactionThreshold); // > 3
|
||||
((HStore)store2).compactRecentForTestingAssumingDefaultPolicy(compactionThreshold); // = 3
|
||||
assertTrue("Was expecting to see 4 store files", numFiles1 > COMPACTION_THRESHOLD); // > 3
|
||||
Optional<CompactionContext> compaction = store2.requestCompaction();
|
||||
assertTrue(compaction.isPresent());
|
||||
store2.compact(compaction.get(), NoLimitThroughputController.INSTANCE, null); // = 3
|
||||
int numFiles2 = store2.getStorefiles().size();
|
||||
// Check that we did compact
|
||||
assertTrue("Number of store files should go down", numFiles1 > numFiles2);
|
||||
|
@ -224,10 +253,10 @@ public class TestMinorCompaction {
|
|||
assertTrue("Was not supposed to be a major compaction", numFiles2 > 1);
|
||||
|
||||
// Make sure that we have only deleted family2 from secondRowBytes
|
||||
result = r.get(new Get(secondRowBytes).addColumn(fam2, col2).readVersions(100));
|
||||
result = r.get(new Get(SECOND_ROW_BYTES).addColumn(fam2, COL2).readVersions(100));
|
||||
assertEquals(expectedResultsAfterDelete, result.size());
|
||||
// but we still have firstrow
|
||||
result = r.get(new Get(firstRowBytes).addColumn(fam1, col1).readVersions(100));
|
||||
assertEquals(compactionThreshold, result.size());
|
||||
result = r.get(new Get(FIRST_ROW_BYTES).addColumn(fam1, COL1).readVersions(100));
|
||||
assertEquals(COMPACTION_THRESHOLD, result.size());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue