HBASE-27215 Add support for sync replication (#4762)

Signed-off-by: Xiaolin Ha <haxiaolin@apache.org>
Author: Duo Zhang, 2022-09-15 22:58:29 +08:00 (committed by Duo Zhang)
parent 6d0311c1d9
commit 7044150545
3 changed files with 28 additions and 30 deletions

ReplicationSource.java

@@ -462,7 +462,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
             t.getName());
           manager.refreshSources(peerId);
           break;
-        } catch (IOException e1) {
+        } catch (IOException | ReplicationException e1) {
           LOG.error("Replication sources refresh failed.", e1);
           sleepForRetries("Sleeping before try refreshing sources again", maxRetriesMultiplier);
         }
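Note on this hunk: refreshSources can now surface ReplicationException directly (it reads offsets from the replication queue storage, see the ReplicationSourceManager changes below), so the retry loop widens its catch. A minimal sketch of the surrounding retry pattern; the loop shape and helper signatures are assumed here, not taken from this patch:

    // Hedged sketch: keep retrying the refresh until it succeeds.
    // LOG, sleepForRetries and maxRetriesMultiplier are the members the diff
    // context implies; their exact definitions are assumptions in this sketch.
    while (true) {
      try {
        manager.refreshSources(peerId);
        break; // refresh succeeded, stop retrying
      } catch (IOException | ReplicationException e1) {
        LOG.error("Replication sources refresh failed.", e1);
        sleepForRetries("Sleeping before try refreshing sources again", maxRetriesMultiplier);
      }
    }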

ReplicationSourceManager.java

@@ -404,38 +404,44 @@ public class ReplicationSourceManager {
     // TODO: use empty initial offsets for now, revisit when adding support for sync replication
     ReplicationSourceInterface src =
       createSource(new ReplicationQueueData(queueId, ImmutableMap.of()), peer);
-    // synchronized here to avoid race with preLogRoll where we add new log to source and also
+    // synchronized here to avoid race with postLogRoll where we add new log to source and also
     // walsById.
     ReplicationSourceInterface toRemove;
-    Map<String, NavigableSet<String>> wals = new HashMap<>();
+    ReplicationQueueData queueData;
     synchronized (latestPaths) {
+      // Here we make a copy of all the remaining wal files and then delete them from the
+      // replication queue storage after releasing the lock. It is not safe to just remove the old
+      // map from walsById since later we may fail to update the replication queue storage, and when
+      // we retry next time, we can not know the wal files that needs to be set to the replication
+      // queue storage
+      ImmutableMap.Builder<String, ReplicationGroupOffset> builder = ImmutableMap.builder();
+      synchronized (walsById) {
+        walsById.get(queueId).forEach((group, wals) -> {
+          if (!wals.isEmpty()) {
+            builder.put(group, new ReplicationGroupOffset(wals.last(), -1));
+          }
+        });
+      }
+      queueData = new ReplicationQueueData(queueId, builder.build());
+      src = createSource(queueData, peer);
       toRemove = sources.put(peerId, src);
       if (toRemove != null) {
         LOG.info("Terminate replication source for " + toRemove.getPeerId());
         toRemove.terminate(terminateMessage);
         toRemove.getSourceMetrics().clear();
       }
-      // Here we make a copy of all the remaining wal files and then delete them from the
-      // replication queue storage after releasing the lock. It is not safe to just remove the old
-      // map from walsById since later we may fail to delete them from the replication queue
-      // storage, and when we retry next time, we can not know the wal files that need to be deleted
-      // from the replication queue storage.
-      walsById.get(queueId).forEach((k, v) -> wals.put(k, new TreeSet<>(v)));
+    }
+    for (Map.Entry<String, ReplicationGroupOffset> entry : queueData.getOffsets().entrySet()) {
+      queueStorage.setOffset(queueId, entry.getKey(), entry.getValue(), Collections.emptyMap());
     }
     LOG.info("Startup replication source for " + src.getPeerId());
     src.startup();
-    for (NavigableSet<String> walsByGroup : wals.values()) {
-      // TODO: just need to reset the replication offset
-      // for (String wal : walsByGroup) {
-      //   queueStorage.removeWAL(server.getServerName(), peerId, wal);
-      // }
-    }
     synchronized (walsById) {
-      Map<String, NavigableSet<String>> oldWals = walsById.get(queueId);
-      wals.forEach((k, v) -> {
-        NavigableSet<String> walsByGroup = oldWals.get(k);
+      Map<String, NavigableSet<String>> wals = walsById.get(queueId);
+      queueData.getOffsets().forEach((group, offset) -> {
+        NavigableSet<String> walsByGroup = wals.get(group);
         if (walsByGroup != null) {
-          walsByGroup.removeAll(v);
+          walsByGroup.headSet(offset.getWal(), true).clear();
         }
       });
     }
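The replacement for removeAll relies on NavigableSet.headSet(toElement, inclusive) returning a live view of the backing set, so clearing the view drops every WAL up to and including the one recorded in the group offset. A self-contained illustration (the WAL names are made up; it assumes lexicographic order matches creation order, which is what the sorted sets in walsById provide):

    import java.util.Arrays;
    import java.util.NavigableSet;
    import java.util.TreeSet;

    public class HeadSetTrimExample {
      public static void main(String[] args) {
        NavigableSet<String> walsByGroup =
          new TreeSet<>(Arrays.asList("wal.001", "wal.002", "wal.003", "wal.004"));
        String offsetWal = "wal.003"; // stands in for offset.getWal()
        // headSet(..., true) is a view backed by walsByGroup, so clear() mutates it
        walsByGroup.headSet(offsetWal, true).clear();
        System.out.println(walsByGroup); // prints [wal.004]
      }
    }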
@@ -458,13 +464,8 @@ public class ReplicationSourceManager {
   }

   private ReplicationSourceInterface createRefreshedSource(ReplicationQueueId queueId,
-    ReplicationPeer peer) throws IOException {
-    Map<String, ReplicationGroupOffset> offsets;
-    try {
-      offsets = queueStorage.getOffsets(queueId);
-    } catch (ReplicationException e) {
-      throw new IOException(e);
-    }
+    ReplicationPeer peer) throws IOException, ReplicationException {
+    Map<String, ReplicationGroupOffset> offsets = queueStorage.getOffsets(queueId);
     return createSource(new ReplicationQueueData(queueId, ImmutableMap.copyOf(offsets)), peer);
   }
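With createRefreshedSource declaring ReplicationException directly, the wrap-into-IOException translation is gone from the helper and moves to whichever caller needs it. A hypothetical call site that can only surface IOException (not part of this patch) would do the translation itself:

    try {
      ReplicationSourceInterface src = createRefreshedSource(queueId, peer);
      // ... use the refreshed source ...
    } catch (ReplicationException e) {
      throw new IOException(e); // translate for IOException-only interfaces
    }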
@@ -474,7 +475,7 @@ public class ReplicationSourceManager {
    * replication queue storage and only to enqueue all logs to the new replication source
    * @param peerId the id of the replication peer
    */
-  public void refreshSources(String peerId) throws IOException {
+  public void refreshSources(String peerId) throws ReplicationException, IOException {
     String terminateMessage = "Peer " + peerId
       + " state or config changed. Will close the previous replication source and open a new one";
     ReplicationPeer peer = replicationPeers.getPeer(peerId);

TestDrainReplicationQueuesForStandBy.java

@@ -35,12 +35,9 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;

-// TODO: revisit later
-@Ignore
 @Category({ ReplicationTests.class, MediumTests.class })
 public class TestDrainReplicationQueuesForStandBy extends SyncReplicationTestBase {