HBASE-3048 unify code for major/minor compactions (Amit via jgray)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1032626 13f79535-47bb-0310-9956-ffa450edef68
parent ea5395d62b
commit 4a4df61776
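For orientation (not part of the patch text itself): the unification removes the dedicated MinorCompactingStoreScanner and routes both compaction kinds through StoreScanner, with a new retainDeletesInOutput flag on ScanQueryMatcher deciding whether delete markers are copied into the compaction output. A hedged sketch of the scanner selection that Store.compact ends up with (variable names as they appear in the hunks below; not verbatim patch code):

    // Sketch of the unified compaction scanner setup inside Store.compact.
    Scan scan = new Scan();
    scan.setMaxVersions(family.getMaxVersions());
    // Only a major compaction may drop delete markers; a minor compaction must
    // carry them forward because the store files it does not rewrite may still
    // contain cells those markers cover.
    InternalScanner scanner = new StoreScanner(this, scan, scanners, !majorCompaction);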
CHANGES.txt
@@ -1,5 +1,5 @@
HBase Change Log
Release 0.21.0 - Unreleased
Release 0.90.0 - Unreleased
INCOMPATIBLE CHANGES
HBASE-1822 Remove the deprecated APIs
HBASE-1848 Fixup shell for HBASE-1822

@@ -1114,6 +1114,7 @@ Release 0.21.0 - Unreleased
           without losing important messages
HBASE-3201 Add accounting of empty regioninfo_qualifier rows in meta to
           hbasefsck.
HBASE-3048 unify code for major/minor compactions (Amit via jgray)

NEW FEATURES
MinorCompactingStoreScanner.java (deleted by this patch)
@@ -1,138 +0,0 @@
/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver;

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;

import java.io.IOException;
import java.util.List;

/**
 * A scanner that does a minor compaction at the same time. Doesn't need to
 * implement ChangedReadersObserver, since it doesn't scan memstore, only store files
 * and optionally the memstore-snapshot.
 */
public class MinorCompactingStoreScanner implements KeyValueScanner, InternalScanner {
  private KeyValueHeap heap;
  private KeyValue.KVComparator comparator;

  MinorCompactingStoreScanner(Store store, List<? extends KeyValueScanner> scanners)
      throws IOException {
    comparator = store.comparator;
    KeyValue firstKv = KeyValue.createFirstOnRow(HConstants.EMPTY_START_ROW);
    for (KeyValueScanner scanner : scanners) {
      scanner.seek(firstKv);
    }
    heap = new KeyValueHeap(scanners, store.comparator);
  }

  MinorCompactingStoreScanner(String cfName, KeyValue.KVComparator comparator,
      List<? extends KeyValueScanner> scanners)
      throws IOException {
    this.comparator = comparator;

    KeyValue firstKv = KeyValue.createFirstOnRow(HConstants.EMPTY_START_ROW);
    for (KeyValueScanner scanner : scanners) {
      scanner.seek(firstKv);
    }

    heap = new KeyValueHeap(scanners, comparator);
  }

  public KeyValue peek() {
    return heap.peek();
  }

  public KeyValue next() throws IOException {
    return heap.next();
  }

  @Override
  public boolean seek(KeyValue key) {
    // cant seek.
    throw new UnsupportedOperationException("Can't seek a MinorCompactingStoreScanner");
  }

  public boolean reseek(KeyValue key) {
    return seek(key);
  }

  /**
   * High performance merge scan.
   * @param writer
   * @return True if more.
   * @throws IOException
   */
  public boolean next(StoreFile.Writer writer) throws IOException {
    KeyValue row = heap.peek();
    if (row == null) {
      close();
      return false;
    }
    KeyValue kv;
    while ((kv = heap.peek()) != null) {
      // check to see if this is a different row
      if (comparator.compareRows(row, kv) != 0) {
        // reached next row
        return true;
      }
      writer.append(heap.next());
    }
    close();
    return false;
  }

  @Override
  public boolean next(List<KeyValue> results) throws IOException {
    KeyValue row = heap.peek();
    if (row == null) {
      close();
      return false;
    }
    KeyValue kv;
    while ((kv = heap.peek()) != null) {
      // check to see if this is a different row
      if (comparator.compareRows(row, kv) != 0) {
        // reached next row
        return true;
      }
      results.add(heap.next());
    }
    close();
    return false;
  }

  @Override
  public boolean next(List<KeyValue> results, int limit) throws IOException {
    // should not use limits with minor compacting store scanner
    return next(results);
  }

  public void close() {
    heap.close();
  }

  @Override
  public long getSequenceID() {
    return 0;
  }
}
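The class removed above was essentially a k-way merge over already-sorted scanners: seek every scanner to the first key, push them into a KeyValueHeap, and drain the heap in comparator order while appending to the new store file. For reference, that merge pattern can be illustrated with plain java.util types (this is a standalone sketch, not the HBase classes):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    import java.util.PriorityQueue;

    public class KWayMergeSketch {
      /** Heap entry: the current head of one sorted input plus the rest of that input. */
      private static class Entry {
        final int head;
        final Iterator<Integer> rest;
        Entry(int head, Iterator<Integer> rest) { this.head = head; this.rest = rest; }
      }

      /** Merge several individually sorted lists, the way KeyValueHeap merges sorted scanners. */
      public static List<Integer> merge(List<List<Integer>> sortedInputs) {
        PriorityQueue<Entry> heap =
            new PriorityQueue<>(Comparator.comparingInt((Entry e) -> e.head));
        for (List<Integer> input : sortedInputs) {      // "seek" every scanner first
          Iterator<Integer> it = input.iterator();
          if (it.hasNext()) heap.add(new Entry(it.next(), it));
        }
        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {                        // then drain in global sort order
          Entry e = heap.poll();
          out.add(e.head);
          if (e.rest.hasNext()) heap.add(new Entry(e.rest.next(), e.rest));
        }
        return out;
      }

      public static void main(String[] args) {
        // Three sorted "store files" merged into one sorted "compacted file".
        System.out.println(merge(List.of(List.of(1, 4, 9), List.of(2, 3, 10), List.of(5))));
        // prints [1, 2, 3, 4, 5, 9, 10]
      }
    }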
ScanQueryMatcher.java
@@ -45,6 +45,7 @@ public class ScanQueryMatcher {

  /** Keeps track of deletes */
  protected DeleteTracker deletes;
  protected boolean retainDeletesInOutput;

  /** Keeps track of columns and versions */
  protected ColumnTracker columns;

@@ -71,7 +72,8 @@ public class ScanQueryMatcher {
   */
  public ScanQueryMatcher(Scan scan, byte [] family,
      NavigableSet<byte[]> columns, long ttl,
      KeyValue.KeyComparator rowComparator, int maxVersions) {
      KeyValue.KeyComparator rowComparator, int maxVersions,
      boolean retainDeletesInOutput) {
    this.tr = scan.getTimeRange();
    this.oldestStamp = System.currentTimeMillis() - ttl;
    this.rowComparator = rowComparator;

@@ -79,6 +81,7 @@ public class ScanQueryMatcher {
    this.stopRow = scan.getStopRow();
    this.startKey = KeyValue.createFirstOnRow(scan.getStartRow());
    this.filter = scan.getFilter();
    this.retainDeletesInOutput = retainDeletesInOutput;

    // Single branch to deal with two types of reads (columns vs all in family)
    if (columns == null || columns.size() == 0) {

@@ -90,6 +93,13 @@ public class ScanQueryMatcher {
      this.columns = new ExplicitColumnTracker(columns,maxVersions);
    }
  }

  public ScanQueryMatcher(Scan scan, byte [] family,
      NavigableSet<byte[]> columns, long ttl,
      KeyValue.KeyComparator rowComparator, int maxVersions) {
    /* By default we will not include deletes */
    /* deletes are included explicitly (for minor compaction) */
    this(scan, family, columns, ttl, rowComparator, maxVersions, false);
  }

  /**
   * Determines if the caller should do one of several things:

@@ -159,7 +169,12 @@ public class ScanQueryMatcher {
        this.deletes.add(bytes, offset, qualLength, timestamp, type);
        // Can't early out now, because DelFam come before any other keys
      }
      return MatchCode.SKIP;
      if (retainDeletesInOutput) {
        return MatchCode.INCLUDE;
      }
      else {
        return MatchCode.SKIP;
      }
    }

    if (!this.deletes.isEmpty() &&

@@ -356,4 +371,4 @@ public class ScanQueryMatcher {
     */
    SEEK_NEXT_USING_HINT,
  }
}
}
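A brief usage sketch of the two ScanQueryMatcher constructors after this change (illustrative only; scan, family, columns, ttl, rawComparator and maxVersions are assumed to be in scope with the obvious types): the six-argument constructor keeps the old read-path behaviour by delegating with retainDeletesInOutput = false, while a compaction that must preserve delete markers passes true explicitly.

    // Read path: delete markers are never surfaced to the caller.
    ScanQueryMatcher readMatcher =
        new ScanQueryMatcher(scan, family, columns, ttl, rawComparator, maxVersions);

    // Minor-compaction path: delete markers are matched as INCLUDE so they are
    // rewritten into the new store file instead of being dropped.
    ScanQueryMatcher minorCompactionMatcher =
        new ScanQueryMatcher(scan, family, columns, ttl, rawComparator, maxVersions, true);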
Store.java
@@ -709,6 +709,28 @@ public class Store implements HeapSize {
    return checkSplit(forceSplit);
  }

  /*
   * Compact the most recent N files. Essentially a hook for testing.
   */
  protected void compactRecent(int N) throws IOException {
    synchronized(compactLock) {
      List<StoreFile> filesToCompact = this.storefiles;
      int count = filesToCompact.size();
      if (N > count) {
        throw new RuntimeException("Not enough files");
      }

      filesToCompact = new ArrayList<StoreFile>(filesToCompact.subList(count-N, count));
      long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
      boolean majorcompaction = (N == count);

      // Ready to go. Have list of files to compact.
      StoreFile.Writer writer = compact(filesToCompact, majorcompaction, maxId);
      // Move the compaction into place.
      StoreFile sf = completeCompaction(filesToCompact, writer);
    }
  }

  /*
   * @param files
   * @return True if any of the files in <code>files</code> are References.

@@ -843,13 +865,12 @@ public class Store implements HeapSize {
    // where all source cells are expired or deleted.
    StoreFile.Writer writer = null;
    try {
      // NOTE: the majority of the time for a compaction is spent in this section
      if (majorCompaction) {
        InternalScanner scanner = null;
        try {
          Scan scan = new Scan();
          scan.setMaxVersions(family.getMaxVersions());
          scanner = new StoreScanner(this, scan, scanners);
          /* include deletes, unless we are doing a major compaction */
          scanner = new StoreScanner(this, scan, scanners, !majorCompaction);
          int bytesWritten = 0;
          // since scanner.next() can return 'false' but still be delivering data,
          // we have to use a do/while loop.

@@ -888,39 +909,6 @@ public class Store implements HeapSize {
            scanner.close();
          }
        }
      } else {
        MinorCompactingStoreScanner scanner = null;
        try {
          scanner = new MinorCompactingStoreScanner(this, scanners);
          if (scanner.peek() != null) {
            writer = createWriterInTmp(maxKeyCount);
            int bytesWritten = 0;
            while (scanner.peek() != null) {
              KeyValue kv = scanner.next();
              writer.append(kv);

              // check periodically to see if a system stop is requested
              if (Store.closeCheckInterval > 0) {
                bytesWritten += kv.getLength();
                if (bytesWritten > Store.closeCheckInterval) {
                  bytesWritten = 0;
                  if (!this.region.areWritesEnabled()) {
                    writer.close();
                    fs.delete(writer.getPath(), false);
                    throw new InterruptedIOException(
                        "Aborting compaction of store " + this +
                        " in region " + this.region +
                        " because user requested stop.");
                  }
                }
              }
            }
          }
        } finally {
          if (scanner != null)
            scanner.close();
        }
      }
    } finally {
      if (writer != null) {
        writer.appendMetadata(maxId, majorCompaction);
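The new compactRecent(int) hook lets a test in the same package force a compaction over only the newest N store files, which is by construction a minor compaction whenever N is smaller than the total file count. A hedged sketch of how a test might exercise it, mirroring the call made in TestCompaction further down (r is an HRegion, COLUMN_FAMILY a family of that region, and more than three files are assumed to exist):

    Store store = r.getStore(COLUMN_FAMILY);
    int filesBefore = store.getStorefiles().size();
    store.compactRecent(3);                        // compact only the three newest files
    int filesAfter = store.getStorefiles().size();
    assertTrue(filesAfter < filesBefore);          // files were merged...
    assertTrue(filesAfter > 1);                    // ...but not down to one, so it stayed minor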
StoreScanner.java
@@ -58,12 +58,14 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
   * @param columns which columns we are scanning
   * @throws IOException
   */
  StoreScanner(Store store, Scan scan, final NavigableSet<byte[]> columns) throws IOException {
  StoreScanner(Store store, Scan scan, final NavigableSet<byte[]> columns)
      throws IOException {
    this.store = store;
    this.cacheBlocks = scan.getCacheBlocks();
    matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
        columns, store.ttl, store.comparator.getRawComparator(),
        store.versionsToReturn(scan.getMaxVersions()));
        store.versionsToReturn(scan.getMaxVersions()),
        false);

    this.isGet = scan.isGetScan();
    // pass columns = try to filter out unnecessary ScanFiles

@@ -89,14 +91,15 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
   * @param scan the spec
   * @param scanners ancilliary scanners
   */
  StoreScanner(Store store, Scan scan, List<? extends KeyValueScanner> scanners)
      throws IOException {
  StoreScanner(Store store, Scan scan, List<? extends KeyValueScanner> scanners,
      boolean retainDeletesInOutput)
      throws IOException {
    this.store = store;
    this.cacheBlocks = false;
    this.isGet = false;
    matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
        null, store.ttl, store.comparator.getRawComparator(),
        store.versionsToReturn(scan.getMaxVersions()));
        store.versionsToReturn(scan.getMaxVersions()), retainDeletesInOutput);

    // Seek all scanners to the initial key
    for(KeyValueScanner scanner : scanners) {

@@ -117,7 +120,7 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
    this.isGet = false;
    this.cacheBlocks = scan.getCacheBlocks();
    this.matcher = new ScanQueryMatcher(scan, colFamily, columns, ttl,
        comparator.getRawComparator(), scan.getMaxVersions());
        comparator.getRawComparator(), scan.getMaxVersions(), false);

    // Seek all scanners to the initial key
    for(KeyValueScanner scanner : scanners) {
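Both compaction kinds now consume the scanner through the InternalScanner interface. As the comment in Store.compact notes, next(List) can return false while still having delivered cells for the final batch, so the consuming loop has to be a do/while. A hedged sketch of that consumption pattern (scanner and writer assumed in scope; simplified, not the exact loop in the patch):

    List<KeyValue> kvs = new ArrayList<KeyValue>();
    boolean hasMore;
    do {
      hasMore = scanner.next(kvs);   // may return false and still have filled kvs
      for (KeyValue kv : kvs) {
        writer.append(kv);
      }
      kvs.clear();                   // reuse the batch list for the next round
    } while (hasMore);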
TestCompaction.java
@@ -58,18 +58,31 @@ public class TestCompaction extends HBaseTestCase {
  private static final byte [] COLUMN_FAMILY = fam1;
  private final byte [] STARTROW = Bytes.toBytes(START_KEY);
  private static final byte [] COLUMN_FAMILY_TEXT = COLUMN_FAMILY;
  private static final int COMPACTION_THRESHOLD = MAXVERSIONS;
  private int compactionThreshold;
  private byte[] firstRowBytes, secondRowBytes, thirdRowBytes;
  final private byte[] col1, col2;

  private MiniDFSCluster cluster;

  /** constructor */
  public TestCompaction() {
  public TestCompaction() throws Exception {
    super();

    // Set cache flush size to 1MB
    conf.setInt("hbase.hregion.memstore.flush.size", 1024*1024);
    conf.setInt("hbase.hregion.memstore.block.multiplier", 100);
    this.cluster = null;
    compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);

    firstRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);
    secondRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);
    // Increment the least significant character so we get to next row.
    secondRowBytes[START_KEY_BYTES.length - 1]++;
    thirdRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);
    thirdRowBytes[START_KEY_BYTES.length - 1]++;
    thirdRowBytes[START_KEY_BYTES.length - 1]++;
    col1 = "column1".getBytes(HConstants.UTF8_ENCODING);
    col2 = "column2".getBytes(HConstants.UTF8_ENCODING);
  }

  @Override

@@ -102,7 +115,7 @@ public class TestCompaction extends HBaseTestCase {
   */
  public void testMajorCompactingToNoOutput() throws IOException {
    createStoreFile(r);
    for (int i = 0; i < COMPACTION_THRESHOLD; i++) {
    for (int i = 0; i < compactionThreshold; i++) {
      createStoreFile(r);
    }
    // Now delete everything.

@@ -133,43 +146,35 @@ public class TestCompaction extends HBaseTestCase {
   * Assert deletes get cleaned up.
   * @throws Exception
   */
  public void testCompaction() throws Exception {
  public void testMajorCompaction() throws Exception {
    createStoreFile(r);
    for (int i = 0; i < COMPACTION_THRESHOLD; i++) {
    for (int i = 0; i < compactionThreshold; i++) {
      createStoreFile(r);
    }
    // Add more content. Now there are about 5 versions of each column.
    // Default is that there only 3 (MAXVERSIONS) versions allowed per column.
    // Assert == 3 when we ask for versions.
    // Add more content.
    addContent(new HRegionIncommon(r), Bytes.toString(COLUMN_FAMILY));

    // FIX!!
    // Cell[] cellValues =
    //   Cell.createSingleCellArray(r.get(STARTROW, COLUMN_FAMILY_TEXT, -1, 100 /*Too many*/));
    // Now there are about 5 versions of each column.
    // Default is that there only 3 (MAXVERSIONS) versions allowed per column.
    //
    // Assert == 3 when we ask for versions.
    Result result = r.get(new Get(STARTROW).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null);
    assertEquals(compactionThreshold, result.size());

    // Assert that I can get 3 versions since it is the max I should get
    assertEquals(COMPACTION_THRESHOLD, result.size());
    // assertEquals(cellValues.length, 3);
    r.flushcache();
    r.compactStores();
    // Always 3 versions if that is what max versions is.
    byte [] secondRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);
    r.compactStores(true);

    // look at the second row
    // Increment the least significant character so we get to next row.
    byte [] secondRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);
    secondRowBytes[START_KEY_BYTES.length - 1]++;
    // FIX

    // Always 3 versions if that is what max versions is.
    result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null);
    assertEquals(compactionThreshold, result.size());

    // Assert that I can get 3 versions since it is the max I should get
    assertEquals(3, result.size());
    //
    // cellValues = Cell.createSingleCellArray(r.get(secondRowBytes, COLUMN_FAMILY_TEXT, -1, 100/*Too many*/));
    // LOG.info("Count of " + Bytes.toString(secondRowBytes) + ": " +
    //   cellValues.length);
    // assertTrue(cellValues.length == 3);

    // Now add deletes to memstore and then flush it. That will put us over
    // Now add deletes to memstore and then flush it.
    // That will put us over
    // the compaction threshold of 3 store files. Compacting these store files
    // should result in a compacted store file that has no references to the
    // deleted row.

@@ -179,52 +184,33 @@ public class TestCompaction extends HBaseTestCase {
    r.delete(delete, null, true);

    // Assert deleted.

    result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null );
    assertTrue(result.isEmpty());

    assertTrue("Second row should have been deleted", result.isEmpty());

    r.flushcache();

    result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null );
    assertTrue(result.isEmpty());
    assertTrue("Second row should have been deleted", result.isEmpty());

    // Add a bit of data and flush. Start adding at 'bbb'.
    createSmallerStoreFile(this.r);
    r.flushcache();
    // Assert that the second row is still deleted.
    result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null );
    assertTrue(result.isEmpty());
    assertTrue("Second row should still be deleted", result.isEmpty());

    // Force major compaction.
    r.compactStores(true);
    assertEquals(r.getStore(COLUMN_FAMILY_TEXT).getStorefiles().size(), 1);

    result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null );
    assertTrue(result.isEmpty());
    assertTrue("Second row should still be deleted", result.isEmpty());

    // Make sure the store files do have some 'aaa' keys in them -- exactly 3.
    // Also, that compacted store files do not have any secondRowBytes because
    // they were deleted.
    int count = 0;
    boolean containsStartRow = false;
    for (StoreFile f: this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) {
      HFileScanner scanner = f.getReader().getScanner(false, false);
      scanner.seekTo();
      do {
        byte [] row = scanner.getKeyValue().getRow();
        if (Bytes.equals(row, STARTROW)) {
          containsStartRow = true;
          count++;
        } else {
          // After major compaction, should be none of these rows in compacted
          // file.
          assertFalse(Bytes.equals(row, secondRowBytes));
        }
      } while(scanner.next());
    }
    assertTrue(containsStartRow);
    assertTrue(count == 3);

    verifyCounts(3,0);

    // Multiple versions allowed for an entry, so the delete isn't enough
    // Lower TTL and expire to ensure that all our entries have been wiped
    final int ttlInSeconds = 1;

@@ -234,13 +220,145 @@ public class TestCompaction extends HBaseTestCase {
    Thread.sleep(ttlInSeconds * 1000);

    r.compactStores(true);
    count = count();
    assertTrue(count == 0);
    int count = count();
    assertTrue("Should not see anything after TTL has expired", count == 0);
  }

  public void testMinorCompactionWithDeleteRow() throws Exception {
    Delete deleteRow = new Delete(secondRowBytes);
    testMinorCompactionWithDelete(deleteRow);
  }

  public void testMinorCompactionWithDeleteColumn1() throws Exception {
    Delete dc = new Delete(secondRowBytes);
    /* delete all timestamps in the column */
    dc.deleteColumns(fam2, col2);
    testMinorCompactionWithDelete(dc);
  }

  public void testMinorCompactionWithDeleteColumn2() throws Exception {
    Delete dc = new Delete(secondRowBytes);
    dc.deleteColumn(fam2, col2);
    /* compactionThreshold is 3. The table has 4 versions: 0, 1, 2, and 3.
     * we only delete the latest version. One might expect to see only
     * versions 1 and 2. HBase differs, and gives us 0, 1 and 2.
     * This is okay as well. Since there was no compaction done before the
     * delete, version 0 seems to stay on.
     */
    //testMinorCompactionWithDelete(dc, 2);
    testMinorCompactionWithDelete(dc, 3);
  }

  public void testMinorCompactionWithDeleteColumnFamily() throws Exception {
    Delete deleteCF = new Delete(secondRowBytes);
    deleteCF.deleteFamily(fam2);
    testMinorCompactionWithDelete(deleteCF);
  }

  public void testMinorCompactionWithDeleteVersion1() throws Exception {
    Delete deleteVersion = new Delete(secondRowBytes);
    deleteVersion.deleteColumns(fam2, col2, 2);
    /* compactionThreshold is 3. The table has 4 versions: 0, 1, 2, and 3.
     * We delete versions 0 ... 2. So, we still have one remaining.
     */
    testMinorCompactionWithDelete(deleteVersion, 1);
  }

  public void testMinorCompactionWithDeleteVersion2() throws Exception {
    Delete deleteVersion = new Delete(secondRowBytes);
    deleteVersion.deleteColumn(fam2, col2, 1);
    /*
     * the table has 4 versions: 0, 1, 2, and 3.
     * 0 does not count.
     * We delete 1.
     * Should have 2 remaining.
     */
    testMinorCompactionWithDelete(deleteVersion, 2);
  }

  /*
   * A helper function to test the minor compaction algorithm. We check that
   * the delete markers are left behind. Takes delete as an argument, which
   * can be any delete (row, column, columnfamliy etc), that essentially
   * deletes row2 and column2. row1 and column1 should be undeleted
   */
  private void testMinorCompactionWithDelete(Delete delete) throws Exception {
    testMinorCompactionWithDelete(delete, 0);
  }

  private void testMinorCompactionWithDelete(Delete delete, int expectedResultsAfterDelete) throws Exception {
    HRegionIncommon loader = new HRegionIncommon(r);
    for (int i = 0; i < compactionThreshold + 1; i++) {
      addContent(loader, Bytes.toString(fam1), Bytes.toString(col1), firstRowBytes, thirdRowBytes, i);
      addContent(loader, Bytes.toString(fam1), Bytes.toString(col2), firstRowBytes, thirdRowBytes, i);
      addContent(loader, Bytes.toString(fam2), Bytes.toString(col1), firstRowBytes, thirdRowBytes, i);
      addContent(loader, Bytes.toString(fam2), Bytes.toString(col2), firstRowBytes, thirdRowBytes, i);
      r.flushcache();
    }

    Result result = r.get(new Get(firstRowBytes).addColumn(fam1, col1).setMaxVersions(100), null);
    assertEquals(compactionThreshold, result.size());
    result = r.get(new Get(secondRowBytes).addColumn(fam2, col2).setMaxVersions(100), null);
    assertEquals(compactionThreshold, result.size());

    // Now add deletes to memstore and then flush it. That will put us over
    // the compaction threshold of 3 store files. Compacting these store files
    // should result in a compacted store file that has no references to the
    // deleted row.
    r.delete(delete, null, true);

    // Make sure that we have only deleted family2 from secondRowBytes
    result = r.get(new Get(secondRowBytes).addColumn(fam2, col2).setMaxVersions(100), null);
    assertEquals(expectedResultsAfterDelete, result.size());
    // but we still have firstrow
    result = r.get(new Get(firstRowBytes).addColumn(fam1, col1).setMaxVersions(100), null);
    assertEquals(compactionThreshold, result.size());

    r.flushcache();
    // should not change anything.
    // Let us check again

    // Make sure that we have only deleted family2 from secondRowBytes
    result = r.get(new Get(secondRowBytes).addColumn(fam2, col2).setMaxVersions(100), null);
    assertEquals(expectedResultsAfterDelete, result.size());
    // but we still have firstrow
    result = r.get(new Get(firstRowBytes).addColumn(fam1, col1).setMaxVersions(100), null);
    assertEquals(compactionThreshold, result.size());

    // do a compaction
    Store store2 = this.r.stores.get(fam2);
    int numFiles1 = store2.getStorefiles().size();
    assertTrue("Was expecting to see 4 store files", numFiles1 > compactionThreshold); // > 3
    store2.compactRecent(compactionThreshold);   // = 3
    int numFiles2 = store2.getStorefiles().size();
    // Check that we did compact
    assertTrue("Number of store files should go down", numFiles1 > numFiles2);
    // Check that it was a minor compaction.
    assertTrue("Was not supposed to be a major compaction", numFiles2 > 1);

    // Make sure that we have only deleted family2 from secondRowBytes
    result = r.get(new Get(secondRowBytes).addColumn(fam2, col2).setMaxVersions(100), null);
    assertEquals(expectedResultsAfterDelete, result.size());
    // but we still have firstrow
    result = r.get(new Get(firstRowBytes).addColumn(fam1, col1).setMaxVersions(100), null);
    assertEquals(compactionThreshold, result.size());
  }

  private void verifyCounts(int countRow1, int countRow2) throws Exception {
    int count1 = 0;
    int count2 = 0;
    for (StoreFile f: this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) {
      HFileScanner scanner = f.getReader().getScanner(false, false);
      scanner.seekTo();
      do {
        byte [] row = scanner.getKeyValue().getRow();
        if (Bytes.equals(row, STARTROW)) {
          count1++;
        } else if(Bytes.equals(row, secondRowBytes)) {
          count2++;
        }
      } while(scanner.next());
    }
    assertEquals(countRow1,count1);
    assertEquals(countRow2,count2);
  }

  /**
   * Verify that you can stop a long-running compaction
   * Verify that you can stop a long-running compaction
   * (used during RS shutdown)
   * @throws Exception
   */

@@ -253,9 +371,9 @@ public class TestCompaction extends HBaseTestCase {

    try {
      // Create a couple store files w/ 15KB (over 10KB interval)
      int jmax = (int) Math.ceil(15.0/COMPACTION_THRESHOLD);
      int jmax = (int) Math.ceil(15.0/compactionThreshold);
      byte [] pad = new byte[1000]; // 1 KB chunk
      for (int i = 0; i < COMPACTION_THRESHOLD; i++) {
      for (int i = 0; i < compactionThreshold; i++) {
        HRegionIncommon loader = new HRegionIncommon(r);
        Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
        for (int j = 0; j < jmax; j++) {

@@ -265,7 +383,7 @@ public class TestCompaction extends HBaseTestCase {
        loader.put(p);
        loader.flushcache();
      }

      HRegion spyR = spy(r);
      doAnswer(new Answer() {
        public Object answer(InvocationOnMock invocation) throws Throwable {

@@ -276,10 +394,10 @@ public class TestCompaction extends HBaseTestCase {

      // force a minor compaction, but not before requesting a stop
      spyR.compactStores();

      // ensure that the compaction stopped, all old files are intact,
      // ensure that the compaction stopped, all old files are intact,
      Store s = r.stores.get(COLUMN_FAMILY);
      assertEquals(COMPACTION_THRESHOLD, s.getStorefilesCount());
      assertEquals(compactionThreshold, s.getStorefilesCount());
      assertTrue(s.getStorefilesSize() > 15*1000);
      // and no new store files persisted past compactStores()
      FileStatus[] ls = cluster.getFileSystem().listStatus(r.getTmpDir());

@@ -291,7 +409,7 @@ public class TestCompaction extends HBaseTestCase {
      Store.closeCheckInterval = origWI;

      // Delete all Store information once done using
      for (int i = 0; i < COMPACTION_THRESHOLD; i++) {
      for (int i = 0; i < compactionThreshold; i++) {
        Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
        byte [][] famAndQf = {COLUMN_FAMILY, null};
        delete.deleteFamily(famAndQf[0]);

@@ -306,12 +424,12 @@ public class TestCompaction extends HBaseTestCase {
      store.ttl = ttlInSeconds * 1000;
    }
    Thread.sleep(ttlInSeconds * 1000);

    r.compactStores(true);
    assertEquals(0, count());
  }
  }

  private int count() throws IOException {
    int count = 0;
    for (StoreFile f: this.r.stores.
TestMinorCompactingStoreScanner.java (deleted by this patch)
@@ -1,90 +0,0 @@
/*
 * Copyright 2009 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver;

import junit.framework.TestCase;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueTestUtil;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.apache.hadoop.hbase.regionserver.KeyValueScanFixture.scanFixture;

public class TestMinorCompactingStoreScanner extends TestCase {

  public void testDeleteFamiliy() throws IOException {
    KeyValue[] kvs = new KeyValue[] {
        KeyValueTestUtil.create("R1", "cf", "a", 100, KeyValue.Type.DeleteFamily, "dont-care"),
        KeyValueTestUtil.create("R1", "cf", "b", 11, KeyValue.Type.Put, "dont-care"),
        KeyValueTestUtil.create("R1", "cf", "c", 11, KeyValue.Type.Put, "dont-care"),
        KeyValueTestUtil.create("R1", "cf", "d", 11, KeyValue.Type.Put, "dont-care"),
        KeyValueTestUtil.create("R1", "cf", "e", 11, KeyValue.Type.Put, "dont-care"),
        KeyValueTestUtil.create("R1", "cf", "e", 11, KeyValue.Type.DeleteColumn, "dont-care"),
        KeyValueTestUtil.create("R1", "cf", "f", 11, KeyValue.Type.Put, "dont-care"),
        KeyValueTestUtil.create("R1", "cf", "g", 11, KeyValue.Type.Put, "dont-care"),
        KeyValueTestUtil.create("R1", "cf", "g", 11, KeyValue.Type.Delete, "dont-care"),
        KeyValueTestUtil.create("R1", "cf", "h", 11, KeyValue.Type.Put, "dont-care"),
        KeyValueTestUtil.create("R1", "cf", "i", 11, KeyValue.Type.Put, "dont-care"),
        KeyValueTestUtil.create("R2", "cf", "a", 11, KeyValue.Type.Put, "dont-care"),
    };
    List<KeyValueScanner> scanners = scanFixture(kvs);

    InternalScanner scan =
        new MinorCompactingStoreScanner("cf", KeyValue.COMPARATOR, scanners);
    List<KeyValue> results = new ArrayList<KeyValue>();
    assertTrue(scan.next(results));
    assertEquals(11, results.size());
    assertEquals(kvs[0], results.get(0));
    assertEquals(kvs[1], results.get(1));
    assertEquals(kvs[2], results.get(2));
    assertEquals(kvs[3], results.get(3));
    assertEquals(kvs[5], results.get(4));
    assertEquals(kvs[4], results.get(5));
    assertEquals(kvs[6], results.get(6));
    assertEquals(kvs[8], results.get(7));
    assertEquals(kvs[7], results.get(8));
    assertEquals(kvs[9], results.get(9));
    assertEquals(kvs[10], results.get(10));

    results.clear();
    assertFalse(scan.next(results));
    assertEquals(1, results.size());
    assertEquals(kvs[kvs.length-1], results.get(0));
  }

  public void testDeleteVersion() throws IOException {
    KeyValue[] kvs = new KeyValue[] {
        KeyValueTestUtil.create("R1", "cf", "a", 15, KeyValue.Type.Put, "dont-care"),
        KeyValueTestUtil.create("R1", "cf", "a", 10, KeyValue.Type.Delete, "dont-care"),
        KeyValueTestUtil.create("R1", "cf", "a", 10, KeyValue.Type.Put, "dont-care")
    };
    List<KeyValueScanner> scanners = scanFixture(kvs);
    InternalScanner scan =
        new MinorCompactingStoreScanner("cf", KeyValue.COMPARATOR, scanners);
    List<KeyValue> results = new ArrayList<KeyValue>();
    assertFalse(scan.next(results));
    assertEquals(3, results.size());
    assertEquals(kvs[0], results.get(0));
    assertEquals(kvs[1], results.get(1));
    assertEquals(kvs[2], results.get(2));
  }
}