HBASE-7634 Replication handling of changes to peer clusters is inefficient (Gabriel Reid via JD)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1509332 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jean-Daniel Cryans 2013-08-01 17:03:52 +00:00
parent 33a263cc26
commit cb1ad44c24
7 changed files with 582 additions and 109 deletions

View File

@ -18,6 +18,13 @@
*/ */
package org.apache.hadoop.hbase.replication; package org.apache.hadoop.hbase.replication;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -35,12 +42,6 @@ import org.apache.zookeeper.KeeperException.NodeExistsException;
import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.InvalidProtocolBufferException;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* This class acts as a wrapper for all the objects used to identify and * This class acts as a wrapper for all the objects used to identify and
* communicate with remote peers and is responsible for answering to expired * communicate with remote peers and is responsible for answering to expired
@ -57,9 +58,11 @@ public class ReplicationPeer implements Abortable, Closeable {
// Cannot be final since a new object needs to be recreated when session fails // Cannot be final since a new object needs to be recreated when session fails
private ZooKeeperWatcher zkw; private ZooKeeperWatcher zkw;
private final Configuration conf; private final Configuration conf;
private long lastRegionserverUpdate;
private PeerStateTracker peerStateTracker; private PeerStateTracker peerStateTracker;
/** /**
* Constructor that takes all the objects required to communicate with the * Constructor that takes all the objects required to communicate with the
* specified peer, except for the region server addresses. * specified peer, except for the region server addresses.
@ -130,6 +133,7 @@ public class ReplicationPeer implements Abortable, Closeable {
*/ */
public void setRegionServers(List<ServerName> regionServers) { public void setRegionServers(List<ServerName> regionServers) {
this.regionServers = regionServers; this.regionServers = regionServers;
lastRegionserverUpdate = System.currentTimeMillis();
} }
/** /**
@ -140,6 +144,15 @@ public class ReplicationPeer implements Abortable, Closeable {
return zkw; return zkw;
} }
/**
 * Reports when the list of region servers of this peer was last replaced.
 * The value is the wall-clock time captured by {@code setRegionServers} on its
 * most recent invocation.
 * @return the {@code System.currentTimeMillis()} reading taken the last time the list of
 *         peer region servers was set
 */
public long getLastRegionserverUpdate() {
  return this.lastRegionserverUpdate;
}
/** /**
* Get the identifier of this peer * Get the identifier of this peer
* @return string representation of the id (short) * @return string representation of the id (short)

View File

@ -122,6 +122,14 @@ public interface ReplicationPeers {
*/ */
List<ServerName> getRegionServersOfConnectedPeer(String peerId); List<ServerName> getRegionServersOfConnectedPeer(String peerId);
/**
 * Get the timestamp of the last change in composition of a given peer cluster.
 * Callers can compare the returned value with the time at which they last read the
 * peer's region servers to find out whether their view of the peer is out of date.
 * @param peerId identifier of the peer cluster for which the timestamp is requested
 * @return the timestamp (in milliseconds) of the last change to the composition of
 *         the peer cluster
 */
long getTimestampOfLastChangeToPeer(String peerId);
/** /**
* Returns the UUID of the provided peer id. * Returns the UUID of the provided peer id.
* @param peerId the peer's ID that will be converted into a UUID * @param peerId the peer's ID that will be converted into a UUID

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.ConnectionLossException; import org.apache.zookeeper.KeeperException.ConnectionLossException;
@ -266,6 +267,14 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
return ids; return ids;
} }
/**
 * Get the timestamp of the last change to the region server list of a connected peer.
 * @param peerId identifier of the peer cluster for which the timestamp is requested
 * @return the timestamp (in milliseconds) recorded by the peer the last time its list of
 *         region servers was set
 * @throws IllegalArgumentException if no peer is registered under {@code peerId}
 */
@Override
public long getTimestampOfLastChangeToPeer(String peerId) {
  // Look the peer up exactly once: a containsKey() check followed by a separate get()
  // can observe the entry disappearing in between and fail with a NullPointerException.
  ReplicationPeer peer = peerClusters.get(peerId);
  if (peer == null) {
    throw new IllegalArgumentException("Unknown peer id: " + peerId);
  }
  return peer.getLastRegionserverUpdate();
}
/** /**
* A private method used during initialization. This method attempts to connect to all registered * A private method used during initialization. This method attempts to connect to all registered
* peer clusters. This method does not set a watch on the peer cluster znodes. * peer clusters. This method does not set a watch on the peer cluster znodes.
@ -291,6 +300,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
LOG.warn("Lost the ZooKeeper connection for peer " + peer.getClusterKey(), ke); LOG.warn("Lost the ZooKeeper connection for peer " + peer.getClusterKey(), ke);
try { try {
peer.reloadZkWatcher(); peer.reloadZkWatcher();
peer.getZkw().registerListener(new PeerRegionServerListener(peer));
} catch (IOException io) { } catch (IOException io) {
LOG.warn("Creation of ZookeeperWatcher failed for peer " + peer.getClusterKey(), io); LOG.warn("Creation of ZookeeperWatcher failed for peer " + peer.getClusterKey(), io);
} }
@ -304,7 +314,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
*/ */
private static List<ServerName> fetchSlavesAddresses(ZooKeeperWatcher zkw) private static List<ServerName> fetchSlavesAddresses(ZooKeeperWatcher zkw)
throws KeeperException { throws KeeperException {
List<String> children = ZKUtil.listChildrenNoWatch(zkw, zkw.rsZNode); List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.rsZNode);
if (children == null) { if (children == null) {
return Collections.emptyList(); return Collections.emptyList();
} }
@ -315,6 +325,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
return addresses; return addresses;
} }
private String getPeerStateNode(String id) { private String getPeerStateNode(String id) {
return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName)); return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
} }
@ -366,6 +377,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
ReplicationPeer peer = ReplicationPeer peer =
new ReplicationPeer(peerConf, peerId, ZKUtil.getZooKeeperClusterKey(peerConf)); new ReplicationPeer(peerConf, peerId, ZKUtil.getZooKeeperClusterKey(peerConf));
peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId)); peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
peer.getZkw().registerListener(new PeerRegionServerListener(peer));
return peer; return peer;
} }
@ -406,4 +418,37 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
.toByteArray(); .toByteArray();
return ProtobufUtil.prependPBMagic(bytes); return ProtobufUtil.prependPBMagic(bytes);
} }
/**
 * Tracks changes to the list of region servers in a peer's cluster. When the children of
 * the watched region server znode change, the listener re-reads the list of region servers
 * and stores it on the associated {@link ReplicationPeer}.
 */
