diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationBarrierCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationBarrierCleaner.java
index 16b8fc57eab..ed631ded6f5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationBarrierCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationBarrierCleaner.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
 import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,6 +76,7 @@ public class ReplicationBarrierCleaner extends ScheduledChore {
     long cleanedRows = 0;
     long deletedRows = 0;
     long deletedBarriers = 0;
+    long deletedLastPushedSeqIds = 0;
     TableName tableName = null;
     List<String> peerIds = null;
     try (Table metaTable = conn.getTable(TableName.META_TABLE_NAME);
@@ -123,11 +125,16 @@ public class ReplicationBarrierCleaner extends ScheduledChore {
         } else {
           index++;
         }
-        // A special case for merged/split region, where we are in the last closed range and the
-        // pushedSeqId is the last barrier minus 1.
+        // A special case for merged/split region, and also deleted tables, where we are in the last
+        // closed range and the pushedSeqId is the last barrier minus 1.
         if (index == barriers.length - 1 && pushedSeqId == barriers[barriers.length - 1] - 1) {
           // check if the region has already been removed, i.e, no catalog family
           if (!metaTable.exists(new Get(regionName).addFamily(HConstants.CATALOG_FAMILY))) {
+            ReplicationQueueStorage queueStorage = peerManager.getQueueStorage();
+            for (String peerId: peerIds) {
+              queueStorage.removeLastSequenceIds(peerId, Arrays.asList(encodedRegionName));
+              deletedLastPushedSeqIds++;
+            }
             metaTable
               .delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY));
             deletedRows++;
@@ -153,9 +160,9 @@ public class ReplicationBarrierCleaner extends ScheduledChore {
     }
     if (totalRows > 0) {
       LOG.info(
-        "Cleanup replication barriers: " +
-          "totalRows {}, cleanedRows {}, deletedRows {}, deletedBarriers {}",
-        totalRows, cleanedRows, deletedRows, deletedBarriers);
+        "Cleanup replication barriers: totalRows {}, " +
+          "cleanedRows {}, deletedRows {}, deletedBarriers {}, deletedLastPushedSeqIds {}",
+        totalRows, cleanedRows, deletedRows, deletedBarriers, deletedLastPushedSeqIds);
     }
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationBarrierCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationBarrierCleaner.java
index 671bc22d7dd..6e0d6484a0f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationBarrierCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationBarrierCleaner.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master.cleaner;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertFalse;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -28,6 +29,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -93,8 +95,8 @@ public class TestReplicationBarrierCleaner {
   @After
   public void tearDown() throws IOException {
     try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
-        ResultScanner scanner = table.getScanner(new Scan().addFamily(HConstants.CATALOG_FAMILY)
-          .addFamily(HConstants.REPLICATION_BARRIER_FAMILY).setFilter(new FirstKeyOnlyFilter()))) {
+      ResultScanner scanner = table.getScanner(new Scan().addFamily(HConstants.CATALOG_FAMILY)
+        .addFamily(HConstants.REPLICATION_BARRIER_FAMILY).setFilter(new FirstKeyOnlyFilter()))) {
       for (;;) {
         Result result = scanner.next();
         if (result == null) {
@@ -144,7 +146,7 @@ public class TestReplicationBarrierCleaner {
     Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
     for (int i = 0; i < barriers.length; i++) {
       put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER,
-        put.getTimeStamp() - barriers.length + i, Bytes.toBytes(barriers[i]));
+        put.getTimestamp() - barriers.length + i, Bytes.toBytes(barriers[i]));
     }
     try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
       table.put(put);
     }
@@ -260,15 +262,17 @@ public class TestReplicationBarrierCleaner {
     addBarrier(region, 40, 50, 60);
     fillCatalogFamily(region);
 
+    String peerId = "1";
     ReplicationQueueStorage queueStorage = create(59L);
     @SuppressWarnings("unchecked")
-    ReplicationPeerManager peerManager = create(queueStorage, Lists.newArrayList("1"));
+    ReplicationPeerManager peerManager = create(queueStorage, Lists.newArrayList(peerId));
     ReplicationBarrierCleaner cleaner = create(peerManager);
 
     // we have something in catalog family, so only delete 40
     cleaner.chore();
     assertArrayEquals(new long[] { 50, 60 },
       MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region.getRegionName()));
+    verify(queueStorage, never()).removeLastSequenceIds(anyString(), anyList());
 
     // No catalog family, then we should remove the whole row
     clearCatalogFamily(region);
@@ -277,6 +281,8 @@ public class TestReplicationBarrierCleaner {
       assertFalse(table
         .exists(new Get(region.getRegionName()).addFamily(HConstants.REPLICATION_BARRIER_FAMILY)));
     }
+    verify(queueStorage, times(1)).removeLastSequenceIds(peerId,
+      Arrays.asList(region.getEncodedName()));
   }
 
   private static class WarnOnlyStoppable implements Stoppable {