HBASE-26120 New replication gets stuck or data loss when multiwal groups more than 10 (#3528)
Signed-off-by: Andrew Purtell <apurtell@apache.org> Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
parent
845e6f0216
commit
eb4728116d
|
@ -21,7 +21,6 @@ import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -73,6 +72,7 @@ import org.apache.zookeeper.KeeperException;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -156,7 +156,7 @@ public class ReplicationSourceManager implements ReplicationListener {
|
||||||
private final Configuration conf;
|
private final Configuration conf;
|
||||||
private final FileSystem fs;
|
private final FileSystem fs;
|
||||||
// The paths to the latest log of each wal group, for new coming peers
|
// The paths to the latest log of each wal group, for new coming peers
|
||||||
private final Set<Path> latestPaths;
|
private final Map<String, Path> latestPaths;
|
||||||
// Path to the wals directories
|
// Path to the wals directories
|
||||||
private final Path logDir;
|
private final Path logDir;
|
||||||
// Path to the wal archive
|
// Path to the wal archive
|
||||||
|
@ -229,7 +229,7 @@ public class ReplicationSourceManager implements ReplicationListener {
|
||||||
tfb.setNameFormat("ReplicationExecutor-%d");
|
tfb.setNameFormat("ReplicationExecutor-%d");
|
||||||
tfb.setDaemon(true);
|
tfb.setDaemon(true);
|
||||||
this.executor.setThreadFactory(tfb.build());
|
this.executor.setThreadFactory(tfb.build());
|
||||||
this.latestPaths = new HashSet<Path>();
|
this.latestPaths = new HashMap<>();
|
||||||
replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
|
replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
|
||||||
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
|
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
|
||||||
this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
|
this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
|
||||||
|
@ -376,17 +376,17 @@ public class ReplicationSourceManager implements ReplicationListener {
|
||||||
Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
|
Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
|
||||||
this.walsById.put(peerId, walsByGroup);
|
this.walsById.put(peerId, walsByGroup);
|
||||||
// Add the latest wal to that source's queue
|
// Add the latest wal to that source's queue
|
||||||
if (this.latestPaths.size() > 0) {
|
if (!latestPaths.isEmpty()) {
|
||||||
for (Path logPath : latestPaths) {
|
for (Map.Entry<String, Path> walPrefixAndPath : latestPaths.entrySet()) {
|
||||||
String name = logPath.getName();
|
Path walPath = walPrefixAndPath.getValue();
|
||||||
String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name);
|
NavigableSet<String> wals = new TreeSet<>();
|
||||||
NavigableSet<String> logs = new TreeSet<>();
|
wals.add(walPath.getName());
|
||||||
logs.add(name);
|
walsByGroup.put(walPrefixAndPath.getKey(), wals);
|
||||||
walsByGroup.put(walPrefix, logs);
|
|
||||||
// Abort RS and throw exception to make add peer failed
|
// Abort RS and throw exception to make add peer failed
|
||||||
abortAndThrowIOExceptionWhenFail(
|
abortAndThrowIOExceptionWhenFail(
|
||||||
() -> this.queueStorage.addWAL(server.getServerName(), peerId, name));
|
() -> this.queueStorage.addWAL(server.getServerName(), peerId, walPath.getName()));
|
||||||
src.enqueueLog(logPath);
|
src.enqueueLog(walPath);
|
||||||
|
LOG.trace("Enqueued {} to source {} during source creation.", walPath, src.getQueueId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -637,15 +637,7 @@ public class ReplicationSourceManager implements ReplicationListener {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add to latestPaths
|
// Add to latestPaths
|
||||||
Iterator<Path> iterator = latestPaths.iterator();
|
latestPaths.put(logPrefix, newLog);
|
||||||
while (iterator.hasNext()) {
|
|
||||||
Path path = iterator.next();
|
|
||||||
if (path.getName().contains(logPrefix)) {
|
|
||||||
iterator.remove();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
this.latestPaths.add(newLog);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -894,6 +886,12 @@ public class ReplicationSourceManager implements ReplicationListener {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Set<Path> getLastestPath() {
|
||||||
|
synchronized (latestPaths) {
|
||||||
|
return Sets.newHashSet(latestPaths.values());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public AtomicLong getTotalBufferUsed() {
|
public AtomicLong getTotalBufferUsed() {
|
||||||
return totalBufferUsed;
|
return totalBufferUsed;
|
||||||
}
|
}
|
||||||
|
|
|
@ -640,6 +640,22 @@ public abstract class TestReplicationSourceManager {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSameWALPrefix() throws IOException {
|
||||||
|
Set<String> latestWalsBefore =
|
||||||
|
manager.getLastestPath().stream().map(Path::getName).collect(Collectors.toSet());
|
||||||
|
String walName1 = "localhost,8080,12345-45678-Peer.34567";
|
||||||
|
String walName2 = "localhost,8080,12345.56789";
|
||||||
|
manager.preLogRoll(new Path(walName1));
|
||||||
|
manager.preLogRoll(new Path(walName2));
|
||||||
|
|
||||||
|
Set<String> latestWals = manager.getLastestPath().stream().map(Path::getName)
|
||||||
|
.filter(n -> !latestWalsBefore.contains(n)).collect(Collectors.toSet());
|
||||||
|
assertEquals(2, latestWals.size());
|
||||||
|
assertTrue(latestWals.contains(walName1));
|
||||||
|
assertTrue(latestWals.contains(walName2));
|
||||||
|
}
|
||||||
|
|
||||||
private WALEdit getBulkLoadWALEdit(NavigableMap<byte[], Integer> scope) {
|
private WALEdit getBulkLoadWALEdit(NavigableMap<byte[], Integer> scope) {
|
||||||
// 1. Create store files for the families
|
// 1. Create store files for the families
|
||||||
Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
|
Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
|
||||||
|
|
Loading…
Reference in New Issue