From 1a766c42a1890b04a2991d39276e721c689777f5 Mon Sep 17 00:00:00 2001 From: jyates Date: Wed, 20 Feb 2013 22:40:55 +0000 Subject: [PATCH 1/2] HBASE-7725: Add ability to create custom compaction request git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1448449 13f79535-47bb-0310-9956-ffa450edef68 --- .../hbase/coprocessor/BaseRegionObserver.java | 47 ++++- .../hbase/coprocessor/RegionObserver.java | 170 ++++++++++++++---- .../regionserver/CompactSplitThread.java | 55 ++++-- .../regionserver/CompactionRequestor.java | 59 ++++-- .../hbase/regionserver/HRegionServer.java | 31 ++-- .../hadoop/hbase/regionserver/HStore.java | 39 ++-- .../regionserver/RegionCoprocessorHost.java | 47 ++--- .../hadoop/hbase/regionserver/Store.java | 3 +- .../compactions/CompactionRequest.java | 89 ++++++--- .../regionserver/compactions/Compactor.java | 26 ++- .../compactions/DefaultCompactor.java | 21 +-- .../hbase/regionserver/TestCompaction.java | 65 ++++++- 12 files changed, 483 insertions(+), 169 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java index 8098cfca40f..b8ee2b4b9f7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java @@ -41,6 +41,8 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Pair; @@ -134,29 +136,64 @@ public 
abstract class BaseRegionObserver implements RegionObserver { public void preCompactSelection(final ObserverContext c, final Store store, final List candidates) throws IOException { } + @Override + public void preCompactSelection(final ObserverContext c, + final Store store, final List candidates, final CompactionRequest request) + throws IOException { + preCompactSelection(c, store, candidates); + } + @Override public void postCompactSelection(final ObserverContext c, final Store store, final ImmutableList selected) { } + @Override + public void postCompactSelection(final ObserverContext c, + final Store store, final ImmutableList selected, CompactionRequest request) { + postCompactSelection(c, store, selected); + } + @Override public InternalScanner preCompact(ObserverContext e, final Store store, final InternalScanner scanner, final ScanType scanType) - throws IOException { + throws IOException { return scanner; } @Override - public InternalScanner preCompactScannerOpen(final ObserverContext c, - final Store store, List scanners, final ScanType scanType, - final long earliestPutTs, final InternalScanner s) throws IOException { + public InternalScanner preCompact(ObserverContext e, + final Store store, final InternalScanner scanner, final ScanType scanType, + CompactionRequest request) throws IOException { + return preCompact(e, store, scanner, scanType); + } + + @Override + public InternalScanner preCompactScannerOpen( + final ObserverContext c, final Store store, + List scanners, final ScanType scanType, final long earliestPutTs, + final InternalScanner s) throws IOException { return null; } + @Override + public InternalScanner preCompactScannerOpen( + final ObserverContext c, final Store store, + List scanners, final ScanType scanType, final long earliestPutTs, + final InternalScanner s, CompactionRequest request) throws IOException { + return preCompactScannerOpen(c, store, scanners, scanType, earliestPutTs, s); + } + @Override public void 
postCompact(ObserverContext e, final Store store, final StoreFile resultFile) throws IOException { } +@Override + public void postCompact(ObserverContext e, final Store store, + final StoreFile resultFile, CompactionRequest request) throws IOException { + postCompact(e, store, resultFile); + } + @Override public void preGetClosestRowBefore(final ObserverContext e, final byte [] row, final byte [] family, final Result result) @@ -351,4 +388,4 @@ public abstract class BaseRegionObserver implements RegionObserver { List> familyPaths, boolean hasLoaded) throws IOException { return hasLoaded; } -} +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index 6119e5b81a4..2ba1b4c6520 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFileScanner; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -126,87 +127,184 @@ public interface RegionObserver extends Coprocessor { final StoreFile resultFile) throws IOException; /** - * Called prior to selecting the {@link StoreFile}s to compact from the list - * of available candidates. To alter the files used for compaction, you may - * mutate the passed in list of candidates. + * Called prior to selecting the {@link StoreFile StoreFiles} to compact from the list of + * available candidates. To alter the files used for compaction, you may mutate the passed in list + * of candidates. 
+ * @param c the environment provided by the region server + * @param store the store where compaction is being requested + * @param candidates the store files currently available for compaction + * @param request custom compaction request + * @throws IOException if an error occurred on the coprocessor + */ + void preCompactSelection(final ObserverContext c, + final Store store, final List candidates, final CompactionRequest request) + throws IOException; + + /** + * Called prior to selecting the {@link StoreFile}s to compact from the list of available + * candidates. To alter the files used for compaction, you may mutate the passed in list of + * candidates. * @param c the environment provided by the region server * @param store the store where compaction is being requested * @param candidates the store files currently available for compaction * @throws IOException if an error occurred on the coprocessor + * @deprecated Use {@link #preCompactSelection(ObserverContext, Store, List, Object)} instead */ void preCompactSelection(final ObserverContext c, final Store store, final List candidates) throws IOException; /** - * Called after the {@link StoreFile}s to compact have been selected from the - * available candidates. + * Called after the {@link StoreFile}s to compact have been selected from the available + * candidates. * @param c the environment provided by the region server * @param store the store being compacted * @param selected the store files selected to compact + * @param request custom compaction request */ + void postCompactSelection(final ObserverContext c, + final Store store, final ImmutableList selected, CompactionRequest request); + + /** + * Called after the {@link StoreFile}s to compact have been selected from the available + * candidates. 
+ * @param c the environment provided by the region server + * @param store the store being compacted + * @param selected the store files selected to compact + * @param compactionAttributes custom attributes for the compaction + * @deprecated use {@link #postCompactSelection(ObserverContext, Store, ImmutableList, Object)} + * instead. + */ + @Deprecated void postCompactSelection(final ObserverContext c, final Store store, final ImmutableList selected); /** - * Called prior to writing the {@link StoreFile}s selected for compaction into - * a new {@code StoreFile}. To override or modify the compaction process, - * implementing classes have two options: + * Called prior to writing the {@link StoreFile}s selected for compaction into a new + * {@code StoreFile}. To override or modify the compaction process, implementing classes have two + * options: *
   * <ul>
-   *   <li>
-   *   Wrap the provided {@link InternalScanner} with a custom
-   *   implementation that is returned from this method.  The custom scanner
-   *   can then inspect {@link KeyValue}s from the wrapped scanner, applying
-   *   its own policy to what gets written.
-   *   </li>
-   *   <li>
-   *   Call {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()}
-   *   and provide a custom implementation for writing of new
-   *   {@link StoreFile}s.  <b>Note: any implementations bypassing
-   *   core compaction using this approach must write out new store files
-   *   themselves or the existing data will no longer be available after
-   *   compaction.</b>
-   *   </li>
+   * <li>Wrap the provided {@link InternalScanner} with a custom implementation that is returned
+   * from this method. The custom scanner can then inspect {@link KeyValue}s from the wrapped
+   * scanner, applying its own policy to what gets written.</li>
+   * <li>Call {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} and provide a
+   * custom implementation for writing of new {@link StoreFile}s. <b>Note: any implementations
+   * bypassing core compaction using this approach must write out new store files themselves or the
+   * existing data will no longer be available after compaction.</b></li>
   * </ul>
* @param c the environment provided by the region server * @param store the store being compacted - * @param scanner the scanner over existing data used in the store file - * rewriting + * @param scanner the scanner over existing data used in the store file rewriting * @param scanType type of Scan - * @return the scanner to use during compaction. Should not be {@code null} - * unless the implementation is writing new store files on its own. + * @param request the requested compaction + * @return the scanner to use during compaction. Should not be {@code null} unless the + * implementation is writing new store files on its own. * @throws IOException if an error occurred on the coprocessor */ InternalScanner preCompact(final ObserverContext c, - final Store store, final InternalScanner scanner, - final ScanType scanType) throws IOException; + final Store store, final InternalScanner scanner, final ScanType scanType, + CompactionRequest request) throws IOException; /** - * Called prior to writing the {@link StoreFile}s selected for compaction into - * a new {@code StoreFile} and prior to creating the scanner used to read the - * input files. To override or modify the compaction process, - * implementing classes can return a new scanner to provide the KeyValues to be - * stored into the new {@code StoreFile} or null to perform the default processing. - * Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no + * Called prior to writing the {@link StoreFile}s selected for compaction into a new + * {@code StoreFile}. To override or modify the compaction process, implementing classes have two + * options: + *
+   * <ul>
+   * <li>Wrap the provided {@link InternalScanner} with a custom implementation that is returned
+   * from this method. The custom scanner can then inspect {@link KeyValue}s from the wrapped
+   * scanner, applying its own policy to what gets written.</li>
+   * <li>Call {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} and provide a
+   * custom implementation for writing of new {@link StoreFile}s. <b>Note: any implementations
+   * bypassing core compaction using this approach must write out new store files themselves or the
+   * existing data will no longer be available after compaction.</b></li>
+   * </ul>
+ * @param c the environment provided by the region server + * @param store the store being compacted + * @param scanner the scanner over existing data used in the store file rewriting + * @param scanType type of Scan + * @param request the requested compaction + * @return the scanner to use during compaction. Should not be {@code null} unless the + * implementation is writing new store files on its own. + * @throws IOException if an error occurred on the coprocessor + * @deprecated use + * {@link #preCompact(ObserverContext, Store, InternalScanner, ScanType, CompactionRequest)} + * instead + */ + @Deprecated + InternalScanner preCompact(final ObserverContext c, + final Store store, final InternalScanner scanner, final ScanType scanType) throws IOException; + + /** + * Called prior to writing the {@link StoreFile}s selected for compaction into a new + * {@code StoreFile} and prior to creating the scanner used to read the input files. To override + * or modify the compaction process, implementing classes can return a new scanner to provide the + * KeyValues to be stored into the new {@code StoreFile} or null to perform the default + * processing. Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no * effect in this hook. * @param c the environment provided by the region server * @param store the store being compacted * @param scanners the list {@link StoreFileScanner}s to be read from * @param scanType the {@link ScanType} indicating whether this is a major or minor compaction - * @param earliestPutTs timestamp of the earliest put that was found in any of the involved - * store files + * @param earliestPutTs timestamp of the earliest put that was found in any of the involved store + * files * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain - * @return the scanner to use during compaction. {@code null} if the default implementation - * is to be used. 
+ * @param request the requested compaction + * @return the scanner to use during compaction. {@code null} if the default implementation is to + * be used. * @throws IOException if an error occurred on the coprocessor */ + InternalScanner preCompactScannerOpen(final ObserverContext c, + final Store store, List scanners, final ScanType scanType, + final long earliestPutTs, final InternalScanner s, CompactionRequest request) + throws IOException; + + /** + * Called prior to writing the {@link StoreFile}s selected for compaction into a new + * {@code StoreFile} and prior to creating the scanner used to read the input files. To override + * or modify the compaction process, implementing classes can return a new scanner to provide the + * KeyValues to be stored into the new {@code StoreFile} or null to perform the default + * processing. Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no + * effect in this hook. + * @param c the environment provided by the region server + * @param store the store being compacted + * @param scanners the list {@link StoreFileScanner}s to be read from + * @param scanType the {@link ScanType} indicating whether this is a major or minor compaction + * @param earliestPutTs timestamp of the earliest put that was found in any of the involved store + * files + * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain + * @param request the requested compaction + * @return the scanner to use during compaction. {@code null} if the default implementation is to + * be used. + * @throws IOException if an error occurred on the coprocessor + * @deprecated Use + * {@link #preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, InternalScanner, CompactionRequest)} + * instead. 
+ */ + @Deprecated InternalScanner preCompactScannerOpen(final ObserverContext c, final Store store, List scanners, final ScanType scanType, final long earliestPutTs, final InternalScanner s) throws IOException; /** - * Called after compaction has completed and the new store file has been - * moved in to place. + * Called after compaction has completed and the new store file has been moved in to place. + * @param c the environment provided by the region server + * @param store the store being compacted + * @param resultFile the new store file written out during compaction + * @param request the requested compaction + * @throws IOException if an error occurred on the coprocessor + */ + void postCompact(final ObserverContext c, final Store store, + StoreFile resultFile, CompactionRequest request) throws IOException; + + /** + * Called after compaction has completed and the new store file has been moved in to place. * @param c the environment provided by the region server * @param store the store being compacted * @param resultFile the new store file written out during compaction * @throws IOException if an error occurred on the coprocessor + * @deprecated Use {@link #postCompact(ObserverContext, Store, StoreFile, CompactionRequest)} + * instead */ + @Deprecated void postCompact(final ObserverContext c, final Store store, StoreFile resultFile) throws IOException; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index d601aee32aa..39cb708f60c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -19,7 +19,9 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import 
java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.PriorityBlockingQueue; @@ -183,23 +185,41 @@ public class CompactSplitThread implements CompactionRequestor { } } - public synchronized void requestCompaction(final HRegion r, - final String why) throws IOException { - for (Store s : r.getStores().values()) { - requestCompaction(r, s, why, Store.NO_PRIORITY); - } + @Override + public synchronized List requestCompaction(final HRegion r, final String why) + throws IOException { + return requestCompaction(r, why, null); } - public synchronized void requestCompaction(final HRegion r, final Store s, - final String why) throws IOException { - requestCompaction(r, s, why, Store.NO_PRIORITY); + @Override + public synchronized List requestCompaction(final HRegion r, final String why, + List requests) throws IOException { + return requestCompaction(r, why, Store.NO_PRIORITY, requests); } - public synchronized void requestCompaction(final HRegion r, final String why, - int p) throws IOException { - for (Store s : r.getStores().values()) { - requestCompaction(r, s, why, p); + @Override + public synchronized CompactionRequest requestCompaction(final HRegion r, final Store s, + final String why, CompactionRequest request) throws IOException { + return requestCompaction(r, s, why, Store.NO_PRIORITY, request); + } + + @Override + public synchronized List requestCompaction(final HRegion r, final String why, + int p, List requests) throws IOException { + // not a special compaction request, so make our own list + List ret; + if (requests == null) { + ret = new ArrayList(r.getStores().size()); + for (Store s : r.getStores().values()) { + ret.add(requestCompaction(r, s, why, p, null)); + } + } else { + ret = new ArrayList(requests.size()); + for (CompactionRequest request : requests) { + requests.add(requestCompaction(r, request.getStore(), why, p, request)); + } } + return ret; } /** @@ -207,13 +227,15 @@ public class 
CompactSplitThread implements CompactionRequestor { * @param s Store to request compaction on * @param why Why compaction requested -- used in debug messages * @param priority override the default priority (NO_PRIORITY == decide) + * @param request custom compaction request. Can be null in which case a simple + * compaction will be used. */ - public synchronized void requestCompaction(final HRegion r, final Store s, - final String why, int priority) throws IOException { + public synchronized CompactionRequest requestCompaction(final HRegion r, final Store s, + final String why, int priority, CompactionRequest request) throws IOException { if (this.server.isStopped()) { - return; + return null; } - CompactionRequest cr = s.requestCompaction(priority); + CompactionRequest cr = s.requestCompaction(priority, request); if (cr != null) { cr.setServer(server); if (priority != Store.NO_PRIORITY) { @@ -234,6 +256,7 @@ public class CompactSplitThread implements CompactionRequestor { " because compaction request was cancelled"); } } + return cr; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java index d6814acca07..053728c29d0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java @@ -19,42 +19,73 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.List; + import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; @InterfaceAudience.Private public interface CompactionRequestor { /** * @param r Region to compact * @param why Why compaction was requested -- used in debug messages + * @return The created {@link CompactionRequest CompactionRequests} or an empty list if no + * 
compactions were started * @throws IOException */ - public void requestCompaction(final HRegion r, final String why) throws IOException; - - /** - * @param r Region to compact - * @param s Store within region to compact - * @param why Why compaction was requested -- used in debug messages - * @throws IOException - */ - public void requestCompaction(final HRegion r, final Store s, final String why) + public List requestCompaction(final HRegion r, final String why) throws IOException; /** * @param r Region to compact * @param why Why compaction was requested -- used in debug messages - * @param pri Priority of this compaction. minHeap. <=0 is critical + * @param requests custom compaction requests. Each compaction must specify the store on which it + * is acting. Can be null in which case a compaction will be attempted on all + * stores for the region. + * @return The created {@link CompactionRequest CompactionRequests} or an empty list if no + * compactions were started * @throws IOException */ - public void requestCompaction(final HRegion r, final String why, int pri) throws IOException; + public List requestCompaction(final HRegion r, final String why, + List requests) + throws IOException; + + /** + * @param r Region to compact + * @param s Store within region to compact + * @param why Why compaction was requested -- used in debug messages + * @param request custom compaction request for the {@link HRegion} and {@link Store}. Custom + * request must be null or be constructed with matching region and store. + * @return The created {@link CompactionRequest} or null if no compaction was started. + * @throws IOException + */ + public CompactionRequest requestCompaction(final HRegion r, final Store s, final String why, + CompactionRequest request) throws IOException; + + /** + * @param r Region to compact + * @param why Why compaction was requested -- used in debug messages + * @param pri Priority of this compaction. minHeap. 
<=0 is critical + * @param requests custom compaction requests. Each compaction must specify the store on which it + * is acting. Can be null in which case a compaction will be attempted on all + * stores for the region. + * @return The created {@link CompactionRequest CompactionRequests} or an empty list if no + * compactions were started. + * @throws IOException + */ + public List requestCompaction(final HRegion r, final String why, int pri, + List requests) throws IOException; /** * @param r Region to compact * @param s Store within region to compact * @param why Why compaction was requested -- used in debug messages * @param pri Priority of this compaction. minHeap. <=0 is critical + * @param request custom compaction request to run. {@link Store} and {@link HRegion} for the + * request must match the region and store specified here. + * @return The created {@link CompactionRequest} or null if no compaction was started * @throws IOException */ - public void requestCompaction(final HRegion r, final Store s, - final String why, int pri) throws IOException; - + public CompactionRequest requestCompaction(final HRegion r, final Store s, final String why, + int pri, CompactionRequest request) throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 8362ee9648a..c23e12f4277 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1348,17 +1348,17 @@ public class HRegionServer implements ClientProtocol, try { if (s.needsCompaction()) { // Queue a compaction. Will recognize if major is needed. 
- this.instance.compactSplitThread.requestCompaction(r, s, - getName() + " requests compaction"); + this.instance.compactSplitThread.requestCompaction(r, s, getName() + + " requests compaction", null); } else if (s.isMajorCompaction()) { - if (majorCompactPriority == DEFAULT_PRIORITY || - majorCompactPriority > r.getCompactPriority()) { - this.instance.compactSplitThread.requestCompaction(r, s, - getName() + " requests major compaction; use default priority"); + if (majorCompactPriority == DEFAULT_PRIORITY + || majorCompactPriority > r.getCompactPriority()) { + this.instance.compactSplitThread.requestCompaction(r, s, getName() + + " requests major compaction; use default priority", null); } else { - this.instance.compactSplitThread.requestCompaction(r, s, - getName() + " requests major compaction; use configured priority", - this.majorCompactPriority); + this.instance.compactSplitThread.requestCompaction(r, s, getName() + + " requests major compaction; use configured priority", + this.majorCompactPriority, null); } } } catch (IOException e) { @@ -1665,7 +1665,7 @@ public class HRegionServer implements ClientProtocol, // Do checks to see if we need to compact (references or too many files) for (Store s : r.getStores().values()) { if (s.hasReferences() || s.needsCompaction()) { - getCompactionRequester().requestCompaction(r, s, "Opening Region"); + getCompactionRequester().requestCompaction(r, s, "Opening Region", null); } } long openSeqNum = r.getOpenSeqNum(); @@ -3630,10 +3630,10 @@ public class HRegionServer implements ClientProtocol, String log = "User-triggered " + (major ? 
"major " : "") + "compaction" + familyLogMsg; if(family != null) { compactSplitThread.requestCompaction(region, store, log, - Store.PRIORITY_USER); + Store.PRIORITY_USER, null); } else { compactSplitThread.requestCompaction(region, log, - Store.PRIORITY_USER); + Store.PRIORITY_USER, null); } return CompactRegionResponse.newBuilder().build(); } catch (IOException ie) { @@ -4034,4 +4034,11 @@ public class HRegionServer implements ClientProtocol, String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC); return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation); } + + /** + * @return the underlying {@link CompactSplitThread} for the servers + */ + public CompactSplitThread getCompactSplitThread() { + return this.compactSplitThread; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 3638ca299bc..b3a2cfd0f8d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1090,14 +1090,13 @@ public class HStore implements Store { List sfs = new ArrayList(); long compactionStartTime = EnvironmentEdgeManager.currentTimeMillis(); try { - List newFiles = - this.compactor.compact(filesToCompact, cr.isMajor()); + List newFiles = this.compactor.compact(cr); // Move the compaction into place. if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) { for (Path newFile: newFiles) { StoreFile sf = completeCompaction(filesToCompact, newFile); if (region.getCoprocessorHost() != null) { - region.getCoprocessorHost().postCompact(this, sf); + region.getCoprocessorHost().postCompact(this, sf, cr); } sfs.add(sf); } @@ -1181,13 +1180,12 @@ public class HStore implements Store { try { // Ready to go. Have list of files to compact. 
- List newFiles = - this.compactor.compact(filesToCompact, isMajor); + List newFiles = this.compactor.compactForTesting(filesToCompact, isMajor); for (Path newFile: newFiles) { // Move the compaction into place. StoreFile sf = completeCompaction(filesToCompact, newFile); if (region.getCoprocessorHost() != null) { - region.getCoprocessorHost().postCompact(this, sf); + region.getCoprocessorHost().postCompact(this, sf, null); } } } finally { @@ -1219,17 +1217,19 @@ public class HStore implements Store { return compactionPolicy.isMajorCompaction(this.storeFileManager.getStorefiles()); } + @Override public CompactionRequest requestCompaction() throws IOException { - return requestCompaction(Store.NO_PRIORITY); + return requestCompaction(Store.NO_PRIORITY, null); } - public CompactionRequest requestCompaction(int priority) throws IOException { + @Override + public CompactionRequest requestCompaction(int priority, CompactionRequest request) + throws IOException { // don't even select for compaction if writes are disabled if (!this.region.areWritesEnabled()) { return null; } - CompactionRequest ret = null; this.lock.readLock().lock(); try { List candidates = Lists.newArrayList(storeFileManager.getStorefiles()); @@ -1238,7 +1238,7 @@ public class HStore implements Store { candidates = compactionPolicy.preSelectCompaction(candidates, filesCompacting); boolean override = false; if (region.getCoprocessorHost() != null) { - override = region.getCoprocessorHost().preCompactSelection(this, candidates); + override = region.getCoprocessorHost().preCompactSelection(this, candidates, request); } CompactSelection filesToCompact; if (override) { @@ -1257,7 +1257,7 @@ public class HStore implements Store { if (region.getCoprocessorHost() != null) { region.getCoprocessorHost().postCompactSelection(this, - ImmutableList.copyOf(filesToCompact.getFilesToCompact())); + ImmutableList.copyOf(filesToCompact.getFilesToCompact()), request); } // no files to compact @@ -1287,15 +1287,24 @@ public 
class HStore implements Store { // everything went better than expected. create a compaction request int pri = getCompactPriority(priority); - ret = new CompactionRequest(region, this, filesToCompact, isMajor, pri); + //not a special compaction request, so we need to make one + if(request == null){ + request = new CompactionRequest(region, this, filesToCompact, isMajor, pri); + }else{ + //update the request with what the system thinks the request should be + //its up to the request if it wants to listen + request.setSelection(filesToCompact); + request.setIsMajor(isMajor); + request.setPriority(pri); + } } } finally { this.lock.readLock().unlock(); } - if (ret != null) { - this.region.reportCompactionRequestStart(ret.isMajor()); + if (request != null) { + this.region.reportCompactionRequestStart(request.isMajor()); } - return ret; + return request; } public void finishRequest(CompactionRequest cr) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 2805890a4df..2ec9f93ef26 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -55,6 +55,8 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; @@ -345,11 +347,10 @@ public class RegionCoprocessorHost /** * See - * {@link 
RegionObserver#preCompactScannerOpen(ObserverContext, - * Store, List, ScanType, long, InternalScanner)} + * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, InternalScanner, CompactionRequest)} */ public InternalScanner preCompactScannerOpen(Store store, List scanners, - ScanType scanType, long earliestPutTs) throws IOException { + ScanType scanType, long earliestPutTs, CompactionRequest request) throws IOException { ObserverContext ctx = null; InternalScanner s = null; for (RegionEnvironment env: coprocessors) { @@ -357,7 +358,7 @@ public class RegionCoprocessorHost ctx = ObserverContext.createAndPrepare(env, ctx); try { s = ((RegionObserver) env.getInstance()).preCompactScannerOpen(ctx, store, scanners, - scanType, earliestPutTs, s); + scanType, earliestPutTs, s, request); } catch (Throwable e) { handleCoprocessorThrowable(env,e); } @@ -370,22 +371,23 @@ public class RegionCoprocessorHost } /** - * Called prior to selecting the {@link StoreFile}s for compaction from - * the list of currently available candidates. + * Called prior to selecting the {@link StoreFile}s for compaction from the list of currently + * available candidates. 
* @param store The store where compaction is being requested * @param candidates The currently available store files + * @param request custom compaction request * @return If {@code true}, skip the normal selection process and use the current list * @throws IOException */ - public boolean preCompactSelection(Store store, List candidates) throws IOException { + public boolean preCompactSelection(Store store, List candidates, + CompactionRequest request) throws IOException { ObserverContext ctx = null; boolean bypass = false; for (RegionEnvironment env: coprocessors) { if (env.getInstance() instanceof RegionObserver) { ctx = ObserverContext.createAndPrepare(env, ctx); try { - ((RegionObserver)env.getInstance()).preCompactSelection( - ctx, store, candidates); + ((RegionObserver) env.getInstance()).preCompactSelection(ctx, store, candidates, request); } catch (Throwable e) { handleCoprocessorThrowable(env,e); @@ -400,20 +402,20 @@ public class RegionCoprocessorHost } /** - * Called after the {@link StoreFile}s to be compacted have been selected - * from the available candidates. + * Called after the {@link StoreFile}s to be compacted have been selected from the available + * candidates. 
* @param store The store where compaction is being requested * @param selected The store files selected to compact + * @param request custom compaction request */ - public void postCompactSelection(Store store, - ImmutableList selected) { + public void postCompactSelection(Store store, ImmutableList selected, + CompactionRequest request) { ObserverContext ctx = null; for (RegionEnvironment env: coprocessors) { if (env.getInstance() instanceof RegionObserver) { ctx = ObserverContext.createAndPrepare(env, ctx); try { - ((RegionObserver)env.getInstance()).postCompactSelection( - ctx, store, selected); + ((RegionObserver) env.getInstance()).postCompactSelection(ctx, store, selected, request); } catch (Throwable e) { handleCoprocessorThrowableNoRethrow(env,e); } @@ -429,18 +431,19 @@ public class RegionCoprocessorHost * @param store the store being compacted * @param scanner the scanner used to read store data during compaction * @param scanType type of Scan + * @param request the compaction that will be executed * @throws IOException */ - public InternalScanner preCompact(Store store, InternalScanner scanner, - ScanType scanType) throws IOException { + public InternalScanner preCompact(Store store, InternalScanner scanner, ScanType scanType, + CompactionRequest request) throws IOException { ObserverContext ctx = null; boolean bypass = false; for (RegionEnvironment env: coprocessors) { if (env.getInstance() instanceof RegionObserver) { ctx = ObserverContext.createAndPrepare(env, ctx); try { - scanner = ((RegionObserver)env.getInstance()).preCompact( - ctx, store, scanner, scanType); + scanner = ((RegionObserver) env.getInstance()).preCompact(ctx, store, scanner, scanType, + request); } catch (Throwable e) { handleCoprocessorThrowable(env,e); } @@ -457,15 +460,17 @@ public class RegionCoprocessorHost * Called after the store compaction has completed.
* @param store the store being compacted * @param resultFile the new store file written during compaction + * @param request the compaction that is being executed * @throws IOException */ - public void postCompact(Store store, StoreFile resultFile) throws IOException { + public void postCompact(Store store, StoreFile resultFile, CompactionRequest request) + throws IOException { ObserverContext ctx = null; for (RegionEnvironment env: coprocessors) { if (env.getInstance() instanceof RegionObserver) { ctx = ObserverContext.createAndPrepare(env, ctx); try { - ((RegionObserver)env.getInstance()).postCompact(ctx, store, resultFile); + ((RegionObserver) env.getInstance()).postCompact(ctx, store, resultFile, request); } catch (Throwable e) { handleCoprocessorThrowable(env, e); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 860931b7997..d3a8edcdd9d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -160,7 +160,8 @@ public interface Store extends HeapSize, StoreConfigInformation { public CompactionRequest requestCompaction() throws IOException; - public CompactionRequest requestCompaction(int priority) throws IOException; + public CompactionRequest requestCompaction(int priority, CompactionRequest request) + throws IOException; public void finishRequest(CompactionRequest cr); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java index 488da3dd46e..2b2427d14b9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java @@ 
-19,20 +19,22 @@ package org.apache.hadoop.hbase.regionserver.compactions; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.List; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hbase.RemoteExceptionHandler; -import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.util.StringUtils; @@ -46,35 +48,51 @@ import com.google.common.collect.Collections2; /** * This class holds all details necessary to run a compaction. 
*/ -@InterfaceAudience.Private +@InterfaceAudience.LimitedPrivate({ "coprocessor" }) +@InterfaceStability.Evolving public class CompactionRequest implements Comparable, Runnable { static final Log LOG = LogFactory.getLog(CompactionRequest.class); private final HRegion region; private final HStore store; - private final CompactSelection compactSelection; - private final long totalSize; - private final boolean isMajor; + private CompactSelection compactSelection; + private long totalSize; + private boolean isMajor; private int priority; private final Long timeInNanos; private HRegionServer server = null; - public CompactionRequest(HRegion region, HStore store, - CompactSelection files, boolean isMajor, int priority) { - Preconditions.checkNotNull(region); - Preconditions.checkNotNull(files); + public static CompactionRequest getRequestForTesting(Collection selection, + boolean isMajor) { + return new CompactionRequest(null, null, new CompactSelection(new ArrayList( + selection)), isMajor, 0, System.nanoTime()); + } + /** + * Constructor for a custom compaction. Uses the setXXX methods to update the state of the + * compaction before being used. 
+ */ + public CompactionRequest(HRegion region, HStore store, int priority) { + this(region, store, null, false, priority, System + .nanoTime()); + } + + public CompactionRequest(HRegion r, HStore s, CompactSelection files, boolean isMajor, int p) { + // delegate to the internal constructor after checking basic preconditions + this(Preconditions.checkNotNull(r), s, Preconditions.checkNotNull(files), isMajor, p, System + .nanoTime()); + } + + private CompactionRequest(HRegion region, HStore store, CompactSelection files, boolean isMajor, + int priority, long startTime) { this.region = region; this.store = store; - this.compactSelection = files; - long sz = 0; - for (StoreFile sf : files.getFilesToCompact()) { - sz += sf.getReader().length(); - } - this.totalSize = sz; this.isMajor = isMajor; this.priority = priority; - this.timeInNanos = System.nanoTime(); + this.timeInNanos = startTime; + if (files != null) { + this.setSelection(files); + } } /** @@ -162,6 +180,28 @@ public class CompactionRequest implements Comparable, this.server = hrs; } + /** + * Set the files (and, implicitly, the size of the compaction based on those files) + * @param files files that should be included in the compaction + */ + public void setSelection(CompactSelection files) { + long sz = 0; + for (StoreFile sf : files.getFilesToCompact()) { + sz += sf.getReader().length(); + } + this.totalSize = sz; + this.compactSelection = files; + } + + /** + * Specify if this compaction should be a major compaction based on the state of the store + * @param isMajor true if the system determines that this compaction should be a major + * compaction + */ + public void setIsMajor(boolean isMajor) { + this.isMajor = isMajor; + } + @Override public String toString() { String fsList = Joiner.on(", ").join( @@ -200,12 +240,11 @@ public class CompactionRequest implements Comparable, if (completed) { // degenerate case: blocked regions require recursive enqueues if (store.getCompactPriority() <= 0) { - 
server.compactSplitThread - .requestCompaction(region, store, "Recursive enqueue"); - } else { - // see if the compaction has caused us to exceed max region size - server.compactSplitThread.requestSplit(region); - } + server.compactSplitThread.requestCompaction(region, store, "Recursive enqueue", null); + } else { + // see if the compaction has caused us to exceed max region size + server.getCompactSplitThread().requestSplit(region); + } } } catch (IOException ex) { LOG.error("Compaction failed " + this, RemoteExceptionHandler @@ -234,4 +273,4 @@ public class CompactionRequest implements Comparable, } } } - } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 536f4dfb1f6..f70b6c8b4de 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.regionserver.compactions; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -41,15 +42,28 @@ public abstract class Compactor { /** * Do a minor/major compaction on an explicit set of storefiles from a Store. - * * @param filesToCompact which files to compact - * @param majorCompaction true to major compact (prune all deletes, max versions, etc) - * @return Product of compaction or an empty list if all cells expired or deleted and - * nothing made it through the compaction. + * @param request the requested compaction + * @return Product of compaction or an empty list if all cells expired or deleted and nothing made + * it through the compaction. 
* @throws IOException */ - public abstract List compact(final Collection filesToCompact, - final boolean majorCompaction) throws IOException; + public abstract List compact(final CompactionRequest request) throws IOException; + + /** + * Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to + * {@link #compact(CompactionRequest)}; + * @param filesToCompact the files to compact. These are used as the compactionSelection for the + * generated {@link CompactionRequest}. + * @param isMajor true to major compact (prune all deletes, max versions, etc) + * @return Product of compaction or an empty list if all cells expired or deleted and nothing made + * it through the compaction. + * @throws IOException + */ + public List compactForTesting(final Collection filesToCompact, boolean isMajor) + throws IOException { + return compact(CompactionRequest.getRequestForTesting(filesToCompact, isMajor)); + } public CompactionProgress getProgress() { return this.progress; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java index a1a4705eac9..060347b03a4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java @@ -59,16 +59,12 @@ public class DefaultCompactor extends Compactor { /** * Do a minor/major compaction on an explicit set of storefiles from a Store. - * - * @param filesToCompact which files to compact - * @param majorCompaction true to major compact (prune all deletes, max versions, etc) - * @return Product of compaction or an empty list if all cells expired or deleted and - * nothing made it through the compaction. 
- * @throws IOException */ @SuppressWarnings("deprecation") - public List compact(final Collection filesToCompact, - final boolean majorCompaction) throws IOException { + @Override + public List compact(final CompactionRequest request) throws IOException { + final Collection filesToCompact = request.getFiles(); + boolean majorCompaction = request.isMajor(); // Max-sequenceID is the last key in the files we're compacting long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact, true); @@ -139,7 +135,8 @@ public class DefaultCompactor extends Compactor { scanner = store .getCoprocessorHost() .preCompactScannerOpen(store, scanners, - majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, earliestPutTs); + majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, earliestPutTs, + request); } ScanType scanType = majorCompaction? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT; if (scanner == null) { @@ -150,11 +147,11 @@ public class DefaultCompactor extends Compactor { scanType, smallestReadPoint, earliestPutTs); } if (store.getCoprocessorHost() != null) { - InternalScanner cpScanner = - store.getCoprocessorHost().preCompact(store, scanner, scanType); + InternalScanner cpScanner = store.getCoprocessorHost().preCompact(store, scanner, + scanType, request); // NULL scanner returned from coprocessor hooks means skip normal processing if (cpScanner == null) { - return newFiles; // an empty list + return newFiles; // an empty list } scanner = cpScanner; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java index 48287f9d1b3..2e82b45ba08 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java @@ -28,6 +28,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import 
java.util.Map.Entry; +import java.util.concurrent.CountDownLatch; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -47,13 +48,17 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; -import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl; import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; +import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl; import org.apache.hadoop.hbase.io.hfile.HFileScanner; -import org.apache.hadoop.hbase.regionserver.compactions.*; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.regionserver.compactions.Compactor; +import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactionPolicy; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.util.Bytes; import org.junit.experimental.categories.Category; +import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -591,7 +596,7 @@ public class TestCompaction extends HBaseTestCase { Collection storeFiles = store.getStorefiles(); Compactor tool = store.compactor; - List newFiles = tool.compact(storeFiles, false); + List newFiles = tool.compactForTesting(storeFiles, false); // Now lets corrupt the compacted file. 
FileSystem fs = FileSystem.get(conf); @@ -630,7 +635,7 @@ public class TestCompaction extends HBaseTestCase { } store.triggerMajorCompaction(); - CompactionRequest request = store.requestCompaction(Store.NO_PRIORITY); + CompactionRequest request = store.requestCompaction(Store.NO_PRIORITY, null); assertNotNull("Expected to receive a compaction request", request); assertEquals( "System-requested major compaction should not occur if there are too many store files", @@ -648,7 +653,7 @@ public class TestCompaction extends HBaseTestCase { createStoreFile(r); } store.triggerMajorCompaction(); - CompactionRequest request = store.requestCompaction(Store.PRIORITY_USER); + CompactionRequest request = store.requestCompaction(Store.PRIORITY_USER, null); assertNotNull("Expected to receive a compaction request", request); assertEquals( "User-requested major compaction should always occur, even if there are too many store files", @@ -656,5 +661,53 @@ public class TestCompaction extends HBaseTestCase { request.isMajor()); } -} + /** + * Create a custom compaction request and be sure that we can track it through the queue, knowing + * when the compaction is completed. 
+ */ + public void testTrackingCompactionRequest() throws Exception { + // setup a compact/split thread on a mock server + HRegionServer mockServer = Mockito.mock(HRegionServer.class); + Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); + CompactSplitThread thread = new CompactSplitThread(mockServer); + Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); + // setup a region/store with some files + Store store = r.getStore(COLUMN_FAMILY); + createStoreFile(r); + for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) { + createStoreFile(r); + } + + CountDownLatch latch = new CountDownLatch(1); + TrackableCompactionRequest request = new TrackableCompactionRequest(r, (HStore) store, latch); + thread.requestCompaction(r, store, "test custom comapction", Store.PRIORITY_USER, request); + // wait for the latch to complete. + latch.await(); + + thread.interruptIfNecessary(); + } + + /** + * Simple {@link CompactionRequest} on which you can wait until the requested compaction finishes. + */ + public static class TrackableCompactionRequest extends CompactionRequest { + private CountDownLatch done; + + /** + * Constructor for a custom compaction. Uses the setXXX methods to update the state of the + * compaction before being used. 
+ */ + public TrackableCompactionRequest(HRegion region, HStore store, CountDownLatch finished) { + super(region, store, Store.PRIORITY_USER); + this.done = finished; + } + + @Override + public void run() { + super.run(); + this.done.countDown(); + } + } + +} From 434d38a5a846aec2347ab5a9bae20be62328cd75 Mon Sep 17 00:00:00 2001 From: Zhihong Yu Date: Thu, 21 Feb 2013 00:25:58 +0000 Subject: [PATCH 2/2] HBASE-7883 Update memstore size when removing the entries in append operation (Himanshu) git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1448480 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop/hbase/regionserver/MemStore.java | 4 ++- .../hbase/regionserver/TestMemStore.java | 30 +++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index 40b8c7c1848..b7b8df46806 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -586,7 +586,9 @@ public class MemStore implements HeapSize { // which means we can prove that no scanner will see this version // false means there was a change, so give us the size. 
- addedSize -= heapSizeChange(cur, true); + long delta = heapSizeChange(cur, true); + addedSize -= delta; + this.size.addAndGet(-delta); it.remove(); } else { versionsVisible++; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java index eb3c3de2c97..478c6e57b92 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java @@ -846,6 +846,36 @@ public class TestMemStore extends TestCase { Integer.toString(i2)); } + /** + * Add keyvalues with a fixed memstoreTs, and checks that memstore size is decreased + * as older keyvalues are deleted from the memstore. + * @throws Exception + */ + public void testUpsertMemstoreSize() throws Exception { + Configuration conf = HBaseConfiguration.create(); + memstore = new MemStore(conf, KeyValue.COMPARATOR); + long oldSize = memstore.size.get(); + + List l = new ArrayList(); + KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v"); + KeyValue kv2 = KeyValueTestUtil.create("r", "f", "q", 101, "v"); + KeyValue kv3 = KeyValueTestUtil.create("r", "f", "q", 102, "v"); + + kv1.setMvccVersion(1); kv2.setMvccVersion(1);kv3.setMvccVersion(1); + l.add(kv1); l.add(kv2); l.add(kv3); + + this.memstore.upsert(l, 2);// readpoint is 2 + long newSize = this.memstore.size.get(); + assert(newSize > oldSize); + + KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v"); + kv4.setMvccVersion(1); + l.clear(); l.add(kv4); + this.memstore.upsert(l, 3); + assertEquals(newSize, this.memstore.size.get()); + //this.memstore = null; + } + /** * Adds {@link #ROW_COUNT} rows and {@link #QUALIFIER_COUNT} * @param hmc Instance to add rows to.