HBASE-15481 Add pre/post roll to WALObserver
Signed-off-by: Sean Busbey <busbey@apache.org> Conflicts: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
This commit is contained in:
parent
4981d4c207
commit
cd84796b39
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.coprocessor;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
||||
|
@ -70,4 +71,12 @@ public class BaseWALObserver implements WALObserver {
|
|||
HLogKey logKey, WALEdit logEdit) throws IOException {
|
||||
postWALWrite(ctx, info, (WALKey)logKey, logEdit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preWALRoll(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
|
||||
Path oldPath, Path newPath) throws IOException { }
|
||||
|
||||
@Override
|
||||
public void postWALRoll(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
|
||||
Path oldPath, Path newPath) throws IOException { }
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.apache.hadoop.hbase.coprocessor;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.Coprocessor;
|
||||
|
@ -112,4 +113,21 @@ public interface WALObserver extends Coprocessor {
|
|||
@Deprecated
|
||||
void postWALWrite(ObserverContext<WALCoprocessorEnvironment> ctx,
|
||||
HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException;
|
||||
|
||||
/**
|
||||
* Called before rolling the current WAL
|
||||
* @param oldPath the path of the current wal that we are replacing
|
||||
* @param newPath the path of the wal we are going to create
|
||||
*/
|
||||
void preWALRoll(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
|
||||
Path oldPath, Path newPath) throws IOException;
|
||||
|
||||
/**
|
||||
* Called after rolling the current WAL
|
||||
* @param oldPath the path of the wal that we replaced
|
||||
* @param newPath the path of the wal we have created and now is the current
|
||||
*/
|
||||
void postWALRoll(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
|
||||
Path oldPath, Path newPath) throws IOException;
|
||||
}
|
||||
|
||||
|
|
|
@ -630,6 +630,8 @@ public class FSHLog implements WAL {
|
|||
*/
|
||||
private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath)
|
||||
throws IOException {
|
||||
coprocessorHost.preWALRoll(oldPath, newPath);
|
||||
|
||||
if (!this.listeners.isEmpty()) {
|
||||
for (WALActionsListener i : this.listeners) {
|
||||
i.preLogRoll(oldPath, newPath);
|
||||
|
@ -648,6 +650,8 @@ public class FSHLog implements WAL {
|
|||
i.postLogRoll(oldPath, newPath);
|
||||
}
|
||||
}
|
||||
|
||||
coprocessorHost.postWALRoll(oldPath, newPath);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -23,6 +23,7 @@ package org.apache.hadoop.hbase.regionserver.wal;
|
|||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Coprocessor;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.coprocessor.*;
|
||||
|
@ -195,4 +196,66 @@ public class WALCoprocessorHost
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called before rolling the current WAL
|
||||
* @param oldPath the path of the current wal that we are replacing
|
||||
* @param newPath the path of the wal we are going to create
|
||||
*/
|
||||
public void preWALRoll(Path oldPath, Path newPath) throws IOException {
|
||||
if (this.coprocessors == null || this.coprocessors.isEmpty()) return;
|
||||
ObserverContext<WALCoprocessorEnvironment> ctx = null;
|
||||
List<WALEnvironment> envs = coprocessors.get();
|
||||
for (int i = 0; i < envs.size(); i++) {
|
||||
WALEnvironment env = envs.get(i);
|
||||
if (env.getInstance() instanceof WALObserver) {
|
||||
final WALObserver observer = (WALObserver)env.getInstance();
|
||||
ctx = ObserverContext.createAndPrepare(env, ctx);
|
||||
Thread currentThread = Thread.currentThread();
|
||||
ClassLoader cl = currentThread.getContextClassLoader();
|
||||
try {
|
||||
currentThread.setContextClassLoader(env.getClassLoader());
|
||||
observer.preWALRoll(ctx, oldPath, newPath);
|
||||
} catch (Throwable e) {
|
||||
handleCoprocessorThrowable(env, e);
|
||||
} finally {
|
||||
currentThread.setContextClassLoader(cl);
|
||||
}
|
||||
if (ctx.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called after rolling the current WAL
|
||||
* @param oldPath the path of the wal that we replaced
|
||||
* @param newPath the path of the wal we have created and now is the current
|
||||
*/
|
||||
public void postWALRoll(Path oldPath, Path newPath) throws IOException {
|
||||
if (this.coprocessors == null || this.coprocessors.isEmpty()) return;
|
||||
ObserverContext<WALCoprocessorEnvironment> ctx = null;
|
||||
List<WALEnvironment> envs = coprocessors.get();
|
||||
for (int i = 0; i < envs.size(); i++) {
|
||||
WALEnvironment env = envs.get(i);
|
||||
if (env.getInstance() instanceof WALObserver) {
|
||||
final WALObserver observer = (WALObserver)env.getInstance();
|
||||
ctx = ObserverContext.createAndPrepare(env, ctx);
|
||||
Thread currentThread = Thread.currentThread();
|
||||
ClassLoader cl = currentThread.getContextClassLoader();
|
||||
try {
|
||||
currentThread.setContextClassLoader(env.getClassLoader());
|
||||
observer.postWALRoll(ctx, oldPath, newPath);
|
||||
} catch (Throwable e) {
|
||||
handleCoprocessorThrowable(env, e);
|
||||
} finally {
|
||||
currentThread.setContextClassLoader(cl);
|
||||
}
|
||||
if (ctx.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.io.IOException;
|
|||
import java.util.List;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
|
@ -60,6 +61,8 @@ implements WALObserver {
|
|||
private boolean postWALWriteCalled = false;
|
||||
private boolean preWALRestoreCalled = false;
|
||||
private boolean postWALRestoreCalled = false;
|
||||
private boolean preWALRollCalled = false;
|
||||
private boolean postWALRollCalled = false;
|
||||
|
||||
// Deprecated versions
|
||||
private boolean preWALWriteDeprecatedCalled = false;
|
||||
|
@ -89,6 +92,8 @@ implements WALObserver {
|
|||
postWALWriteDeprecatedCalled = false;
|
||||
preWALRestoreDeprecatedCalled = false;
|
||||
postWALRestoreDeprecatedCalled = false;
|
||||
preWALRollCalled = false;
|
||||
postWALRollCalled = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -167,6 +172,18 @@ implements WALObserver {
|
|||
preWALRestore(env, info, (WALKey)logKey, logEdit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preWALRoll(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
|
||||
Path oldPath, Path newPath) throws IOException {
|
||||
preWALRollCalled = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postWALRoll(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
|
||||
Path oldPath, Path newPath) throws IOException {
|
||||
postWALRollCalled = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Triggered after {@link org.apache.hadoop.hbase.regionserver.HRegion} when WAL is
|
||||
* Restoreed.
|
||||
|
@ -220,6 +237,14 @@ implements WALObserver {
|
|||
return postWALRestoreDeprecatedCalled;
|
||||
}
|
||||
|
||||
public boolean isPreWALRollCalled() {
|
||||
return preWALRollCalled;
|
||||
}
|
||||
|
||||
public boolean isPostWALRollCalled() {
|
||||
return postWALRollCalled;
|
||||
}
|
||||
|
||||
/**
|
||||
* This class should trigger our legacy support since it does not directly implement the
|
||||
* newer API methods.
|
||||
|
|
|
@ -450,6 +450,20 @@ public class TestWALObserver {
|
|||
assertNotNull(getCoprocessor(log, SampleRegionWALObserver.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWALObserverRoll() throws Exception {
|
||||
final WAL wal = wals.getWAL(UNSPECIFIED_REGION, null);
|
||||
final SampleRegionWALObserver cp = getCoprocessor(wal, SampleRegionWALObserver.class);
|
||||
cp.setTestValues(TEST_TABLE, null, null, null, null, null, null, null);
|
||||
|
||||
assertFalse(cp.isPreWALRollCalled());
|
||||
assertFalse(cp.isPostWALRollCalled());
|
||||
|
||||
wal.rollWriter(true);
|
||||
assertTrue(cp.isPreWALRollCalled());
|
||||
assertTrue(cp.isPostWALRollCalled());
|
||||
}
|
||||
|
||||
private SampleRegionWALObserver getCoprocessor(WAL wal,
|
||||
Class<? extends SampleRegionWALObserver> clazz) throws Exception {
|
||||
WALCoprocessorHost host = wal.getCoprocessorHost();
|
||||
|
|
Loading…
Reference in New Issue