diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java index 4f4b870655a..6d6355c79e7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java @@ -18,6 +18,13 @@ */ package org.apache.hadoop.hbase.replication; +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -35,12 +42,6 @@ import org.apache.zookeeper.KeeperException.NodeExistsException; import com.google.protobuf.InvalidProtocolBufferException; -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; - /** * This class acts as a wrapper for all the objects used to identify and * communicate with remote peers and is responsible for answering to expired @@ -57,9 +58,11 @@ public class ReplicationPeer implements Abortable, Closeable { // Cannot be final since a new object needs to be recreated when session fails private ZooKeeperWatcher zkw; private final Configuration conf; + private long lastRegionserverUpdate; private PeerStateTracker peerStateTracker; + /** * Constructor that takes all the objects required to communicate with the * specified peer, except for the region server addresses. @@ -130,6 +133,7 @@ public class ReplicationPeer implements Abortable, Closeable { */ public void setRegionServers(List regionServers) { this.regionServers = regionServers; + lastRegionserverUpdate = System.currentTimeMillis(); } /** @@ -140,6 +144,15 @@ public class ReplicationPeer implements Abortable, Closeable { return zkw; } + /** + * Get the timestamp at which the last change occurred to the list of region servers to replicate + * to. + * @return The System.currentTimeMillis at the last time the list of peer region servers changed. + */ + public long getLastRegionserverUpdate() { + return lastRegionserverUpdate; + } + /** * Get the identifier of this peer * @return string representation of the id (short) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java index bfd09910409..21f9ee49d0e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java @@ -122,6 +122,14 @@ public interface ReplicationPeers { */ List getRegionServersOfConnectedPeer(String peerId); + /** + * Get the timestamp of the last change in composition of a given peer cluster. + * @param peerId identifier of the peer cluster for which the timestamp is requested + * @return the timestamp (in milliseconds) of the last change to the composition of + * the peer cluster + */ + long getTimestampOfLastChangeToPeer(String peerId); + /** * Returns the UUID of the provided peer id. * @param peerId the peer's ID that will be converted into a UUID diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index 7dee2a41d8e..f0d6f143cc3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.ConnectionLossException; @@ -266,6 +267,14 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re return ids; } + @Override + public long getTimestampOfLastChangeToPeer(String peerId) { + if (!peerClusters.containsKey(peerId)) { + throw new IllegalArgumentException("Unknown peer id: " + peerId); + } + return peerClusters.get(peerId).getLastRegionserverUpdate(); + } + /** * A private method used during initialization. This method attempts to connect to all registered * peer clusters. This method does not set a watch on the peer cluster znodes. @@ -291,6 +300,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re LOG.warn("Lost the ZooKeeper connection for peer " + peer.getClusterKey(), ke); try { peer.reloadZkWatcher(); + peer.getZkw().registerListener(new PeerRegionServerListener(peer)); } catch (IOException io) { LOG.warn("Creation of ZookeeperWatcher failed for peer " + peer.getClusterKey(), io); } @@ -304,7 +314,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re */ private static List fetchSlavesAddresses(ZooKeeperWatcher zkw) throws KeeperException { - List children = ZKUtil.listChildrenNoWatch(zkw, zkw.rsZNode); + List children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.rsZNode); if (children == null) { return Collections.emptyList(); } @@ -315,6 +325,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re return addresses; } + private String getPeerStateNode(String id) { return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName)); } @@ -366,6 +377,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re ReplicationPeer peer = new ReplicationPeer(peerConf, peerId, ZKUtil.getZooKeeperClusterKey(peerConf)); peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId)); + peer.getZkw().registerListener(new PeerRegionServerListener(peer)); return peer; } @@ -406,4 +418,37 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re .toByteArray(); return ProtobufUtil.prependPBMagic(bytes); } + + /** + * Tracks changes to the list of region servers in a peer's cluster. + */ + public static class PeerRegionServerListener extends ZooKeeperListener { + + private ReplicationPeer peer; + private String regionServerListNode; + + public PeerRegionServerListener(ReplicationPeer replicationPeer) { + super(replicationPeer.getZkw()); + this.peer = replicationPeer; + this.regionServerListNode = peer.getZkw().rsZNode; + } + + public PeerRegionServerListener(String regionServerListNode, ZooKeeperWatcher zkw) { + super(zkw); + this.regionServerListNode = regionServerListNode; + } + + @Override + public synchronized void nodeChildrenChanged(String path) { + if (path.equals(regionServerListNode)) { + try { + LOG.info("Detected change to peer regionservers, fetching updated list"); + peer.setRegionServers(fetchSlavesAddresses(peer.getZkw())); + } catch (KeeperException e) { + LOG.fatal("Error reading slave addresses", e); + } + } + } + + } } \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java new file mode 100644 index 00000000000..839db9b1571 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; +import org.apache.hadoop.hbase.replication.ReplicationPeers; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +/** + * Maintains a collection of peers to replicate to, and randomly selects a + * single peer to replicate to per set of data to replicate. Also handles + * keeping track of peer availability. + */ +public class ReplicationSinkManager { + + private static final Log LOG = LogFactory.getLog(ReplicationSinkManager.class); + + /** + * Default maximum number of times a replication sink can be reported as bad before + * it will no longer be provided as a sink for replication without the pool of + * replication sinks being refreshed. + */ + static final int DEFAULT_BAD_SINK_THRESHOLD = 3; + + /** + * Default ratio of the total number of peer cluster region servers to consider + * replicating to. + */ + static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.1f; + + + private final HConnection conn; + + private final String peerClusterId; + + private final ReplicationPeers replicationPeers; + + // Count of "bad replication sink" reports per peer sink + private final Map badReportCounts; + + // Ratio of total number of potential peer region servers to be used + private final float ratio; + + // Maximum number of times a sink can be reported as bad before the pool of + // replication sinks is refreshed + private final int badSinkThreshold; + + private final Random random; + + // A timestamp of the last time the list of replication peers changed + private long lastUpdateToPeers; + + // The current pool of sinks to which replication can be performed + private List sinks = Lists.newArrayList(); + + /** + * Instantiate for a single replication peer cluster. + * @param conn connection to the peer cluster + * @param peerClusterId identifier of the peer cluster + * @param replicationPeers manages peer clusters being replicated to + * @param conf HBase configuration, used for determining replication source ratio and bad peer + * threshold + */ + public ReplicationSinkManager(HConnection conn, String peerClusterId, + ReplicationPeers replicationPeers, Configuration conf) { + this.conn = conn; + this.peerClusterId = peerClusterId; + this.replicationPeers = replicationPeers; + this.badReportCounts = Maps.newHashMap(); + this.ratio = conf.getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO); + this.badSinkThreshold = conf.getInt("replication.bad.sink.threshold", + DEFAULT_BAD_SINK_THRESHOLD); + this.random = new Random(); + } + + /** + * Get a randomly-chosen replication sink to replicate to. + * + * @return a replication sink to replicate to + */ + public SinkPeer getReplicationSink() throws IOException { + if (replicationPeers.getTimestampOfLastChangeToPeer(peerClusterId) + > this.lastUpdateToPeers) { + LOG.info("Current list of sinks is out of date, updating"); + chooseSinks(); + } + + if (sinks.isEmpty()) { + throw new IOException("No replication sinks are available"); + } + ServerName serverName = sinks.get(random.nextInt(sinks.size())); + return new SinkPeer(serverName, conn.getAdmin(serverName)); + } + + /** + * Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it + * failed). If a single SinkPeer is reported as bad more than + * replication.bad.sink.threshold times, it will be removed + * from the pool of potential replication targets. + * + * @param sinkPeer + * The SinkPeer that had a failed replication attempt on it + */ + public void reportBadSink(SinkPeer sinkPeer) { + ServerName serverName = sinkPeer.getServerName(); + int badReportCount = (badReportCounts.containsKey(serverName) + ? badReportCounts.get(serverName) : 0) + 1; + badReportCounts.put(serverName, badReportCount); + if (badReportCount > badSinkThreshold) { + this.sinks.remove(serverName); + if (sinks.isEmpty()) { + chooseSinks(); + } + } + } + + void chooseSinks() { + List slaveAddresses = + replicationPeers.getRegionServersOfConnectedPeer(peerClusterId); + Collections.shuffle(slaveAddresses, random); + int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio); + sinks = slaveAddresses.subList(0, numSinks); + lastUpdateToPeers = System.currentTimeMillis(); + badReportCounts.clear(); + } + + List getSinks() { + return sinks; + } + + /** + * Wraps a replication region server sink to provide the ability to identify + * it. + */ + public static class SinkPeer { + private ServerName serverName; + private AdminService.BlockingInterface regionServer; + + public SinkPeer(ServerName serverName, AdminService.BlockingInterface regionServer) { + this.serverName = serverName; + this.regionServer = regionServer; + } + + ServerName getServerName() { + return serverName; + } + + public AdminService.BlockingInterface getRegionServer() { + return regionServer; + } + + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index dc637555840..7973a408b9a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -23,16 +23,11 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.net.ConnectException; import java.net.SocketTimeoutException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; -import java.util.HashSet; import java.util.List; import java.util.NavigableMap; -import java.util.Random; -import java.util.Set; import java.util.UUID; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; @@ -45,20 +40,19 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; -import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.ipc.RemoteException; @@ -89,9 +83,6 @@ public class ReplicationSource extends Thread private ReplicationQueues replicationQueues; private ReplicationPeers replicationPeers; private Configuration conf; - // ratio of region servers to chose from a slave cluster - private float ratio; - private Random random; private ReplicationQueueInfo replicationQueueInfo; // id of the peer cluster this source replicates to private String peerId; @@ -99,8 +90,6 @@ public class ReplicationSource extends Thread private ReplicationSourceManager manager; // Should we stop everything? private Stoppable stopper; - // List of chosen sinks (region servers) - private List currentPeers; // How long should we sleep for each retry private long sleepForRetries; // Max size in bytes of entriesArray @@ -140,6 +129,8 @@ public class ReplicationSource extends Thread private MetricsSource metrics; // Handle on the log reader helper private ReplicationHLogReaderManager repLogReader; + // Handles connecting to peer region servers + private ReplicationSinkManager replicationSinkMgr; /** * Instantiation method used by region servers @@ -178,9 +169,6 @@ public class ReplicationSource extends Thread this.conn = HConnectionManager.getConnection(conf); this.replicationQueues = replicationQueues; this.replicationPeers = replicationPeers; - this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f); - this.currentPeers = new ArrayList(); - this.random = new Random(); this.manager = manager; this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); @@ -193,29 +181,9 @@ public class ReplicationSource extends Thread this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode); // ReplicationQueueInfo parses the peerId out of the znode for us this.peerId = this.replicationQueueInfo.getPeerId(); + this.replicationSinkMgr = new ReplicationSinkManager(conn, peerId, replicationPeers, conf); } - /** - * Select a number of peers at random using the ratio. Mininum 1. - */ - private void chooseSinks() { - this.currentPeers.clear(); - List addresses = this.replicationPeers.getRegionServersOfConnectedPeer(this.peerId); - Set setOfAddr = new HashSet(); - int nbPeers = (int) (Math.ceil(addresses.size() * ratio)); - LOG.debug("Getting " + nbPeers + - " rs from peer cluster # " + this.peerId); - for (int i = 0; i < nbPeers; i++) { - ServerName sn; - // Make sure we get one address that we don't already have - do { - sn = addresses.get(this.random.nextInt(addresses.size())); - } while (setOfAddr.contains(sn)); - LOG.info("Choosing peer " + sn); - setOfAddr.add(sn); - } - this.currentPeers.addAll(setOfAddr); - } @Override public void enqueueLog(Path log) { @@ -457,9 +425,9 @@ public class ReplicationSource extends Thread int sleepMultiplier = 1; // Connect to peer cluster first, unless we have to stop - while (this.isActive() && this.currentPeers.size() == 0) { - chooseSinks(); - if (this.isActive() && this.currentPeers.size() == 0) { + while (this.isActive() && replicationSinkMgr.getSinks().size() == 0) { + replicationSinkMgr.chooseSinks(); + if (this.isActive() && replicationSinkMgr.getSinks().size() == 0) { if (sleepForRetries("Waiting for peers", sleepMultiplier)) { sleepMultiplier++; } @@ -583,7 +551,7 @@ public class ReplicationSource extends Thread return (this.repLogReader.getPosition() == 0 && !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0); } - + /** * Do the sleeping logic * @param msg Why we sleep @@ -637,7 +605,7 @@ public class ReplicationSource extends Thread /** * Do the shipping logic - * @param currentWALisBeingWrittenTo was the current WAL being (seemingly) + * @param currentWALisBeingWrittenTo was the current WAL being (seemingly) * written to when this method was called */ protected void shipEdits(boolean currentWALisBeingWrittenTo) { @@ -653,8 +621,10 @@ public class ReplicationSource extends Thread } continue; } + SinkPeer sinkPeer = null; try { - AdminService.BlockingInterface rrs = getRS(); + sinkPeer = replicationSinkMgr.getReplicationSink(); + BlockingInterface rrs = sinkPeer.getRegionServer(); if (LOG.isTraceEnabled()) { LOG.trace("Replicating " + this.currentNbEntries + " entries"); } @@ -700,27 +670,17 @@ public class ReplicationSource extends Thread this.socketTimeoutMultiplier); } else if (ioe instanceof ConnectException) { LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe); - chooseSinks(); + replicationSinkMgr.chooseSinks(); } else { LOG.warn("Can't replicate because of a local or network error: ", ioe); } } - try { - boolean down; - // Spin while the slave is down and we're not asked to shutdown/close - do { - down = isSlaveDown(); - if (down) { - if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) { - sleepMultiplier++; - } else { - chooseSinks(); - } - } - } while (this.isActive() && down ); - } catch (InterruptedException e) { - LOG.debug("Interrupted while trying to contact the peer cluster"); + if (sinkPeer != null) { + replicationSinkMgr.reportBadSink(sinkPeer); + } + if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) { + sleepMultiplier++; } } } @@ -797,49 +757,6 @@ public class ReplicationSource extends Thread Threads.shutdown(this, this.sleepForRetries); } - /** - * Get a new region server at random from this peer - * @return - * @throws IOException - */ - private AdminService.BlockingInterface getRS() throws IOException { - if (this.currentPeers.size() == 0) { - throw new IOException(this.peerClusterZnode + " has 0 region servers"); - } - ServerName address = - currentPeers.get(random.nextInt(this.currentPeers.size())); - return this.conn.getAdmin(address); - } - - /** - * Check if the slave is down by trying to establish a connection - * @return true if down, false if up - * @throws InterruptedException - */ - public boolean isSlaveDown() throws InterruptedException { - final CountDownLatch latch = new CountDownLatch(1); - Thread pingThread = new Thread() { - public void run() { - try { - AdminService.BlockingInterface rrs = getRS(); - // Dummy call which should fail - ProtobufUtil.getServerInfo(rrs); - latch.countDown(); - } catch (IOException ex) { - if (ex instanceof RemoteException) { - ex = ((RemoteException) ex).unwrapRemoteException(); - } - LOG.info("Slave cluster looks down: " + ex.getMessage(), ex); - } - } - }; - pingThread.start(); - // awaits returns true if countDown happened - boolean down = ! latch.await(this.sleepForRetries, TimeUnit.MILLISECONDS); - pingThread.interrupt(); - return down; - } - public String getPeerClusterZnode() { return this.peerClusterZnode; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java new file mode 100644 index 00000000000..e5349b2be38 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java @@ -0,0 +1,142 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test handling of changes to the number of a peer's regionservers. + */ +@Category(LargeTests.class) +public class TestReplicationChangingPeerRegionservers extends TestReplicationBase { + + private static final Log LOG = LogFactory.getLog(TestReplicationChangingPeerRegionservers.class); + + /** + * @throws java.lang.Exception + */ + @Before + public void setUp() throws Exception { + htable1.setAutoFlush(true); + // Starting and stopping replication can make us miss new logs, + // rolling like this makes sure the most recent one gets added to the queue + for (JVMClusterUtil.RegionServerThread r : + utility1.getHBaseCluster().getRegionServerThreads()) { + r.getRegionServer().getWAL().rollWriter(); + } + utility1.truncateTable(tableName); + // truncating the table will send one Delete per row to the slave cluster + // in an async fashion, which is why we cannot just call truncateTable on + // utility2 since late writes could make it to the slave in some way. + // Instead, we truncate the first table and wait for all the Deletes to + // make it to the slave. + Scan scan = new Scan(); + int lastCount = 0; + for (int i = 0; i < NB_RETRIES; i++) { + if (i == NB_RETRIES - 1) { + fail("Waited too much time for truncate"); + } + ResultScanner scanner = htable2.getScanner(scan); + Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH); + scanner.close(); + if (res.length != 0) { + if (res.length < lastCount) { + i--; // Don't increment timeout if we make progress + } + lastCount = res.length; + LOG.info("Still got " + res.length + " rows"); + Thread.sleep(SLEEP_TIME); + } else { + break; + } + } + } + + @Test(timeout = 300000) + public void testChangingNumberOfPeerRegionServers() throws IOException, InterruptedException { + + LOG.info("testSimplePutDelete"); + MiniHBaseCluster peerCluster = utility2.getMiniHBaseCluster(); + + doPutTest(Bytes.toBytes(1)); + + int rsToStop = peerCluster.getServerWithMeta() == 0 ? 1 : 0; + peerCluster.stopRegionServer(rsToStop); + peerCluster.waitOnRegionServer(rsToStop); + + // Sanity check + assertEquals(1, peerCluster.getRegionServerThreads().size()); + + doPutTest(Bytes.toBytes(2)); + + peerCluster.startRegionServer(); + + // Sanity check + assertEquals(2, peerCluster.getRegionServerThreads().size()); + + doPutTest(Bytes.toBytes(3)); + + } + + private void doPutTest(byte[] row) throws IOException, InterruptedException { + Put put = new Put(row); + put.add(famName, row, row); + + htable1 = new HTable(conf1, tableName); + htable1.put(put); + + Get get = new Get(row); + for (int i = 0; i < NB_RETRIES; i++) { + if (i == NB_RETRIES - 1) { + fail("Waited too much time for put replication"); + } + Result res = htable2.get(get); + if (res.size() == 0) { + LOG.info("Row not available"); + Thread.sleep(SLEEP_TIME); + } else { + assertArrayEquals(res.value(), row); + break; + } + } + + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java new file mode 100644 index 00000000000..296f9532e27 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; +import org.apache.hadoop.hbase.replication.ReplicationPeers; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.Lists; + +@Category(SmallTests.class) +public class TestReplicationSinkManager { + + private static final String PEER_CLUSTER_ID = "PEER_CLUSTER_ID"; + + private ReplicationPeers replicationPeers; + private ReplicationSinkManager sinkManager; + + @Before + public void setUp() { + replicationPeers = mock(ReplicationPeers.class); + sinkManager = new ReplicationSinkManager(mock(HConnection.class), + PEER_CLUSTER_ID, replicationPeers, new Configuration()); + } + + @Test + public void testChooseSinks() { + List serverNames = Lists.newArrayList(); + for (int i = 0; i < 20; i++) { + serverNames.add(mock(ServerName.class)); + } + + when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID)) + .thenReturn(serverNames); + + sinkManager.chooseSinks(); + + assertEquals(2, sinkManager.getSinks().size()); + + } + + @Test + public void testChooseSinks_LessThanRatioAvailable() { + List serverNames = Lists.newArrayList(mock(ServerName.class), + mock(ServerName.class)); + + when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID)) + .thenReturn(serverNames); + + sinkManager.chooseSinks(); + + assertEquals(1, sinkManager.getSinks().size()); + } + + @Test + public void testReportBadSink() { + ServerName serverNameA = mock(ServerName.class); + ServerName serverNameB = mock(ServerName.class); + when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID)).thenReturn( + Lists.newArrayList(serverNameA, serverNameB)); + + sinkManager.chooseSinks(); + // Sanity check + assertEquals(1, sinkManager.getSinks().size()); + + SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class)); + + sinkManager.reportBadSink(sinkPeer); + + // Just reporting a bad sink once shouldn't have an effect + assertEquals(1, sinkManager.getSinks().size()); + + } + + /** + * Once a SinkPeer has been reported as bad more than BAD_SINK_THRESHOLD times, it should not + * be replicated to anymore. + */ + @Test + public void testReportBadSink_PastThreshold() { + List serverNames = Lists.newArrayList(); + for (int i = 0; i < 20; i++) { + serverNames.add(mock(ServerName.class)); + } + when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID)) + .thenReturn(serverNames); + + + sinkManager.chooseSinks(); + // Sanity check + assertEquals(2, sinkManager.getSinks().size()); + + ServerName serverName = sinkManager.getSinks().get(0); + + SinkPeer sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class)); + + for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) { + sinkManager.reportBadSink(sinkPeer); + } + + // Reporting a bad sink more than the threshold count should remove it + // from the list of potential sinks + assertEquals(1, sinkManager.getSinks().size()); + } + + @Test + public void testReportBadSink_DownToZeroSinks() { + List serverNames = Lists.newArrayList(); + for (int i = 0; i < 20; i++) { + serverNames.add(mock(ServerName.class)); + } + when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID)) + .thenReturn(serverNames); + + + sinkManager.chooseSinks(); + // Sanity check + + List sinkList = sinkManager.getSinks(); + assertEquals(2, sinkList.size()); + + ServerName serverNameA = sinkList.get(0); + ServerName serverNameB = sinkList.get(1); + + SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class)); + SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AdminService.BlockingInterface.class)); + + for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) { + sinkManager.reportBadSink(sinkPeerA); + sinkManager.reportBadSink(sinkPeerB); + } + + // We've gone down to 0 good sinks, so the replication sinks + // should have been refreshed now + assertEquals(2, sinkManager.getSinks().size()); + } + +}