public static class PeerRegionServerListener extends ZooKeeperListener {

  // Null when the listener was created from a znode path and a watcher only
  private ReplicationPeer peer;
  // Znode whose children are the peer's region servers
  private String regionServerListNode;

  /**
   * Create a listener bound to a peer; the watched znode is the region server znode of
   * the peer's current ZooKeeper watcher.
   * @param replicationPeer the peer whose region server list is kept up to date
   */
  public PeerRegionServerListener(ReplicationPeer replicationPeer) {
    super(replicationPeer.getZkw());
    this.peer = replicationPeer;
    this.regionServerListNode = peer.getZkw().rsZNode;
  }

  /**
   * Create a listener that is not bound to a peer. Such a listener has nowhere to store a
   * refreshed region server list, so change notifications are only logged.
   * @param regionServerListNode znode whose children are watched
   * @param zkw watcher this listener is created for
   */
  public PeerRegionServerListener(String regionServerListNode, ZooKeeperWatcher zkw) {
    super(zkw);
    this.regionServerListNode = regionServerListNode;
  }

  /**
   * Refresh the peer's region server list when the children of the watched znode change.
   * Notifications for any other path are ignored.
   * @param path znode whose children changed
   */
  @Override
  public synchronized void nodeChildrenChanged(String path) {
    if (!path.equals(regionServerListNode)) {
      return;
    }
    if (peer == null) {
      // Built through the (znode, watcher) constructor: dereferencing the peer here would
      // throw a NullPointerException inside the ZooKeeper event callback.
      LOG.warn("Detected change to peer regionservers at " + path
          + " but no peer is attached to this listener, ignoring");
      return;
    }
    try {
      LOG.info("Detected change to peer regionservers, fetching updated list");
      peer.setRegionServers(fetchSlavesAddresses(peer.getZkw()));
    } catch (KeeperException e) {
      LOG.fatal("Error reading slave addresses", e);
    }
  }
}
} }

