HBASE-25074 Refactor ReplicationSinkManager: reduce code and make it easy to understand (#2430)

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
Guanghao Zhang authored on 2020-09-23 08:30:43 +08:00, committed by GitHub
parent 2c5055f81a
commit 7e910a573f
6 changed files with 384 additions and 509 deletions

HBaseReplicationEndpoint.java

@@ -22,8 +22,16 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.Abortable;
@@ -38,6 +46,9 @@ import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
/**
* A {@link BaseReplicationEndpoint} for replication endpoints whose
* target cluster is an HBase cluster.
@@ -50,8 +61,58 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
private ZKWatcher zkw = null;
private List<ServerName> regionServers = new ArrayList<>(0);
private long lastRegionServerUpdate;
protected Configuration conf;
protected AsyncClusterConnection conn;
/**
* Default maximum number of times a replication sink can be reported as bad before
* it will no longer be provided as a sink for replication without the pool of
* replication sinks being refreshed.
*/
public static final int DEFAULT_BAD_SINK_THRESHOLD = 3;
/**
* Default ratio of the total number of peer cluster region servers to consider
* replicating to.
*/
public static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.5f;
// Ratio of total number of potential peer region servers to be used
private float ratio;
// Maximum number of times a sink can be reported as bad before the pool of
// replication sinks is refreshed
private int badSinkThreshold;
// Count of "bad replication sink" reports per peer sink
private Map<ServerName, Integer> badReportCounts;
private List<ServerName> sinkServers = new ArrayList<>(0);
/*
* Some implementations of HBaseInterClusterReplicationEndpoint may need to instantiate different
* Connection implementations, or initialize the connection in a different way, so
* createConnection is defined as protected to allow overriding.
*/
protected AsyncClusterConnection createConnection(Configuration conf) throws IOException {
return ClusterConnectionFactory.createAsyncClusterConnection(conf,
null, User.getCurrent());
}
@Override
public void init(Context context) throws IOException {
super.init(context);
this.conf = HBaseConfiguration.create(ctx.getConfiguration());
// TODO: This connection is replication specific or we should make it particular to
// replication and make replication specific settings such as compression or codec to use
// passing Cells.
this.conn = createConnection(this.conf);
this.ratio =
ctx.getConfiguration().getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
this.badSinkThreshold =
ctx.getConfiguration().getInt("replication.bad.sink.threshold", DEFAULT_BAD_SINK_THRESHOLD);
this.badReportCounts = Maps.newHashMap();
}
protected synchronized void disconnect() {
if (zkw != null) {
@@ -63,7 +124,7 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
* A private method used to re-establish a zookeeper session with a peer cluster.
* @param ke
*/
protected void reconnect(KeeperException ke) {
private void reconnect(KeeperException ke) {
if (ke instanceof ConnectionLossException || ke instanceof SessionExpiredException
|| ke instanceof AuthFailedException) {
String clusterKey = ctx.getPeerConfig().getClusterKey();
@@ -117,23 +178,17 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
return peerUUID;
}
/**
* Get the ZK connection to this peer
* @return zk connection
*/
protected synchronized ZKWatcher getZkw() {
return zkw;
}
/**
* Closes the current ZKW (if not null) and creates a new one
* @throws IOException If anything goes wrong connecting
*/
synchronized void reloadZkWatcher() throws IOException {
if (zkw != null) zkw.close();
private synchronized void reloadZkWatcher() throws IOException {
if (zkw != null) {
zkw.close();
}
zkw = new ZKWatcher(ctx.getConfiguration(),
"connection to cluster: " + ctx.getPeerId(), this);
getZkw().registerListener(new PeerRegionServerListener(this));
zkw.registerListener(new PeerRegionServerListener(this));
}
@Override
@@ -150,13 +205,19 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
/**
* Get the list of all the region servers from the specified peer
* @param zkw zk connection to use
*
* @return list of region server addresses or an empty list if the slave is unavailable
*/
protected static List<ServerName> fetchSlavesAddresses(ZKWatcher zkw)
throws KeeperException {
List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(zkw,
zkw.getZNodePaths().rsZNode);
protected List<ServerName> fetchSlavesAddresses() {
List<String> children = null;
try {
children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.getZNodePaths().rsZNode);
} catch (KeeperException ke) {
if (LOG.isDebugEnabled()) {
LOG.debug("Fetch slaves addresses failed", ke);
}
reconnect(ke);
}
if (children == null) {
return Collections.emptyList();
}
@@ -167,43 +228,70 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
return addresses;
}
/**
* Get a list of all the addresses of all the available region servers
* for this peer cluster, or an empty list if no region servers available at peer cluster.
* @return list of addresses
*/
// Synchronize peer cluster connection attempts to avoid races and rate
// limit connections when multiple replication sources try to connect to
// the peer cluster. If the peer cluster is down we can get out of control
// over time.
public synchronized List<ServerName> getRegionServers() {
try {
setRegionServers(fetchSlavesAddresses(this.getZkw()));
} catch (KeeperException ke) {
if (LOG.isDebugEnabled()) {
LOG.debug("Fetch slaves addresses failed", ke);
}
reconnect(ke);
protected synchronized void chooseSinks() {
List<ServerName> slaveAddresses = fetchSlavesAddresses();
if (slaveAddresses.isEmpty()) {
LOG.warn("No sinks available at peer. Will not be able to replicate");
}
return regionServers;
Collections.shuffle(slaveAddresses, ThreadLocalRandom.current());
int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
this.sinkServers = slaveAddresses.subList(0, numSinks);
badReportCounts.clear();
}
protected synchronized int getNumSinks() {
return sinkServers.size();
}
/**
* Set the list of region servers for that peer
* @param regionServers list of addresses for the region servers
* Get a randomly-chosen replication sink to replicate to.
* @return a replication sink to replicate to
*/
public synchronized void setRegionServers(List<ServerName> regionServers) {
this.regionServers = regionServers;
lastRegionServerUpdate = System.currentTimeMillis();
protected synchronized SinkPeer getReplicationSink() throws IOException {
if (sinkServers.isEmpty()) {
LOG.info("Current list of sinks is out of date or empty, updating");
chooseSinks();
}
if (sinkServers.isEmpty()) {
throw new IOException("No replication sinks are available");
}
ServerName serverName =
sinkServers.get(ThreadLocalRandom.current().nextInt(sinkServers.size()));
return new SinkPeer(serverName, conn.getRegionServerAdmin(serverName));
}
/**
* Get the timestamp at which the last change occurred to the list of region servers to replicate
* to.
* @return The System.currentTimeMillis at the last time the list of peer region servers changed.
* Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it
* failed). If a single SinkPeer is reported as bad more than
* replication.bad.sink.threshold times, it will be removed
* from the pool of potential replication targets.
*
* @param sinkPeer The SinkPeer that had a failed replication attempt on it
*/
public long getLastRegionServerUpdate() {
return lastRegionServerUpdate;
protected synchronized void reportBadSink(SinkPeer sinkPeer) {
ServerName serverName = sinkPeer.getServerName();
int badReportCount = badReportCounts.compute(serverName, (k, v) -> v == null ? 1 : v + 1);
if (badReportCount > badSinkThreshold) {
this.sinkServers.remove(serverName);
if (sinkServers.isEmpty()) {
chooseSinks();
}
}
}
/**
* Report that a {@code SinkPeer} successfully replicated a chunk of data.
*
@param sinkPeer The SinkPeer that successfully replicated a chunk of data
*/
protected synchronized void reportSinkSuccess(SinkPeer sinkPeer) {
badReportCounts.remove(sinkPeer.getServerName());
}
@VisibleForTesting
List<ServerName> getSinkServers() {
return sinkServers;
}
/**
@@ -214,22 +302,39 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
private final HBaseReplicationEndpoint replicationEndpoint;
private final String regionServerListNode;
public PeerRegionServerListener(HBaseReplicationEndpoint replicationPeer) {
super(replicationPeer.getZkw());
this.replicationEndpoint = replicationPeer;
this.regionServerListNode = replicationEndpoint.getZkw().getZNodePaths().rsZNode;
public PeerRegionServerListener(HBaseReplicationEndpoint endpoint) {
super(endpoint.zkw);
this.replicationEndpoint = endpoint;
this.regionServerListNode = endpoint.zkw.getZNodePaths().rsZNode;
}
@Override
public synchronized void nodeChildrenChanged(String path) {
if (path.equals(regionServerListNode)) {
try {
LOG.info("Detected change to peer region servers, fetching updated list");
replicationEndpoint.setRegionServers(fetchSlavesAddresses(replicationEndpoint.getZkw()));
} catch (KeeperException e) {
LOG.error("Error reading slave addresses", e);
}
LOG.info("Detected change to peer region servers, fetching updated list");
replicationEndpoint.chooseSinks();
}
}
}
/**
* Wraps a replication region server sink to provide the ability to identify it.
*/
public static class SinkPeer {
private ServerName serverName;
private AsyncRegionServerAdmin regionServer;
public SinkPeer(ServerName serverName, AsyncRegionServerAdmin regionServer) {
this.serverName = serverName;
this.regionServer = regionServer;
}
ServerName getServerName() {
return serverName;
}
public AsyncRegionServerAdmin getRegionServer() {
return regionServer;
}
}
}
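After this refactor the sink bookkeeping that ReplicationSinkManager used to own (getReplicationSink, reportBadSink, reportSinkSuccess, chooseSinks) lives directly on HBaseReplicationEndpoint, so a concrete endpoint just calls the inherited methods. Below is a minimal sketch of that call pattern; the ExampleEndpoint class and its shipEntries helper are illustrative and not part of this patch:

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.wal.WAL.Entry;

public class ExampleEndpoint extends HBaseReplicationEndpoint {

  @Override
  public boolean replicate(ReplicateContext replicateContext) {
    SinkPeer sinkPeer = null;
    try {
      // Picks a random server from the current sink pool, refreshing the pool if it is empty.
      sinkPeer = getReplicationSink();
      shipEntries(sinkPeer.getRegionServer(), replicateContext.getEntries());
      // A success clears the accumulated bad-report count for this sink.
      reportSinkSuccess(sinkPeer);
      return true;
    } catch (IOException e) {
      if (sinkPeer != null) {
        // After more than replication.bad.sink.threshold reports, the sink is dropped from the pool.
        reportBadSink(sinkPeer);
      }
      return false;
    }
  }

  // Illustrative placeholder for the actual WAL-entry RPC
  // (see HBaseInterClusterReplicationEndpoint for the real shipping logic).
  private void shipEntries(AsyncRegionServerAdmin rsAdmin, List<Entry> entries) throws IOException {
  }
}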

HBaseInterClusterReplicationEndpoint.java

@@ -41,7 +41,6 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CellUtil;
@@ -60,7 +59,6 @@ import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -100,8 +98,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
public static final String REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY =
"hbase.replication.drop.on.deleted.columnfamily";
private AsyncClusterConnection conn;
private Configuration conf;
// How long should we sleep for each retry
private long sleepForRetries;
// Maximum number of retries before taking bold actions
@@ -114,8 +110,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
private int replicationRpcLimit;
//Metrics for this source
private MetricsSource metrics;
// Handles connecting to peer region servers
private ReplicationSinkManager replicationSinkMgr;
private boolean peersSelected = false;
private String replicationClusterId = "";
private ThreadPoolExecutor exec;
@@ -130,25 +124,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
//Initialising as 0 to guarantee at least one logging message
private long lastSinkFetchTime = 0;
/*
* Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different
* Connection implementations, or initialize it in a different way, so defining createConnection
* as protected for possible overridings.
*/
protected AsyncClusterConnection createConnection(Configuration conf) throws IOException {
return ClusterConnectionFactory.createAsyncClusterConnection(conf,
null, User.getCurrent());
}
/*
* Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different
* ReplicationSinkManager implementations, or initialize it in a different way,
* so defining createReplicationSinkManager as protected for possible overridings.
*/
protected ReplicationSinkManager createReplicationSinkManager(AsyncClusterConnection conn) {
return new ReplicationSinkManager(conn, this, this.conf);
}
@Override
public void init(Context context) throws IOException {
super.init(context);
@@ -171,8 +146,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000);
this.metrics = context.getMetrics();
// ReplicationQueueInfo parses the peerId out of the znode for us
this.replicationSinkMgr = createReplicationSinkManager(conn);
// per sink thread pool
this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
@@ -211,14 +184,11 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
}
private void connectToPeers() {
getRegionServers();
int sleepMultiplier = 1;
// Connect to peer cluster first, unless we have to stop
while (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
replicationSinkMgr.chooseSinks();
if (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
while (this.isRunning() && getNumSinks() == 0) {
chooseSinks();
if (this.isRunning() && getNumSinks() == 0) {
if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
sleepMultiplier++;
}
@@ -253,7 +223,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
}
private List<List<Entry>> createParallelBatches(final List<Entry> entries) {
int numSinks = Math.max(replicationSinkMgr.getNumSinks(), 1);
int numSinks = Math.max(getNumSinks(), 1);
int n = Math.min(Math.min(this.maxThreads, entries.size() / 100 + 1), numSinks);
List<List<Entry>> entryLists =
Stream.generate(ArrayList<Entry>::new).limit(n).collect(Collectors.toList());
@@ -513,7 +483,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
peersSelected = true;
}
int numSinks = replicationSinkMgr.getNumSinks();
int numSinks = getNumSinks();
if (numSinks == 0) {
if((System.currentTimeMillis() - lastSinkFetchTime) >= (maxRetriesMultiplier*1000)) {
LOG.warn(
@@ -561,7 +531,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
} else {
LOG.warn("{} Peer encountered RemoteException, rechecking all sinks: ", logPeerId(),
ioe);
replicationSinkMgr.chooseSinks();
chooseSinks();
}
} else {
if (ioe instanceof SocketTimeoutException) {
@@ -574,7 +544,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
this.socketTimeoutMultiplier);
} else if (ioe instanceof ConnectException || ioe instanceof UnknownHostException) {
LOG.warn("{} Peer is unavailable, rechecking all sinks: ", logPeerId(), ioe);
replicationSinkMgr.chooseSinks();
chooseSinks();
} else {
LOG.warn("{} Can't replicate because of a local or network error: ", logPeerId(), ioe);
}
@@ -629,7 +599,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}",
logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId);
}
sinkPeer = replicationSinkMgr.getReplicationSink();
sinkPeer = getReplicationSink();
AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
try {
ReplicationProtobufUtil.replicateWALEntry(rsAdmin,
@@ -644,10 +614,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
}
throw e;
}
replicationSinkMgr.reportSinkSuccess(sinkPeer);
reportSinkSuccess(sinkPeer);
} catch (IOException ioe) {
if (sinkPeer != null) {
replicationSinkMgr.reportBadSink(sinkPeer);
reportBadSink(sinkPeer);
}
throw ioe;
}
@@ -683,5 +653,4 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
private String logPeerId(){
return "[Source for peer " + this.ctx.getPeerId() + "]:";
}
}
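The pool sizing and bad-sink eviction are still driven by the same two configuration properties, now read in HBaseReplicationEndpoint.init(). A hedged tuning example follows; the class name and the values are illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class ReplicationSinkTuning {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Fraction of the peer's region servers considered as sinks; the pool size is
    // ceil(slaveAddresses.size() * ratio). Default is DEFAULT_REPLICATION_SOURCE_RATIO (0.5).
    conf.setFloat("replication.source.ratio", 0.3f);
    // A sink is dropped from the pool once it is reported bad more than this many times;
    // a successful replication resets its count. Default is DEFAULT_BAD_SINK_THRESHOLD (3).
    conf.setInt("replication.bad.sink.threshold", 5);
  }
}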

ReplicationSinkManager.java (deleted)

@@ -1,193 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
/**
* Maintains a collection of peers to replicate to, and randomly selects a
* single peer to replicate to per set of data to replicate. Also handles
* keeping track of peer availability.
*/
@InterfaceAudience.Private
public class ReplicationSinkManager {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationSinkManager.class);
/**
* Default maximum number of times a replication sink can be reported as bad before
* it will no longer be provided as a sink for replication without the pool of
* replication sinks being refreshed.
*/
static final int DEFAULT_BAD_SINK_THRESHOLD = 3;
/**
* Default ratio of the total number of peer cluster region servers to consider
* replicating to.
*/
static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.5f;
private final AsyncClusterConnection conn;
private final HBaseReplicationEndpoint endpoint;
// Count of "bad replication sink" reports per peer sink
private final Map<ServerName, Integer> badReportCounts;
// Ratio of total number of potential peer region servers to be used
private final float ratio;
// Maximum number of times a sink can be reported as bad before the pool of
// replication sinks is refreshed
private final int badSinkThreshold;
// A timestamp of the last time the list of replication peers changed
private long lastUpdateToPeers;
// The current pool of sinks to which replication can be performed
private List<ServerName> sinks = Lists.newArrayList();
/**
* Instantiate for a single replication peer cluster.
* @param conn connection to the peer cluster
* @param endpoint replication endpoint for inter cluster replication
* @param conf HBase configuration, used for determining replication source ratio and bad peer
* threshold
*/
public ReplicationSinkManager(AsyncClusterConnection conn, HBaseReplicationEndpoint endpoint,
Configuration conf) {
this.conn = conn;
this.endpoint = endpoint;
this.badReportCounts = Maps.newHashMap();
this.ratio = conf.getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
this.badSinkThreshold =
conf.getInt("replication.bad.sink.threshold", DEFAULT_BAD_SINK_THRESHOLD);
}
/**
* Get a randomly-chosen replication sink to replicate to.
* @return a replication sink to replicate to
*/
public synchronized SinkPeer getReplicationSink() throws IOException {
if (endpoint.getLastRegionServerUpdate() > this.lastUpdateToPeers || sinks.isEmpty()) {
LOG.info("Current list of sinks is out of date or empty, updating");
chooseSinks();
}
if (sinks.isEmpty()) {
throw new IOException("No replication sinks are available");
}
ServerName serverName = sinks.get(ThreadLocalRandom.current().nextInt(sinks.size()));
return new SinkPeer(serverName, conn.getRegionServerAdmin(serverName));
}
/**
* Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it
* failed). If a single SinkPeer is reported as bad more than
* replication.bad.sink.threshold times, it will be removed
* from the pool of potential replication targets.
*
* @param sinkPeer
* The SinkPeer that had a failed replication attempt on it
*/
public synchronized void reportBadSink(SinkPeer sinkPeer) {
ServerName serverName = sinkPeer.getServerName();
int badReportCount = (badReportCounts.containsKey(serverName)
? badReportCounts.get(serverName) : 0) + 1;
badReportCounts.put(serverName, badReportCount);
if (badReportCount > badSinkThreshold) {
this.sinks.remove(serverName);
if (sinks.isEmpty()) {
chooseSinks();
}
}
}
/**
* Report that a {@code SinkPeer} successfully replicated a chunk of data.
*
* @param sinkPeer
* The SinkPeer that had a failed replication attempt on it
*/
public synchronized void reportSinkSuccess(SinkPeer sinkPeer) {
badReportCounts.remove(sinkPeer.getServerName());
}
/**
* Refresh the list of sinks.
*/
public synchronized void chooseSinks() {
List<ServerName> slaveAddresses = endpoint.getRegionServers();
if(slaveAddresses.isEmpty()){
LOG.warn("No sinks available at peer. Will not be able to replicate");
}
Collections.shuffle(slaveAddresses, ThreadLocalRandom.current());
int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
sinks = slaveAddresses.subList(0, numSinks);
lastUpdateToPeers = System.currentTimeMillis();
badReportCounts.clear();
}
public synchronized int getNumSinks() {
return sinks.size();
}
@VisibleForTesting
protected List<ServerName> getSinksForTesting() {
return Collections.unmodifiableList(sinks);
}
/**
* Wraps a replication region server sink to provide the ability to identify
* it.
*/
public static class SinkPeer {
private ServerName serverName;
private AsyncRegionServerAdmin regionServer;
public SinkPeer(ServerName serverName, AsyncRegionServerAdmin regionServer) {
this.serverName = serverName;
this.regionServer = regionServer;
}
ServerName getServerName() {
return serverName;
}
public AsyncRegionServerAdmin getRegionServer() {
return regionServer;
}
}
}
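With ReplicationSinkManager and the createReplicationSinkManager hook removed, an implementation that previously customized sink handling would now override the endpoint directly. The sketch below is one assumed migration path; the subclass name and the empty-list discovery are illustrative only:

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.security.User;

public class CustomDiscoveryEndpoint extends HBaseInterClusterReplicationEndpoint {

  @Override
  protected AsyncClusterConnection createConnection(Configuration conf) throws IOException {
    // Same as the default implementation; shown as the place to plug in a custom connection.
    return ClusterConnectionFactory.createAsyncClusterConnection(conf, null, User.getCurrent());
  }

  @Override
  protected List<ServerName> fetchSlavesAddresses() {
    // Replace the ZooKeeper-based discovery of peer region servers with a custom source.
    // Returning an empty list here simply means no sinks are currently available.
    return Collections.emptyList();
  }
}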

TestHBaseReplicationEndpoint.java (new file)

@@ -0,0 +1,210 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.SinkPeer;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@Category({ReplicationTests.class, SmallTests.class})
public class TestHBaseReplicationEndpoint {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestHBaseReplicationEndpoint.class);
private static final Logger LOG = LoggerFactory.getLogger(TestHBaseReplicationEndpoint.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private HBaseReplicationEndpoint endpoint;
@Before
public void setUp() throws Exception {
try {
ReplicationEndpoint.Context context =
new ReplicationEndpoint.Context(null, UTIL.getConfiguration(), UTIL.getConfiguration(),
null, null, null, null, null, null, null);
endpoint = new DummyHBaseReplicationEndpoint();
endpoint.init(context);
} catch (Exception e) {
LOG.info("Failed", e);
}
}
@Test
public void testChooseSinks() {
List<ServerName> serverNames = Lists.newArrayList();
int totalServers = 20;
for (int i = 0; i < totalServers; i++) {
serverNames.add(mock(ServerName.class));
}
((DummyHBaseReplicationEndpoint) endpoint).setRegionServers(serverNames);
endpoint.chooseSinks();
int expected = (int) (totalServers * HBaseReplicationEndpoint.DEFAULT_REPLICATION_SOURCE_RATIO);
assertEquals(expected, endpoint.getNumSinks());
}
@Test
public void testChooseSinksLessThanRatioAvailable() {
List<ServerName> serverNames = Lists.newArrayList(mock(ServerName.class),
mock(ServerName.class));
((DummyHBaseReplicationEndpoint) endpoint).setRegionServers(serverNames);
endpoint.chooseSinks();
assertEquals(1, endpoint.getNumSinks());
}
@Test
public void testReportBadSink() {
ServerName serverNameA = mock(ServerName.class);
ServerName serverNameB = mock(ServerName.class);
((DummyHBaseReplicationEndpoint) endpoint)
.setRegionServers(Lists.newArrayList(serverNameA, serverNameB));
endpoint.chooseSinks();
// Sanity check
assertEquals(1, endpoint.getNumSinks());
SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
endpoint.reportBadSink(sinkPeer);
// Just reporting a bad sink once shouldn't have an effect
assertEquals(1, endpoint.getNumSinks());
}
/**
* Once a SinkPeer has been reported as bad more than BAD_SINK_THRESHOLD times, it should not
* be replicated to anymore.
*/
@Test
public void testReportBadSinkPastThreshold() {
List<ServerName> serverNames = Lists.newArrayList();
int totalServers = 30;
for (int i = 0; i < totalServers; i++) {
serverNames.add(mock(ServerName.class));
}
((DummyHBaseReplicationEndpoint) endpoint).setRegionServers(serverNames);
endpoint.chooseSinks();
// Sanity check
int expected = (int) (totalServers * HBaseReplicationEndpoint.DEFAULT_REPLICATION_SOURCE_RATIO);
assertEquals(expected, endpoint.getNumSinks());
ServerName badSinkServer0 = endpoint.getSinkServers().get(0);
SinkPeer sinkPeer = new SinkPeer(badSinkServer0, mock(AsyncRegionServerAdmin.class));
for (int i = 0; i <= HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) {
endpoint.reportBadSink(sinkPeer);
}
// Reporting a bad sink more than the threshold count should remove it
// from the list of potential sinks
assertEquals(expected - 1, endpoint.getNumSinks());
// now try a sink that has some successes
ServerName badSinkServer1 = endpoint.getSinkServers().get(0);
sinkPeer = new SinkPeer(badSinkServer1, mock(AsyncRegionServerAdmin.class));
for (int i = 0; i < HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) {
endpoint.reportBadSink(sinkPeer);
}
endpoint.reportSinkSuccess(sinkPeer); // one success
endpoint.reportBadSink(sinkPeer);
// did not remove the sink, since we had one successful try
assertEquals(expected - 1, endpoint.getNumSinks());
for (int i = 0; i < HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD - 1; i++) {
endpoint.reportBadSink(sinkPeer);
}
// still not remove, since the success reset the counter
assertEquals(expected - 1, endpoint.getNumSinks());
endpoint.reportBadSink(sinkPeer);
// but we exhausted the tries
assertEquals(expected - 2, endpoint.getNumSinks());
}
@Test
public void testReportBadSinkDownToZeroSinks() {
List<ServerName> serverNames = Lists.newArrayList();
int totalServers = 4;
for (int i = 0; i < totalServers; i++) {
serverNames.add(mock(ServerName.class));
}
((DummyHBaseReplicationEndpoint) endpoint).setRegionServers(serverNames);
endpoint.chooseSinks();
// Sanity check
int expected = (int) (totalServers * HBaseReplicationEndpoint.DEFAULT_REPLICATION_SOURCE_RATIO);
assertEquals(expected, endpoint.getNumSinks());
ServerName serverNameA = endpoint.getSinkServers().get(0);
ServerName serverNameB = endpoint.getSinkServers().get(1);
SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AsyncRegionServerAdmin.class));
for (int i = 0; i <= HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) {
endpoint.reportBadSink(sinkPeerA);
endpoint.reportBadSink(sinkPeerB);
}
// We've gone down to 0 good sinks, so the replication sinks
// should have been refreshed now, so out of 4 servers, 2 are not considered as they are
// reported as bad.
expected =
(int) ((totalServers - 2) * HBaseReplicationEndpoint.DEFAULT_REPLICATION_SOURCE_RATIO);
assertEquals(expected, endpoint.getNumSinks());
}
private static class DummyHBaseReplicationEndpoint extends HBaseReplicationEndpoint {
List<ServerName> regionServers;
public void setRegionServers(List<ServerName> regionServers) {
this.regionServers = regionServers;
}
@Override
public List<ServerName> fetchSlavesAddresses() {
return regionServers;
}
@Override
public boolean replicate(ReplicateContext replicateContext) {
return false;
}
@Override
public AsyncClusterConnection createConnection(Configuration conf) throws IOException {
return null;
}
}
}

TestReplicationSinkManager.java (deleted)

@@ -1,210 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@Category({ReplicationTests.class, SmallTests.class})
public class TestReplicationSinkManager {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationSinkManager.class);
private ReplicationSinkManager sinkManager;
private HBaseReplicationEndpoint replicationEndpoint;
/**
* Manage the 'getRegionServers' for the tests below. Override the base class handling
* of Regionservers. We used to use a mock for this but updated guava/errorprone disallows
* mocking of classes that implement Service.
*/
private static class SetServersHBaseReplicationEndpoint extends HBaseReplicationEndpoint {
List<ServerName> regionServers;
@Override
public boolean replicate(ReplicateContext replicateContext) {
return false;
}
@Override
public synchronized void setRegionServers(List<ServerName> regionServers) {
this.regionServers = regionServers;
}
@Override
public List<ServerName> getRegionServers() {
return this.regionServers;
}
}
@Before
public void setUp() {
this.replicationEndpoint = new SetServersHBaseReplicationEndpoint();
this.sinkManager = new ReplicationSinkManager(mock(AsyncClusterConnection.class),
replicationEndpoint, new Configuration());
}
@Test
public void testChooseSinks() {
List<ServerName> serverNames = Lists.newArrayList();
int totalServers = 20;
for (int i = 0; i < totalServers; i++) {
serverNames.add(mock(ServerName.class));
}
replicationEndpoint.setRegionServers(serverNames);
sinkManager.chooseSinks();
int expected = (int) (totalServers * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO);
assertEquals(expected, sinkManager.getNumSinks());
}
@Test
public void testChooseSinks_LessThanRatioAvailable() {
List<ServerName> serverNames = Lists.newArrayList(mock(ServerName.class),
mock(ServerName.class));
replicationEndpoint.setRegionServers(serverNames);
sinkManager.chooseSinks();
assertEquals(1, sinkManager.getNumSinks());
}
@Test
public void testReportBadSink() {
ServerName serverNameA = mock(ServerName.class);
ServerName serverNameB = mock(ServerName.class);
replicationEndpoint.setRegionServers(Lists.newArrayList(serverNameA, serverNameB));
sinkManager.chooseSinks();
// Sanity check
assertEquals(1, sinkManager.getNumSinks());
SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
sinkManager.reportBadSink(sinkPeer);
// Just reporting a bad sink once shouldn't have an effect
assertEquals(1, sinkManager.getNumSinks());
}
/**
* Once a SinkPeer has been reported as bad more than BAD_SINK_THRESHOLD times, it should not
* be replicated to anymore.
*/
@Test
public void testReportBadSink_PastThreshold() {
List<ServerName> serverNames = Lists.newArrayList();
int totalServers = 30;
for (int i = 0; i < totalServers; i++) {
serverNames.add(mock(ServerName.class));
}
replicationEndpoint.setRegionServers(serverNames);
sinkManager.chooseSinks();
// Sanity check
int expected = (int) (totalServers * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO);
assertEquals(expected, sinkManager.getNumSinks());
ServerName serverName = sinkManager.getSinksForTesting().get(0);
SinkPeer sinkPeer = new SinkPeer(serverName, mock(AsyncRegionServerAdmin.class));
sinkManager.reportSinkSuccess(sinkPeer); // has no effect, counter does not go negative
for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
sinkManager.reportBadSink(sinkPeer);
}
// Reporting a bad sink more than the threshold count should remove it
// from the list of potential sinks
assertEquals(expected - 1, sinkManager.getNumSinks());
//
// now try a sink that has some successes
//
serverName = sinkManager.getSinksForTesting().get(0);
sinkPeer = new SinkPeer(serverName, mock(AsyncRegionServerAdmin.class));
for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-1; i++) {
sinkManager.reportBadSink(sinkPeer);
}
sinkManager.reportSinkSuccess(sinkPeer); // one success
sinkManager.reportBadSink(sinkPeer);
// did not remove the sink, since we had one successful try
assertEquals(expected - 1, sinkManager.getNumSinks());
for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-2; i++) {
sinkManager.reportBadSink(sinkPeer);
}
// still not remove, since the success reset the counter
assertEquals(expected - 1, sinkManager.getNumSinks());
sinkManager.reportBadSink(sinkPeer);
// but we exhausted the tries
assertEquals(expected - 2, sinkManager.getNumSinks());
}
@Test
public void testReportBadSink_DownToZeroSinks() {
List<ServerName> serverNames = Lists.newArrayList();
int totalServers = 4;
for (int i = 0; i < totalServers; i++) {
serverNames.add(mock(ServerName.class));
}
replicationEndpoint.setRegionServers(serverNames);
sinkManager.chooseSinks();
// Sanity check
List<ServerName> sinkList = sinkManager.getSinksForTesting();
int expected = (int) (totalServers * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO);
assertEquals(expected, sinkList.size());
ServerName serverNameA = sinkList.get(0);
ServerName serverNameB = sinkList.get(1);
SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AsyncRegionServerAdmin.class));
for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
sinkManager.reportBadSink(sinkPeerA);
sinkManager.reportBadSink(sinkPeerB);
}
// We've gone down to 0 good sinks, so the replication sinks
// should have been refreshed now, so out of 4 servers, 2 are not considered as they are
// reported as bad.
expected = (int) ((totalServers - 2) * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO);
assertEquals(expected, sinkManager.getNumSinks());
}
}

TestSerialReplicationEndpoint.java

@@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
@@ -175,14 +174,9 @@ public class TestSerialReplicationEndpoint {
}
@Override
public synchronized List<ServerName> getRegionServers() {
public synchronized int getNumSinks() {
// Return multiple server names for endpoint parallel replication.
return new ArrayList<>(
ImmutableList.of(ServerName.valueOf("www.example.com", 12016, 1525245876026L),
ServerName.valueOf("www.example2.com", 12016, 1525245876026L),
ServerName.valueOf("www.example3.com", 12016, 1525245876026L),
ServerName.valueOf("www.example4.com", 12016, 1525245876026L),
ServerName.valueOf("www.example4.com", 12016, 1525245876026L)));
return 10;
}
}
}