HBASE-3134 [replication] Add the ability to enable/disable streams (Teruyoshi Zenmyo)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1308675 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
larsh 2012-04-03 04:14:46 +00:00
parent f9802e4cdf
commit 450583b9bd
13 changed files with 320 additions and 47 deletions

View File

@ -23,7 +23,6 @@ import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.Map; import java.util.Map;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -136,16 +135,16 @@ public class ReplicationAdmin implements Closeable {
* Restart the replication stream to the specified peer. * Restart the replication stream to the specified peer.
* @param id a short that identifies the cluster * @param id a short that identifies the cluster
*/ */
public void enablePeer(String id) { public void enablePeer(String id) throws IOException {
throw new NotImplementedException("Not implemented"); this.replicationZk.enablePeer(id);
} }
/** /**
* Stop the replication stream to the specified peer. * Stop the replication stream to the specified peer.
* @param id a short that identifies the cluster * @param id a short that identifies the cluster
*/ */
public void disablePeer(String id) { public void disablePeer(String id) throws IOException {
throw new NotImplementedException("Not implemented"); this.replicationZk.disablePeer(id);
} }
/** /**
@ -164,6 +163,20 @@ public class ReplicationAdmin implements Closeable {
return this.replicationZk.listPeers(); return this.replicationZk.listPeers();
} }
/**
* Get state of the peer
*
* @param id peer's identifier
* @return current state of the peer
*/
public String getPeerState(String id) throws IOException {
try {
return this.replicationZk.getPeerState(id).name();
} catch (KeeperException e) {
throw new IOException("Couldn't get the state of the peer " + id, e);
}
}
/** /**
* Get the current status of the kill switch, if the cluster is replicating * Get the current status of the kill switch, if the cluster is replicating
* or not. * or not.

View File

@ -31,7 +31,12 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper.PeerState;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
/** /**
* This class acts as a wrapper for all the objects used to identify and * This class acts as a wrapper for all the objects used to identify and
@ -50,6 +55,8 @@ public class ReplicationPeer implements Abortable, Closeable {
private ZooKeeperWatcher zkw; private ZooKeeperWatcher zkw;
private final Configuration conf; private final Configuration conf;
private PeerStateTracker peerStateTracker;
/** /**
* Constructor that takes all the objects required to communicate with the * Constructor that takes all the objects required to communicate with the
* specified peer, except for the region server addresses. * specified peer, except for the region server addresses.
@ -65,6 +72,31 @@ public class ReplicationPeer implements Abortable, Closeable {
this.reloadZkWatcher(); this.reloadZkWatcher();
} }
/**
* start a state tracker to check whether this peer is enabled or not
*
* @param zookeeper zk watcher for the local cluster
* @param peerStateNode path to zk node which stores peer state
* @throws KeeperException
*/
public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode)
throws KeeperException {
if (ZKUtil.checkExists(zookeeper, peerStateNode) == -1) {
ZKUtil.createAndWatch(zookeeper, peerStateNode,
Bytes.toBytes(PeerState.ENABLED.name())); // enabled by default
}
this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper,
this);
this.peerStateTracker.start();
this.readPeerStateZnode();
}
private void readPeerStateZnode() {
String currentState = Bytes.toString(peerStateTracker.getData(false));
this.peerEnabled.set(PeerState.ENABLED.equals(PeerState
.valueOf(currentState)));
}
/** /**
* Get the cluster key of that peer * Get the cluster key of that peer
* @return string consisting of zk ensemble addresses, client port * @return string consisting of zk ensemble addresses, client port
@ -152,4 +184,23 @@ public class ReplicationPeer implements Abortable, Closeable {
zkw.close(); zkw.close();
} }
} }
/**
* Tracker for state of this peer
*/
public class PeerStateTracker extends ZooKeeperNodeTracker {
public PeerStateTracker(String peerStateZNode, ZooKeeperWatcher watcher,
Abortable abortable) {
super(watcher, peerStateZNode, abortable);
}
@Override
public synchronized void nodeDataChanged(String path) {
if (path.equals(node)) {
super.nodeDataChanged(path);
readPeerStateZnode();
}
}
}
} }

View File

