HBASE-7725: Add ability to create custom compaction request

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1448449 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
jyates 2013-02-20 22:40:55 +00:00
parent a6f8131f9d
commit 1a766c42a1
12 changed files with 483 additions and 169 deletions

View File

@ -41,6 +41,8 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Pair;
@ -134,29 +136,64 @@ public abstract class BaseRegionObserver implements RegionObserver {
public void preCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, final List<StoreFile> candidates) throws IOException { }
@Override
public void preCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, final List<StoreFile> candidates, final CompactionRequest request)
throws IOException {
preCompactSelection(c, store, candidates);
}
@Override
public void postCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, final ImmutableList<StoreFile> selected) { }
@Override
public void postCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, final ImmutableList<StoreFile> selected, CompactionRequest request) {
postCompactSelection(c, store, selected);
}
@Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
final Store store, final InternalScanner scanner, final ScanType scanType)
throws IOException {
throws IOException {
return scanner;
}
@Override
public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, List<? extends KeyValueScanner> scanners, final ScanType scanType,
final long earliestPutTs, final InternalScanner s) throws IOException {
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
final Store store, final InternalScanner scanner, final ScanType scanType,
CompactionRequest request) throws IOException {
return preCompact(e, store, scanner, scanType);
}
@Override
public InternalScanner preCompactScannerOpen(
final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
List<? extends KeyValueScanner> scanners, final ScanType scanType, final long earliestPutTs,
final InternalScanner s) throws IOException {
return null;
}
@Override
public InternalScanner preCompactScannerOpen(
final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
List<? extends KeyValueScanner> scanners, final ScanType scanType, final long earliestPutTs,
final InternalScanner s, CompactionRequest request) throws IOException {
return preCompactScannerOpen(c, store, scanners, scanType, earliestPutTs, s);
}
@Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, final Store store,
final StoreFile resultFile) throws IOException {
}
@Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, final Store store,
final StoreFile resultFile, CompactionRequest request) throws IOException {
postCompact(e, store, resultFile);
}
@Override
public void preGetClosestRowBefore(final ObserverContext<RegionCoprocessorEnvironment> e,
final byte [] row, final byte [] family, final Result result)
@ -351,4 +388,4 @@ public abstract class BaseRegionObserver implements RegionObserver {
List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException {
return hasLoaded;
}
}
}

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@ -126,87 +127,184 @@ public interface RegionObserver extends Coprocessor {
final StoreFile resultFile) throws IOException;
/**
* Called prior to selecting the {@link StoreFile}s to compact from the list
* of available candidates. To alter the files used for compaction, you may
* mutate the passed in list of candidates.
* Called prior to selecting the {@link StoreFile StoreFiles} to compact from the list of
* available candidates. To alter the files used for compaction, you may mutate the passed in list
* of candidates.
* @param c the environment provided by the region server
* @param store the store where compaction is being requested
* @param candidates the store files currently available for compaction
* @param request custom compaction request
* @throws IOException if an error occurred on the coprocessor
*/
void preCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, final List<StoreFile> candidates, final CompactionRequest request)
throws IOException;
/**
* Called prior to selecting the {@link StoreFile}s to compact from the list of available
* candidates. To alter the files used for compaction, you may mutate the passed in list of
* candidates.
* @param c the environment provided by the region server
* @param store the store where compaction is being requested
* @param candidates the store files currently available for compaction
* @throws IOException if an error occurred on the coprocessor
* @deprecated Use {@link #preCompactSelection(ObserverContext, Store, List, Object)} instead
*/
void preCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, final List<StoreFile> candidates) throws IOException;
/**
* Called after the {@link StoreFile}s to compact have been selected from the
* available candidates.
* Called after the {@link StoreFile}s to compact have been selected from the available
* candidates.
* @param c the environment provided by the region server
* @param store the store being compacted
* @param selected the store files selected to compact
* @param request custom compaction request
*/
void postCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, final ImmutableList<StoreFile> selected, CompactionRequest request);
/**
* Called after the {@link StoreFile}s to compact have been selected from the available
* candidates.
* @param c the environment provided by the region server
* @param store the store being compacted
* @param selected the store files selected to compact
* @param compactionAttributes custom attributes for the compaction
* @deprecated use {@link #postCompactSelection(ObserverContext, Store, ImmutableList, Object)}
* instead.
*/
@Deprecated
void postCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, final ImmutableList<StoreFile> selected);
/**
* Called prior to writing the {@link StoreFile}s selected for compaction into
* a new {@code StoreFile}. To override or modify the compaction process,
* implementing classes have two options:
* Called prior to writing the {@link StoreFile}s selected for compaction into a new
* {@code StoreFile}. To override or modify the compaction process, implementing classes have two
* options:
* <ul>
* <li>Wrap the provided {@link InternalScanner} with a custom
* implementation that is returned from this method. The custom scanner
* can then inspect {@link KeyValue}s from the wrapped scanner, applying
* its own policy to what gets written.</li>
* <li>Call {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()}
* and provide a custom implementation for writing of new
* {@link StoreFile}s. <strong>Note: any implementations bypassing
* core compaction using this approach must write out new store files
* themselves or the existing data will no longer be available after
* compaction.</strong></li>
* <li>Wrap the provided {@link InternalScanner} with a custom implementation that is returned
* from this method. The custom scanner can then inspect {@link KeyValue}s from the wrapped
* scanner, applying its own policy to what gets written.</li>
* <li>Call {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} and provide a
* custom implementation for writing of new {@link StoreFile}s. <strong>Note: any implementations
* bypassing core compaction using this approach must write out new store files themselves or the
* existing data will no longer be available after compaction.</strong></li>
* </ul>
* @param c the environment provided by the region server
* @param store the store being compacted
* @param scanner the scanner over existing data used in the store file
* rewriting
* @param scanner the scanner over existing data used in the store file rewriting
* @param scanType type of Scan
* @return the scanner to use during compaction. Should not be {@code null}
* unless the implementation is writing new store files on its own.
* @param request the requested compaction
* @return the scanner to use during compaction. Should not be {@code null} unless the
* implementation is writing new store files on its own.
* @throws IOException if an error occurred on the coprocessor
*/
InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, final InternalScanner scanner,
final ScanType scanType) throws IOException;
final Store store, final InternalScanner scanner, final ScanType scanType,
CompactionRequest request) throws IOException;
/**
* Called prior to writing the {@link StoreFile}s selected for compaction into
* a new {@code StoreFile} and prior to creating the scanner used to read the
* input files. To override or modify the compaction process,
* implementing classes can return a new scanner to provide the KeyValues to be
* stored into the new {@code StoreFile} or null to perform the default processing.
* Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
* Called prior to writing the {@link StoreFile}s selected for compaction into a new
* {@code StoreFile}. To override or modify the compaction process, implementing classes have two
* options:
* <ul>
* <li>Wrap the provided {@link InternalScanner} with a custom implementation that is returned
* from this method. The custom scanner can then inspect {@link KeyValue}s from the wrapped
* scanner, applying its own policy to what gets written.</li>
* <li>Call {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} and provide a
* custom implementation for writing of new {@link StoreFile}s. <strong>Note: any implementations
* bypassing core compaction using this approach must write out new store files themselves or the
* existing data will no longer be available after compaction.</strong></li>
* </ul>
* @param c the environment provided by the region server
* @param store the store being compacted
* @param scanner the scanner over existing data used in the store file rewriting
* @param scanType type of Scan
* @param request the requested compaction
* @return the scanner to use during compaction. Should not be {@code null} unless the
* implementation is writing new store files on its own.
* @throws IOException if an error occurred on the coprocessor
* @deprecated use
* {@link #preCompact(ObserverContext, Store, InternalScanner, ScanType, CompactionRequest)}
* instead
*/
@Deprecated
InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, final InternalScanner scanner, final ScanType scanType) throws IOException;
/**
* Called prior to writing the {@link StoreFile}s selected for compaction into a new
* {@code StoreFile} and prior to creating the scanner used to read the input files. To override
* or modify the compaction process, implementing classes can return a new scanner to provide the
* KeyValues to be stored into the new {@code StoreFile} or null to perform the default
* processing. Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
* effect in this hook.
* @param c the environment provided by the region server
* @param store the store being compacted
* @param scanners the list {@link StoreFileScanner}s to be read from
* @param scanType the {@link ScanType} indicating whether this is a major or minor compaction
* @param earliestPutTs timestamp of the earliest put that was found in any of the involved
* store files
* @param earliestPutTs timestamp of the earliest put that was found in any of the involved store
* files
* @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain
* @return the scanner to use during compaction. {@code null} if the default implementation
* is to be used.
* @param request the requested compaction
* @return the scanner to use during compaction. {@code null} if the default implementation is to
* be used.
* @throws IOException if an error occurred on the coprocessor
*/
InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, List<? extends KeyValueScanner> scanners, final ScanType scanType,
final long earliestPutTs, final InternalScanner s, CompactionRequest request)
throws IOException;
/**
* Called prior to writing the {@link StoreFile}s selected for compaction into a new
* {@code StoreFile} and prior to creating the scanner used to read the input files. To override
* or modify the compaction process, implementing classes can return a new scanner to provide the
* KeyValues to be stored into the new {@code StoreFile} or null to perform the default
* processing. Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
* effect in this hook.
* @param c the environment provided by the region server
* @param store the store being compacted
* @param scanners the list {@link StoreFileScanner}s to be read from
* @param scanType the {@link ScanType} indicating whether this is a major or minor compaction
* @param earliestPutTs timestamp of the earliest put that was found in any of the involved store
* files
* @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain
* @param request the requested compaction
* @return the scanner to use during compaction. {@code null} if the default implementation is to
* be used.
* @throws IOException if an error occurred on the coprocessor
* @deprecated Use
* {@link #preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, InternalScanner, CompactionRequest)}
* instead.
*/
@Deprecated
InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, List<? extends KeyValueScanner> scanners, final ScanType scanType,
final long earliestPutTs, final InternalScanner s) throws IOException;
/**
* Called after compaction has completed and the new store file has been
* moved in to place.
* Called after compaction has completed and the new store file has been moved in to place.
* @param c the environment provided by the region server
* @param store the store being compacted
* @param resultFile the new store file written out during compaction
* @param request the requested compaction
* @throws IOException if an error occurred on the coprocessor
*/
void postCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
StoreFile resultFile, CompactionRequest request) throws IOException;
/**
* Called after compaction has completed and the new store file has been moved in to place.
* @param c the environment provided by the region server
* @param store the store being compacted
* @param resultFile the new store file written out during compaction
* @throws IOException if an error occurred on the coprocessor
* @deprecated Use {@link #postCompact(ObserverContext, Store, StoreFile, CompactionRequest)}
* instead
*/
@Deprecated
void postCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
StoreFile resultFile) throws IOException;

