diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index aaf3beb2219..7803ac43190 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.regionserver.ReplicationSinkService; import org.apache.hadoop.hbase.regionserver.ReplicationSourceService; -import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; @@ -42,8 +41,6 @@ import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.wal.WALEdit; -import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.yetus.audience.InterfaceAudience; @@ -127,23 +124,8 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId, walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty()); if (walProvider != null) { - walProvider.addWALActionsListener(new WALActionsListener() { - - @Override - public void preLogRoll(Path oldPath, Path newPath) throws IOException { - replicationManager.preLogRoll(newPath); - } - - @Override - public void postLogRoll(Path oldPath, Path newPath) throws IOException { - replicationManager.postLogRoll(newPath); - } - - @Override - public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException { - replicationManager.scopeWALEdits(logKey, logEdit); - } - }); + walProvider + .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager)); } this.statsThreadPeriod = this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 6e875638ca2..85b2e854a0c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -43,8 +43,6 @@ import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.Server; @@ -60,13 +58,9 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationTracker; -import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; -import org.apache.hadoop.hbase.wal.WALEdit; -import org.apache.hadoop.hbase.wal.WALKey; -import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,8 +68,6 @@ import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; - /** * This class is responsible to manage all the replication sources. There are two classes of * sources: @@ -609,43 +601,6 @@ public class ReplicationSourceManager implements ReplicationListener { } } - void scopeWALEdits(WALKey logKey, WALEdit logEdit) throws IOException { - scopeWALEdits(logKey, logEdit, this.conf); - } - - /** - * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from - * compaction WAL edits and if the scope is local. - * @param logKey Key that may get scoped according to its edits - * @param logEdit Edits used to lookup the scopes - * @throws IOException If failed to parse the WALEdit - */ - @VisibleForTesting - static void scopeWALEdits(WALKey logKey, WALEdit logEdit, Configuration conf) throws IOException { - boolean replicationForBulkLoadEnabled = - ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf); - boolean foundOtherEdits = false; - for (Cell cell : logEdit.getCells()) { - if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { - foundOtherEdits = true; - break; - } - } - - if (!foundOtherEdits && logEdit.getCells().size() > 0) { - WALProtos.RegionEventDescriptor maybeEvent = - WALEdit.getRegionEventDescriptor(logEdit.getCells().get(0)); - if (maybeEvent != null && - (maybeEvent.getEventType() == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE)) { - // In serially replication, we use scopes when reading close marker. - foundOtherEdits = true; - } - } - if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || logEdit.isReplay()) { - ((WALKeyImpl) logKey).serializeReplicationScope(false); - } - } - @Override public void regionServerRemoved(String regionserver) { transferQueues(ServerName.valueOf(regionserver)); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java new file mode 100644 index 00000000000..eb126146f3f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.replication.ReplicationUtils; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; + +/** + * Used to receive new wals. + */ +@InterfaceAudience.Private +class ReplicationSourceWALActionListener implements WALActionsListener { + + private final Configuration conf; + + private final ReplicationSourceManager manager; + + public ReplicationSourceWALActionListener(Configuration conf, ReplicationSourceManager manager) { + this.conf = conf; + this.manager = manager; + } + + @Override + public void preLogRoll(Path oldPath, Path newPath) throws IOException { + manager.preLogRoll(newPath); + } + + @Override + public void postLogRoll(Path oldPath, Path newPath) throws IOException { + manager.postLogRoll(newPath); + } + + @Override + public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException { + scopeWALEdits(logKey, logEdit, conf); + } + + /** + * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from + * compaction WAL edits and if the scope is local. + * @param logKey Key that may get scoped according to its edits + * @param logEdit Edits used to lookup the scopes + * @throws IOException If failed to parse the WALEdit + */ + @VisibleForTesting + static void scopeWALEdits(WALKey logKey, WALEdit logEdit, Configuration conf) throws IOException { + boolean replicationForBulkLoadEnabled = + ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf); + boolean foundOtherEdits = false; + for (Cell cell : logEdit.getCells()) { + if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { + foundOtherEdits = true; + break; + } + } + + if (!foundOtherEdits && logEdit.getCells().size() > 0) { + WALProtos.RegionEventDescriptor maybeEvent = + WALEdit.getRegionEventDescriptor(logEdit.getCells().get(0)); + if (maybeEvent != null && + (maybeEvent.getEventType() == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE)) { + // In serially replication, we use scopes when reading close marker. + foundOtherEdits = true; + } + } + if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || logEdit.isReplay()) { + ((WALKeyImpl) logKey).serializeReplicationScope(false); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index a8afe2d48ee..a53cba37080 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -63,7 +63,6 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; -import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; @@ -82,7 +81,6 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; @@ -276,23 +274,8 @@ public abstract class TestReplicationSourceManager { WALFactory wals = new WALFactory(utility.getConfiguration(), URLEncoder.encode("regionserver:60020", "UTF8")); ReplicationSourceManager replicationManager = replication.getReplicationManager(); - wals.getWALProvider().addWALActionsListener(new WALActionsListener() { - - @Override - public void preLogRoll(Path oldPath, Path newPath) throws IOException { - replicationManager.preLogRoll(newPath); - } - - @Override - public void postLogRoll(Path oldPath, Path newPath) throws IOException { - replicationManager.postLogRoll(newPath); - } - - @Override - public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException { - replicationManager.scopeWALEdits(logKey, logEdit); - } - }); + wals.getWALProvider() + .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager)); final WAL wal = wals.getWAL(hri); manager.init(); HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("tableame")); @@ -450,7 +433,7 @@ public abstract class TestReplicationSourceManager { RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).setStartKey(HConstants.EMPTY_START_ROW) .setEndKey(HConstants.EMPTY_END_ROW).build(); WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor); - ReplicationSourceManager.scopeWALEdits(new WALKeyImpl(), edit, conf); + ReplicationSourceWALActionListener.scopeWALEdits(new WALKeyImpl(), edit, conf); } @Test @@ -462,7 +445,7 @@ public abstract class TestReplicationSourceManager { WALKeyImpl logKey = new WALKeyImpl(scope); // 3. Get the scopes for the key - ReplicationSourceManager.scopeWALEdits(logKey, logEdit, conf); + ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, conf); // 4. Assert that no bulk load entry scopes are added if bulk load hfile replication is disabled assertNull("No bulk load entries scope should be added if bulk load replication is disabled.", @@ -481,7 +464,7 @@ public abstract class TestReplicationSourceManager { bulkLoadConf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); // 4. Get the scopes for the key - ReplicationSourceManager.scopeWALEdits(logKey, logEdit, bulkLoadConf); + ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, bulkLoadConf); NavigableMap scopes = logKey.getReplicationScopes(); // Assert family with replication scope global is present in the key scopes