View File

@ -0,0 +1,182 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
 * Maintains a collection of peers to replicate to, and randomly selects a
 * single peer to replicate to per set of data to replicate. Also handles
 * keeping track of peer availability.
 * <p>
 * This class performs no synchronization of its own on the sink list or the
 * bad-report counters.
 */
public class ReplicationSinkManager {

  private static final Log LOG = LogFactory.getLog(ReplicationSinkManager.class);

  /**
   * Default maximum number of times a replication sink can be reported as bad before
   * it will no longer be provided as a sink for replication without the pool of
   * replication sinks being refreshed.
   */
  static final int DEFAULT_BAD_SINK_THRESHOLD = 3;

  /**
   * Default ratio of the total number of peer cluster region servers to consider
   * replicating to.
   */
  static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.1f;

  private final HConnection conn;

  private final String peerClusterId;

  private final ReplicationPeers replicationPeers;

  // Count of "bad replication sink" reports per peer sink
  private final Map<ServerName, Integer> badReportCounts;

  // Ratio of total number of potential peer region servers to be used
  private final float ratio;

  // Maximum number of times a sink can be reported as bad before the pool of
  // replication sinks is refreshed
  private final int badSinkThreshold;

  private final Random random;

  // A timestamp of the last time the list of replication peers changed
  private long lastUpdateToPeers;

  // The current pool of sinks to which replication can be performed
  private List<ServerName> sinks = Lists.newArrayList();

  /**
   * Instantiate for a single replication peer cluster.
   * @param conn connection to the peer cluster
   * @param peerClusterId identifier of the peer cluster
   * @param replicationPeers manages peer clusters being replicated to
   * @param conf HBase configuration, used for determining replication source ratio
   *          ({@code replication.source.ratio}) and bad peer threshold
   *          ({@code replication.bad.sink.threshold})
   */
  public ReplicationSinkManager(HConnection conn, String peerClusterId,
      ReplicationPeers replicationPeers, Configuration conf) {
    this.conn = conn;
    this.peerClusterId = peerClusterId;
    this.replicationPeers = replicationPeers;
    this.badReportCounts = Maps.newHashMap();
    this.ratio = conf.getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
    this.badSinkThreshold = conf.getInt("replication.bad.sink.threshold",
        DEFAULT_BAD_SINK_THRESHOLD);
    this.random = new Random();
  }

  /**
   * Get a randomly-chosen replication sink to replicate to. The pool of sinks is
   * re-chosen first when the peer cluster reports a change that is more recent than
   * the last time the pool was chosen.
   *
   * @return a replication sink to replicate to
   * @throws IOException if the pool of sinks is empty
   */
  public SinkPeer getReplicationSink() throws IOException {
    if (replicationPeers.getTimestampOfLastChangeToPeer(peerClusterId)
        > this.lastUpdateToPeers) {
      LOG.info("Current list of sinks is out of date, updating");
      chooseSinks();
    }

    if (sinks.isEmpty()) {
      throw new IOException("No replication sinks are available");
    }
    ServerName serverName = sinks.get(random.nextInt(sinks.size()));
    return new SinkPeer(serverName, conn.getAdmin(serverName));
  }

  /**
   * Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it
   * failed). If a single SinkPeer is reported as bad more than
   * replication.bad.sink.threshold times, it will be removed
   * from the pool of potential replication targets. When that removal empties
   * the pool, a new pool is chosen immediately.
   *
   * @param sinkPeer
   *          The SinkPeer that had a failed replication attempt on it
   */
  public void reportBadSink(SinkPeer sinkPeer) {
    ServerName serverName = sinkPeer.getServerName();
    Integer previousCount = badReportCounts.get(serverName);
    int badReportCount = (previousCount == null ? 0 : previousCount) + 1;
    badReportCounts.put(serverName, badReportCount);
    if (badReportCount > badSinkThreshold) {
      this.sinks.remove(serverName);
      if (sinks.isEmpty()) {
        chooseSinks();
      }
    }
  }

