diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index fe831686fba..fa445307c3c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -66,6 +65,7 @@ import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.collect.Sets; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; /** @@ -154,7 +154,7 @@ public class ReplicationSourceManager { private final Configuration conf; private final FileSystem fs; // The paths to the latest log of each wal group, for new coming peers - private final Set latestPaths; + private final Map latestPaths; // Path to the wals directories private final Path logDir; // Path to the wal archive @@ -225,7 +225,7 @@ public class ReplicationSourceManager { tfb.setNameFormat("ReplicationExecutor-%d"); tfb.setDaemon(true); this.executor.setThreadFactory(tfb.build()); - this.latestPaths = new HashSet(); + this.latestPaths = new HashMap<>(); replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT); this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, @@ -345,17 +345,17 @@ public class ReplicationSourceManager { Map> walsByGroup = new HashMap<>(); this.walsById.put(peerId, walsByGroup); // Add the latest wal to that source's queue - if (this.latestPaths.size() > 0) { - for (Path logPath : latestPaths) { - String name = logPath.getName(); - String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name); - NavigableSet logs = new TreeSet<>(); - logs.add(name); - walsByGroup.put(walPrefix, logs); + if (!latestPaths.isEmpty()) { + for (Map.Entry walPrefixAndPath : latestPaths.entrySet()) { + Path walPath = walPrefixAndPath.getValue(); + NavigableSet wals = new TreeSet<>(); + wals.add(walPath.getName()); + walsByGroup.put(walPrefixAndPath.getKey(), wals); // Abort RS and throw exception to make add peer failed abortAndThrowIOExceptionWhenFail( - () -> this.queueStorage.addWAL(server.getServerName(), peerId, name)); - src.enqueueLog(logPath); + () -> this.queueStorage.addWAL(server.getServerName(), peerId, walPath.getName())); + src.enqueueLog(walPath); + LOG.trace("Enqueued {} to source {} during source creation.", walPath, src.getQueueId()); } } } @@ -606,15 +606,7 @@ public class ReplicationSourceManager { } // Add to latestPaths - Iterator iterator = latestPaths.iterator(); - while (iterator.hasNext()) { - Path path = iterator.next(); - if (path.getName().contains(logPrefix)) { - iterator.remove(); - break; - } - } - this.latestPaths.add(newLog); + latestPaths.put(logPrefix, newLog); } } @@ -795,6 +787,12 @@ public class ReplicationSourceManager { } } + Set getLastestPath() { + synchronized (latestPaths) { + return Sets.newHashSet(latestPaths.values()); + } + } + public AtomicLong getTotalBufferUsed() { return totalBufferUsed; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 8aad2b1bcdf..422ab1cae4d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -640,6 +640,22 @@ public abstract class TestReplicationSourceManager { }); } + @Test + public void testSameWALPrefix() throws IOException { + Set latestWalsBefore = + manager.getLastestPath().stream().map(Path::getName).collect(Collectors.toSet()); + String walName1 = "localhost,8080,12345-45678-Peer.34567"; + String walName2 = "localhost,8080,12345.56789"; + manager.preLogRoll(new Path(walName1)); + manager.preLogRoll(new Path(walName2)); + + Set latestWals = manager.getLastestPath().stream().map(Path::getName) + .filter(n -> !latestWalsBefore.contains(n)).collect(Collectors.toSet()); + assertEquals(2, latestWals.size()); + assertTrue(latestWals.contains(walName1)); + assertTrue(latestWals.contains(walName2)); + } + private WALEdit getBulkLoadWALEdit(NavigableMap scope) { // 1. Create store files for the families Map> storeFiles = new HashMap<>(1);