diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java index 05e60d42c0c..13245d377aa 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java @@ -489,16 +489,17 @@ public class AsyncMetaTableAccessor { QueryType type) { return tableName.map((table) -> { switch (type) { - case REGION: - byte[] startRow = new byte[table.getName().length + 2]; - System.arraycopy(table.getName(), 0, startRow, 0, table.getName().length); - startRow[startRow.length - 2] = HConstants.DELIMITER; - startRow[startRow.length - 1] = HConstants.DELIMITER; - return startRow; - case ALL: - case TABLE: - default: - return table.getName(); + case REGION: + case REPLICATION: + byte[] startRow = new byte[table.getName().length + 2]; + System.arraycopy(table.getName(), 0, startRow, 0, table.getName().length); + startRow[startRow.length - 2] = HConstants.DELIMITER; + startRow[startRow.length - 1] = HConstants.DELIMITER; + return startRow; + case ALL: + case TABLE: + default: + return table.getName(); } }); } @@ -512,20 +513,21 @@ public class AsyncMetaTableAccessor { return tableName.map((table) -> { final byte[] stopRow; switch (type) { - case REGION: - stopRow = new byte[table.getName().length + 3]; - System.arraycopy(table.getName(), 0, stopRow, 0, table.getName().length); - stopRow[stopRow.length - 3] = ' '; - stopRow[stopRow.length - 2] = HConstants.DELIMITER; - stopRow[stopRow.length - 1] = HConstants.DELIMITER; - break; - case ALL: - case TABLE: - default: - stopRow = new byte[table.getName().length + 1]; - System.arraycopy(table.getName(), 0, stopRow, 0, table.getName().length); - stopRow[stopRow.length - 1] = ' '; - break; + case REGION: + case REPLICATION: + stopRow = new byte[table.getName().length + 3]; + System.arraycopy(table.getName(), 0, stopRow, 0, table.getName().length); + stopRow[stopRow.length - 3] = ' '; + stopRow[stopRow.length - 2] = HConstants.DELIMITER; + stopRow[stopRow.length - 1] = HConstants.DELIMITER; + break; + case ALL: + case TABLE: + default: + stopRow = new byte[table.getName().length + 1]; + System.arraycopy(table.getName(), 0, stopRow, 0, table.getName().length); + stopRow[stopRow.length - 1] = ' '; + break; } return stopRow; }); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java index a800c1c9479..4cc46c80904 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java @@ -192,7 +192,8 @@ public class MetaTableAccessor { public enum QueryType { ALL(HConstants.TABLE_FAMILY, HConstants.CATALOG_FAMILY), REGION(HConstants.CATALOG_FAMILY), - TABLE(HConstants.TABLE_FAMILY); + TABLE(HConstants.TABLE_FAMILY), + REPLICATION(HConstants.REPLICATION_BARRIER_FAMILY); private final byte[][] families; @@ -1168,8 +1169,9 @@ public class MetaTableAccessor { final List<Result> results = new ArrayList<>(); @Override public boolean visit(Result r) throws IOException { - if (r == null || r.isEmpty()) return true; - add(r); + if (r != null && !r.isEmpty()) { + add(r); + } return true; } @@ -2108,6 +2110,24 @@ public class MetaTableAccessor { } } + public static List<Pair<String, Long>> getTableEncodedRegionNameAndLastBarrier(Connection conn, + TableName tableName) throws IOException { + List<Pair<String, Long>> list = new ArrayList<>(); + scanMeta(conn, getTableStartRowForMeta(tableName, QueryType.REPLICATION), + getTableStopRowForMeta(tableName, QueryType.REPLICATION), QueryType.REPLICATION, r -> { + byte[] value = + r.getValue(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER); + if (value == null) { + return true; + } + long lastBarrier = Bytes.toLong(value); + String encodedRegionName = RegionInfo.encodeRegionName(r.getRow()); + list.add(Pair.newPair(encodedRegionName, lastBarrier)); + return true; + }); + return list; + } + private static void debugLogMutations(List<? extends Mutation> mutations) throws IOException { if (!METALOG.isDebugEnabled()) { return;
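A note on the hunk above: the new REPLICATION query type reuses the per-table start/stop-row computation of REGION but reads the replication-barrier column family. A minimal, hypothetical use of the new meta helper (assuming an open Connection conn and a table named "t1"; this is an illustration, not code from the patch):

    List<Pair<String, Long>> regionsAndBarriers =
        MetaTableAccessor.getTableEncodedRegionNameAndLastBarrier(conn, TableName.valueOf("t1"));
    for (Pair<String, Long> p : regionsAndBarriers) {
      // getFirst() is the encoded region name; getSecond() is the last replication
      // barrier, i.e. the open sequence number written when the region was reopened.
      System.out.println(p.getFirst() + " -> " + p.getSecond());
    }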
diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto index fa6fa757a80..f710759b119 100644 --- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto @@ -377,7 +377,11 @@ enum PeerModificationState { PRE_PEER_MODIFICATION = 1; UPDATE_PEER_STORAGE = 2; REFRESH_PEER_ON_RS = 3; - POST_PEER_MODIFICATION = 4; + SERIAL_PEER_REOPEN_REGIONS = 4; + SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID = 5; + SERIAL_PEER_SET_PEER_ENABLED = 6; + SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS = 7; + POST_PEER_MODIFICATION = 8; } message PeerModificationStateData { @@ -415,4 +419,5 @@ message AddPeerStateData { message UpdatePeerConfigStateData { required ReplicationPeer peer_config = 1; + optional ReplicationPeer old_peer_config = 2; } diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java index cfe9c9c8e80..99a1e97a825 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java @@ -78,6 +78,14 @@ public interface ReplicationQueueStorage { */ long getLastSequenceId(String encodedRegionName, String peerId) throws ReplicationException; + /** + * Set the max sequence id of a set of regions for a given peer. Called when setting up a serial + * replication peer. + * @param peerId peer id + * @param lastSeqIds map with {encodedRegionName, sequenceId} pairs for serial replication. + */ + void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds) throws ReplicationException; + /** * Get the current position for a specific WAL in a given queue for a given regionserver. * @param serverName the name of the regionserver
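The new setLastSequenceIds() complements the existing getLastSequenceId(): it seeds the "already pushed" watermark for many regions in one call. A hedged sketch of a caller (queueStorage and encodedRegionName are assumed to be in scope; illustration only):

    Map<String, Long> lastSeqIds = new HashMap<>();
    // Edits of this region up to and including sequence id 41 are treated as
    // already pushed to peer "1"; replication to that peer resumes at 42.
    lastSeqIds.put(encodedRegionName, 41L);
    queueStorage.setLastSequenceIds("1", lastSeqIds);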
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java index e2479e035ef..1c42de4f205 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java @@ -111,13 +111,11 @@ public final class ReplicationUtils { return true; } - public static boolean isKeyConfigEqual(ReplicationPeerConfig rpc1, ReplicationPeerConfig rpc2) { + public static boolean isNamespacesAndTableCFsEqual(ReplicationPeerConfig rpc1, + ReplicationPeerConfig rpc2) { if (rpc1.replicateAllUserTables() != rpc2.replicateAllUserTables()) { return false; } - if (rpc1.isSerial() != rpc2.isSerial()) { - return false; - } if (rpc1.replicateAllUserTables()) { return isNamespacesEqual(rpc1.getExcludeNamespaces(), rpc2.getExcludeNamespaces()) && isTableCFsEqual(rpc1.getExcludeTableCFsMap(), rpc2.getExcludeTableCFsMap()); diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java index 1a5749eadb8..2ab08aeeb25 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java @@ -202,6 +202,24 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase } } + private void addLastSeqIdsToOps(String queueId, Map<String, Long> lastSeqIds, + List<ZKUtilOp> listOfOps) throws KeeperException { + for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) { + String peerId = new ReplicationQueueInfo(queueId).getPeerId(); + String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId); + /* + * Make sure the path + * /hbase/replication/regions/<hash>/<encoded-region-name>-<peer-id> exists. As the javadoc + * of multiOrSequential() says, if a NodeExistsException is received all operations will + * fail, so we create the path here first. There is no need to add the creation to + * listOfOps, because we only need the file position and sequence id updates to be atomic. + */ + ZKUtil.createWithParents(zookeeper, path); + // Persist the max sequence id of region to zookeeper. + listOfOps.add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue()))); + } + } + @Override public void setWALPosition(ServerName serverName, String queueId, String fileName, long position, Map<String, Long> lastSeqIds) throws ReplicationException { @@ -212,23 +230,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase ZKUtil.positionToByteArray(position))); } // Persist the max sequence id(s) of regions for serial replication atomically. - for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) { - String peerId = new ReplicationQueueInfo(queueId).getPeerId(); - String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId); - /* - * Make sure the existence of path - * /hbase/replication/regions/<hash>/<encoded-region-name>-<peer-id>. As the javadoc in - * multiOrSequential() method said, if received a NodeExistsException, all operations will - * fail. So create the path here, and in fact, no need to add this operation to listOfOps, - * because only need to make sure that update file position and sequence id atomically.
- */ - ZKUtil.createWithParents(zookeeper, path); - // Persist the max sequence id of region to zookeeper. - listOfOps.add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue()))); - } - if (!listOfOps.isEmpty()) { - ZKUtil.multiOrSequential(zookeeper, listOfOps, false); - } + addLastSeqIdsToOps(queueId, lastSeqIds, listOfOps); + ZKUtil.multiOrSequential(zookeeper, listOfOps, false); } catch (KeeperException e) { throw new ReplicationException("Failed to set log position (serverName=" + serverName + ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e); @@ -255,6 +258,19 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase return HConstants.NO_SEQNUM; } + @Override + public void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds) + throws ReplicationException { + try { + List<ZKUtilOp> listOfOps = new ArrayList<>(); + addLastSeqIdsToOps(peerId, lastSeqIds, listOfOps); + ZKUtil.multiOrSequential(zookeeper, listOfOps, false); + } catch (KeeperException e) { + throw new ReplicationException("Failed to set last sequence ids, peerId=" + peerId + + ", lastSeqIds.size=" + lastSeqIds.size(), e); + } + } + @Override public long getWALPosition(ServerName serverName, String queueId, String fileName) throws ReplicationException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java index f0f77047ffd..72228f64cec 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java @@ -56,6 +56,21 @@ public class AddPeerProcedure extends ModifyPeerProcedure { return PeerOperationType.ADD; } + @Override + protected boolean reopenRegionsAfterRefresh() { + return true; + } + + @Override + protected boolean enablePeerBeforeFinish() { + return enabled; + } + + @Override + protected ReplicationPeerConfig getNewPeerConfig() { + return peerConfig; + } + @Override protected void prePeerModification(MasterProcedureEnv env) throws IOException, ReplicationException { @@ -68,11 +83,13 @@ public class AddPeerProcedure extends ModifyPeerProcedure { @Override protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException { - env.getReplicationPeerManager().addPeer(peerId, peerConfig, enabled); + env.getReplicationPeerManager().addPeer(peerId, peerConfig, + peerConfig.isSerial() ? false : enabled); } @Override - protected void postPeerModification(MasterProcedureEnv env) throws IOException { + protected void postPeerModification(MasterProcedureEnv env) + throws IOException, ReplicationException { LOG.info("Successfully added {} peer {}, config {}", enabled ? "ENABLED" : "DISABLED", peerId, peerConfig); MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
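To keep the control flow straight before the next file: the four SERIAL_PEER_* states added to MasterProcedure.proto are driven by two hooks, reopenRegionsAfterRefresh() and enablePeerBeforeFinish(). AddPeerProcedure opts in unconditionally (and stores a serial peer in DISABLED state first), while UpdatePeerConfigProcedure opts in only when the new config is serial and the covered tables actually change. A condensed, non-verbatim view of the transitions implemented in ModifyPeerProcedure.executeFromState() below:

    // REFRESH_PEER_ON_RS                    -> reopenRegionsAfterRefresh()
    //                                          ? SERIAL_PEER_REOPEN_REGIONS
    //                                          : POST_PEER_MODIFICATION
    // SERIAL_PEER_REOPEN_REGIONS            -> SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID
    // SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID -> enablePeerBeforeFinish()
    //                                          ? SERIAL_PEER_SET_PEER_ENABLED
    //                                          : POST_PEER_MODIFICATION
    // SERIAL_PEER_SET_PEER_ENABLED          -> SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS
    // SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS -> POST_PEER_MODIFICATION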
"ENABLED" : "DISABLED", peerId, peerConfig); MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java index 83c5134c027..2b764873962 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java @@ -18,11 +18,28 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Stream; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.master.MasterFileSystem; +import org.apache.hadoop.hbase.master.TableStateManager; +import org.apache.hadoop.hbase.master.TableStateManager.TableStateNotFoundException; +import org.apache.hadoop.hbase.master.assignment.RegionStates; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationUtils; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +55,8 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure new RefreshPeerProcedure(peerId, type, sn)) + .toArray(RefreshPeerProcedure[]::new)); + } + + protected ReplicationPeerConfig getOldPeerConfig() { + return null; + } + + protected ReplicationPeerConfig getNewPeerConfig() { + throw new UnsupportedOperationException(); + } + + private Stream getTables(MasterProcedureEnv env) throws IOException { + ReplicationPeerConfig peerConfig = getNewPeerConfig(); + Stream stream = env.getMasterServices().getTableDescriptors().getAll().values() + .stream().filter(TableDescriptor::hasGlobalReplicationScope) + .filter(td -> ReplicationUtils.contains(peerConfig, td.getTableName())); + ReplicationPeerConfig oldPeerConfig = getOldPeerConfig(); + if (oldPeerConfig != null && oldPeerConfig.isSerial()) { + stream = stream.filter(td -> !ReplicationUtils.contains(oldPeerConfig, td.getTableName())); + } + return stream; + } + + private void reopenRegions(MasterProcedureEnv env) throws IOException { + Stream stream = getTables(env); + TableStateManager tsm = env.getMasterServices().getTableStateManager(); + stream.filter(td -> { + try { + return tsm.getTableState(td.getTableName()).isEnabled(); + } catch (TableStateNotFoundException e) { + return false; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }).forEach(td -> { + try { + addChildProcedure(env.getAssignmentManager().createReopenProcedures( + env.getAssignmentManager().getRegionStates().getRegionsOfTable(td.getTableName()))); + } catch (IOException 
e) { + throw new UncheckedIOException(e); + } + }); + } + + private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier, + ReplicationQueueStorage queueStorage) throws ReplicationException { + if (barrier >= 0) { + lastSeqIds.put(encodedRegionName, barrier); + if (lastSeqIds.size() >= SET_LAST_SEQ_ID_BATCH_SIZE) { + queueStorage.setLastSequenceIds(peerId, lastSeqIds); + lastSeqIds.clear(); + } + } + } + + private void setLastSequenceIdForSerialPeer(MasterProcedureEnv env) + throws IOException, ReplicationException { + Stream<TableDescriptor> stream = getTables(env); + TableStateManager tsm = env.getMasterServices().getTableStateManager(); + ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage(); + Connection conn = env.getMasterServices().getConnection(); + RegionStates regionStates = env.getAssignmentManager().getRegionStates(); + MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem(); + Map<String, Long> lastSeqIds = new HashMap<>(); + stream.forEach(td -> { + try { + if (tsm.getTableState(td.getTableName()).isEnabled()) { + for (Pair<String, Long> name2Barrier : MetaTableAccessor + .getTableEncodedRegionNameAndLastBarrier(conn, td.getTableName())) { + addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1, + queueStorage); + } + } else { + for (RegionInfo region : regionStates.getRegionsOfTable(td.getTableName(), true)) { + long maxSequenceId = + WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region)); + addToMap(lastSeqIds, region.getEncodedName(), maxSequenceId, queueStorage); + } + } + } catch (IOException | ReplicationException e) { + throw new RuntimeException(e); + } + }); + if (!lastSeqIds.isEmpty()) { + queueStorage.setLastSequenceIds(peerId, lastSeqIds); + } + } + @Override protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state) throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { @@ -104,9 +231,42 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModificationState> case REFRESH_PEER_ON_RS: - addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream() - .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn)) - .toArray(RefreshPeerProcedure[]::new)); + refreshPeer(env, getPeerOperationType()); setNextState(reopenRegionsAfterRefresh() ? PeerModificationState.SERIAL_PEER_REOPEN_REGIONS + : PeerModificationState.POST_PEER_MODIFICATION); return Flow.HAS_MORE_STATE; + case SERIAL_PEER_REOPEN_REGIONS: + try { + reopenRegions(env); + } catch (Exception e) { + LOG.warn("{} reopen regions for peer {} failed, retry", getClass().getName(), peerId, e); + throw new ProcedureYieldException(); + } + setNextState(PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID); + return Flow.HAS_MORE_STATE; + case SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID: + try { + setLastSequenceIdForSerialPeer(env); + } catch (Exception e) { + LOG.warn("{} set last sequence id for peer {} failed, retry", getClass().getName(), + peerId, e); + throw new ProcedureYieldException(); + } + setNextState(enablePeerBeforeFinish() ?
PeerModificationState.SERIAL_PEER_SET_PEER_ENABLED + : PeerModificationState.POST_PEER_MODIFICATION); + return Flow.HAS_MORE_STATE; + case SERIAL_PEER_SET_PEER_ENABLED: + try { + env.getReplicationPeerManager().enablePeer(peerId); + } catch (ReplicationException e) { + LOG.warn("{} enable peer before finish for peer {} failed, retry", getClass().getName(), + peerId, e); + throw new ProcedureYieldException(); + } + setNextState(PeerModificationState.SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS); + return Flow.HAS_MORE_STATE; + case SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS: + refreshPeer(env, PeerOperationType.ENABLE); setNextState(PeerModificationState.POST_PEER_MODIFICATION); return Flow.HAS_MORE_STATE; case POST_PEER_MODIFICATION: diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index 1e933730d41..a0e01e0edea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -85,7 +85,7 @@ public class ReplicationPeerManager { } } - public void preAddPeer(String peerId, ReplicationPeerConfig peerConfig) + void preAddPeer(String peerId, ReplicationPeerConfig peerConfig) throws DoNotRetryIOException, ReplicationException { if (peerId.contains("-")) { throw new DoNotRetryIOException("Found invalid peer name: " + peerId); @@ -109,43 +109,47 @@ public class ReplicationPeerManager { return desc; } - public void preRemovePeer(String peerId) throws DoNotRetryIOException { + void preRemovePeer(String peerId) throws DoNotRetryIOException { checkPeerExists(peerId); } - public void preEnablePeer(String peerId) throws DoNotRetryIOException { + void preEnablePeer(String peerId) throws DoNotRetryIOException { ReplicationPeerDescription desc = checkPeerExists(peerId); if (desc.isEnabled()) { throw new DoNotRetryIOException("Replication peer " + peerId + " has already been enabled"); } } - public void preDisablePeer(String peerId) throws DoNotRetryIOException { + void preDisablePeer(String peerId) throws DoNotRetryIOException { ReplicationPeerDescription desc = checkPeerExists(peerId); if (!desc.isEnabled()) { throw new DoNotRetryIOException("Replication peer " + peerId + " has already been disabled"); } } - public void preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) + /** + * Return the old peer description. Can never be null. + */ + ReplicationPeerDescription preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) throws DoNotRetryIOException { checkPeerConfig(peerConfig); ReplicationPeerDescription desc = checkPeerExists(peerId); ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig(); if (!isStringEquals(peerConfig.getClusterKey(), oldPeerConfig.getClusterKey())) { throw new DoNotRetryIOException( - "Changing the cluster key on an existing peer is not allowed. Existing key '" + - oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '" + - peerConfig.getClusterKey() + "'"); + "Changing the cluster key on an existing peer is not allowed. 
Existing key '" + + oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '" + + peerConfig.getClusterKey() + "'"); } if (!isStringEquals(peerConfig.getReplicationEndpointImpl(), oldPeerConfig.getReplicationEndpointImpl())) { throw new DoNotRetryIOException("Changing the replication endpoint implementation class " + - "on an existing peer is not allowed. Existing class '" + - oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId + - " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'"); + "on an existing peer is not allowed. Existing class '" + + oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId + + " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'"); } + return desc; } public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) @@ -216,7 +220,7 @@ public class ReplicationPeerManager { return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty(); } - public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException { + void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException { // Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still // on-going when the refresh peer config procedure is done, if a RS which has already been // scanned claims the queue of a RS which has not been scanned yet, we will miss that queue in @@ -340,7 +344,7 @@ public class ReplicationPeerManager { public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf) throws ReplicationException { ReplicationPeerStorage peerStorage = - ReplicationStorageFactory.getReplicationPeerStorage(zk, conf); + ReplicationStorageFactory.getReplicationPeerStorage(zk, conf); ConcurrentMap peers = new ConcurrentHashMap<>(); for (String peerId : peerStorage.listPeerIds()) { ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId); @@ -348,7 +352,7 @@ public class ReplicationPeerManager { peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig)); } return new ReplicationPeerManager(peerStorage, - ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers); + ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java index 34974475ab0..b7e670af6c1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java @@ -24,6 +24,8 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +42,10 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure { private ReplicationPeerConfig peerConfig; + private ReplicationPeerConfig oldPeerConfig; + + private boolean enabled; + public UpdatePeerConfigProcedure() { } @@ 
-53,22 +59,54 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure { return PeerOperationType.UPDATE_CONFIG; } + @Override + protected boolean reopenRegionsAfterRefresh() { + // If we remove some tables from the peer config then we do not need to enter the extra states + // for serial replication. Could try to optimize later since it is not easy to determine this... + return peerConfig.isSerial() && (!oldPeerConfig.isSerial() || + !ReplicationUtils.isNamespacesAndTableCFsEqual(peerConfig, oldPeerConfig)); + } + + @Override + protected boolean enablePeerBeforeFinish() { + // do not need to test reopenRegionsAfterRefresh since we can only enter here if + // reopenRegionsAfterRefresh returns true. + return enabled; + } + + @Override + protected ReplicationPeerConfig getOldPeerConfig() { + return oldPeerConfig; + } + + @Override + protected ReplicationPeerConfig getNewPeerConfig() { + return peerConfig; + } + @Override protected void prePeerModification(MasterProcedureEnv env) throws IOException { MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); if (cpHost != null) { cpHost.preUpdateReplicationPeerConfig(peerId, peerConfig); } - env.getReplicationPeerManager().preUpdatePeerConfig(peerId, peerConfig); + ReplicationPeerDescription desc = + env.getReplicationPeerManager().preUpdatePeerConfig(peerId, peerConfig); + oldPeerConfig = desc.getPeerConfig(); + enabled = desc.isEnabled(); } @Override protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException { env.getReplicationPeerManager().updatePeerConfig(peerId, peerConfig); + if (enabled && reopenRegionsAfterRefresh()) { + env.getReplicationPeerManager().disablePeer(peerId); + } } @Override - protected void postPeerModification(MasterProcedureEnv env) throws IOException { + protected void postPeerModification(MasterProcedureEnv env) + throws IOException, ReplicationException { LOG.info("Successfully updated peer config of {} to {}", peerId, peerConfig); MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); if (cpHost != null) { @@ -79,14 +117,23 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure { @Override protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { super.serializeStateData(serializer); - serializer.serialize(UpdatePeerConfigStateData.newBuilder() - .setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)).build()); + UpdatePeerConfigStateData.Builder builder = UpdatePeerConfigStateData.newBuilder() + .setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)); + if (oldPeerConfig != null) { + builder.setOldPeerConfig(ReplicationPeerConfigUtil.convert(oldPeerConfig)); + } + serializer.serialize(builder.build()); } @Override protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { super.deserializeStateData(serializer); - peerConfig = ReplicationPeerConfigUtil - .convert(serializer.deserialize(UpdatePeerConfigStateData.class).getPeerConfig()); + UpdatePeerConfigStateData data = serializer.deserialize(UpdatePeerConfigStateData.class); + peerConfig = ReplicationPeerConfigUtil.convert(data.getPeerConfig()); + if (data.hasOldPeerConfig()) { + oldPeerConfig = ReplicationPeerConfigUtil.convert(data.getOldPeerConfig()); + } else { + oldPeerConfig = null; + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java index a02d18163f1..78c19775a5e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerImpl; +import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.util.KeyLocker; import org.apache.yetus.audience.InterfaceAudience; @@ -99,19 +100,26 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler { @Override public void updatePeerConfig(String peerId) throws ReplicationException, IOException { Lock peerLock = peersLock.acquireLock(peerId); + ReplicationPeers peers = replicationSourceManager.getReplicationPeers(); ReplicationPeerImpl peer = null; ReplicationPeerConfig oldConfig = null; + PeerState oldState = null; boolean success = false; try { - peer = replicationSourceManager.getReplicationPeers().getPeer(peerId); + peer = peers.getPeer(peerId); if (peer == null) { throw new ReplicationException("Peer with id=" + peerId + " is not cached."); } oldConfig = peer.getPeerConfig(); - ReplicationPeerConfig newConfig = - replicationSourceManager.getReplicationPeers().refreshPeerConfig(peerId); + oldState = peer.getPeerState(); + ReplicationPeerConfig newConfig = peers.refreshPeerConfig(peerId); + // also need to refresh peer state here. When updating a serial replication peer we may + // disable it first and then enable it. 
+ PeerState newState = peers.refreshPeerState(peerId); // RS need to start work with the new replication config change - if (!ReplicationUtils.isKeyConfigEqual(oldConfig, newConfig)) { + if (!ReplicationUtils.isNamespacesAndTableCFsEqual(oldConfig, newConfig) || + oldConfig.isSerial() != newConfig.isSerial() || + (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED))) { replicationSourceManager.refreshSources(peerId); } success = true; @@ -119,6 +127,7 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler { if (!success && peer != null) { // Reset peer config if refresh source failed peer.setPeerConfig(oldConfig); + peer.setPeerState(oldState.equals(PeerState.ENABLED)); } peerLock.unlock(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 23e1115bb96..3ecc50ad8d9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -510,7 +510,7 @@ public class ReplicationSourceManager implements ReplicationListener { // synchronized on walsById to avoid race with preLogRoll synchronized (this.walsById) { NavigableSet<String> wals = walsById.get(queueId).get(logPrefix); - if (wals != null && !wals.first().equals(log)) { + if (wals != null) { cleanOldLogs(wals, log, inclusive, queueId); } } @@ -755,7 +755,7 @@ public class ReplicationSourceManager implements ReplicationListener { * @return a sorted set of wal names */ @VisibleForTesting - Map<String, Map<String, NavigableSet<String>>> getWALs() { + public Map<String, Map<String, NavigableSet<String>>> getWALs() { return Collections.unmodifiableMap(walsById); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java index 960d47341c2..22b2de78173 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java @@ -157,4 +157,12 @@ class WALEntryBatch { public void setLastSeqId(String region, long sequenceId) { lastSeqIds.put(region, sequenceId); } + + @Override + public String toString() { + return "WALEntryBatch [walEntries=" + walEntries + ", lastWalPath=" + lastWalPath + + ", lastWalPosition=" + lastWalPosition + ", nbRowKeys=" + nbRowKeys + ", nbHFiles=" + + nbHFiles + ", heapSize=" + heapSize + ", lastSeqIds=" + lastSeqIds + ", endOfFile=" + + endOfFile + "]"; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java new file mode 100644 index 00000000000..83afd811160 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.UUID; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALProvider; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.rules.TestName; + +/** + * Base class for testing serial replication. + */ +public class SerialReplicationTestBase { + + protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + protected static String PEER_ID = "1"; + + protected static byte[] CF = Bytes.toBytes("CF"); + + protected static byte[] CQ = Bytes.toBytes("CQ"); + + protected static FileSystem FS; + + protected static Path LOG_DIR; + + protected static WALProvider.Writer WRITER; + + @Rule + public final TestName name = new TestName(); + + protected Path logPath; + + public static final class LocalReplicationEndpoint extends BaseReplicationEndpoint { + + private static final UUID PEER_UUID = UUID.randomUUID(); + + @Override + public UUID getPeerUUID() { + return PEER_UUID; + } + + @Override + public boolean replicate(ReplicateContext replicateContext) { + synchronized (WRITER) { + try { + for (Entry entry : replicateContext.getEntries()) { + WRITER.append(entry); + } + WRITER.sync(false); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + return true; + } + + @Override + public void start() { + startAsync(); + } + + @Override + public void stop() { + stopAsync(); + } + + @Override + protected void doStart() { + notifyStarted(); + } + + @Override + protected void doStop() { + notifyStopped(); + } + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + UTIL.getConfiguration().setInt("replication.source.nb.capacity", 10); + UTIL.startMiniCluster(3); + // disable balancer + UTIL.getAdmin().balancerSwitch(false, true); + LOG_DIR = UTIL.getDataTestDirOnTestFS("replicated"); + FS = UTIL.getTestFileSystem(); + FS.mkdirs(LOG_DIR); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @After + public void tearDown() throws Exception { + UTIL.getAdmin().removeReplicationPeer(PEER_ID); + rollAllWALs(); + if (WRITER != null) { + WRITER.close(); + WRITER = null; + } + } + + protected static void moveRegion(RegionInfo region, HRegionServer rs) throws Exception { + UTIL.getAdmin().move(region.getEncodedNameAsBytes(), + 
Bytes.toBytes(rs.getServerName().getServerName())); + UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { + + @Override + public boolean evaluate() throws Exception { + return rs.getRegion(region.getEncodedName()) != null; + } + + @Override + public String explainFailure() throws Exception { + return region + " is still not on " + rs; + } + }); + } + + protected static void rollAllWALs() throws Exception { + for (RegionServerThread t : UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) { + t.getRegionServer().getWalRoller().requestRollAll(); + } + UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { + + @Override + public boolean evaluate() throws Exception { + return UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().stream() + .map(t -> t.getRegionServer()).allMatch(HRegionServer::walRollRequestFinished); + } + + @Override + public String explainFailure() throws Exception { + return "Log roll has not finished yet"; + } + }); + } + + protected final void setupWALWriter() throws IOException { + logPath = new Path(LOG_DIR, name.getMethodName()); + WRITER = WALFactory.createWALWriter(FS, logPath, UTIL.getConfiguration()); + } + + protected final void waitUntilReplicationDone(int expectedEntries) throws Exception { + UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { + + @Override + public boolean evaluate() throws Exception { + try (WAL.Reader reader = WALFactory.createReader(FS, logPath, UTIL.getConfiguration())) { + int count = 0; + while (reader.next() != null) { + count++; + } + return count >= expectedEntries; + } catch (IOException e) { + return false; + } + } + + @Override + public String explainFailure() throws Exception { + return "Not enough entries replicated"; + } + }); + } + + protected final void addPeer(boolean enabled) throws IOException { + UTIL.getAdmin().addReplicationPeer(PEER_ID, + ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase") + .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).setSerial(true) + .build(), + enabled); + } + + protected final void checkOrder(int expectedEntries) throws IOException { + try (WAL.Reader reader = + WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) { + long seqId = -1L; + int count = 0; + for (Entry entry;;) { + entry = reader.next(); + if (entry == null) { + break; + } + assertTrue( + "Sequence id went backwards from " + seqId + " to " + entry.getKey().getSequenceId(), + entry.getKey().getSequenceId() >= seqId); + seqId = entry.getKey().getSequenceId(); + count++; + } + assertEquals(expectedEntries, count); + } + } +}
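Before the new test class: the addPeer(true) helper above expands to roughly the following Admin call (the cluster key and endpoint class are taken directly from SerialReplicationTestBase):

    UTIL.getAdmin().addReplicationPeer("1",
        ReplicationPeerConfig.newBuilder()
            .setClusterKey("127.0.0.1:2181:/hbase")
            .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName())
            .setSerial(true)
            .build(),
        true);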
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java new file mode 100644 index 00000000000..64b5bb1ec9a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java @@ -0,0 +1,215 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.io.IOException; +import java.util.Collections; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; +import org.apache.hadoop.hbase.replication.regionserver.Replication; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; + +/** + * Testcase for HBASE-20147. + */ +@Category({ ReplicationTests.class, MediumTests.class }) +public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestAddToSerialReplicationPeer.class); + + @Before + public void setUp() throws IOException, StreamLacksCapabilityException { + setupWALWriter(); + } + + // Make sure we will start replicating from the sequence id after the move; that's what we want + // to test here.
+ private void moveRegionAndArchiveOldWals(RegionInfo region, HRegionServer rs) throws Exception { + moveRegion(region, rs); + rollAllWALs(); + } + + private void waitUntilReplicatedToTheCurrentWALFile(HRegionServer rs) throws Exception { + Path path = ((AbstractFSWAL<?>) rs.getWAL(null)).getCurrentFileName(); + String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(path.getName()); + UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { + + @Override + public boolean evaluate() throws Exception { + ReplicationSourceManager manager = + ((Replication) rs.getReplicationSourceService()).getReplicationManager(); + return manager.getWALs().get(PEER_ID).get(logPrefix).size() == 1; + } + + @Override + public String explainFailure() throws Exception { + return "Still not replicated to the current WAL file yet"; + } + }); + } + + @Test + public void testAddPeer() throws Exception { + TableName tableName = TableName.valueOf(name.getMethodName()); + UTIL.getAdmin().createTable( + TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder + .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build()); + UTIL.waitTableAvailable(tableName); + try (Table table = UTIL.getConnection().getTable(tableName)) { + for (int i = 0; i < 100; i++) { + table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); + } + } + RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0); + HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName)); + moveRegionAndArchiveOldWals(region, rs); + addPeer(true); + try (Table table = UTIL.getConnection().getTable(tableName)) { + for (int i = 0; i < 100; i++) { + table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); + } + } + waitUntilReplicationDone(100); + checkOrder(100); + } + + @Test + public void testChangeToSerial() throws Exception { + ReplicationPeerConfig peerConfig = + ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase") + .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build(); + UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true); + + TableName tableName = TableName.valueOf(name.getMethodName()); + + UTIL.getAdmin().createTable( + TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder + .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build()); + UTIL.waitTableAvailable(tableName); + try (Table table = UTIL.getConnection().getTable(tableName)) { + for (int i = 0; i < 100; i++) { + table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); + } + } + + RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0); + HRegionServer srcRs = UTIL.getRSForFirstRegionInTable(tableName); + HRegionServer rs = UTIL.getOtherRegionServer(srcRs); + moveRegionAndArchiveOldWals(region, rs); + waitUntilReplicationDone(100); + waitUntilReplicatedToTheCurrentWALFile(srcRs); + + UTIL.getAdmin().disableReplicationPeer(PEER_ID); + UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID, + ReplicationPeerConfig.newBuilder(peerConfig).setSerial(true).build()); + UTIL.getAdmin().enableReplicationPeer(PEER_ID); + + try (Table table = UTIL.getConnection().getTable(tableName)) { + for (int i = 0; i < 100; i++) { + table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); + } + } + waitUntilReplicationDone(200); + checkOrder(200); + } + + @Test + public void testAddToSerialPeer() throws Exception { + ReplicationPeerConfig peerConfig = +
ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase") + .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()) + .setReplicateAllUserTables(false).setSerial(true).build(); + UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true); + + TableName tableName = TableName.valueOf(name.getMethodName()); + UTIL.getAdmin().createTable( + TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder + .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build()); + UTIL.waitTableAvailable(tableName); + try (Table table = UTIL.getConnection().getTable(tableName)) { + for (int i = 0; i < 100; i++) { + table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); + } + } + RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0); + HRegionServer srcRs = UTIL.getRSForFirstRegionInTable(tableName); + HRegionServer rs = UTIL.getOtherRegionServer(srcRs); + moveRegionAndArchiveOldWals(region, rs); + waitUntilReplicatedToTheCurrentWALFile(rs); + UTIL.getAdmin().disableReplicationPeer(PEER_ID); + UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID, + ReplicationPeerConfig.newBuilder(peerConfig) + .setTableCFsMap(ImmutableMap.of(tableName, Collections.emptyList())).build()); + UTIL.getAdmin().enableReplicationPeer(PEER_ID); + try (Table table = UTIL.getConnection().getTable(tableName)) { + for (int i = 0; i < 100; i++) { + table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); + } + } + waitUntilReplicationDone(100); + checkOrder(100); + } + + @Test + public void testDisabledTable() throws Exception { + TableName tableName = TableName.valueOf(name.getMethodName()); + UTIL.getAdmin().createTable( + TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder + .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build()); + UTIL.waitTableAvailable(tableName); + try (Table table = UTIL.getConnection().getTable(tableName)) { + for (int i = 0; i < 100; i++) { + table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); + } + } + UTIL.getAdmin().disableTable(tableName); + rollAllWALs(); + addPeer(true); + UTIL.getAdmin().enableTable(tableName); + try (Table table = UTIL.getConnection().getTable(tableName)) { + for (int i = 0; i < 100; i++) { + table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); + } + } + waitUntilReplicationDone(100); + checkOrder(100); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java index 37f11f6c3cf..94b79d97c81 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java @@ -23,211 +23,49 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.io.IOException; -import java.io.UncheckedIOException; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; -import 
org.apache.hadoop.hbase.Waiter.ExplainingPredicate; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; -import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALProvider; -import org.junit.After; -import org.junit.AfterClass; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.ClassRule; -import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; -@Category({ ReplicationTests.class, LargeTests.class }) -public class TestSerialReplication { +@Category({ ReplicationTests.class, MediumTests.class }) +public class TestSerialReplication extends SerialReplicationTestBase { @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestSerialReplication.class); - private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); - - private static String PEER_ID = "1"; - - private static byte[] CF = Bytes.toBytes("CF"); - - private static byte[] CQ = Bytes.toBytes("CQ"); - - private static FileSystem FS; - - private static Path LOG_DIR; - - private static WALProvider.Writer WRITER; - - public static final class LocalReplicationEndpoint extends BaseReplicationEndpoint { - - private static final UUID PEER_UUID = UUID.randomUUID(); - - @Override - public UUID getPeerUUID() { - return PEER_UUID; - } - - @Override - public boolean replicate(ReplicateContext replicateContext) { - synchronized (WRITER) { - try { - for (Entry entry : replicateContext.getEntries()) { - WRITER.append(entry); - } - WRITER.sync(false); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - return true; - } - - @Override - public void start() { - startAsync(); - } - - @Override - public void stop() { - stopAsync(); - } - - @Override - protected void doStart() { - notifyStarted(); - } - - @Override - protected void doStop() { - notifyStopped(); - } - } - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - UTIL.getConfiguration().setInt("replication.source.nb.capacity", 10); - UTIL.startMiniCluster(3); - // disable balancer - UTIL.getAdmin().balancerSwitch(false, true); - LOG_DIR = UTIL.getDataTestDirOnTestFS("replicated"); - FS = UTIL.getTestFileSystem(); - FS.mkdirs(LOG_DIR); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - UTIL.shutdownMiniCluster(); - } - - @Rule - public final TestName name = new TestName(); - - private Path logPath; - @Before public void setUp() throws IOException, StreamLacksCapabilityException { - logPath = new Path(LOG_DIR, name.getMethodName()); - WRITER = WALFactory.createWALWriter(FS, logPath, UTIL.getConfiguration()); + setupWALWriter(); // add in disable state, so later when enabling it all sources will start push 
together. - UTIL.getAdmin().addReplicationPeer(PEER_ID, - ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase") - .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).setSerial(true) - .build(), - false); - } - - @After - public void tearDown() throws Exception { - UTIL.getAdmin().removeReplicationPeer(PEER_ID); - for (RegionServerThread t : UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) { - t.getRegionServer().getWalRoller().requestRollAll(); - } - UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { - - @Override - public boolean evaluate() throws Exception { - return UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().stream() - .map(t -> t.getRegionServer()).allMatch(HRegionServer::walRollRequestFinished); - } - - @Override - public String explainFailure() throws Exception { - return "Log roll has not finished yet"; - } - }); - for (RegionServerThread t : UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) { - t.getRegionServer().getWalRoller().requestRollAll(); - } - if (WRITER != null) { - WRITER.close(); - WRITER = null; - } - } - - private void moveRegion(RegionInfo region, HRegionServer rs) throws Exception { - UTIL.getAdmin().move(region.getEncodedNameAsBytes(), - Bytes.toBytes(rs.getServerName().getServerName())); - UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { - - @Override - public boolean evaluate() throws Exception { - return rs.getRegion(region.getEncodedName()) != null; - } - - @Override - public String explainFailure() throws Exception { - return region + " is still not on " + rs; - } - }); + addPeer(false); } private void enablePeerAndWaitUntilReplicationDone(int expectedEntries) throws Exception { UTIL.getAdmin().enableReplicationPeer(PEER_ID); - UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { - - @Override - public boolean evaluate() throws Exception { - try (WAL.Reader reader = WALFactory.createReader(FS, logPath, UTIL.getConfiguration())) { - int count = 0; - while (reader.next() != null) { - count++; - } - return count >= expectedEntries; - } catch (IOException e) { - return false; - } - } - - @Override - public String explainFailure() throws Exception { - return "Not enough entries replicated"; - } - }); + waitUntilReplicationDone(expectedEntries); } @Test @@ -251,22 +89,7 @@ public class TestSerialReplication { } } enablePeerAndWaitUntilReplicationDone(200); - try (WAL.Reader reader = - WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) { - long seqId = -1L; - int count = 0; - for (Entry entry;;) { - entry = reader.next(); - if (entry == null) { - break; - } - assertTrue( - "Sequence id go backwards from " + seqId + " to " + entry.getKey().getSequenceId(), - entry.getKey().getSequenceId() >= seqId); - count++; - } - assertEquals(200, count); - } + checkOrder(200); } @Test
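Taken together, the patch seeds the last-pushed watermark like this: reopening a region writes a fresh barrier to meta, and everything below that barrier is declared already handled for the peer. A minimal sketch of the arithmetic for one enabled table, mirroring setLastSequenceIdForSerialPeer() above (conn, queueStorage, peerId and tableName are assumed to be in scope; illustration only, not code from the patch):

    Map<String, Long> lastSeqIds = new HashMap<>();
    for (Pair<String, Long> p :
        MetaTableAccessor.getTableEncodedRegionNameAndLastBarrier(conn, tableName)) {
      // The new barrier is the first sequence id of the reopened region, so
      // barrier - 1 is recorded as the last sequence id already pushed.
      lastSeqIds.put(p.getFirst(), p.getSecond() - 1);
    }
    queueStorage.setLastSequenceIds(peerId, lastSeqIds);

For disabled tables there is no fresh barrier, so the procedure instead falls back to the max region sequence id persisted on the filesystem via WALSplitter.getMaxRegionSequenceId().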