diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseWALObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseWALObserver.java index cfddcd4d4d3..1d0076a28c1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseWALObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseWALObserver.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.coprocessor; import java.io.IOException; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.CoprocessorEnvironment; @@ -70,4 +71,12 @@ public class BaseWALObserver implements WALObserver { HLogKey logKey, WALEdit logEdit) throws IOException { postWALWrite(ctx, info, (WALKey)logKey, logEdit); } + + @Override + public void preWALRoll(ObserverContext ctx, + Path oldPath, Path newPath) throws IOException { } + + @Override + public void postWALRoll(ObserverContext ctx, + Path oldPath, Path newPath) throws IOException { } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java index bba83cc3838..7fd03ce50b1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.coprocessor; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.Coprocessor; @@ -112,4 +113,21 @@ public interface WALObserver extends Coprocessor { @Deprecated void postWALWrite(ObserverContext ctx, HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException; + + /** + * Called before rolling the current WAL + * @param oldPath the path of the current wal that we are replacing + * @param newPath the path of the wal we are going to create + */ + void preWALRoll(ObserverContext ctx, + Path oldPath, Path newPath) throws IOException; + + /** + * Called after rolling the current WAL + * @param oldPath the path of the wal that we replaced + * @param newPath the path of the wal we have created and now is the current + */ + void postWALRoll(ObserverContext ctx, + Path oldPath, Path newPath) throws IOException; } + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index c01cc1c7dba..070ba3b0709 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -630,6 +630,8 @@ public class FSHLog implements WAL { */ private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath) throws IOException { + coprocessorHost.preWALRoll(oldPath, newPath); + if (!this.listeners.isEmpty()) { for (WALActionsListener i : this.listeners) { i.preLogRoll(oldPath, newPath); @@ -648,6 +650,8 @@ public class FSHLog implements WAL { i.postLogRoll(oldPath, newPath); } } + + coprocessorHost.postWALRoll(oldPath, newPath); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java index 71cbe57ba5d..87019e86173 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java @@ -23,6 +23,7 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.IOException; import java.util.List; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.coprocessor.*; @@ -38,7 +39,7 @@ import org.apache.hadoop.hbase.wal.WALKey; @InterfaceAudience.Private public class WALCoprocessorHost extends CoprocessorHost { - + /** * Encapsulation of the environment of each coprocessor */ @@ -195,4 +196,66 @@ public class WALCoprocessorHost } } } + + /** + * Called before rolling the current WAL + * @param oldPath the path of the current wal that we are replacing + * @param newPath the path of the wal we are going to create + */ + public void preWALRoll(Path oldPath, Path newPath) throws IOException { + if (this.coprocessors == null || this.coprocessors.isEmpty()) return; + ObserverContext ctx = null; + List envs = coprocessors.get(); + for (int i = 0; i < envs.size(); i++) { + WALEnvironment env = envs.get(i); + if (env.getInstance() instanceof WALObserver) { + final WALObserver observer = (WALObserver)env.getInstance(); + ctx = ObserverContext.createAndPrepare(env, ctx); + Thread currentThread = Thread.currentThread(); + ClassLoader cl = currentThread.getContextClassLoader(); + try { + currentThread.setContextClassLoader(env.getClassLoader()); + observer.preWALRoll(ctx, oldPath, newPath); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } finally { + currentThread.setContextClassLoader(cl); + } + if (ctx.shouldComplete()) { + break; + } + } + } + } + + /** + * Called after rolling the current WAL + * @param oldPath the path of the wal that we replaced + * @param newPath the path of the wal we have created and now is the current + */ + public void postWALRoll(Path oldPath, Path newPath) throws IOException { + if (this.coprocessors == null || this.coprocessors.isEmpty()) return; + ObserverContext ctx = null; + List envs = coprocessors.get(); + for (int i = 0; i < envs.size(); i++) { + WALEnvironment env = envs.get(i); + if (env.getInstance() instanceof WALObserver) { + final WALObserver observer = (WALObserver)env.getInstance(); + ctx = ObserverContext.createAndPrepare(env, ctx); + Thread currentThread = Thread.currentThread(); + ClassLoader cl = currentThread.getContextClassLoader(); + try { + currentThread.setContextClassLoader(env.getClassLoader()); + observer.postWALRoll(ctx, oldPath, newPath); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } finally { + currentThread.setContextClassLoader(cl); + } + if (ctx.shouldComplete()) { + break; + } + } + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALObserver.java index d7852f17f8e..da262ee626e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALObserver.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.List; import java.util.Arrays; +import org.apache.hadoop.fs.Path; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -60,6 +61,8 @@ implements WALObserver { private boolean postWALWriteCalled = false; private boolean preWALRestoreCalled = false; private boolean postWALRestoreCalled = false; + private boolean preWALRollCalled = false; + private boolean postWALRollCalled = false; // Deprecated versions private boolean preWALWriteDeprecatedCalled = false; @@ -89,6 +92,8 @@ implements WALObserver { postWALWriteDeprecatedCalled = false; preWALRestoreDeprecatedCalled = false; postWALRestoreDeprecatedCalled = false; + preWALRollCalled = false; + postWALRollCalled = false; } @Override @@ -167,6 +172,18 @@ implements WALObserver { preWALRestore(env, info, (WALKey)logKey, logEdit); } + @Override + public void preWALRoll(ObserverContext ctx, + Path oldPath, Path newPath) throws IOException { + preWALRollCalled = true; + } + + @Override + public void postWALRoll(ObserverContext ctx, + Path oldPath, Path newPath) throws IOException { + postWALRollCalled = true; + } + /** * Triggered after {@link org.apache.hadoop.hbase.regionserver.HRegion} when WAL is * Restoreed. @@ -220,6 +237,14 @@ implements WALObserver { return postWALRestoreDeprecatedCalled; } + public boolean isPreWALRollCalled() { + return preWALRollCalled; + } + + public boolean isPostWALRollCalled() { + return postWALRollCalled; + } + /** * This class should trigger our legacy support since it does not directly implement the * newer API methods. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java index 95e77a47a73..c7a6a0c24e5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java @@ -450,6 +450,20 @@ public class TestWALObserver { assertNotNull(getCoprocessor(log, SampleRegionWALObserver.class)); } + @Test + public void testWALObserverRoll() throws Exception { + final WAL wal = wals.getWAL(UNSPECIFIED_REGION, null); + final SampleRegionWALObserver cp = getCoprocessor(wal, SampleRegionWALObserver.class); + cp.setTestValues(TEST_TABLE, null, null, null, null, null, null, null); + + assertFalse(cp.isPreWALRollCalled()); + assertFalse(cp.isPostWALRollCalled()); + + wal.rollWriter(true); + assertTrue(cp.isPreWALRollCalled()); + assertTrue(cp.isPostWALRollCalled()); + } + private SampleRegionWALObserver getCoprocessor(WAL wal, Class clazz) throws Exception { WALCoprocessorHost host = wal.getCoprocessorHost();