HBASE-27529 Provide RS coproc ability to attach WAL extended attributes to mutations at replication sink (#4924)
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
6244891df9
commit
6d2064d443
|
@ -19,10 +19,13 @@ package org.apache.hadoop.hbase.coprocessor;
|
|||
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.Mutation;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
|
||||
|
||||
/**
|
||||
* Defines coprocessor hooks for interacting with operations on the
|
||||
* {@link org.apache.hadoop.hbase.regionserver.HRegionServer} process. Since most implementations
|
||||
|
@ -137,4 +140,33 @@ public interface RegionServerObserver {
|
|||
default void postExecuteProcedures(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
|
||||
throws IOException {
|
||||
}
|
||||
|
||||
/**
|
||||
* This will be called before replication sink mutations are executed on the sink table as part of
|
||||
* batch call.
|
||||
* @param ctx the environment to interact with the framework and region server.
|
||||
* @param walEntry wal entry from which mutation is formed.
|
||||
* @param mutation mutation to be applied at sink cluster.
|
||||
* @throws IOException if something goes wrong.
|
||||
*/
|
||||
default void preReplicationSinkBatchMutate(
|
||||
ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry,
|
||||
Mutation mutation) throws IOException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* This will be called after replication sink mutations are executed on the sink table as part of
|
||||
* batch call.
|
||||
* @param ctx the environment to interact with the framework and region server.
|
||||
* @param walEntry wal entry from which mutation is formed.
|
||||
* @param mutation mutation to be applied at sink cluster.
|
||||
* @throws IOException if something goes wrong.
|
||||
*/
|
||||
default void postReplicationSinkBatchMutate(
|
||||
ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry,
|
||||
Mutation mutation) throws IOException {
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.SharedConnection;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.Mutation;
|
||||
import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorServiceBackwardCompatiblity;
|
||||
|
@ -41,6 +42,8 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class RegionServerCoprocessorHost
|
||||
extends CoprocessorHost<RegionServerCoprocessor, RegionServerCoprocessorEnvironment> {
|
||||
|
@ -173,6 +176,26 @@ public class RegionServerCoprocessorHost
|
|||
});
|
||||
}
|
||||
|
||||
public void preReplicationSinkBatchMutate(AdminProtos.WALEntry walEntry, Mutation mutation)
|
||||
throws IOException {
|
||||
execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
|
||||
@Override
|
||||
public void call(RegionServerObserver observer) throws IOException {
|
||||
observer.preReplicationSinkBatchMutate(this, walEntry, mutation);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void postReplicationSinkBatchMutate(AdminProtos.WALEntry walEntry, Mutation mutation)
|
||||
throws IOException {
|
||||
execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
|
||||
@Override
|
||||
public void call(RegionServerObserver observer) throws IOException {
|
||||
observer.postReplicationSinkBatchMutate(this, walEntry, mutation);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
|
||||
throws IOException {
|
||||
if (this.coprocEnvironments.isEmpty()) {
|
||||
|
|
|
@ -28,6 +28,8 @@ import org.apache.hadoop.hbase.CellScanner;
|
|||
import org.apache.hadoop.hbase.ScheduledChore;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.Stoppable;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSink;
|
||||
|
@ -72,7 +74,11 @@ public class ReplicationSinkServiceImpl implements ReplicationSinkService {
|
|||
|
||||
@Override
|
||||
public void startReplicationService() throws IOException {
|
||||
this.replicationSink = new ReplicationSink(this.conf);
|
||||
RegionServerCoprocessorHost rsServerHost = null;
|
||||
if (server instanceof HRegionServer) {
|
||||
rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
|
||||
}
|
||||
this.replicationSink = new ReplicationSink(this.conf, rsServerHost);
|
||||
this.server.getChoreService().scheduleChore(new ReplicationStatisticsChore(
|
||||
"ReplicationSinkStatistics", server, (int) TimeUnit.SECONDS.toMillis(statsPeriodInSecond)));
|
||||
}
|
||||
|
|
|
@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
|
|||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
|
||||
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
||||
|
@ -190,7 +192,11 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
|
|||
@Override
|
||||
public void startReplicationService() throws IOException {
|
||||
this.replicationManager.init();
|
||||
this.replicationSink = new ReplicationSink(this.conf);
|
||||
RegionServerCoprocessorHost rsServerHost = null;
|
||||
if (server instanceof HRegionServer) {
|
||||
rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
|
||||
}
|
||||
this.replicationSink = new ReplicationSink(this.conf, rsServerHost);
|
||||
this.scheduleThreadPool.scheduleAtFixedRate(
|
||||
new ReplicationStatisticsTask(this.replicationSink, this.replicationManager),
|
||||
statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
|
||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.client.Put;
|
|||
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
|
||||
import org.apache.hadoop.hbase.client.Row;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||
|
@ -97,13 +98,17 @@ public class ReplicationSink {
|
|||
*/
|
||||
private final int rowSizeWarnThreshold;
|
||||
|
||||
private final RegionServerCoprocessorHost rsServerHost;
|
||||
|
||||
/**
|
||||
* Create a sink for replication
|
||||
* @param conf conf object
|
||||
* @throws IOException thrown when HDFS goes bad or bad file name
|
||||
*/
|
||||
public ReplicationSink(Configuration conf) throws IOException {
|
||||
public ReplicationSink(Configuration conf, RegionServerCoprocessorHost rsServerHost)
|
||||
throws IOException {
|
||||
this.conf = HBaseConfiguration.create(conf);
|
||||
this.rsServerHost = rsServerHost;
|
||||
rowSizeWarnThreshold =
|
||||
conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
|
||||
decorateConf();
|
||||
|
@ -160,6 +165,8 @@ public class ReplicationSink {
|
|||
/**
|
||||
* Replicate this array of entries directly into the local cluster using the native client. Only
|
||||
* operates against raw protobuf type saving on a conversion from pb to pojo.
|
||||
* @param entries WAL entries to be replicated.
|
||||
* @param cells cell scanner for iteration.
|
||||
* @param replicationClusterId Id which will uniquely identify source cluster FS client
|
||||
* configurations in the replication configuration directory
|
||||
* @param sourceBaseNamespaceDirPath Path that point to the source cluster base namespace
|
||||
|
@ -180,6 +187,8 @@ public class ReplicationSink {
|
|||
Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>();
|
||||
|
||||
Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
|
||||
Pair<List<Mutation>, List<WALEntry>> mutationsToWalEntriesPairs =
|
||||
new Pair<>(new ArrayList<>(), new ArrayList<>());
|
||||
for (WALEntry entry : entries) {
|
||||
TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray());
|
||||
if (this.walEntrySinkFilter != null) {
|
||||
|
@ -231,6 +240,11 @@ public class ReplicationSink {
|
|||
clusterIds.add(toUUID(clusterId));
|
||||
}
|
||||
mutation.setClusterIds(clusterIds);
|
||||
if (rsServerHost != null) {
|
||||
rsServerHost.preReplicationSinkBatchMutate(entry, mutation);
|
||||
mutationsToWalEntriesPairs.getFirst().add(mutation);
|
||||
mutationsToWalEntriesPairs.getSecond().add(entry);
|
||||
}
|
||||
addToHashMultiMap(rowMap, table, clusterIds, mutation);
|
||||
}
|
||||
if (CellUtil.isDelete(cell)) {
|
||||
|
@ -253,6 +267,14 @@ public class ReplicationSink {
|
|||
LOG.debug("Finished replicating mutations.");
|
||||
}
|
||||
|
||||
if (rsServerHost != null) {
|
||||
List<Mutation> mutations = mutationsToWalEntriesPairs.getFirst();
|
||||
List<WALEntry> walEntries = mutationsToWalEntriesPairs.getSecond();
|
||||
for (int i = 0; i < mutations.size(); i++) {
|
||||
rsServerHost.postReplicationSinkBatchMutate(walEntries.get(i), mutations.get(i));
|
||||
}
|
||||
}
|
||||
|
||||
if (bulkLoadsPerClusters != null) {
|
||||
for (Entry<List<String>,
|
||||
Map<String, List<Pair<byte[], List<String>>>>> entry : bulkLoadsPerClusters.entrySet()) {
|
||||
|
|
|
@ -0,0 +1,304 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Mutation;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
|
||||
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.wal.WALKey;
|
||||
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
|
||||
|
||||
@Category({ ReplicationTests.class, MediumTests.class })
|
||||
public class TestReplicationWithWALExtendedAttributes {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestReplicationWithWALExtendedAttributes.class);
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestReplicationWithWALExtendedAttributes.class);
|
||||
|
||||
private static Configuration conf1 = HBaseConfiguration.create();
|
||||
|
||||
private static Admin replicationAdmin;
|
||||
|
||||
private static Connection connection1;
|
||||
|
||||
private static Table htable1;
|
||||
private static Table htable2;
|
||||
|
||||
private static HBaseTestingUtility utility1;
|
||||
private static HBaseTestingUtility utility2;
|
||||
private static final long SLEEP_TIME = 500;
|
||||
private static final int NB_RETRIES = 10;
|
||||
|
||||
private static final TableName TABLE_NAME = TableName.valueOf("TestReplicationWithWALAnnotation");
|
||||
private static final byte[] FAMILY = Bytes.toBytes("f");
|
||||
private static final byte[] ROW = Bytes.toBytes("row");
|
||||
private static final byte[] ROW2 = Bytes.toBytes("row2");
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
|
||||
conf1.setInt("replication.source.size.capacity", 10240);
|
||||
conf1.setLong("replication.source.sleepforretries", 100);
|
||||
conf1.setInt("hbase.regionserver.maxlogs", 10);
|
||||
conf1.setLong("hbase.master.logcleaner.ttl", 10);
|
||||
conf1.setInt("zookeeper.recovery.retry", 1);
|
||||
conf1.setInt("zookeeper.recovery.retry.intervalmill", 10);
|
||||
conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
|
||||
conf1.setInt("replication.stats.thread.period.seconds", 5);
|
||||
conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false);
|
||||
conf1.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName());
|
||||
conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
|
||||
TestCoprocessorForWALAnnotationAtSource.class.getName());
|
||||
|
||||
utility1 = new HBaseTestingUtility(conf1);
|
||||
utility1.startMiniZKCluster();
|
||||
MiniZooKeeperCluster miniZK = utility1.getZkCluster();
|
||||
// Have to reget conf1 in case zk cluster location different
|
||||
// than default
|
||||
conf1 = utility1.getConfiguration();
|
||||
LOG.info("Setup first Zk");
|
||||
|
||||
// Base conf2 on conf1 so it gets the right zk cluster.
|
||||
Configuration conf2 = HBaseConfiguration.create(conf1);
|
||||
conf2.setInt("hfile.format.version", 3);
|
||||
conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
|
||||
conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
|
||||
conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
|
||||
conf2.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName());
|
||||
conf2.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
|
||||
TestCoprocessorForWALAnnotationAtSink.class.getName());
|
||||
conf2.setStrings(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
|
||||
TestReplicationSinkRegionServerEndpoint.class.getName());
|
||||
|
||||
utility2 = new HBaseTestingUtility(conf2);
|
||||
utility2.setZkCluster(miniZK);
|
||||
|
||||
LOG.info("Setup second Zk");
|
||||
utility1.startMiniCluster(2);
|
||||
utility2.startMiniCluster(2);
|
||||
|
||||
connection1 = ConnectionFactory.createConnection(conf1);
|
||||
replicationAdmin = connection1.getAdmin();
|
||||
ReplicationPeerConfig rpc =
|
||||
ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getClusterKey()).build();
|
||||
replicationAdmin.addReplicationPeer("2", rpc);
|
||||
|
||||
TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAME)
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3)
|
||||
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
|
||||
.build();
|
||||
try (Connection conn = ConnectionFactory.createConnection(conf1);
|
||||
Admin admin = conn.getAdmin()) {
|
||||
admin.createTable(tableDescriptor, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
|
||||
}
|
||||
try (Connection conn = ConnectionFactory.createConnection(conf2);
|
||||
Admin admin = conn.getAdmin()) {
|
||||
admin.createTable(tableDescriptor, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
|
||||
}
|
||||
htable1 = utility1.getConnection().getTable(TABLE_NAME);
|
||||
htable2 = utility2.getConnection().getTable(TABLE_NAME);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
Closeables.close(replicationAdmin, true);
|
||||
Closeables.close(connection1, true);
|
||||
utility2.shutdownMiniCluster();
|
||||
utility1.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplicationWithWALExtendedAttributes() throws Exception {
|
||||
Put put = new Put(ROW);
|
||||
put.addColumn(FAMILY, ROW, ROW);
|
||||
|
||||
htable1 = utility1.getConnection().getTable(TABLE_NAME);
|
||||
htable1.put(put);
|
||||
|
||||
Put put2 = new Put(ROW2);
|
||||
put2.addColumn(FAMILY, ROW2, ROW2);
|
||||
|
||||
htable1.batch(Collections.singletonList(put2), new Object[1]);
|
||||
|
||||
assertGetValues(new Get(ROW), ROW);
|
||||
assertGetValues(new Get(ROW2), ROW2);
|
||||
}
|
||||
|
||||
private static void assertGetValues(Get get, byte[] value)
|
||||
throws IOException, InterruptedException {
|
||||
for (int i = 0; i < NB_RETRIES; i++) {
|
||||
if (i == NB_RETRIES - 1) {
|
||||
fail("Waited too much time for put replication");
|
||||
}
|
||||
Result res = htable2.get(get);
|
||||
if (res.isEmpty()) {
|
||||
LOG.info("Row not available");
|
||||
Thread.sleep(SLEEP_TIME);
|
||||
} else {
|
||||
assertArrayEquals(value, res.value());
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class TestCoprocessorForWALAnnotationAtSource
|
||||
implements RegionCoprocessor, RegionObserver {
|
||||
|
||||
@Override
|
||||
public Optional<RegionObserver> getRegionObserver() {
|
||||
return Optional.of(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preWALAppend(ObserverContext<RegionCoprocessorEnvironment> ctx, WALKey key,
|
||||
WALEdit edit) throws IOException {
|
||||
key.addExtendedAttribute("extendedAttr1", Bytes.toBytes("Value of Extended attribute 01"));
|
||||
key.addExtendedAttribute("extendedAttr2", Bytes.toBytes("Value of Extended attribute 02"));
|
||||
}
|
||||
}
|
||||
|
||||
public static class TestCoprocessorForWALAnnotationAtSink
|
||||
implements RegionCoprocessor, RegionObserver {
|
||||
|
||||
@Override
|
||||
public Optional<RegionObserver> getRegionObserver() {
|
||||
return Optional.of(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit)
|
||||
throws IOException {
|
||||
String attrVal1 = Bytes.toString(put.getAttribute("extendedAttr1"));
|
||||
String attrVal2 = Bytes.toString(put.getAttribute("extendedAttr2"));
|
||||
if (attrVal1 == null || attrVal2 == null) {
|
||||
throw new IOException("Failed to retrieve WAL annotations");
|
||||
}
|
||||
if (
|
||||
attrVal1.equals("Value of Extended attribute 01")
|
||||
&& attrVal2.equals("Value of Extended attribute 02")
|
||||
) {
|
||||
return;
|
||||
}
|
||||
throw new IOException("Failed to retrieve WAL annotations..");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
|
||||
String attrVal1 = Bytes.toString(miniBatchOp.getOperation(0).getAttribute("extendedAttr1"));
|
||||
String attrVal2 = Bytes.toString(miniBatchOp.getOperation(0).getAttribute("extendedAttr2"));
|
||||
if (attrVal1 == null || attrVal2 == null) {
|
||||
throw new IOException("Failed to retrieve WAL annotations");
|
||||
}
|
||||
if (
|
||||
attrVal1.equals("Value of Extended attribute 01")
|
||||
&& attrVal2.equals("Value of Extended attribute 02")
|
||||
) {
|
||||
return;
|
||||
}
|
||||
throw new IOException("Failed to retrieve WAL annotations..");
|
||||
}
|
||||
}
|
||||
|
||||
public static final class TestReplicationSinkRegionServerEndpoint
|
||||
implements RegionServerCoprocessor, RegionServerObserver {
|
||||
|
||||
@Override
|
||||
public Optional<RegionServerObserver> getRegionServerObserver() {
|
||||
return Optional.of(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preReplicationSinkBatchMutate(
|
||||
ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry,
|
||||
Mutation mutation) throws IOException {
|
||||
RegionServerObserver.super.preReplicationSinkBatchMutate(ctx, walEntry, mutation);
|
||||
List<WALProtos.Attribute> attributeList = walEntry.getKey().getExtendedAttributesList();
|
||||
attachWALExtendedAttributesToMutation(mutation, attributeList);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postReplicationSinkBatchMutate(
|
||||
ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry,
|
||||
Mutation mutation) throws IOException {
|
||||
RegionServerObserver.super.postReplicationSinkBatchMutate(ctx, walEntry, mutation);
|
||||
LOG.info("WALEntry extended attributes: {}", walEntry.getKey().getExtendedAttributesList());
|
||||
LOG.info("Mutation attributes: {}", mutation.getAttributesMap());
|
||||
}
|
||||
|
||||
private void attachWALExtendedAttributesToMutation(Mutation mutation,
|
||||
List<WALProtos.Attribute> attributeList) {
|
||||
if (attributeList != null) {
|
||||
for (WALProtos.Attribute attribute : attributeList) {
|
||||
mutation.setAttribute(attribute.getKey(), attribute.getValue().toByteArray());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
|
|||
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -128,7 +129,9 @@ public class TestReplicationSink {
|
|||
TEST_UTIL.getConfiguration().set("hbase.replication.source.fs.conf.provider",
|
||||
TestSourceFSConfigurationProvider.class.getCanonicalName());
|
||||
TEST_UTIL.startMiniCluster(3);
|
||||
SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()));
|
||||
RegionServerCoprocessorHost rsCpHost =
|
||||
TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getRegionServerCoprocessorHost();
|
||||
SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), rsCpHost);
|
||||
table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
|
||||
table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
|
||||
Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration());
|
||||
|
|
|
@ -126,7 +126,7 @@ public class TestWALEntrySinkFilter {
|
|||
conf.setClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY,
|
||||
IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class);
|
||||
conf.setClass("hbase.client.connection.impl", DevNullConnection.class, Connection.class);
|
||||
ReplicationSink sink = new ReplicationSink(conf);
|
||||
ReplicationSink sink = new ReplicationSink(conf, null);
|
||||
// Create some dumb walentries.
|
||||
List<org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry> entries =
|
||||
new ArrayList<>();
|
||||
|
|
Loading…
Reference in New Issue