HBASE-16018 Refactored the ReplicationPeers interface to clear up what some methods do and move away from a ZooKeeper-specific implementation.

Also added some documentation for undocumented methods.

Signed-off-by: Elliott Clark <eclark@apache.org>
This commit is contained in:
Joseph Hwang 2016-06-14 08:58:49 -07:00 committed by Elliott Clark
parent 5147fb12a5
commit 61ff6ced5b
9 changed files with 85 additions and 65 deletions

View File

@ -183,7 +183,7 @@ public class ReplicationAdmin implements Closeable {
if (tableCfs != null) {
peerConfig.setTableCFsMap(tableCfs);
}
this.replicationPeers.addPeer(id, peerConfig);
this.replicationPeers.registerPeer(id, peerConfig);
}
/**
@ -192,7 +192,7 @@ public class ReplicationAdmin implements Closeable {
* @param peerConfig configuration for the replication slave cluster
*/
public void addPeer(String id, ReplicationPeerConfig peerConfig) throws ReplicationException {
this.replicationPeers.addPeer(id, peerConfig);
this.replicationPeers.registerPeer(id, peerConfig);
}
/**
@ -212,7 +212,7 @@ public class ReplicationAdmin implements Closeable {
* @param id a short name that identifies the cluster
*/
public void removePeer(String id) throws ReplicationException {
this.replicationPeers.removePeer(id);
this.replicationPeers.unregisterPeer(id);
}
/**
@ -556,7 +556,7 @@ public class ReplicationAdmin implements Closeable {
@VisibleForTesting
public void peerAdded(String id) throws ReplicationException {
this.replicationPeers.peerAdded(id);
this.replicationPeers.peerConnected(id);
}
@VisibleForTesting

View File

@ -51,18 +51,30 @@ public interface ReplicationPeers {
* @param peerId a short that identifies the cluster
* @param peerConfig configuration for the replication slave cluster
*/
void addPeer(String peerId, ReplicationPeerConfig peerConfig)
void registerPeer(String peerId, ReplicationPeerConfig peerConfig)
throws ReplicationException;
/**
* Removes a remote slave cluster and stops the replication to it.
* @param peerId a short that identifies the cluster
*/
void removePeer(String peerId) throws ReplicationException;
void unregisterPeer(String peerId) throws ReplicationException;
boolean peerAdded(String peerId) throws ReplicationException;
/**
* Method called after a peer has been connected. It will create a ReplicationPeer to track the
* newly connected cluster.
* @param peerId a short that identifies the cluster
* @return whether a ReplicationPeer was successfully created
* @throws ReplicationException
*/
boolean peerConnected(String peerId) throws ReplicationException;
void peerRemoved(String peerId);
/**
* Method called after a peer has been disconnected. It will remove the ReplicationPeer that
* tracked the disconnected cluster.
* @param peerId a short that identifies the cluster
*/
void peerDisconnected(String peerId);
/**
* Restart the replication to the specified remote slave cluster.
@ -77,14 +89,14 @@ public interface ReplicationPeers {
void disablePeer(String peerId) throws ReplicationException;
/**
* Get the table and column-family list string of the peer from ZK.
* Get the table and column-family list string of the peer from the underlying storage.
* @param peerId a short that identifies the cluster
*/
public Map<TableName, List<String>> getPeerTableCFsConfig(String peerId)
throws ReplicationException;
/**
* Set the table and column-family list string of the peer to ZK.
* Set the table and column-family list string of the peer to the underlying storage.
* @param peerId a short that identifies the cluster
* @param tableCFs the table and column-family list which will be replicated for this peer
*/
@ -93,17 +105,20 @@ public interface ReplicationPeers {
throws ReplicationException;
/**
* Returns the ReplicationPeer
* Returns the ReplicationPeer for the specified connected peer. This ReplicationPeer will
* continue to track changes to the Peer's state and config. This method returns null if no
* peer has been connected with the given peerId.
* @param peerId id for the peer
* @return ReplicationPeer object
*/
ReplicationPeer getPeer(String peerId);
ReplicationPeer getConnectedPeer(String peerId);
/**
* Returns the set of peerIds defined
* Returns the set of peerIds of the clusters that have been connected and have an underlying
* ReplicationPeer.
* @return a Set of Strings for peerIds
*/
public Set<String> getPeerIds();
public Set<String> getConnectedPeerIds();
/**
* Get the replication status for the specified connected remote slave cluster.
@ -152,5 +167,11 @@ public interface ReplicationPeers {
*/
Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId) throws ReplicationException;
/**
* Update the peerConfig for the a given peer cluster
* @param id a short that identifies the cluster
* @param peerConfig new config for the peer cluster
* @throws ReplicationException
*/
void updatePeerConfig(String id, ReplicationPeerConfig peerConfig) throws ReplicationException;
}