@ -50,18 +50,20 @@ import org.apache.zookeeper.KeeperException.ConnectionLossException;
import org.apache.zookeeper.KeeperException.SessionExpiredException; import org.apache.zookeeper.KeeperException.SessionExpiredException;
/** /**
* This class serves as a helper for all things related to zookeeper * This class serves as a helper for all things related to zookeeper in
* in replication. * replication.
* <p/> * <p/>
* The layout looks something like this under zookeeper.znode.parent * The layout looks something like this under zookeeper.znode.parent for the
* for the master cluster: * master cluster:
* <p/> * <p/>
*
* <pre> * <pre>
* replication/ * replication/
* state {contains true or false} * state {contains true or false}
* clusterId {contains a byte} * clusterId {contains a byte}
* peers/ * peers/
* 1/ {contains a full cluster address} * 1/ {contains a full cluster address}
* peer-state {contains ENABLED or DISABLED}
* 2/ * 2/
* ... * ...
* rs/ {lists all RS that replicate} * rs/ {lists all RS that replicate}
@ -82,6 +84,12 @@ public class ReplicationZookeeper implements Closeable{
LogFactory.getLog(ReplicationZookeeper.class); LogFactory.getLog(ReplicationZookeeper.class);
// Name of znode we use to lock when failover // Name of znode we use to lock when failover
private final static String RS_LOCK_ZNODE = "lock"; private final static String RS_LOCK_ZNODE = "lock";
// Values of znode which stores state of a peer
public static enum PeerState {
ENABLED, DISABLED
};
// Our handle on zookeeper // Our handle on zookeeper
private final ZooKeeperWatcher zookeeper; private final ZooKeeperWatcher zookeeper;
// Map of peer clusters keyed by their id // Map of peer clusters keyed by their id
@ -96,6 +104,8 @@ public class ReplicationZookeeper implements Closeable{
private String rsServerNameZnode; private String rsServerNameZnode;
// Name node if the replicationState znode // Name node if the replicationState znode
private String replicationStateNodeName; private String replicationStateNodeName;
// Name of zk node which stores peer state
private String peerStateNodeName;
private final Configuration conf; private final Configuration conf;
// Is this cluster replicating at the moment? // Is this cluster replicating at the moment?
private AtomicBoolean replicating; private AtomicBoolean replicating;
@ -150,6 +160,8 @@ public class ReplicationZookeeper implements Closeable{
conf.get("zookeeper.znode.replication", "replication"); conf.get("zookeeper.znode.replication", "replication");
String peersZNodeName = String peersZNodeName =
conf.get("zookeeper.znode.replication.peers", "peers"); conf.get("zookeeper.znode.replication.peers", "peers");
this.peerStateNodeName = conf.get(
"zookeeper.znode.replication.peers.state", "peer-state");
this.replicationStateNodeName = this.replicationStateNodeName =
conf.get("zookeeper.znode.replication.state", "state"); conf.get("zookeeper.znode.replication.state", "state");
String rsZNodeName = String rsZNodeName =
@ -339,8 +351,10 @@ public class ReplicationZookeeper implements Closeable{
return null; return null;
} }
return new ReplicationPeer(otherConf, peerId, ReplicationPeer peer = new ReplicationPeer(otherConf, peerId,
otherClusterKey); otherClusterKey);
peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
return peer;
} }
/** /**
@ -366,7 +380,8 @@ public class ReplicationZookeeper implements Closeable{
if (!peerExists(id)) { if (!peerExists(id)) {
throw new IllegalArgumentException("Cannot remove inexisting peer"); throw new IllegalArgumentException("Cannot remove inexisting peer");
} }
ZKUtil.deleteNode(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id)); ZKUtil.deleteNodeRecursively(this.zookeeper,
ZKUtil.joinZNode(this.peersZNode, id));
} catch (KeeperException e) { } catch (KeeperException e) {
throw new IOException("Unable to remove a peer", e); throw new IOException("Unable to remove a peer", e);
} }
@ -388,6 +403,8 @@ public class ReplicationZookeeper implements Closeable{
ZKUtil.createWithParents(this.zookeeper, this.peersZNode); ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
ZKUtil.createAndWatch(this.zookeeper, ZKUtil.createAndWatch(this.zookeeper,
ZKUtil.joinZNode(this.peersZNode, id), Bytes.toBytes(clusterKey)); ZKUtil.joinZNode(this.peersZNode, id), Bytes.toBytes(clusterKey));
ZKUtil.createAndWatch(this.zookeeper, getPeerStateNode(id),
Bytes.toBytes(PeerState.ENABLED.name())); // enabled by default
} catch (KeeperException e) { } catch (KeeperException e) {
throw new IOException("Unable to add peer", e); throw new IOException("Unable to add peer", e);
} }
@ -398,6 +415,82 @@ public class ReplicationZookeeper implements Closeable{
ZKUtil.joinZNode(this.peersZNode, id)) >= 0; ZKUtil.joinZNode(this.peersZNode, id)) >= 0;
} }
/**
* Enable replication to the peer
*
* @param id peer's identifier
* @throws IllegalArgumentException
* Thrown when the peer doesn't exist
*/
public void enablePeer(String id) throws IOException {
changePeerState(id, PeerState.ENABLED);
LOG.info("peer " + id + " is enabled");
}
/**
* Disable replication to the peer
*
* @param id peer's identifier
* @throws IllegalArgumentException
* Thrown when the peer doesn't exist
*/
public void disablePeer(String id) throws IOException {
changePeerState(id, PeerState.DISABLED);
LOG.info("peer " + id + " is disabled");
}
private void changePeerState(String id, PeerState state) throws IOException {
try {
if (!peerExists(id)) {
throw new IllegalArgumentException("peer " + id + " is not registered");
}
String peerStateZNode = getPeerStateNode(id);
if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
ZKUtil.setData(this.zookeeper, peerStateZNode,
Bytes.toBytes(state.name()));
} else {
ZKUtil.createAndWatch(zookeeper, peerStateZNode,
Bytes.toBytes(state.name()));
}
LOG.info("state of the peer " + id + " changed to " + state.name());
} catch (KeeperException e) {
throw new IOException("Unable to change state of the peer " + id, e);
}
}
/**
* Get state of the peer. This method checks the state by connecting to ZK.
*
* @param id peer's identifier
* @return current state of the peer
*/
public PeerState getPeerState(String id) throws KeeperException {
byte[] peerStateBytes = ZKUtil
.getData(this.zookeeper, getPeerStateNode(id));
return PeerState.valueOf(Bytes.toString(peerStateBytes));
}
/**
* Check whether the peer is enabled or not. This method checks the atomic
* boolean of ReplicationPeer locally.
*
* @param id peer identifier
* @return true if the peer is enabled, otherwise false
* @throws IllegalArgumentException
* Thrown when the peer doesn't exist
*/
public boolean getPeerEnabled(String id) {
if (!this.peerClusters.containsKey(id)) {
throw new IllegalArgumentException("peer " + id + " is not registered");
}
return this.peerClusters.get(id).getPeerEnabled().get();
}
private String getPeerStateNode(String id) {
return ZKUtil.joinZNode(this.peersZNode,
ZKUtil.joinZNode(id, this.peerStateNodeName));
}
/** /**
* This reads the state znode for replication and sets the atomic boolean * This reads the state znode for replication and sets the atomic boolean
*/ */

