HBASE-22623 - Add RegionObserver coprocessor hook for preWALAppend (#470)

Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
Geoffrey Jacoby 2019-08-09 14:46:08 -07:00 committed by Andrew Purtell
parent e7114f7bec
commit 9888217177
7 changed files with 234 additions and 0 deletions

View File

@ -548,4 +548,10 @@ public class BaseRegionObserver implements RegionObserver {
throws IOException {
return delTracker;
}
@Override
public void preWALAppend(ObserverContext<RegionCoprocessorEnvironment> ctx, WALKey key,
WALEdit edit) throws IOException {
}
}

View File

@ -1341,4 +1341,16 @@ public interface RegionObserver extends Coprocessor {
DeleteTracker postInstantiateDeleteTracker(
final ObserverContext<RegionCoprocessorEnvironment> ctx, DeleteTracker delTracker)
throws IOException;
/**
* Called just before the WAL Entry is appended to the WAL. Implementing this hook allows
* coprocessors to add extended attributes to the WALKey that then get persisted to the
* WAL, and are available to replication endpoints to use in processing WAL Entries.
* @param ctx the environment provided by the region server
* @param key the WALKey associated with a particular append to a WAL
* @param edit the WALEdit associated with a particular append to a WAL
*/
void preWALAppend(ObserverContext<RegionCoprocessorEnvironment> ctx, WALKey key,
WALEdit edit)
throws IOException;
}

View File

@ -3527,6 +3527,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
preWALAppend(walKey, walEdit);
txid = this.wal
.append(this.htableDescriptor, this.getRegionInfo(), walKey,
walEdit, true);
@ -7593,6 +7594,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
processor.getClusterIds(), nonceGroup, nonce, mvcc);
preWALAppend(walKey, walEdit);
txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
walKey, walEdit, true);
}
@ -7884,6 +7886,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
nonceGroup,
nonce,
mvcc);
preWALAppend(walKey, walEdits);
txid =
this.wal.append(this.htableDescriptor, getRegionInfo(), walKey, walEdits, true);
} else {
@ -7973,6 +7976,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return mutate.isReturnResults() ? Result.create(allKVs) : null;
}
private void preWALAppend(WALKey walKey, WALEdit walEdits) throws IOException {
if (this.coprocessorHost != null && !walEdits.isMetaEdit()) {
this.coprocessorHost.preWALAppend(walKey, walEdits);
}
}
private static Cell getNewCell(final byte [] row, final long ts, final Cell cell,
final Cell oldCell, final byte [] tagBytes) {
// allocate an empty cell once
@ -8124,6 +8133,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce,
getMVCC());
preWALAppend(walKey, walEdits);
txid =
this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdits, true);
} else {

View File

@ -1691,6 +1691,19 @@ public class RegionCoprocessorHost
});
}
public void preWALAppend(final WALKey key, final WALEdit edit) throws IOException {
if (coprocessors.isEmpty()){
return;
}
execOperation(new RegionOperation() {
@Override
public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
throws IOException {
oserver.preWALAppend(ctx, key, edit);
}
});
}
private static abstract class CoprocessorOperation
extends ObserverContext<RegionCoprocessorEnvironment> {
public CoprocessorOperation() {

View File

@ -464,6 +464,18 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
return clusterIds.isEmpty() ? HConstants.DEFAULT_CLUSTER_ID : clusterIds.get(0);
}
/**
* Add a named String value to this WALKey to be persisted into the WAL
* @param attributeKey Name of the attribute
* @param attributeValue Value of the attribute
*/
public void addExtendedAttribute(String attributeKey, byte[] attributeValue){
if (extendedAttributes == null){
extendedAttributes = new HashMap<String, byte[]>();
}
extendedAttributes.put(attributeKey, attributeValue);
}
/**
* Return a named String value injected into the WALKey during processing, such as by a
* coprocessor

View File

@ -139,8 +139,10 @@ public class SimpleRegionObserver extends BaseRegionObserver {
final AtomicInteger ctPostBatchMutateIndispensably = new AtomicInteger(0);
final AtomicInteger ctPostStartRegionOperation = new AtomicInteger(0);
final AtomicInteger ctPostCloseRegionOperation = new AtomicInteger(0);
final AtomicInteger ctPreWALAppend = new AtomicInteger(0);
final AtomicBoolean throwOnPostFlush = new AtomicBoolean(false);
static final String TABLE_SKIPPED = "SKIPPED_BY_PREWALRESTORE";
static final byte[] WAL_EXTENDED_ATTRIBUTE_BYTES = Bytes.toBytes("foo");
public void setThrowOnPostFlush(Boolean val){
throwOnPostFlush.set(val);
@ -718,6 +720,15 @@ public class SimpleRegionObserver extends BaseRegionObserver {
return reader;
}
@Override
public void preWALAppend(ObserverContext<RegionCoprocessorEnvironment> ctx,
WALKey key, WALEdit edit) throws IOException {
ctPreWALAppend.incrementAndGet();
key.addExtendedAttribute(Integer.toString(ctPreWALAppend.get()),
Bytes.toBytes("foo"));
}
public boolean hadPreGet() {
return ctPreGet.get() > 0;
}
@ -975,6 +986,10 @@ public class SimpleRegionObserver extends BaseRegionObserver {
return ctPostWALRestoreDeprecated.get();
}
public int getCtPreWALAppend() {
return ctPreWALAppend.get();
}
public boolean wasStoreFileReaderOpenCalled() {
return ctPreStoreFileReaderOpen.get() > 0 && ctPostStoreFileReaderOpen.get() > 0;
}

View File

@ -27,6 +27,7 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
@ -42,6 +43,8 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
@ -78,11 +81,14 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WALKey;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
@Category(MediumTests.class)
public class TestRegionObserverInterface {
@ -93,6 +99,7 @@ public class TestRegionObserverInterface {
public final static byte[] B = Bytes.toBytes("b");
public final static byte[] C = Bytes.toBytes("c");
public final static byte[] ROW = Bytes.toBytes("testrow");
public final static byte[] FAMILY = Bytes.toBytes("f");
private static HBaseTestingUtility util = new HBaseTestingUtility();
private static MiniHBaseCluster cluster = null;
@ -750,6 +757,100 @@ public class TestRegionObserverInterface {
table.close();
}
//called from testPreWALAppendIsWrittenToWAL
private void testPreWALAppendHook(Table table, TableName tableName) throws IOException {
int expectedCalls = 0;
String [] methodArray = new String[1];
methodArray[0] = "getCtPreWALAppend";
Object[] resultArray = new Object[1];
Put p = new Put(ROW);
p.addColumn(A, A, A);
table.put(p);
resultArray[0] = ++expectedCalls;
verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
Append a = new Append(ROW);
a.add(B, B, B);
table.append(a);
resultArray[0] = ++expectedCalls;
verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
Increment i = new Increment(ROW);
i.addColumn(C, C, 1);
table.increment(i);
resultArray[0] = ++expectedCalls;
verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
Delete d = new Delete(ROW);
table.delete(d);
resultArray[0] = ++expectedCalls;
verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
}
@Test
public void testPreWALAppend() throws Exception {
SimpleRegionObserver sro = new SimpleRegionObserver();
ObserverContext ctx = Mockito.mock(ObserverContext.class);
WALKey key = new WALKey(Bytes.toBytes("region"), TEST_TABLE,
EnvironmentEdgeManager.currentTime());
WALEdit edit = new WALEdit();
sro.preWALAppend(ctx, key, edit);
Assert.assertEquals(1, key.getExtendedAttributes().size());
Assert.assertArrayEquals(SimpleRegionObserver.WAL_EXTENDED_ATTRIBUTE_BYTES,
key.getExtendedAttribute(Integer.toString(sro.getCtPreWALAppend())));
}
@Test
public void testPreWALAppendIsWrittenToWAL() throws Exception {
final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() +
".testPreWALAppendIsWrittenToWAL");
HTableDescriptor htd = new HTableDescriptor(tableName);
htd.addFamily(new HColumnDescriptor(A));
htd.addFamily(new HColumnDescriptor(B));
htd.addFamily(new HColumnDescriptor(C));
htd.addCoprocessor(SimpleRegionObserver.class.getName());
Table table = util.createTable(htd, null);
PreWALAppendWALActionsListener listener = new PreWALAppendWALActionsListener();
List<HRegion> regions = util.getHBaseCluster().getRegions(tableName);
//should be only one region
HRegion region = regions.get(0);
region.getWAL().registerWALActionsListener(listener);
testPreWALAppendHook(table, tableName);
boolean[] expectedResults = {true, true, true, true};
Assert.assertArrayEquals(expectedResults, listener.getWalKeysCorrectArray());
}
@Test
public void testPreWALAppendNotCalledOnMetaEdit() throws Exception {
final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() +
".testPreWALAppendNotCalledOnMetaEdt");
HTableDescriptor td = new HTableDescriptor(tableName);
td.addCoprocessor(SimpleRegionObserver.class.getName());
td.addFamily(new HColumnDescriptor(FAMILY));
Table table = util.createTable(td, new byte[][] { A, B, C });
PreWALAppendWALActionsListener listener = new PreWALAppendWALActionsListener();
List<HRegion> regions = util.getHBaseCluster().getRegions(tableName);
//should be only one region
HRegion region = regions.get(0);
region.getWAL().registerWALActionsListener(listener);
//flushing should write to the WAL
region.flush(true);
//so should compaction
region.compact(false);
//and so should closing the region
region.close();
//but we still shouldn't have triggered preWALAppend because no user data was written
String[] methods = new String[] {"getCtPreWALAppend"};
Object[] expectedResult = new Integer[]{0};
verifyMethodResult(SimpleRegionObserver.class, methods, tableName, expectedResult);
}
// check each region whether the coprocessor upcalls are called or not.
private void verifyMethodResult(Class<?> c, String methodName[], TableName tableName,
Object value[]) throws IOException {
@ -800,4 +901,69 @@ public class TestRegionObserverInterface {
writer.close();
}
}
private static class PreWALAppendWALActionsListener implements WALActionsListener {
boolean[] walKeysCorrect = {false, false, false, false};
@Override
public void postAppend(long entryLen, long elapsedTimeMillis,
WALKey logKey, WALEdit logEdit) throws IOException {
for (int k = 0; k < 4; k++) {
if (!walKeysCorrect[k]) {
walKeysCorrect[k] = Arrays.equals(SimpleRegionObserver.WAL_EXTENDED_ATTRIBUTE_BYTES,
logKey.getExtendedAttribute(Integer.toString(k + 1)));
}
}
}
@Override
public void postSync(long timeInNanos, int handlerSyncs) {
}
boolean[] getWalKeysCorrectArray() {
return walKeysCorrect;
}
@Override
public void preLogRoll(Path oldPath, Path newPath) throws IOException {
}
@Override
public void postLogRoll(Path oldPath, Path newPath) throws IOException {
}
@Override
public void preLogArchive(Path oldPath, Path newPath) throws IOException {
}
@Override
public void postLogArchive(Path oldPath, Path newPath) throws IOException {
}
@Override
public void logRollRequested(RollRequestReason reason) {
}
@Override
public void logCloseRequested() {
}
@Override
public void visitLogEntryBeforeWrite(HRegionInfo info, WALKey logKey,
WALEdit logEdit) {
}
@Override
public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey,
WALEdit logEdit) throws IOException {
}
}
}