From ea4de9d124c34d36171f119ece521151bcd0b709 Mon Sep 17 00:00:00 2001
From: sershe
Date: Wed, 23 Apr 2014 02:34:25 +0000
Subject: [PATCH] HBASE-10141 instead of putting expired store files thru
 compaction, just archive them

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1589332 13f79535-47bb-0310-9956-ffa450edef68
---
 .../regionserver/DefaultStoreFileManager.java | 21 ++++++++
 .../hadoop/hbase/regionserver/HStore.java     | 50 ++++++++++++++++---
 .../hbase/regionserver/StoreFileManager.java  |  7 +++
 .../regionserver/StripeStoreFileManager.java  | 30 +++++++++++
 .../compactions/CompactionConfiguration.java  | 12 +----
 .../RatioBasedCompactionPolicy.java           | 44 ----------------
 .../hadoop/hbase/regionserver/TestStore.java  | 45 +++++++++--------
 7 files changed, 127 insertions(+), 82 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
index 86649bf593d..4261860b1ef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
@@ -135,6 +135,27 @@ class DefaultStoreFileManager implements StoreFileManager {
     return (priority == HStore.PRIORITY_USER) ? priority + 1 : priority;
   }
 
+  @Override
+  public Collection<StoreFile> getUnneededFiles(long maxTs, List<StoreFile> filesCompacting) {
+    Collection<StoreFile> expiredStoreFiles = null;
+    ImmutableList<StoreFile> files = storefiles;
+    // 1) We can never get rid of the last file which has the maximum seqid.
+    // 2) Files that are not the latest can't become one due to (1), so the rest are fair game.
+    for (int i = 0; i < files.size() - 1; ++i) {
+      StoreFile sf = files.get(i);
+      long fileTs = sf.getReader().getMaxTimestamp();
+      if (fileTs < maxTs && !filesCompacting.contains(sf)) {
+        LOG.info("Found an expired store file: " + sf.getPath()
+            + " whose maxTimeStamp is " + fileTs + ", which is below " + maxTs);
+        if (expiredStoreFiles == null) {
+          expiredStoreFiles = new ArrayList<StoreFile>();
+        }
+        expiredStoreFiles.add(sf);
+      }
+    }
+    return expiredStoreFiles;
+  }
+
   private void sortAndSetStoreFiles(List<StoreFile> storeFiles) {
     Collections.sort(storeFiles, StoreFile.Comparators.SEQ_ID);
     storefiles = ImmutableList.copyOf(storeFiles);
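Review note: DefaultStoreFileManager keeps storefiles sorted by sequence id, so the new getUnneededFiles() above deliberately stops one short of the end of the list; per the comment in the hunk, the newest (maximum-seqid) file can never be removed, and no older file can ever become the newest. A minimal standalone sketch of the same selection rule, with a plain list of max timestamps standing in for StoreFile (the class and method names here are illustrative, not HBase API):

    import java.util.ArrayList;
    import java.util.List;

    final class ExpiredSelectionSketch {
      // Indexes of files whose every cell is older than the cutoff; the last
      // (newest, maximum-seqid) file is never considered, mirroring the hunk above.
      static List<Integer> selectExpired(List<Long> fileMaxTimestamps, long maxTs) {
        List<Integer> expired = new ArrayList<Integer>();
        for (int i = 0; i < fileMaxTimestamps.size() - 1; ++i) {
          if (fileMaxTimestamps.get(i) < maxTs) {
            expired.add(i);
          }
        }
        return expired;
      }

      public static void main(String[] args) {
        List<Long> ts = new ArrayList<Long>();
        ts.add(10L); ts.add(20L); ts.add(30L); ts.add(40L);
        System.out.println(selectExpired(ts, 25L)); // prints [0, 1]
      }
    }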
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 7f9f48286aa..2b8bec56340 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -155,6 +155,7 @@ public class HStore implements Store {
 
   private ScanInfo scanInfo;
 
+  // TODO: ideally, this should be part of storeFileManager, as we keep passing this to it.
   final List<StoreFile> filesCompacting = Lists.newArrayList();
 
   // All access must be synchronized.
@@ -1343,6 +1344,9 @@
       return null;
     }
 
+    // Before we do compaction, try to get rid of unneeded files to simplify things.
+    removeUnneededFiles();
+
     CompactionContext compaction = storeEngine.createCompaction();
     CompactionRequest request = null;
     this.lock.readLock().lock();
@@ -1398,13 +1402,7 @@
           return null;
         }
 
-        // Update filesCompacting (check that we do not try to compact the same StoreFile twice).
-        if (!Collections.disjoint(filesCompacting, selectedFiles)) {
-          Preconditions.checkArgument(false, "%s overlaps with %s",
-              selectedFiles, filesCompacting);
-        }
-        filesCompacting.addAll(selectedFiles);
-        Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
+        addToCompactingFiles(selectedFiles);
 
         // If we're enqueuing a major, clear the force flag.
         this.forceMajor = this.forceMajor && !request.isMajor();
@@ -1425,6 +1423,44 @@
     return compaction;
   }
 
+  /** Adds the files to compacting files. filesCompacting must be locked. */
+  private void addToCompactingFiles(final Collection<StoreFile> filesToAdd) {
+    if (filesToAdd == null) return;
+    // Check that we do not try to compact the same StoreFile twice.
+    if (!Collections.disjoint(filesCompacting, filesToAdd)) {
+      Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting);
+    }
+    filesCompacting.addAll(filesToAdd);
+    Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
+  }
+
+  private void removeUnneededFiles() throws IOException {
+    if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) return;
+    this.lock.readLock().lock();
+    Collection<StoreFile> delSfs = null;
+    try {
+      synchronized (filesCompacting) {
+        long cfTtl = getStoreFileTtl();
+        if (cfTtl != Long.MAX_VALUE) {
+          delSfs = storeEngine.getStoreFileManager().getUnneededFiles(
+              EnvironmentEdgeManager.currentTimeMillis() - cfTtl, filesCompacting);
+          addToCompactingFiles(delSfs);
+        }
+      }
+    } finally {
+      this.lock.readLock().unlock();
+    }
+    if (delSfs == null || delSfs.isEmpty()) return;
+
+    Collection<StoreFile> newFiles = new ArrayList<StoreFile>(); // No new files.
+    writeCompactionWalRecord(delSfs, newFiles);
+    replaceStoreFiles(delSfs, newFiles);
+    completeCompaction(delSfs);
+    LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in "
+        + this + " of " + this.getRegionInfo().getRegionNameAsString()
+        + "; total size for store is " + StringUtils.humanReadableInt(storeSize));
+  }
+
   @Override
   public void cancelRequestedCompaction(CompactionContext compaction) {
     finishCompactionRequest(compaction.getRequest());
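Review note: removeUnneededFiles() above archives expired files without rewriting a single cell: it reuses the existing compaction bookkeeping (writeCompactionWalRecord, replaceStoreFiles, completeCompaction) with an empty output list, which is what makes this cheaper than the old compact-to-empty-file path. A rough sketch of the cutoff it passes to getUnneededFiles(), using System.currentTimeMillis() in place of EnvironmentEdgeManager and a one-hour TTL; both substitutions are assumptions for illustration:

    final class TtlCutoffSketch {
      public static void main(String[] args) {
        long cfTtl = 60L * 60L * 1000L;   // assumed example TTL: one hour, in ms
        // HStore uses EnvironmentEdgeManager.currentTimeMillis(); plain time is a stand-in.
        long maxTs = System.currentTimeMillis() - cfTtl;
        // A file is expired iff its max timestamp is below this cutoff,
        // i.e. iff every cell in it is already past its TTL.
        System.out.println("files with max timestamp below " + maxTs + " are expired");
      }
    }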
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
index e2495b9b5cd..de39915b10b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
@@ -127,4 +127,11 @@ public interface StoreFileManager {
    * @return The store compaction priority.
    */
   int getStoreCompactionPriority();
+
+  /**
+   * @param maxTs Maximum expired timestamp.
+   * @param filesCompacting Files that are currently compacting.
+   * @return The files which don't have any necessary data according to TTL and other criteria.
+   */
+  Collection<StoreFile> getUnneededFiles(long maxTs, List<StoreFile> filesCompacting);
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
index 9df0396fe88..36661b30f1a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
@@ -919,4 +919,34 @@ public class StripeStoreFileManager
   public int getStripeCount() {
     return this.state.stripeFiles.size();
   }
+
+  @Override
+  public Collection<StoreFile> getUnneededFiles(long maxTs, List<StoreFile> filesCompacting) {
+    // 1) We can never get rid of the last file which has the maximum seqid in a stripe.
+    // 2) Files that are not the latest can't become one due to (1), so the rest are fair game.
+    State state = this.state;
+    Collection<StoreFile> expiredStoreFiles = null;
+    for (ImmutableList<StoreFile> stripe : state.stripeFiles) {
+      expiredStoreFiles = findExpiredFiles(stripe, maxTs, filesCompacting, expiredStoreFiles);
+    }
+    return findExpiredFiles(state.level0Files, maxTs, filesCompacting, expiredStoreFiles);
+  }
+
+  private Collection<StoreFile> findExpiredFiles(ImmutableList<StoreFile> stripe, long maxTs,
+      List<StoreFile> filesCompacting, Collection<StoreFile> expiredStoreFiles) {
+    // Order by seqnum is reversed.
+    for (int i = 1; i < stripe.size(); ++i) {
+      StoreFile sf = stripe.get(i);
+      long fileTs = sf.getReader().getMaxTimestamp();
+      if (fileTs < maxTs && !filesCompacting.contains(sf)) {
+        LOG.info("Found an expired store file: " + sf.getPath()
+            + " whose maxTimeStamp is " + fileTs + ", which is below " + maxTs);
+        if (expiredStoreFiles == null) {
+          expiredStoreFiles = new ArrayList<StoreFile>();
+        }
+        expiredStoreFiles.add(sf);
+      }
+    }
+    return expiredStoreFiles;
+  }
 }
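Review note: the stripe variant above applies the same rule per stripe and then to the level-0 files, but each scan starts at index 1 because, as the in-code comment says, stripes order files by descending seqnum; index 0 holds the newest file and must survive. A condensed sketch with stripes modeled as lists of max timestamps (illustrative types, not HBase API):

    import java.util.ArrayList;
    import java.util.List;

    final class StripeExpirySketch {
      static List<Long> findExpired(List<List<Long>> stripes, long maxTs) {
        List<Long> expired = new ArrayList<Long>();
        for (List<Long> stripe : stripes) {
          // Index 0 is the newest file in the stripe (reversed seqnum order); skip it.
          for (int i = 1; i < stripe.size(); ++i) {
            if (stripe.get(i) < maxTs) {
              expired.add(stripe.get(i));
            }
          }
        }
        return expired;
      }

      public static void main(String[] args) {
        List<List<Long>> stripes = new ArrayList<List<Long>>();
        List<Long> stripe = new ArrayList<Long>();
        stripe.add(40L); stripe.add(20L); stripe.add(10L); // newest first
        stripes.add(stripe);
        System.out.println(findExpired(stripes, 25L)); // prints [20, 10]
      }
    }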
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
index 69807121100..77b86c7b754 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
@@ -61,7 +61,6 @@ public class CompactionConfiguration {
   double compactionRatio;
   double offPeekCompactionRatio;
   long throttlePoint;
-  boolean shouldDeleteExpired;
   long majorCompactionPeriod;
   float majorCompactionJitter;
 
@@ -80,7 +79,6 @@ public class CompactionConfiguration {
     throttlePoint = conf.getLong("hbase.regionserver.thread.compaction.throttle",
         2 * maxFilesToCompact * storeConfigInfo.getMemstoreFlushSize());
-    shouldDeleteExpired = conf.getBoolean("hbase.store.delete.expired.storefile", true);
     majorCompactionPeriod = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24*7);
     // Make it 0.5 so jitter has us fall evenly either side of when the compaction should run
     majorCompactionJitter = conf.getFloat("hbase.hregion.majorcompaction.jitter", 0.50F);
@@ -92,7 +90,7 @@ public String toString() {
     return String.format(
       "size [%d, %d); files [%d, %d); ratio %f; off-peak ratio %f; throttle point %d;"
-      + "%s delete expired; major period %d, major jitter %f",
+      + " major period %d, major jitter %f",
       minCompactSize,
       maxCompactSize,
       minFilesToCompact,
@@ -100,7 +98,6 @@
       maxFilesToCompact,
       compactionRatio,
       offPeekCompactionRatio,
       throttlePoint,
-      shouldDeleteExpired ? "" : " don't",
       majorCompactionPeriod,
       majorCompactionJitter);
   }
@@ -169,11 +166,4 @@ public class CompactionConfiguration {
   float getMajorCompactionJitter() {
     return majorCompactionJitter;
   }
-
-  /**
-   * @return Whether expired files should be deleted ASAP using compactions
-   */
-  boolean shouldDeleteExpired() {
-    return shouldDeleteExpired;
-  }
 }
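Review note: with shouldDeleteExpired gone from CompactionConfiguration, the switch is read directly by HStore.removeUnneededFiles() (see the HStore hunk earlier in this patch), so compaction policies no longer need to know about it. Operators keep the same knob; a usage example of the key and default taken from the hunk (the wrapper class is illustrative, while Configuration and HBaseConfiguration are real HBase classes):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    final class ExpiredFileSwitchExample {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Same key and default that HStore.removeUnneededFiles() consults.
        boolean deleteExpired = conf.getBoolean("hbase.store.delete.expired.storefile", true);
        System.out.println("archive expired store files eagerly: " + deleteExpired);
      }
    }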
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
index 2b4429225cd..8289454c5ac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
@@ -96,15 +96,6 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy {
     // If we can't have all files, we cannot do major anyway
     boolean isAllFiles = candidateFiles.size() == candidateSelection.size();
     if (!(forceMajor && isAllFiles)) {
-      // If there are expired files, only select them so that compaction deletes them
-      long cfTtl = this.storeConfigInfo.getStoreFileTtl();
-      if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) {
-        ArrayList<StoreFile> expiredSelection = selectExpiredStoreFiles(
-            candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl);
-        if (expiredSelection != null) {
-          return new CompactionRequest(expiredSelection);
-        }
-      }
       candidateSelection = skipLargeFiles(candidateSelection);
       isAllFiles = candidateFiles.size() == candidateSelection.size();
     }
@@ -131,41 +122,6 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy {
     return result;
   }
 
-  /**
-   * Select the expired store files to compact
-   *
-   * @param candidates the initial set of storeFiles
-   * @param maxExpiredTimeStamp
-   *          The store file will be marked as expired if its max time stamp is
-   *          less than this maxExpiredTimeStamp.
-   * @return A CompactSelection contains the expired store files as
-   *         filesToCompact
-   */
-  private ArrayList<StoreFile> selectExpiredStoreFiles(
-      ArrayList<StoreFile> candidates, long maxExpiredTimeStamp) {
-    if (candidates == null || candidates.size() == 0) return null;
-    ArrayList<StoreFile> expiredStoreFiles = null;
-
-    for (StoreFile storeFile : candidates) {
-      if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) {
-        LOG.info("Deleting the expired store file by compaction: "
-            + storeFile.getPath() + " whose maxTimeStamp is "
-            + storeFile.getReader().getMaxTimestamp()
-            + " while the max expired timestamp is " + maxExpiredTimeStamp);
-        if (expiredStoreFiles == null) {
-          expiredStoreFiles = new ArrayList<StoreFile>();
-        }
-        expiredStoreFiles.add(storeFile);
-      }
-    }
-    if (expiredStoreFiles != null && expiredStoreFiles.size() == 1
-        && expiredStoreFiles.get(0).getReader().getEntries() == 0) {
-      // If just one empty store file, do not select for compaction.
-      return null;
-    }
-    return expiredStoreFiles;
-  }
-
   /**
    * @param candidates pre-filtrate
    * @return filtered subset
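Review note: the test changes below rely on two things: CompactionConfiguration.MIN_KEY is raised to 5 so no ordinary compaction can fire with only 4 files, and the injected clock advances by ttl / storeFileNum per iteration so exactly one more file crosses the TTL cutoff each time requestCompaction() runs. A rough model of that timeline in plain arithmetic, not the test's actual clock plumbing (the class name and the exact expiry instants are illustrative assumptions):

    final class ExpiryTimelineSketch {
      public static void main(String[] args) {
        long ttl = 4;                          // time units, mirroring the test's TTL
        int storeFileNum = 4;
        long sleepTime = ttl / storeFileNum;   // clock step between flushes
        // File k's max timestamp is roughly k * sleepTime, so it expires
        // once the clock passes k * sleepTime + ttl: one file per step.
        for (int k = 1; k <= storeFileNum; ++k) {
          System.out.println("file " + k + " expires after t = " + (k * sleepTime + ttl));
        }
      }
    }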
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index 10e0142f3ae..fd2bf35310e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -19,6 +19,8 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
+import static org.junit.Assert.*;
+
 import java.io.IOException;
 import java.lang.ref.SoftReference;
 import java.security.PrivilegedExceptionAction;
@@ -61,6 +63,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
@@ -271,15 +274,19 @@ public class TestStore {
     int ttl = 4;
     IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
     EnvironmentEdgeManagerTestHelper.injectEdge(edge);
-
+
     Configuration conf = HBaseConfiguration.create();
     // Enable the expired store file deletion
     conf.setBoolean("hbase.store.delete.expired.storefile", true);
+    // Set the compaction threshold higher to avoid normal compactions.
+    conf.setInt(CompactionConfiguration.MIN_KEY, 5);
+
     HColumnDescriptor hcd = new HColumnDescriptor(family);
     hcd.setTimeToLive(ttl);
     init(name.getMethodName(), conf, hcd);
 
-    long sleepTime = this.store.getScanInfo().getTtl() / storeFileNum;
+    long storeTtl = this.store.getScanInfo().getTtl();
+    long sleepTime = storeTtl / storeFileNum;
     long timeStamp;
     // There are 4 store files and the max time stamp difference among these
     // store files will be (this.store.ttl / storeFileNum)
@@ -296,29 +303,27 @@ public class TestStore {
     // Verify the total number of store files
     Assert.assertEquals(storeFileNum, this.store.getStorefiles().size());
 
-    // Each compaction request will find one expired store file and delete it
-    // by the compaction.
-    for (int i = 1; i <= storeFileNum; i++) {
+    // Each call will find one expired store file and delete it before compaction happens.
+    // There will be no compaction due to the threshold above. The last file will not be removed.
+    for (int i = 1; i <= storeFileNum - 1; i++) {
       // verify the expired store file.
-      CompactionContext compaction = this.store.requestCompaction();
-      CompactionRequest cr = compaction.getRequest();
-      // the first is expired normally.
-      // If not the first compaction, there is another empty store file,
-      List<StoreFile> files = new ArrayList<StoreFile>(cr.getFiles());
-      Assert.assertEquals(Math.min(i, 2), cr.getFiles().size());
-      for (int j = 0; j < files.size(); j++) {
-        Assert.assertTrue(files.get(j).getReader().getMaxTimestamp() < (edge
-            .currentTimeMillis() - this.store.getScanInfo().getTtl()));
+      assertNull(this.store.requestCompaction());
+      Collection<StoreFile> sfs = this.store.getStorefiles();
+      // Ensure i files are gone.
+      assertEquals(storeFileNum - i, sfs.size());
+      // Ensure only non-expired files remain.
+      for (StoreFile sf : sfs) {
+        assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTimeMillis() - storeTtl));
       }
-      // Verify that the expired store file is compacted to an empty store file.
-      // Default compaction policy creates just one and only one compacted file.
-      StoreFile compactedFile = this.store.compact(compaction).get(0);
-      // It is an empty store file.
-      Assert.assertEquals(0, compactedFile.getReader().getEntries());
-      // Let the next store file expired.
       edge.incrementTime(sleepTime);
     }
+
+    assertNull(this.store.requestCompaction());
+    Collection<StoreFile> sfs = this.store.getStorefiles();
+    // Assert the last expired file is not removed.
+    assertEquals(1, sfs.size());
+    long ts = sfs.iterator().next().getReader().getMaxTimestamp();
+    assertTrue(ts < (edge.currentTimeMillis() - storeTtl));
   }
 
   @Test