  /**
   * Choose a fresh random pool of sinks from the region servers of the peer cluster,
   * sized by the configured ratio (rounded up), and reset the bad-report counters.
   */
  void chooseSinks() {
    List<ServerName> peerServers =
        replicationPeers.getRegionServersOfConnectedPeer(peerClusterId);
    // Shuffle a private copy: the returned list belongs to the caller's side, and both the
    // shuffle and later removals from the sink pool must not write through to it.
    List<ServerName> slaveAddresses = peerServers == null
        ? Lists.<ServerName>newArrayList() : Lists.newArrayList(peerServers);
    Collections.shuffle(slaveAddresses, random);
    int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
    // Keep the count within the list bounds even for a ratio above 1 or below 0
    numSinks = Math.max(0, Math.min(numSinks, slaveAddresses.size()));
    // Copy the selection so the pool is an independent list rather than a subList view
    sinks = Lists.newArrayList(slaveAddresses.subList(0, numSinks));
    lastUpdateToPeers = System.currentTimeMillis();
    badReportCounts.clear();
  }

  /**
   * @return the current pool of sinks; this is the internal list, not a copy
   */
  List<ServerName> getSinks() {
    return sinks;
  }

  /**
   * Wraps a replication region server sink to provide the ability to identify
   * it.
   */
  public static class SinkPeer {
    private final ServerName serverName;
    private final AdminService.BlockingInterface regionServer;

    /**
     * @param serverName identity of the sink region server
     * @param regionServer admin interface used to talk to that region server
     */
    public SinkPeer(ServerName serverName, AdminService.BlockingInterface regionServer) {
      this.serverName = serverName;
      this.regionServer = regionServer;
    }

    ServerName getServerName() {
      return serverName;
    }

    public AdminService.BlockingInterface getRegionServer() {
      return regionServer;
    }
  }
}

View File

