HBASE-27529 Provide RS coproc ability to attach WAL extended attributes to mutations at replication sink (#4924)

Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
Viraj Jasani 2023-01-16 10:46:14 -08:00
parent 6d82dc1e0b
commit cb36eba00e
No known key found for this signature in database
GPG Key ID: 2DFADE221012D134
8 changed files with 401 additions and 5 deletions

View File

@ -19,10 +19,13 @@ package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability; import org.apache.yetus.audience.InterfaceStability;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
/** /**
* Defines coprocessor hooks for interacting with operations on the * Defines coprocessor hooks for interacting with operations on the
* {@link org.apache.hadoop.hbase.regionserver.HRegionServer} process. Since most implementations * {@link org.apache.hadoop.hbase.regionserver.HRegionServer} process. Since most implementations
@ -137,4 +140,33 @@ public interface RegionServerObserver {
default void postExecuteProcedures(ObserverContext<RegionServerCoprocessorEnvironment> ctx) default void postExecuteProcedures(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
throws IOException { throws IOException {
} }
/**
* This will be called before replication sink mutations are executed on the sink table as part of
* batch call.
* @param ctx the environment to interact with the framework and region server.
* @param walEntry wal entry from which mutation is formed.
* @param mutation mutation to be applied at sink cluster.
* @throws IOException if something goes wrong.
*/
default void preReplicationSinkBatchMutate(
ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry,
Mutation mutation) throws IOException {
}
/**
* This will be called after replication sink mutations are executed on the sink table as part of
* batch call.
* @param ctx the environment to interact with the framework and region server.
* @param walEntry wal entry from which mutation is formed.
* @param mutation mutation to be applied at sink cluster.
* @throws IOException if something goes wrong.
*/
default void postReplicationSinkBatchMutate(
ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry,
Mutation mutation) throws IOException {
}
} }

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SharedConnection; import org.apache.hadoop.hbase.SharedConnection;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.BaseEnvironment; import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.CoprocessorServiceBackwardCompatiblity; import org.apache.hadoop.hbase.coprocessor.CoprocessorServiceBackwardCompatiblity;
@ -41,6 +42,8 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
@InterfaceAudience.Private @InterfaceAudience.Private
public class RegionServerCoprocessorHost public class RegionServerCoprocessorHost
extends CoprocessorHost<RegionServerCoprocessor, RegionServerCoprocessorEnvironment> { extends CoprocessorHost<RegionServerCoprocessor, RegionServerCoprocessorEnvironment> {
@ -173,6 +176,26 @@ public class RegionServerCoprocessorHost
}); });
} }
public void preReplicationSinkBatchMutate(AdminProtos.WALEntry walEntry, Mutation mutation)
throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
@Override
public void call(RegionServerObserver observer) throws IOException {
observer.preReplicationSinkBatchMutate(this, walEntry, mutation);
}
});
}
public void postReplicationSinkBatchMutate(AdminProtos.WALEntry walEntry, Mutation mutation)
throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
@Override
public void call(RegionServerObserver observer) throws IOException {
observer.postReplicationSinkBatchMutate(this, walEntry, mutation);
}
});
}
public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint) public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
throws IOException { throws IOException {
if (this.coprocEnvironments.isEmpty()) { if (this.coprocEnvironments.isEmpty()) {

View File

@ -28,6 +28,8 @@ import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService; import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSink; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSink;
@ -72,7 +74,11 @@ public class ReplicationSinkServiceImpl implements ReplicationSinkService {
@Override @Override
public void startReplicationService() throws IOException { public void startReplicationService() throws IOException {
this.replicationSink = new ReplicationSink(this.conf); RegionServerCoprocessorHost rsServerHost = null;
if (server instanceof HRegionServer) {
rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
}
this.replicationSink = new ReplicationSink(this.conf, rsServerHost);
this.server.getChoreService().scheduleChore(new ReplicationStatisticsChore( this.server.getChoreService().scheduleChore(new ReplicationStatisticsChore(
"ReplicationSinkStatistics", server, (int) TimeUnit.SECONDS.toMillis(statsPeriodInSecond))); "ReplicationSinkStatistics", server, (int) TimeUnit.SECONDS.toMillis(statsPeriodInSecond)));
} }

View File

@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService; import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService; import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationFactory;
@ -185,7 +187,11 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
@Override @Override
public void startReplicationService() throws IOException { public void startReplicationService() throws IOException {
this.replicationManager.init(); this.replicationManager.init();
this.replicationSink = new ReplicationSink(this.conf); RegionServerCoprocessorHost rsServerHost = null;
if (server instanceof HRegionServer) {
rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
}
this.replicationSink = new ReplicationSink(this.conf, rsServerHost);
this.scheduleThreadPool.scheduleAtFixedRate( this.scheduleThreadPool.scheduleAtFixedRate(
new ReplicationStatisticsTask(this.replicationSink, this.replicationManager), new ReplicationStatisticsTask(this.replicationSink, this.replicationManager),
statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS); statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);

View File

@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
@ -120,13 +121,17 @@ public class ReplicationSink {
private final int rowSizeWarnThreshold; private final int rowSizeWarnThreshold;
private boolean replicationSinkTrackerEnabled; private boolean replicationSinkTrackerEnabled;
private final RegionServerCoprocessorHost rsServerHost;
/** /**
* Create a sink for replication * Create a sink for replication
* @param conf conf object * @param conf conf object
* @throws IOException thrown when HDFS goes bad or bad file name * @throws IOException thrown when HDFS goes bad or bad file name
*/ */
public ReplicationSink(Configuration conf) throws IOException { public ReplicationSink(Configuration conf, RegionServerCoprocessorHost rsServerHost)
throws IOException {
this.conf = HBaseConfiguration.create(conf); this.conf = HBaseConfiguration.create(conf);
this.rsServerHost = rsServerHost;
rowSizeWarnThreshold = rowSizeWarnThreshold =
conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT); conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
replicationSinkTrackerEnabled = conf.getBoolean(REPLICATION_SINK_TRACKER_ENABLED_KEY, replicationSinkTrackerEnabled = conf.getBoolean(REPLICATION_SINK_TRACKER_ENABLED_KEY,
@ -185,6 +190,8 @@ public class ReplicationSink {
/** /**
* Replicate this array of entries directly into the local cluster using the native client. Only * Replicate this array of entries directly into the local cluster using the native client. Only
* operates against raw protobuf type saving on a conversion from pb to pojo. * operates against raw protobuf type saving on a conversion from pb to pojo.
* @param entries WAL entries to be replicated.
* @param cells cell scanner for iteration.
* @param replicationClusterId Id which will uniquely identify source cluster FS client * @param replicationClusterId Id which will uniquely identify source cluster FS client
* configurations in the replication configuration directory * configurations in the replication configuration directory
* @param sourceBaseNamespaceDirPath Path that point to the source cluster base namespace * @param sourceBaseNamespaceDirPath Path that point to the source cluster base namespace
@ -205,6 +212,8 @@ public class ReplicationSink {
Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>(); Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>();
Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null; Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
Pair<List<Mutation>, List<WALEntry>> mutationsToWalEntriesPairs =
new Pair<>(new ArrayList<>(), new ArrayList<>());
for (WALEntry entry : entries) { for (WALEntry entry : entries) {
TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray()); TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray());
if (this.walEntrySinkFilter != null) { if (this.walEntrySinkFilter != null) {
@ -268,6 +277,11 @@ public class ReplicationSink {
clusterIds.add(toUUID(clusterId)); clusterIds.add(toUUID(clusterId));
} }
mutation.setClusterIds(clusterIds); mutation.setClusterIds(clusterIds);
if (rsServerHost != null) {
rsServerHost.preReplicationSinkBatchMutate(entry, mutation);
mutationsToWalEntriesPairs.getFirst().add(mutation);
mutationsToWalEntriesPairs.getSecond().add(entry);
}
addToHashMultiMap(rowMap, table, clusterIds, mutation); addToHashMultiMap(rowMap, table, clusterIds, mutation);
} }
if (CellUtil.isDelete(cell)) { if (CellUtil.isDelete(cell)) {
@ -290,6 +304,14 @@ public class ReplicationSink {
LOG.debug("Finished replicating mutations."); LOG.debug("Finished replicating mutations.");
} }
if (rsServerHost != null) {
List<Mutation> mutations = mutationsToWalEntriesPairs.getFirst();
List<WALEntry> walEntries = mutationsToWalEntriesPairs.getSecond();
for (int i = 0; i < mutations.size(); i++) {
rsServerHost.postReplicationSinkBatchMutate(walEntries.get(i), mutations.get(i));
}
}
if (bulkLoadsPerClusters != null) { if (bulkLoadsPerClusters != null) {
for (Entry<List<String>, for (Entry<List<String>,
Map<String, List<Pair<byte[], List<String>>>>> entry : bulkLoadsPerClusters.entrySet()) { Map<String, List<Pair<byte[], List<String>>>>> entry : bulkLoadsPerClusters.entrySet()) {

View File

@ -0,0 +1,304 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
@Category({ ReplicationTests.class, MediumTests.class })
public class TestReplicationWithWALExtendedAttributes {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationWithWALExtendedAttributes.class);
private static final Logger LOG =
LoggerFactory.getLogger(TestReplicationWithWALExtendedAttributes.class);
private static Configuration conf1 = HBaseConfiguration.create();
private static Admin replicationAdmin;
private static Connection connection1;
private static Table htable1;
private static Table htable2;
private static HBaseTestingUtility utility1;
private static HBaseTestingUtility utility2;
private static final long SLEEP_TIME = 500;
private static final int NB_RETRIES = 10;
private static final TableName TABLE_NAME = TableName.valueOf("TestReplicationWithWALAnnotation");
private static final byte[] FAMILY = Bytes.toBytes("f");
private static final byte[] ROW = Bytes.toBytes("row");
private static final byte[] ROW2 = Bytes.toBytes("row2");
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
conf1.setInt("replication.source.size.capacity", 10240);
conf1.setLong("replication.source.sleepforretries", 100);
conf1.setInt("hbase.regionserver.maxlogs", 10);
conf1.setLong("hbase.master.logcleaner.ttl", 10);
conf1.setInt("zookeeper.recovery.retry", 1);
conf1.setInt("zookeeper.recovery.retry.intervalmill", 10);
conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
conf1.setInt("replication.stats.thread.period.seconds", 5);
conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false);
conf1.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName());
conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
TestCoprocessorForWALAnnotationAtSource.class.getName());
utility1 = new HBaseTestingUtility(conf1);
utility1.startMiniZKCluster();
MiniZooKeeperCluster miniZK = utility1.getZkCluster();
// Have to reget conf1 in case zk cluster location different
// than default
conf1 = utility1.getConfiguration();
LOG.info("Setup first Zk");
// Base conf2 on conf1 so it gets the right zk cluster.
Configuration conf2 = HBaseConfiguration.create(conf1);
conf2.setInt("hfile.format.version", 3);
conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
conf2.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName());
conf2.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
TestCoprocessorForWALAnnotationAtSink.class.getName());
conf2.setStrings(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
TestReplicationSinkRegionServerEndpoint.class.getName());
utility2 = new HBaseTestingUtility(conf2);
utility2.setZkCluster(miniZK);
LOG.info("Setup second Zk");
utility1.startMiniCluster(2);
utility2.startMiniCluster(2);
connection1 = ConnectionFactory.createConnection(conf1);
replicationAdmin = connection1.getAdmin();
ReplicationPeerConfig rpc =
ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getClusterKey()).build();
replicationAdmin.addReplicationPeer("2", rpc);
TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAME)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3)
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.build();
try (Connection conn = ConnectionFactory.createConnection(conf1);
Admin admin = conn.getAdmin()) {
admin.createTable(tableDescriptor, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
}
try (Connection conn = ConnectionFactory.createConnection(conf2);
Admin admin = conn.getAdmin()) {
admin.createTable(tableDescriptor, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
}
htable1 = utility1.getConnection().getTable(TABLE_NAME);
htable2 = utility2.getConnection().getTable(TABLE_NAME);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
Closeables.close(replicationAdmin, true);
Closeables.close(connection1, true);
utility2.shutdownMiniCluster();
utility1.shutdownMiniCluster();
}
@Test
public void testReplicationWithWALExtendedAttributes() throws Exception {
Put put = new Put(ROW);
put.addColumn(FAMILY, ROW, ROW);
htable1 = utility1.getConnection().getTable(TABLE_NAME);
htable1.put(put);
Put put2 = new Put(ROW2);
put2.addColumn(FAMILY, ROW2, ROW2);
htable1.batch(Collections.singletonList(put2), new Object[1]);
assertGetValues(new Get(ROW), ROW);
assertGetValues(new Get(ROW2), ROW2);
}
private static void assertGetValues(Get get, byte[] value)
throws IOException, InterruptedException {
for (int i = 0; i < NB_RETRIES; i++) {
if (i == NB_RETRIES - 1) {
fail("Waited too much time for put replication");
}
Result res = htable2.get(get);
if (res.isEmpty()) {
LOG.info("Row not available");
Thread.sleep(SLEEP_TIME);
} else {
assertArrayEquals(value, res.value());
break;
}
}
}
public static class TestCoprocessorForWALAnnotationAtSource
implements RegionCoprocessor, RegionObserver {
@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(this);
}
@Override
public void preWALAppend(ObserverContext<RegionCoprocessorEnvironment> ctx, WALKey key,
WALEdit edit) throws IOException {
key.addExtendedAttribute("extendedAttr1", Bytes.toBytes("Value of Extended attribute 01"));
key.addExtendedAttribute("extendedAttr2", Bytes.toBytes("Value of Extended attribute 02"));
}
}
public static class TestCoprocessorForWALAnnotationAtSink
implements RegionCoprocessor, RegionObserver {
@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(this);
}
@Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit)
throws IOException {
String attrVal1 = Bytes.toString(put.getAttribute("extendedAttr1"));
String attrVal2 = Bytes.toString(put.getAttribute("extendedAttr2"));
if (attrVal1 == null || attrVal2 == null) {
throw new IOException("Failed to retrieve WAL annotations");
}
if (
attrVal1.equals("Value of Extended attribute 01")
&& attrVal2.equals("Value of Extended attribute 02")
) {
return;
}
throw new IOException("Failed to retrieve WAL annotations..");
}
@Override
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
String attrVal1 = Bytes.toString(miniBatchOp.getOperation(0).getAttribute("extendedAttr1"));
String attrVal2 = Bytes.toString(miniBatchOp.getOperation(0).getAttribute("extendedAttr2"));
if (attrVal1 == null || attrVal2 == null) {
throw new IOException("Failed to retrieve WAL annotations");
}
if (
attrVal1.equals("Value of Extended attribute 01")
&& attrVal2.equals("Value of Extended attribute 02")
) {
return;
}
throw new IOException("Failed to retrieve WAL annotations..");
}
}
public static final class TestReplicationSinkRegionServerEndpoint
implements RegionServerCoprocessor, RegionServerObserver {
@Override
public Optional<RegionServerObserver> getRegionServerObserver() {
return Optional.of(this);
}
@Override
public void preReplicationSinkBatchMutate(
ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry,
Mutation mutation) throws IOException {
RegionServerObserver.super.preReplicationSinkBatchMutate(ctx, walEntry, mutation);
List<WALProtos.Attribute> attributeList = walEntry.getKey().getExtendedAttributesList();
attachWALExtendedAttributesToMutation(mutation, attributeList);
}
@Override
public void postReplicationSinkBatchMutate(
ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry,
Mutation mutation) throws IOException {
RegionServerObserver.super.postReplicationSinkBatchMutate(ctx, walEntry, mutation);
LOG.info("WALEntry extended attributes: {}", walEntry.getKey().getExtendedAttributesList());
LOG.info("Mutation attributes: {}", mutation.getAttributesMap());
}
private void attachWALExtendedAttributesToMutation(Mutation mutation,
List<WALProtos.Attribute> attributeList) {
if (attributeList != null) {
for (WALProtos.Attribute attribute : attributeList) {
mutation.setAttribute(attribute.getKey(), attribute.getValue().toByteArray());
}
}
}
}
}

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -129,7 +130,9 @@ public class TestReplicationSink {
TEST_UTIL.getConfiguration().set("hbase.replication.source.fs.conf.provider", TEST_UTIL.getConfiguration().set("hbase.replication.source.fs.conf.provider",
TestSourceFSConfigurationProvider.class.getCanonicalName()); TestSourceFSConfigurationProvider.class.getCanonicalName());
TEST_UTIL.startMiniCluster(3); TEST_UTIL.startMiniCluster(3);
SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration())); RegionServerCoprocessorHost rsCpHost =
TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getRegionServerCoprocessorHost();
SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), rsCpHost);
table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1); table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2); table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration()); Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration());

View File

@ -120,7 +120,7 @@ public class TestWALEntrySinkFilter {
IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class); IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class);
conf.setClass(ConnectionFactory.HBASE_CLIENT_ASYNC_CONNECTION_IMPL, conf.setClass(ConnectionFactory.HBASE_CLIENT_ASYNC_CONNECTION_IMPL,
DevNullAsyncConnection.class, AsyncConnection.class); DevNullAsyncConnection.class, AsyncConnection.class);
ReplicationSink sink = new ReplicationSink(conf); ReplicationSink sink = new ReplicationSink(conf, null);
// Create some dumb walentries. // Create some dumb walentries.
List<org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry> entries = List<org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry> entries =
new ArrayList<>(); new ArrayList<>();