HBASE-3842 Refactor RegionObserver compaction API for easier overriding of policy

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1165793 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Helmling 2011-09-06 19:21:07 +00:00
parent ab61758985
commit 5661aada9b
10 changed files with 344 additions and 41 deletions

View File

@ -443,6 +443,7 @@ Release 0.91.0 - Unreleased
HBASE-1730 Online Schema Changes
HBASE-4206 jenkins hash implementation uses longs unnecessarily
(Ron Yang)
HBASE-3842 Refactor Coprocessor Compaction API
TASKS
HBASE-3559 Move report of split to master OFF the heartbeat channel

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.coprocessor;
import java.util.List;
import java.util.Map;
import com.google.common.collect.ImmutableList;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
@ -33,6 +34,8 @@ import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@ -78,12 +81,22 @@ public abstract class BaseRegionObserver implements RegionObserver {
HRegion l, HRegion r) { }
@Override
public void preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
boolean willSplit) { }
public void preCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, final List<StoreFile> candidates) { }
@Override
public void postCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, final ImmutableList<StoreFile> selected) { }
@Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
final Store store, final InternalScanner scanner) {
return scanner;
}
@Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e,
boolean willSplit) { }
final Store store, final StoreFile resultFile) { }
@Override
public void preGetClosestRowBefore(final ObserverContext<RegionCoprocessorEnvironment> e,

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.coprocessor;
import java.util.List;
import java.util.Map;
import com.google.common.collect.ImmutableList;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
@ -33,6 +34,8 @@ import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@ -69,22 +72,61 @@ public interface RegionObserver extends Coprocessor {
void postFlush(final ObserverContext<RegionCoprocessorEnvironment> c);
/**
* Called before compaction.
* Called prior to selecting the {@link StoreFile}s to compact from the list
* of available candidates. To alter the files used for compaction, you may
* mutate the passed in list of candidates.
* @param c the environment provided by the region server
* @param willSplit true if compaction will result in a split, false
* otherwise
* @param store the store where compaction is being requested
* @param candidates the store files currently available for compaction
*/
void preCompact(final ObserverContext<RegionCoprocessorEnvironment> c,
final boolean willSplit);
void preCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, final List<StoreFile> candidates);
/**
* Called after compaction.
* Called after the {@link StoreFile}s to compact have been selected from the
* available candidates.
* @param c the environment provided by the region server
* @param willSplit true if compaction will result in a split, false
* otherwise
* @param store the store being compacted
* @param selected the store files selected to compact
*/
void postCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, final ImmutableList<StoreFile> selected);
/**
* Called prior to writing the {@link StoreFile}s selected for compaction into
* a new {@code StoreFile}. To override or modify the compaction process,
* implementing classes have two options:
* <ul>
* <li>Wrap the provided {@link InternalScanner} with a custom
* implementation that is returned from this method. The custom scanner
* can then inspect {@link KeyValue}s from the wrapped scanner, applying
* its own policy to what gets written.</li>
* <li>Call {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()}
* and provide a custom implementation for writing of new
* {@link StoreFile}s. <strong>Note: any implementations bypassing
* core compaction using this approach must write out new store files
* themselves or the existing data will no longer be available after
* compaction.</strong></li>
* </ul>
* @param c the environment provided by the region server
* @param store the store being compacted
* @param scanner the scanner over existing data used in the store file
* rewriting
* @return the scanner to use during compaction. Should not be {@code null}
* unless the implementation is writing new store files on its own.
*/
InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, final InternalScanner scanner);
/**
* Called after compaction has completed and the new store file has been
* moved in to place.
* @param c the environment provided by the region server
* @param store the store being compacted
* @param resultFile the new store file written out during compaction
*/
void postCompact(final ObserverContext<RegionCoprocessorEnvironment> c,
final boolean willSplit);
final Store store, StoreFile resultFile);
/**
* Called before the region is split.

View File

@ -937,10 +937,6 @@ public class HRegion implements HeapSize { // , Writable{
LOG.debug("Skipping compaction on " + this + " because closed");
return false;
}
if (coprocessorHost != null) {
status.setStatus("Running coprocessor preCompact hooks");
coprocessorHost.preCompact(false);
}
boolean decr = true;
try {
synchronized (writestate) {
@ -976,10 +972,6 @@ public class HRegion implements HeapSize { // , Writable{
}
}
}
if (coprocessorHost != null) {
status.setStatus("Running coprocessor post-compact hooks");
coprocessorHost.postCompact(false);
}
status.markComplete("Compaction complete");
return true;
} finally {

View File

@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.regionserver;
import com.google.common.collect.ImmutableList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -257,15 +258,43 @@ public class RegionCoprocessorHost
}
/**
* Invoked before a region is compacted.
* @param willSplit true if the compaction is about to trigger a split
* Called prior to selecting the {@link StoreFile}s for compaction from
* the list of currently available candidates.
* @param store The store where compaction is being requested
* @param candidates The currently available store files
* @return If {@code true}, skip the normal selection process and use the current list
*/
public void preCompact(boolean willSplit) {
public boolean preCompactSelection(Store store, List<StoreFile> candidates) {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
boolean bypass = false;
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
((RegionObserver)env.getInstance()).preCompactSelection(
ctx, store, candidates);
bypass |= ctx.shouldBypass();
if (ctx.shouldComplete()) {
break;
}
}
}
return bypass;
}
/**
* Called after the {@link StoreFile}s to be compacted have been selected
* from the available candidates.
* @param store The store where compaction is being requested
* @param selected The store files selected to compact
*/
public void postCompactSelection(Store store,
ImmutableList<StoreFile> selected) {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
((RegionObserver)env.getInstance()).preCompact(ctx, willSplit);
((RegionObserver)env.getInstance()).postCompactSelection(
ctx, store, selected);
if (ctx.shouldComplete()) {
break;
}
@ -274,15 +303,38 @@ public class RegionCoprocessorHost
}
/**
* Invoked after a region is compacted.
* @param willSplit true if the compaction is about to trigger a split
* Called prior to rewriting the store files selected for compaction
* @param store the store being compacted
* @param scanner the scanner used to read store data during compaction
*/
public void postCompact(boolean willSplit) {
public InternalScanner preCompact(Store store, InternalScanner scanner) {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
boolean bypass = false;
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
scanner = ((RegionObserver)env.getInstance()).preCompact(
ctx, store, scanner);
bypass |= ctx.shouldBypass();
if (ctx.shouldComplete()) {
break;
}
}
}
return bypass ? null : scanner;
}
/**
* Called after the store compaction has completed.
* @param store the store being compacted
* @param resultFile the new store file written during compaction
*/
public void postCompact(Store store, StoreFile resultFile) {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
((RegionObserver)env.getInstance()).postCompact(ctx, willSplit);
((RegionObserver)env.getInstance()).postCompact(ctx, store, resultFile);
if (ctx.shouldComplete()) {
break;
}

View File

@ -684,6 +684,9 @@ public class Store implements HeapSize {
maxId);
// Move the compaction into place.
sf = completeCompaction(filesToCompact, writer);
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postCompact(this, sf);
}
} finally {
synchronized (filesCompacting) {
filesCompacting.removeAll(filesToCompact);
@ -739,7 +742,10 @@ public class Store implements HeapSize {
// Ready to go. Have list of files to compact.
StoreFile.Writer writer = compactStore(filesToCompact, isMajor, maxId);
// Move the compaction into place.
completeCompaction(filesToCompact, writer);
StoreFile sf = completeCompaction(filesToCompact, writer);
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postCompact(this, sf);
}
} finally {
synchronized (filesCompacting) {
filesCompacting.removeAll(filesToCompact);
@ -900,7 +906,24 @@ public class Store implements HeapSize {
Preconditions.checkArgument(idx != -1);
candidates.subList(0, idx + 1).clear();
}
List<StoreFile> filesToCompact = compactSelection(candidates);
boolean override = false;
if (region.getCoprocessorHost() != null) {
override = region.getCoprocessorHost().preCompactSelection(
this, candidates);
}
List<StoreFile> filesToCompact;
if (override) {
// coprocessor is overriding normal file selection
filesToCompact = candidates;
} else {
filesToCompact = compactSelection(candidates);
}
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postCompactSelection(this,
ImmutableList.copyOf(filesToCompact));
}
// no files to compact
if (filesToCompact.isEmpty()) {
@ -1121,6 +1144,17 @@ public class Store implements HeapSize {
scan.setMaxVersions(family.getMaxVersions());
/* include deletes, unless we are doing a major compaction */
scanner = new StoreScanner(this, scan, scanners, !majorCompaction);
if (region.getCoprocessorHost() != null) {
InternalScanner cpScanner = region.getCoprocessorHost().preCompact(
this, scanner);
// NULL scanner returned from coprocessor hooks means skip normal processing
if (cpScanner == null) {
return null;
}
scanner = cpScanner;
}
int bytesWritten = 0;
// since scanner.next() can return 'false' but still be delivering data,
// we have to use a do/while loop.

View File

@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.Arrays;
import com.google.common.collect.ImmutableList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.KeyValue;
@ -38,6 +39,8 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
/**
@ -57,6 +60,8 @@ public class SimpleRegionObserver extends BaseRegionObserver {
boolean hadPostFlush;
boolean hadPreSplit;
boolean hadPostSplit;
boolean hadPreCompactSelect;
boolean hadPostCompactSelect;
boolean hadPreCompact;
boolean hadPostCompact;
boolean hadPreGet = false;
@ -135,15 +140,34 @@ public class SimpleRegionObserver extends BaseRegionObserver {
}
@Override
public void preCompact(ObserverContext<RegionCoprocessorEnvironment> c, boolean willSplit) {
hadPreCompact = true;
public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<StoreFile> candidates) {
hadPreCompactSelect = true;
}
@Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, boolean willSplit) {
public void postCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, ImmutableList<StoreFile> selected) {
hadPostCompactSelect = true;
}
@Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
Store store, InternalScanner scanner) {
hadPreCompact = true;
return scanner;
}
@Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e,
Store store, StoreFile resultFile) {
hadPostCompact = true;
}
public boolean wasCompacted() {
return hadPreCompact && hadPostCompact;
}
@Override
public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Scan scan,
@ -188,10 +212,6 @@ public class SimpleRegionObserver extends BaseRegionObserver {
hadPostScannerClose = true;
}
public boolean wasCompacted() {
return hadPreCompact && hadPostCompact;
}
@Override
public void preGet(final ObserverContext<RegionCoprocessorEnvironment> c, final Get get,
final List<KeyValue> results) throws IOException {

View File

@ -140,8 +140,13 @@ public class TestClassLoading {
// only add hbase classes to classpath. This is a little bit tricky: assume
// the classpath is {hbaseSrc}/target/classes.
String currentDir = new File(".").getAbsolutePath();
options.add(currentDir + Path.SEPARATOR + "target"+ Path.SEPARATOR +
"classes");
String classpath =
currentDir + Path.SEPARATOR + "target"+ Path.SEPARATOR + "classes" +
System.getProperty("path.separator") +
System.getProperty("surefire.test.class.path");
options.add(classpath);
LOG.debug("Setting classpath to: "+classpath);
JavaCompiler.CompilationTask task = compiler.getTask(null, fm, null,
options, null, cu);
assertTrue("Compile file " + sourceCodeFile + " failed.", task.call());

View File

@ -36,10 +36,13 @@ import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.SplitTransaction;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.hadoop.hbase.Server;
@ -130,11 +133,14 @@ public class TestCoprocessorInterface extends HBaseTestCase {
postCloseCalled = true;
}
@Override
public void preCompact(ObserverContext<RegionCoprocessorEnvironment> e, boolean willSplit) {
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
Store store, InternalScanner scanner) {
preCompactCalled = true;
return scanner;
}
@Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, boolean willSplit) {
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e,
Store store, StoreFile resultFile) {
postCompactCalled = true;
}
@Override

View File

@ -22,18 +22,28 @@ package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.AfterClass;
@ -241,6 +251,134 @@ public class TestRegionObserverInterface {
util.deleteTable(tableName);
}
/* Overrides compaction to only output rows with keys that are even numbers */
public static class EvenOnlyCompactor extends BaseRegionObserver {
long lastCompaction;
long lastFlush;
@Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
Store store, final InternalScanner scanner) {
return new InternalScanner() {
@Override
public boolean next(List<KeyValue> results) throws IOException {
return next(results, -1);
}
@Override
public boolean next(List<KeyValue> results, int limit) throws IOException {
List<KeyValue> internalResults = new ArrayList<KeyValue>();
boolean hasMore;
do {
hasMore = scanner.next(internalResults, limit);
if (!internalResults.isEmpty()) {
long row = Bytes.toLong(internalResults.get(0).getRow());
if (row % 2 == 0) {
// return this row
break;
}
// clear and continue
internalResults.clear();
}
} while (hasMore);
if (!internalResults.isEmpty()) {
results.addAll(internalResults);
}
return hasMore;
}
@Override
public void close() throws IOException {
scanner.close();
}
};
}
@Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e,
Store store, StoreFile resultFile) {
lastCompaction = EnvironmentEdgeManager.currentTimeMillis();
}
@Override
public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e) {
lastFlush = EnvironmentEdgeManager.currentTimeMillis();
}
}
/**
* Tests overriding compaction handling via coprocessor hooks
* @throws Exception
*/
@Test
public void testCompactionOverride() throws Exception {
byte[] compactTable = Bytes.toBytes("TestCompactionOverride");
HBaseAdmin admin = util.getHBaseAdmin();
if (admin.tableExists(compactTable)) {
admin.disableTable(compactTable);
admin.deleteTable(compactTable);
}
HTableDescriptor htd = new HTableDescriptor(compactTable);
htd.addFamily(new HColumnDescriptor(A));
htd.addCoprocessor(EvenOnlyCompactor.class.getName());
admin.createTable(htd);
HTable table = new HTable(util.getConfiguration(), compactTable);
for (long i=1; i<=10; i++) {
byte[] iBytes = Bytes.toBytes(i);
Put p = new Put(iBytes);
p.add(A, A, iBytes);
table.put(p);
}
HRegion firstRegion = cluster.getRegions(compactTable).get(0);
Coprocessor cp = firstRegion.getCoprocessorHost().findCoprocessor(
EvenOnlyCompactor.class.getName());
assertNotNull("EvenOnlyCompactor coprocessor should be loaded", cp);
EvenOnlyCompactor compactor = (EvenOnlyCompactor)cp;
// force a compaction
long ts = System.currentTimeMillis();
admin.flush(compactTable);
// wait for flush
for (int i=0; i<10; i++) {
if (compactor.lastFlush >= ts) {
break;
}
Thread.sleep(1000);
}
assertTrue("Flush didn't complete", compactor.lastFlush >= ts);
LOG.debug("Flush complete");
ts = compactor.lastFlush;
admin.majorCompact(compactTable);
// wait for compaction
for (int i=0; i<30; i++) {
if (compactor.lastCompaction >= ts) {
break;
}
Thread.sleep(1000);
}
LOG.debug("Last compaction was at "+compactor.lastCompaction);
assertTrue("Compaction didn't complete", compactor.lastCompaction >= ts);
// only even rows should remain
ResultScanner scanner = table.getScanner(new Scan());
try {
for (long i=2; i<=10; i+=2) {
Result r = scanner.next();
assertNotNull(r);
assertFalse(r.isEmpty());
byte[] iBytes = Bytes.toBytes(i);
assertArrayEquals("Row should be "+i, r.getRow(), iBytes);
assertArrayEquals("Value should be "+i, r.getValue(A, A), iBytes);
}
} finally {
scanner.close();
}
}
// check each region whether the coprocessor upcalls are called or not.
private void verifyMethodResult(Class c, String methodName[], byte[] tableName,
Object value[]) throws IOException {