diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index 4cc46c80904..0f5ef096f09 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -57,6 +57,8 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.master.RegionState.State;
@@ -682,20 +684,19 @@ public class MetaTableAccessor {
     scanMeta(connection, null, null, QueryType.ALL, v);
   }

-  public static void scanMetaForTableRegions(Connection connection,
-      Visitor visitor, TableName tableName) throws IOException {
+  public static void scanMetaForTableRegions(Connection connection, Visitor visitor,
+      TableName tableName) throws IOException {
     scanMeta(connection, tableName, QueryType.REGION, Integer.MAX_VALUE, visitor);
   }

-  public static void scanMeta(Connection connection, TableName table,
-      QueryType type, int maxRows, final Visitor visitor) throws IOException {
+  public static void scanMeta(Connection connection, TableName table, QueryType type, int maxRows,
+      final Visitor visitor) throws IOException {
     scanMeta(connection, getTableStartRowForMeta(table, type), getTableStopRowForMeta(table, type),
-        type, maxRows, visitor);
+      type, maxRows, visitor);
   }

-  public static void scanMeta(Connection connection,
-      @Nullable final byte[] startRow, @Nullable final byte[] stopRow,
-      QueryType type, final Visitor visitor) throws IOException {
+  public static void scanMeta(Connection connection, @Nullable final byte[] startRow,
+      @Nullable final byte[] stopRow, QueryType type, final Visitor visitor) throws IOException {
     scanMeta(connection, startRow, stopRow, type, Integer.MAX_VALUE, visitor);
   }

@@ -708,26 +709,19 @@ public class MetaTableAccessor {
    * @param tableName table within which we scan
    * @param row start scan from this row
    * @param rowLimit max number of rows to return
-   * @throws IOException
    */
-  public static void scanMeta(Connection connection,
-      final Visitor visitor, final TableName tableName,
-      final byte[] row, final int rowLimit)
-      throws IOException {
-
+  public static void scanMeta(Connection connection, final Visitor visitor,
+      final TableName tableName, final byte[] row, final int rowLimit) throws IOException {
     byte[] startRow = null;
     byte[] stopRow = null;
     if (tableName != null) {
-      startRow =
-          getTableStartRowForMeta(tableName, QueryType.REGION);
+      startRow = getTableStartRowForMeta(tableName, QueryType.REGION);
       if (row != null) {
-        RegionInfo closestRi =
-            getClosestRegionInfo(connection, tableName, row);
-        startRow = RegionInfo
-            .createRegionName(tableName, closestRi.getStartKey(), HConstants.ZEROES, false);
+        RegionInfo closestRi = getClosestRegionInfo(connection, tableName, row);
+        startRow =
+          RegionInfo.createRegionName(tableName, closestRi.getStartKey(), HConstants.ZEROES, false);
       }
-      stopRow =
-          getTableStopRowForMeta(tableName, QueryType.REGION);
+      stopRow = getTableStopRowForMeta(tableName, QueryType.REGION);
     }
     scanMeta(connection, startRow, stopRow, QueryType.REGION, rowLimit, visitor);
   }
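All of these overloads funnel into the range-based scanMeta, and Visitor is a single-method callback, so callers can pass a lambda. A minimal caller-side sketch, not part of the patch (`conn` is an assumed open Connection, the table name is made up, imports omitted):

```java
// Sketch: count the hbase:meta rows for one table with the reordered
// scanMetaForTableRegions(Connection, Visitor, TableName) signature.
AtomicInteger rows = new AtomicInteger();
MetaTableAccessor.scanMetaForTableRegions(conn, r -> {
  rows.incrementAndGet();
  return true; // returning false stops the scan after this row
}, TableName.valueOf("ns:tbl"));
```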
@@ -743,11 +737,16 @@ public class MetaTableAccessor {
    * @param type scanned part of meta
    * @param maxRows maximum rows to return
    * @param visitor Visitor invoked against each row.
-   * @throws IOException
    */
   public static void scanMeta(Connection connection, @Nullable final byte[] startRow,
       @Nullable final byte[] stopRow, QueryType type, int maxRows, final Visitor visitor)
       throws IOException {
+    scanMeta(connection, startRow, stopRow, type, null, maxRows, visitor);
+  }
+
+  private static void scanMeta(Connection connection, @Nullable final byte[] startRow,
+      @Nullable final byte[] stopRow, QueryType type, @Nullable Filter filter, int maxRows,
+      final Visitor visitor) throws IOException {
     int rowUpperLimit = maxRows > 0 ? maxRows : Integer.MAX_VALUE;
     Scan scan = getMetaScan(connection, rowUpperLimit);

@@ -760,13 +759,14 @@ public class MetaTableAccessor {
     if (stopRow != null) {
       scan.withStopRow(stopRow);
     }
+    if (filter != null) {
+      scan.setFilter(filter);
+    }

     if (LOG.isTraceEnabled()) {
-      LOG.trace("Scanning META"
-          + " starting at row=" + Bytes.toStringBinary(startRow)
-          + " stopping at row=" + Bytes.toStringBinary(stopRow)
-          + " for max=" + rowUpperLimit
-          + " with caching=" + scan.getCaching());
+      LOG.trace("Scanning META" + " starting at row=" + Bytes.toStringBinary(startRow)
+        + " stopping at row=" + Bytes.toStringBinary(stopRow) + " for max=" + rowUpperLimit
+        + " with caching=" + scan.getCaching());
     }

     int currentRow = 0;
@@ -1973,7 +1973,7 @@ public class MetaTableAccessor {
     byte[] value = getParentsBytes(parents);
     put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
       .setFamily(HConstants.REPLICATION_BARRIER_FAMILY).setQualifier(REPLICATION_PARENT_QUALIFIER)
-      .setTimestamp(put.getTimeStamp()).setType(Type.Put).setValue(value).build());
+      .setTimestamp(put.getTimestamp()).setType(Type.Put).setValue(value).build());
   }

   private static Put makePutForReplicationBarrier(RegionInfo regionInfo, long openSeqNum, long ts)
@@ -1988,7 +1988,7 @@ public class MetaTableAccessor {
       .setRow(put.getRow())
       .setFamily(HConstants.REPLICATION_BARRIER_FAMILY)
       .setQualifier(HConstants.SEQNUM_QUALIFIER)
-      .setTimestamp(put.getTimeStamp())
+      .setTimestamp(put.getTimestamp())
       .setType(Type.Put)
       .setValue(Bytes.toBytes(openSeqNum))
       .build());
@@ -2128,6 +2128,18 @@ public class MetaTableAccessor {
     return list;
   }

+  public static List<String> getTableEncodedRegionNamesForSerialReplication(Connection conn,
+      TableName tableName) throws IOException {
+    List<String> list = new ArrayList<>();
+    scanMeta(conn, getTableStartRowForMeta(tableName, QueryType.REPLICATION),
+      getTableStopRowForMeta(tableName, QueryType.REPLICATION), QueryType.REPLICATION,
+      new FirstKeyOnlyFilter(), Integer.MAX_VALUE, r -> {
+        list.add(RegionInfo.encodeRegionName(r.getRow()));
+        return true;
+      });
+    return list;
+  }
+
   private static void debugLogMutations(List<? extends Mutation> mutations) throws IOException {
     if (!METALOG.isDebugEnabled()) {
       return;
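The new private scanMeta overload threads an optional Filter into the Scan, and getTableEncodedRegionNamesForSerialReplication pairs it with FirstKeyOnlyFilter because it only needs row keys: the server returns a single cell per row, so the scan moves far less data. A standalone sketch of the same trick, not the patch's code (assumes an open Connection; real callers would bound the scan to one table's replication rows, as the helper above does):

```java
// Sketch: collect encoded region names from hbase:meta row keys only.
// FirstKeyOnlyFilter keeps just the first cell of every row.
static List<String> allEncodedRegionNames(Connection conn) throws IOException {
  List<String> names = new ArrayList<>();
  Scan scan = new Scan().setFilter(new FirstKeyOnlyFilter());
  try (Table meta = conn.getTable(TableName.META_TABLE_NAME);
      ResultScanner scanner = meta.getScanner(scan)) {
    for (Result r : scanner) {
      names.add(RegionInfo.encodeRegionName(r.getRow()));
    }
  }
  return names;
}
```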
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
index cd37ac26291..84653adbf6d 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
@@ -91,6 +91,15 @@ public interface ReplicationQueueStorage {
    * @param peerId peer id
    */
   void removeLastSequenceIds(String peerId) throws ReplicationException;
+
+  /**
+   * Remove the max sequence id record for the given peer and regions.
+   * @param peerId peer id
+   * @param encodedRegionNames the encoded region names
+   */
+  void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
+      throws ReplicationException;
+
   /**
    * Get the current position for a specific WAL in a given queue for a given regionserver.
    * @param serverName the name of the regionserver
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
index 96b0b9130ad..6d721281f25 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
@@ -28,6 +28,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.stream.Collectors;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -346,6 +347,20 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
     }
   }

+  @Override
+  public void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
+      throws ReplicationException {
+    try {
+      List<ZKUtilOp> listOfOps =
+        encodedRegionNames.stream().map(n -> getSerialReplicationRegionPeerNode(n, peerId))
+          .map(ZKUtilOp::deleteNodeFailSilent).collect(Collectors.toList());
+      ZKUtil.multiOrSequential(zookeeper, listOfOps, true);
+    } catch (KeeperException e) {
+      throw new ReplicationException("Failed to remove last sequence ids, peerId=" + peerId +
+        ", encodedRegionNames.size=" + encodedRegionNames.size(), e);
+    }
+  }
+
   @Override
   public long getWALPosition(ServerName serverName, String queueId, String fileName)
       throws ReplicationException {
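Because every delete in the multi is a deleteNodeFailSilent op, znodes that are already gone are ignored, so the call stays idempotent and can simply be retried after a partial failure. A hedged caller-side sketch (peer id and encoded region names are invented):

```java
// Sketch: drop the last-pushed sequence ids for two regions of peer "1".
// Real callers read the encoded names from hbase:meta; these are made up.
List<String> encoded = Arrays.asList("0f79c07b5fb420ec", "d5a3b2c1e4f60789");
queueStorage.removeLastSequenceIds("1", encoded); // safe to retry on failure
```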
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
index 72228f64cec..2f2d5a5449d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.AddPeerStateData;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;

 /**
  * The procedure for adding a new replication peer.
@@ -57,8 +58,15 @@ public class AddPeerProcedure extends ModifyPeerProcedure {
   }

   @Override
-  protected boolean reopenRegionsAfterRefresh() {
-    return true;
+  protected PeerModificationState nextStateAfterRefresh() {
+    return peerConfig.isSerial() ? PeerModificationState.SERIAL_PEER_REOPEN_REGIONS
+      : super.nextStateAfterRefresh();
+  }
+
+  @Override
+  protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv env)
+      throws IOException, ReplicationException {
+    setLastPushedSequenceId(env, peerConfig);
   }

   @Override
@@ -102,7 +110,7 @@ public class AddPeerProcedure extends ModifyPeerProcedure {
   protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
     super.serializeStateData(serializer);
     serializer.serialize(AddPeerStateData.newBuilder()
-        .setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)).setEnabled(enabled).build());
+      .setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)).setEnabled(enabled).build());
   }

   @Override
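The boolean hook reopenRegionsAfterRefresh is replaced by a state hook: each subclass now picks the next state of the peer-modification state machine itself. A minimal sketch of that pattern with invented class and state names, not HBase code:

```java
// Sketch of the hook pattern: the base state machine asks the subclass where
// to go after peer config has been refreshed on the region servers.
enum State { POST_PEER_MODIFICATION, SERIAL_PEER_REOPEN_REGIONS }

abstract class BaseProcedure {
  // default: skip the serial-replication states entirely
  protected State nextStateAfterRefresh() {
    return State.POST_PEER_MODIFICATION;
  }
}

class AddProcedure extends BaseProcedure {
  private final boolean serial;

  AddProcedure(boolean serial) {
    this.serial = serial;
  }

  @Override
  protected State nextStateAfterRefresh() {
    // serial peers must reopen regions so each region records a barrier
    return serial ? State.SERIAL_PEER_REOPEN_REGIONS : super.nextStateAfterRefresh();
  }
}
```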
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
index 2b764873962..8bedeffac33 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
@@ -18,11 +18,10 @@
 package org.apache.hadoop.hbase.master.replication;

 import java.io.IOException;
-import java.io.UncheckedIOException;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.stream.Stream;
 import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -55,7 +54,7 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerMod
-  private Stream<TableDescriptor> getTables(MasterProcedureEnv env) throws IOException {
-    ReplicationPeerConfig peerConfig = getNewPeerConfig();
-    Stream<TableDescriptor> stream = env.getMasterServices().getTableDescriptors().getAll().values()
-      .stream().filter(TableDescriptor::hasGlobalReplicationScope)
-      .filter(td -> ReplicationUtils.contains(peerConfig, td.getTableName()));
-    ReplicationPeerConfig oldPeerConfig = getOldPeerConfig();
-    if (oldPeerConfig != null && oldPeerConfig.isSerial()) {
-      stream = stream.filter(td -> !ReplicationUtils.contains(oldPeerConfig, td.getTableName()));
-    }
-    return stream;
+  protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv env)
+      throws IOException, ReplicationException {
+    throw new UnsupportedOperationException();
   }

   private void reopenRegions(MasterProcedureEnv env) throws IOException {
-    Stream<TableDescriptor> stream = getTables(env);
+    ReplicationPeerConfig peerConfig = getNewPeerConfig();
+    ReplicationPeerConfig oldPeerConfig = getOldPeerConfig();
     TableStateManager tsm = env.getMasterServices().getTableStateManager();
-    stream.filter(td -> {
+    for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
+      if (!td.hasGlobalReplicationScope()) {
+        continue;
+      }
+      TableName tn = td.getTableName();
+      if (!ReplicationUtils.contains(peerConfig, tn)) {
+        continue;
+      }
+      if (oldPeerConfig != null && oldPeerConfig.isSerial() &&
+        ReplicationUtils.contains(oldPeerConfig, tn)) {
+        continue;
+      }
       try {
-        return tsm.getTableState(td.getTableName()).isEnabled();
+        if (!tsm.getTableState(tn).isEnabled()) {
+          continue;
+        }
       } catch (TableStateNotFoundException e) {
-        return false;
-      } catch (IOException e) {
-        throw new UncheckedIOException(e);
+        continue;
       }
-    }).forEach(td -> {
-      try {
-        addChildProcedure(env.getAssignmentManager().createReopenProcedures(
-          env.getAssignmentManager().getRegionStates().getRegionsOfTable(td.getTableName())));
-      } catch (IOException e) {
-        throw new UncheckedIOException(e);
-      }
-    });
+      addChildProcedure(env.getAssignmentManager().createReopenProcedures(
+        env.getAssignmentManager().getRegionStates().getRegionsOfTable(tn)));
+    }
   }

   private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier,
       ReplicationQueueStorage queueStorage) throws ReplicationException {
     if (barrier >= 0) {
       lastSeqIds.put(encodedRegionName, barrier);
-      if (lastSeqIds.size() >= SET_LAST_SEQ_ID_BATCH_SIZE) {
+      if (lastSeqIds.size() >= UPDATE_LAST_SEQ_ID_BATCH_SIZE) {
         queueStorage.setLastSequenceIds(peerId, lastSeqIds);
         lastSeqIds.clear();
       }
     }
   }

-  private void setLastSequenceIdForSerialPeer(MasterProcedureEnv env)
-      throws IOException, ReplicationException {
-    Stream<TableDescriptor> stream = getTables(env);
+  protected final void setLastPushedSequenceId(MasterProcedureEnv env,
+      ReplicationPeerConfig peerConfig) throws IOException, ReplicationException {
+    Map<String, Long> lastSeqIds = new HashMap<>();
+    for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
+      if (!td.hasGlobalReplicationScope()) {
+        continue;
+      }
+      TableName tn = td.getTableName();
+      if (!ReplicationUtils.contains(peerConfig, tn)) {
+        continue;
+      }
+      setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
+    }
+    if (!lastSeqIds.isEmpty()) {
+      env.getReplicationPeerManager().getQueueStorage().setLastSequenceIds(peerId, lastSeqIds);
+    }
+  }
+
+  // Will put the encodedRegionName->lastPushedSeqId pair into the map passed in, if the map is
+  // large enough we will call queueStorage.setLastSequenceIds and clear the map. So the caller
+  // should not forget to check whether the map is empty at last, if not you should call
+  // queueStorage.setLastSequenceIds to write out the remaining entries in the map.
+  protected final void setLastPushedSequenceIdForTable(MasterProcedureEnv env, TableName tableName,
+      Map<String, Long> lastSeqIds) throws IOException, ReplicationException {
     TableStateManager tsm = env.getMasterServices().getTableStateManager();
     ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
     Connection conn = env.getMasterServices().getConnection();
     RegionStates regionStates = env.getAssignmentManager().getRegionStates();
     MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
-    Map<String, Long> lastSeqIds = new HashMap<>();
-    stream.forEach(td -> {
-      try {
-        if (tsm.getTableState(td.getTableName()).isEnabled()) {
-          for (Pair<String, Long> name2Barrier : MetaTableAccessor
-            .getTableEncodedRegionNameAndLastBarrier(conn, td.getTableName())) {
-            addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
-              queueStorage);
-          }
-        } else {
-          for (RegionInfo region : regionStates.getRegionsOfTable(td.getTableName(), true)) {
-            long maxSequenceId =
-              WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region));
-            addToMap(lastSeqIds, region.getEncodedName(), maxSequenceId, queueStorage);
-          }
-        }
-      } catch (IOException | ReplicationException e) {
-        throw new RuntimeException(e);
+    boolean isTableEnabled;
+    try {
+      isTableEnabled = tsm.getTableState(tableName).isEnabled();
+    } catch (TableStateNotFoundException e) {
+      return;
+    }
+    if (isTableEnabled) {
+      for (Pair<String, Long> name2Barrier : MetaTableAccessor
+        .getTableEncodedRegionNameAndLastBarrier(conn, tableName)) {
+        addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
+          queueStorage);
+      }
+    } else {
+      for (RegionInfo region : regionStates.getRegionsOfTable(tableName, true)) {
+        long maxSequenceId =
+          WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region));
+        addToMap(lastSeqIds, region.getEncodedName(), maxSequenceId, queueStorage);
       }
-    });
-    if (!lastSeqIds.isEmpty()) {
-      queueStorage.setLastSequenceIds(peerId, lastSeqIds);
     }
   }
@@ -232,8 +247,7 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerMod
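addToMap above, like addToList in UpdatePeerConfigProcedure below, follows the contract the comment spells out: buffer entries, flush whenever a batch fills, and leave the final partial flush to the caller. A generic sketch of that contract under invented names and an illustrative batch size:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch of the buffer-and-flush contract, not HBase code. The helper flushes
// full batches; the caller owns flushing whatever remains at the end.
class BatchedUpdates {
  static final int BATCH_SIZE = 1000; // illustrative value only

  static void add(List<String> batch, String item, Consumer<List<String>> flush) {
    batch.add(item);
    if (batch.size() >= BATCH_SIZE) {
      flush.accept(batch);
      batch.clear();
    }
  }

  static void processAll(Iterable<String> items, Consumer<List<String>> flush) {
    List<String> batch = new ArrayList<>();
    for (String item : items) {
      add(batch, item, flush);
    }
    if (!batch.isEmpty()) { // don't forget the final, partial batch
      flush.accept(batch);
    }
  }
}
```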
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
+  private void addToList(List<String> encodedRegionNames, String encodedRegionName,
+      ReplicationQueueStorage queueStorage) throws ReplicationException {
+    encodedRegionNames.add(encodedRegionName);
+    if (encodedRegionNames.size() >= UPDATE_LAST_SEQ_ID_BATCH_SIZE) {
+      queueStorage.removeLastSequenceIds(peerId, encodedRegionNames);
+      encodedRegionNames.clear();
+    }
+  }
+
   @Override
-  protected boolean reopenRegionsAfterRefresh() {
-    // If we remove some tables from the peer config then we do not need to enter the extra states
-    // for serial replication. Could try to optimize later since it is not easy to determine this...
-    return peerConfig.isSerial() && (!oldPeerConfig.isSerial() ||
-      !ReplicationUtils.isNamespacesAndTableCFsEqual(peerConfig, oldPeerConfig));
+  protected PeerModificationState nextStateAfterRefresh() {
+    if (peerConfig.isSerial()) {
+      if (oldPeerConfig.isSerial()) {
+        // both serial, then if the ns/table-cfs configs are not changed, just go with the normal
+        // way, otherwise we need to reopen the regions for the newly added tables.
+        return ReplicationUtils.isNamespacesAndTableCFsEqual(peerConfig, oldPeerConfig)
+          ? super.nextStateAfterRefresh()
+          : PeerModificationState.SERIAL_PEER_REOPEN_REGIONS;
+      } else {
+        // we change the peer to serial, need to reopen all regions
+        return PeerModificationState.SERIAL_PEER_REOPEN_REGIONS;
+      }
+    } else {
+      if (oldPeerConfig.isSerial()) {
+        // we remove the serial flag for peer, then we do not need to reopen all regions, but we
+        // need to remove the last pushed sequence ids.
+        return PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID;
+      } else {
+        // not serial for both, just go with the normal way.
+        return super.nextStateAfterRefresh();
+      }
+    }
+  }
+
+  @Override
+  protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv env)
+      throws IOException, ReplicationException {
+    if (!oldPeerConfig.isSerial()) {
+      assert peerConfig.isSerial();
+      // change to serial
+      setLastPushedSequenceId(env, peerConfig);
+      return;
+    }
+    if (!peerConfig.isSerial()) {
+      // remove the serial flag
+      env.getReplicationPeerManager().removeAllLastPushedSeqIds(peerId);
+      return;
+    }
+    // enter here means peerConfig and oldPeerConfig are both serial, let's find out the diffs and
+    // process them
+    ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
+    Connection conn = env.getMasterServices().getConnection();
+    Map<String, Long> lastSeqIds = new HashMap<>();
+    List<String> encodedRegionNames = new ArrayList<>();
+    for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
+      if (!td.hasGlobalReplicationScope()) {
+        continue;
+      }
+      TableName tn = td.getTableName();
+      if (ReplicationUtils.contains(oldPeerConfig, tn)) {
+        if (!ReplicationUtils.contains(peerConfig, tn)) {
+          // removed from peer config
+          for (String encodedRegionName : MetaTableAccessor
+            .getTableEncodedRegionNamesForSerialReplication(conn, tn)) {
+            addToList(encodedRegionNames, encodedRegionName, queueStorage);
+          }
+        }
+      } else if (ReplicationUtils.contains(peerConfig, tn)) {
+        // newly added to peer config
+        setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
+      }
+    }
+    if (!encodedRegionNames.isEmpty()) {
+      queueStorage.removeLastSequenceIds(peerId, encodedRegionNames);
+    }
+    if (!lastSeqIds.isEmpty()) {
+      queueStorage.setLastSequenceIds(peerId, lastSeqIds);
+    }
+  }

@@ -99,7 +181,9 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
   @Override
   protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
     env.getReplicationPeerManager().updatePeerConfig(peerId, peerConfig);
-    if (enabled && reopenRegionsAfterRefresh()) {
+    // if we need to jump to the special states for serial peers, then we need to disable the peer
+    // first if it is not disabled yet.
+    if (enabled && nextStateAfterRefresh() != super.nextStateAfterRefresh()) {
       env.getReplicationPeerManager().disablePeer(peerId);
     }
   }
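updateLastPushedSequenceIdForSerialPeer walks every replicated table once and classifies it against the old and new configs. The same classification, reduced to a hedged sketch (allTables, removed and added are assumed collections; ReplicationUtils.contains is the real predicate):

```java
// Sketch: classify tables when both the old and the new config are serial.
for (TableName tn : allTables) {
  boolean inOld = ReplicationUtils.contains(oldPeerConfig, tn);
  boolean inNew = ReplicationUtils.contains(peerConfig, tn);
  if (inOld && !inNew) {
    removed.add(tn); // last-pushed sequence ids must be deleted
  } else if (!inOld && inNew) {
    added.add(tn); // last-pushed ids must be seeded from the barriers
  }
  // present in both or in neither: nothing to do
}
```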
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestEnableTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestEnableTable.java
index 3b807aa8384..7a1bc55f15a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestEnableTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestEnableTable.java
@@ -186,8 +186,8 @@ public class TestEnableTable {
       fail("Got an exception while deleting " + tableName);
     }
     int rowCount = 0;
-    try (ResultScanner scanner =
-        metaTable.getScanner(MetaTableAccessor.getScanForTableName(TEST_UTIL.getConnection(), tableName))) {
+    try (ResultScanner scanner = metaTable
+      .getScanner(MetaTableAccessor.getScanForTableName(TEST_UTIL.getConnection(), tableName))) {
       for (Result result : scanner) {
         LOG.info("Found when none expected: " + result);
         rowCount++;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestRemoveFromSerialReplicationPeer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestRemoveFromSerialReplicationPeer.java
new file mode 100644
index 00000000000..eda15d815a8
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestRemoveFromSerialReplicationPeer.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Collections;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
+/**
+ * Testcase for HBASE-20296.
+ */
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestRemoveFromSerialReplicationPeer extends SerialReplicationTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRemoveFromSerialReplicationPeer.class);
+
+  @Before
+  public void setUp() throws IOException, StreamLacksCapabilityException {
+    setupWALWriter();
+  }
+
+  private void waitUntilHasLastPushedSequenceId(RegionInfo region) throws Exception {
+    ReplicationQueueStorage queueStorage =
+      UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
+    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        return queueStorage.getLastSequenceId(region.getEncodedName(), PEER_ID) > 0;
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "Still no last pushed sequence id for " + region;
+      }
+    });
+  }
+
+  @Test
+  public void testRemoveTable() throws Exception {
+    TableName tableName = createTable();
+    ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
+      .setClusterKey("127.0.0.1:2181:/hbase")
+      .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName())
+      .setReplicateAllUserTables(false)
+      .setTableCFsMap(ImmutableMap.of(tableName, Collections.emptyList())).setSerial(true).build();
+    UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true);
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo();
+    waitUntilHasLastPushedSequenceId(region);
+
+    UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID,
+      ReplicationPeerConfig.newBuilder(peerConfig).setTableCFsMap(Collections.emptyMap()).build());
+
+    ReplicationQueueStorage queueStorage =
+      UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
+    assertEquals(HConstants.NO_SEQNUM,
+      queueStorage.getLastSequenceId(region.getEncodedName(), PEER_ID));
+  }
+
+  @Test
+  public void testRemoveSerialFlag() throws Exception {
+    TableName tableName = createTable();
+    addPeer(true);
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo();
+    waitUntilHasLastPushedSequenceId(region);
+    UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID, ReplicationPeerConfig
+      .newBuilder(UTIL.getAdmin().getReplicationPeerConfig(PEER_ID)).setSerial(false).build());
+    waitUntilReplicationDone(100);
+
+    ReplicationQueueStorage queueStorage =
+      UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
+    assertEquals(HConstants.NO_SEQNUM,
+      queueStorage.getLastSequenceId(region.getEncodedName(), PEER_ID));
+  }
+}
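For context, waitUntilHasLastPushedSequenceId is an instance of HBase's Waiter idiom; a generic sketch (conditionHolds is an assumed application-specific check):

```java
// Sketch: HBaseTestingUtility.waitFor polls the predicate until it returns
// true or the timeout (in ms) elapses; explainFailure() feeds the error text.
UTIL.waitFor(30000, new Waiter.ExplainingPredicate<Exception>() {
  @Override
  public boolean evaluate() throws Exception {
    return conditionHolds();
  }

  @Override
  public String explainFailure() throws Exception {
    return "condition still not met";
  }
});
```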