HBASE-947 [Optimization] Major compaction should remove deletes as well as the deleted cell
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@718430 13f79535-47bb-0310-9956-ffa450edef68
parent 6aec3c3e37
commit 9b4ca73d59
CHANGES.txt

@@ -124,6 +124,8 @@ Release 0.19.0 - Unreleased
    HBASE-999 Up versions on historian and keep history of deleted regions for a
              while rather than delete immediately
    HBASE-938 Major compaction period is not checked periodically
+   HBASE-947 [Optimization] Major compaction should remove deletes as well as
+             the deleted cell
 
   NEW FEATURES
    HBASE-875 Use MurmurHash instead of JenkinsHash [in bloomfilters]
HStore.java

@@ -435,7 +435,6 @@ public class HStore implements HConstants {
         curfile = new HStoreFile(conf, fs, basedir, this.info,
           family.getName(), fid, reference);
         long storeSeqId = -1;
-        boolean majorCompaction = false;
         try {
           storeSeqId = curfile.loadInfo(fs);
           if (storeSeqId > this.maxSeqId) {
@@ -1043,12 +1042,46 @@ public class HStore implements HConstants {
     return nrows;
   }
 
+  /*
+   * @param r List to reverse
+   * @return A reversed array of content of <code>readers</code>
+   */
+  private MapFile.Reader [] reverse(final List<MapFile.Reader> r) {
+    List<MapFile.Reader> copy = new ArrayList<MapFile.Reader>(r);
+    Collections.reverse(copy);
+    return copy.toArray(new MapFile.Reader[0]);
+  }
+
+  /*
+   * @param rdrs List of readers
+   * @param keys Current keys
+   * @param done Which readers are done
+   * @return The lowest current key in passed <code>rdrs</code>
+   */
+  private int getLowestKey(final MapFile.Reader [] rdrs,
+      final HStoreKey [] keys, final boolean [] done) {
+    int lowestKey = -1;
+    for (int i = 0; i < rdrs.length; i++) {
+      if (done[i]) {
+        continue;
+      }
+      if (lowestKey < 0) {
+        lowestKey = i;
+      } else {
+        if (keys[i].compareTo(keys[lowestKey]) < 0) {
+          lowestKey = i;
+        }
+      }
+    }
+    return lowestKey;
+  }
+
   /*
    * Compact a list of MapFile.Readers into MapFile.Writer.
    *
-   * We work by iterating through the readers in parallel. We always increment
-   * the lowest-ranked one. Updates to a single row/column will appear ranked
-   * by timestamp.
+   * We work by iterating through the readers in parallel looking at newest
+   * store file first. We always increment the lowest-ranked one. Updates to a
+   * single row/column will appear ranked by timestamp.
    * @param compactedOut Where to write compaction.
    * @param pReaders List of readers sorted oldest to newest.
    * @param majorCompaction True to force a major compaction regardless of
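For background, compact() performs a k-way merge: every store file is
already sorted, so the loop repeatedly takes the reader whose current key
is lowest and advances only that reader. Below is a minimal, self-contained
sketch of the same pattern that the new getLowestKey() helper supports,
using plain Lists and String keys as illustrative stand-ins for
MapFile.Reader and HStoreKey (none of these names are HBase API):

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Minimal k-way merge: repeatedly emit the smallest current key. */
public class KWayMergeSketch {
  public static void main(String[] args) {
    // Three already-sorted "store files" (illustrative stand-ins).
    List<List<String>> files = Arrays.asList(
        Arrays.asList("a", "d", "g"),
        Arrays.asList("b", "e"),
        Arrays.asList("c", "f", "h"));
    int n = files.size();
    @SuppressWarnings("unchecked")
    Iterator<String>[] rdrs = new Iterator[n];
    String[] keys = new String[n];
    boolean[] done = new boolean[n];
    int numDone = 0;
    // Prime each reader with its first key, as compact() does.
    for (int i = 0; i < n; i++) {
      rdrs[i] = files.get(i).iterator();
      if (rdrs[i].hasNext()) {
        keys[i] = rdrs[i].next();
      } else {
        done[i] = true;
        numDone++;
      }
    }
    while (numDone < n) {
      // Equivalent of getLowestKey(rdrs, keys, done).
      int lowest = -1;
      for (int i = 0; i < n; i++) {
        if (done[i]) continue;
        if (lowest < 0 || keys[i].compareTo(keys[lowest]) < 0) {
          lowest = i;
        }
      }
      System.out.println(keys[lowest]); // "write" to the compacted output
      // Advance only the reader just consumed; mark it done when drained.
      if (rdrs[lowest].hasNext()) {
        keys[lowest] = rdrs[lowest].next();
      } else {
        done[lowest] = true;
        numDone++;
      }
    }
  }
}

Run on the inputs above this prints a through h in order: one merged,
sorted stream from three sorted sources.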
@@ -1058,14 +1091,12 @@ public class HStore implements HConstants {
   private void compact(final MapFile.Writer compactedOut,
       final List<MapFile.Reader> pReaders, final boolean majorCompaction)
   throws IOException {
-    // Reverse order so newest is first.
-    List<MapFile.Reader> copy = new ArrayList<MapFile.Reader>(pReaders);
-    Collections.reverse(copy);
-    MapFile.Reader[] rdrs = copy.toArray(new MapFile.Reader[0]);
+    // Reverse order so newest store file is first.
+    MapFile.Reader[] rdrs = reverse(pReaders);
     try {
-      HStoreKey[] keys = new HStoreKey[rdrs.length];
-      ImmutableBytesWritable[] vals = new ImmutableBytesWritable[rdrs.length];
-      boolean[] done = new boolean[rdrs.length];
+      HStoreKey [] keys = new HStoreKey[rdrs.length];
+      ImmutableBytesWritable [] vals = new ImmutableBytesWritable[rdrs.length];
+      boolean [] done = new boolean[rdrs.length];
       for(int i = 0; i < rdrs.length; i++) {
         keys[i] = new HStoreKey(HConstants.EMPTY_BYTE_ARRAY, this.info);
         vals[i] = new ImmutableBytesWritable();
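Newest-first ordering matters because getLowestKey() keeps the
lowest-indexed reader on a key tie, so a cell in the newest store file --
for example a delete marker -- wins over an identically-keyed cell in an
older file. A quick standalone illustration of the helper's effect, with
String standing in for MapFile.Reader (illustrative only):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ReverseSketch {
  // Same shape as the new HStore.reverse() helper.
  static String[] reverse(final List<String> r) {
    List<String> copy = new ArrayList<String>(r);
    Collections.reverse(copy);
    return copy.toArray(new String[0]);
  }

  public static void main(String[] args) {
    // Readers arrive sorted oldest to newest; compaction scans newest first.
    String[] rdrs = reverse(Arrays.asList("oldest", "middle", "newest"));
    System.out.println(Arrays.toString(rdrs)); // [newest, middle, oldest]
  }
}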
@@ -1085,56 +1116,67 @@ public class HStore implements HConstants {
 
       long now = System.currentTimeMillis();
       int timesSeen = 0;
-      byte [] lastRow = null;
-      byte [] lastColumn = null;
+      HStoreKey lastSeen = new HStoreKey();
+      HStoreKey lastDelete = null;
       while (numDone < done.length) {
-        int smallestKey = -1;
-        for (int i = 0; i < rdrs.length; i++) {
-          if (done[i]) {
-            continue;
-          }
-          if (smallestKey < 0) {
-            smallestKey = i;
-          } else {
-            if (keys[i].compareTo(keys[smallestKey]) < 0) {
-              smallestKey = i;
-            }
-          }
-        }
-        HStoreKey sk = keys[smallestKey];
-        if (HStoreKey.equalsTwoRowKeys(info,lastRow, sk.getRow())
-            && Bytes.equals(lastColumn, sk.getColumn())) {
+        // Get lowest key in all store files.
+        int lowestKey = getLowestKey(rdrs, keys, done);
+        HStoreKey sk = keys[lowestKey];
+        // If its same row and column as last key, increment times seen.
+        if (HStoreKey.equalsTwoRowKeys(info, lastSeen.getRow(), sk.getRow())
+            && Bytes.equals(lastSeen.getColumn(), sk.getColumn())) {
           timesSeen++;
+          // Reset last delete if not exact timestamp -- lastDelete only stops
+          // exactly the same key making it out to the compacted store file.
+          if (lastDelete != null &&
+              lastDelete.getTimestamp() != sk.getTimestamp()) {
+            lastDelete = null;
+          }
         } else {
           timesSeen = 1;
+          lastDelete = null;
         }
 
         // Don't write empty rows or columns. Only remove cells on major
         // compaction. Remove if expired of > VERSIONS
         if (sk.getRow().length != 0 && sk.getColumn().length != 0) {
-          boolean expired = false;
-          if (!majorCompaction ||
-              (timesSeen <= family.getMaxVersions() &&
-               !(expired = isExpired(sk, ttl, now)))) {
-            compactedOut.append(sk, vals[smallestKey]);
-          }
-          if (expired) {
-            // HBASE-855 remove one from timesSeen because it did not make it
-            // past expired check -- don't count against max versions.
-            timesSeen--;
+          ImmutableBytesWritable value = vals[lowestKey];
+          if (!majorCompaction) {
+            // Write out all values if not a major compaction.
+            compactedOut.append(sk, value);
+          } else {
+            boolean expired = false;
+            boolean deleted = false;
+            if (timesSeen <= family.getMaxVersions() &&
+                !(expired = isExpired(sk, ttl, now))) {
+              // If this value key is same as a deleted key, skip
+              if (lastDelete != null && sk.equals(lastDelete)) {
+                deleted = true;
+              } else if (HLogEdit.isDeleted(value.get())) {
+                // If a deleted value, skip
+                deleted = true;
+                lastDelete = new HStoreKey(sk);
+              } else {
+                compactedOut.append(sk, vals[lowestKey]);
+              }
+            }
+            if (expired || deleted) {
+              // HBASE-855 remove one from timesSeen because it did not make it
+              // past expired check -- don't count against max versions.
+              timesSeen--;
+            }
           }
         }
 
         // Update last-seen items
-        lastRow = sk.getRow();
-        lastColumn = sk.getColumn();
+        lastSeen = new HStoreKey(sk);
 
         // Advance the smallest key. If that reader's all finished, then
        // mark it as done.
-        if (!rdrs[smallestKey].next(keys[smallestKey], vals[smallestKey])) {
-          done[smallestKey] = true;
-          rdrs[smallestKey].close();
-          rdrs[smallestKey] = null;
+        if (!rdrs[lowestKey].next(keys[lowestKey], vals[lowestKey])) {
+          done[lowestKey] = true;
+          rdrs[lowestKey].close();
+          rdrs[lowestKey] = null;
           numDone++;
         }
       }
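This hunk carries the core of HBASE-947: on a major compaction, a cell
whose value is a delete marker (HLogEdit.isDeleted) is itself dropped, and
lastDelete then suppresses any identically-keyed cell arriving from an
older store file. A minimal standalone sketch of that suppression rule;
Cell is an illustrative stand-in for HStoreKey plus its value, and the
timesSeen/max-versions and TTL checks are omitted:

import java.util.Arrays;
import java.util.List;

public class DeleteSuppressionSketch {
  static class Cell {
    final String row, col;
    final long ts;
    final boolean delete;
    Cell(String row, String col, long ts, boolean delete) {
      this.row = row; this.col = col; this.ts = ts; this.delete = delete;
    }
    boolean sameKey(Cell o) {
      return row.equals(o.row) && col.equals(o.col) && ts == o.ts;
    }
  }

  public static void main(String[] args) {
    // Already merged newest-store-file-first, so for identical keys the
    // delete marker (from the newer file) arrives before the deleted cell.
    List<Cell> merged = Arrays.asList(
        new Cell("r1", "c1", 5, true),   // delete marker in newer file
        new Cell("r1", "c1", 5, false),  // same key in older file: dropped
        new Cell("r1", "c1", 3, false)); // older timestamp: survives

    Cell lastDelete = null;
    for (Cell c : merged) {
      // The marker only shadows the exact same row/column/timestamp, so
      // reset it once the timestamp no longer matches.
      if (lastDelete != null && lastDelete.ts != c.ts) {
        lastDelete = null;
      }
      if (lastDelete != null && c.sameKey(lastDelete)) {
        continue; // cell shadowed by the delete marker: skip it
      }
      if (c.delete) {
        lastDelete = c; // remember the marker; it is not written out either
        continue;
      }
      System.out.println("append " + c.row + "/" + c.col + "@" + c.ts);
    }
    // Prints only: append r1/c1@3
  }
}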
TestCompaction.java

@@ -103,34 +103,39 @@ public class TestCompaction extends HBaseTestCase {
     assertTrue(cellValues.length == 3);
     r.flushcache();
     r.compactStores();
-    // Always 3 version if that is what max versions is.
+    // Always 3 versions if that is what max versions is.
     byte [] secondRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);
     // Increment the least significant character so we get to next row.
     secondRowBytes[START_KEY_BYTES.length - 1]++;
     cellValues = r.get(secondRowBytes, COLUMN_FAMILY_TEXT, -1, 100/*Too many*/);
-    LOG.info("Count of " + Bytes.toString(secondRowBytes) + ": " + cellValues.length);
+    LOG.info("Count of " + Bytes.toString(secondRowBytes) + ": " +
+      cellValues.length);
     assertTrue(cellValues.length == 3);
 
     // Now add deletes to memcache and then flush it. That will put us over
     // the compaction threshold of 3 store files. Compacting these store files
     // should result in a compacted store file that has no references to the
     // deleted row.
-    r.deleteAll(STARTROW, COLUMN_FAMILY_TEXT, System.currentTimeMillis(), null);
+    r.deleteAll(secondRowBytes, COLUMN_FAMILY_TEXT, System.currentTimeMillis(),
+      null);
     // Assert deleted.
-    assertNull(r.get(STARTROW, COLUMN_FAMILY_TEXT, -1, 100 /*Too many*/));
+    assertNull(r.get(secondRowBytes, COLUMN_FAMILY_TEXT, -1, 100 /*Too many*/));
     r.flushcache();
-    assertNull(r.get(STARTROW, COLUMN_FAMILY_TEXT, -1, 100 /*Too many*/));
+    assertNull(r.get(secondRowBytes, COLUMN_FAMILY_TEXT, -1, 100 /*Too many*/));
     // Add a bit of data and flush. Start adding at 'bbb'.
     createSmallerStoreFile(this.r);
     r.flushcache();
-    // Assert that the first row is still deleted.
-    cellValues = r.get(STARTROW, COLUMN_FAMILY_TEXT, -1, 100 /*Too many*/);
-    assertNull(r.get(STARTROW, COLUMN_FAMILY_TEXT, -1, 100 /*Too many*/));
+    // Assert that the second row is still deleted.
+    cellValues = r.get(secondRowBytes, COLUMN_FAMILY_TEXT, -1, 100 /*Too many*/);
+    assertNull(r.get(secondRowBytes, COLUMN_FAMILY_TEXT, -1, 100 /*Too many*/));
     // Force major compaction.
     r.compactStores(true);
     assertEquals(r.getStore(COLUMN_FAMILY_TEXT).getStorefiles().size(), 1);
-    assertNull(r.get(STARTROW, COLUMN_FAMILY_TEXT, -1, 100 /*Too many*/));
-    // Make sure the store files do have some 'aaa' keys in them.
+    assertNull(r.get(secondRowBytes, COLUMN_FAMILY_TEXT, -1, 100 /*Too many*/));
+    // Make sure the store files do have some 'aaa' keys in them -- exactly 3.
+    // Also, that compacted store files do not have any secondRowBytes because
+    // they were deleted.
+    int count = 0;
     boolean containsStartRow = false;
     for (MapFile.Reader reader: this.r.stores.
         get(Bytes.mapKey(COLUMN_FAMILY_TEXT_MINUS_COLON)).getReaders()) {
@@ -140,14 +145,16 @@ public class TestCompaction extends HBaseTestCase {
       while(reader.next(key, val)) {
         if (Bytes.equals(key.getRow(), STARTROW)) {
           containsStartRow = true;
-          break;
+          count++;
+        } else {
+          // After major compaction, should be none of these rows in compacted
+          // file.
+          assertFalse(Bytes.equals(key.getRow(), secondRowBytes));
         }
       }
-      if (containsStartRow) {
-        break;
-      }
     }
     assertTrue(containsStartRow);
+    assertTrue(count == 3);
     // Do a simple TTL test.
     final int ttlInSeconds = 1;
     for (HStore store: this.r.stores.values()) {
@@ -155,6 +162,11 @@ public class TestCompaction extends HBaseTestCase {
     }
     Thread.sleep(ttlInSeconds * 1000);
     r.compactStores(true);
+    count = count();
+    assertTrue(count == 0);
+  }
+
+  private int count() throws IOException {
     int count = 0;
     for (MapFile.Reader reader: this.r.stores.
         get(Bytes.mapKey(COLUMN_FAMILY_TEXT_MINUS_COLON)).getReaders()) {
@@ -165,7 +177,7 @@ public class TestCompaction extends HBaseTestCase {
         count++;
       }
     }
-    assertTrue(count == 0);
+    return count;
   }
 
   private void createStoreFile(final HRegion region) throws IOException {