View File

@ -138,9 +138,6 @@ public class ReplicationSource extends Thread
private volatile boolean running = true; private volatile boolean running = true;
// Metrics for this source // Metrics for this source
private ReplicationSourceMetrics metrics; private ReplicationSourceMetrics metrics;
// If source is enabled, replication happens. If disabled, nothing will be
// replicated but HLogs will still be queued
private AtomicBoolean sourceEnabled = new AtomicBoolean();
/** /**
* Instantiation method used by region servers * Instantiation method used by region servers
@ -274,7 +271,7 @@ public class ReplicationSource extends Thread
// Loop until we close down // Loop until we close down
while (isActive()) { while (isActive()) {
// Sleep until replication is enabled again // Sleep until replication is enabled again
if (!this.replicating.get() || !this.sourceEnabled.get()) { if (!isPeerEnabled()) {
if (sleepForRetries("Replication is disabled", sleepMultiplier)) { if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
sleepMultiplier++; sleepMultiplier++;
} }
@ -601,6 +598,12 @@ public class ReplicationSource extends Thread
return; return;
} }
while (this.isActive()) { while (this.isActive()) {
if (!isPeerEnabled()) {
if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
sleepMultiplier++;
}
continue;
}
try { try {
HRegionInterface rrs = getRS(); HRegionInterface rrs = getRS();
LOG.debug("Replicating " + currentNbEntries); LOG.debug("Replicating " + currentNbEntries);
@ -659,6 +662,15 @@ public class ReplicationSource extends Thread
} }
} }
/**
* check whether the peer is enabled or not
*
* @return true if the peer is enabled, otherwise false
*/
protected boolean isPeerEnabled() {
return this.replicating.get() && this.zkHelper.getPeerEnabled(peerId);
}
/** /**
* If the queue isn't empty, switch to the next one * If the queue isn't empty, switch to the next one
* Else if this is a recovered queue, it means we're done! * Else if this is a recovered queue, it means we're done!
@ -765,10 +777,6 @@ public class ReplicationSource extends Thread
return this.currentPath; return this.currentPath;
} }
public void setSourceEnabled(boolean status) {
this.sourceEnabled.set(status);
}
private boolean isActive() { private boolean isActive() {
return !this.stopper.isStopped() && this.running; return !this.stopper.isStopped() && this.running;
} }

View File

@ -95,9 +95,4 @@ public interface ReplicationSourceInterface {
*/ */
public String getPeerClusterId(); public String getPeerClusterId();
/**
* Set if this source is enabled or disabled
* @param status the new status
*/
public void setSourceEnabled(boolean status);
} }

