HBASE-19782 Reject the replication request when peer is DA or A state

This commit is contained in:
huzheng 2018-03-02 18:05:29 +08:00 committed by zhangduo
parent d91784e666
commit fe339860b5
17 changed files with 169 additions and 60 deletions

View File

@ -16,7 +16,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.protobuf;
@ -24,24 +23,24 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.io.SizedCellScanner;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
@InterfaceAudience.Private
public class ReplicationProtbufUtil {

View File

@ -1996,7 +1996,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private boolean shouldForbidMajorCompaction() {
if (rsServices != null && rsServices.getReplicationSourceService() != null) {
return rsServices.getReplicationSourceService().getSyncReplicationPeerInfoProvider()
.checkState(getRegionInfo(), ForbidMajorCompactionChecker.get());
.checkState(getRegionInfo().getTable(), ForbidMajorCompactionChecker.get());
}
return false;
}

View File

@ -2481,10 +2481,9 @@ public class HRegionServer extends HasThread implements
}
/**
* @return Return the object that implements the replication
* sink executorService.
* @return Return the object that implements the replication sink executorService.
*/
ReplicationSinkService getReplicationSinkService() {
public ReplicationSinkService getReplicationSinkService() {
return replicationSinkHandler;
}

View File

@ -121,6 +121,7 @@ import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.regionserver.RejectReplicationRequestStateChecker;
import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
@ -2204,9 +2205,26 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
private void checkShouldRejectReplicationRequest(List<WALEntry> entries) throws IOException {
ReplicationSourceService replicationSource = regionServer.getReplicationSourceService();
if (replicationSource == null || entries.isEmpty()) {
return;
}
// We can ensure that all entries are for one peer, so only need to check one entry's
// table name. if the table hit sync replication at peer side and the peer cluster
// is (or is transiting to) state ACTIVE or DOWNGRADE_ACTIVE, we should reject to apply
// those entries according to the design doc.
TableName table = TableName.valueOf(entries.get(0).getKey().getTableName().toByteArray());
if (replicationSource.getSyncReplicationPeerInfoProvider().checkState(table,
RejectReplicationRequestStateChecker.get())) {
throw new DoNotRetryIOException(
"Reject to apply to sink cluster because sync replication state of sink cluster "
+ "is ACTIVE or DOWNGRADE_ACTIVE, table: " + table);
}
}
/**
* Replicate WAL entries on the region server.
*
* @param controller the RPC controller
* @param request the request
* @throws ServiceException
@ -2220,6 +2238,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (regionServer.replicationSinkHandler != null) {
requestCount.increment();
List<WALEntry> entries = request.getEntryList();
checkShouldRejectReplicationRequest(entries);
CellScanner cellScanner = ((HBaseRpcController) controller).cellScanner();
regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries();
regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner,
@ -2435,7 +2454,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
private boolean shouldRejectRequestsFromClient(HRegion region) {
return regionServer.getReplicationSourceService().getSyncReplicationPeerInfoProvider()
.checkState(region.getRegionInfo(), RejectRequestsFromClientStateChecker.get());
.checkState(region.getRegionInfo().getTable(), RejectRequestsFromClientStateChecker.get());
}
private void rejectIfInStandByState(HRegion region) throws DoNotRetryIOException {

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.util.function.BiPredicate;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Check whether we need to reject the replication request from source cluster.
*/
@InterfaceAudience.Private
public class RejectReplicationRequestStateChecker
implements BiPredicate<SyncReplicationState, SyncReplicationState> {
private static final RejectReplicationRequestStateChecker INST =
new RejectReplicationRequestStateChecker();
@Override
public boolean test(SyncReplicationState state, SyncReplicationState newState) {
return state == SyncReplicationState.ACTIVE || state == SyncReplicationState.DOWNGRADE_ACTIVE
|| newState == SyncReplicationState.ACTIVE
|| newState == SyncReplicationState.DOWNGRADE_ACTIVE;
}
public static RejectReplicationRequestStateChecker get() {
return INST;
}
}

View File

@ -27,8 +27,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
@ -46,6 +44,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALParameter;
/**
@ -81,16 +80,21 @@ public class ReplaySyncReplicationWALCallable implements RSProcedureCallable {
throw initError;
}
LOG.info("Received a replay sync replication wal {} event, peerId={}", wal, peerId);
if (rs.getReplicationSinkService() != null) {
try (Reader reader = getReader()) {
List<Entry> entries = readWALEntries(reader);
while (!entries.isEmpty()) {
Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> pair = ReplicationProtbufUtil
.buildReplicateWALEntryRequest(entries.toArray(new Entry[entries.size()]));
HBaseRpcController controller = new HBaseRpcControllerImpl(pair.getSecond());
rs.getRSRpcServices().replicateWALEntry(controller, pair.getFirst());
ReplicateWALEntryRequest request = pair.getFirst();
rs.getReplicationSinkService().replicateLogEntries(request.getEntryList(),
pair.getSecond(), request.getReplicationClusterId(),
request.getSourceBaseNamespaceDirPath(), request.getSourceHFileArchiveDirPath());
// Read next entries.
entries = readWALEntries(reader);
}
}
}
return null;
}

View File

@ -275,7 +275,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
List<ReplicationSourceInterface> oldSources = this.replicationManager.getOldSources();
for (ReplicationSourceInterface source : oldSources) {
if (source instanceof ReplicationSource) {
sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics());
sourceMetricsList.add(source.getSourceMetrics());
}
}

View File

@ -28,6 +28,7 @@ import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@ -93,7 +94,6 @@ public class ReplicationSink {
/**
* Create a sink for replication
*
* @param conf conf object
* @param stopper boolean to tell this thread to stop
* @throws IOException thrown when HDFS goes bad or bad file name
@ -104,8 +104,7 @@ public class ReplicationSink {
decorateConf();
this.metrics = new MetricsSink();
this.walEntrySinkFilter = setupWALEntrySinkFilter();
String className =
conf.get("hbase.replication.source.fs.conf.provider",
String className = conf.get("hbase.replication.source.fs.conf.provider",
DefaultSourceFSConfigurationProvider.class.getCanonicalName());
try {
Class<? extends SourceFSConfigurationProvider> c =
@ -178,8 +177,7 @@ public class ReplicationSink {
Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = null;
for (WALEntry entry : entries) {
TableName table =
TableName.valueOf(entry.getKey().getTableName().toByteArray());
TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray());
if (this.walEntrySinkFilter != null) {
if (this.walEntrySinkFilter.filter(table, entry.getKey().getWriteTime())) {
// Skip Cells in CellScanner associated with this entry.

View File

@ -19,7 +19,8 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.util.Optional;
import java.util.function.BiPredicate;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
@ -31,17 +32,17 @@ import org.apache.yetus.audience.InterfaceAudience;
public interface SyncReplicationPeerInfoProvider {
/**
* Return the peer id and remote WAL directory if the region is synchronously replicated and the
* Return the peer id and remote WAL directory if the table is synchronously replicated and the
* state is {@link SyncReplicationState#ACTIVE}.
*/
Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info);
Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(TableName table);
/**
* Check whether the give region is contained in a sync replication peer which can pass the state
* Check whether the given table is contained in a sync replication peer which can pass the state
* checker.
* <p>
* Will call the checker with current sync replication state and new sync replication state.
*/
boolean checkState(RegionInfo info,
boolean checkState(TableName table,
BiPredicate<SyncReplicationState, SyncReplicationState> checker);
}

