HBASE-22623 - Add RegionObserver coprocessor hook for preWALAppend (#390)

Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
Geoffrey Jacoby 2019-08-09 14:27:32 -07:00 committed by Andrew Purtell
parent 0136b9873f
commit c8f57bf678
8 changed files with 225 additions and 2 deletions

View File

@ -1104,4 +1104,16 @@ public interface RegionObserver {
throws IOException {
return delTracker;
}
/**
* Called just before the WAL Entry is appended to the WAL. Implementing this hook allows
* coprocessors to add extended attributes to the WALKey that then get persisted to the
* WAL, and are available to replication endpoints to use in processing WAL Entries.
* @param ctx the environment provided by the region server
* @param key the WALKey associated with a particular append to a WAL
*/
default void preWALAppend(ObserverContext<RegionCoprocessorEnvironment> ctx, WALKey key,
WALEdit edit)
throws IOException {
}
}

View File

@ -7964,6 +7964,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (walEdit.isReplay()) {
walKey.setOrigLogSeqNum(origLogSeqNum);
}
//don't call the coproc hook for writes to the WAL caused by
//system lifecycle events like flushes or compactions
if (this.coprocessorHost != null && !walEdit.isMetaEdit()) {
this.coprocessorHost.preWALAppend(walKey, walEdit);
}
WriteEntry writeEntry = null;
try {
long txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);

View File

@ -1720,6 +1720,18 @@ public class RegionCoprocessorHost
});
}
public void preWALAppend(WALKey key, WALEdit edit) throws IOException {
if (this.coprocEnvironments.isEmpty()){
return;
}
execOperation(new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.preWALAppend(this, key, edit);
}
});
}
public Message preEndpointInvocation(final Service service, final String methodName,
Message request) throws IOException {
if (coprocEnvironments.isEmpty()) {

View File

@ -34,7 +34,6 @@ import java.util.UUID;
/**
* Key for WAL Entry.
* Read-only. No Setters. For limited audience such as Coprocessors.
*/
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.REPLICATION,
HBaseInterfaceAudience.COPROC})
@ -86,6 +85,13 @@ public interface WALKey extends SequenceId, Comparable<WALKey> {
*/
long getOrigLogSeqNum();
/**
* Add a named String value to this WALKey to be persisted into the WAL
* @param attributeKey Name of the attribute
* @param attributeValue Value of the attribute
*/
void addExtendedAttribute(String attributeKey, byte[] attributeValue);
/**
* Return a named String value injected into the WALKey during processing, such as by a
* coprocessor

View File

@ -195,6 +195,37 @@ public class WALKeyImpl implements WALKey {
mvcc, null, null);
}
/**
* Copy constructor that takes in an existing WALKeyImpl plus some extended attributes.
* Intended for coprocessors to add annotations to a system-generated WALKey
* for persistence to the WAL.
* @param key Key to be copied into this new key
* @param extendedAttributes Extra attributes to copy into the new key
*/
public WALKeyImpl(WALKeyImpl key,
Map<String, byte[]> extendedAttributes){
init(key.getEncodedRegionName(), key.getTableName(), key.getSequenceId(),
key.getWriteTime(), key.getClusterIds(), key.getNonceGroup(), key.getNonce(),
key.getMvcc(), key.getReplicationScopes(), extendedAttributes);
}
/**
* Copy constructor that takes in an existing WALKey, the extra WALKeyImpl fields that the
* parent interface is missing, plus some extended attributes. Intended
* for coprocessors to add annotations to a system-generated WALKey for
* persistence to the WAL.
*/
public WALKeyImpl(WALKey key,
List<UUID> clusterIds,
MultiVersionConcurrencyControl mvcc,
final NavigableMap<byte[], Integer> replicationScopes,
Map<String, byte[]> extendedAttributes){
init(key.getEncodedRegionName(), key.getTableName(), key.getSequenceId(),
key.getWriteTime(), clusterIds, key.getNonceGroup(), key.getNonce(),
mvcc, replicationScopes, extendedAttributes);
}
/**
* Create the log key for writing to somewhere.
* We maintain the tablename mainly for debugging purposes.
@ -464,6 +495,14 @@ public class WALKeyImpl implements WALKey {
return clusterIds.isEmpty()? HConstants.DEFAULT_CLUSTER_ID: clusterIds.get(0);
}
@Override
public void addExtendedAttribute(String attributeKey, byte[] attributeValue){
if (extendedAttributes == null){
extendedAttributes = new HashMap<String, byte[]>();
}
extendedAttributes.put(attributeKey, attributeValue);
}
@Override
public byte[] getExtendedAttribute(String attributeKey){
return extendedAttributes != null ? extendedAttributes.get(attributeKey) : null;

View File

@ -25,6 +25,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -124,7 +125,11 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
final AtomicInteger ctPostStartRegionOperation = new AtomicInteger(0);
final AtomicInteger ctPostCloseRegionOperation = new AtomicInteger(0);
final AtomicBoolean throwOnPostFlush = new AtomicBoolean(false);
final AtomicInteger ctPreWALAppend = new AtomicInteger(0);
static final String TABLE_SKIPPED = "SKIPPED_BY_PREWALRESTORE";
Map<String, byte[]> extendedAttributes = new HashMap<String,byte[]>();
static final byte[] WAL_EXTENDED_ATTRIBUTE_BYTES = Bytes.toBytes("foo");
public void setThrowOnPostFlush(Boolean val){
throwOnPostFlush.set(val);
@ -631,6 +636,15 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
return reader;
}
@Override
public void preWALAppend(ObserverContext<RegionCoprocessorEnvironment> ctx,
WALKey key, WALEdit edit) throws IOException {
ctPreWALAppend.incrementAndGet();
key.addExtendedAttribute(Integer.toString(ctPreWALAppend.get()),
Bytes.toBytes("foo"));
}
public boolean hadPreGet() {
return ctPreGet.get() > 0;
}
@ -864,6 +878,10 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
return ctPostWALRestore.get();
}
public int getCtPreWALAppend() {
return ctPreWALAppend.get();
}
public boolean wasStoreFileReaderOpenCalled() {
return ctPreStoreFileReaderOpen.get() > 0 && ctPostStoreFileReaderOpen.get() > 0;
}

View File

@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
@ -43,6 +44,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
@ -55,6 +57,8 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.filter.FilterAllFilter;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
@ -70,6 +74,7 @@ import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
@ -77,13 +82,18 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -99,6 +109,7 @@ public class TestRegionObserverInterface {
private static final Logger LOG = LoggerFactory.getLogger(TestRegionObserverInterface.class);
public static final TableName TEST_TABLE = TableName.valueOf("TestTable");
public static final byte[] FAMILY = Bytes.toBytes("f");
public final static byte[] A = Bytes.toBytes("a");
public final static byte[] B = Bytes.toBytes("b");
public final static byte[] C = Bytes.toBytes("c");
@ -663,6 +674,97 @@ public class TestRegionObserverInterface {
table.close();
}
//called from testPreWALAppendIsWrittenToWAL
private void testPreWALAppendHook(Table table, TableName tableName) throws IOException {
int expectedCalls = 0;
String [] methodArray = new String[1];
methodArray[0] = "getCtPreWALAppend";
Object[] resultArray = new Object[1];
Put p = new Put(ROW);
p.addColumn(A, A, A);
table.put(p);
resultArray[0] = ++expectedCalls;
verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
Append a = new Append(ROW);
a.addColumn(B, B, B);
table.append(a);
resultArray[0] = ++expectedCalls;
verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
Increment i = new Increment(ROW);
i.addColumn(C, C, 1);
table.increment(i);
resultArray[0] = ++expectedCalls;
verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
Delete d = new Delete(ROW);
table.delete(d);
resultArray[0] = ++expectedCalls;
verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
}
@Test
public void testPreWALAppend() throws Exception {
SimpleRegionObserver sro = new SimpleRegionObserver();
ObserverContext ctx = Mockito.mock(ObserverContext.class);
WALKey key = new WALKeyImpl(Bytes.toBytes("region"), TEST_TABLE,
EnvironmentEdgeManager.currentTime());
WALEdit edit = new WALEdit();
sro.preWALAppend(ctx, key, edit);
Assert.assertEquals(1, key.getExtendedAttributes().size());
Assert.assertArrayEquals(SimpleRegionObserver.WAL_EXTENDED_ATTRIBUTE_BYTES,
key.getExtendedAttribute(Integer.toString(sro.getCtPreWALAppend())));
}
@Test
public void testPreWALAppendIsWrittenToWAL() throws Exception {
final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() +
"." + name.getMethodName());
Table table = util.createTable(tableName, new byte[][] { A, B, C });
PreWALAppendWALActionsListener listener = new PreWALAppendWALActionsListener();
List<HRegion> regions = util.getHBaseCluster().getRegions(tableName);
//should be only one region
HRegion region = regions.get(0);
region.getWAL().registerWALActionsListener(listener);
testPreWALAppendHook(table, tableName);
boolean[] expectedResults = {true, true, true, true};
Assert.assertArrayEquals(expectedResults, listener.getWalKeysCorrectArray());
}
@Test
public void testPreWALAppendNotCalledOnMetaEdit() throws Exception {
final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() +
"." + name.getMethodName());
TableDescriptorBuilder tdBuilder = TableDescriptorBuilder.newBuilder(tableName);
ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(FAMILY);
tdBuilder.setColumnFamily(cfBuilder.build());
tdBuilder.setCoprocessor(SimpleRegionObserver.class.getName());
TableDescriptor td = tdBuilder.build();
Table table = util.createTable(td, new byte[][] { A, B, C });
PreWALAppendWALActionsListener listener = new PreWALAppendWALActionsListener();
List<HRegion> regions = util.getHBaseCluster().getRegions(tableName);
//should be only one region
HRegion region = regions.get(0);
region.getWAL().registerWALActionsListener(listener);
//flushing should write to the WAL
region.flush(true);
//so should compaction
region.compact(false);
//and so should closing the region
region.close();
//but we still shouldn't have triggered preWALAppend because no user data was written
String[] methods = new String[] {"getCtPreWALAppend"};
Object[] expectedResult = new Integer[]{0};
verifyMethodResult(SimpleRegionObserver.class, methods, tableName, expectedResult);
}
// check each region whether the coprocessor upcalls are called or not.
private void verifyMethodResult(Class<?> coprocessor, String methodName[], TableName tableName,
Object value[]) throws IOException {
@ -711,4 +813,23 @@ public class TestRegionObserverInterface {
writer.close();
}
}
private static class PreWALAppendWALActionsListener implements WALActionsListener {
boolean[] walKeysCorrect = {false, false, false, false};
@Override
public void postAppend(long entryLen, long elapsedTimeMillis,
WALKey logKey, WALEdit logEdit) throws IOException {
for (int k = 0; k < 4; k++) {
if (!walKeysCorrect[k]) {
walKeysCorrect[k] = Arrays.equals(SimpleRegionObserver.WAL_EXTENDED_ATTRIBUTE_BYTES,
logKey.getExtendedAttribute(Integer.toString(k + 1)));
}
}
}
boolean[] getWalKeysCorrectArray() {
return walKeysCorrect;
}
}
}

View File

@ -104,6 +104,7 @@ import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
@ -165,7 +166,6 @@ import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.metrics2.MetricsExecutor;
import org.junit.After;
import org.junit.Assert;
@ -401,6 +401,7 @@ public class TestHRegion {
String testName = "testMemstoreSizeAccountingWithFailedPostBatchMutate";
FileSystem fs = FileSystem.get(CONF);
Path rootDir = new Path(dir + testName);
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
FSHLog hLog = new FSHLog(fs, rootDir, testName, CONF);
hLog.init();
region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog,
@ -2427,7 +2428,16 @@ public class TestHRegion {
return null;
}
}).when(mockedCPHost).preBatchMutate(Mockito.isA(MiniBatchOperationInProgress.class));
ColumnFamilyDescriptorBuilder builder = ColumnFamilyDescriptorBuilder.
newBuilder(COLUMN_FAMILY_BYTES);
ScanInfo info = new ScanInfo(CONF, builder.build(), Long.MAX_VALUE,
Long.MAX_VALUE, region.getCellComparator());
Mockito.when(mockedCPHost.preFlushScannerOpen(Mockito.any(HStore.class),
Mockito.any())).thenReturn(info);
Mockito.when(mockedCPHost.preFlush(Mockito.any(), Mockito.any(StoreScanner.class),
Mockito.any())).thenAnswer(i -> i.getArgument(1));
region.setCoprocessorHost(mockedCPHost);
region.put(originalPut);
region.setCoprocessorHost(normalCPHost);
final long finalSize = region.getDataInMemoryWithoutWAL();