HBASE-20117 Cleanup the unused replication barriers in meta table

This commit is contained in:
zhangduo 2018-03-13 21:36:06 +08:00
parent b16e03c130
commit b7308ee01c
8 changed files with 556 additions and 81 deletions

View File

@ -2056,7 +2056,7 @@ public class MetaTableAccessor {
return Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength());
}
private static long[] getReplicationBarriers(Result result) {
public static long[] getReplicationBarriers(Result result) {
return result.getColumnCells(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER)
.stream().mapToLong(MetaTableAccessor::getReplicationBarrier).sorted().distinct().toArray();
}

View File

@ -135,4 +135,42 @@ public final class ReplicationUtils {
return c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
}
/**
* Returns whether we should replicate the given table.
*/
public static boolean contains(ReplicationPeerConfig peerConfig, TableName tableName) {
String namespace = tableName.getNamespaceAsString();
if (peerConfig.replicateAllUserTables()) {
// replicate all user tables, but filter by exclude namespaces and table-cfs config
Set<String> excludeNamespaces = peerConfig.getExcludeNamespaces();
if (excludeNamespaces != null && excludeNamespaces.contains(namespace)) {
return false;
}
Map<TableName, List<String>> excludedTableCFs = peerConfig.getTableCFsMap();
// trap here, must check existence first since HashMap allows null value.
if (excludedTableCFs == null || !excludedTableCFs.containsKey(tableName)) {
return true;
}
List<String> cfs = excludedTableCFs.get(tableName);
// if cfs is null or empty then we can make sure that we do not need to replicate this table,
// otherwise, we may still need to replicate the table but filter out some families.
return cfs != null && !cfs.isEmpty();
} else {
// Not replicate all user tables, so filter by namespaces and table-cfs config
Set<String> namespaces = peerConfig.getNamespaces();
Map<TableName, List<String>> tableCFs = peerConfig.getTableCFsMap();
if (namespaces == null && tableCFs == null) {
return false;
}
// First filter by namespaces config
// If table's namespace in peer config, all the tables data are applicable for replication
if (namespaces != null && namespaces.contains(namespace)) {
return true;
}
return tableCFs != null && tableCFs.containsKey(tableName);
}
}
}

View File

