diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index b17d0d7c437..1de3b12931e 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -54,9 +54,9 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
-import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
 import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
@@ -1271,6 +1271,18 @@ public class Store extends SchemaConfigured implements HeapSize {

     boolean forcemajor = this.forceMajor && filesCompacting.isEmpty();
     if (!forcemajor) {
+      // Delete the expired store files before the compaction selection.
+      if (conf.getBoolean("hbase.store.delete.expired.storefile", false)
+          && (ttl != Long.MAX_VALUE) && (this.scanInfo.minVersions == 0)) {
+        CompactSelection expiredSelection = compactSelection
+            .selectExpiredStoreFilesToCompact(
+                EnvironmentEdgeManager.currentTimeMillis() - this.ttl);
+
+        // If there are any expired store files, delete them by compaction.
+        if (expiredSelection != null) {
+          return expiredSelection;
+        }
+      }
       // do not compact old files above a configurable threshold
       // save all references. we MUST compact them
       int pos = 0;
@@ -1650,7 +1662,7 @@ public class Store extends SchemaConfigured implements HeapSize {
   /**
    * @return the number of files in this store
    */
-  public int getNumberOfstorefiles() {
+  public int getNumberOfStoreFiles() {
     return this.storefiles.size();
   }

diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 251b67f93de..676ba7318a3 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -273,7 +273,7 @@ public class StoreFile extends SchemaConfigured {
   /**
    * @return Path or null if this StoreFile was made with a Stream.
    */
-  Path getPath() {
+  public Path getPath() {
     return this.path;
   }

diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java b/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java
index 4df8f6a52c3..26338e17eef 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java
@@ -19,7 +19,6 @@
  */
 package org.apache.hadoop.hbase.regionserver.compactions;

-import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.GregorianCalendar;
@@ -82,9 +81,46 @@ public class CompactSelection {
   }

   /**
-   * If the current hour falls in the off peak times and there are no
-   * outstanding off peak compactions, the current compaction is
-   * promoted to an off peak compaction. Currently only one off peak
+   * Select the expired store files to compact.
+   *
+   * @param maxExpiredTimeStamp
+   *          The store file will be marked as expired if its max time stamp is
+   *          less than this maxExpiredTimeStamp.
+   * @return A CompactSelection containing the expired store files as
+   *         filesToCompact
+   */
+  public CompactSelection selectExpiredStoreFilesToCompact(
+      long maxExpiredTimeStamp) {
+    if (filesToCompact == null || filesToCompact.size() == 0)
+      return null;
+    ArrayList<StoreFile> expiredStoreFiles = null;
+    boolean hasExpiredStoreFiles = false;
+    CompactSelection expiredSFSelection = null;
+
+    for (StoreFile storeFile : this.filesToCompact) {
+      if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) {
+        LOG.info("Deleting the expired store file by compaction: "
+            + storeFile.getPath() + " whose maxTimeStamp is "
+            + storeFile.getReader().getMaxTimestamp()
+            + " while the max expired timestamp is " + maxExpiredTimeStamp);
+        if (!hasExpiredStoreFiles) {
+          expiredStoreFiles = new ArrayList<StoreFile>();
+          hasExpiredStoreFiles = true;
+        }
+        expiredStoreFiles.add(storeFile);
+      }
+    }
+
+    if (hasExpiredStoreFiles) {
+      expiredSFSelection = new CompactSelection(conf, expiredStoreFiles);
+    }
+    return expiredSFSelection;
+  }
+
+  /**
+   * If the current hour falls in the off peak times and there are no
+   * outstanding off peak compactions, the current compaction is
+   * promoted to an off peak compaction. Currently only one off peak
    * compaction is present in the compaction queue.
    *
    * @param currentHour
diff --git a/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index 713f2b3ec91..2c51dc9e070 100644
--- a/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -4521,11 +4521,11 @@ public class TestFromClientSide {
     assertEquals(expectedBlockMiss, cache.getStats().getMissCount());
     // compact, net minus two blocks, two hits, no misses
     System.out.println("Compacting");
-    assertEquals(2, store.getNumberOfstorefiles());
+    assertEquals(2, store.getNumberOfStoreFiles());
     store.triggerMajorCompaction();
     region.compactStores();
     waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
-    assertEquals(1, store.getNumberOfstorefiles());
+    assertEquals(1, store.getNumberOfStoreFiles());
     expectedBlockCount -= 2; // evicted two blocks, cached none
     assertEquals(expectedBlockCount, cache.getBlockCount());
     expectedBlockHits += 2;
@@ -4546,12 +4546,12 @@
       throws InterruptedException {
     long start = System.currentTimeMillis();
     while (start + timeout > System.currentTimeMillis() &&
-        store.getNumberOfstorefiles() != count) {
+        store.getNumberOfStoreFiles() != count) {
       Thread.sleep(100);
     }
     System.out.println("start=" + start + ", now=" +
-        System.currentTimeMillis() + ", cur=" + store.getNumberOfstorefiles());
-    assertEquals(count, store.getNumberOfstorefiles());
+        System.currentTimeMillis() + ", cur=" + store.getNumberOfStoreFiles());
+    assertEquals(count, store.getNumberOfStoreFiles());
   }

   @Test
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index b6f691b66e2..1dd64833f08 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -43,10 +43,18 @@ import org.apache.hadoop.fs.FilterFileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -116,14 +124,19 @@

   private void init(String methodName, Configuration conf)
   throws IOException {
-    //Setting up a Store
-    Path basedir = new Path(DIR+methodName);
-    Path logdir = new Path(DIR+methodName+"/logs");
-    Path oldLogDir = new Path(basedir, HConstants.HREGION_OLDLOGDIR_NAME);
     HColumnDescriptor hcd = new HColumnDescriptor(family);
     // some of the tests write 4 versions and then flush
     // (with HBASE-4241, lower versions are collected on flush)
     hcd.setMaxVersions(4);
+    init(methodName, conf, hcd);
+  }
+
+  private void init(String methodName, Configuration conf,
+      HColumnDescriptor hcd) throws IOException {
+    //Setting up a Store
+    Path basedir = new Path(DIR+methodName);
+    Path logdir = new Path(DIR+methodName+"/logs");
+    Path oldLogDir = new Path(basedir, HConstants.HREGION_OLDLOGDIR_NAME);
     FileSystem fs = FileSystem.get(conf);

     fs.delete(logdir, true);
@@ -137,6 +150,51 @@
     store = new Store(basedir, region, hcd, fs, conf);
   }

+  public void testDeleteExpiredStoreFiles() throws Exception {
+    int storeFileNum = 4;
+    int ttl = 1;
+
+    Configuration conf = HBaseConfiguration.create();
+    // Enable the expired store file deletion
+    conf.setBoolean("hbase.store.delete.expired.storefile", true);
+    HColumnDescriptor hcd = new HColumnDescriptor(family);
+    hcd.setTimeToLive(ttl);
+    init(getName(), conf, hcd);
+
+    long sleepTime = this.store.scanInfo.getTtl() / storeFileNum;
+    long timeStamp;
+    // There are 4 store files and the max time stamp difference among these
+    // store files will be (this.store.ttl / storeFileNum)
+    for (int i = 1; i <= storeFileNum; i++) {
+      LOG.info("Adding some data for the store file #" + i);
+      timeStamp = EnvironmentEdgeManager.currentTimeMillis();
+      this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null));
+      this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null));
+      this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null));
+      flush(i);
+      Thread.sleep(sleepTime);
+    }
+
+    // Verify the total number of store files
+    assertEquals(storeFileNum, this.store.getStorefiles().size());
+
+    // Each compaction request will find one expired store file and delete it
+    // by the compaction.
+    for (int i = 1; i <= storeFileNum; i++) {
+      // Verify the expired store file.
+      CompactionRequest cr = this.store.requestCompaction();
+      assertEquals(1, cr.getFiles().size());
+      assertTrue(cr.getFiles().get(0).getReader().getMaxTimestamp() <
+          (System.currentTimeMillis() - this.store.scanInfo.getTtl()));
+      // Verify that the expired store file has been deleted.
+      this.store.compact(cr);
+      assertEquals(storeFileNum - i, this.store.getStorefiles().size());
+
+      // Let the next store file expire.
+      Thread.sleep(sleepTime);
+    }
+  }
+
   public void testLowestModificationTime() throws Exception {
     Configuration conf = HBaseConfiguration.create();
     FileSystem fs = FileSystem.get(conf);
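
Reviewer note: the short example below is a hypothetical sketch, not part of the patch, showing how the new code path would be enabled end to end. It assumes the 0.94-era client API (HBaseAdmin, HTableDescriptor(String)); the class name, table name, family name, and TTL value are made up for illustration. The configuration key hbase.store.delete.expired.storefile and its false default come from the patch itself; on a real cluster the flag would be set in the region servers' hbase-site.xml rather than in client code.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class ExpiredStoreFileExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Feature flag introduced by this patch; disabled by default. The
    // expired-file fast path also requires a real TTL on the family
    // (ttl != Long.MAX_VALUE) and minVersions == 0.
    conf.setBoolean("hbase.store.delete.expired.storefile", true);

    // A column family whose cells expire five minutes after they are
    // written; TTL is given in seconds.
    HColumnDescriptor hcd = new HColumnDescriptor("f");
    hcd.setTimeToLive(300);

    HTableDescriptor htd = new HTableDescriptor("expiring_table");
    htd.addFamily(hcd);

    // Create the table; once every cell in a store file is past its
    // TTL, the next compaction check deletes the whole file.
    HBaseAdmin admin = new HBaseAdmin(conf);
    try {
      admin.createTable(htd);
    } finally {
      admin.close();
    }
  }
}

Once every cell in a store file is older than the family TTL, selectExpiredStoreFilesToCompact() selects that file on its own (its maxTimestamp falls below currentTimeMillis() - ttl), and the resulting compaction drops it without rewriting any live data, which is exactly the behavior testDeleteExpiredStoreFiles above exercises.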