diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 5756cbc9d0f..20c1215950d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -23,7 +23,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -71,6 +70,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.collect.Sets; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; /** @@ -148,7 +148,7 @@ public class ReplicationSourceManager implements ReplicationListener { private final Configuration conf; private final FileSystem fs; // The paths to the latest log of each wal group, for new coming peers - private final Set latestPaths; + private final Map latestPaths; // Path to the wals directories private final Path logDir; // Path to the wal archive @@ -216,7 +216,7 @@ public class ReplicationSourceManager implements ReplicationListener { tfb.setNameFormat("ReplicationExecutor-%d"); tfb.setDaemon(true); this.executor.setThreadFactory(tfb.build()); - this.latestPaths = new HashSet(); + this.latestPaths = new HashMap<>(); this.replicationForBulkLoadDataEnabled = conf.getBoolean( HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT); this.sleepForRetries = this.conf.getLong("replication.source.sync.sleepforretries", 1000); @@ -371,17 +371,16 @@ public class ReplicationSourceManager implements ReplicationListener { Map> walsByGroup = new HashMap<>(); this.walsById.put(peerId, walsByGroup); // Add the latest wal to that source's queue - if (this.latestPaths.size() > 0) { - for (Path logPath : latestPaths) { - String name = logPath.getName(); - String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name); - NavigableSet logs = new TreeSet<>(); - logs.add(name); - walsByGroup.put(walPrefix, logs); + if (!latestPaths.isEmpty()) { + for (Map.Entry walPrefixAndPath : latestPaths.entrySet()) { + Path walPath = walPrefixAndPath.getValue(); + NavigableSet wals = new TreeSet<>(); + wals.add(walPath.getName()); + walsByGroup.put(walPrefixAndPath.getKey(), wals); // Abort RS and throw exception to make add peer failed abortAndThrowIOExceptionWhenFail( - () -> this.queueStorage.addWAL(server.getServerName(), peerId, name)); - src.enqueueLog(logPath); + () -> this.queueStorage.addWAL(server.getServerName(), peerId, walPath.getName())); + src.enqueueLog(walPath); } } } @@ -780,15 +779,7 @@ public class ReplicationSourceManager implements ReplicationListener { } // Add to latestPaths - Iterator iterator = latestPaths.iterator(); - while (iterator.hasNext()) { - Path path = iterator.next(); - if (path.getName().contains(logPrefix)) { - iterator.remove(); - break; - } - } - this.latestPaths.add(newLog); + latestPaths.put(logPrefix, newLog); } } @@ -1053,6 +1044,13 @@ public class ReplicationSourceManager implements ReplicationListener { } } + @VisibleForTesting + Set getLastestPath() { + synchronized (latestPaths) { + return Sets.newHashSet(latestPaths.values()); + } + } + @VisibleForTesting public AtomicLong getTotalBufferUsed() { return totalBufferUsed; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index febe764ac3a..0872ea75a92 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -646,6 +646,22 @@ public abstract class TestReplicationSourceManager { } } + @Test + public void testSameWALPrefix() throws IOException { + Set latestWalsBefore = + manager.getLastestPath().stream().map(Path::getName).collect(Collectors.toSet()); + String walName1 = "localhost,8080,12345-45678-Peer.34567"; + String walName2 = "localhost,8080,12345.56789"; + manager.preLogRoll(new Path(walName1)); + manager.preLogRoll(new Path(walName2)); + + Set latestWals = manager.getLastestPath().stream().map(Path::getName) + .filter(n -> !latestWalsBefore.contains(n)).collect(Collectors.toSet()); + assertEquals(2, latestWals.size()); + assertTrue(latestWals.contains(walName1)); + assertTrue(latestWals.contains(walName2)); + } + /** * Add a peer and wait for it to initialize * @param waitForSource Whether to wait for replication source to initialize