@ -23,16 +23,11 @@ import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.net.ConnectException; import java.net.ConnectException;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.NavigableMap; import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -45,20 +40,19 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
@ -89,9 +83,6 @@ public class ReplicationSource extends Thread
private ReplicationQueues replicationQueues; private ReplicationQueues replicationQueues;
private ReplicationPeers replicationPeers; private ReplicationPeers replicationPeers;
private Configuration conf; private Configuration conf;
// ratio of region servers to chose from a slave cluster
private float ratio;
private Random random;
private ReplicationQueueInfo replicationQueueInfo; private ReplicationQueueInfo replicationQueueInfo;
// id of the peer cluster this source replicates to // id of the peer cluster this source replicates to
private String peerId; private String peerId;
@ -99,8 +90,6 @@ public class ReplicationSource extends Thread
private ReplicationSourceManager manager; private ReplicationSourceManager manager;
// Should we stop everything? // Should we stop everything?
private Stoppable stopper; private Stoppable stopper;
// List of chosen sinks (region servers)
private List<ServerName> currentPeers;
// How long should we sleep for each retry // How long should we sleep for each retry
private long sleepForRetries; private long sleepForRetries;
// Max size in bytes of entriesArray // Max size in bytes of entriesArray
@ -140,6 +129,8 @@ public class ReplicationSource extends Thread
private MetricsSource metrics; private MetricsSource metrics;
// Handle on the log reader helper // Handle on the log reader helper
private ReplicationHLogReaderManager repLogReader; private ReplicationHLogReaderManager repLogReader;
// Handles connecting to peer region servers
private ReplicationSinkManager replicationSinkMgr;
/** /**
* Instantiation method used by region servers * Instantiation method used by region servers
@ -178,9 +169,6 @@ public class ReplicationSource extends Thread
this.conn = HConnectionManager.getConnection(conf); this.conn = HConnectionManager.getConnection(conf);
this.replicationQueues = replicationQueues; this.replicationQueues = replicationQueues;
this.replicationPeers = replicationPeers; this.replicationPeers = replicationPeers;
this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f);
this.currentPeers = new ArrayList<ServerName>();
this.random = new Random();
this.manager = manager; this.manager = manager;
this.sleepForRetries = this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000); this.conf.getLong("replication.source.sleepforretries", 1000);
@ -193,29 +181,9 @@ public class ReplicationSource extends Thread
this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode); this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
// ReplicationQueueInfo parses the peerId out of the znode for us // ReplicationQueueInfo parses the peerId out of the znode for us
this.peerId = this.replicationQueueInfo.getPeerId(); this.peerId = this.replicationQueueInfo.getPeerId();
this.replicationSinkMgr = new ReplicationSinkManager(conn, peerId, replicationPeers, conf);
} }
/**
* Select a number of peers at random using the ratio. Mininum 1.
*/
private void chooseSinks() {
this.currentPeers.clear();
List<ServerName> addresses = this.replicationPeers.getRegionServersOfConnectedPeer(this.peerId);
Set<ServerName> setOfAddr = new HashSet<ServerName>();
int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
LOG.debug("Getting " + nbPeers +
" rs from peer cluster # " + this.peerId);
for (int i = 0; i < nbPeers; i++) {
ServerName sn;
// Make sure we get one address that we don't already have
do {
sn = addresses.get(this.random.nextInt(addresses.size()));
} while (setOfAddr.contains(sn));
LOG.info("Choosing peer " + sn);
setOfAddr.add(sn);
}
this.currentPeers.addAll(setOfAddr);
}
@Override @Override
public void enqueueLog(Path log) { public void enqueueLog(Path log) {
@ -457,9 +425,9 @@ public class ReplicationSource extends Thread
int sleepMultiplier = 1; int sleepMultiplier = 1;
// Connect to peer cluster first, unless we have to stop // Connect to peer cluster first, unless we have to stop
while (this.isActive() && this.currentPeers.size() == 0) { while (this.isActive() && replicationSinkMgr.getSinks().size() == 0) {
chooseSinks(); replicationSinkMgr.chooseSinks();
if (this.isActive() && this.currentPeers.size() == 0) { if (this.isActive() && replicationSinkMgr.getSinks().size() == 0) {
if (sleepForRetries("Waiting for peers", sleepMultiplier)) { if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
sleepMultiplier++; sleepMultiplier++;
} }
@ -583,7 +551,7 @@ public class ReplicationSource extends Thread
return (this.repLogReader.getPosition() == 0 && return (this.repLogReader.getPosition() == 0 &&
!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0); !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0);
} }
/** /**
* Do the sleeping logic * Do the sleeping logic
* @param msg Why we sleep * @param msg Why we sleep
@ -637,7 +605,7 @@ public class ReplicationSource extends Thread
/** /**
* Do the shipping logic * Do the shipping logic
* @param currentWALisBeingWrittenTo was the current WAL being (seemingly) * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
* written to when this method was called * written to when this method was called
*/ */
protected void shipEdits(boolean currentWALisBeingWrittenTo) { protected void shipEdits(boolean currentWALisBeingWrittenTo) {
@ -653,8 +621,10 @@ public class ReplicationSource extends Thread
} }
continue; continue;
} }
SinkPeer sinkPeer = null;
try { try {
AdminService.BlockingInterface rrs = getRS(); sinkPeer = replicationSinkMgr.getReplicationSink();
BlockingInterface rrs = sinkPeer.getRegionServer();
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Replicating " + this.currentNbEntries + " entries"); LOG.trace("Replicating " + this.currentNbEntries + " entries");
} }
@ -700,27 +670,17 @@ public class ReplicationSource extends Thread
this.socketTimeoutMultiplier); this.socketTimeoutMultiplier);
} else if (ioe instanceof ConnectException) { } else if (ioe instanceof ConnectException) {
LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe); LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
chooseSinks(); replicationSinkMgr.chooseSinks();
} else { } else {
LOG.warn("Can't replicate because of a local or network error: ", ioe); LOG.warn("Can't replicate because of a local or network error: ", ioe);
} }
} }
try { if (sinkPeer != null) {
boolean down; replicationSinkMgr.reportBadSink(sinkPeer);
// Spin while the slave is down and we're not asked to shutdown/close }
do { if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
down = isSlaveDown(); sleepMultiplier++;
if (down) {
if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
sleepMultiplier++;
} else {
chooseSinks();
}
}
} while (this.isActive() && down );
} catch (InterruptedException e) {
LOG.debug("Interrupted while trying to contact the peer cluster");
} }
} }
} }
@ -797,49 +757,6 @@ public class ReplicationSource extends Thread
Threads.shutdown(this, this.sleepForRetries); Threads.shutdown(this, this.sleepForRetries);
} }
/**
* Get a new region server at random from this peer
* @return
* @throws IOException
*/
private AdminService.BlockingInterface getRS() throws IOException {
if (this.currentPeers.size() == 0) {
throw new IOException(this.peerClusterZnode + " has 0 region servers");
}
ServerName address =
currentPeers.get(random.nextInt(this.currentPeers.size()));
return this.conn.getAdmin(address);
}
/**
* Check if the slave is down by trying to establish a connection
* @return true if down, false if up
* @throws InterruptedException
*/
public boolean isSlaveDown() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
Thread pingThread = new Thread() {
public void run() {
try {
AdminService.BlockingInterface rrs = getRS();
// Dummy call which should fail
ProtobufUtil.getServerInfo(rrs);
latch.countDown();
} catch (IOException ex) {
if (ex instanceof RemoteException) {
ex = ((RemoteException) ex).unwrapRemoteException();
}
LOG.info("Slave cluster looks down: " + ex.getMessage(), ex);
}
}
};
pingThread.start();
// awaits returns true if countDown happened
boolean down = ! latch.await(this.sleepForRetries, TimeUnit.MILLISECONDS);
pingThread.interrupt();
return down;
}
public String getPeerClusterZnode() { public String getPeerClusterZnode() {
return this.peerClusterZnode; return this.peerClusterZnode;
} }

