diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java index 443075c898f..6804d361ce6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java @@ -162,7 +162,7 @@ public class StripeCompactionPolicy extends CompactionPolicy { return filesCompacting.isEmpty() && (StoreUtils.hasReferences(si.getStorefiles()) || (si.getLevel0Files().size() >= this.config.getLevel0MinFiles()) - || needsSingleStripeCompaction(si)); + || needsSingleStripeCompaction(si) || hasExpiredStripes(si)); } @Override @@ -338,6 +338,33 @@ public class StripeCompactionPolicy extends CompactionPolicy { return result; } + private boolean isStripeExpired(ImmutableList storeFiles) { + long cfTtl = this.storeConfigInfo.getStoreFileTtl(); + if (cfTtl == Long.MAX_VALUE) { + return false; // minversion might be set, cannot delete old files + } + long timestampCutoff = EnvironmentEdgeManager.currentTime() - cfTtl; + for (HStoreFile storeFile : storeFiles) { + // Check store file is not empty and has not expired + if (storeFile.getReader().getMaxTimestamp() >= timestampCutoff + && storeFile.getReader().getEntries() != 0) { + return false; + } + } + return true; + } + + protected boolean hasExpiredStripes(StripeInformationProvider si) { + // Find if exists a stripe where all files have expired, if any. + ArrayList> stripes = si.getStripes(); + for (ImmutableList stripe : stripes) { + if (isStripeExpired(stripe)) { + return true; + } + } + return false; + } + private static long getTotalKvCount(final Collection candidates) { long totalSize = 0; for (HStoreFile storeFile : candidates) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java index 2b05830cfc9..5eb94ac1d44 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.HStore; @@ -288,6 +289,35 @@ public class TestStripeCompactionPolicy { verifyNoCompaction(policy, si); } + @Test + public void testCheckExpiredStripeCompaction() throws Exception { + Configuration conf = HBaseConfiguration.create(); + conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 5); + conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 4); + + ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); + long now = defaultTtl + 2; + edge.setValue(now); + EnvironmentEdgeManager.injectEdge(edge); + HStoreFile expiredFile = createFile(10), notExpiredFile = createFile(10); + when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1); + when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1); + List expired = Lists.newArrayList(expiredFile, expiredFile); + List mixed = Lists.newArrayList(expiredFile, notExpiredFile); + + StripeCompactionPolicy policy = + createPolicy(conf, defaultSplitSize, defaultSplitCount, defaultInitialCount, true); + // Merge expired if there are eligible stripes. + StripeCompactionPolicy.StripeInformationProvider si = + createStripesWithFiles(mixed, mixed, mixed); + assertFalse(policy.needsCompactions(si, al())); + + si = createStripesWithFiles(mixed, mixed, mixed, expired); + assertFalse(policy.needsSingleStripeCompaction(si)); + assertTrue(policy.hasExpiredStripes(si)); + assertTrue(policy.needsCompactions(si, al())); + } + @Test public void testSplitOffStripe() throws Exception { Configuration conf = HBaseConfiguration.create(); @@ -776,6 +806,7 @@ public class TestStripeCompactionPolicy { anyBoolean())).thenReturn(mock(StoreFileScanner.class)); when(sf.getReader()).thenReturn(r); when(sf.getBulkLoadTimestamp()).thenReturn(OptionalLong.empty()); + when(r.getMaxTimestamp()).thenReturn(TimeRange.INITIAL_MAX_TIMESTAMP); return sf; }