From 70441505451e0dc54df6889a6e15ddcda39ff63c Mon Sep 17 00:00:00 2001
From: Duo Zhang
Date: Thu, 15 Sep 2022 22:58:29 +0800
Subject: [PATCH] HBASE-27215 Add support for sync replication (#4762)

Signed-off-by: Xiaolin Ha
---
 .../regionserver/ReplicationSource.java       |  2 +-
 .../ReplicationSourceManager.java             | 53 ++++++++++---------
 .../TestDrainReplicationQueuesForStandBy.java |  3 --
 3 files changed, 28 insertions(+), 30 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index a49bfd7b623..7eba8889511 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -462,7 +462,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
           t.getName());
         manager.refreshSources(peerId);
         break;
-      } catch (IOException e1) {
+      } catch (IOException | ReplicationException e1) {
         LOG.error("Replication sources refresh failed.", e1);
         sleepForRetries("Sleeping before try refreshing sources again", maxRetriesMultiplier);
       }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index b521766ae3d..2fb996c6e4d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -404,38 +404,44 @@ public class ReplicationSourceManager {
     // TODO: use empty initial offsets for now, revisit when adding support for sync replication
     ReplicationSourceInterface src =
       createSource(new ReplicationQueueData(queueId, ImmutableMap.of()), peer);
-    // synchronized here to avoid race with preLogRoll where we add new log to source and also
+    // synchronized here to avoid race with postLogRoll where we add new log to source and also
     // walsById.
     ReplicationSourceInterface toRemove;
-    Map<String, NavigableSet<String>> wals = new HashMap<>();
+    ReplicationQueueData queueData;
     synchronized (latestPaths) {
+      // Here we make a copy of all the remaining wal files and then delete them from the
+      // replication queue storage after releasing the lock. It is not safe to just remove the old
+      // map from walsById since later we may fail to update the replication queue storage, and when
+      // we retry next time, we can not know the wal files that needs to be set to the replication
+      // queue storage
+      ImmutableMap.Builder<String, ReplicationGroupOffset> builder = ImmutableMap.builder();
+      synchronized (walsById) {
+        walsById.get(queueId).forEach((group, wals) -> {
+          if (!wals.isEmpty()) {
+            builder.put(group, new ReplicationGroupOffset(wals.last(), -1));
+          }
+        });
+      }
+      queueData = new ReplicationQueueData(queueId, builder.build());
+      src = createSource(queueData, peer);
       toRemove = sources.put(peerId, src);
       if (toRemove != null) {
         LOG.info("Terminate replication source for " + toRemove.getPeerId());
         toRemove.terminate(terminateMessage);
         toRemove.getSourceMetrics().clear();
       }
-      // Here we make a copy of all the remaining wal files and then delete them from the
-      // replication queue storage after releasing the lock. It is not safe to just remove the old
-      // map from walsById since later we may fail to delete them from the replication queue
-      // storage, and when we retry next time, we can not know the wal files that need to be deleted
-      // from the replication queue storage.
-      walsById.get(queueId).forEach((k, v) -> wals.put(k, new TreeSet<>(v)));
+    }
+    for (Map.Entry<String, ReplicationGroupOffset> entry : queueData.getOffsets().entrySet()) {
+      queueStorage.setOffset(queueId, entry.getKey(), entry.getValue(), Collections.emptyMap());
     }
     LOG.info("Startup replication source for " + src.getPeerId());
     src.startup();
-    for (NavigableSet<String> walsByGroup : wals.values()) {
-      // TODO: just need to reset the replication offset
-      // for (String wal : walsByGroup) {
-      //   queueStorage.removeWAL(server.getServerName(), peerId, wal);
-      // }
-    }
     synchronized (walsById) {
-      Map<String, NavigableSet<String>> oldWals = walsById.get(queueId);
-      wals.forEach((k, v) -> {
-        NavigableSet<String> walsByGroup = oldWals.get(k);
+      Map<String, NavigableSet<String>> wals = walsById.get(queueId);
+      queueData.getOffsets().forEach((group, offset) -> {
+        NavigableSet<String> walsByGroup = wals.get(group);
         if (walsByGroup != null) {
-          walsByGroup.removeAll(v);
+          walsByGroup.headSet(offset.getWal(), true).clear();
         }
       });
     }
@@ -458,13 +464,8 @@ public class ReplicationSourceManager {
   }
 
   private ReplicationSourceInterface createRefreshedSource(ReplicationQueueId queueId,
-    ReplicationPeer peer) throws IOException {
-    Map<String, ReplicationGroupOffset> offsets;
-    try {
-      offsets = queueStorage.getOffsets(queueId);
-    } catch (ReplicationException e) {
-      throw new IOException(e);
-    }
+    ReplicationPeer peer) throws IOException, ReplicationException {
+    Map<String, ReplicationGroupOffset> offsets = queueStorage.getOffsets(queueId);
     return createSource(new ReplicationQueueData(queueId, ImmutableMap.copyOf(offsets)), peer);
   }
 
@@ -474,7 +475,7 @@ public class ReplicationSourceManager {
    * replication queue storage and only to enqueue all logs to the new replication source
    * @param peerId the id of the replication peer
    */
-  public void refreshSources(String peerId) throws IOException {
+  public void refreshSources(String peerId) throws ReplicationException, IOException {
     String terminateMessage = "Peer " + peerId
       + " state or config changed. Will close the previous replication source and open a new one";
     ReplicationPeer peer = replicationPeers.getPeer(peerId);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDrainReplicationQueuesForStandBy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDrainReplicationQueuesForStandBy.java
index 8918f8422e1..0189d475575 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDrainReplicationQueuesForStandBy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDrainReplicationQueuesForStandBy.java
@@ -35,12 +35,9 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-// TODO: revisit later
-@Ignore
 @Category({ ReplicationTests.class, MediumTests.class })
 public class TestDrainReplicationQueuesForStandBy extends SyncReplicationTestBase {
 