HBASE-6633 Adding new hooks to the split flow - For roll backs and one final hook after split is completed either successfully or failed
Submitted by:Ram Reviewed by:Stack git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1377033 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
567723f028
commit
07218cb039
|
@ -105,6 +105,26 @@ public abstract class BaseRegionObserver implements RegionObserver {
|
|||
@Override
|
||||
public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preSplit(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
byte[] splitRow) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preRollBackSplit(ObserverContext<RegionCoprocessorEnvironment> ctx)
|
||||
throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postRollBackSplit(
|
||||
ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postCompleteSplit(
|
||||
ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postSplit(ObserverContext<RegionCoprocessorEnvironment> e, HRegion l, HRegion r)
|
||||
|
|
|
@ -213,8 +213,17 @@ public interface RegionObserver extends Coprocessor {
|
|||
* @param c the environment provided by the region server
|
||||
* (e.getRegion() returns the parent region)
|
||||
* @throws IOException if an error occurred on the coprocessor
|
||||
* @deprecated Use preSplit(final ObserverContext<RegionCoprocessorEnvironment> c, byte[] splitRow)
|
||||
*/
|
||||
void preSplit(final ObserverContext<RegionCoprocessorEnvironment> c) throws IOException;
|
||||
|
||||
/**
|
||||
* Called before the region is split.
|
||||
* @param c the environment provided by the region server
|
||||
* (e.getRegion() returns the parent region)
|
||||
* @throws IOException if an error occurred on the coprocessor
|
||||
*/
|
||||
void preSplit(final ObserverContext<RegionCoprocessorEnvironment> c, byte[] splitRow) throws IOException;
|
||||
|
||||
/**
|
||||
* Called after the region is split.
|
||||
|
@ -223,10 +232,32 @@ public interface RegionObserver extends Coprocessor {
|
|||
* @param l the left daughter region
|
||||
* @param r the right daughter region
|
||||
* @throws IOException if an error occurred on the coprocessor
|
||||
* @deprecated Use postCompleteSplit() instead
|
||||
*/
|
||||
void postSplit(final ObserverContext<RegionCoprocessorEnvironment> c, final HRegion l,
|
||||
final HRegion r) throws IOException;
|
||||
|
||||
|
||||
/**
|
||||
* This will be called before the roll back of the split region is completed
|
||||
* @param ctx
|
||||
* @throws IOException
|
||||
*/
|
||||
void preRollBackSplit(final ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
|
||||
|
||||
/**
|
||||
* This will be called after the roll back of the split region is completed
|
||||
* @param ctx
|
||||
* @throws IOException
|
||||
*/
|
||||
void postRollBackSplit(final ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
|
||||
|
||||
/**
|
||||
* Called after any split request is processed. This will be called irrespective of success or
|
||||
* failure of the split.
|
||||
* @param ctx
|
||||
* @throws IOException
|
||||
*/
|
||||
void postCompleteSplit(final ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
|
||||
/**
|
||||
* Called before the region is reported as closed to the master.
|
||||
* @param c the environment provided by the region server
|
||||
|
|
|
@ -601,6 +601,27 @@ public class RegionCoprocessorHost
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Invoked just before a split
|
||||
* @throws IOException
|
||||
*/
|
||||
public void preSplit(byte[] splitRow) throws IOException {
|
||||
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
|
||||
for (RegionEnvironment env: coprocessors) {
|
||||
if (env.getInstance() instanceof RegionObserver) {
|
||||
ctx = ObserverContext.createAndPrepare(env, ctx);
|
||||
try {
|
||||
((RegionObserver)env.getInstance()).preSplit(ctx, splitRow);
|
||||
} catch (Throwable e) {
|
||||
handleCoprocessorThrowable(env, e);
|
||||
}
|
||||
if (ctx.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Invoked just after a split
|
||||
|
@ -624,7 +645,69 @@ public class RegionCoprocessorHost
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Invoked just before the rollback of a failed split is started
|
||||
* @throws IOException
|
||||
*/
|
||||
public void preRollBackSplit() throws IOException {
|
||||
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
|
||||
for (RegionEnvironment env : coprocessors) {
|
||||
if (env.getInstance() instanceof RegionObserver) {
|
||||
ctx = ObserverContext.createAndPrepare(env, ctx);
|
||||
try {
|
||||
((RegionObserver) env.getInstance()).preRollBackSplit(ctx);
|
||||
} catch (Throwable e) {
|
||||
handleCoprocessorThrowable(env, e);
|
||||
}
|
||||
if (ctx.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Invoked just after the rollback of a failed split is done
|
||||
* @throws IOException
|
||||
*/
|
||||
public void postRollBackSplit() throws IOException {
|
||||
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
|
||||
for (RegionEnvironment env : coprocessors) {
|
||||
if (env.getInstance() instanceof RegionObserver) {
|
||||
ctx = ObserverContext.createAndPrepare(env, ctx);
|
||||
try {
|
||||
((RegionObserver) env.getInstance()).postRollBackSplit(ctx);
|
||||
} catch (Throwable e) {
|
||||
handleCoprocessorThrowable(env, e);
|
||||
}
|
||||
if (ctx.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Invoked after a split is completed irrespective of a failure or success.
|
||||
* @throws IOException
|
||||
*/
|
||||
public void postCompleteSplit() throws IOException {
|
||||
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
|
||||
for (RegionEnvironment env : coprocessors) {
|
||||
if (env.getInstance() instanceof RegionObserver) {
|
||||
ctx = ObserverContext.createAndPrepare(env, ctx);
|
||||
try {
|
||||
((RegionObserver) env.getInstance()).postCompleteSplit(ctx);
|
||||
} catch (Throwable e) {
|
||||
handleCoprocessorThrowable(env, e);
|
||||
}
|
||||
if (ctx.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// RegionObserver support
|
||||
|
||||
/**
|
||||
|
|
|
@ -105,6 +105,15 @@ class SplitRequest implements Runnable {
|
|||
.checkIOException(ex));
|
||||
this.server.getMetrics().incrementSplitFailureCount();
|
||||
server.checkFileSystem();
|
||||
} finally {
|
||||
if (this.parent.getCoprocessorHost() != null) {
|
||||
try {
|
||||
this.parent.getCoprocessorHost().postCompleteSplit();
|
||||
} catch (IOException io) {
|
||||
LOG.error("Split failed " + this,
|
||||
RemoteExceptionHandler.checkIOException(io));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -229,6 +229,11 @@ public class SplitTransaction {
|
|||
if (this.parent.getCoprocessorHost() != null) {
|
||||
this.parent.getCoprocessorHost().preSplit();
|
||||
}
|
||||
|
||||
// Coprocessor callback
|
||||
if (this.parent.getCoprocessorHost() != null) {
|
||||
this.parent.getCoprocessorHost().preSplit(this.splitrow);
|
||||
}
|
||||
|
||||
// If true, no cluster to write meta edits to or to update znodes in.
|
||||
boolean testing = server == null? true:
|
||||
|
@ -727,6 +732,11 @@ public class SplitTransaction {
|
|||
*/
|
||||
public boolean rollback(final Server server, final RegionServerServices services)
|
||||
throws IOException {
|
||||
// Coprocessor callback
|
||||
if (this.parent.getCoprocessorHost() != null) {
|
||||
this.parent.getCoprocessorHost().preRollBackSplit();
|
||||
}
|
||||
|
||||
boolean result = true;
|
||||
FileSystem fs = this.parent.getFilesystem();
|
||||
ListIterator<JournalEntry> iterator =
|
||||
|
@ -793,6 +803,10 @@ public class SplitTransaction {
|
|||
throw new RuntimeException("Unhandled journal entry: " + je);
|
||||
}
|
||||
}
|
||||
// Coprocessor callback
|
||||
if (this.parent.getCoprocessorHost() != null) {
|
||||
this.parent.getCoprocessorHost().postRollBackSplit();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
|
@ -799,6 +799,12 @@ public class AccessController extends BaseRegionObserver
|
|||
public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
|
||||
requirePermission(getTableName(e.getEnvironment()), null, null, Action.ADMIN);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e,
|
||||
byte[] splitRow) throws IOException {
|
||||
requirePermission(getTableName(e.getEnvironment()), null, null, Action.ADMIN);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
|
||||
|
|
|
@ -140,6 +140,7 @@ public class TestCoprocessorInterface extends HBaseTestCase {
|
|||
private boolean postFlushCalled;
|
||||
private boolean preSplitCalled;
|
||||
private boolean postSplitCalled;
|
||||
private boolean preSplitWithSplitRowCalled;
|
||||
private ConcurrentMap<String, Object> sharedData;
|
||||
|
||||
@Override
|
||||
|
@ -195,6 +196,12 @@ public class TestCoprocessorInterface extends HBaseTestCase {
|
|||
public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) {
|
||||
preSplitCalled = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preSplit(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
byte[] splitRow) throws IOException {
|
||||
preSplitWithSplitRowCalled = true;
|
||||
}
|
||||
@Override
|
||||
public void postSplit(ObserverContext<RegionCoprocessorEnvironment> e, HRegion l, HRegion r) {
|
||||
postSplitCalled = true;
|
||||
|
@ -225,7 +232,7 @@ public class TestCoprocessorInterface extends HBaseTestCase {
|
|||
return (preCompactCalled && postCompactCalled);
|
||||
}
|
||||
boolean wasSplit() {
|
||||
return (preSplitCalled && postSplitCalled);
|
||||
return (preSplitCalled && postSplitCalled && preSplitWithSplitRowCalled);
|
||||
}
|
||||
Map<String, Object> getSharedData() {
|
||||
return sharedData;
|
||||
|
|
|
@ -35,6 +35,10 @@ import org.apache.hadoop.fs.FileSystem;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.*;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.PairOfSameType;
|
||||
|
@ -63,13 +67,19 @@ public class TestSplitTransaction {
|
|||
private static final byte [] GOOD_SPLIT_ROW = new byte [] {'d', 'd', 'd'};
|
||||
private static final byte [] CF = HConstants.CATALOG_FAMILY;
|
||||
|
||||
private static boolean preRollBackCalled = false;
|
||||
private static boolean postRollBackCalled = false;
|
||||
|
||||
@Before public void setup() throws IOException {
|
||||
this.fs = FileSystem.get(TEST_UTIL.getConfiguration());
|
||||
TEST_UTIL.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, CustomObserver.class.getName());
|
||||
this.fs.delete(this.testdir, true);
|
||||
this.wal = new HLog(fs, new Path(this.testdir, "logs"),
|
||||
new Path(this.testdir, "archive"),
|
||||
TEST_UTIL.getConfiguration());
|
||||
this.parent = createRegion(this.testdir, this.wal);
|
||||
RegionCoprocessorHost host = new RegionCoprocessorHost(this.parent, null, TEST_UTIL.getConfiguration());
|
||||
this.parent.setCoprocessorHost(host);
|
||||
TEST_UTIL.getConfiguration().setBoolean("hbase.testing.nocluster", true);
|
||||
}
|
||||
|
||||
|
@ -280,6 +290,11 @@ public class TestSplitTransaction {
|
|||
assertEquals(rowcount, daughtersRowCount);
|
||||
// Assert the write lock is no longer held on parent
|
||||
assertTrue(!this.parent.lock.writeLock().isHeldByCurrentThread());
|
||||
assertTrue("Rollback hooks should be called.", wasRollBackHookCalled());
|
||||
}
|
||||
|
||||
private boolean wasRollBackHookCalled(){
|
||||
return (preRollBackCalled && postRollBackCalled);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -318,6 +333,20 @@ public class TestSplitTransaction {
|
|||
return HRegion.openHRegion(testdir, hri, htd, wal,
|
||||
TEST_UTIL.getConfiguration());
|
||||
}
|
||||
|
||||
public static class CustomObserver extends BaseRegionObserver{
|
||||
@Override
|
||||
public void preRollBackSplit(
|
||||
ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException {
|
||||
preRollBackCalled = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postRollBackSplit(
|
||||
ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException {
|
||||
postRollBackCalled = true;
|
||||
}
|
||||
}
|
||||
|
||||
@org.junit.Rule
|
||||
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
|
||||
|
|
|
@ -459,6 +459,22 @@ public class TestAccessController {
|
|||
verifyDenied(action, USER_CREATE, USER_RW, USER_RO, USER_NONE);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSplitWithSplitRow() throws Exception {
|
||||
PrivilegedExceptionAction action = new PrivilegedExceptionAction() {
|
||||
public Object run() throws Exception {
|
||||
ACCESS_CONTROLLER.preSplit(
|
||||
ObserverContext.createAndPrepare(RCP_ENV, null),
|
||||
Bytes.toBytes("row2"));
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
verifyAllowed(action, SUPERUSER, USER_ADMIN, USER_OWNER);
|
||||
verifyDenied(action, USER_CREATE, USER_RW, USER_RO, USER_NONE);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testFlush() throws Exception {
|
||||
PrivilegedExceptionAction action = new PrivilegedExceptionAction() {
|
||||
|
|
Loading…
Reference in New Issue