View File

@ -19,7 +19,8 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.util.Optional;
import java.util.function.BiPredicate;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
@ -40,11 +41,11 @@ class SyncReplicationPeerInfoProviderImpl implements SyncReplicationPeerInfoProv
}
@Override
public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info) {
if (info == null) {
public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(TableName table) {
if (table == null) {
return Optional.empty();
}
String peerId = mapping.getPeerId(info);
String peerId = mapping.getPeerId(table);
if (peerId == null) {
return Optional.empty();
}
@ -65,9 +66,9 @@ class SyncReplicationPeerInfoProviderImpl implements SyncReplicationPeerInfoProv
}
@Override
public boolean checkState(RegionInfo info,
public boolean checkState(TableName table,
BiPredicate<SyncReplicationState, SyncReplicationState> checker) {
String peerId = mapping.getPeerId(info);
String peerId = mapping.getPeerId(table);
if (peerId == null) {
return false;
}

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.yetus.audience.InterfaceAudience;
@ -42,7 +41,7 @@ class SyncReplicationPeerMappingManager {
peerConfig.getTableCFsMap().keySet().forEach(table2PeerId::remove);
}
String getPeerId(RegionInfo info) {
return table2PeerId.get(info.getTable());
String getPeerId(TableName tableName) {
return table2PeerId.get(tableName);
}
}

View File

@ -33,6 +33,7 @@ import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
@ -160,7 +161,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
}
WAL wal = null;
Optional<Pair<String, String>> peerIdAndRemoteWALDir =
peerInfoProvider.getPeerIdAndRemoteWALDir(region);
peerInfoProvider.getPeerIdAndRemoteWALDir(region.getTable());
if (peerIdAndRemoteWALDir.isPresent()) {
Pair<String, String> pair = peerIdAndRemoteWALDir.get();
wal = getWAL(pair.getFirst(), pair.getSecond());
@ -273,12 +274,12 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
implements SyncReplicationPeerInfoProvider {
@Override
public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info) {
public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(TableName table) {
return Optional.empty();
}
@Override
public boolean checkState(RegionInfo info,
public boolean checkState(TableName table,
BiPredicate<SyncReplicationState, SyncReplicationState> checker) {
return false;
}

View File

@ -25,11 +25,13 @@ import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
@ -37,9 +39,15 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
@ -182,4 +190,28 @@ public class SyncReplicationTestBase {
Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
return new Path(remoteWALDir, PEER_ID);
}
protected void verifyReplicationRequestRejection(HBaseTestingUtility utility,
boolean expectedRejection) throws Exception {
HRegionServer regionServer = utility.getRSForFirstRegionInTable(TABLE_NAME);
ClusterConnection connection = regionServer.getClusterConnection();
Entry[] entries = new Entry[10];
for (int i = 0; i < entries.length; i++) {
entries[i] =
new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit());
}
if (!expectedRejection) {
ReplicationProtbufUtil.replicateWALEntry(connection.getAdmin(regionServer.getServerName()),
entries, null, null, null);
} else {
try {
ReplicationProtbufUtil.replicateWALEntry(connection.getAdmin(regionServer.getServerName()),
entries, null, null, null);
Assert.fail("Should throw IOException when sync-replication state is in A or DA");
} catch (DoNotRetryIOException e) {
Assert.assertTrue(e.getMessage().contains("Reject to apply to sink cluster"));
Assert.assertTrue(e.getMessage().contains(TABLE_NAME.toString()));
}
}
}
}

View File

@ -31,19 +31,28 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSyncReplicationActive.class);
@Test
public void testActive() throws Exception {
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.STANDBY);
UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.ACTIVE);
// confirm that peer with state A will reject replication request.
verifyReplicationRequestRejection(UTIL1, true);
verifyReplicationRequestRejection(UTIL2, false);
UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
write(UTIL1, 0, 100);
Thread.sleep(2000);
// peer is disabled so no data have been replicated
verifyNotReplicatedThroughRegion(UTIL2, 0, 100);
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.DOWNGRADE_ACTIVE);
// confirm that peer with state DA will reject replication request.
verifyReplicationRequestRejection(UTIL2, true);
// confirm that the data is there after we convert the peer to DA
verify(UTIL2, 0, 100);
@ -59,6 +68,8 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase {
// confirm that we can convert to DA even if the remote slave cluster is down
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.DOWNGRADE_ACTIVE);
// confirm that peer with state DA will reject replication request.
verifyReplicationRequestRejection(UTIL2, true);
write(UTIL2, 200, 300);
}
}