View File

@ -0,0 +1,142 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
 * Test handling of changes to the number of a peer's regionservers.
 */
@Category(LargeTests.class)
public class TestReplicationChangingPeerRegionservers extends TestReplicationBase {

  private static final Log LOG = LogFactory.getLog(TestReplicationChangingPeerRegionservers.class);

  /**
   * Rolls the WALs of the source cluster, truncates the source table and waits until the
   * peer table has been emptied by replication.
   * @throws java.lang.Exception
   */
  @Before
  public void setUp() throws Exception {
    htable1.setAutoFlush(true);
    // Starting and stopping replication can make us miss new logs,
    // rolling like this makes sure the most recent one gets added to the queue
    for (JVMClusterUtil.RegionServerThread r :
        utility1.getHBaseCluster().getRegionServerThreads()) {
      r.getRegionServer().getWAL().rollWriter();
    }
    utility1.truncateTable(tableName);
    // truncating the table will send one Delete per row to the slave cluster
    // in an async fashion, which is why we cannot just call truncateTable on
    // utility2 since late writes could make it to the slave in some way.
    // Instead, we truncate the first table and wait for all the Deletes to
    // make it to the slave.
    Scan scan = new Scan();
    int lastCount = 0;
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for truncate");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
      scanner.close();
      if (res.length != 0) {
        if (res.length < lastCount) {
          i--; // Don't increment timeout if we make progress
        }
        lastCount = res.length;
        LOG.info("Still got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

  /**
   * Replicates a row, stops one peer region server and replicates again, then starts a
   * new peer region server and replicates a third time.
   */
  @Test(timeout = 300000)
  public void testChangingNumberOfPeerRegionServers() throws IOException, InterruptedException {
    // Log the name of this test (the message used to name a different test)
    LOG.info("testChangingNumberOfPeerRegionServers");
    MiniHBaseCluster peerCluster = utility2.getMiniHBaseCluster();

    doPutTest(Bytes.toBytes(1));

    // Stop whichever of the first two region servers is not the one reported as having meta
    int rsToStop = peerCluster.getServerWithMeta() == 0 ? 1 : 0;
    peerCluster.stopRegionServer(rsToStop);
    peerCluster.waitOnRegionServer(rsToStop);

    // Sanity check
    assertEquals(1, peerCluster.getRegionServerThreads().size());

    doPutTest(Bytes.toBytes(2));

    peerCluster.startRegionServer();

    // Sanity check
    assertEquals(2, peerCluster.getRegionServerThreads().size());

    doPutTest(Bytes.toBytes(3));
  }

  /**
   * Writes a single cell to the source table and polls the peer table until the cell
   * shows up there, failing once the retries are used up.
   * @param row used as row key, qualifier and value of the cell
   */
  private void doPutTest(byte[] row) throws IOException, InterruptedException {
    Put put = new Put(row);
    put.add(famName, row, row);

    // NOTE(review): this replaces the shared table handle on every call without closing
    // the previous one - confirm whether a fresh handle is really needed here.
    htable1 = new HTable(conf1, tableName);
    htable1.put(put);

    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      Result res = htable2.get(get);
      if (res.size() == 0) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        // Expected value first, so that a failure message reports the right side as wrong
        assertArrayEquals(row, res.value());
        break;
      }
    }
  }
}

View File

@ -0,0 +1,166 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.common.collect.Lists;
/**
 * Unit tests for the selection of replication sinks and the handling of bad-sink reports
 * in {@link ReplicationSinkManager}, run against a mocked {@link ReplicationPeers}.
 */
@Category(SmallTests.class)
public class TestReplicationSinkManager {

