From 7e910a573f30a9995cb779fa55a6911629ac2e5f Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Wed, 23 Sep 2020 08:30:43 +0800 Subject: [PATCH] HBASE-25074 Refactor ReplicationSinkManager: reduce code and make it easy to understand (#2430) Signed-off-by: Wellington Chevreuil Signed-off-by: Duo Zhang --- .../replication/HBaseReplicationEndpoint.java | 219 +++++++++++++----- .../HBaseInterClusterReplicationEndpoint.java | 51 +--- .../regionserver/ReplicationSinkManager.java | 193 --------------- .../TestHBaseReplicationEndpoint.java | 210 +++++++++++++++++ .../TestReplicationSinkManager.java | 210 ----------------- .../TestSerialReplicationEndpoint.java | 10 +- 6 files changed, 384 insertions(+), 509 deletions(-) delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java index 3cde0d5113a..850a7912556 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java @@ -22,8 +22,16 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.AsyncClusterConnection; +import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; +import org.apache.hadoop.hbase.client.ClusterConnectionFactory; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.zookeeper.ZKListener; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.Abortable; @@ -38,6 +46,9 @@ import org.apache.zookeeper.KeeperException.SessionExpiredException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.collect.Maps; + /** * A {@link BaseReplicationEndpoint} for replication endpoints whose * target cluster is an HBase cluster. @@ -50,8 +61,58 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint private ZKWatcher zkw = null; - private List regionServers = new ArrayList<>(0); - private long lastRegionServerUpdate; + protected Configuration conf; + + protected AsyncClusterConnection conn; + + /** + * Default maximum number of times a replication sink can be reported as bad before + * it will no longer be provided as a sink for replication without the pool of + * replication sinks being refreshed. + */ + public static final int DEFAULT_BAD_SINK_THRESHOLD = 3; + + /** + * Default ratio of the total number of peer cluster region servers to consider + * replicating to. + */ + public static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.5f; + + // Ratio of total number of potential peer region servers to be used + private float ratio; + + // Maximum number of times a sink can be reported as bad before the pool of + // replication sinks is refreshed + private int badSinkThreshold; + // Count of "bad replication sink" reports per peer sink + private Map badReportCounts; + + private List sinkServers = new ArrayList<>(0); + + /* + * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different + * Connection implementations, or initialize it in a different way, so defining createConnection + * as protected for possible overridings. + */ + protected AsyncClusterConnection createConnection(Configuration conf) throws IOException { + return ClusterConnectionFactory.createAsyncClusterConnection(conf, + null, User.getCurrent()); + } + + @Override + public void init(Context context) throws IOException { + super.init(context); + this.conf = HBaseConfiguration.create(ctx.getConfiguration()); + // TODO: This connection is replication specific or we should make it particular to + // replication and make replication specific settings such as compression or codec to use + // passing Cells. + this.conn = createConnection(this.conf); + this.ratio = + ctx.getConfiguration().getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO); + this.badSinkThreshold = + ctx.getConfiguration().getInt("replication.bad.sink.threshold", DEFAULT_BAD_SINK_THRESHOLD); + this.badReportCounts = Maps.newHashMap(); + } protected synchronized void disconnect() { if (zkw != null) { @@ -63,7 +124,7 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint * A private method used to re-establish a zookeeper session with a peer cluster. * @param ke */ - protected void reconnect(KeeperException ke) { + private void reconnect(KeeperException ke) { if (ke instanceof ConnectionLossException || ke instanceof SessionExpiredException || ke instanceof AuthFailedException) { String clusterKey = ctx.getPeerConfig().getClusterKey(); @@ -117,23 +178,17 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint return peerUUID; } - /** - * Get the ZK connection to this peer - * @return zk connection - */ - protected synchronized ZKWatcher getZkw() { - return zkw; - } - /** * Closes the current ZKW (if not null) and creates a new one * @throws IOException If anything goes wrong connecting */ - synchronized void reloadZkWatcher() throws IOException { - if (zkw != null) zkw.close(); + private synchronized void reloadZkWatcher() throws IOException { + if (zkw != null) { + zkw.close(); + } zkw = new ZKWatcher(ctx.getConfiguration(), "connection to cluster: " + ctx.getPeerId(), this); - getZkw().registerListener(new PeerRegionServerListener(this)); + zkw.registerListener(new PeerRegionServerListener(this)); } @Override @@ -150,13 +205,19 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint /** * Get the list of all the region servers from the specified peer - * @param zkw zk connection to use + * * @return list of region server addresses or an empty list if the slave is unavailable */ - protected static List fetchSlavesAddresses(ZKWatcher zkw) - throws KeeperException { - List children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, - zkw.getZNodePaths().rsZNode); + protected List fetchSlavesAddresses() { + List children = null; + try { + children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.getZNodePaths().rsZNode); + } catch (KeeperException ke) { + if (LOG.isDebugEnabled()) { + LOG.debug("Fetch slaves addresses failed", ke); + } + reconnect(ke); + } if (children == null) { return Collections.emptyList(); } @@ -167,43 +228,70 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint return addresses; } - /** - * Get a list of all the addresses of all the available region servers - * for this peer cluster, or an empty list if no region servers available at peer cluster. - * @return list of addresses - */ - // Synchronize peer cluster connection attempts to avoid races and rate - // limit connections when multiple replication sources try to connect to - // the peer cluster. If the peer cluster is down we can get out of control - // over time. - public synchronized List getRegionServers() { - try { - setRegionServers(fetchSlavesAddresses(this.getZkw())); - } catch (KeeperException ke) { - if (LOG.isDebugEnabled()) { - LOG.debug("Fetch slaves addresses failed", ke); - } - reconnect(ke); + protected synchronized void chooseSinks() { + List slaveAddresses = fetchSlavesAddresses(); + if (slaveAddresses.isEmpty()) { + LOG.warn("No sinks available at peer. Will not be able to replicate"); } - return regionServers; + Collections.shuffle(slaveAddresses, ThreadLocalRandom.current()); + int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio); + this.sinkServers = slaveAddresses.subList(0, numSinks); + badReportCounts.clear(); + } + + protected synchronized int getNumSinks() { + return sinkServers.size(); } /** - * Set the list of region servers for that peer - * @param regionServers list of addresses for the region servers + * Get a randomly-chosen replication sink to replicate to. + * @return a replication sink to replicate to */ - public synchronized void setRegionServers(List regionServers) { - this.regionServers = regionServers; - lastRegionServerUpdate = System.currentTimeMillis(); + protected synchronized SinkPeer getReplicationSink() throws IOException { + if (sinkServers.isEmpty()) { + LOG.info("Current list of sinks is out of date or empty, updating"); + chooseSinks(); + } + if (sinkServers.isEmpty()) { + throw new IOException("No replication sinks are available"); + } + ServerName serverName = + sinkServers.get(ThreadLocalRandom.current().nextInt(sinkServers.size())); + return new SinkPeer(serverName, conn.getRegionServerAdmin(serverName)); } /** - * Get the timestamp at which the last change occurred to the list of region servers to replicate - * to. - * @return The System.currentTimeMillis at the last time the list of peer region servers changed. + * Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it + * failed). If a single SinkPeer is reported as bad more than + * replication.bad.sink.threshold times, it will be removed + * from the pool of potential replication targets. + * + * @param sinkPeer The SinkPeer that had a failed replication attempt on it */ - public long getLastRegionServerUpdate() { - return lastRegionServerUpdate; + protected synchronized void reportBadSink(SinkPeer sinkPeer) { + ServerName serverName = sinkPeer.getServerName(); + int badReportCount = badReportCounts.compute(serverName, (k, v) -> v == null ? 1 : v + 1); + if (badReportCount > badSinkThreshold) { + this.sinkServers.remove(serverName); + if (sinkServers.isEmpty()) { + chooseSinks(); + } + } + } + + /** + * Report that a {@code SinkPeer} successfully replicated a chunk of data. + * + * @param sinkPeer + * The SinkPeer that had a failed replication attempt on it + */ + protected synchronized void reportSinkSuccess(SinkPeer sinkPeer) { + badReportCounts.remove(sinkPeer.getServerName()); + } + + @VisibleForTesting + List getSinkServers() { + return sinkServers; } /** @@ -214,22 +302,39 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint private final HBaseReplicationEndpoint replicationEndpoint; private final String regionServerListNode; - public PeerRegionServerListener(HBaseReplicationEndpoint replicationPeer) { - super(replicationPeer.getZkw()); - this.replicationEndpoint = replicationPeer; - this.regionServerListNode = replicationEndpoint.getZkw().getZNodePaths().rsZNode; + public PeerRegionServerListener(HBaseReplicationEndpoint endpoint) { + super(endpoint.zkw); + this.replicationEndpoint = endpoint; + this.regionServerListNode = endpoint.zkw.getZNodePaths().rsZNode; } @Override public synchronized void nodeChildrenChanged(String path) { if (path.equals(regionServerListNode)) { - try { - LOG.info("Detected change to peer region servers, fetching updated list"); - replicationEndpoint.setRegionServers(fetchSlavesAddresses(replicationEndpoint.getZkw())); - } catch (KeeperException e) { - LOG.error("Error reading slave addresses", e); - } + LOG.info("Detected change to peer region servers, fetching updated list"); + replicationEndpoint.chooseSinks(); } } } + + /** + * Wraps a replication region server sink to provide the ability to identify it. + */ + public static class SinkPeer { + private ServerName serverName; + private AsyncRegionServerAdmin regionServer; + + public SinkPeer(ServerName serverName, AsyncRegionServerAdmin regionServer) { + this.serverName = serverName; + this.regionServer = regionServer; + } + + ServerName getServerName() { + return serverName; + } + + public AsyncRegionServerAdmin getRegionServer() { + return regionServer; + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index 4e0669c2e9f..b6e1f69173f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -41,7 +41,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.CellUtil; @@ -60,7 +59,6 @@ import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil; import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; -import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; @@ -100,8 +98,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi public static final String REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY = "hbase.replication.drop.on.deleted.columnfamily"; - private AsyncClusterConnection conn; - private Configuration conf; // How long should we sleep for each retry private long sleepForRetries; // Maximum number of retries before taking bold actions @@ -114,8 +110,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi private int replicationRpcLimit; //Metrics for this source private MetricsSource metrics; - // Handles connecting to peer region servers - private ReplicationSinkManager replicationSinkMgr; private boolean peersSelected = false; private String replicationClusterId = ""; private ThreadPoolExecutor exec; @@ -130,25 +124,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi //Initialising as 0 to guarantee at least one logging message private long lastSinkFetchTime = 0; - /* - * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different - * Connection implementations, or initialize it in a different way, so defining createConnection - * as protected for possible overridings. - */ - protected AsyncClusterConnection createConnection(Configuration conf) throws IOException { - return ClusterConnectionFactory.createAsyncClusterConnection(conf, - null, User.getCurrent()); - } - - /* - * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different - * ReplicationSinkManager implementations, or initialize it in a different way, - * so defining createReplicationSinkManager as protected for possible overridings. - */ - protected ReplicationSinkManager createReplicationSinkManager(AsyncClusterConnection conn) { - return new ReplicationSinkManager(conn, this, this.conf); - } - @Override public void init(Context context) throws IOException { super.init(context); @@ -171,8 +146,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); this.metrics = context.getMetrics(); - // ReplicationQueueInfo parses the peerId out of the znode for us - this.replicationSinkMgr = createReplicationSinkManager(conn); // per sink thread pool this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT); @@ -211,14 +184,11 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi } private void connectToPeers() { - getRegionServers(); - int sleepMultiplier = 1; - // Connect to peer cluster first, unless we have to stop - while (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) { - replicationSinkMgr.chooseSinks(); - if (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) { + while (this.isRunning() && getNumSinks() == 0) { + chooseSinks(); + if (this.isRunning() && getNumSinks() == 0) { if (sleepForRetries("Waiting for peers", sleepMultiplier)) { sleepMultiplier++; } @@ -253,7 +223,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi } private List> createParallelBatches(final List entries) { - int numSinks = Math.max(replicationSinkMgr.getNumSinks(), 1); + int numSinks = Math.max(getNumSinks(), 1); int n = Math.min(Math.min(this.maxThreads, entries.size() / 100 + 1), numSinks); List> entryLists = Stream.generate(ArrayList::new).limit(n).collect(Collectors.toList()); @@ -513,7 +483,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi peersSelected = true; } - int numSinks = replicationSinkMgr.getNumSinks(); + int numSinks = getNumSinks(); if (numSinks == 0) { if((System.currentTimeMillis() - lastSinkFetchTime) >= (maxRetriesMultiplier*1000)) { LOG.warn( @@ -561,7 +531,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi } else { LOG.warn("{} Peer encountered RemoteException, rechecking all sinks: ", logPeerId(), ioe); - replicationSinkMgr.chooseSinks(); + chooseSinks(); } } else { if (ioe instanceof SocketTimeoutException) { @@ -574,7 +544,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi this.socketTimeoutMultiplier); } else if (ioe instanceof ConnectException || ioe instanceof UnknownHostException) { LOG.warn("{} Peer is unavailable, rechecking all sinks: ", logPeerId(), ioe); - replicationSinkMgr.chooseSinks(); + chooseSinks(); } else { LOG.warn("{} Can't replicate because of a local or network error: ", logPeerId(), ioe); } @@ -629,7 +599,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}", logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId); } - sinkPeer = replicationSinkMgr.getReplicationSink(); + sinkPeer = getReplicationSink(); AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer(); try { ReplicationProtobufUtil.replicateWALEntry(rsAdmin, @@ -644,10 +614,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi } throw e; } - replicationSinkMgr.reportSinkSuccess(sinkPeer); + reportSinkSuccess(sinkPeer); } catch (IOException ioe) { if (sinkPeer != null) { - replicationSinkMgr.reportBadSink(sinkPeer); + reportBadSink(sinkPeer); } throw ioe; } @@ -683,5 +653,4 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi private String logPeerId(){ return "[Source for peer " + this.ctx.getPeerId() + "]:"; } - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java deleted file mode 100644 index db12dc0a6fd..00000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication.regionserver; - -import java.io.IOException; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ThreadLocalRandom; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.client.AsyncClusterConnection; -import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; -import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -import org.apache.hbase.thirdparty.com.google.common.collect.Lists; -import org.apache.hbase.thirdparty.com.google.common.collect.Maps; - -/** - * Maintains a collection of peers to replicate to, and randomly selects a - * single peer to replicate to per set of data to replicate. Also handles - * keeping track of peer availability. - */ -@InterfaceAudience.Private -public class ReplicationSinkManager { - - private static final Logger LOG = LoggerFactory.getLogger(ReplicationSinkManager.class); - - /** - * Default maximum number of times a replication sink can be reported as bad before - * it will no longer be provided as a sink for replication without the pool of - * replication sinks being refreshed. - */ - static final int DEFAULT_BAD_SINK_THRESHOLD = 3; - - /** - * Default ratio of the total number of peer cluster region servers to consider - * replicating to. - */ - static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.5f; - - - private final AsyncClusterConnection conn; - - private final HBaseReplicationEndpoint endpoint; - - // Count of "bad replication sink" reports per peer sink - private final Map badReportCounts; - - // Ratio of total number of potential peer region servers to be used - private final float ratio; - - // Maximum number of times a sink can be reported as bad before the pool of - // replication sinks is refreshed - private final int badSinkThreshold; - - // A timestamp of the last time the list of replication peers changed - private long lastUpdateToPeers; - - // The current pool of sinks to which replication can be performed - private List sinks = Lists.newArrayList(); - - /** - * Instantiate for a single replication peer cluster. - * @param conn connection to the peer cluster - * @param endpoint replication endpoint for inter cluster replication - * @param conf HBase configuration, used for determining replication source ratio and bad peer - * threshold - */ - public ReplicationSinkManager(AsyncClusterConnection conn, HBaseReplicationEndpoint endpoint, - Configuration conf) { - this.conn = conn; - this.endpoint = endpoint; - this.badReportCounts = Maps.newHashMap(); - this.ratio = conf.getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO); - this.badSinkThreshold = - conf.getInt("replication.bad.sink.threshold", DEFAULT_BAD_SINK_THRESHOLD); - } - - /** - * Get a randomly-chosen replication sink to replicate to. - * @return a replication sink to replicate to - */ - public synchronized SinkPeer getReplicationSink() throws IOException { - if (endpoint.getLastRegionServerUpdate() > this.lastUpdateToPeers || sinks.isEmpty()) { - LOG.info("Current list of sinks is out of date or empty, updating"); - chooseSinks(); - } - - if (sinks.isEmpty()) { - throw new IOException("No replication sinks are available"); - } - ServerName serverName = sinks.get(ThreadLocalRandom.current().nextInt(sinks.size())); - return new SinkPeer(serverName, conn.getRegionServerAdmin(serverName)); - } - - /** - * Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it - * failed). If a single SinkPeer is reported as bad more than - * replication.bad.sink.threshold times, it will be removed - * from the pool of potential replication targets. - * - * @param sinkPeer - * The SinkPeer that had a failed replication attempt on it - */ - public synchronized void reportBadSink(SinkPeer sinkPeer) { - ServerName serverName = sinkPeer.getServerName(); - int badReportCount = (badReportCounts.containsKey(serverName) - ? badReportCounts.get(serverName) : 0) + 1; - badReportCounts.put(serverName, badReportCount); - if (badReportCount > badSinkThreshold) { - this.sinks.remove(serverName); - if (sinks.isEmpty()) { - chooseSinks(); - } - } - } - - /** - * Report that a {@code SinkPeer} successfully replicated a chunk of data. - * - * @param sinkPeer - * The SinkPeer that had a failed replication attempt on it - */ - public synchronized void reportSinkSuccess(SinkPeer sinkPeer) { - badReportCounts.remove(sinkPeer.getServerName()); - } - - /** - * Refresh the list of sinks. - */ - public synchronized void chooseSinks() { - List slaveAddresses = endpoint.getRegionServers(); - if(slaveAddresses.isEmpty()){ - LOG.warn("No sinks available at peer. Will not be able to replicate"); - } - Collections.shuffle(slaveAddresses, ThreadLocalRandom.current()); - int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio); - sinks = slaveAddresses.subList(0, numSinks); - lastUpdateToPeers = System.currentTimeMillis(); - badReportCounts.clear(); - } - - public synchronized int getNumSinks() { - return sinks.size(); - } - - @VisibleForTesting - protected List getSinksForTesting() { - return Collections.unmodifiableList(sinks); - } - - /** - * Wraps a replication region server sink to provide the ability to identify - * it. - */ - public static class SinkPeer { - private ServerName serverName; - private AsyncRegionServerAdmin regionServer; - - public SinkPeer(ServerName serverName, AsyncRegionServerAdmin regionServer) { - this.serverName = serverName; - this.regionServer = regionServer; - } - - ServerName getServerName() { - return serverName; - } - - public AsyncRegionServerAdmin getRegionServer() { - return regionServer; - } - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java new file mode 100644 index 00000000000..41601417a9d --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.AsyncClusterConnection; +import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; +import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.SinkPeer; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; + +@Category({ReplicationTests.class, SmallTests.class}) +public class TestHBaseReplicationEndpoint { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestHBaseReplicationEndpoint.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestHBaseReplicationEndpoint.class); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private HBaseReplicationEndpoint endpoint; + + @Before + public void setUp() throws Exception { + try { + ReplicationEndpoint.Context context = + new ReplicationEndpoint.Context(null, UTIL.getConfiguration(), UTIL.getConfiguration(), + null, null, null, null, null, null, null); + endpoint = new DummyHBaseReplicationEndpoint(); + endpoint.init(context); + } catch (Exception e) { + LOG.info("Failed", e); + } + } + + @Test + public void testChooseSinks() { + List serverNames = Lists.newArrayList(); + int totalServers = 20; + for (int i = 0; i < totalServers; i++) { + serverNames.add(mock(ServerName.class)); + } + ((DummyHBaseReplicationEndpoint) endpoint).setRegionServers(serverNames); + endpoint.chooseSinks(); + int expected = (int) (totalServers * HBaseReplicationEndpoint.DEFAULT_REPLICATION_SOURCE_RATIO); + assertEquals(expected, endpoint.getNumSinks()); + } + + @Test + public void testChooseSinksLessThanRatioAvailable() { + List serverNames = Lists.newArrayList(mock(ServerName.class), + mock(ServerName.class)); + ((DummyHBaseReplicationEndpoint) endpoint).setRegionServers(serverNames); + endpoint.chooseSinks(); + assertEquals(1, endpoint.getNumSinks()); + } + + @Test + public void testReportBadSink() { + ServerName serverNameA = mock(ServerName.class); + ServerName serverNameB = mock(ServerName.class); + ((DummyHBaseReplicationEndpoint) endpoint) + .setRegionServers(Lists.newArrayList(serverNameA, serverNameB)); + endpoint.chooseSinks(); + // Sanity check + assertEquals(1, endpoint.getNumSinks()); + + SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class)); + endpoint.reportBadSink(sinkPeer); + // Just reporting a bad sink once shouldn't have an effect + assertEquals(1, endpoint.getNumSinks()); + } + + /** + * Once a SinkPeer has been reported as bad more than BAD_SINK_THRESHOLD times, it should not + * be replicated to anymore. + */ + @Test + public void testReportBadSinkPastThreshold() { + List serverNames = Lists.newArrayList(); + int totalServers = 30; + for (int i = 0; i < totalServers; i++) { + serverNames.add(mock(ServerName.class)); + } + ((DummyHBaseReplicationEndpoint) endpoint).setRegionServers(serverNames); + endpoint.chooseSinks(); + // Sanity check + int expected = (int) (totalServers * HBaseReplicationEndpoint.DEFAULT_REPLICATION_SOURCE_RATIO); + assertEquals(expected, endpoint.getNumSinks()); + + ServerName badSinkServer0 = endpoint.getSinkServers().get(0); + SinkPeer sinkPeer = new SinkPeer(badSinkServer0, mock(AsyncRegionServerAdmin.class)); + for (int i = 0; i <= HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) { + endpoint.reportBadSink(sinkPeer); + } + // Reporting a bad sink more than the threshold count should remove it + // from the list of potential sinks + assertEquals(expected - 1, endpoint.getNumSinks()); + + // now try a sink that has some successes + ServerName badSinkServer1 = endpoint.getSinkServers().get(0); + sinkPeer = new SinkPeer(badSinkServer1, mock(AsyncRegionServerAdmin.class)); + for (int i = 0; i < HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) { + endpoint.reportBadSink(sinkPeer); + } + endpoint.reportSinkSuccess(sinkPeer); // one success + endpoint.reportBadSink(sinkPeer); + // did not remove the sink, since we had one successful try + assertEquals(expected - 1, endpoint.getNumSinks()); + + for (int i = 0; i < HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD - 1; i++) { + endpoint.reportBadSink(sinkPeer); + } + // still not remove, since the success reset the counter + assertEquals(expected - 1, endpoint.getNumSinks()); + endpoint.reportBadSink(sinkPeer); + // but we exhausted the tries + assertEquals(expected - 2, endpoint.getNumSinks()); + } + + @Test + public void testReportBadSinkDownToZeroSinks() { + List serverNames = Lists.newArrayList(); + int totalServers = 4; + for (int i = 0; i < totalServers; i++) { + serverNames.add(mock(ServerName.class)); + } + ((DummyHBaseReplicationEndpoint) endpoint).setRegionServers(serverNames); + endpoint.chooseSinks(); + // Sanity check + int expected = (int) (totalServers * HBaseReplicationEndpoint.DEFAULT_REPLICATION_SOURCE_RATIO); + assertEquals(expected, endpoint.getNumSinks()); + + ServerName serverNameA = endpoint.getSinkServers().get(0); + ServerName serverNameB = endpoint.getSinkServers().get(1); + + SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class)); + SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AsyncRegionServerAdmin.class)); + + for (int i = 0; i <= HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) { + endpoint.reportBadSink(sinkPeerA); + endpoint.reportBadSink(sinkPeerB); + } + + // We've gone down to 0 good sinks, so the replication sinks + // should have been refreshed now, so out of 4 servers, 2 are not considered as they are + // reported as bad. + expected = + (int) ((totalServers - 2) * HBaseReplicationEndpoint.DEFAULT_REPLICATION_SOURCE_RATIO); + assertEquals(expected, endpoint.getNumSinks()); + } + + private static class DummyHBaseReplicationEndpoint extends HBaseReplicationEndpoint { + + List regionServers; + + public void setRegionServers(List regionServers) { + this.regionServers = regionServers; + } + + @Override + public List fetchSlavesAddresses() { + return regionServers; + } + + @Override + public boolean replicate(ReplicateContext replicateContext) { + return false; + } + + @Override + public AsyncClusterConnection createConnection(Configuration conf) throws IOException { + return null; + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java deleted file mode 100644 index f8a2ab91760..00000000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication.regionserver; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.mock; - -import java.util.List; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.client.AsyncClusterConnection; -import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; -import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; -import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer; -import org.apache.hadoop.hbase.testclassification.ReplicationTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import org.apache.hbase.thirdparty.com.google.common.collect.Lists; - -@Category({ReplicationTests.class, SmallTests.class}) -public class TestReplicationSinkManager { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestReplicationSinkManager.class); - - private ReplicationSinkManager sinkManager; - private HBaseReplicationEndpoint replicationEndpoint; - - /** - * Manage the 'getRegionServers' for the tests below. Override the base class handling - * of Regionservers. We used to use a mock for this but updated guava/errorprone disallows - * mocking of classes that implement Service. - */ - private static class SetServersHBaseReplicationEndpoint extends HBaseReplicationEndpoint { - List regionServers; - - @Override - public boolean replicate(ReplicateContext replicateContext) { - return false; - } - - @Override - public synchronized void setRegionServers(List regionServers) { - this.regionServers = regionServers; - } - - @Override - public List getRegionServers() { - return this.regionServers; - } - } - - @Before - public void setUp() { - this.replicationEndpoint = new SetServersHBaseReplicationEndpoint(); - this.sinkManager = new ReplicationSinkManager(mock(AsyncClusterConnection.class), - replicationEndpoint, new Configuration()); - } - - @Test - public void testChooseSinks() { - List serverNames = Lists.newArrayList(); - int totalServers = 20; - for (int i = 0; i < totalServers; i++) { - serverNames.add(mock(ServerName.class)); - } - replicationEndpoint.setRegionServers(serverNames); - sinkManager.chooseSinks(); - int expected = (int) (totalServers * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO); - assertEquals(expected, sinkManager.getNumSinks()); - - } - - @Test - public void testChooseSinks_LessThanRatioAvailable() { - List serverNames = Lists.newArrayList(mock(ServerName.class), - mock(ServerName.class)); - replicationEndpoint.setRegionServers(serverNames); - sinkManager.chooseSinks(); - assertEquals(1, sinkManager.getNumSinks()); - } - - @Test - public void testReportBadSink() { - ServerName serverNameA = mock(ServerName.class); - ServerName serverNameB = mock(ServerName.class); - replicationEndpoint.setRegionServers(Lists.newArrayList(serverNameA, serverNameB)); - sinkManager.chooseSinks(); - // Sanity check - assertEquals(1, sinkManager.getNumSinks()); - - SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class)); - - sinkManager.reportBadSink(sinkPeer); - - // Just reporting a bad sink once shouldn't have an effect - assertEquals(1, sinkManager.getNumSinks()); - - } - - /** - * Once a SinkPeer has been reported as bad more than BAD_SINK_THRESHOLD times, it should not - * be replicated to anymore. - */ - @Test - public void testReportBadSink_PastThreshold() { - List serverNames = Lists.newArrayList(); - int totalServers = 30; - for (int i = 0; i < totalServers; i++) { - serverNames.add(mock(ServerName.class)); - } - replicationEndpoint.setRegionServers(serverNames); - sinkManager.chooseSinks(); - // Sanity check - int expected = (int) (totalServers * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO); - assertEquals(expected, sinkManager.getNumSinks()); - - ServerName serverName = sinkManager.getSinksForTesting().get(0); - - SinkPeer sinkPeer = new SinkPeer(serverName, mock(AsyncRegionServerAdmin.class)); - - sinkManager.reportSinkSuccess(sinkPeer); // has no effect, counter does not go negative - for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) { - sinkManager.reportBadSink(sinkPeer); - } - - // Reporting a bad sink more than the threshold count should remove it - // from the list of potential sinks - assertEquals(expected - 1, sinkManager.getNumSinks()); - - // - // now try a sink that has some successes - // - serverName = sinkManager.getSinksForTesting().get(0); - - sinkPeer = new SinkPeer(serverName, mock(AsyncRegionServerAdmin.class)); - for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-1; i++) { - sinkManager.reportBadSink(sinkPeer); - } - sinkManager.reportSinkSuccess(sinkPeer); // one success - sinkManager.reportBadSink(sinkPeer); - - // did not remove the sink, since we had one successful try - assertEquals(expected - 1, sinkManager.getNumSinks()); - - for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-2; i++) { - sinkManager.reportBadSink(sinkPeer); - } - // still not remove, since the success reset the counter - assertEquals(expected - 1, sinkManager.getNumSinks()); - - sinkManager.reportBadSink(sinkPeer); - // but we exhausted the tries - assertEquals(expected - 2, sinkManager.getNumSinks()); - } - - @Test - public void testReportBadSink_DownToZeroSinks() { - List serverNames = Lists.newArrayList(); - int totalServers = 4; - for (int i = 0; i < totalServers; i++) { - serverNames.add(mock(ServerName.class)); - } - replicationEndpoint.setRegionServers(serverNames); - sinkManager.chooseSinks(); - // Sanity check - List sinkList = sinkManager.getSinksForTesting(); - int expected = (int) (totalServers * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO); - assertEquals(expected, sinkList.size()); - - ServerName serverNameA = sinkList.get(0); - ServerName serverNameB = sinkList.get(1); - - SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class)); - SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AsyncRegionServerAdmin.class)); - - for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) { - sinkManager.reportBadSink(sinkPeerA); - sinkManager.reportBadSink(sinkPeerB); - } - - // We've gone down to 0 good sinks, so the replication sinks - // should have been refreshed now, so out of 4 servers, 2 are not considered as they are - // reported as bad. - expected = (int) ((totalServers - 2) * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO); - assertEquals(expected, sinkManager.getNumSinks()); - } - -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java index 3c88ab31591..090129174cc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java @@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.Admin; @@ -175,14 +174,9 @@ public class TestSerialReplicationEndpoint { } @Override - public synchronized List getRegionServers() { + public synchronized int getNumSinks() { // Return multiple server names for endpoint parallel replication. - return new ArrayList<>( - ImmutableList.of(ServerName.valueOf("www.example.com", 12016, 1525245876026L), - ServerName.valueOf("www.example2.com", 12016, 1525245876026L), - ServerName.valueOf("www.example3.com", 12016, 1525245876026L), - ServerName.valueOf("www.example4.com", 12016, 1525245876026L), - ServerName.valueOf("www.example4.com", 12016, 1525245876026L))); + return 10; } } }