diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java
index dc37ac324eb..236667b4be7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java
@@ -19,10 +19,13 @@ package org.apache.hadoop.hbase.coprocessor;
 
 import java.io.IOException;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+
 /**
  * Defines coprocessor hooks for interacting with operations on the
  * {@link org.apache.hadoop.hbase.regionserver.HRegionServer} process. Since most implementations
@@ -137,4 +140,33 @@ public interface RegionServerObserver {
   default void postExecuteProcedures(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
     throws IOException {
   }
+
+  /**
+   * This will be called before replication sink mutations are executed on the sink table as part
+   * of a batch call.
+   * @param ctx the environment to interact with the framework and region server.
+   * @param walEntry wal entry from which the mutation is formed.
+   * @param mutation mutation to be applied at the sink cluster.
+   * @throws IOException if something goes wrong.
+   */
+  default void preReplicationSinkBatchMutate(
+    ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry,
+    Mutation mutation) throws IOException {
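+    // A no-op by default so existing RegionServerObserver implementations remain
+    // source-compatible; override to inspect or decorate sink-side mutations.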
+  }
+
+  /**
+   * This will be called after replication sink mutations are executed on the sink table as part
+   * of a batch call.
+   * @param ctx the environment to interact with the framework and region server.
+   * @param walEntry wal entry from which the mutation is formed.
+   * @param mutation mutation to be applied at the sink cluster.
+   * @throws IOException if something goes wrong.
+   */
+  default void postReplicationSinkBatchMutate(
+    ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry,
+    Mutation mutation) throws IOException {
+
+  }
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java
index 6a3a0d0958c..770d6155308 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SharedConnection;
 import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorServiceBackwardCompatiblity;
@@ -41,6 +42,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+
 @InterfaceAudience.Private
 public class RegionServerCoprocessorHost
   extends CoprocessorHost<RegionServerCoprocessorHost.RegionServerEnvironment> {
@@ -173,6 +176,26 @@ public class RegionServerCoprocessorHost
     });
   }
 
+  public void preReplicationSinkBatchMutate(AdminProtos.WALEntry walEntry, Mutation mutation)
+    throws IOException {
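+    // Passes a null (no-op) operation when no region server coprocessors are loaded,
+    // so the hook costs nothing on the common path.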
+    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
+      @Override
+      public void call(RegionServerObserver observer) throws IOException {
+        observer.preReplicationSinkBatchMutate(this, walEntry, mutation);
+      }
+    });
+  }
+
+  public void postReplicationSinkBatchMutate(AdminProtos.WALEntry walEntry, Mutation mutation)
+    throws IOException {
+    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
+      @Override
+      public void call(RegionServerObserver observer) throws IOException {
+        observer.postReplicationSinkBatchMutate(this, walEntry, mutation);
+      }
+    });
+  }
+
   public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
     throws IOException {
     if (this.coprocEnvironments.isEmpty()) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSinkServiceImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSinkServiceImpl.java
index 7021bd27cfe..c8141b68340 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSinkServiceImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSinkServiceImpl.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.ScheduledChore;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSink;
@@ -72,7 +74,11 @@ public class ReplicationSinkServiceImpl implements ReplicationSinkService {
 
   @Override
   public void startReplicationService() throws IOException {
-    this.replicationSink = new ReplicationSink(this.conf);
+    RegionServerCoprocessorHost rsServerHost = null;
+    if (server instanceof HRegionServer) {
+      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
+    }
+    this.replicationSink = new ReplicationSink(this.conf, rsServerHost);
     this.server.getChoreService().scheduleChore(new ReplicationStatisticsChore(
       "ReplicationSinkStatistics", server, (int) TimeUnit.SECONDS.toMillis(statsPeriodInSecond)));
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index a71ee9e4b50..dcfd31baa4f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
 import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
@@ -190,7 +192,11 @@ public class Replication implements ReplicationSourceService, ReplicationSinkService {
   @Override
   public void startReplicationService() throws IOException {
     this.replicationManager.init();
-    this.replicationSink = new ReplicationSink(this.conf);
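+    // The coprocessor host only exists inside a full HRegionServer; under any other
+    // Server implementation the sink hooks are simply disabled via a null host.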
+    RegionServerCoprocessorHost rsServerHost = null;
+    if (server instanceof HRegionServer) {
+      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
+    }
+    this.replicationSink = new ReplicationSink(this.conf, rsServerHost);
     this.scheduleThreadPool.scheduleAtFixedRate(
       new ReplicationStatisticsTask(this.replicationSink, this.replicationManager),
       statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index cac885dcafc..c8a3236c47c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WALEdit;
@@ -97,13 +98,17 @@ public class ReplicationSink {
    */
   private final int rowSizeWarnThreshold;
 
+  private final RegionServerCoprocessorHost rsServerHost;
+
   /**
    * Create a sink for replication
    * @param conf conf object
    * @throws IOException thrown when HDFS goes bad or bad file name
    */
-  public ReplicationSink(Configuration conf) throws IOException {
+  public ReplicationSink(Configuration conf, RegionServerCoprocessorHost rsServerHost)
+    throws IOException {
     this.conf = HBaseConfiguration.create(conf);
+    this.rsServerHost = rsServerHost;
     rowSizeWarnThreshold =
       conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
     decorateConf();
@@ -160,6 +165,8 @@ public class ReplicationSink {
   /**
    * Replicate this array of entries directly into the local cluster using the native client. Only
    * operates against raw protobuf type saving on a conversion from pb to pojo.
+   * @param entries WAL entries to be replicated.
+   * @param cells cell scanner for iteration.
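+      // Parallel lists: every mutation handed to the pre hook is remembered here together
+      // with its originating WAL entry, so the post hook can fire after the batch is applied.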
   * @param replicationClusterId Id which will uniquely identify source cluster FS client
   *          configurations in the replication configuration directory
   * @param sourceBaseNamespaceDirPath Path that point to the source cluster base namespace
@@ -180,6 +187,8 @@ public class ReplicationSink {
       Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>();
 
       Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
+      Pair<List<Mutation>, List<WALEntry>> mutationsToWalEntriesPairs =
+        new Pair<>(new ArrayList<>(), new ArrayList<>());
       for (WALEntry entry : entries) {
         TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray());
         if (this.walEntrySinkFilter != null) {
@@ -231,6 +240,11 @@ public class ReplicationSink {
               clusterIds.add(toUUID(clusterId));
             }
             mutation.setClusterIds(clusterIds);
+            if (rsServerHost != null) {
+              rsServerHost.preReplicationSinkBatchMutate(entry, mutation);
+              mutationsToWalEntriesPairs.getFirst().add(mutation);
+              mutationsToWalEntriesPairs.getSecond().add(entry);
+            }
             addToHashMultiMap(rowMap, table, clusterIds, mutation);
           }
           if (CellUtil.isDelete(cell)) {
@@ -253,6 +267,14 @@ public class ReplicationSink {
         LOG.debug("Finished replicating mutations.");
       }
 
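+      // The post hook runs once per mutation, in the same order the pre hook observed them,
+      // and only after batch(...) has finished applying the whole row map.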
+      if (rsServerHost != null) {
+        List<Mutation> mutations = mutationsToWalEntriesPairs.getFirst();
+        List<WALEntry> walEntries = mutationsToWalEntriesPairs.getSecond();
+        for (int i = 0; i < mutations.size(); i++) {
+          rsServerHost.postReplicationSinkBatchMutate(walEntries.get(i), mutations.get(i));
+        }
+      }
+
       if (bulkLoadsPerClusters != null) {
         for (Entry<List<String>, Map<String, List<Pair<byte[], List<String>>>>> entry
             : bulkLoadsPerClusters.entrySet()) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithWALExtendedAttributes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithWALExtendedAttributes.java
new file mode 100644
index 00000000000..2f8fba919f4
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithWALExtendedAttributes.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestReplicationWithWALExtendedAttributes {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestReplicationWithWALExtendedAttributes.class);
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(TestReplicationWithWALExtendedAttributes.class);
+
+  private static Configuration conf1 = HBaseConfiguration.create();
+
+  private static Admin replicationAdmin;
+
+  private static Connection connection1;
+
+  private static Table htable1;
+  private static Table htable2;
+
+  private static HBaseTestingUtility utility1;
+  private static HBaseTestingUtility utility2;
+  private static final long SLEEP_TIME = 500;
+  private static final int NB_RETRIES = 10;
+
+  private static final TableName TABLE_NAME =
+    TableName.valueOf("TestReplicationWithWALAnnotation");
+  private static final byte[] FAMILY = Bytes.toBytes("f");
+  private static final byte[] ROW = Bytes.toBytes("row");
+  private static final byte[] ROW2 = Bytes.toBytes("row2");
+
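+  // Topology: utility1 (source) replicates to utility2 (sink). The sink cluster loads both a
+  // RegionObserver that verifies the attributes and the RegionServerObserver under test.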
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
+    conf1.setInt("replication.source.size.capacity", 10240);
+    conf1.setLong("replication.source.sleepforretries", 100);
+    conf1.setInt("hbase.regionserver.maxlogs", 10);
+    conf1.setLong("hbase.master.logcleaner.ttl", 10);
+    conf1.setInt("zookeeper.recovery.retry", 1);
+    conf1.setInt("zookeeper.recovery.retry.intervalmill", 10);
+    conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
+    conf1.setInt("replication.stats.thread.period.seconds", 5);
+    conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false);
+    conf1.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName());
+    conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
+      TestCoprocessorForWALAnnotationAtSource.class.getName());
+
+    utility1 = new HBaseTestingUtility(conf1);
+    utility1.startMiniZKCluster();
+    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
+    // Have to reget conf1 in case zk cluster location different
+    // than default
+    conf1 = utility1.getConfiguration();
+    LOG.info("Setup first Zk");
+
+    // Base conf2 on conf1 so it gets the right zk cluster.
+    Configuration conf2 = HBaseConfiguration.create(conf1);
+    conf2.setInt("hfile.format.version", 3);
+    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
+    conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
+    conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
+    conf2.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName());
+    conf2.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
+      TestCoprocessorForWALAnnotationAtSink.class.getName());
+    conf2.setStrings(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
+      TestReplicationSinkRegionServerEndpoint.class.getName());
+
+    utility2 = new HBaseTestingUtility(conf2);
+    utility2.setZkCluster(miniZK);
+
+    LOG.info("Setup second Zk");
+    utility1.startMiniCluster(2);
+    utility2.startMiniCluster(2);
+
+    connection1 = ConnectionFactory.createConnection(conf1);
+    replicationAdmin = connection1.getAdmin();
+    ReplicationPeerConfig rpc =
+      ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getClusterKey()).build();
+    replicationAdmin.addReplicationPeer("2", rpc);
+
+    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAME)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3)
+        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
+      .build();
+    try (Connection conn = ConnectionFactory.createConnection(conf1);
+      Admin admin = conn.getAdmin()) {
+      admin.createTable(tableDescriptor, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
+    }
+    try (Connection conn = ConnectionFactory.createConnection(conf2);
+      Admin admin = conn.getAdmin()) {
+      admin.createTable(tableDescriptor, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
+    }
+    htable1 = utility1.getConnection().getTable(TABLE_NAME);
+    htable2 = utility2.getConnection().getTable(TABLE_NAME);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    Closeables.close(replicationAdmin, true);
+    Closeables.close(connection1, true);
+    utility2.shutdownMiniCluster();
+    utility1.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testReplicationWithWALExtendedAttributes() throws Exception {
+    Put put = new Put(ROW);
+    put.addColumn(FAMILY, ROW, ROW);
+
+    htable1 = utility1.getConnection().getTable(TABLE_NAME);
+    htable1.put(put);
+
+    Put put2 = new Put(ROW2);
+    put2.addColumn(FAMILY, ROW2, ROW2);
+
+    htable1.batch(Collections.singletonList(put2), new Object[1]);
+
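+    // Both the single put and the batched put must arrive at the sink with their WAL
+    // extended attributes intact; otherwise the sink coprocessors throw and the rows
+    // never become visible on the peer.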
+    assertGetValues(new Get(ROW), ROW);
+    assertGetValues(new Get(ROW2), ROW2);
+  }
+
+  private static void assertGetValues(Get get, byte[] value)
+    throws IOException, InterruptedException {
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i == NB_RETRIES - 1) {
+        fail("Waited too much time for put replication");
+      }
+      Result res = htable2.get(get);
+      if (res.isEmpty()) {
+        LOG.info("Row not available");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        assertArrayEquals(value, res.value());
+        break;
+      }
+    }
+  }
+
+  public static class TestCoprocessorForWALAnnotationAtSource
+    implements RegionCoprocessor, RegionObserver {
+
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
+
+    @Override
+    public void preWALAppend(ObserverContext<RegionCoprocessorEnvironment> ctx, WALKey key,
+      WALEdit edit) throws IOException {
+      key.addExtendedAttribute("extendedAttr1", Bytes.toBytes("Value of Extended attribute 01"));
+      key.addExtendedAttribute("extendedAttr2", Bytes.toBytes("Value of Extended attribute 02"));
+    }
+  }
+
+  public static class TestCoprocessorForWALAnnotationAtSink
+    implements RegionCoprocessor, RegionObserver {
+
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
+
+    @Override
+    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit)
+      throws IOException {
+      String attrVal1 = Bytes.toString(put.getAttribute("extendedAttr1"));
+      String attrVal2 = Bytes.toString(put.getAttribute("extendedAttr2"));
+      if (attrVal1 == null || attrVal2 == null) {
+        throw new IOException("Failed to retrieve WAL annotations");
+      }
+      if (
+        attrVal1.equals("Value of Extended attribute 01")
+          && attrVal2.equals("Value of Extended attribute 02")
+      ) {
+        return;
+      }
+      throw new IOException("Failed to retrieve WAL annotations..");
+    }
+
+    @Override
+    public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+      String attrVal1 = Bytes.toString(miniBatchOp.getOperation(0).getAttribute("extendedAttr1"));
+      String attrVal2 = Bytes.toString(miniBatchOp.getOperation(0).getAttribute("extendedAttr2"));
+      if (attrVal1 == null || attrVal2 == null) {
+        throw new IOException("Failed to retrieve WAL annotations");
+      }
+      if (
+        attrVal1.equals("Value of Extended attribute 01")
+          && attrVal2.equals("Value of Extended attribute 02")
+      ) {
+        return;
+      }
+      throw new IOException("Failed to retrieve WAL annotations..");
+    }
+  }
+
+  public static final class TestReplicationSinkRegionServerEndpoint
+    implements RegionServerCoprocessor, RegionServerObserver {
+
+    @Override
+    public Optional<RegionServerObserver> getRegionServerObserver() {
+      return Optional.of(this);
+    }
+
+    @Override
+    public void preReplicationSinkBatchMutate(
+      ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry,
+      Mutation mutation) throws IOException {
+      RegionServerObserver.super.preReplicationSinkBatchMutate(ctx, walEntry, mutation);
+      List<WALProtos.Attribute> attributeList = walEntry.getKey().getExtendedAttributesList();
+      attachWALExtendedAttributesToMutation(mutation, attributeList);
+    }
+
+    @Override
+    public void postReplicationSinkBatchMutate(
+      ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry,
+      Mutation mutation) throws IOException {
+      RegionServerObserver.super.postReplicationSinkBatchMutate(ctx, walEntry, mutation);
+      LOG.info("WALEntry extended attributes: {}", walEntry.getKey().getExtendedAttributesList());
+      LOG.info("Mutation attributes: {}", mutation.getAttributesMap());
+    }
+
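+    // Copies every extended attribute from the WAL key onto the mutation, which is what lets
+    // the sink-side region observers above see the annotations via Mutation#getAttribute.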
LOG.info("Mutation attributes: {}", mutation.getAttributesMap()); + } + + private void attachWALExtendedAttributesToMutation(Mutation mutation, + List attributeList) { + if (attributeList != null) { + for (WALProtos.Attribute attribute : attributeList) { + mutation.setAttribute(attribute.getKey(), attribute.getValue().toByteArray()); + } + } + } + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java index fa7bef9fc4c..ad754c2553d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; @@ -128,7 +129,9 @@ public class TestReplicationSink { TEST_UTIL.getConfiguration().set("hbase.replication.source.fs.conf.provider", TestSourceFSConfigurationProvider.class.getCanonicalName()); TEST_UTIL.startMiniCluster(3); - SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration())); + RegionServerCoprocessorHost rsCpHost = + TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getRegionServerCoprocessorHost(); + SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), rsCpHost); table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1); table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2); Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java index 07bd1abb20b..dcddd14b934 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java @@ -126,7 +126,7 @@ public class TestWALEntrySinkFilter { conf.setClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY, IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class); conf.setClass("hbase.client.connection.impl", DevNullConnection.class, Connection.class); - ReplicationSink sink = new ReplicationSink(conf); + ReplicationSink sink = new ReplicationSink(conf, null); // Create some dumb walentries. List entries = new ArrayList<>();