HBASE-5199 Differential revision 1311 Delete out of TTL store files before compaction selection (Liyin)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1243086 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9793537b80
commit
0e0394bc35
|
@ -54,9 +54,9 @@ import org.apache.hadoop.hbase.client.Scan;
|
|||
import org.apache.hadoop.hbase.io.HeapSize;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hadoop.hbase.io.hfile.Compression;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
|
||||
import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
|
||||
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
|
||||
|
@ -1271,6 +1271,18 @@ public class Store extends SchemaConfigured implements HeapSize {
|
|||
|
||||
boolean forcemajor = this.forceMajor && filesCompacting.isEmpty();
|
||||
if (!forcemajor) {
|
||||
// Delete the expired store files before the compaction selection.
|
||||
if (conf.getBoolean("hbase.store.delete.expired.storefile", false)
|
||||
&& (ttl != Long.MAX_VALUE) && (this.scanInfo.minVersions == 0)) {
|
||||
CompactSelection expiredSelection = compactSelection
|
||||
.selectExpiredStoreFilesToCompact(
|
||||
EnvironmentEdgeManager.currentTimeMillis() - this.ttl);
|
||||
|
||||
// If there is any expired store files, delete them by compaction.
|
||||
if (expiredSelection != null) {
|
||||
return expiredSelection;
|
||||
}
|
||||
}
|
||||
// do not compact old files above a configurable threshold
|
||||
// save all references. we MUST compact them
|
||||
int pos = 0;
|
||||
|
@ -1650,7 +1662,7 @@ public class Store extends SchemaConfigured implements HeapSize {
|
|||
/**
|
||||
* @return the number of files in this store
|
||||
*/
|
||||
public int getNumberOfstorefiles() {
|
||||
public int getNumberOfStoreFiles() {
|
||||
return this.storefiles.size();
|
||||
}
|
||||
|
||||
|
|
|
@ -273,7 +273,7 @@ public class StoreFile extends SchemaConfigured {
|
|||
/**
|
||||
* @return Path or null if this StoreFile was made with a Stream.
|
||||
*/
|
||||
Path getPath() {
|
||||
public Path getPath() {
|
||||
return this.path;
|
||||
}
|
||||
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.regionserver.compactions;
|
||||
|
||||
import java.util.AbstractList;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Calendar;
|
||||
import java.util.GregorianCalendar;
|
||||
|
@ -82,9 +81,46 @@ public class CompactSelection {
|
|||
}
|
||||
|
||||
/**
|
||||
* If the current hour falls in the off peak times and there are no
|
||||
* outstanding off peak compactions, the current compaction is
|
||||
* promoted to an off peak compaction. Currently only one off peak
|
||||
* Select the expired store files to compact
|
||||
*
|
||||
* @param maxExpiredTimeStamp
|
||||
* The store file will be marked as expired if its max time stamp is
|
||||
* less than this maxExpiredTimeStamp.
|
||||
* @return A CompactSelection contains the expired store files as
|
||||
* filesToCompact
|
||||
*/
|
||||
public CompactSelection selectExpiredStoreFilesToCompact(
|
||||
long maxExpiredTimeStamp) {
|
||||
if (filesToCompact == null || filesToCompact.size() == 0)
|
||||
return null;
|
||||
ArrayList<StoreFile> expiredStoreFiles = null;
|
||||
boolean hasExpiredStoreFiles = false;
|
||||
CompactSelection expiredSFSelection = null;
|
||||
|
||||
for (StoreFile storeFile : this.filesToCompact) {
|
||||
if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) {
|
||||
LOG.info("Deleting the expired store file by compaction: "
|
||||
+ storeFile.getPath() + " whose maxTimeStamp is "
|
||||
+ storeFile.getReader().getMaxTimestamp()
|
||||
+ " while the max expired timestamp is " + maxExpiredTimeStamp);
|
||||
if (!hasExpiredStoreFiles) {
|
||||
expiredStoreFiles = new ArrayList<StoreFile>();
|
||||
hasExpiredStoreFiles = true;
|
||||
}
|
||||
expiredStoreFiles.add(storeFile);
|
||||
}
|
||||
}
|
||||
|
||||
if (hasExpiredStoreFiles) {
|
||||
expiredSFSelection = new CompactSelection(conf, expiredStoreFiles);
|
||||
}
|
||||
return expiredSFSelection;
|
||||
}
|
||||
|
||||
/**
|
||||
* If the current hour falls in the off peak times and there are no
|
||||
* outstanding off peak compactions, the current compaction is
|
||||
* promoted to an off peak compaction. Currently only one off peak
|
||||
* compaction is present in the compaction queue.
|
||||
*
|
||||
* @param currentHour
|
||||
|
|
|
@ -4521,11 +4521,11 @@ public class TestFromClientSide {
|
|||
assertEquals(expectedBlockMiss, cache.getStats().getMissCount());
|
||||
// compact, net minus two blocks, two hits, no misses
|
||||
System.out.println("Compacting");
|
||||
assertEquals(2, store.getNumberOfstorefiles());
|
||||
assertEquals(2, store.getNumberOfStoreFiles());
|
||||
store.triggerMajorCompaction();
|
||||
region.compactStores();
|
||||
waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
|
||||
assertEquals(1, store.getNumberOfstorefiles());
|
||||
assertEquals(1, store.getNumberOfStoreFiles());
|
||||
expectedBlockCount -= 2; // evicted two blocks, cached none
|
||||
assertEquals(expectedBlockCount, cache.getBlockCount());
|
||||
expectedBlockHits += 2;
|
||||
|
@ -4546,12 +4546,12 @@ public class TestFromClientSide {
|
|||
throws InterruptedException {
|
||||
long start = System.currentTimeMillis();
|
||||
while (start + timeout > System.currentTimeMillis() &&
|
||||
store.getNumberOfstorefiles() != count) {
|
||||
store.getNumberOfStoreFiles() != count) {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
System.out.println("start=" + start + ", now=" +
|
||||
System.currentTimeMillis() + ", cur=" + store.getNumberOfstorefiles());
|
||||
assertEquals(count, store.getNumberOfstorefiles());
|
||||
System.currentTimeMillis() + ", cur=" + store.getNumberOfStoreFiles());
|
||||
assertEquals(count, store.getNumberOfStoreFiles());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -43,10 +43,18 @@ import org.apache.hadoop.fs.FilterFileSystem;
|
|||
import org.apache.hadoop.fs.LocalFileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hbase.*;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.MediumTests;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -116,14 +124,19 @@ public class TestStore extends TestCase {
|
|||
|
||||
private void init(String methodName, Configuration conf)
|
||||
throws IOException {
|
||||
//Setting up a Store
|
||||
Path basedir = new Path(DIR+methodName);
|
||||
Path logdir = new Path(DIR+methodName+"/logs");
|
||||
Path oldLogDir = new Path(basedir, HConstants.HREGION_OLDLOGDIR_NAME);
|
||||
HColumnDescriptor hcd = new HColumnDescriptor(family);
|
||||
// some of the tests write 4 versions and then flush
|
||||
// (with HBASE-4241, lower versions are collected on flush)
|
||||
hcd.setMaxVersions(4);
|
||||
init(methodName, conf, hcd);
|
||||
}
|
||||
|
||||
private void init(String methodName, Configuration conf,
|
||||
HColumnDescriptor hcd) throws IOException {
|
||||
//Setting up a Store
|
||||
Path basedir = new Path(DIR+methodName);
|
||||
Path logdir = new Path(DIR+methodName+"/logs");
|
||||
Path oldLogDir = new Path(basedir, HConstants.HREGION_OLDLOGDIR_NAME);
|
||||
FileSystem fs = FileSystem.get(conf);
|
||||
|
||||
fs.delete(logdir, true);
|
||||
|
@ -137,6 +150,51 @@ public class TestStore extends TestCase {
|
|||
store = new Store(basedir, region, hcd, fs, conf);
|
||||
}
|
||||
|
||||
public void testDeleteExpiredStoreFiles() throws Exception {
|
||||
int storeFileNum = 4;
|
||||
int ttl = 1;
|
||||
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
// Enable the expired store file deletion
|
||||
conf.setBoolean("hbase.store.delete.expired.storefile", true);
|
||||
HColumnDescriptor hcd = new HColumnDescriptor(family);
|
||||
hcd.setTimeToLive(ttl);
|
||||
init(getName(), conf, hcd);
|
||||
|
||||
long sleepTime = this.store.scanInfo.getTtl() / storeFileNum;
|
||||
long timeStamp;
|
||||
// There are 4 store files and the max time stamp difference among these
|
||||
// store files will be (this.store.ttl / storeFileNum)
|
||||
for (int i = 1; i <= storeFileNum; i++) {
|
||||
LOG.info("Adding some data for the store file #" + i);
|
||||
timeStamp = EnvironmentEdgeManager.currentTimeMillis();
|
||||
this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null));
|
||||
this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null));
|
||||
this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null));
|
||||
flush(i);
|
||||
Thread.sleep(sleepTime);
|
||||
}
|
||||
|
||||
// Verify the total number of store files
|
||||
assertEquals(storeFileNum, this.store.getStorefiles().size());
|
||||
|
||||
// Each compaction request will find one expired store file and delete it
|
||||
// by the compaction.
|
||||
for (int i = 1; i <= storeFileNum; i++) {
|
||||
// verify the expired store file.
|
||||
CompactionRequest cr = this.store.requestCompaction();
|
||||
assertEquals(1, cr.getFiles().size());
|
||||
assertTrue(cr.getFiles().get(0).getReader().getMaxTimestamp() <
|
||||
(System.currentTimeMillis() - this.store.scanInfo.getTtl()));
|
||||
// Verify that the expired the store has been deleted.
|
||||
this.store.compact(cr);
|
||||
assertEquals(storeFileNum - i, this.store.getStorefiles().size());
|
||||
|
||||
// Let the next store file expired.
|
||||
Thread.sleep(sleepTime);
|
||||
}
|
||||
}
|
||||
|
||||
public void testLowestModificationTime() throws Exception {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
FileSystem fs = FileSystem.get(conf);
|
||||
|
|
Loading…
Reference in New Issue