@ -71,6 +71,7 @@ import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerMetricsBuilder;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
@ -108,6 +109,7 @@ import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
import org.apache.hadoop.hbase.master.cleaner.ReplicationBarrierCleaner;
import org.apache.hadoop.hbase.master.locking.LockManager;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
@ -364,6 +366,7 @@ public class HMaster extends HRegionServer implements MasterServices {
CatalogJanitor catalogJanitorChore;
private LogCleaner logCleaner;
private HFileCleaner hfileCleaner;
private ReplicationBarrierCleaner replicationBarrierCleaner;
private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
private MobCompactionChore mobCompactChore;
private MasterMobCompactionThread mobCompactThread;
@ -1151,19 +1154,30 @@ public class HMaster extends HRegionServer implements MasterServices {
getMasterWalManager().getOldLogDir());
getChoreService().scheduleChore(logCleaner);
//start the hfile archive cleaner thread
// start the hfile archive cleaner thread
Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
Map<String, Object> params = new HashMap<>();
params.put(MASTER, this);
this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem()
.getFileSystem(), archiveDir, params);
getChoreService().scheduleChore(hfileCleaner);
replicationBarrierCleaner =
new ReplicationBarrierCleaner(conf, this, getConnection(), replicationPeerManager);
getChoreService().scheduleChore(replicationBarrierCleaner);
serviceStarted = true;
if (LOG.isTraceEnabled()) {
LOG.trace("Started service threads");
}
}
private void cancelChore(ScheduledChore chore) {
if (chore != null) {
chore.cancel();
}
}
@Override
protected void stopServiceThreads() {
if (masterJettyServer != null) {
@ -1177,24 +1191,33 @@ public class HMaster extends HRegionServer implements MasterServices {
super.stopServiceThreads();
stopChores();
if (LOG.isDebugEnabled()) {
LOG.debug("Stopping service threads");
LOG.debug("Stopping service threads");
if (this.quotaManager != null) {
this.quotaManager.stop();
}
// Clean up and close up shop
if (this.logCleaner != null) this.logCleaner.cancel(true);
if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
if (this.quotaManager != null) this.quotaManager.stop();
if (this.activeMasterManager != null) this.activeMasterManager.stop();
if (this.serverManager != null) this.serverManager.stop();
if (this.assignmentManager != null) this.assignmentManager.stop();
if (this.activeMasterManager != null) {
this.activeMasterManager.stop();
}
if (this.serverManager != null) {
this.serverManager.stop();
}
if (this.assignmentManager != null) {
this.assignmentManager.stop();
}
stopProcedureExecutor();
if (this.walManager != null) this.walManager.stop();
if (this.fileSystemManager != null) this.fileSystemManager.stop();
if (this.mpmHost != null) this.mpmHost.stop("server shutting down.");
if (this.walManager != null) {
this.walManager.stop();
}
if (this.fileSystemManager != null) {
this.fileSystemManager.stop();
}
if (this.mpmHost != null) {
this.mpmHost.stop("server shutting down.");
}
}
private void startProcedureExecutor() throws IOException {
@ -1233,37 +1256,21 @@ public class HMaster extends HRegionServer implements MasterServices {
}
private void stopChores() {
if (this.expiredMobFileCleanerChore != null) {
this.expiredMobFileCleanerChore.cancel(true);
}
if (this.mobCompactChore != null) {
this.mobCompactChore.cancel(true);
}
if (this.balancerChore != null) {
this.balancerChore.cancel(true);
}
if (this.normalizerChore != null) {
this.normalizerChore.cancel(true);
}
if (this.clusterStatusChore != null) {
this.clusterStatusChore.cancel(true);
}
if (this.catalogJanitorChore != null) {
this.catalogJanitorChore.cancel(true);
}
if (this.clusterStatusPublisherChore != null){
clusterStatusPublisherChore.cancel(true);
}
cancelChore(this.expiredMobFileCleanerChore);
cancelChore(this.mobCompactChore);
cancelChore(this.balancerChore);
cancelChore(this.normalizerChore);
cancelChore(this.clusterStatusChore);
cancelChore(this.catalogJanitorChore);
cancelChore(this.clusterStatusPublisherChore);
if (this.mobCompactThread != null) {
this.mobCompactThread.close();
}
if (this.quotaObserverChore != null) {
quotaObserverChore.cancel();
}
if (this.snapshotQuotaChore != null) {
snapshotQuotaChore.cancel();
}
cancelChore(this.clusterStatusPublisherChore);
cancelChore(this.snapshotQuotaChore);
cancelChore(this.logCleaner);
cancelChore(this.hfileCleaner);
cancelChore(this.replicationBarrierCleaner);
}
/**

View File

@ -0,0 +1,162 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.cleaner;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Used to clean the useless barriers in {@link HConstants#REPLICATION_BARRIER_FAMILY_STR} family in
* meta table.
*/
@InterfaceAudience.Private
public class ReplicationBarrierCleaner extends ScheduledChore {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationBarrierCleaner.class);
private static final String REPLICATION_BARRIER_CLEANER_INTERVAL =
"hbase.master.cleaner.replication.barrier.interval";
// 12 hour. Usually regions will not be moved so the barrier are rarely updated. Use a large
// interval.
private static final int DEFAULT_REPLICATION_BARRIER_CLEANER_INTERVAL = 12 * 60 * 60 * 1000;
private final Connection conn;
private final ReplicationPeerManager peerManager;
public ReplicationBarrierCleaner(Configuration conf, Stoppable stopper, Connection conn,
ReplicationPeerManager peerManager) {
super("ReplicationBarrierCleaner", stopper, conf.getInt(REPLICATION_BARRIER_CLEANER_INTERVAL,
DEFAULT_REPLICATION_BARRIER_CLEANER_INTERVAL));
this.conn = conn;
this.peerManager = peerManager;
}
@Override
protected void chore() {
long totalRows = 0;
long cleanedRows = 0;
long deletedRows = 0;
long deletedBarriers = 0;
TableName tableName = null;
List<String> peerIds = null;
try (Table metaTable = conn.getTable(TableName.META_TABLE_NAME);
ResultScanner scanner = metaTable.getScanner(
new Scan().addFamily(HConstants.REPLICATION_BARRIER_FAMILY).readAllVersions())) {
for (;;) {
Result result = scanner.next();
if (result == null) {
break;
}
totalRows++;
long[] barriers = MetaTableAccessor.getReplicationBarriers(result);
if (barriers.length == 0) {
continue;
}
byte[] regionName = result.getRow();
TableName tn = RegionInfo.getTable(regionName);
if (!tn.equals(tableName)) {
tableName = tn;
peerIds = peerManager.getSerialPeerIdsBelongsTo(tableName);
}
if (peerIds.isEmpty()) {
// no serial replication, only keep the newest barrier
Cell cell = result.getColumnLatestCell(HConstants.REPLICATION_BARRIER_FAMILY,
HConstants.SEQNUM_QUALIFIER);
metaTable.delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY,
cell.getTimestamp() - 1));
cleanedRows++;
deletedBarriers += barriers.length - 1;
continue;
}
String encodedRegionName = RegionInfo.encodeRegionName(regionName);
long pushedSeqId = Long.MAX_VALUE;
for (String peerId : peerIds) {
pushedSeqId = Math.min(pushedSeqId,
peerManager.getQueueStorage().getLastSequenceId(encodedRegionName, peerId));
}
int index = Arrays.binarySearch(barriers, pushedSeqId);
if (index == -1) {
// beyond the first barrier, usually this should not happen but anyway let's add a check
// for it.
continue;
}
if (index < 0) {
index = -index - 1;
} else {
index++;
}
// A special case for merged/split region, where we are in the last closed range and the
// pushedSeqId is the last barrier minus 1.
if (index == barriers.length - 1 && pushedSeqId == barriers[barriers.length - 1] - 1) {
// check if the region has already been removed, i.e, no catalog family
if (!metaTable.exists(new Get(regionName).addFamily(HConstants.CATALOG_FAMILY))) {
metaTable
.delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY));
deletedRows++;
deletedBarriers += barriers.length;
continue;
}
}
// the barrier before 'index - 1'(exclusive) can be safely removed. See the algorithm in
// SerialReplicationChecker for more details.
if (index - 1 > 0) {
List<Cell> cells = result.getColumnCells(HConstants.REPLICATION_BARRIER_FAMILY,
HConstants.SEQNUM_QUALIFIER);
// All barriers before this cell(exclusive) can be removed
Cell cell = cells.get(cells.size() - index);
metaTable.delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY,
cell.getTimestamp() - 1));
cleanedRows++;
deletedBarriers += index - 1;
}
}
} catch (ReplicationException | IOException e) {
LOG.warn("Failed to clean up replication barrier", e);
}
if (totalRows > 0) {
LOG.info(
"Cleanup replication barriers: " +
"totalRows {}, cleanedRows {}, deletedRows {}, deletedBarriers {}",
totalRows, cleanedRows, deletedRows, deletedBarriers);
}
}
}