View File

@ -35,7 +35,6 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -48,6 +47,8 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/** /**
* This class is responsible to manage all the replication * This class is responsible to manage all the replication
* sources. There are two classes of sources: * sources. There are two classes of sources:
@ -203,8 +204,6 @@ public class ReplicationSourceManager {
public ReplicationSourceInterface addSource(String id) throws IOException { public ReplicationSourceInterface addSource(String id) throws IOException {
ReplicationSourceInterface src = ReplicationSourceInterface src =
getReplicationSource(this.conf, this.fs, this, stopper, replicating, id); getReplicationSource(this.conf, this.fs, this, stopper, replicating, id);
// TODO set it to what's in ZK
src.setSourceEnabled(true);
synchronized (this.hlogsById) { synchronized (this.hlogsById) {
this.sources.add(src); this.sources.add(src);
this.hlogsById.put(id, new TreeSet<String>()); this.hlogsById.put(id, new TreeSet<String>());
@ -585,8 +584,6 @@ public class ReplicationSourceManager {
for (String hlog : entry.getValue()) { for (String hlog : entry.getValue()) {
src.enqueueLog(new Path(oldLogDir, hlog)); src.enqueueLog(new Path(oldLogDir, hlog));
} }
// TODO set it to what's in ZK
src.setSourceEnabled(true);
src.startup(); src.startup();
} catch (IOException e) { } catch (IOException e) {
// TODO manage it // TODO manage it

View File

@ -49,6 +49,12 @@ module Hbase
@replication_admin.listPeers @replication_admin.listPeers
end end
#----------------------------------------------------------------------------------------------
# Get peer cluster state
def get_peer_state(id)
@replication_admin.getPeerState(id)
end
#---------------------------------------------------------------------------------------------- #----------------------------------------------------------------------------------------------
# Restart the replication stream to the specified peer # Restart the replication stream to the specified peer
def enable_peer(id) def enable_peer(id)

View File

@ -26,8 +26,6 @@ module Shell
Stops the replication stream to the specified cluster, but still Stops the replication stream to the specified cluster, but still
keeps track of new edits to replicate. keeps track of new edits to replicate.
CURRENTLY UNSUPPORTED
Examples: Examples:
hbase> disable_peer '1' hbase> disable_peer '1'

View File

@ -26,8 +26,6 @@ module Shell
Restarts the replication to the specified peer cluster, Restarts the replication to the specified peer cluster,
continuing from where it was disabled. continuing from where it was disabled.
CURRENTLY UNSUPPORTED
Examples: Examples:
hbase> enable_peer '1' hbase> enable_peer '1'

View File

@ -33,10 +33,11 @@ EOF
now = Time.now now = Time.now
peers = replication_admin.list_peers peers = replication_admin.list_peers
formatter.header(["PEER ID", "CLUSTER KEY"]) formatter.header(["PEER_ID", "CLUSTER_KEY", "STATE"])
peers.entrySet().each do |e| peers.entrySet().each do |e|
formatter.row([ e.key, e.value ]) state = replication_admin.get_peer_state(e.key)
formatter.row([ e.key, e.value, state ])
end end
formatter.footer(now) formatter.footer(now)

View File

@ -19,6 +19,9 @@
*/ */
package org.apache.hadoop.hbase.replication; package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -26,9 +29,6 @@ import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* Source that does nothing at all, helpful to test ReplicationSourceManager * Source that does nothing at all, helpful to test ReplicationSourceManager
*/ */
@ -81,10 +81,4 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
public String getPeerClusterId() { public String getPeerClusterId() {
return peerClusterId; return peerClusterId;
} }
@Override
public void setSourceEnabled(boolean status) {
}
} }

