diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java index 157ad1b6ed7..c1b39116c88 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java @@ -16,7 +16,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.protobuf; @@ -24,24 +23,24 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; - import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.PrivateCellUtil; -import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.io.SizedCellScanner; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl; +import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; + import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.wal.WAL.Entry; - -import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; @InterfaceAudience.Private public class ReplicationProtbufUtil { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 5cbc4da652c..e3c2ca3cb56 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1996,7 +1996,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private boolean shouldForbidMajorCompaction() { if (rsServices != null && rsServices.getReplicationSourceService() != null) { return rsServices.getReplicationSourceService().getSyncReplicationPeerInfoProvider() - .checkState(getRegionInfo(), ForbidMajorCompactionChecker.get()); + .checkState(getRegionInfo().getTable(), ForbidMajorCompactionChecker.get()); } return false; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 677ee05e9ca..1a71cb71c38 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -2481,10 +2481,9 @@ public class HRegionServer extends HasThread implements } /** - * @return Return the object that implements the replication - * sink executorService. + * @return Return the object that implements the replication sink executorService. */ - ReplicationSinkService getReplicationSinkService() { + public ReplicationSinkService getReplicationSinkService() { return replicationSinkHandler; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 5316ac5ea77..bdb86d08de8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -121,6 +121,7 @@ import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; import org.apache.hadoop.hbase.replication.ReplicationUtils; +import org.apache.hadoop.hbase.replication.regionserver.RejectReplicationRequestStateChecker; import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.User; @@ -2204,9 +2205,26 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } + private void checkShouldRejectReplicationRequest(List entries) throws IOException { + ReplicationSourceService replicationSource = regionServer.getReplicationSourceService(); + if (replicationSource == null || entries.isEmpty()) { + return; + } + // We can ensure that all entries are for one peer, so only need to check one entry's + // table name. if the table hit sync replication at peer side and the peer cluster + // is (or is transiting to) state ACTIVE or DOWNGRADE_ACTIVE, we should reject to apply + // those entries according to the design doc. + TableName table = TableName.valueOf(entries.get(0).getKey().getTableName().toByteArray()); + if (replicationSource.getSyncReplicationPeerInfoProvider().checkState(table, + RejectReplicationRequestStateChecker.get())) { + throw new DoNotRetryIOException( + "Reject to apply to sink cluster because sync replication state of sink cluster " + + "is ACTIVE or DOWNGRADE_ACTIVE, table: " + table); + } + } + /** * Replicate WAL entries on the region server. - * * @param controller the RPC controller * @param request the request * @throws ServiceException @@ -2220,7 +2238,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (regionServer.replicationSinkHandler != null) { requestCount.increment(); List entries = request.getEntryList(); - CellScanner cellScanner = ((HBaseRpcController)controller).cellScanner(); + checkShouldRejectReplicationRequest(entries); + CellScanner cellScanner = ((HBaseRpcController) controller).cellScanner(); regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(); regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner, request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(), @@ -2435,7 +2454,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, private boolean shouldRejectRequestsFromClient(HRegion region) { return regionServer.getReplicationSourceService().getSyncReplicationPeerInfoProvider() - .checkState(region.getRegionInfo(), RejectRequestsFromClientStateChecker.get()); + .checkState(region.getRegionInfo().getTable(), RejectRequestsFromClientStateChecker.get()); } private void rejectIfInStandByState(HRegion region) throws DoNotRetryIOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RejectReplicationRequestStateChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RejectReplicationRequestStateChecker.java new file mode 100644 index 00000000000..9ad0af2286e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RejectReplicationRequestStateChecker.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import java.util.function.BiPredicate; + +import org.apache.hadoop.hbase.replication.SyncReplicationState; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Check whether we need to reject the replication request from source cluster. + */ +@InterfaceAudience.Private +public class RejectReplicationRequestStateChecker + implements BiPredicate { + + private static final RejectReplicationRequestStateChecker INST = + new RejectReplicationRequestStateChecker(); + + @Override + public boolean test(SyncReplicationState state, SyncReplicationState newState) { + return state == SyncReplicationState.ACTIVE || state == SyncReplicationState.DOWNGRADE_ACTIVE + || newState == SyncReplicationState.ACTIVE + || newState == SyncReplicationState.DOWNGRADE_ACTIVE; + } + + public static RejectReplicationRequestStateChecker get() { + return INST; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java index c9c5ef647d2..3cf065c03d1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java @@ -27,8 +27,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.executor.EventType; -import org.apache.hadoop.hbase.ipc.HBaseRpcController; -import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl; import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; import org.apache.hadoop.hbase.regionserver.HRegionServer; @@ -46,6 +44,7 @@ import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALParameter; /** @@ -81,14 +80,19 @@ public class ReplaySyncReplicationWALCallable implements RSProcedureCallable { throw initError; } LOG.info("Received a replay sync replication wal {} event, peerId={}", wal, peerId); - try (Reader reader = getReader()) { - List entries = readWALEntries(reader); - while (!entries.isEmpty()) { - Pair pair = ReplicationProtbufUtil - .buildReplicateWALEntryRequest(entries.toArray(new Entry[entries.size()])); - HBaseRpcController controller = new HBaseRpcControllerImpl(pair.getSecond()); - rs.getRSRpcServices().replicateWALEntry(controller, pair.getFirst()); - entries = readWALEntries(reader); + if (rs.getReplicationSinkService() != null) { + try (Reader reader = getReader()) { + List entries = readWALEntries(reader); + while (!entries.isEmpty()) { + Pair pair = ReplicationProtbufUtil + .buildReplicateWALEntryRequest(entries.toArray(new Entry[entries.size()])); + ReplicateWALEntryRequest request = pair.getFirst(); + rs.getReplicationSinkService().replicateLogEntries(request.getEntryList(), + pair.getSecond(), request.getReplicationClusterId(), + request.getSourceBaseNamespaceDirPath(), request.getSourceHFileArchiveDirPath()); + // Read next entries. + entries = readWALEntries(reader); + } } } return null; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 2846d2cedc3..2199415b4b8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -275,7 +275,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer List oldSources = this.replicationManager.getOldSources(); for (ReplicationSourceInterface source : oldSources) { if (source instanceof ReplicationSource) { - sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics()); + sourceMetricsList.add(source.getSourceMetrics()); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index eb09a3a0564..a334b16df85 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -28,6 +28,7 @@ import java.util.Map.Entry; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; + import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -93,9 +94,8 @@ public class ReplicationSink { /** * Create a sink for replication - * - * @param conf conf object - * @param stopper boolean to tell this thread to stop + * @param conf conf object + * @param stopper boolean to tell this thread to stop * @throws IOException thrown when HDFS goes bad or bad file name */ public ReplicationSink(Configuration conf, Stoppable stopper) @@ -104,16 +104,15 @@ public class ReplicationSink { decorateConf(); this.metrics = new MetricsSink(); this.walEntrySinkFilter = setupWALEntrySinkFilter(); - String className = - conf.get("hbase.replication.source.fs.conf.provider", - DefaultSourceFSConfigurationProvider.class.getCanonicalName()); + String className = conf.get("hbase.replication.source.fs.conf.provider", + DefaultSourceFSConfigurationProvider.class.getCanonicalName()); try { Class c = Class.forName(className).asSubclass(SourceFSConfigurationProvider.class); this.provider = c.getDeclaredConstructor().newInstance(); } catch (Exception e) { throw new IllegalArgumentException( - "Configured source fs configuration provider class " + className + " throws error.", e); + "Configured source fs configuration provider class " + className + " throws error.", e); } } @@ -178,8 +177,7 @@ public class ReplicationSink { Map>>> bulkLoadHFileMap = null; for (WALEntry entry : entries) { - TableName table = - TableName.valueOf(entry.getKey().getTableName().toByteArray()); + TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray()); if (this.walEntrySinkFilter != null) { if (this.walEntrySinkFilter.filter(table, entry.getKey().getWriteTime())) { // Skip Cells in CellScanner associated with this entry. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java index 66fe3be40f7..cfe525ac5d3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java @@ -19,7 +19,8 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.util.Optional; import java.util.function.BiPredicate; -import org.apache.hadoop.hbase.client.RegionInfo; + +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; @@ -31,17 +32,17 @@ import org.apache.yetus.audience.InterfaceAudience; public interface SyncReplicationPeerInfoProvider { /** - * Return the peer id and remote WAL directory if the region is synchronously replicated and the + * Return the peer id and remote WAL directory if the table is synchronously replicated and the * state is {@link SyncReplicationState#ACTIVE}. */ - Optional> getPeerIdAndRemoteWALDir(RegionInfo info); + Optional> getPeerIdAndRemoteWALDir(TableName table); /** - * Check whether the give region is contained in a sync replication peer which can pass the state + * Check whether the given table is contained in a sync replication peer which can pass the state * checker. *

* Will call the checker with current sync replication state and new sync replication state. */ - boolean checkState(RegionInfo info, + boolean checkState(TableName table, BiPredicate checker); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java index cb33dabfcc3..75274ea09f5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java @@ -19,7 +19,8 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.util.Optional; import java.util.function.BiPredicate; -import org.apache.hadoop.hbase.client.RegionInfo; + +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.replication.ReplicationPeerImpl; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.SyncReplicationState; @@ -40,11 +41,11 @@ class SyncReplicationPeerInfoProviderImpl implements SyncReplicationPeerInfoProv } @Override - public Optional> getPeerIdAndRemoteWALDir(RegionInfo info) { - if (info == null) { + public Optional> getPeerIdAndRemoteWALDir(TableName table) { + if (table == null) { return Optional.empty(); } - String peerId = mapping.getPeerId(info); + String peerId = mapping.getPeerId(table); if (peerId == null) { return Optional.empty(); } @@ -65,9 +66,9 @@ class SyncReplicationPeerInfoProviderImpl implements SyncReplicationPeerInfoProv } @Override - public boolean checkState(RegionInfo info, + public boolean checkState(TableName table, BiPredicate checker) { - String peerId = mapping.getPeerId(info); + String peerId = mapping.getPeerId(table); if (peerId == null) { return false; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerMappingManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerMappingManager.java index 64216cb1425..5d19f722446 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerMappingManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerMappingManager.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.yetus.audience.InterfaceAudience; @@ -42,7 +41,7 @@ class SyncReplicationPeerMappingManager { peerConfig.getTableCFsMap().keySet().forEach(table2PeerId::remove); } - String getPeerId(RegionInfo info) { - return table2PeerId.get(info.getTable()); + String getPeerId(TableName tableName) { + return table2PeerId.get(tableName); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java index 3cd356d4214..3b56aa21ff1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java @@ -33,6 +33,7 @@ import java.util.function.BiPredicate; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; @@ -160,7 +161,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen } WAL wal = null; Optional> peerIdAndRemoteWALDir = - peerInfoProvider.getPeerIdAndRemoteWALDir(region); + peerInfoProvider.getPeerIdAndRemoteWALDir(region.getTable()); if (peerIdAndRemoteWALDir.isPresent()) { Pair pair = peerIdAndRemoteWALDir.get(); wal = getWAL(pair.getFirst(), pair.getSecond()); @@ -273,12 +274,12 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen implements SyncReplicationPeerInfoProvider { @Override - public Optional> getPeerIdAndRemoteWALDir(RegionInfo info) { + public Optional> getPeerIdAndRemoteWALDir(TableName table) { return Optional.empty(); } @Override - public boolean checkState(RegionInfo info, + public boolean checkState(TableName table, BiPredicate checker) { return false; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java index 30dbdb52c49..0d5fce8854e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java @@ -25,11 +25,13 @@ import java.util.ArrayList; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseZKTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; +import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; @@ -37,9 +39,15 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.master.MasterFileSystem; +import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; @@ -182,4 +190,28 @@ public class SyncReplicationTestBase { Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME); return new Path(remoteWALDir, PEER_ID); } + + protected void verifyReplicationRequestRejection(HBaseTestingUtility utility, + boolean expectedRejection) throws Exception { + HRegionServer regionServer = utility.getRSForFirstRegionInTable(TABLE_NAME); + ClusterConnection connection = regionServer.getClusterConnection(); + Entry[] entries = new Entry[10]; + for (int i = 0; i < entries.length; i++) { + entries[i] = + new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit()); + } + if (!expectedRejection) { + ReplicationProtbufUtil.replicateWALEntry(connection.getAdmin(regionServer.getServerName()), + entries, null, null, null); + } else { + try { + ReplicationProtbufUtil.replicateWALEntry(connection.getAdmin(regionServer.getServerName()), + entries, null, null, null); + Assert.fail("Should throw IOException when sync-replication state is in A or DA"); + } catch (DoNotRetryIOException e) { + Assert.assertTrue(e.getMessage().contains("Reject to apply to sink cluster")); + Assert.assertTrue(e.getMessage().contains(TABLE_NAME.toString())); + } + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java index f4fb5fe2ce1..bff45722ebe 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java @@ -29,7 +29,8 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestSyncReplicationActive.class); + HBaseClassTestRule.forClass(TestSyncReplicationActive.class); + @Test public void testActive() throws Exception { @@ -37,13 +38,21 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase { SyncReplicationState.STANDBY); UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, SyncReplicationState.ACTIVE); + + // confirm that peer with state A will reject replication request. + verifyReplicationRequestRejection(UTIL1, true); + verifyReplicationRequestRejection(UTIL2, false); + UTIL1.getAdmin().disableReplicationPeer(PEER_ID); write(UTIL1, 0, 100); Thread.sleep(2000); // peer is disabled so no data have been replicated verifyNotReplicatedThroughRegion(UTIL2, 0, 100); + UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, SyncReplicationState.DOWNGRADE_ACTIVE); + // confirm that peer with state DA will reject replication request. + verifyReplicationRequestRejection(UTIL2, true); // confirm that the data is there after we convert the peer to DA verify(UTIL2, 0, 100); @@ -59,6 +68,8 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase { // confirm that we can convert to DA even if the remote slave cluster is down UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, SyncReplicationState.DOWNGRADE_ACTIVE); + // confirm that peer with state DA will reject replication request. + verifyReplicationRequestRejection(UTIL2, true); write(UTIL2, 200, 300); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java index aa6c39cd53a..2d6c28fd760 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java @@ -129,8 +129,7 @@ public class TestReplicationSink { TestSourceFSConfigurationProvider.class.getCanonicalName()); TEST_UTIL.startMiniCluster(3); - SINK = - new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE); + SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE); table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1); table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2); Path rootDir = FSUtils.getRootDir(TEST_UTIL.getConfiguration()); @@ -419,7 +418,7 @@ public class TestReplicationSink { return builder.build(); } - private WALEntry.Builder createWALEntryBuilder(TableName table) { + public static WALEntry.Builder createWALEntryBuilder(TableName table) { WALEntry.Builder builder = WALEntry.newBuilder(); builder.setAssociatedCellCount(1); WALKey.Builder keyBuilder = WALKey.newBuilder(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java index 62990656195..fd9ff2924bb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java @@ -28,6 +28,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellBuilder; @@ -127,7 +128,7 @@ public class TestWALEntrySinkFilter { conf.setClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY, IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class); conf.setClass("hbase.client.connection.impl", DevNullConnection.class, - Connection.class); + Connection.class); ReplicationSink sink = new ReplicationSink(conf, STOPPABLE); // Create some dumb walentries. List< org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry > entries = diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java index 3263fe894b1..69ed44dc2e1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java @@ -75,8 +75,8 @@ public class TestSyncReplicationWALProvider { public static final class InfoProvider implements SyncReplicationPeerInfoProvider { @Override - public Optional> getPeerIdAndRemoteWALDir(RegionInfo info) { - if (info.getTable().equals(TABLE)) { + public Optional> getPeerIdAndRemoteWALDir(TableName table) { + if (table != null && table.equals(TABLE)) { return Optional.of(Pair.newPair(PEER_ID, REMOTE_WAL_DIR)); } else { return Optional.empty(); @@ -84,7 +84,7 @@ public class TestSyncReplicationWALProvider { } @Override - public boolean checkState(RegionInfo info, + public boolean checkState(TableName table, BiPredicate checker) { // TODO Implement SyncReplicationPeerInfoProvider.isInState return false;