View File

@ -327,6 +327,16 @@ public class ReplicationPeerManager {
}
}
public List<String> getSerialPeerIdsBelongsTo(TableName tableName) {
return peers.values().stream().filter(p -> p.getPeerConfig().isSerial())
.filter(p -> ReplicationUtils.contains(p.getPeerConfig(), tableName)).map(p -> p.getPeerId())
.collect(Collectors.toList());
}
public ReplicationQueueStorage getQueueStorage() {
return queueStorage;
}
public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf)
throws ReplicationException {
ReplicationPeerStorage peerStorage =

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
@ -53,44 +52,10 @@ public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFi
@Override
public Entry filter(Entry entry) {
TableName tabName = entry.getKey().getTableName();
String namespace = tabName.getNamespaceAsString();
ReplicationPeerConfig peerConfig = this.peer.getPeerConfig();
if (peerConfig.replicateAllUserTables()) {
// replicate all user tables, but filter by exclude namespaces config
Set<String> excludeNamespaces = peerConfig.getExcludeNamespaces();
// return null(prevent replicating) if logKey's table is in this peer's
// exclude namespaces list
if (excludeNamespaces != null && excludeNamespaces.contains(namespace)) {
return null;
}
if (ReplicationUtils.contains(this.peer.getPeerConfig(), entry.getKey().getTableName())) {
return entry;
} else {
// Not replicate all user tables, so filter by namespaces and table-cfs config
Set<String> namespaces = peerConfig.getNamespaces();
Map<TableName, List<String>> tableCFs = peerConfig.getTableCFsMap();
if (namespaces == null && tableCFs == null) {
return null;
}
// First filter by namespaces config
// If table's namespace in peer config, all the tables data are applicable for replication
if (namespaces != null && namespaces.contains(namespace)) {
return entry;
}
// Then filter by table-cfs config
// return null(prevent replicating) if logKey's table isn't in this peer's
// replicable tables list
if (tableCFs == null || !tableCFs.containsKey(tabName)) {
return null;
}
return entry;
return null;
}
}

View File

@ -0,0 +1,293 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.cleaner;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertFalse;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@Category({ MasterTests.class, MediumTests.class })
public class TestReplicationBarrierCleaner {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationBarrierCleaner.class);
private static final Logger LOG = LoggerFactory.getLogger(TestHFileCleaner.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
@Rule
public final TestName name = new TestName();
@BeforeClass
public static void setUpBeforeClass() throws Exception {
UTIL.startMiniCluster(1);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
UTIL.shutdownMiniCluster();
}
@After
public void tearDown() throws IOException {
try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
ResultScanner scanner = table.getScanner(new Scan().addFamily(HConstants.CATALOG_FAMILY)
.addFamily(HConstants.REPLICATION_BARRIER_FAMILY).setFilter(new FirstKeyOnlyFilter()))) {
for (;;) {
Result result = scanner.next();
if (result == null) {
break;
}
TableName tableName = RegionInfo.getTable(result.getRow());
if (!tableName.isSystemTable()) {
table.delete(new Delete(result.getRow()));
}
}
}
}
private ReplicationPeerManager create(ReplicationQueueStorage queueStorage,
List<String> firstPeerIds, @SuppressWarnings("unchecked") List<String>... peerIds) {
ReplicationPeerManager peerManager = mock(ReplicationPeerManager.class);
if (queueStorage != null) {
when(peerManager.getQueueStorage()).thenReturn(queueStorage);
}
if (peerIds.length == 0) {
when(peerManager.getSerialPeerIdsBelongsTo(any(TableName.class))).thenReturn(firstPeerIds);
} else {
when(peerManager.getSerialPeerIdsBelongsTo(any(TableName.class))).thenReturn(firstPeerIds,
peerIds);
}
return peerManager;
}
private ReplicationQueueStorage create(Long lastPushedSeqId, Long... lastPushedSeqIds)
throws ReplicationException {
ReplicationQueueStorage queueStorage = mock(ReplicationQueueStorage.class);
if (lastPushedSeqIds.length == 0) {
when(queueStorage.getLastSequenceId(anyString(), anyString())).thenReturn(lastPushedSeqId);
} else {
when(queueStorage.getLastSequenceId(anyString(), anyString())).thenReturn(lastPushedSeqId,
lastPushedSeqIds);
}
return queueStorage;
}
private ReplicationBarrierCleaner create(ReplicationPeerManager peerManager) throws IOException {
return new ReplicationBarrierCleaner(UTIL.getConfiguration(), new WarnOnlyStoppable(),
UTIL.getConnection(), peerManager);
}
private void addBarrier(RegionInfo region, long... barriers) throws IOException {
Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
for (int i = 0; i < barriers.length; i++) {
put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER,
put.getTimeStamp() - barriers.length + i, Bytes.toBytes(barriers[i]));
}
try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
table.put(put);
}
}
private void fillCatalogFamily(RegionInfo region) throws IOException {
try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
table.put(new Put(region.getRegionName()).addColumn(HConstants.CATALOG_FAMILY,
Bytes.toBytes("whatever"), Bytes.toBytes("whatever")));
}
}
private void clearCatalogFamily(RegionInfo region) throws IOException {
try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
table.delete(new Delete(region.getRegionName()).addFamily(HConstants.CATALOG_FAMILY));
}
}
@Test
public void testNothing() throws IOException {
ReplicationPeerManager peerManager = mock(ReplicationPeerManager.class);
ReplicationBarrierCleaner cleaner = create(peerManager);
cleaner.chore();
verify(peerManager, never()).getSerialPeerIdsBelongsTo(any(TableName.class));
verify(peerManager, never()).getQueueStorage();
}
@Test
public void testCleanNoPeers() throws IOException {
TableName tableName1 = TableName.valueOf(name.getMethodName() + "_1");
RegionInfo region11 =
RegionInfoBuilder.newBuilder(tableName1).setEndKey(Bytes.toBytes(1)).build();
addBarrier(region11, 10, 20, 30, 40, 50, 60);
RegionInfo region12 =
RegionInfoBuilder.newBuilder(tableName1).setStartKey(Bytes.toBytes(1)).build();
addBarrier(region12, 20, 30, 40, 50, 60, 70);
TableName tableName2 = TableName.valueOf(name.getMethodName() + "_2");
RegionInfo region21 =
RegionInfoBuilder.newBuilder(tableName2).setEndKey(Bytes.toBytes(1)).build();
addBarrier(region21, 100, 200, 300, 400);
RegionInfo region22 =
RegionInfoBuilder.newBuilder(tableName2).setStartKey(Bytes.toBytes(1)).build();
addBarrier(region22, 200, 300, 400, 500, 600);
@SuppressWarnings("unchecked")
ReplicationPeerManager peerManager =
create(null, Collections.emptyList(), Collections.emptyList());
ReplicationBarrierCleaner cleaner = create(peerManager);
cleaner.chore();
// should never call this method
verify(peerManager, never()).getQueueStorage();
// should only be called twice although we have 4 regions to clean
verify(peerManager, times(2)).getSerialPeerIdsBelongsTo(any(TableName.class));
assertArrayEquals(new long[] { 60 },
MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region11.getRegionName()));
assertArrayEquals(new long[] { 70 },
MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region12.getRegionName()));
assertArrayEquals(new long[] { 400 },
MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region21.getRegionName()));
assertArrayEquals(new long[] { 600 },
MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region22.getRegionName()));
}
@Test
public void testDeleteBarriers() throws IOException, ReplicationException {
TableName tableName = TableName.valueOf(name.getMethodName());
RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
addBarrier(region, 10, 20, 30, 40, 50, 60);
// two peers
ReplicationQueueStorage queueStorage = create(-1L, 2L, 15L, 25L, 20L, 25L, 65L, 55L, 70L, 70L);
List<String> peerIds = Lists.newArrayList("1", "2");
@SuppressWarnings("unchecked")
ReplicationPeerManager peerManager =
create(queueStorage, peerIds, peerIds, peerIds, peerIds, peerIds);
ReplicationBarrierCleaner cleaner = create(peerManager);
// beyond the first barrier, no deletion
cleaner.chore();
assertArrayEquals(new long[] { 10, 20, 30, 40, 50, 60 },
MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region.getRegionName()));
// in the first range, still no deletion
cleaner.chore();
assertArrayEquals(new long[] { 10, 20, 30, 40, 50, 60 },
MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region.getRegionName()));
// in the second range, 10 is deleted
cleaner.chore();
assertArrayEquals(new long[] { 20, 30, 40, 50, 60 },
MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region.getRegionName()));
// between 50 and 60, so the barriers before 50 will be deleted
cleaner.chore();
assertArrayEquals(new long[] { 50, 60 },
MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region.getRegionName()));
// in the last open range, 50 is deleted
cleaner.chore();
assertArrayEquals(new long[] { 60 },
MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region.getRegionName()));
}
@Test
public void testDeleteRowForDeletedRegion() throws IOException, ReplicationException {
TableName tableName = TableName.valueOf(name.getMethodName());
RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
addBarrier(region, 40, 50, 60);
fillCatalogFamily(region);
ReplicationQueueStorage queueStorage = create(59L);
@SuppressWarnings("unchecked")
ReplicationPeerManager peerManager = create(queueStorage, Lists.newArrayList("1"));
ReplicationBarrierCleaner cleaner = create(peerManager);
// we have something in catalog family, so only delete 40
cleaner.chore();
assertArrayEquals(new long[] { 50, 60 },
MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region.getRegionName()));
// No catalog family, then we should remove the whole row
clearCatalogFamily(region);
cleaner.chore();
try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
assertFalse(table
.exists(new Get(region.getRegionName()).addFamily(HConstants.REPLICATION_BARRIER_FAMILY)));
}
}
private static class WarnOnlyStoppable implements Stoppable {
@Override
public void stop(String why) {
LOG.warn("TestReplicationBarrierCleaner received stop, ignoring. Reason: " + why);
}
@Override
public boolean isStopped() {
return false;
}
}
}

View File

@ -157,7 +157,7 @@ public class TestSerialReplicationChecker {
}
for (int i = 0; i < barriers.length; i++) {
put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER,
put.getTimeStamp() - i, Bytes.toBytes(barriers[i]));
put.getTimeStamp() - barriers.length + i, Bytes.toBytes(barriers[i]));
}
try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
table.put(put);