HBASE-21451 The way we maintain the lastestPaths in ReplicationSourceManager is broken when sync replication is used

This commit is contained in:
Duo Zhang 2018-11-08 15:01:38 +08:00
parent fa6373660f
commit fe2265fa4a
2 changed files with 35 additions and 21 deletions

View File

@ -23,7 +23,6 @@ import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -71,6 +70,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/** /**
@ -148,7 +148,7 @@ public class ReplicationSourceManager implements ReplicationListener {
private final Configuration conf; private final Configuration conf;
private final FileSystem fs; private final FileSystem fs;
// The paths to the latest log of each wal group, for new coming peers // The paths to the latest log of each wal group, for new coming peers
private final Set<Path> latestPaths; private final Map<String, Path> latestPaths;
// Path to the wals directories // Path to the wals directories
private final Path logDir; private final Path logDir;
// Path to the wal archive // Path to the wal archive
@ -216,7 +216,7 @@ public class ReplicationSourceManager implements ReplicationListener {
tfb.setNameFormat("ReplicationExecutor-%d"); tfb.setNameFormat("ReplicationExecutor-%d");
tfb.setDaemon(true); tfb.setDaemon(true);
this.executor.setThreadFactory(tfb.build()); this.executor.setThreadFactory(tfb.build());
this.latestPaths = new HashSet<Path>(); this.latestPaths = new HashMap<>();
this.replicationForBulkLoadDataEnabled = conf.getBoolean( this.replicationForBulkLoadDataEnabled = conf.getBoolean(
HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT); HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
this.sleepForRetries = this.conf.getLong("replication.source.sync.sleepforretries", 1000); this.sleepForRetries = this.conf.getLong("replication.source.sync.sleepforretries", 1000);
@ -371,17 +371,16 @@ public class ReplicationSourceManager implements ReplicationListener {
Map<String, NavigableSet<String>> walsByGroup = new HashMap<>(); Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
this.walsById.put(peerId, walsByGroup); this.walsById.put(peerId, walsByGroup);
// Add the latest wal to that source's queue // Add the latest wal to that source's queue
if (this.latestPaths.size() > 0) { if (!latestPaths.isEmpty()) {
for (Path logPath : latestPaths) { for (Map.Entry<String, Path> walPrefixAndPath : latestPaths.entrySet()) {
String name = logPath.getName(); Path walPath = walPrefixAndPath.getValue();
String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name); NavigableSet<String> wals = new TreeSet<>();
NavigableSet<String> logs = new TreeSet<>(); wals.add(walPath.getName());
logs.add(name); walsByGroup.put(walPrefixAndPath.getKey(), wals);
walsByGroup.put(walPrefix, logs);
// Abort RS and throw exception to make add peer failed // Abort RS and throw exception to make add peer failed
abortAndThrowIOExceptionWhenFail( abortAndThrowIOExceptionWhenFail(
() -> this.queueStorage.addWAL(server.getServerName(), peerId, name)); () -> this.queueStorage.addWAL(server.getServerName(), peerId, walPath.getName()));
src.enqueueLog(logPath); src.enqueueLog(walPath);
} }
} }
} }
@ -780,15 +779,7 @@ public class ReplicationSourceManager implements ReplicationListener {
} }
// Add to latestPaths // Add to latestPaths
Iterator<Path> iterator = latestPaths.iterator(); latestPaths.put(logPrefix, newLog);
while (iterator.hasNext()) {
Path path = iterator.next();
if (path.getName().contains(logPrefix)) {
iterator.remove();
break;
}
}
this.latestPaths.add(newLog);
} }
} }
@ -1053,6 +1044,13 @@ public class ReplicationSourceManager implements ReplicationListener {
} }
} }
@VisibleForTesting
Set<Path> getLastestPath() {
synchronized (latestPaths) {
return Sets.newHashSet(latestPaths.values());
}
}
@VisibleForTesting @VisibleForTesting
public AtomicLong getTotalBufferUsed() { public AtomicLong getTotalBufferUsed() {
return totalBufferUsed; return totalBufferUsed;

View File

@ -646,6 +646,22 @@ public abstract class TestReplicationSourceManager {
} }
} }
@Test
public void testSameWALPrefix() throws IOException {
Set<String> latestWalsBefore =
manager.getLastestPath().stream().map(Path::getName).collect(Collectors.toSet());
String walName1 = "localhost,8080,12345-45678-Peer.34567";
String walName2 = "localhost,8080,12345.56789";
manager.preLogRoll(new Path(walName1));
manager.preLogRoll(new Path(walName2));
Set<String> latestWals = manager.getLastestPath().stream().map(Path::getName)
.filter(n -> !latestWalsBefore.contains(n)).collect(Collectors.toSet());
assertEquals(2, latestWals.size());
assertTrue(latestWals.contains(walName1));
assertTrue(latestWals.contains(walName2));
}
/** /**
* Add a peer and wait for it to initialize * Add a peer and wait for it to initialize
* @param waitForSource Whether to wait for replication source to initialize * @param waitForSource Whether to wait for replication source to initialize