HBASE-11733 Avoid copy-paste in Master/Region CoprocessorHost

This commit is contained in:
Matteo Bertozzi 2014-08-16 08:22:48 +01:00
parent 0a46a638d3
commit 6856e4533e
3 changed files with 1155 additions and 2942 deletions

View File

@ -54,90 +54,107 @@ public class RegionServerCoprocessorHost extends
}
public void preStop(String message) throws IOException {
ObserverContext<RegionServerCoprocessorEnvironment> ctx = null;
for (RegionServerEnvironment env : coprocessors) {
if (env.getInstance() instanceof RegionServerObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
Thread currentThread = Thread.currentThread();
ClassLoader cl = currentThread.getContextClassLoader();
try {
currentThread.setContextClassLoader(env.getClassLoader());
((RegionServerObserver) env.getInstance()).preStopRegionServer(ctx);
} catch (Throwable e) {
handleCoprocessorThrowable(env, e);
} finally {
currentThread.setContextClassLoader(cl);
}
if (ctx.shouldComplete()) {
break;
}
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
@Override
public void call(RegionServerObserver oserver,
ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
oserver.preStopRegionServer(ctx);
}
@Override
public void postEnvCall(RegionServerEnvironment env) {
// invoke coprocessor stop method
shutdown(env);
}
});
}
public boolean preMerge(final HRegion regionA, final HRegion regionB) throws IOException {
boolean bypass = false;
ObserverContext<RegionServerCoprocessorEnvironment> ctx = null;
for (RegionServerEnvironment env : coprocessors) {
if (env.getInstance() instanceof RegionServerObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
Thread currentThread = Thread.currentThread();
ClassLoader cl = currentThread.getContextClassLoader();
try {
currentThread.setContextClassLoader(env.getClassLoader());
((RegionServerObserver) env.getInstance()).preMerge(ctx, regionA, regionB);
} catch (Throwable e) {
handleCoprocessorThrowable(env, e);
} finally {
currentThread.setContextClassLoader(cl);
return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
@Override
public void call(RegionServerObserver oserver,
ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
oserver.preMerge(ctx, regionA, regionB);
}
bypass |= ctx.shouldBypass();
if (ctx.shouldComplete()) {
break;
}
}
}
return bypass;
});
}
public void postMerge(final HRegion regionA, final HRegion regionB, final HRegion mergedRegion)
throws IOException {
ObserverContext<RegionServerCoprocessorEnvironment> ctx = null;
for (RegionServerEnvironment env : coprocessors) {
if (env.getInstance() instanceof RegionServerObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
Thread currentThread = Thread.currentThread();
ClassLoader cl = currentThread.getContextClassLoader();
try {
currentThread.setContextClassLoader(env.getClassLoader());
((RegionServerObserver) env.getInstance()).postMerge(ctx, regionA, regionB, mergedRegion);
} catch (Throwable e) {
handleCoprocessorThrowable(env, e);
} finally {
currentThread.setContextClassLoader(cl);
}
if (ctx.shouldComplete()) {
break;
}
}
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
@Override
public void call(RegionServerObserver oserver,
ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
oserver.postMerge(ctx, regionA, regionB, mergedRegion);
}
});
}
public boolean preMergeCommit(final HRegion regionA, final HRegion regionB,
final @MetaMutationAnnotation List<Mutation> metaEntries) throws IOException {
return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
@Override
public void call(RegionServerObserver oserver,
ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
oserver.preMergeCommit(ctx, regionA, regionB, metaEntries);
}
});
}
public void postMergeCommit(final HRegion regionA, final HRegion regionB,
final HRegion mergedRegion) throws IOException {
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
@Override
public void call(RegionServerObserver oserver,
ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
oserver.postMergeCommit(ctx, regionA, regionB, mergedRegion);
}
});
}
public void preRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException {
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
@Override
public void call(RegionServerObserver oserver,
ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
oserver.preRollBackMerge(ctx, regionA, regionB);
}
});
}
public void postRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException {
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
@Override
public void call(RegionServerObserver oserver,
ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
oserver.postRollBackMerge(ctx, regionA, regionB);
}
});
}
private static abstract class CoprocessorOperation
extends ObserverContext<RegionServerCoprocessorEnvironment> {
public CoprocessorOperation() {
}
public abstract void call(RegionServerObserver oserver,
ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException;
public void postEnvCall(RegionServerEnvironment env) {
}
}
private boolean execOperation(final CoprocessorOperation ctx) throws IOException {
if (ctx == null) return false;
boolean bypass = false;
ObserverContext<RegionServerCoprocessorEnvironment> ctx = null;
for (RegionServerEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionServerObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
ctx.prepare(env);
Thread currentThread = Thread.currentThread();
ClassLoader cl = currentThread.getContextClassLoader();
try {
currentThread.setContextClassLoader(env.getClassLoader());
((RegionServerObserver) env.getInstance()).preMergeCommit(ctx, regionA, regionB,
metaEntries);
ctx.call((RegionServerObserver)env.getInstance(), ctx);
} catch (Throwable e) {
handleCoprocessorThrowable(env, e);
} finally {
@ -148,78 +165,11 @@ public class RegionServerCoprocessorHost extends
break;
}
}
ctx.postEnvCall(env);
}
return bypass;
}
public void postMergeCommit(final HRegion regionA, final HRegion regionB,
final HRegion mergedRegion) throws IOException {
ObserverContext<RegionServerCoprocessorEnvironment> ctx = null;
for (RegionServerEnvironment env : coprocessors) {
if (env.getInstance() instanceof RegionServerObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
Thread currentThread = Thread.currentThread();
ClassLoader cl = currentThread.getContextClassLoader();
try {
currentThread.setContextClassLoader(env.getClassLoader());
((RegionServerObserver) env.getInstance()).postMergeCommit(ctx, regionA, regionB,
mergedRegion);
} catch (Throwable e) {
handleCoprocessorThrowable(env, e);
} finally {
currentThread.setContextClassLoader(cl);
}
if (ctx.shouldComplete()) {
break;
}
}
}
}
public void preRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException {
ObserverContext<RegionServerCoprocessorEnvironment> ctx = null;
for (RegionServerEnvironment env : coprocessors) {
if (env.getInstance() instanceof RegionServerObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
Thread currentThread = Thread.currentThread();
ClassLoader cl = currentThread.getContextClassLoader();
try {
currentThread.setContextClassLoader(env.getClassLoader());
((RegionServerObserver) env.getInstance()).preRollBackMerge(ctx, regionA, regionB);
} catch (Throwable e) {
handleCoprocessorThrowable(env, e);
} finally {
currentThread.setContextClassLoader(cl);
}
if (ctx.shouldComplete()) {
break;
}
}
}
}
public void postRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException {
ObserverContext<RegionServerCoprocessorEnvironment> ctx = null;
for (RegionServerEnvironment env : coprocessors) {
if (env.getInstance() instanceof RegionServerObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
Thread currentThread = Thread.currentThread();
ClassLoader cl = currentThread.getContextClassLoader();
try {
currentThread.setContextClassLoader(env.getClassLoader());
((RegionServerObserver) env.getInstance()).postRollBackMerge(ctx, regionA, regionB);
} catch (Throwable e) {
handleCoprocessorThrowable(env, e);
} finally {
currentThread.setContextClassLoader(cl);
}
if (ctx.shouldComplete()) {
break;
}
}
}
}
/**
* Coprocessor environment extension providing access to region server
* related services.