HBASE-20296 Remove last pushed sequence ids when removing tables from a peer

This commit is contained in:
zhangduo 2018-03-31 20:25:13 +08:00
parent 97b3a04019
commit 1e56938757
8 changed files with 363 additions and 101 deletions

View File

@ -57,6 +57,8 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.RegionState.State;
@ -682,20 +684,19 @@ public class MetaTableAccessor {
scanMeta(connection, null, null, QueryType.ALL, v);
}
public static void scanMetaForTableRegions(Connection connection,
Visitor visitor, TableName tableName) throws IOException {
public static void scanMetaForTableRegions(Connection connection, Visitor visitor,
TableName tableName) throws IOException {
scanMeta(connection, tableName, QueryType.REGION, Integer.MAX_VALUE, visitor);
}
public static void scanMeta(Connection connection, TableName table,
QueryType type, int maxRows, final Visitor visitor) throws IOException {
public static void scanMeta(Connection connection, TableName table, QueryType type, int maxRows,
final Visitor visitor) throws IOException {
scanMeta(connection, getTableStartRowForMeta(table, type), getTableStopRowForMeta(table, type),
type, maxRows, visitor);
}
public static void scanMeta(Connection connection,
@Nullable final byte[] startRow, @Nullable final byte[] stopRow,
QueryType type, final Visitor visitor) throws IOException {
public static void scanMeta(Connection connection, @Nullable final byte[] startRow,
@Nullable final byte[] stopRow, QueryType type, final Visitor visitor) throws IOException {
scanMeta(connection, startRow, stopRow, type, Integer.MAX_VALUE, visitor);
}
@ -708,26 +709,19 @@ public class MetaTableAccessor {
* @param tableName table withing we scan
* @param row start scan from this row
* @param rowLimit max number of rows to return
* @throws IOException
*/
public static void scanMeta(Connection connection,
final Visitor visitor, final TableName tableName,
final byte[] row, final int rowLimit)
throws IOException {
public static void scanMeta(Connection connection, final Visitor visitor,
final TableName tableName, final byte[] row, final int rowLimit) throws IOException {
byte[] startRow = null;
byte[] stopRow = null;
if (tableName != null) {
startRow =
getTableStartRowForMeta(tableName, QueryType.REGION);
startRow = getTableStartRowForMeta(tableName, QueryType.REGION);
if (row != null) {
RegionInfo closestRi =
getClosestRegionInfo(connection, tableName, row);
startRow = RegionInfo
.createRegionName(tableName, closestRi.getStartKey(), HConstants.ZEROES, false);
RegionInfo closestRi = getClosestRegionInfo(connection, tableName, row);
startRow =
RegionInfo.createRegionName(tableName, closestRi.getStartKey(), HConstants.ZEROES, false);
}
stopRow =
getTableStopRowForMeta(tableName, QueryType.REGION);
stopRow = getTableStopRowForMeta(tableName, QueryType.REGION);
}
scanMeta(connection, startRow, stopRow, QueryType.REGION, rowLimit, visitor);
}
@ -743,11 +737,16 @@ public class MetaTableAccessor {
* @param type scanned part of meta
* @param maxRows maximum rows to return
* @param visitor Visitor invoked against each row.
* @throws IOException
*/
public static void scanMeta(Connection connection, @Nullable final byte[] startRow,
@Nullable final byte[] stopRow, QueryType type, int maxRows, final Visitor visitor)
throws IOException {
scanMeta(connection, startRow, stopRow, type, null, maxRows, visitor);
}
private static void scanMeta(Connection connection, @Nullable final byte[] startRow,
@Nullable final byte[] stopRow, QueryType type, @Nullable Filter filter, int maxRows,
final Visitor visitor) throws IOException {
int rowUpperLimit = maxRows > 0 ? maxRows : Integer.MAX_VALUE;
Scan scan = getMetaScan(connection, rowUpperLimit);
@ -760,13 +759,14 @@ public class MetaTableAccessor {
if (stopRow != null) {
scan.withStopRow(stopRow);
}
if (filter != null) {
scan.setFilter(filter);
}
if (LOG.isTraceEnabled()) {
LOG.trace("Scanning META"
+ " starting at row=" + Bytes.toStringBinary(startRow)
+ " stopping at row=" + Bytes.toStringBinary(stopRow)
+ " for max=" + rowUpperLimit
+ " with caching=" + scan.getCaching());
LOG.trace("Scanning META" + " starting at row=" + Bytes.toStringBinary(startRow) +
" stopping at row=" + Bytes.toStringBinary(stopRow) + " for max=" + rowUpperLimit +
" with caching=" + scan.getCaching());
}
int currentRow = 0;
@ -1973,7 +1973,7 @@ public class MetaTableAccessor {
byte[] value = getParentsBytes(parents);
put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
.setFamily(HConstants.REPLICATION_BARRIER_FAMILY).setQualifier(REPLICATION_PARENT_QUALIFIER)
.setTimestamp(put.getTimeStamp()).setType(Type.Put).setValue(value).build());
.setTimestamp(put.getTimestamp()).setType(Type.Put).setValue(value).build());
}
private static Put makePutForReplicationBarrier(RegionInfo regionInfo, long openSeqNum, long ts)
@ -1988,7 +1988,7 @@ public class MetaTableAccessor {
.setRow(put.getRow())
.setFamily(HConstants.REPLICATION_BARRIER_FAMILY)
.setQualifier(HConstants.SEQNUM_QUALIFIER)
.setTimestamp(put.getTimeStamp())
.setTimestamp(put.getTimestamp())
.setType(Type.Put)
.setValue(Bytes.toBytes(openSeqNum))
.build());
@ -2128,6 +2128,18 @@ public class MetaTableAccessor {
return list;
}
public static List<String> getTableEncodedRegionNamesForSerialReplication(Connection conn,
TableName tableName) throws IOException {
List<String> list = new ArrayList<>();
scanMeta(conn, getTableStartRowForMeta(tableName, QueryType.REPLICATION),
getTableStopRowForMeta(tableName, QueryType.REPLICATION), QueryType.REPLICATION,
new FirstKeyOnlyFilter(), Integer.MAX_VALUE, r -> {
list.add(RegionInfo.encodeRegionName(r.getRow()));
return true;
});
return list;
}
private static void debugLogMutations(List<? extends Mutation> mutations) throws IOException {
if (!METALOG.isDebugEnabled()) {
return;

View File

@ -91,6 +91,15 @@ public interface ReplicationQueueStorage {
* @param peerId peer id
*/
void removeLastSequenceIds(String peerId) throws ReplicationException;
/**
* Remove the max sequence id record for the given peer and regions.
* @param peerId peer id
* @param encodedRegionNames the encoded region names
*/
void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
throws ReplicationException;
/**
* Get the current position for a specific WAL in a given queue for a given regionserver.
* @param serverName the name of the regionserver

View File

@ -29,6 +29,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
@ -347,6 +348,20 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
}
}
@Override
public void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
throws ReplicationException {
try {
List<ZKUtilOp> listOfOps =
encodedRegionNames.stream().map(n -> getSerialReplicationRegionPeerNode(n, peerId))
.map(ZKUtilOp::deleteNodeFailSilent).collect(Collectors.toList());
ZKUtil.multiOrSequential(zookeeper, listOfOps, true);
} catch (KeeperException e) {
throw new ReplicationException("Failed to remove last sequence ids, peerId=" + peerId +
", encodedRegionNames.size=" + encodedRegionNames.size(), e);
}
}
@Override
public long getWALPosition(ServerName serverName, String queueId, String fileName)
throws ReplicationException {

View File

@ -29,6 +29,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.AddPeerStateData;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
/**
* The procedure for adding a new replication peer.
@ -57,8 +58,15 @@ public class AddPeerProcedure extends ModifyPeerProcedure {
}
@Override
protected boolean reopenRegionsAfterRefresh() {
return true;
protected PeerModificationState nextStateAfterRefresh() {
return peerConfig.isSerial() ? PeerModificationState.SERIAL_PEER_REOPEN_REGIONS
: super.nextStateAfterRefresh();
}
@Override
protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv env)
throws IOException, ReplicationException {
setLastPushedSequenceId(env, peerConfig);
}
@Override

View File

@ -18,11 +18,10 @@
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
@ -55,7 +54,7 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
private static final Logger LOG = LoggerFactory.getLogger(ModifyPeerProcedure.class);
private static final int SET_LAST_SEQ_ID_BATCH_SIZE = 1000;
protected static final int UPDATE_LAST_SEQ_ID_BATCH_SIZE = 1000;
protected ModifyPeerProcedure() {
}
@ -93,12 +92,11 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
}
/**
* Implementation class can override this method. The default return value is false which means we
* will jump to POST_PEER_MODIFICATION and finish the procedure. If returns true, we will jump to
* SERIAL_PEER_REOPEN_REGIONS.
* Implementation class can override this method. By default we will jump to
* POST_PEER_MODIFICATION and finish the procedure.
*/
protected boolean reopenRegionsAfterRefresh() {
return false;
protected PeerModificationState nextStateAfterRefresh() {
return PeerModificationState.POST_PEER_MODIFICATION;
}
/**
@ -123,81 +121,98 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
throw new UnsupportedOperationException();
}
private Stream<TableDescriptor> getTables(MasterProcedureEnv env) throws IOException {
ReplicationPeerConfig peerConfig = getNewPeerConfig();
Stream<TableDescriptor> stream = env.getMasterServices().getTableDescriptors().getAll().values()
.stream().filter(TableDescriptor::hasGlobalReplicationScope)
.filter(td -> ReplicationUtils.contains(peerConfig, td.getTableName()));
ReplicationPeerConfig oldPeerConfig = getOldPeerConfig();
if (oldPeerConfig != null && oldPeerConfig.isSerial()) {
stream = stream.filter(td -> !ReplicationUtils.contains(oldPeerConfig, td.getTableName()));
}
return stream;
protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv env)
throws IOException, ReplicationException {
throw new UnsupportedOperationException();
}
private void reopenRegions(MasterProcedureEnv env) throws IOException {
Stream<TableDescriptor> stream = getTables(env);
ReplicationPeerConfig peerConfig = getNewPeerConfig();
ReplicationPeerConfig oldPeerConfig = getOldPeerConfig();
TableStateManager tsm = env.getMasterServices().getTableStateManager();
stream.filter(td -> {
for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
if (!td.hasGlobalReplicationScope()) {
continue;
}
TableName tn = td.getTableName();
if (!ReplicationUtils.contains(peerConfig, tn)) {
continue;
}
if (oldPeerConfig != null && oldPeerConfig.isSerial() &&
ReplicationUtils.contains(oldPeerConfig, tn)) {
continue;
}
try {
return tsm.getTableState(td.getTableName()).isEnabled();
if (!tsm.getTableState(tn).isEnabled()) {
continue;
}
} catch (TableStateNotFoundException e) {
return false;
} catch (IOException e) {
throw new UncheckedIOException(e);
continue;
}
}).forEach(td -> {
try {
addChildProcedure(env.getAssignmentManager().createReopenProcedures(
env.getAssignmentManager().getRegionStates().getRegionsOfTable(td.getTableName())));
} catch (IOException e) {
throw new UncheckedIOException(e);
env.getAssignmentManager().getRegionStates().getRegionsOfTable(tn)));
}
});
}
private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier,
ReplicationQueueStorage queueStorage) throws ReplicationException {
if (barrier >= 0) {
lastSeqIds.put(encodedRegionName, barrier);
if (lastSeqIds.size() >= SET_LAST_SEQ_ID_BATCH_SIZE) {
if (lastSeqIds.size() >= UPDATE_LAST_SEQ_ID_BATCH_SIZE) {
queueStorage.setLastSequenceIds(peerId, lastSeqIds);
lastSeqIds.clear();
}
}
}
private void setLastSequenceIdForSerialPeer(MasterProcedureEnv env)
throws IOException, ReplicationException {
Stream<TableDescriptor> stream = getTables(env);
protected final void setLastPushedSequenceId(MasterProcedureEnv env,
ReplicationPeerConfig peerConfig) throws IOException, ReplicationException {
Map<String, Long> lastSeqIds = new HashMap<String, Long>();
for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
if (!td.hasGlobalReplicationScope()) {
continue;
}
TableName tn = td.getTableName();
if (!ReplicationUtils.contains(peerConfig, tn)) {
continue;
}
setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
}
if (!lastSeqIds.isEmpty()) {
env.getReplicationPeerManager().getQueueStorage().setLastSequenceIds(peerId, lastSeqIds);
}
}
// Will put the encodedRegionName->lastPushedSeqId pair into the map passed in, if the map is
// large enough we will call queueStorage.setLastSequenceIds and clear the map. So the caller
// should not forget to check whether the map is empty at last, if not you should call
// queueStorage.setLastSequenceIds to write out the remaining entries in the map.
protected final void setLastPushedSequenceIdForTable(MasterProcedureEnv env, TableName tableName,
Map<String, Long> lastSeqIds) throws IOException, ReplicationException {
TableStateManager tsm = env.getMasterServices().getTableStateManager();
ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
Connection conn = env.getMasterServices().getConnection();
RegionStates regionStates = env.getAssignmentManager().getRegionStates();
MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
Map<String, Long> lastSeqIds = new HashMap<String, Long>();
stream.forEach(td -> {
boolean isTableEnabled;
try {
if (tsm.getTableState(td.getTableName()).isEnabled()) {
isTableEnabled = tsm.getTableState(tableName).isEnabled();
} catch (TableStateNotFoundException e) {
return;
}
if (isTableEnabled) {
for (Pair<String, Long> name2Barrier : MetaTableAccessor
.getTableEncodedRegionNameAndLastBarrier(conn, td.getTableName())) {
.getTableEncodedRegionNameAndLastBarrier(conn, tableName)) {
addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
queueStorage);
}
} else {
for (RegionInfo region : regionStates.getRegionsOfTable(td.getTableName(), true)) {
for (RegionInfo region : regionStates.getRegionsOfTable(tableName, true)) {
long maxSequenceId =
WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region));
addToMap(lastSeqIds, region.getEncodedName(), maxSequenceId, queueStorage);
}
}
} catch (IOException | ReplicationException e) {
throw new RuntimeException(e);
}
});
if (!lastSeqIds.isEmpty()) {
queueStorage.setLastSequenceIds(peerId, lastSeqIds);
}
}
@Override
@ -232,8 +247,7 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
return Flow.HAS_MORE_STATE;
case REFRESH_PEER_ON_RS:
refreshPeer(env, getPeerOperationType());
setNextState(reopenRegionsAfterRefresh() ? PeerModificationState.SERIAL_PEER_REOPEN_REGIONS
: PeerModificationState.POST_PEER_MODIFICATION);
setNextState(nextStateAfterRefresh());
return Flow.HAS_MORE_STATE;
case SERIAL_PEER_REOPEN_REGIONS:
try {
@ -246,7 +260,7 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
return Flow.HAS_MORE_STATE;
case SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID:
try {
setLastSequenceIdForSerialPeer(env);
updateLastPushedSequenceIdForSerialPeer(env);
} catch (Exception e) {
LOG.warn("{} set last sequence id for peer {} failed, retry", getClass().getName(),
peerId, e);

View File

@ -18,6 +18,14 @@
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
@ -25,11 +33,13 @@ import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.UpdatePeerConfigStateData;
/**
@ -59,12 +69,84 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
return PeerOperationType.UPDATE_CONFIG;
}
private void addToList(List<String> encodedRegionNames, String encodedRegionName,
ReplicationQueueStorage queueStorage) throws ReplicationException {
encodedRegionNames.add(encodedRegionName);
if (encodedRegionNames.size() >= UPDATE_LAST_SEQ_ID_BATCH_SIZE) {
queueStorage.removeLastSequenceIds(peerId, encodedRegionNames);
encodedRegionNames.clear();
}
}
@Override
protected boolean reopenRegionsAfterRefresh() {
// If we remove some tables from the peer config then we do not need to enter the extra states
// for serial replication. Could try to optimize later since it is not easy to determine this...
return peerConfig.isSerial() && (!oldPeerConfig.isSerial() ||
!ReplicationUtils.isNamespacesAndTableCFsEqual(peerConfig, oldPeerConfig));
protected PeerModificationState nextStateAfterRefresh() {
if (peerConfig.isSerial()) {
if (oldPeerConfig.isSerial()) {
// both serial, then if the ns/table-cfs configs are not changed, just go with the normal
// way, otherwise we need to reopen the regions for the newly added tables.
return ReplicationUtils.isNamespacesAndTableCFsEqual(peerConfig, oldPeerConfig)
? super.nextStateAfterRefresh()
: PeerModificationState.SERIAL_PEER_REOPEN_REGIONS;
} else {
// we change the peer to serial, need to reopen all regions
return PeerModificationState.SERIAL_PEER_REOPEN_REGIONS;
}
} else {
if (oldPeerConfig.isSerial()) {
// we remove the serial flag for peer, then we do not need to reopen all regions, but we
// need to remove the last pushed sequence ids.
return PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID;
} else {
// not serial for both, just go with the normal way.
return super.nextStateAfterRefresh();
}
}
}
@Override
protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv env)
throws IOException, ReplicationException {
if (!oldPeerConfig.isSerial()) {
assert peerConfig.isSerial();
// change to serial
setLastPushedSequenceId(env, peerConfig);
return;
}
if (!peerConfig.isSerial()) {
// remove the serial flag
env.getReplicationPeerManager().removeAllLastPushedSeqIds(peerId);
return;
}
// enter here means peerConfig and oldPeerConfig are both serial, let's find out the diffs and
// process them
ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
Connection conn = env.getMasterServices().getConnection();
Map<String, Long> lastSeqIds = new HashMap<String, Long>();
List<String> encodedRegionNames = new ArrayList<>();
for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
if (!td.hasGlobalReplicationScope()) {
continue;
}
TableName tn = td.getTableName();
if (ReplicationUtils.contains(oldPeerConfig, tn)) {
if (!ReplicationUtils.contains(peerConfig, tn)) {
// removed from peer config
for (String encodedRegionName : MetaTableAccessor
.getTableEncodedRegionNamesForSerialReplication(conn, tn)) {
addToList(encodedRegionNames, encodedRegionName, queueStorage);
}
}
} else if (ReplicationUtils.contains(peerConfig, tn)) {
// newly added to peer config
setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
}
}
if (!encodedRegionNames.isEmpty()) {
queueStorage.removeLastSequenceIds(peerId, encodedRegionNames);
}
if (!lastSeqIds.isEmpty()) {
queueStorage.setLastSequenceIds(peerId, lastSeqIds);
}
}
@Override
@ -99,7 +181,9 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
@Override
protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
env.getReplicationPeerManager().updatePeerConfig(peerId, peerConfig);
if (enabled && reopenRegionsAfterRefresh()) {
// if we need to jump to the special states for serial peers, then we need to disable the peer
// first if it is not disabled yet.
if (enabled && nextStateAfterRefresh() != super.nextStateAfterRefresh()) {
env.getReplicationPeerManager().disablePeer(peerId);
}
}

View File

@ -186,8 +186,8 @@ public class TestEnableTable {
fail("Got an exception while deleting " + tableName);
}
int rowCount = 0;
try (ResultScanner scanner =
metaTable.getScanner(MetaTableAccessor.getScanForTableName(TEST_UTIL.getConnection(), tableName))) {
try (ResultScanner scanner = metaTable
.getScanner(MetaTableAccessor.getScanForTableName(TEST_UTIL.getConnection(), tableName))) {
for (Result result : scanner) {
LOG.info("Found when none expected: " + result);
rowCount++;

View File

@ -0,0 +1,120 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.Collections;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
/**
* Testcase for HBASE-20296.
*/
@Category({ ReplicationTests.class, MediumTests.class })
public class TestRemoveFromSerialReplicationPeer extends SerialReplicationTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRemoveFromSerialReplicationPeer.class);
@Before
public void setUp() throws IOException, StreamLacksCapabilityException {
setupWALWriter();
}
private void waitUntilHasLastPushedSequenceId(RegionInfo region) throws Exception {
ReplicationQueueStorage queueStorage =
UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return queueStorage.getLastSequenceId(region.getEncodedName(), PEER_ID) > 0;
}
@Override
public String explainFailure() throws Exception {
return "Still no last pushed sequence id for " + region;
}
});
}
@Test
public void testRemoveTable() throws Exception {
TableName tableName = createTable();
ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
.setClusterKey("127.0.0.1:2181:/hbase")
.setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName())
.setReplicateAllUserTables(false)
.setTableCFsMap(ImmutableMap.of(tableName, Collections.emptyList())).setSerial(true).build();
UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true);
try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
}
}
RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo();
waitUntilHasLastPushedSequenceId(region);
UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID,
ReplicationPeerConfig.newBuilder(peerConfig).setTableCFsMap(Collections.emptyMap()).build());
ReplicationQueueStorage queueStorage =
UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
assertEquals(HConstants.NO_SEQNUM,
queueStorage.getLastSequenceId(region.getEncodedName(), PEER_ID));
}
@Test
public void testRemoveSerialFlag() throws Exception {
TableName tableName = createTable();
addPeer(true);
try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
}
}
RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo();
waitUntilHasLastPushedSequenceId(region);
UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID, ReplicationPeerConfig
.newBuilder(UTIL.getAdmin().getReplicationPeerConfig(PEER_ID)).setSerial(false).build());
waitUntilReplicationDone(100);
ReplicationQueueStorage queueStorage =
UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
assertEquals(HConstants.NO_SEQNUM,
queueStorage.getLastSequenceId(region.getEncodedName(), PEER_ID));
}
}