View File

@ -19,7 +19,9 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
@ -183,23 +185,41 @@ public class CompactSplitThread implements CompactionRequestor {
}
}
public synchronized void requestCompaction(final HRegion r,
final String why) throws IOException {
for (Store s : r.getStores().values()) {
requestCompaction(r, s, why, Store.NO_PRIORITY);
}
@Override
public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why)
throws IOException {
return requestCompaction(r, why, null);
}
public synchronized void requestCompaction(final HRegion r, final Store s,
final String why) throws IOException {
requestCompaction(r, s, why, Store.NO_PRIORITY);
@Override
public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why,
List<CompactionRequest> requests) throws IOException {
return requestCompaction(r, why, Store.NO_PRIORITY, requests);
}
public synchronized void requestCompaction(final HRegion r, final String why,
int p) throws IOException {
for (Store s : r.getStores().values()) {
requestCompaction(r, s, why, p);
@Override
public synchronized CompactionRequest requestCompaction(final HRegion r, final Store s,
final String why, CompactionRequest request) throws IOException {
return requestCompaction(r, s, why, Store.NO_PRIORITY, request);
}
@Override
public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why,
int p, List<CompactionRequest> requests) throws IOException {
// not a special compaction request, so make our own list
List<CompactionRequest> ret;
if (requests == null) {
ret = new ArrayList<CompactionRequest>(r.getStores().size());
for (Store s : r.getStores().values()) {
ret.add(requestCompaction(r, s, why, p, null));
}
} else {
ret = new ArrayList<CompactionRequest>(requests.size());
for (CompactionRequest request : requests) {
requests.add(requestCompaction(r, request.getStore(), why, p, request));
}
}
return ret;
}
/**
@ -207,13 +227,15 @@ public class CompactSplitThread implements CompactionRequestor {
* @param s Store to request compaction on
* @param why Why compaction requested -- used in debug messages
* @param priority override the default priority (NO_PRIORITY == decide)
* @param request custom compaction request. Can be <tt>null</tt> in which case a simple
* compaction will be used.
*/
public synchronized void requestCompaction(final HRegion r, final Store s,
final String why, int priority) throws IOException {
public synchronized CompactionRequest requestCompaction(final HRegion r, final Store s,
final String why, int priority, CompactionRequest request) throws IOException {
if (this.server.isStopped()) {
return;
return null;
}
CompactionRequest cr = s.requestCompaction(priority);
CompactionRequest cr = s.requestCompaction(priority, request);
if (cr != null) {
cr.setServer(server);
if (priority != Store.NO_PRIORITY) {
@ -234,6 +256,7 @@ public class CompactSplitThread implements CompactionRequestor {
" because compaction request was cancelled");
}
}
return cr;
}
/**

View File

@ -19,42 +19,73 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@InterfaceAudience.Private
public interface CompactionRequestor {
/**
* @param r Region to compact
* @param why Why compaction was requested -- used in debug messages
* @return The created {@link CompactionRequest CompactionRequests} or an empty list if no
* compactions were started
* @throws IOException
*/
public void requestCompaction(final HRegion r, final String why) throws IOException;
/**
* @param r Region to compact
* @param s Store within region to compact
* @param why Why compaction was requested -- used in debug messages
* @throws IOException
*/
public void requestCompaction(final HRegion r, final Store s, final String why)
public List<CompactionRequest> requestCompaction(final HRegion r, final String why)
throws IOException;
/**
* @param r Region to compact
* @param why Why compaction was requested -- used in debug messages
* @param pri Priority of this compaction. minHeap. <=0 is critical
* @param requests custom compaction requests. Each compaction must specify the store on which it
* is acting. Can be <tt>null</tt> in which case a compaction will be attempted on all
* stores for the region.
* @return The created {@link CompactionRequest CompactionRequests} or an empty list if no
* compactions were started
* @throws IOException
*/
public void requestCompaction(final HRegion r, final String why, int pri) throws IOException;
public List<CompactionRequest> requestCompaction(final HRegion r, final String why,
List<CompactionRequest> requests)
throws IOException;
/**
* @param r Region to compact
* @param s Store within region to compact
* @param why Why compaction was requested -- used in debug messages
* @param request custom compaction request for the {@link HRegion} and {@link Store}. Custom
* request must be <tt>null</tt> or be constructed with matching region and store.
* @return The created {@link CompactionRequest} or <tt>null</tt> if no compaction was started.
* @throws IOException
*/
public CompactionRequest requestCompaction(final HRegion r, final Store s, final String why,
CompactionRequest request) throws IOException;
/**
* @param r Region to compact
* @param why Why compaction was requested -- used in debug messages
* @param pri Priority of this compaction. minHeap. <=0 is critical
* @param requests custom compaction requests. Each compaction must specify the store on which it
* is acting. Can be <tt>null</tt> in which case a compaction will be attempted on all
* stores for the region.
* @return The created {@link CompactionRequest CompactionRequests} or an empty list if no
* compactions were started.
* @throws IOException
*/
public List<CompactionRequest> requestCompaction(final HRegion r, final String why, int pri,
List<CompactionRequest> requests) throws IOException;
/**
* @param r Region to compact
* @param s Store within region to compact
* @param why Why compaction was requested -- used in debug messages
* @param pri Priority of this compaction. minHeap. <=0 is critical
* @param request custom compaction request to run. {@link Store} and {@link HRegion} for the
* request must match the region and store specified here.
* @return The created {@link CompactionRequest} or <tt>null</tt> if no compaction was started
* @throws IOException
*/
public void requestCompaction(final HRegion r, final Store s,
final String why, int pri) throws IOException;
public CompactionRequest requestCompaction(final HRegion r, final Store s, final String why,
int pri, CompactionRequest request) throws IOException;
}

View File

@ -1348,17 +1348,17 @@ public class HRegionServer implements ClientProtocol,
try {
if (s.needsCompaction()) {
// Queue a compaction. Will recognize if major is needed.
this.instance.compactSplitThread.requestCompaction(r, s,
getName() + " requests compaction");
this.instance.compactSplitThread.requestCompaction(r, s, getName()
+ " requests compaction", null);
} else if (s.isMajorCompaction()) {
if (majorCompactPriority == DEFAULT_PRIORITY ||
majorCompactPriority > r.getCompactPriority()) {
this.instance.compactSplitThread.requestCompaction(r, s,
getName() + " requests major compaction; use default priority");
if (majorCompactPriority == DEFAULT_PRIORITY
|| majorCompactPriority > r.getCompactPriority()) {
this.instance.compactSplitThread.requestCompaction(r, s, getName()
+ " requests major compaction; use default priority", null);
} else {
this.instance.compactSplitThread.requestCompaction(r, s,
getName() + " requests major compaction; use configured priority",
this.majorCompactPriority);
this.instance.compactSplitThread.requestCompaction(r, s, getName()
+ " requests major compaction; use configured priority",
this.majorCompactPriority, null);
}
}
} catch (IOException e) {
@ -1665,7 +1665,7 @@ public class HRegionServer implements ClientProtocol,
// Do checks to see if we need to compact (references or too many files)
for (Store s : r.getStores().values()) {
if (s.hasReferences() || s.needsCompaction()) {
getCompactionRequester().requestCompaction(r, s, "Opening Region");
getCompactionRequester().requestCompaction(r, s, "Opening Region", null);
}
}
long openSeqNum = r.getOpenSeqNum();
@ -3630,10 +3630,10 @@ public class HRegionServer implements ClientProtocol,
String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg;
if(family != null) {
compactSplitThread.requestCompaction(region, store, log,
Store.PRIORITY_USER);
Store.PRIORITY_USER, null);
} else {
compactSplitThread.requestCompaction(region, log,
Store.PRIORITY_USER);
Store.PRIORITY_USER, null);
}
return CompactRegionResponse.newBuilder().build();
} catch (IOException ie) {
@ -4034,4 +4034,11 @@ public class HRegionServer implements ClientProtocol,
String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);
}
/**
* @return the underlying {@link CompactSplitThread} for the servers
*/
public CompactSplitThread getCompactSplitThread() {
return this.compactSplitThread;
}
}