View File

@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
/**
* This class provides an implementation of the ReplicationPeers interface using ZooKeeper. The
@ -105,7 +104,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
}
@Override
public void addPeer(String id, ReplicationPeerConfig peerConfig)
public void registerPeer(String id, ReplicationPeerConfig peerConfig)
throws ReplicationException {
try {
if (peerExists(id)) {
@ -148,7 +147,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
}
@Override
public void removePeer(String id) throws ReplicationException {
public void unregisterPeer(String id) throws ReplicationException {
try {
if (!peerExists(id)) {
throw new IllegalArgumentException("Cannot remove peer with id=" + id
@ -219,7 +218,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
public boolean getStatusOfPeer(String id) {
ReplicationPeer replicationPeer = this.peerClusters.get(id);
if (replicationPeer == null) {
throw new IllegalArgumentException("Peer with id= " + id + " is not connected");
throw new IllegalArgumentException("Peer with id= " + id + " is not cached");
}
return replicationPeer.getPeerState() == PeerState.ENABLED;
}
@ -270,12 +269,12 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
}
@Override
public ReplicationPeer getPeer(String peerId) {
public ReplicationPeer getConnectedPeer(String peerId) {
return peerClusters.get(peerId);
}
@Override
public Set<String> getPeerIds() {
public Set<String> getConnectedPeerIds() {
return peerClusters.keySet(); // this is not thread-safe
}
@ -342,7 +341,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
@Override
public void updatePeerConfig(String id, ReplicationPeerConfig newConfig)
throws ReplicationException {
ReplicationPeer peer = getPeer(id);
ReplicationPeer peer = getConnectedPeer(id);
if (peer == null){
throw new ReplicationException("Could not find peer Id " + id);
}
@ -411,12 +410,12 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
}
@Override
public boolean peerAdded(String peerId) throws ReplicationException {
public boolean peerConnected(String peerId) throws ReplicationException {
return createAndAddPeer(peerId);
}
@Override
public void peerRemoved(String peerId) {
public void peerDisconnected(String peerId) {
ReplicationPeer rp = this.peerClusters.get(peerId);
if (rp != null) {
((ConcurrentMap<String, ReplicationPeerZKImpl>) peerClusters).remove(peerId, rp);

View File

@ -237,7 +237,7 @@ public class ReplicationSource extends Thread
// A peerId will not have "-" in its name, see HBASE-11394
peerId = peerClusterZnode.split("-")[0];
}
Map<TableName, List<String>> tableCFMap = replicationPeers.getPeer(peerId).getTableCFs();
Map<TableName, List<String>> tableCFMap = replicationPeers.getConnectedPeer(peerId).getTableCFs();
if (tableCFMap != null) {
List<String> tableCfs = tableCFMap.get(tableName);
if (tableCFMap.containsKey(tableName)

View File

@ -230,7 +230,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* old region server wal queues
*/
protected void init() throws IOException, ReplicationException {
for (String id : this.replicationPeers.getPeerIds()) {
for (String id : this.replicationPeers.getConnectedPeerIds()) {
addSource(id);
if (replicationForBulkLoadDataEnabled) {
// Check if peer exists in hfile-refs queue, if not add it. This can happen in the case
@ -264,7 +264,7 @@ public class ReplicationSourceManager implements ReplicationListener {
protected ReplicationSourceInterface addSource(String id) throws IOException,
ReplicationException {
ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id);
ReplicationPeer peer = replicationPeers.getPeer(id);
ReplicationPeer peer = replicationPeers.getConnectedPeer(id);
ReplicationSourceInterface src =
getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
@ -306,7 +306,7 @@ public class ReplicationSourceManager implements ReplicationListener {
public void deleteSource(String peerId, boolean closeConnection) {
this.replicationQueues.removeQueue(peerId);
if (closeConnection) {
this.replicationPeers.peerRemoved(peerId);
this.replicationPeers.peerDisconnected(peerId);
}
}
@ -381,7 +381,7 @@ public class ReplicationSourceManager implements ReplicationListener {
// update replication queues on ZK
synchronized (replicationPeers) {// synchronize on replicationPeers to avoid adding source for
// the to-be-removed peer
for (String id : replicationPeers.getPeerIds()) {
for (String id : replicationPeers.getConnectedPeerIds()) {
try {
this.replicationQueues.addLog(id, logName);
} catch (ReplicationException e) {
@ -586,7 +586,7 @@ public class ReplicationSourceManager implements ReplicationListener {
public void peerListChanged(List<String> peerIds) {
for (String id : peerIds) {
try {
boolean added = this.replicationPeers.peerAdded(id);
boolean added = this.replicationPeers.peerConnected(id);
if (added) {
addSource(id);
if (replicationForBulkLoadDataEnabled) {
@ -659,7 +659,7 @@ public class ReplicationSourceManager implements ReplicationListener {
// there is not an actual peer defined corresponding to peerId for the failover.
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
String actualPeerId = replicationQueueInfo.getPeerId();
ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
ReplicationPeer peer = replicationPeers.getConnectedPeer(actualPeerId);
ReplicationPeerConfig peerConfig = null;
try {
peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId);
@ -688,7 +688,7 @@ public class ReplicationSourceManager implements ReplicationListener {
ReplicationSourceInterface src =
getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
server, peerId, this.clusterId, peerConfig, peer);
if (!this.rp.getPeerIds().contains((src.getPeerClusterId()))) {
if (!this.rp.getConnectedPeerIds().contains((src.getPeerClusterId()))) {
src.terminate("Recovered queue doesn't belong to any current peer");
break;
}

View File

@ -110,7 +110,7 @@ public class TestReplicationHFileCleaner {
@Before
public void setup() throws ReplicationException, IOException {
root = TEST_UTIL.getDataTestDirOnTestFS();
rp.addPeer(peerId, new ReplicationPeerConfig().setClusterKey(TEST_UTIL.getClusterKey()));
rp.registerPeer(peerId, new ReplicationPeerConfig().setClusterKey(TEST_UTIL.getClusterKey()));
rq.addPeerToHFileRefs(peerId);
}
@ -121,7 +121,7 @@ public class TestReplicationHFileCleaner {
} catch (IOException e) {
LOG.warn("Failed to delete files recursively from path " + root);
}
rp.removePeer(peerId);
rp.unregisterPeer(peerId);
}
@Test

View File

@ -165,7 +165,7 @@ public abstract class TestReplicationStateBasic {
rp.init();
try {
rp.addPeer(ID_ONE,
rp.registerPeer(ID_ONE,
new ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:hbase"));
fail("Should throw an IllegalArgumentException because "
+ "zookeeper.znode.parent is missing leading '/'.");
@ -174,7 +174,7 @@ public abstract class TestReplicationStateBasic {
}
try {
rp.addPeer(ID_ONE,
rp.registerPeer(ID_ONE,
new ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:/"));
fail("Should throw an IllegalArgumentException because zookeeper.znode.parent is missing.");
} catch (IllegalArgumentException e) {
@ -182,7 +182,7 @@ public abstract class TestReplicationStateBasic {
}
try {
rp.addPeer(ID_ONE,
rp.registerPeer(ID_ONE,
new ReplicationPeerConfig().setClusterKey("hostname1.example.org::/hbase"));
fail("Should throw an IllegalArgumentException because "
+ "hbase.zookeeper.property.clientPort is missing.");
@ -203,7 +203,7 @@ public abstract class TestReplicationStateBasic {
files1.add("file_3");
assertNull(rqc.getReplicableHFiles(ID_ONE));
assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size());
rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
rq1.addPeerToHFileRefs(ID_ONE);
rq1.addHFileRefs(ID_ONE, files1);
assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size());
@ -216,7 +216,7 @@ public abstract class TestReplicationStateBasic {
files2.add(removedString);
rq1.removeHFileRefs(ID_ONE, files2);
assertEquals(0, rqc.getReplicableHFiles(ID_ONE).size());
rp.removePeer(ID_ONE);
rp.unregisterPeer(ID_ONE);
}
@Test
@ -225,9 +225,9 @@ public abstract class TestReplicationStateBasic {
rqc.init();
rp.init();
rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
rq1.addPeerToHFileRefs(ID_ONE);
rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO));
rp.registerPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO));
rq1.addPeerToHFileRefs(ID_TWO);
List<String> files1 = new ArrayList<String>(3);
@ -240,13 +240,13 @@ public abstract class TestReplicationStateBasic {
assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size());
assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size());
rp.removePeer(ID_ONE);
rp.unregisterPeer(ID_ONE);
rq1.removePeerFromHFileRefs(ID_ONE);
assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size());
assertNull(rqc.getReplicableHFiles(ID_ONE));
assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size());
rp.removePeer(ID_TWO);
rp.unregisterPeer(ID_TWO);
rq1.removePeerFromHFileRefs(ID_TWO);
assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size());
assertNull(rqc.getReplicableHFiles(ID_TWO));
@ -258,7 +258,7 @@ public abstract class TestReplicationStateBasic {
// Test methods with non-existent peer ids
try {
rp.removePeer("bogus");
rp.unregisterPeer("bogus");
fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
} catch (IllegalArgumentException e) {
}
@ -277,16 +277,16 @@ public abstract class TestReplicationStateBasic {
fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
} catch (IllegalArgumentException e) {
}
assertFalse(rp.peerAdded("bogus"));
rp.peerRemoved("bogus");
assertFalse(rp.peerConnected("bogus"));
rp.peerDisconnected("bogus");
assertNull(rp.getPeerConf("bogus"));
assertNumberOfPeers(0);
// Add some peers
rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
assertNumberOfPeers(1);
rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO));
rp.registerPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO));
assertNumberOfPeers(2);
// Test methods with a peer that is added but not connected
@ -296,13 +296,13 @@ public abstract class TestReplicationStateBasic {
} catch (IllegalArgumentException e) {
}
assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE).getSecond()));
rp.removePeer(ID_ONE);
rp.peerRemoved(ID_ONE);
rp.unregisterPeer(ID_ONE);
rp.peerDisconnected(ID_ONE);
assertNumberOfPeers(1);
// Add one peer
rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
rp.peerAdded(ID_ONE);
rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
rp.peerConnected(ID_ONE);
assertNumberOfPeers(2);
assertTrue(rp.getStatusOfPeer(ID_ONE));
rp.disablePeer(ID_ONE);
@ -311,7 +311,7 @@ public abstract class TestReplicationStateBasic {
assertConnectedPeerStatus(true, ID_ONE);
// Disconnect peer
rp.peerRemoved(ID_ONE);
rp.peerDisconnected(ID_ONE);
assertNumberOfPeers(2);
try {
rp.getStatusOfPeer(ID_ONE);
@ -361,7 +361,7 @@ public abstract class TestReplicationStateBasic {
rq3.addLog("qId" + i, "filename" + j);
}
//Add peers for the corresponding queues so they are not orphans
rp.addPeer("qId" + i, new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus" + i));
rp.registerPeer("qId" + i, new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus" + i));
}
}
}

View File

@ -204,9 +204,9 @@ public class TestReplicationStateHBaseImpl {
@Test
public void TestMultipleReplicationQueuesHBaseImpl () {
try {
rp.addPeer("Queue1", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus1"));
rp.addPeer("Queue2", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus2"));
rp.addPeer("Queue3", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus3"));
rp.registerPeer("Queue1", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus1"));
rp.registerPeer("Queue2", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus2"));
rp.registerPeer("Queue3", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus3"));
} catch (ReplicationException e) {
fail("Failed to add peers to ReplicationPeers");
}

View File

@ -147,9 +147,9 @@ public class TestReplicationTrackerZKImpl {
@Test(timeout = 30000)
public void testPeerRemovedEvent() throws Exception {
rp.addPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
rp.registerPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
rt.registerListener(new DummyReplicationListener());
rp.removePeer("5");
rp.unregisterPeer("5");
// wait for event
while (peerRemovedCount.get() < 1) {
Thread.sleep(5);
@ -160,7 +160,7 @@ public class TestReplicationTrackerZKImpl {
@Test(timeout = 30000)
public void testPeerListChangedEvent() throws Exception {
// add a peer
rp.addPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
rp.registerPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
zkw.getRecoverableZooKeeper().getZooKeeper().getChildren("/hbase/replication/peers/5", true);
rt.registerListener(new DummyReplicationListener());
rp.disablePeer("5");
@ -177,23 +177,23 @@ public class TestReplicationTrackerZKImpl {
// clean up
//ZKUtil.deleteNode(zkw, "/hbase/replication/peers/5");
rp.removePeer("5");
rp.unregisterPeer("5");
}
@Test(timeout = 30000)
public void testPeerNameControl() throws Exception {
int exists = 0;
int hyphen = 0;
rp.addPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
rp.registerPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
try{
rp.addPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
rp.registerPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
}catch(IllegalArgumentException e){
exists++;
}
try{
rp.addPeer("6-ec2", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
rp.registerPeer("6-ec2", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
}catch(IllegalArgumentException e){
hyphen++;
}
@ -201,7 +201,7 @@ public class TestReplicationTrackerZKImpl {
assertEquals(1, hyphen);
// clean up
rp.removePeer("6");
rp.unregisterPeer("6");
}
private class DummyReplicationListener implements ReplicationListener {
@ -217,7 +217,7 @@ public class TestReplicationTrackerZKImpl {
public void peerRemoved(String peerId) {
peerRemovedData = peerId;
peerRemovedCount.getAndIncrement();
LOG.debug("Received peerRemoved event: " + peerId);
LOG.debug("Received peerDisconnected event: " + peerId);
}
@Override