HBASE-20294 Also cleanup last pushed sequence id in ReplicationBarrierCleaner

This commit is contained in:
zhangduo 2018-04-14 14:56:18 +08:00
parent 1d133c005c
commit f5d970eba1
2 changed files with 22 additions and 9 deletions

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -75,6 +76,7 @@ public class ReplicationBarrierCleaner extends ScheduledChore {
long cleanedRows = 0; long cleanedRows = 0;
long deletedRows = 0; long deletedRows = 0;
long deletedBarriers = 0; long deletedBarriers = 0;
long deletedLastPushedSeqIds = 0;
TableName tableName = null; TableName tableName = null;
List<String> peerIds = null; List<String> peerIds = null;
try (Table metaTable = conn.getTable(TableName.META_TABLE_NAME); try (Table metaTable = conn.getTable(TableName.META_TABLE_NAME);
@ -123,11 +125,16 @@ public class ReplicationBarrierCleaner extends ScheduledChore {
} else { } else {
index++; index++;
} }
// A special case for merged/split region, where we are in the last closed range and the // A special case for merged/split region, and also deleted tables, where we are in the last
// pushedSeqId is the last barrier minus 1. // closed range and the pushedSeqId is the last barrier minus 1.
if (index == barriers.length - 1 && pushedSeqId == barriers[barriers.length - 1] - 1) { if (index == barriers.length - 1 && pushedSeqId == barriers[barriers.length - 1] - 1) {
// check if the region has already been removed, i.e, no catalog family // check if the region has already been removed, i.e, no catalog family
if (!metaTable.exists(new Get(regionName).addFamily(HConstants.CATALOG_FAMILY))) { if (!metaTable.exists(new Get(regionName).addFamily(HConstants.CATALOG_FAMILY))) {
ReplicationQueueStorage queueStorage = peerManager.getQueueStorage();
for (String peerId: peerIds) {
queueStorage.removeLastSequenceIds(peerId, Arrays.asList(encodedRegionName));
deletedLastPushedSeqIds++;
}
metaTable metaTable
.delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY)); .delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY));
deletedRows++; deletedRows++;
@ -153,9 +160,9 @@ public class ReplicationBarrierCleaner extends ScheduledChore {
} }
if (totalRows > 0) { if (totalRows > 0) {
LOG.info( LOG.info(
"Cleanup replication barriers: " + "Cleanup replication barriers: totalRows {}, " +
"totalRows {}, cleanedRows {}, deletedRows {}, deletedBarriers {}", "cleanedRows {}, deletedRows {}, deletedBarriers {}, deletedLastPushedSeqIds {}",
totalRows, cleanedRows, deletedRows, deletedBarriers); totalRows, cleanedRows, deletedRows, deletedBarriers, deletedLastPushedSeqIds);
} }
} }

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master.cleaner;
import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
@ -28,6 +29,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
@ -93,8 +95,8 @@ public class TestReplicationBarrierCleaner {
@After @After
public void tearDown() throws IOException { public void tearDown() throws IOException {
try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME); try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
ResultScanner scanner = table.getScanner(new Scan().addFamily(HConstants.CATALOG_FAMILY) ResultScanner scanner = table.getScanner(new Scan().addFamily(HConstants.CATALOG_FAMILY)
.addFamily(HConstants.REPLICATION_BARRIER_FAMILY).setFilter(new FirstKeyOnlyFilter()))) { .addFamily(HConstants.REPLICATION_BARRIER_FAMILY).setFilter(new FirstKeyOnlyFilter()))) {
for (;;) { for (;;) {
Result result = scanner.next(); Result result = scanner.next();
if (result == null) { if (result == null) {
@ -144,7 +146,7 @@ public class TestReplicationBarrierCleaner {
Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime()); Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
for (int i = 0; i < barriers.length; i++) { for (int i = 0; i < barriers.length; i++) {
put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER, put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER,
put.getTimeStamp() - barriers.length + i, Bytes.toBytes(barriers[i])); put.getTimestamp() - barriers.length + i, Bytes.toBytes(barriers[i]));
} }
try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) { try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
table.put(put); table.put(put);
@ -260,15 +262,17 @@ public class TestReplicationBarrierCleaner {
addBarrier(region, 40, 50, 60); addBarrier(region, 40, 50, 60);
fillCatalogFamily(region); fillCatalogFamily(region);
String peerId = "1";
ReplicationQueueStorage queueStorage = create(59L); ReplicationQueueStorage queueStorage = create(59L);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
ReplicationPeerManager peerManager = create(queueStorage, Lists.newArrayList("1")); ReplicationPeerManager peerManager = create(queueStorage, Lists.newArrayList(peerId));
ReplicationBarrierCleaner cleaner = create(peerManager); ReplicationBarrierCleaner cleaner = create(peerManager);
// we have something in catalog family, so only delete 40 // we have something in catalog family, so only delete 40
cleaner.chore(); cleaner.chore();
assertArrayEquals(new long[] { 50, 60 }, assertArrayEquals(new long[] { 50, 60 },
MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region.getRegionName())); MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region.getRegionName()));
verify(queueStorage, never()).removeLastSequenceIds(anyString(), anyList());
// No catalog family, then we should remove the whole row // No catalog family, then we should remove the whole row
clearCatalogFamily(region); clearCatalogFamily(region);
@ -277,6 +281,8 @@ public class TestReplicationBarrierCleaner {
assertFalse(table assertFalse(table
.exists(new Get(region.getRegionName()).addFamily(HConstants.REPLICATION_BARRIER_FAMILY))); .exists(new Get(region.getRegionName()).addFamily(HConstants.REPLICATION_BARRIER_FAMILY)));
} }
verify(queueStorage, times(1)).removeLastSequenceIds(peerId,
Arrays.asList(region.getEncodedName()));
} }
private static class WarnOnlyStoppable implements Stoppable { private static class WarnOnlyStoppable implements Stoppable {