View File

@ -1090,14 +1090,13 @@ public class HStore implements Store {
List<StoreFile> sfs = new ArrayList<StoreFile>();
long compactionStartTime = EnvironmentEdgeManager.currentTimeMillis();
try {
List<Path> newFiles =
this.compactor.compact(filesToCompact, cr.isMajor());
List<Path> newFiles = this.compactor.compact(cr);
// Move the compaction into place.
if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
for (Path newFile: newFiles) {
StoreFile sf = completeCompaction(filesToCompact, newFile);
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postCompact(this, sf);
region.getCoprocessorHost().postCompact(this, sf, cr);
}
sfs.add(sf);
}
@ -1181,13 +1180,12 @@ public class HStore implements Store {
try {
// Ready to go. Have list of files to compact.
List<Path> newFiles =
this.compactor.compact(filesToCompact, isMajor);
List<Path> newFiles = this.compactor.compactForTesting(filesToCompact, isMajor);
for (Path newFile: newFiles) {
// Move the compaction into place.
StoreFile sf = completeCompaction(filesToCompact, newFile);
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postCompact(this, sf);
region.getCoprocessorHost().postCompact(this, sf, null);
}
}
} finally {
@ -1219,17 +1217,19 @@ public class HStore implements Store {
return compactionPolicy.isMajorCompaction(this.storeFileManager.getStorefiles());
}
@Override
public CompactionRequest requestCompaction() throws IOException {
return requestCompaction(Store.NO_PRIORITY);
return requestCompaction(Store.NO_PRIORITY, null);
}
public CompactionRequest requestCompaction(int priority) throws IOException {
@Override
public CompactionRequest requestCompaction(int priority, CompactionRequest request)
throws IOException {
// don't even select for compaction if writes are disabled
if (!this.region.areWritesEnabled()) {
return null;
}
CompactionRequest ret = null;
this.lock.readLock().lock();
try {
List<StoreFile> candidates = Lists.newArrayList(storeFileManager.getStorefiles());
@ -1238,7 +1238,7 @@ public class HStore implements Store {
candidates = compactionPolicy.preSelectCompaction(candidates, filesCompacting);
boolean override = false;
if (region.getCoprocessorHost() != null) {
override = region.getCoprocessorHost().preCompactSelection(this, candidates);
override = region.getCoprocessorHost().preCompactSelection(this, candidates, request);
}
CompactSelection filesToCompact;
if (override) {
@ -1257,7 +1257,7 @@ public class HStore implements Store {
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postCompactSelection(this,
ImmutableList.copyOf(filesToCompact.getFilesToCompact()));
ImmutableList.copyOf(filesToCompact.getFilesToCompact()), request);
}
// no files to compact
@ -1287,15 +1287,24 @@ public class HStore implements Store {
// everything went better than expected. create a compaction request
int pri = getCompactPriority(priority);
ret = new CompactionRequest(region, this, filesToCompact, isMajor, pri);
//not a special compaction request, so we need to make one
if(request == null){
request = new CompactionRequest(region, this, filesToCompact, isMajor, pri);
}else{
//update the request with what the system thinks the request should be
//its up to the request if it wants to listen
request.setSelection(filesToCompact);
request.setIsMajor(isMajor);
request.setPriority(pri);
}
}
} finally {
this.lock.readLock().unlock();
}
if (ret != null) {
this.region.reportCompactionRequestStart(ret.isMajor());
if (request != null) {
this.region.reportCompactionRequestStart(request.isMajor());
}
return ret;
return request;
}
public void finishRequest(CompactionRequest cr) {

View File

@ -55,6 +55,8 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
@ -345,11 +347,10 @@ public class RegionCoprocessorHost
/**
* See
* {@link RegionObserver#preCompactScannerOpen(ObserverContext,
* Store, List, ScanType, long, InternalScanner)}
* {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, InternalScanner, CompactionRequest)}
*/
public InternalScanner preCompactScannerOpen(Store store, List<StoreFileScanner> scanners,
ScanType scanType, long earliestPutTs) throws IOException {
ScanType scanType, long earliestPutTs, CompactionRequest request) throws IOException {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
InternalScanner s = null;
for (RegionEnvironment env: coprocessors) {
@ -357,7 +358,7 @@ public class RegionCoprocessorHost
ctx = ObserverContext.createAndPrepare(env, ctx);
try {
s = ((RegionObserver) env.getInstance()).preCompactScannerOpen(ctx, store, scanners,
scanType, earliestPutTs, s);
scanType, earliestPutTs, s, request);
} catch (Throwable e) {
handleCoprocessorThrowable(env,e);
}
@ -370,22 +371,23 @@ public class RegionCoprocessorHost
}
/**
* Called prior to selecting the {@link StoreFile}s for compaction from
* the list of currently available candidates.
* Called prior to selecting the {@link StoreFile}s for compaction from the list of currently
* available candidates.
* @param store The store where compaction is being requested
* @param candidates The currently available store files
* @param request custom compaction request
* @return If {@code true}, skip the normal selection process and use the current list
* @throws IOException
*/
public boolean preCompactSelection(Store store, List<StoreFile> candidates) throws IOException {
public boolean preCompactSelection(Store store, List<StoreFile> candidates,
CompactionRequest request) throws IOException {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
boolean bypass = false;
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
try {
((RegionObserver)env.getInstance()).preCompactSelection(
ctx, store, candidates);
((RegionObserver) env.getInstance()).preCompactSelection(ctx, store, candidates, request);
} catch (Throwable e) {
handleCoprocessorThrowable(env,e);
@ -400,20 +402,20 @@ public class RegionCoprocessorHost
}
/**
* Called after the {@link StoreFile}s to be compacted have been selected
* from the available candidates.
* Called after the {@link StoreFile}s to be compacted have been selected from the available
* candidates.
* @param store The store where compaction is being requested
* @param selected The store files selected to compact
* @param request custom compaction
*/
public void postCompactSelection(Store store,
ImmutableList<StoreFile> selected) {
public void postCompactSelection(Store store, ImmutableList<StoreFile> selected,
CompactionRequest request) {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
try {
((RegionObserver)env.getInstance()).postCompactSelection(
ctx, store, selected);
((RegionObserver) env.getInstance()).postCompactSelection(ctx, store, selected, request);
} catch (Throwable e) {
handleCoprocessorThrowableNoRethrow(env,e);
}
@ -429,18 +431,19 @@ public class RegionCoprocessorHost
* @param store the store being compacted
* @param scanner the scanner used to read store data during compaction
* @param scanType type of Scan
* @param request the compaction that will be executed
* @throws IOException
*/
public InternalScanner preCompact(Store store, InternalScanner scanner,
ScanType scanType) throws IOException {
public InternalScanner preCompact(Store store, InternalScanner scanner, ScanType scanType,
CompactionRequest request) throws IOException {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
boolean bypass = false;
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
try {
scanner = ((RegionObserver)env.getInstance()).preCompact(
ctx, store, scanner, scanType);
scanner = ((RegionObserver) env.getInstance()).preCompact(ctx, store, scanner, scanType,
request);
} catch (Throwable e) {
handleCoprocessorThrowable(env,e);
}
@ -457,15 +460,17 @@ public class RegionCoprocessorHost
* Called after the store compaction has completed.
* @param store the store being compacted
* @param resultFile the new store file written during compaction
* @param request the compaction that is being executed
* @throws IOException
*/
public void postCompact(Store store, StoreFile resultFile) throws IOException {
public void postCompact(Store store, StoreFile resultFile, CompactionRequest request)
throws IOException {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
try {
((RegionObserver)env.getInstance()).postCompact(ctx, store, resultFile);
((RegionObserver) env.getInstance()).postCompact(ctx, store, resultFile, request);
} catch (Throwable e) {
handleCoprocessorThrowable(env, e);
}

View File

@ -160,7 +160,8 @@ public interface Store extends HeapSize, StoreConfigInformation {
public CompactionRequest requestCompaction() throws IOException;
public CompactionRequest requestCompaction(int priority) throws IOException;
public CompactionRequest requestCompaction(int priority, CompactionRequest request)
throws IOException;
public void finishRequest(CompactionRequest cr);

View File

@ -19,20 +19,22 @@
package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils;
@ -46,35 +48,51 @@ import com.google.common.collect.Collections2;
/**
* This class holds all details necessary to run a compaction.
*/
@InterfaceAudience.Private
@InterfaceAudience.LimitedPrivate({ "coprocessor" })
@InterfaceStability.Evolving
public class CompactionRequest implements Comparable<CompactionRequest>,
Runnable {
static final Log LOG = LogFactory.getLog(CompactionRequest.class);
private final HRegion region;
private final HStore store;
private final CompactSelection compactSelection;
private final long totalSize;
private final boolean isMajor;
private CompactSelection compactSelection;
private long totalSize;
private boolean isMajor;
private int priority;
private final Long timeInNanos;
private HRegionServer server = null;
public CompactionRequest(HRegion region, HStore store,
CompactSelection files, boolean isMajor, int priority) {
Preconditions.checkNotNull(region);
Preconditions.checkNotNull(files);
public static CompactionRequest getRequestForTesting(Collection<StoreFile> selection,
boolean isMajor) {
return new CompactionRequest(null, null, new CompactSelection(new ArrayList<StoreFile>(
selection)), isMajor, 0, System.nanoTime());
}
/**
* Constructor for a custom compaction. Uses the setXXX methods to update the state of the
* compaction before being used.
*/
public CompactionRequest(HRegion region, HStore store, int priority) {
this(region, store, null, false, priority, System
.nanoTime());
}
public CompactionRequest(HRegion r, HStore s, CompactSelection files, boolean isMajor, int p) {
// delegate to the internal constructor after checking basic preconditions
this(Preconditions.checkNotNull(r), s, Preconditions.checkNotNull(files), isMajor, p, System
.nanoTime());
}
private CompactionRequest(HRegion region, HStore store, CompactSelection files, boolean isMajor,
int priority, long startTime) {
this.region = region;
this.store = store;
this.compactSelection = files;
long sz = 0;
for (StoreFile sf : files.getFilesToCompact()) {
sz += sf.getReader().length();
}
this.totalSize = sz;
this.isMajor = isMajor;
this.priority = priority;
this.timeInNanos = System.nanoTime();
this.timeInNanos = startTime;
if (files != null) {
this.setSelection(files);
}
}
/**
@ -162,6 +180,28 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
this.server = hrs;
}
/**
* Set the files (and, implicitly, the size of the compaction based on those files)
* @param files files that should be included in the compaction
*/
public void setSelection(CompactSelection files) {
long sz = 0;
for (StoreFile sf : files.getFilesToCompact()) {
sz += sf.getReader().length();
}
this.totalSize = sz;
this.compactSelection = files;
}
/**
* Specify if this compaction should be a major compaction based on the state of the store
* @param isMajor <tt>true</tt> if the system determines that this compaction should be a major
* compaction
*/
public void setIsMajor(boolean isMajor) {
this.isMajor = isMajor;
}
@Override
public String toString() {
String fsList = Joiner.on(", ").join(
@ -200,12 +240,11 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
if (completed) {
// degenerate case: blocked regions require recursive enqueues
if (store.getCompactPriority() <= 0) {
server.compactSplitThread
.requestCompaction(region, store, "Recursive enqueue");
} else {
// see if the compaction has caused us to exceed max region size
server.compactSplitThread.requestSplit(region);
}
server.compactSplitThread.requestCompaction(region, store, "Recursive enqueue", null);
} else {
// see if the compaction has caused us to exceed max region size
server.getCompactSplitThread().requestSplit(region);
}
}
} catch (IOException ex) {
LOG.error("Compaction failed " + this, RemoteExceptionHandler
@ -234,4 +273,4 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
}
}
}
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@ -41,15 +42,28 @@ public abstract class Compactor {
/**
* Do a minor/major compaction on an explicit set of storefiles from a Store.
*
* @param filesToCompact which files to compact
* @param majorCompaction true to major compact (prune all deletes, max versions, etc)
* @return Product of compaction or an empty list if all cells expired or deleted and
* nothing made it through the compaction.
* @param request the requested compaction
* @return Product of compaction or an empty list if all cells expired or deleted and nothing made
* it through the compaction.
* @throws IOException
*/
public abstract List<Path> compact(final Collection<StoreFile> filesToCompact,
final boolean majorCompaction) throws IOException;
public abstract List<Path> compact(final CompactionRequest request) throws IOException;
/**
* Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to
* {@link #compact(CompactionRequest)};
* @param filesToCompact the files to compact. These are used as the compactionSelection for the
* generated {@link CompactionRequest}.
* @param isMajor true to major compact (prune all deletes, max versions, etc)
* @return Product of compaction or an empty list if all cells expired or deleted and nothing made
* it through the compaction.
* @throws IOException
*/
public List<Path> compactForTesting(final Collection<StoreFile> filesToCompact, boolean isMajor)
throws IOException {
return compact(CompactionRequest.getRequestForTesting(filesToCompact, isMajor));
}
public CompactionProgress getProgress() {
return this.progress;

View File

@ -59,16 +59,12 @@ public class DefaultCompactor extends Compactor {
/**
* Do a minor/major compaction on an explicit set of storefiles from a Store.
*
* @param filesToCompact which files to compact
* @param majorCompaction true to major compact (prune all deletes, max versions, etc)
* @return Product of compaction or an empty list if all cells expired or deleted and
* nothing made it through the compaction.
* @throws IOException
*/
@SuppressWarnings("deprecation")
public List<Path> compact(final Collection<StoreFile> filesToCompact,
final boolean majorCompaction) throws IOException {
@Override
public List<Path> compact(final CompactionRequest request) throws IOException {
final Collection<StoreFile> filesToCompact = request.getFiles();
boolean majorCompaction = request.isMajor();
// Max-sequenceID is the last key in the files we're compacting
long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact, true);
@ -139,7 +135,8 @@ public class DefaultCompactor extends Compactor {
scanner = store
.getCoprocessorHost()
.preCompactScannerOpen(store, scanners,
majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, earliestPutTs);
majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, earliestPutTs,
request);
}
ScanType scanType = majorCompaction? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT;
if (scanner == null) {
@ -150,11 +147,11 @@ public class DefaultCompactor extends Compactor {
scanType, smallestReadPoint, earliestPutTs);
}
if (store.getCoprocessorHost() != null) {
InternalScanner cpScanner =
store.getCoprocessorHost().preCompact(store, scanner, scanType);
InternalScanner cpScanner = store.getCoprocessorHost().preCompact(store, scanner,
scanType, request);
// NULL scanner returned from coprocessor hooks means skip normal processing
if (cpScanner == null) {
return newFiles; // an empty list
return newFiles; // an empty list
}
scanner = cpScanner;
}

View File

@ -28,6 +28,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -47,13 +48,17 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.*;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@ -591,7 +596,7 @@ public class TestCompaction extends HBaseTestCase {
Collection<StoreFile> storeFiles = store.getStorefiles();
Compactor tool = store.compactor;
List<Path> newFiles = tool.compact(storeFiles, false);
List<Path> newFiles = tool.compactForTesting(storeFiles, false);
// Now lets corrupt the compacted file.
FileSystem fs = FileSystem.get(conf);
@ -630,7 +635,7 @@ public class TestCompaction extends HBaseTestCase {
}
store.triggerMajorCompaction();
CompactionRequest request = store.requestCompaction(Store.NO_PRIORITY);
CompactionRequest request = store.requestCompaction(Store.NO_PRIORITY, null);
assertNotNull("Expected to receive a compaction request", request);
assertEquals(
"System-requested major compaction should not occur if there are too many store files",
@ -648,7 +653,7 @@ public class TestCompaction extends HBaseTestCase {
createStoreFile(r);
}
store.triggerMajorCompaction();
CompactionRequest request = store.requestCompaction(Store.PRIORITY_USER);
CompactionRequest request = store.requestCompaction(Store.PRIORITY_USER, null);
assertNotNull("Expected to receive a compaction request", request);
assertEquals(
"User-requested major compaction should always occur, even if there are too many store files",
@ -656,5 +661,53 @@ public class TestCompaction extends HBaseTestCase {
request.isMajor());
}
}
/**
* Create a custom compaction request and be sure that we can track it through the queue, knowing
* when the compaction is completed.
*/
public void testTrackingCompactionRequest() throws Exception {
// setup a compact/split thread on a mock server
HRegionServer mockServer = Mockito.mock(HRegionServer.class);
Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
CompactSplitThread thread = new CompactSplitThread(mockServer);
Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
// setup a region/store with some files
Store store = r.getStore(COLUMN_FAMILY);
createStoreFile(r);
for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
createStoreFile(r);
}
CountDownLatch latch = new CountDownLatch(1);
TrackableCompactionRequest request = new TrackableCompactionRequest(r, (HStore) store, latch);
thread.requestCompaction(r, store, "test custom comapction", Store.PRIORITY_USER, request);
// wait for the latch to complete.
latch.await();
thread.interruptIfNecessary();
}
/**
* Simple {@link CompactionRequest} on which you can wait until the requested compaction finishes.
*/
public static class TrackableCompactionRequest extends CompactionRequest {
private CountDownLatch done;
/**
* Constructor for a custom compaction. Uses the setXXX methods to update the state of the
* compaction before being used.
*/
public TrackableCompactionRequest(HRegion region, HStore store, CountDownLatch finished) {
super(region, store, Store.PRIORITY_USER);
this.done = finished;
}
@Override
public void run() {
super.run();
this.done.countDown();
}
}
}