HBASE-20378 Provide a hbck option to cleanup replication barrier for a table
Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
parent
eaafdff761
commit
5f260451d5
|
@ -58,6 +58,7 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
@ -85,6 +86,7 @@ import org.apache.hadoop.hbase.MetaTableAccessor;
|
|||
import org.apache.hadoop.hbase.RegionLocations;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||
|
@ -99,7 +101,9 @@ import org.apache.hadoop.hbase.client.RegionInfo;
|
|||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.RowMutations;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
|
@ -115,6 +119,10 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
|
|||
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationUtils;
|
||||
import org.apache.hadoop.hbase.security.AccessDeniedException;
|
||||
import org.apache.hadoop.hbase.security.UserProvider;
|
||||
import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
|
||||
|
@ -268,11 +276,13 @@ public class HBaseFsck extends Configured implements Closeable {
|
|||
private boolean fixHFileLinks = false; // fix lingering HFileLinks
|
||||
private boolean fixEmptyMetaCells = false; // fix (remove) empty REGIONINFO_QUALIFIER rows
|
||||
private boolean fixReplication = false; // fix undeleted replication queues for removed peer
|
||||
private boolean cleanReplicationBarrier = false; // clean replication barriers of a table
|
||||
private boolean fixAny = false; // Set to true if any of the fix is required.
|
||||
|
||||
// limit checking/fixes to listed tables, if empty attempt to check/fix all
|
||||
// hbase:meta are always checked
|
||||
private Set<TableName> tablesIncluded = new HashSet<>();
|
||||
private TableName cleanReplicationBarrierTable;
|
||||
private int maxMerge = DEFAULT_MAX_MERGE; // maximum number of overlapping regions to merge
|
||||
// maximum number of overlapping regions to sideline
|
||||
private int maxOverlapsToSideline = DEFAULT_OVERLAPS_TO_SIDELINE;
|
||||
|
@ -786,6 +796,8 @@ public class HBaseFsck extends Configured implements Closeable {
|
|||
|
||||
checkAndFixReplication();
|
||||
|
||||
cleanReplicationBarrier();
|
||||
|
||||
// Remove the hbck znode
|
||||
cleanupHbckZnode();
|
||||
|
||||
|
@ -4119,14 +4131,13 @@ public class HBaseFsck extends Configured implements Closeable {
|
|||
enum ERROR_CODE {
|
||||
UNKNOWN, NO_META_REGION, NULL_META_REGION, NO_VERSION_FILE, NOT_IN_META_HDFS, NOT_IN_META,
|
||||
NOT_IN_META_OR_DEPLOYED, NOT_IN_HDFS_OR_DEPLOYED, NOT_IN_HDFS, SERVER_DOES_NOT_MATCH_META,
|
||||
NOT_DEPLOYED,
|
||||
MULTI_DEPLOYED, SHOULD_NOT_BE_DEPLOYED, MULTI_META_REGION, RS_CONNECT_FAILURE,
|
||||
NOT_DEPLOYED, MULTI_DEPLOYED, SHOULD_NOT_BE_DEPLOYED, MULTI_META_REGION, RS_CONNECT_FAILURE,
|
||||
FIRST_REGION_STARTKEY_NOT_EMPTY, LAST_REGION_ENDKEY_NOT_EMPTY, DUPE_STARTKEYS,
|
||||
HOLE_IN_REGION_CHAIN, OVERLAP_IN_REGION_CHAIN, REGION_CYCLE, DEGENERATE_REGION,
|
||||
ORPHAN_HDFS_REGION, LINGERING_SPLIT_PARENT, NO_TABLEINFO_FILE, LINGERING_REFERENCE_HFILE,
|
||||
LINGERING_HFILELINK, WRONG_USAGE, EMPTY_META_CELL, EXPIRED_TABLE_LOCK, BOUNDARIES_ERROR,
|
||||
ORPHAN_TABLE_STATE, NO_TABLE_STATE, UNDELETED_REPLICATION_QUEUE, DUPE_ENDKEYS,
|
||||
UNSUPPORTED_OPTION
|
||||
UNSUPPORTED_OPTION, INVALID_TABLE
|
||||
}
|
||||
void clear();
|
||||
void report(String message);
|
||||
|
@ -4558,6 +4569,10 @@ public class HBaseFsck extends Configured implements Closeable {
|
|||
fixAny |= shouldFix;
|
||||
}
|
||||
|
||||
public void setCleanReplicationBarrier(boolean shouldClean) {
|
||||
cleanReplicationBarrier = shouldClean;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if we should rerun fsck again. This checks if we've tried to
|
||||
* fix something and we should rerun fsck tool again.
|
||||
|
@ -4568,7 +4583,7 @@ public class HBaseFsck extends Configured implements Closeable {
|
|||
rerun = true;
|
||||
}
|
||||
|
||||
boolean shouldRerun() {
|
||||
public boolean shouldRerun() {
|
||||
return rerun;
|
||||
}
|
||||
|
||||
|
@ -4849,7 +4864,11 @@ public class HBaseFsck extends Configured implements Closeable {
|
|||
"-fixHdfsOrphans -fixHdfsOverlaps -fixVersionFile -sidelineBigOverlaps -fixReferenceFiles" +
|
||||
"-fixHFileLinks");
|
||||
out.println(" -repairHoles Shortcut for -fixAssignments -fixMeta -fixHdfsHoles");
|
||||
|
||||
out.println("");
|
||||
out.println(" Replication options");
|
||||
out.println(" -fixReplication Deletes replication queues for removed peers");
|
||||
out.println(" -cleanReplicationBrarier [tableName] clean the replication barriers " +
|
||||
"of a specified table, tableName is required");
|
||||
out.flush();
|
||||
errors.reportError(ERROR_CODE.WRONG_USAGE, sw.toString());
|
||||
|
||||
|
@ -4910,13 +4929,12 @@ public class HBaseFsck extends Configured implements Closeable {
|
|||
return printUsageAndExit();
|
||||
}
|
||||
try {
|
||||
long timelag = Long.parseLong(args[i+1]);
|
||||
long timelag = Long.parseLong(args[++i]);
|
||||
setTimeLag(timelag);
|
||||
} catch (NumberFormatException e) {
|
||||
errors.reportError(ERROR_CODE.WRONG_USAGE, "-timelag needs a numeric value.");
|
||||
return printUsageAndExit();
|
||||
}
|
||||
i++;
|
||||
} else if (cmd.equals("-sleepBeforeRerun")) {
|
||||
if (i == args.length - 1) {
|
||||
errors.reportError(ERROR_CODE.WRONG_USAGE,
|
||||
|
@ -4924,19 +4942,17 @@ public class HBaseFsck extends Configured implements Closeable {
|
|||
return printUsageAndExit();
|
||||
}
|
||||
try {
|
||||
sleepBeforeRerun = Long.parseLong(args[i+1]);
|
||||
sleepBeforeRerun = Long.parseLong(args[++i]);
|
||||
} catch (NumberFormatException e) {
|
||||
errors.reportError(ERROR_CODE.WRONG_USAGE, "-sleepBeforeRerun needs a numeric value.");
|
||||
return printUsageAndExit();
|
||||
}
|
||||
i++;
|
||||
} else if (cmd.equals("-sidelineDir")) {
|
||||
if (i == args.length - 1) {
|
||||
errors.reportError(ERROR_CODE.WRONG_USAGE, "HBaseFsck: -sidelineDir needs a value.");
|
||||
return printUsageAndExit();
|
||||
}
|
||||
i++;
|
||||
setSidelineDir(args[i]);
|
||||
setSidelineDir(args[++i]);
|
||||
} else if (cmd.equals("-fix")) {
|
||||
errors.reportError(ERROR_CODE.WRONG_USAGE,
|
||||
"This option is deprecated, please use -fixAssignments instead.");
|
||||
|
@ -5006,14 +5022,13 @@ public class HBaseFsck extends Configured implements Closeable {
|
|||
return printUsageAndExit();
|
||||
}
|
||||
try {
|
||||
int maxOverlapsToSideline = Integer.parseInt(args[i+1]);
|
||||
int maxOverlapsToSideline = Integer.parseInt(args[++i]);
|
||||
setMaxOverlapsToSideline(maxOverlapsToSideline);
|
||||
} catch (NumberFormatException e) {
|
||||
errors.reportError(ERROR_CODE.WRONG_USAGE,
|
||||
"-maxOverlapsToSideline needs a numeric value argument.");
|
||||
return printUsageAndExit();
|
||||
}
|
||||
i++;
|
||||
} else if (cmd.equals("-maxMerge")) {
|
||||
if (i == args.length - 1) {
|
||||
errors.reportError(ERROR_CODE.WRONG_USAGE,
|
||||
|
@ -5021,14 +5036,13 @@ public class HBaseFsck extends Configured implements Closeable {
|
|||
return printUsageAndExit();
|
||||
}
|
||||
try {
|
||||
int maxMerge = Integer.parseInt(args[i+1]);
|
||||
int maxMerge = Integer.parseInt(args[++i]);
|
||||
setMaxMerge(maxMerge);
|
||||
} catch (NumberFormatException e) {
|
||||
errors.reportError(ERROR_CODE.WRONG_USAGE,
|
||||
"-maxMerge needs a numeric value argument.");
|
||||
return printUsageAndExit();
|
||||
}
|
||||
i++;
|
||||
} else if (cmd.equals("-summary")) {
|
||||
setSummary();
|
||||
} else if (cmd.equals("-metaonly")) {
|
||||
|
@ -5037,6 +5051,12 @@ public class HBaseFsck extends Configured implements Closeable {
|
|||
setRegionBoundariesCheck();
|
||||
} else if (cmd.equals("-fixReplication")) {
|
||||
setFixReplication(true);
|
||||
} else if (cmd.equals("-cleanReplicationBarrier")) {
|
||||
setCleanReplicationBarrier(true);
|
||||
if(args[++i].startsWith("-")){
|
||||
printUsageAndExit();
|
||||
}
|
||||
setCleanReplicationBarrierTable(args[i]);
|
||||
} else if (cmd.startsWith("-")) {
|
||||
errors.reportError(ERROR_CODE.WRONG_USAGE, "Unrecognized option:" + cmd);
|
||||
return printUsageAndExit();
|
||||
|
@ -5122,7 +5142,7 @@ public class HBaseFsck extends Configured implements Closeable {
|
|||
boolean result = true;
|
||||
String hbaseServerVersion = status.getHBaseVersion();
|
||||
Object[] versionComponents = VersionInfo.getVersionComponents(hbaseServerVersion);
|
||||
if (versionComponents[0] instanceof Integer && ((Integer)versionComponents[0]) >= 2) {
|
||||
if (versionComponents[0] instanceof Integer && ((Integer) versionComponents[0]) >= 2) {
|
||||
// Process command-line args.
|
||||
for (String arg : args) {
|
||||
if (unsupportedOptionsInV2.contains(arg)) {
|
||||
|
@ -5136,6 +5156,85 @@ public class HBaseFsck extends Configured implements Closeable {
|
|||
return result;
|
||||
}
|
||||
|
||||
public void setCleanReplicationBarrierTable(String cleanReplicationBarrierTable) {
|
||||
this.cleanReplicationBarrierTable = TableName.valueOf(cleanReplicationBarrierTable);
|
||||
}
|
||||
|
||||
public void cleanReplicationBarrier() throws IOException {
|
||||
if (!cleanReplicationBarrier || cleanReplicationBarrierTable == null) {
|
||||
return;
|
||||
}
|
||||
if (cleanReplicationBarrierTable.isSystemTable()) {
|
||||
errors.reportError(ERROR_CODE.INVALID_TABLE,
|
||||
"invalid table: " + cleanReplicationBarrierTable);
|
||||
return;
|
||||
}
|
||||
|
||||
boolean isGlobalScope = false;
|
||||
try {
|
||||
isGlobalScope = admin.getDescriptor(cleanReplicationBarrierTable).hasGlobalReplicationScope();
|
||||
} catch (TableNotFoundException e) {
|
||||
LOG.info("we may need to clean some erroneous data due to bugs");
|
||||
}
|
||||
|
||||
if (isGlobalScope) {
|
||||
errors.reportError(ERROR_CODE.INVALID_TABLE,
|
||||
"table's replication scope is global: " + cleanReplicationBarrierTable);
|
||||
return;
|
||||
}
|
||||
List<byte[]> regionNames = new ArrayList<>();
|
||||
Scan barrierScan = new Scan();
|
||||
barrierScan.setCaching(100);
|
||||
barrierScan.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
|
||||
barrierScan
|
||||
.withStartRow(MetaTableAccessor.getTableStartRowForMeta(cleanReplicationBarrierTable,
|
||||
MetaTableAccessor.QueryType.REGION))
|
||||
.withStopRow(MetaTableAccessor.getTableStopRowForMeta(cleanReplicationBarrierTable,
|
||||
MetaTableAccessor.QueryType.REGION));
|
||||
Result result;
|
||||
try (ResultScanner scanner = meta.getScanner(barrierScan)) {
|
||||
while ((result = scanner.next()) != null) {
|
||||
regionNames.add(result.getRow());
|
||||
}
|
||||
}
|
||||
if (regionNames.size() <= 0) {
|
||||
errors.reportError(ERROR_CODE.INVALID_TABLE,
|
||||
"there is no barriers of this table: " + cleanReplicationBarrierTable);
|
||||
return;
|
||||
}
|
||||
ReplicationQueueStorage queueStorage =
|
||||
ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
|
||||
List<ReplicationPeerDescription> peerDescriptions = admin.listReplicationPeers();
|
||||
if (peerDescriptions != null && peerDescriptions.size() > 0) {
|
||||
List<String> peers = peerDescriptions.stream()
|
||||
.filter(peerConfig -> ReplicationUtils.contains(peerConfig.getPeerConfig(),
|
||||
cleanReplicationBarrierTable))
|
||||
.map(peerConfig -> peerConfig.getPeerId()).collect(Collectors.toList());
|
||||
try {
|
||||
List<String> batch = new ArrayList<>();
|
||||
for (String peer : peers) {
|
||||
for (byte[] regionName : regionNames) {
|
||||
batch.add(RegionInfo.encodeRegionName(regionName));
|
||||
if (batch.size() % 100 == 0) {
|
||||
queueStorage.removeLastSequenceIds(peer, batch);
|
||||
batch.clear();
|
||||
}
|
||||
}
|
||||
if (batch.size() > 0) {
|
||||
queueStorage.removeLastSequenceIds(peer, batch);
|
||||
batch.clear();
|
||||
}
|
||||
}
|
||||
} catch (ReplicationException re) {
|
||||
throw new IOException(re);
|
||||
}
|
||||
}
|
||||
for (byte[] regionName : regionNames) {
|
||||
meta.delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY));
|
||||
}
|
||||
setShouldRerun();
|
||||
}
|
||||
|
||||
/**
|
||||
* ls -r for debugging purposes
|
||||
*/
|
||||
|
|
|
@ -0,0 +1,205 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
|
||||
* agreements. See the NOTICE file distributed with this work for additional information regarding
|
||||
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
|
||||
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
|
||||
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
|
||||
* for the specific language governing permissions and limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.MetaTableAccessor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.master.RegionState;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||
import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
|
||||
|
||||
@Category({ ReplicationTests.class, MediumTests.class })
|
||||
public class TestHBaseFsckCleanReplicationBarriers {
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestHBaseFsckCleanReplicationBarriers.class);
|
||||
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
|
||||
private static String PEER_1 = "1", PEER_2 = "2";
|
||||
|
||||
private static ReplicationQueueStorage QUEUE_STORAGE;
|
||||
|
||||
private static String WAL_FILE_NAME = "test.wal";
|
||||
|
||||
private static String TABLE_NAME = "test";
|
||||
|
||||
private static String COLUMN_FAMILY = "info";
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
UTIL.startMiniCluster(1);
|
||||
QUEUE_STORAGE = ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getZooKeeperWatcher(),
|
||||
UTIL.getConfiguration());
|
||||
createPeer();
|
||||
QUEUE_STORAGE.addWAL(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(), PEER_1,
|
||||
WAL_FILE_NAME);
|
||||
QUEUE_STORAGE.addWAL(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(), PEER_2,
|
||||
WAL_FILE_NAME);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCleanReplicationBarrierWithNonExistTable()
|
||||
throws ClassNotFoundException, IOException {
|
||||
TableName tableName = TableName.valueOf(TABLE_NAME + "_non");
|
||||
boolean cleaned = HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), tableName);
|
||||
assertFalse(cleaned);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCleanReplicationBarrierWithDeletedTable() throws Exception {
|
||||
TableName tableName = TableName.valueOf(TABLE_NAME + "_deleted");
|
||||
List<RegionInfo> regionInfos = new ArrayList<>();
|
||||
// only write some barriers into meta table
|
||||
|
||||
for (int i = 0; i < 110; i++) {
|
||||
RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(i))
|
||||
.setEndKey(Bytes.toBytes(i + 1)).build();
|
||||
regionInfos.add(regionInfo);
|
||||
addStateAndBarrier(regionInfo, RegionState.State.OPEN, 10, 100);
|
||||
updatePushedSeqId(regionInfo, 10);
|
||||
assertEquals("check if there is lastPushedId", 10,
|
||||
QUEUE_STORAGE.getLastSequenceId(regionInfo.getEncodedName(), PEER_1));
|
||||
assertEquals("check if there is lastPushedId", 10,
|
||||
QUEUE_STORAGE.getLastSequenceId(regionInfo.getEncodedName(), PEER_2));
|
||||
}
|
||||
Scan barrierScan = new Scan();
|
||||
barrierScan.setCaching(100);
|
||||
barrierScan.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
|
||||
barrierScan
|
||||
.withStartRow(
|
||||
MetaTableAccessor.getTableStartRowForMeta(tableName, MetaTableAccessor.QueryType.REGION))
|
||||
.withStopRow(
|
||||
MetaTableAccessor.getTableStopRowForMeta(tableName, MetaTableAccessor.QueryType.REGION));
|
||||
Result result;
|
||||
try (ResultScanner scanner =
|
||||
MetaTableAccessor.getMetaHTable(UTIL.getConnection()).getScanner(barrierScan)) {
|
||||
while ((result = scanner.next()) != null) {
|
||||
assertTrue(MetaTableAccessor.getReplicationBarriers(result).length > 0);
|
||||
}
|
||||
}
|
||||
boolean cleaned = HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), tableName);
|
||||
assertTrue(cleaned);
|
||||
for (RegionInfo regionInfo : regionInfos) {
|
||||
assertEquals("check if there is lastPushedId", -1,
|
||||
QUEUE_STORAGE.getLastSequenceId(regionInfo.getEncodedName(), PEER_1));
|
||||
assertEquals("check if there is lastPushedId", -1,
|
||||
QUEUE_STORAGE.getLastSequenceId(regionInfo.getEncodedName(), PEER_2));
|
||||
}
|
||||
cleaned = HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), tableName);
|
||||
assertFalse(cleaned);
|
||||
for (RegionInfo region : regionInfos) {
|
||||
assertEquals(0, MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(),
|
||||
region.getRegionName()).length);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCleanReplicationBarrierWithExistTable() throws Exception {
|
||||
TableName tableName = TableName.valueOf(TABLE_NAME);
|
||||
String cf = COLUMN_FAMILY;
|
||||
TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf)).build())
|
||||
.setReplicationScope(HConstants.REPLICATION_SCOPE_LOCAL).build();
|
||||
UTIL.createTable(tableDescriptor, Bytes.split(Bytes.toBytes(1), Bytes.toBytes(256), 123));
|
||||
assertTrue(UTIL.getAdmin().getRegions(tableName).size() > 0);
|
||||
for (RegionInfo region : UTIL.getAdmin().getRegions(tableName)) {
|
||||
addStateAndBarrier(region, RegionState.State.OFFLINE, 10, 100);
|
||||
updatePushedSeqId(region, 10);
|
||||
assertEquals("check if there is lastPushedId", 10,
|
||||
QUEUE_STORAGE.getLastSequenceId(region.getEncodedName(), PEER_1));
|
||||
assertEquals("check if there is lastPushedId", 10,
|
||||
QUEUE_STORAGE.getLastSequenceId(region.getEncodedName(), PEER_2));
|
||||
}
|
||||
boolean cleaned = HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), tableName);
|
||||
assertTrue(cleaned);
|
||||
for (RegionInfo region : UTIL.getAdmin().getRegions(tableName)) {
|
||||
assertEquals("check if there is lastPushedId", -1,
|
||||
QUEUE_STORAGE.getLastSequenceId(region.getEncodedName(), PEER_1));
|
||||
assertEquals("check if there is lastPushedId", -1,
|
||||
QUEUE_STORAGE.getLastSequenceId(region.getEncodedName(), PEER_2));
|
||||
}
|
||||
cleaned = HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), tableName);
|
||||
assertFalse(cleaned);
|
||||
for (RegionInfo region : UTIL.getAdmin().getRegions(tableName)) {
|
||||
assertEquals(0, MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(),
|
||||
region.getRegionName()).length);
|
||||
}
|
||||
}
|
||||
|
||||
public static void createPeer() throws IOException {
|
||||
ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
|
||||
.setClusterKey(UTIL.getClusterKey()).setSerial(true).build();
|
||||
UTIL.getAdmin().addReplicationPeer(PEER_1, rpc);
|
||||
UTIL.getAdmin().addReplicationPeer(PEER_2, rpc);
|
||||
}
|
||||
|
||||
private void addStateAndBarrier(RegionInfo region, RegionState.State state, long... barriers)
|
||||
throws IOException {
|
||||
Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
|
||||
if (state != null) {
|
||||
put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER,
|
||||
Bytes.toBytes(state.name()));
|
||||
}
|
||||
for (int i = 0; i < barriers.length; i++) {
|
||||
put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER,
|
||||
put.getTimeStamp() - barriers.length + i, Bytes.toBytes(barriers[i]));
|
||||
}
|
||||
try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
|
||||
table.put(put);
|
||||
}
|
||||
}
|
||||
|
||||
private void updatePushedSeqId(RegionInfo region, long seqId) throws ReplicationException {
|
||||
QUEUE_STORAGE.setWALPosition(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(),
|
||||
PEER_1, WAL_FILE_NAME, 10, ImmutableMap.of(region.getEncodedName(), seqId));
|
||||
QUEUE_STORAGE.setWALPosition(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(),
|
||||
PEER_2, WAL_FILE_NAME, 10, ImmutableMap.of(region.getEncodedName(), seqId));
|
||||
}
|
||||
}
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.util.hbck;
|
|||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
@ -40,14 +41,14 @@ public class HbckTestingUtil {
|
|||
|
||||
public static HBaseFsck doFsck(
|
||||
Configuration conf, boolean fix, TableName table) throws Exception {
|
||||
return doFsck(conf, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, table);
|
||||
return doFsck(conf, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, table);
|
||||
}
|
||||
|
||||
public static HBaseFsck doFsck(Configuration conf, boolean fixAssignments, boolean fixMeta,
|
||||
boolean fixHdfsHoles, boolean fixHdfsOverlaps, boolean fixHdfsOrphans,
|
||||
boolean fixTableOrphans, boolean fixVersionFile, boolean fixReferenceFiles, boolean fixHFileLinks,
|
||||
boolean fixEmptyMetaRegionInfo, boolean fixTableLocks, boolean fixReplication,
|
||||
TableName table) throws Exception {
|
||||
boolean fixTableOrphans, boolean fixVersionFile, boolean fixReferenceFiles,
|
||||
boolean fixHFileLinks, boolean fixEmptyMetaRegionInfo, boolean fixTableLocks,
|
||||
boolean fixReplication, boolean cleanReplicationBarrier, TableName table) throws Exception {
|
||||
HBaseFsck fsck = new HBaseFsck(conf, exec);
|
||||
try {
|
||||
HBaseFsck.setDisplayFullReport(); // i.e. -details
|
||||
|
@ -63,6 +64,7 @@ public class HbckTestingUtil {
|
|||
fsck.setFixHFileLinks(fixHFileLinks);
|
||||
fsck.setFixEmptyMetaCells(fixEmptyMetaRegionInfo);
|
||||
fsck.setFixReplication(fixReplication);
|
||||
fsck.setCleanReplicationBarrier(cleanReplicationBarrier);
|
||||
if (table != null) {
|
||||
fsck.includeTable(table);
|
||||
}
|
||||
|
@ -88,6 +90,16 @@ public class HbckTestingUtil {
|
|||
return hbck;
|
||||
}
|
||||
|
||||
public static boolean cleanReplicationBarrier(Configuration conf, TableName table)
|
||||
throws IOException, ClassNotFoundException {
|
||||
HBaseFsck hbck = new HBaseFsck(conf, null);
|
||||
hbck.setCleanReplicationBarrierTable(table.getNameAsString());
|
||||
hbck.setCleanReplicationBarrier(true);
|
||||
hbck.connect();
|
||||
hbck.cleanReplicationBarrier();
|
||||
return hbck.shouldRerun();
|
||||
}
|
||||
|
||||
public static boolean inconsistencyFound(HBaseFsck fsck) throws Exception {
|
||||
List<ERROR_CODE> errs = fsck.getErrors().getErrorList();
|
||||
return (errs != null && !errs.isEmpty());
|
||||
|
|
Loading…
Reference in New Issue