HBASE-19082 Reject read/write from client but accept write from replication in state S

zhangduo 2018-02-12 18:20:18 +08:00
parent 39dd81a7c6
commit a41c549ca4
15 changed files with 401 additions and 74 deletions
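
In short: once a sync replication peer is in (or transitioning to) STANDBY state, the region server rejects client reads and writes up front with DoNotRetryIOException, while edits shipped through the replication sink are still applied. A minimal client-side sketch of the rejected path, assuming a table already covered by such a peer (table, family and qualifier names are hypothetical; as the new TestSyncReplication below shows, the client may also see the rejection wrapped in a RetriesExhaustedException):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class StandbyRejectionSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf("SyncRep"))) {
      // Rejected server side because the region belongs to a STANDBY peer.
      table.put(new Put(Bytes.toBytes("row"))
          .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")));
    } catch (DoNotRetryIOException e) {
      // Expected: "<region name> is in STANDBY state"
      System.out.println("Rejected as expected: " + e.getMessage());
    }
  }
}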

View File

@@ -1355,9 +1355,6 @@ public final class HConstants {
   public static final String NOT_IMPLEMENTED = "Not implemented";
 
-  // TODO: need to find a better place to hold it.
-  public static final String SYNC_REPLICATION_ENABLED = "hbase.replication.sync.enabled";
-
   private HConstants() {
     // Can't be instantiated with this ctor.
   }

View File

@@ -399,7 +399,8 @@ enum PeerSyncReplicationStateTransitionState {
   REOPEN_ALL_REGIONS_IN_PEER = 5;
   TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE = 6;
   REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END = 7;
-  POST_PEER_SYNC_REPLICATION_STATE_TRANSITION = 8;
+  CREATE_DIR_FOR_REMOTE_WAL = 8;
+  POST_PEER_SYNC_REPLICATION_STATE_TRANSITION = 9;
 }
 
 message PeerModificationStateData {

View File

@@ -37,6 +37,10 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 public final class ReplicationUtils {
 
+  public static final String SYNC_REPLICATION_ENABLED = "hbase.replication.sync.enabled";
+
+  public static final String REPLICATION_ATTR_NAME = "__rep__";
+
   private ReplicationUtils() {
   }

View File

@@ -197,8 +197,18 @@ public class TransitPeerSyncReplicationStateProcedure
         addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
           .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 1))
           .toArray(RefreshPeerProcedure[]::new));
+        if (toState == SyncReplicationState.STANDBY) {
+          setNextState(PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL);
+        } else {
           setNextState(
             PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
+        }
+        return Flow.HAS_MORE_STATE;
+      case CREATE_DIR_FOR_REMOTE_WAL:
+        // TODO: create wal for write remote wal
+        setNextState(
+          PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
+        return Flow.HAS_MORE_STATE;
       case POST_PEER_SYNC_REPLICATION_STATE_TRANSITION:
         try {
           postTransit(env);

View File

@@ -4328,12 +4328,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region
   /**
    * Add updates first to the wal and then add values to memstore.
+   * <p>
    * Warning: Assumption is caller has lock on passed in row.
    * @param edits Cell updates by column
-   * @throws IOException
    */
-  void put(final byte [] row, byte [] family, List<Cell> edits)
-    throws IOException {
+  void put(final byte[] row, byte[] family, List<Cell> edits) throws IOException {
     NavigableMap<byte[], List<Cell>> familyMap;
     familyMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);

View File

@@ -1805,7 +1805,7 @@ public class HRegionServer extends HasThread implements
     boolean isMasterNoTableOrSystemTableOnly = this instanceof HMaster &&
       (!LoadBalancer.isTablesOnMaster(conf) || LoadBalancer.isSystemTablesOnlyOnMaster(conf));
     if (isMasterNoTableOrSystemTableOnly) {
-      conf.setBoolean(HConstants.SYNC_REPLICATION_ENABLED, false);
+      conf.setBoolean(ReplicationUtils.SYNC_REPLICATION_ENABLED, false);
     }
     WALFactory factory = new WALFactory(conf, serverName.toString());
     if (!isMasterNoTableOrSystemTableOnly) {

View File

@@ -120,6 +120,8 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
 import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.access.AccessChecker;
@@ -2431,6 +2433,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     return region.execService(execController, serviceCall);
   }
 
+  private boolean shouldRejectRequestsFromClient(HRegion region) {
+    return regionServer.getReplicationSourceService().getSyncReplicationPeerInfoProvider()
+      .checkState(region.getRegionInfo(), RejectRequestsFromClientStateChecker.get());
+  }
+
+  private void rejectIfInStandByState(HRegion region) throws DoNotRetryIOException {
+    if (shouldRejectRequestsFromClient(region)) {
+      throw new DoNotRetryIOException(
+        region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state.");
+    }
+  }
+
   /**
    * Get data from a table.
    *
@@ -2439,8 +2453,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    * @throws ServiceException
    */
   @Override
-  public GetResponse get(final RpcController controller,
-      final GetRequest request) throws ServiceException {
+  public GetResponse get(final RpcController controller, final GetRequest request)
+      throws ServiceException {
     long before = EnvironmentEdgeManager.currentTime();
     OperationQuota quota = null;
     HRegion region = null;
@@ -2449,6 +2463,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       requestCount.increment();
       rpcGetRequestCount.increment();
       region = getRegion(request.getRegion());
+      rejectIfInStandByState(region);
       GetResponse.Builder builder = GetResponse.newBuilder();
       ClientProtos.Get get = request.getGet();
@@ -2587,9 +2602,38 @@
     }
   }
 
+  private void failRegionAction(MultiResponse.Builder responseBuilder,
+      RegionActionResult.Builder regionActionResultBuilder, RegionAction regionAction,
+      CellScanner cellScanner, Throwable error) {
+    rpcServer.getMetrics().exception(error);
+    regionActionResultBuilder.setException(ResponseConverter.buildException(error));
+    responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
+    // All Mutations in this RegionAction not executed as we can not see the Region online here
+    // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
+    // corresponding to these Mutations.
+    if (cellScanner != null) {
+      skipCellsForMutations(regionAction.getActionList(), cellScanner);
+    }
+  }
+
+  private boolean isReplicationRequest(Action action) {
+    // replication request can only be put or delete.
+    if (!action.hasMutation()) {
+      return false;
+    }
+    MutationProto mutation = action.getMutation();
+    MutationType type = mutation.getMutateType();
+    if (type != MutationType.PUT && type != MutationType.DELETE) {
+      return false;
+    }
+    // replication will set a special attribute so we can make use of it to decide whether a
+    // request is for replication.
+    return mutation.getAttributeList().stream().map(p -> p.getName())
+      .filter(n -> n.equals(ReplicationUtils.REPLICATION_ATTR_NAME)).findAny().isPresent();
+  }
+
   /**
    * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
-   *
    * @param rpcc the RPC controller
    * @param request the multi request
    * @throws ServiceException
@@ -2636,17 +2680,19 @@
         region = getRegion(regionSpecifier);
         quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
       } catch (IOException e) {
-        rpcServer.getMetrics().exception(e);
-        regionActionResultBuilder.setException(ResponseConverter.buildException(e));
-        responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
-        // All Mutations in this RegionAction not executed as we can not see the Region online here
-        // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
-        // corresponding to these Mutations.
-        skipCellsForMutations(regionAction.getActionList(), cellScanner);
+        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
         continue; // For this region it's a failure.
       }
+      boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
       if (regionAction.hasAtomic() && regionAction.getAtomic()) {
+        // We only allow replication in standby state and it will not set the atomic flag.
+        if (rejectIfFromClient) {
+          failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
+            new DoNotRetryIOException(
+              region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
+          quota.close();
+          continue;
+        }
         // How does this call happen? It may need some work to play well w/ the surroundings.
         // Need to return an item per Action along w/ Action index. TODO.
         try {
@@ -2677,6 +2723,15 @@
           regionActionResultBuilder.setException(ResponseConverter.buildException(e));
         }
       } else {
+        if (rejectIfFromClient && regionAction.getActionCount() > 0 &&
+          !isReplicationRequest(regionAction.getAction(0))) {
+          // fail if it is not a replication request
+          failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
+            new DoNotRetryIOException(
+              region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
+          quota.close();
+          continue;
+        }
         // doNonAtomicRegionMutation manages the exception internally
         if (context != null && closeCallBack == null) {
           // An RpcCallBack that creates a list of scanners that needs to perform callBack
@@ -2751,8 +2806,8 @@
    * @param request the mutate request
    */
   @Override
-  public MutateResponse mutate(final RpcController rpcc,
-      final MutateRequest request) throws ServiceException {
+  public MutateResponse mutate(final RpcController rpcc, final MutateRequest request)
+      throws ServiceException {
     // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
     // It is also the conduit via which we pass back data.
     HBaseRpcController controller = (HBaseRpcController)rpcc;
@@ -2772,6 +2827,7 @@
       requestCount.increment();
       rpcMutateRequestCount.increment();
       region = getRegion(request.getRegion());
+      rejectIfInStandByState(region);
       MutateResponse.Builder builder = MutateResponse.newBuilder();
       MutationProto mutation = request.getMutation();
       if (!region.getRegionInfo().isMetaRegion()) {
@@ -2941,6 +2997,7 @@
           "'hbase.client.scanner.timeout.period' configuration.");
       }
     }
+    rejectIfInStandByState(rsh.r);
     RegionInfo hri = rsh.s.getRegionInfo();
     // Yes, should be the same instance
     if (regionServer.getOnlineRegion(hri.getRegionName()) != rsh.r) {
@@ -2967,6 +3024,7 @@
   private RegionScannerHolder newRegionScanner(ScanRequest request, ScanResponse.Builder builder)
       throws IOException {
     HRegion region = getRegion(request.getRegion());
+    rejectIfInStandByState(region);
     ClientProtos.Scan protoScan = request.getScan();
     boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
     Scan scan = ProtobufUtil.toScan(protoScan);
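
For context on isReplicationRequest above: it only inspects the attribute list of the mutation, searching for the marker that ReplicationSink (further down in this commit) stamps onto every edit it replays. A sketch of the marking side using the public client API (row, family and value are hypothetical); the attribute travels inside the serialized MutationProto, which is exactly what the server-side check scans:

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Bytes;

public class ReplicationMarkerSketch {
  public static void main(String[] args) {
    Put put = new Put(Bytes.toBytes("row"));
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"));
    // Same marker ReplicationSink sets; an empty value is enough, only the
    // attribute name ("__rep__") is checked.
    put.setAttribute(ReplicationUtils.REPLICATION_ATTR_NAME, HConstants.EMPTY_BYTE_ARRAY);
    System.out.println(put.getAttributesMap().containsKey(ReplicationUtils.REPLICATION_ATTR_NAME));
  }
}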

View File

@@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;

import java.util.function.BiPredicate;

import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Check whether we need to reject the request from client.
 */
@InterfaceAudience.Private
public class RejectRequestsFromClientStateChecker
    implements BiPredicate<SyncReplicationState, SyncReplicationState> {

  private static final RejectRequestsFromClientStateChecker INST =
    new RejectRequestsFromClientStateChecker();

  @Override
  public boolean test(SyncReplicationState state, SyncReplicationState newState) {
    // reject requests from client if we are in standby state, or we are going to transit to standby
    // state.
    return state == SyncReplicationState.STANDBY || newState == SyncReplicationState.STANDBY;
  }

  public static RejectRequestsFromClientStateChecker get() {
    return INST;
  }
}
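
A quick sanity check of the predicate, as a sketch: it rejects when either the current or the pending state is STANDBY, which also closes the window while a peer is still transitioning:

import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker;

public class CheckerSketch {
  public static void main(String[] args) {
    RejectRequestsFromClientStateChecker checker = RejectRequestsFromClientStateChecker.get();
    // Already in standby: reject.
    System.out.println(checker.test(SyncReplicationState.STANDBY, SyncReplicationState.NONE));
    // Transitioning to standby: reject as well.
    System.out.println(checker.test(SyncReplicationState.ACTIVE, SyncReplicationState.STANDBY));
    // Active with no pending transition: allow.
    System.out.println(checker.test(SyncReplicationState.ACTIVE, SyncReplicationState.NONE));
  }
}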

View File

@@ -1,5 +1,4 @@
-/*
- *
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -29,7 +28,6 @@ import java.util.Map.Entry;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -41,9 +39,6 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
@@ -52,13 +47,18 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
-import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * <p>
@@ -82,10 +82,10 @@ public class ReplicationSink {
   private final Configuration conf;
   // Volatile because of note in here -- look for double-checked locking:
   // http://www.oracle.com/technetwork/articles/javase/bloch-effective-08-qa-140880.html
-  private volatile Connection sharedHtableCon;
+  private volatile Connection sharedConn;
   private final MetricsSink metrics;
   private final AtomicLong totalReplicatedEdits = new AtomicLong();
-  private final Object sharedHtableConLock = new Object();
+  private final Object sharedConnLock = new Object();
   // Number of hfiles that we successfully replicated
   private long hfilesReplicated = 0;
   private SourceFSConfigurationProvider provider;
@@ -108,12 +108,12 @@
       conf.get("hbase.replication.source.fs.conf.provider",
         DefaultSourceFSConfigurationProvider.class.getCanonicalName());
     try {
-      @SuppressWarnings("rawtypes")
-      Class c = Class.forName(className);
-      this.provider = (SourceFSConfigurationProvider) c.getDeclaredConstructor().newInstance();
+      Class<? extends SourceFSConfigurationProvider> c =
+        Class.forName(className).asSubclass(SourceFSConfigurationProvider.class);
+      this.provider = c.getDeclaredConstructor().newInstance();
     } catch (Exception e) {
-      throw new IllegalArgumentException("Configured source fs configuration provider class "
-          + className + " throws error.", e);
+      throw new IllegalArgumentException(
+        "Configured source fs configuration provider class " + className + " throws error.", e);
     }
   }
@@ -221,6 +221,8 @@
             clusterIds.add(toUUID(clusterId));
           }
           mutation.setClusterIds(clusterIds);
+          mutation.setAttribute(ReplicationUtils.REPLICATION_ATTR_NAME,
+            HConstants.EMPTY_BYTE_ARRAY);
           addToHashMultiMap(rowMap, table, clusterIds, mutation);
         }
         if (CellUtil.isDelete(cell)) {
@@ -374,11 +376,11 @@
    */
   public void stopReplicationSinkServices() {
     try {
-      if (this.sharedHtableCon != null) {
-        synchronized (sharedHtableConLock) {
-          if (this.sharedHtableCon != null) {
-            this.sharedHtableCon.close();
-            this.sharedHtableCon = null;
+      if (this.sharedConn != null) {
+        synchronized (sharedConnLock) {
+          if (this.sharedConn != null) {
+            this.sharedConn.close();
+            this.sharedConn = null;
           }
         }
       }
@@ -394,14 +396,12 @@
    * @param allRows list of actions
    * @throws IOException
    */
-  protected void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
+  private void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
     if (allRows.isEmpty()) {
       return;
     }
-    Table table = null;
-    try {
-      Connection connection = getConnection();
-      table = connection.getTable(tableName);
+    Connection connection = getConnection();
+    try (Table table = connection.getTable(tableName)) {
       for (List<Row> rows : allRows) {
         table.batch(rows, null);
       }
@@ -414,21 +414,18 @@
       throw rewde;
     } catch (InterruptedException ix) {
       throw (InterruptedIOException) new InterruptedIOException().initCause(ix);
-    } finally {
-      if (table != null) {
-        table.close();
-      }
     }
   }
   private Connection getConnection() throws IOException {
     // See https://en.wikipedia.org/wiki/Double-checked_locking
-    Connection connection = sharedHtableCon;
+    Connection connection = sharedConn;
     if (connection == null) {
-      synchronized (sharedHtableConLock) {
-        connection = sharedHtableCon;
+      synchronized (sharedConnLock) {
+        connection = sharedConn;
         if (connection == null) {
-          connection = sharedHtableCon = ConnectionFactory.createConnection(conf);
+          connection = ConnectionFactory.createConnection(conf);
+          sharedConn = connection;
         }
       }
     }
@@ -441,9 +438,10 @@
    * of the last edit that was applied
    */
   public String getStats() {
-    return this.totalReplicatedEdits.get() == 0 ? "" : "Sink: " +
-      "age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp() +
-      ", total replicated edits: " + this.totalReplicatedEdits;
+    long total = this.totalReplicatedEdits.get();
+    return total == 0 ? ""
+        : "Sink: " + "age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp() +
+          ", total replicated edits: " + total;
   }
 
   /**

View File

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.util.Optional;
+import java.util.function.BiPredicate;
 
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
 import org.apache.hadoop.hbase.util.Pair;
@@ -36,8 +37,11 @@ public interface SyncReplicationPeerInfoProvider {
   Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info);
 
   /**
-   * Check whether the given region is contained in a sync replication peer which is in the given
-   * state.
+   * Check whether the given region is contained in a sync replication peer which can pass the state
+   * checker.
+   * <p>
+   * Will call the checker with current sync replication state and new sync replication state.
    */
-  boolean isInState(RegionInfo info, SyncReplicationState state);
+  boolean checkState(RegionInfo info,
+      BiPredicate<SyncReplicationState, SyncReplicationState> checker);
 }
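
Because checkState takes a plain BiPredicate, callers are not limited to RejectRequestsFromClientStateChecker; the removed isInState semantics can still be expressed on top of it. A sketch (the helper name is hypothetical):

import java.util.function.BiPredicate;

import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;

public class CheckStateSketch {
  // Recovers the old isInState(info, expected) behavior, ignoring the pending state.
  static boolean isInState(SyncReplicationPeerInfoProvider provider, RegionInfo info,
      SyncReplicationState expected) {
    BiPredicate<SyncReplicationState, SyncReplicationState> checker =
      (state, newState) -> state == expected;
    return provider.checkState(info, checker);
  }
}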

View File

@@ -18,8 +18,9 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.util.Optional;
+import java.util.function.BiPredicate;
 
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.replication.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
 import org.apache.hadoop.hbase.util.Pair;
@@ -44,11 +45,14 @@ class SyncReplicationPeerInfoProviderImpl implements SyncReplicationPeerInfoProvider {
     if (peerId == null) {
       return Optional.empty();
     }
-    ReplicationPeer peer = replicationPeers.getPeer(peerId);
+    ReplicationPeerImpl peer = replicationPeers.getPeer(peerId);
     if (peer == null) {
       return Optional.empty();
     }
-    if (peer.getSyncReplicationState() == SyncReplicationState.ACTIVE) {
+    Pair<SyncReplicationState, SyncReplicationState> states =
+      peer.getSyncReplicationStateAndNewState();
+    if (states.getFirst() == SyncReplicationState.ACTIVE &&
+      states.getSecond() == SyncReplicationState.NONE) {
       return Optional.of(Pair.newPair(peerId, peer.getPeerConfig().getRemoteWALDir()));
     } else {
       return Optional.empty();
@@ -56,16 +60,19 @@ class SyncReplicationPeerInfoProviderImpl implements SyncReplicationPeerInfoProvider {
   }
 
   @Override
-  public boolean isInState(RegionInfo info, SyncReplicationState state) {
+  public boolean checkState(RegionInfo info,
+      BiPredicate<SyncReplicationState, SyncReplicationState> checker) {
     String peerId = mapping.getPeerId(info);
     if (peerId == null) {
       return false;
     }
-    ReplicationPeer peer = replicationPeers.getPeer(peerId);
+    ReplicationPeerImpl peer = replicationPeers.getPeer(peerId);
     if (peer == null) {
       return false;
     }
-    return peer.getSyncReplicationState() == state;
+    Pair<SyncReplicationState, SyncReplicationState> states =
+      peer.getSyncReplicationStateAndNewState();
+    return checker.test(states.getFirst(), states.getSecond());
   }
 }

View File

@@ -141,6 +141,9 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListener {
   @Override
   public WAL getWAL(RegionInfo region) throws IOException {
+    if (region == null) {
+      return provider.getWAL(region);
+    }
     Optional<Pair<String, String>> peerIdAndRemoteWALDir =
       peerInfoProvider.getPeerIdAndRemoteWALDir(region);
     if (peerIdAndRemoteWALDir.isPresent()) {

View File

@@ -24,10 +24,10 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
@@ -162,7 +162,7 @@ public class WALFactory {
     // end required early initialization
     if (conf.getBoolean("hbase.regionserver.hlog.enabled", true)) {
       WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
-      if (conf.getBoolean(HConstants.SYNC_REPLICATION_ENABLED, false)) {
+      if (conf.getBoolean(ReplicationUtils.SYNC_REPLICATION_ENABLED, false)) {
         provider = new SyncReplicationWALProvider(provider);
       }
       provider.init(this, conf, null);
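
Putting the flag and the factory together, a minimal sketch (the WAL id string is hypothetical): setting hbase.replication.sync.enabled before constructing the WALFactory is what wraps the configured provider in a SyncReplicationWALProvider.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.wal.WALFactory;

public class WalFactorySketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(ReplicationUtils.SYNC_REPLICATION_ENABLED, true);
    // getWALProvider() now returns a SyncReplicationWALProvider wrapping the
    // configured base provider (see the hunk above).
    WALFactory factory = new WALFactory(conf, "sketch-wal");
  }
}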

View File

@@ -0,0 +1,200 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
@Category({ ReplicationTests.class, LargeTests.class })
public class TestSyncReplication {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSyncReplication.class);
private static final HBaseZKTestingUtility ZK_UTIL = new HBaseZKTestingUtility();
private static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility();
private static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility();
private static TableName TABLE_NAME = TableName.valueOf("SyncRep");
private static byte[] CF = Bytes.toBytes("cf");
private static byte[] CQ = Bytes.toBytes("cq");
private static String PEER_ID = "1";
private static void initTestingUtility(HBaseTestingUtility util, String zkParent) {
util.setZkCluster(ZK_UTIL.getZkCluster());
Configuration conf = util.getConfiguration();
conf.setBoolean(ReplicationUtils.SYNC_REPLICATION_ENABLED, true);
conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkParent);
conf.setInt("replication.source.size.capacity", 102400);
conf.setLong("replication.source.sleepforretries", 100);
conf.setInt("hbase.regionserver.maxlogs", 10);
conf.setLong("hbase.master.logcleaner.ttl", 10);
conf.setInt("zookeeper.recovery.retry", 1);
conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
conf.setInt("replication.stats.thread.period.seconds", 5);
conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
conf.setLong("replication.sleep.before.failover", 2000);
conf.setInt("replication.source.maxretriesmultiplier", 10);
conf.setFloat("replication.source.ratio", 1.0f);
conf.setBoolean("replication.source.eof.autorecovery", true);
}
@BeforeClass
public static void setUp() throws Exception {
ZK_UTIL.startMiniZKCluster();
initTestingUtility(UTIL1, "/cluster1");
initTestingUtility(UTIL2, "/cluster2");
UTIL1.startMiniCluster(3);
UTIL2.startMiniCluster(3);
TableDescriptor td =
TableDescriptorBuilder.newBuilder(TABLE_NAME).addColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();
UTIL1.getAdmin().createTable(td);
UTIL2.getAdmin().createTable(td);
FileSystem fs1 = UTIL1.getTestFileSystem();
FileSystem fs2 = UTIL2.getTestFileSystem();
Path remoteWALDir1 =
new Path(UTIL1.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir(),
"remoteWALs").makeQualified(fs1.getUri(), fs1.getWorkingDirectory());
Path remoteWALDir2 =
new Path(UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir(),
"remoteWALs").makeQualified(fs2.getUri(), fs2.getWorkingDirectory());
UTIL1.getAdmin().addReplicationPeer(PEER_ID,
ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey())
.setReplicateAllUserTables(false)
.setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>()))
.setRemoteWALDir(remoteWALDir2.toUri().toString()).build());
UTIL2.getAdmin().addReplicationPeer(PEER_ID,
ReplicationPeerConfig.newBuilder().setClusterKey(UTIL1.getClusterKey())
.setReplicateAllUserTables(false)
.setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>()))
.setRemoteWALDir(remoteWALDir1.toUri().toString()).build());
}
@AfterClass
public static void tearDown() throws Exception {
UTIL1.shutdownMiniCluster();
UTIL2.shutdownMiniCluster();
ZK_UTIL.shutdownMiniZKCluster();
}
@FunctionalInterface
private interface TableAction {
void call(Table table) throws IOException;
}
  private void assertDisallow(Table table, TableAction action) throws IOException {
    try {
      action.call(table);
      fail("Should fail since the cluster is in STANDBY state");
    } catch (DoNotRetryIOException | RetriesExhaustedException e) {
      // expected
      assertThat(e.getMessage(), containsString("STANDBY"));
    }
  }
@Test
public void testStandby() throws Exception {
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.STANDBY);
try (Table table = UTIL2.getConnection().getTable(TABLE_NAME)) {
assertDisallow(table, t -> t.get(new Get(Bytes.toBytes("row"))));
assertDisallow(table,
t -> t.put(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row"))));
assertDisallow(table, t -> t.delete(new Delete(Bytes.toBytes("row"))));
assertDisallow(table, t -> t.incrementColumnValue(Bytes.toBytes("row"), CF, CQ, 1));
assertDisallow(table,
t -> t.append(new Append(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row"))));
assertDisallow(table,
t -> t.get(Arrays.asList(new Get(Bytes.toBytes("row")), new Get(Bytes.toBytes("row1")))));
assertDisallow(table,
t -> t
.put(Arrays.asList(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row")),
new Put(Bytes.toBytes("row1")).addColumn(CF, CQ, Bytes.toBytes("row1")))));
assertDisallow(table, t -> t.mutateRow(new RowMutations(Bytes.toBytes("row"))
.add((Mutation) new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row")))));
}
// But we should still allow replication writes
try (Table table = UTIL1.getConnection().getTable(TABLE_NAME)) {
for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
}
}
// The reject check is in RSRpcService so we can still read through HRegion
HRegion region = UTIL2.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
UTIL2.waitFor(30000, new ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return !region.get(new Get(Bytes.toBytes(99))).isEmpty();
}
@Override
public String explainFailure() throws Exception {
return "Replication has not been catched up yet";
}
});
for (int i = 0; i < 100; i++) {
assertEquals(i, Bytes.toInt(region.get(new Get(Bytes.toBytes(i))).getValue(CF, CQ)));
}
}
}

View File

@@ -24,10 +24,10 @@ import static org.junit.Assert.assertThat;
 import java.io.IOException;
 import java.util.Optional;
+import java.util.function.BiPredicate;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
 import org.apache.hadoop.hbase.client.RegionInfo;
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogTestHelper;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
 import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -84,7 +85,8 @@ public class TestSyncReplicationWALProvider {
     }
 
     @Override
-    public boolean isInState(RegionInfo info, SyncReplicationState state) {
+    public boolean checkState(RegionInfo info,
+        BiPredicate<SyncReplicationState, SyncReplicationState> checker) {
       // TODO Implement SyncReplicationPeerInfoProvider.checkState
       return false;
     }
@@ -92,7 +94,7 @@ public class TestSyncReplicationWALProvider {
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    UTIL.getConfiguration().setBoolean(HConstants.SYNC_REPLICATION_ENABLED, true);
+    UTIL.getConfiguration().setBoolean(ReplicationUtils.SYNC_REPLICATION_ENABLED, true);
     UTIL.startMiniDFSCluster(3);
     FACTORY = new WALFactory(UTIL.getConfiguration(), "test");
     ((SyncReplicationWALProvider) FACTORY.getWALProvider()).setPeerInfoProvider(new InfoProvider());