View File

@ -26,7 +26,14 @@ import static org.junit.Assert.fail;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HBaseAdmin;
@ -444,9 +451,108 @@ public class TestReplication {
} }
/**
* Test disable/enable replication, trying to insert, make sure nothing's
* replicated, enable it, the insert should be replicated
*
* @throws Exception
*/
@Test(timeout = 300000)
public void testDisableEnable() throws Exception {
// Test disabling replication
admin.disablePeer("2");
byte[] rowkey = Bytes.toBytes("disable enable");
Put put = new Put(rowkey);
put.add(famName, row, row);
htable1.put(put);
Get get = new Get(rowkey);
for (int i = 0; i < NB_RETRIES; i++) {
Result res = htable2.get(get);
if (res.size() >= 1) {
fail("Replication wasn't disabled");
} else {
LOG.info("Row not replicated, let's wait a bit more...");
Thread.sleep(SLEEP_TIME);
}
}
// Test enable replication
admin.enablePeer("2");
for (int i = 0; i < NB_RETRIES; i++) {
Result res = htable2.get(get);
if (res.size() == 0) {
LOG.info("Row not available");
Thread.sleep(SLEEP_TIME);
} else {
assertArrayEquals(res.value(), row);
return;
}
}
fail("Waited too much time for put replication");
}
/**
* Test disabling an inactive peer. Add a peer which is inactive, trying to
* insert, disable the peer, then activate the peer and make sure nothing is
* replicated. In Addition, enable the peer and check the updates are
* replicated.
*
* @throws Exception
*/
@Test(timeout = 600000)
public void testDisableInactivePeer() throws Exception {
// enabling and shutdown the peer
admin.enablePeer("2");
utility2.shutdownMiniHBaseCluster();
byte[] rowkey = Bytes.toBytes("disable inactive peer");
Put put = new Put(rowkey);
put.add(famName, row, row);
htable1.put(put);
// wait for the sleep interval of the master cluster to become long
Thread.sleep(SLEEP_TIME * NB_RETRIES);
// disable and start the peer
admin.disablePeer("2");
utility2.startMiniHBaseCluster(1, 1);
Get get = new Get(rowkey);
for (int i = 0; i < NB_RETRIES; i++) {
Result res = htable2.get(get);
if (res.size() >= 1) {
fail("Replication wasn't disabled");
} else {
LOG.info("Row not replicated, let's wait a bit more...");
Thread.sleep(SLEEP_TIME);
}
}
// Test enable replication
admin.enablePeer("2");
// wait since the sleep interval would be long
Thread.sleep(SLEEP_TIME * NB_RETRIES);
for (int i = 0; i < NB_RETRIES; i++) {
Result res = htable2.get(get);
if (res.size() == 0) {
LOG.info("Row not available");
Thread.sleep(SLEEP_TIME * NB_RETRIES);
} else {
assertArrayEquals(res.value(), row);
return;
}
}
fail("Waited too much time for put replication");
}
/** /**
* Integration test for TestReplicationAdmin, removes and re-add a peer * Integration test for TestReplicationAdmin, removes and re-add a peer
* cluster * cluster
*
* @throws Exception * @throws Exception
*/ */
@Test(timeout=300000) @Test(timeout=300000)

View File

@ -30,13 +30,23 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@ -100,6 +110,9 @@ public class TestReplicationSourceManager {
ZKUtil.setData(zkw, "/hbase/replication/peers/1", ZKUtil.setData(zkw, "/hbase/replication/peers/1",
Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
+ conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1")); + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
Bytes.toBytes(ReplicationZookeeper.PeerState.ENABLED.name()));
ZKUtil.createWithParents(zkw, "/hbase/replication/state"); ZKUtil.createWithParents(zkw, "/hbase/replication/state");
ZKUtil.setData(zkw, "/hbase/replication/state", Bytes.toBytes("true")); ZKUtil.setData(zkw, "/hbase/replication/state", Bytes.toBytes("true"));