HBASE-15481 Add pre/post roll to WALObserver

Signed-off-by: Sean Busbey <busbey@apache.org>
This commit is contained in:
Matteo Bertozzi 2016-04-13 00:43:39 -05:00 committed by Sean Busbey
parent 543e7081f5
commit ae7e5e29f9
6 changed files with 135 additions and 2 deletions

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.CoprocessorEnvironment;
@ -70,4 +71,12 @@ public class BaseWALObserver implements WALObserver {
HLogKey logKey, WALEdit logEdit) throws IOException { HLogKey logKey, WALEdit logEdit) throws IOException {
postWALWrite(ctx, info, (WALKey)logKey, logEdit); postWALWrite(ctx, info, (WALKey)logKey, logEdit);
} }
@Override
public void preWALRoll(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
Path oldPath, Path newPath) throws IOException { }
@Override
public void postWALRoll(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
Path oldPath, Path newPath) throws IOException { }
} }

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.coprocessor; package org.apache.hadoop.hbase.coprocessor;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.Coprocessor;
@ -112,4 +113,21 @@ public interface WALObserver extends Coprocessor {
@Deprecated @Deprecated
void postWALWrite(ObserverContext<WALCoprocessorEnvironment> ctx, void postWALWrite(ObserverContext<WALCoprocessorEnvironment> ctx,
HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException; HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException;
/**
* Called before rolling the current WAL
* @param oldPath the path of the current wal that we are replacing
* @param newPath the path of the wal we are going to create
*/
void preWALRoll(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
Path oldPath, Path newPath) throws IOException;
/**
* Called after rolling the current WAL
* @param oldPath the path of the wal that we replaced
* @param newPath the path of the wal we have created and now is the current
*/
void postWALRoll(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
Path oldPath, Path newPath) throws IOException;
} }

View File

@ -473,6 +473,8 @@ public abstract class AbstractFSWAL<W> implements WAL {
*/ */
private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath) private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath)
throws IOException { throws IOException {
coprocessorHost.preWALRoll(oldPath, newPath);
if (!this.listeners.isEmpty()) { if (!this.listeners.isEmpty()) {
for (WALActionsListener i : this.listeners) { for (WALActionsListener i : this.listeners) {
i.preLogRoll(oldPath, newPath); i.preLogRoll(oldPath, newPath);
@ -490,6 +492,8 @@ public abstract class AbstractFSWAL<W> implements WAL {
i.postLogRoll(oldPath, newPath); i.postLogRoll(oldPath, newPath);
} }
} }
coprocessorHost.postWALRoll(oldPath, newPath);
} }
// public only until class moves to o.a.h.h.wal // public only until class moves to o.a.h.h.wal
@ -911,4 +915,4 @@ public abstract class AbstractFSWAL<W> implements WAL {
*/ */
@VisibleForTesting @VisibleForTesting
abstract int getLogReplication(); abstract int getLogReplication();
} }

View File