  private static final String PEER_CLUSTER_ID = "PEER_CLUSTER_ID";

  private ReplicationPeers replicationPeers;
  private ReplicationSinkManager sinkManager;

  @Before
  public void setUp() {
    replicationPeers = mock(ReplicationPeers.class);
    sinkManager = new ReplicationSinkManager(mock(HConnection.class),
        PEER_CLUSTER_ID, replicationPeers, new Configuration());
  }

  /** Makes the mocked peers report the given number of freshly mocked region servers. */
  private void givenPeerRegionServers(int count) {
    List<ServerName> mockedServers = Lists.newArrayList();
    int created = 0;
    while (created < count) {
      mockedServers.add(mock(ServerName.class));
      created++;
    }
    when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID))
        .thenReturn(mockedServers);
  }

  /** Wraps a server name into a sink peer backed by a mocked admin interface. */
  private static SinkPeer sinkPeerFor(ServerName server) {
    return new SinkPeer(server, mock(AdminService.BlockingInterface.class));
  }

  /** Reports the given sinks as bad, in turn, one time more than the default threshold. */
  private void reportBadPastThreshold(SinkPeer... peers) {
    for (int round = 0; round <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; round++) {
      for (SinkPeer badPeer : peers) {
        sinkManager.reportBadSink(badPeer);
      }
    }
  }

  @Test
  public void testChooseSinks() {
    givenPeerRegionServers(20);

    sinkManager.chooseSinks();

    assertEquals(2, sinkManager.getSinks().size());
  }

  @Test
  public void testChooseSinks_LessThanRatioAvailable() {
    givenPeerRegionServers(2);

    sinkManager.chooseSinks();

    assertEquals(1, sinkManager.getSinks().size());
  }

  @Test
  public void testReportBadSink() {
    ServerName firstServer = mock(ServerName.class);
    ServerName secondServer = mock(ServerName.class);
    List<ServerName> bothServers = Lists.newArrayList(firstServer, secondServer);
    when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID))
        .thenReturn(bothServers);

    sinkManager.chooseSinks();
    // Sanity check
    assertEquals(1, sinkManager.getSinks().size());

    sinkManager.reportBadSink(sinkPeerFor(firstServer));

    // Just reporting a bad sink once shouldn't have an effect
    assertEquals(1, sinkManager.getSinks().size());
  }

  /**
   * Once a SinkPeer has been reported as bad more than BAD_SINK_THRESHOLD times, it should not
   * be replicated to anymore.
   */
  @Test
  public void testReportBadSink_PastThreshold() {
    givenPeerRegionServers(20);

    sinkManager.chooseSinks();
    // Sanity check
    assertEquals(2, sinkManager.getSinks().size());

    SinkPeer badPeer = sinkPeerFor(sinkManager.getSinks().get(0));
    reportBadPastThreshold(badPeer);

    // Reporting a bad sink more than the threshold count should remove it
    // from the list of potential sinks
    assertEquals(1, sinkManager.getSinks().size());
  }

  @Test
  public void testReportBadSink_DownToZeroSinks() {
    givenPeerRegionServers(20);

    sinkManager.chooseSinks();
    // Sanity check
    List<ServerName> chosen = sinkManager.getSinks();
    assertEquals(2, chosen.size());

    ServerName firstChosen = chosen.get(0);
    ServerName secondChosen = chosen.get(1);
    SinkPeer firstPeer = sinkPeerFor(firstChosen);
    SinkPeer secondPeer = sinkPeerFor(secondChosen);

    reportBadPastThreshold(firstPeer, secondPeer);

    // We've gone down to 0 good sinks, so the replication sinks
    // should have been refreshed now
    assertEquals(2, sinkManager.getSinks().size());
  }
}