View File

@ -129,8 +129,7 @@ public class TestReplicationSink {
TestSourceFSConfigurationProvider.class.getCanonicalName());
TEST_UTIL.startMiniCluster(3);
SINK =
new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE);
SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE);
table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
Path rootDir = FSUtils.getRootDir(TEST_UTIL.getConfiguration());
@ -419,7 +418,7 @@ public class TestReplicationSink {
return builder.build();
}
private WALEntry.Builder createWALEntryBuilder(TableName table) {
public static WALEntry.Builder createWALEntryBuilder(TableName table) {
WALEntry.Builder builder = WALEntry.newBuilder();
builder.setAssociatedCellCount(1);
WALKey.Builder keyBuilder = WALKey.newBuilder();

View File

@ -28,6 +28,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilder;

View File

@ -75,8 +75,8 @@ public class TestSyncReplicationWALProvider {
public static final class InfoProvider implements SyncReplicationPeerInfoProvider {
@Override
public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info) {
if (info.getTable().equals(TABLE)) {
public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(TableName table) {
if (table != null && table.equals(TABLE)) {
return Optional.of(Pair.newPair(PEER_ID, REMOTE_WAL_DIR));
} else {
return Optional.empty();
@ -84,7 +84,7 @@ public class TestSyncReplicationWALProvider {
}
@Override
public boolean checkState(RegionInfo info,
public boolean checkState(TableName table,
BiPredicate<SyncReplicationState, SyncReplicationState> checker) {
// TODO Implement SyncReplicationPeerInfoProvider.isInState
return false;