diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index 488d37a4743..af028fbaad4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -24,6 +24,8 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -82,7 +84,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re Abortable abortable) { super(zk, conf, abortable); this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs"); - this.peerClusters = new HashMap(); + this.peerClusters = new ConcurrentHashMap(); } @Override @@ -187,18 +189,20 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re @Override public Map> getTableCFs(String id) throws IllegalArgumentException { - if (!this.peerClusters.containsKey(id)) { + ReplicationPeer replicationPeer = this.peerClusters.get(id); + if (replicationPeer == null) { throw new IllegalArgumentException("Peer with id= " + id + " is not connected"); } - return this.peerClusters.get(id).getTableCFs(); + return replicationPeer.getTableCFs(); } @Override public boolean getStatusOfPeer(String id) { - if (!this.peerClusters.containsKey(id)) { + ReplicationPeer replicationPeer = this.peerClusters.get(id); + if (replicationPeer == null) { throw new IllegalArgumentException("Peer with id= " + id + " is not connected"); } - return this.peerClusters.get(id).getPeerState() == PeerState.ENABLED; + return replicationPeer.getPeerState() == PeerState.ENABLED; } @Override @@ -359,7 +363,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re public void peerRemoved(String peerId) { ReplicationPeer rp = this.peerClusters.get(peerId); if (rp != null) { - this.peerClusters.remove(peerId); + ((ConcurrentMap) peerClusters).remove(peerId, rp); } } @@ -385,7 +389,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re if (peer == null) { return false; } - this.peerClusters.put(peerId, peer); + ((ConcurrentMap) peerClusters).putIfAbsent(peerId, peer); LOG.info("Added new peer cluster " + peer.getPeerConfig().getClusterKey()); return true; }