@ -23,6 +23,7 @@ package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.coprocessor.*; import org.apache.hadoop.hbase.coprocessor.*;
@ -38,7 +39,7 @@ import org.apache.hadoop.hbase.wal.WALKey;
@InterfaceAudience.Private @InterfaceAudience.Private
public class WALCoprocessorHost public class WALCoprocessorHost
extends CoprocessorHost<WALCoprocessorHost.WALEnvironment> { extends CoprocessorHost<WALCoprocessorHost.WALEnvironment> {
/** /**
* Encapsulation of the environment of each coprocessor * Encapsulation of the environment of each coprocessor
*/ */
@ -195,4 +196,66 @@ public class WALCoprocessorHost
} }
} }
} }
/**
* Called before rolling the current WAL
* @param oldPath the path of the current wal that we are replacing
* @param newPath the path of the wal we are going to create
*/
public void preWALRoll(Path oldPath, Path newPath) throws IOException {
if (this.coprocessors == null || this.coprocessors.isEmpty()) return;
ObserverContext<WALCoprocessorEnvironment> ctx = null;
List<WALEnvironment> envs = coprocessors.get();
for (int i = 0; i < envs.size(); i++) {
WALEnvironment env = envs.get(i);
if (env.getInstance() instanceof WALObserver) {
final WALObserver observer = (WALObserver)env.getInstance();
ctx = ObserverContext.createAndPrepare(env, ctx);
Thread currentThread = Thread.currentThread();
ClassLoader cl = currentThread.getContextClassLoader();
try {
currentThread.setContextClassLoader(env.getClassLoader());
observer.preWALRoll(ctx, oldPath, newPath);
} catch (Throwable e) {
handleCoprocessorThrowable(env, e);
} finally {
currentThread.setContextClassLoader(cl);
}
if (ctx.shouldComplete()) {
break;
}
}
}
}
/**
* Called after rolling the current WAL
* @param oldPath the path of the wal that we replaced
* @param newPath the path of the wal we have created and now is the current
*/
public void postWALRoll(Path oldPath, Path newPath) throws IOException {
if (this.coprocessors == null || this.coprocessors.isEmpty()) return;
ObserverContext<WALCoprocessorEnvironment> ctx = null;
List<WALEnvironment> envs = coprocessors.get();
for (int i = 0; i < envs.size(); i++) {
WALEnvironment env = envs.get(i);
if (env.getInstance() instanceof WALObserver) {
final WALObserver observer = (WALObserver)env.getInstance();
ctx = ObserverContext.createAndPrepare(env, ctx);
Thread currentThread = Thread.currentThread();
ClassLoader cl = currentThread.getContextClassLoader();
try {
currentThread.setContextClassLoader(env.getClassLoader());
observer.postWALRoll(ctx, oldPath, newPath);
} catch (Throwable e) {
handleCoprocessorThrowable(env, e);
} finally {
currentThread.setContextClassLoader(cl);
}
if (ctx.shouldComplete()) {
break;
}
}
}
}
} }

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
@ -60,6 +61,8 @@ implements WALObserver {
private boolean postWALWriteCalled = false; private boolean postWALWriteCalled = false;
private boolean preWALRestoreCalled = false; private boolean preWALRestoreCalled = false;
private boolean postWALRestoreCalled = false; private boolean postWALRestoreCalled = false;
private boolean preWALRollCalled = false;
private boolean postWALRollCalled = false;
// Deprecated versions // Deprecated versions
private boolean preWALWriteDeprecatedCalled = false; private boolean preWALWriteDeprecatedCalled = false;
@ -89,6 +92,8 @@ implements WALObserver {
postWALWriteDeprecatedCalled = false; postWALWriteDeprecatedCalled = false;
preWALRestoreDeprecatedCalled = false; preWALRestoreDeprecatedCalled = false;
postWALRestoreDeprecatedCalled = false; postWALRestoreDeprecatedCalled = false;
preWALRollCalled = false;
postWALRollCalled = false;
} }
@Override @Override
@ -167,6 +172,18 @@ implements WALObserver {
preWALRestore(env, info, (WALKey)logKey, logEdit); preWALRestore(env, info, (WALKey)logKey, logEdit);
} }
@Override
public void preWALRoll(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
Path oldPath, Path newPath) throws IOException {
preWALRollCalled = true;
}
@Override
public void postWALRoll(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
Path oldPath, Path newPath) throws IOException {
postWALRollCalled = true;
}
/** /**
* Triggered after {@link org.apache.hadoop.hbase.regionserver.HRegion} when WAL is * Triggered after {@link org.apache.hadoop.hbase.regionserver.HRegion} when WAL is
* Restoreed. * Restoreed.
@ -220,6 +237,14 @@ implements WALObserver {
return postWALRestoreDeprecatedCalled; return postWALRestoreDeprecatedCalled;
} }
public boolean isPreWALRollCalled() {
return preWALRollCalled;
}
public boolean isPostWALRollCalled() {
return postWALRollCalled;
}
/** /**
* This class should trigger our legacy support since it does not directly implement the * This class should trigger our legacy support since it does not directly implement the
* newer API methods. * newer API methods.

View File

@ -471,6 +471,20 @@ public class TestWALObserver {
assertNotNull(getCoprocessor(log, SampleRegionWALObserver.class)); assertNotNull(getCoprocessor(log, SampleRegionWALObserver.class));
} }
@Test
public void testWALObserverRoll() throws Exception {
final WAL wal = wals.getWAL(UNSPECIFIED_REGION, null);
final SampleRegionWALObserver cp = getCoprocessor(wal, SampleRegionWALObserver.class);
cp.setTestValues(TEST_TABLE, null, null, null, null, null, null, null);
assertFalse(cp.isPreWALRollCalled());
assertFalse(cp.isPostWALRollCalled());
wal.rollWriter(true);
assertTrue(cp.isPreWALRollCalled());
assertTrue(cp.isPostWALRollCalled());
}
private SampleRegionWALObserver getCoprocessor(WAL wal, private SampleRegionWALObserver getCoprocessor(WAL wal,
Class<? extends SampleRegionWALObserver> clazz) throws Exception { Class<? extends SampleRegionWALObserver> clazz) throws Exception {
WALCoprocessorHost host = wal.getCoprocessorHost(); WALCoprocessorHost host = wal.getCoprocessorHost();