HBASE-11535 ReplicationPeer map is not thread safe (Virag Kothari)

This commit is contained in:
Jonathan M Hsieh 2014-08-06 11:16:53 -07:00
parent 2c84b6e17c
commit 6dee406bf3
1 changed files with 11 additions and 7 deletions

View File

@ -24,6 +24,8 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -82,7 +84,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
Abortable abortable) { Abortable abortable) {
super(zk, conf, abortable); super(zk, conf, abortable);
this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs"); this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs");
this.peerClusters = new HashMap<String, ReplicationPeerZKImpl>(); this.peerClusters = new ConcurrentHashMap<String, ReplicationPeerZKImpl>();
} }
@Override @Override
@ -187,18 +189,20 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
@Override @Override
public Map<String, List<String>> getTableCFs(String id) throws IllegalArgumentException { public Map<String, List<String>> getTableCFs(String id) throws IllegalArgumentException {
if (!this.peerClusters.containsKey(id)) { ReplicationPeer replicationPeer = this.peerClusters.get(id);
if (replicationPeer == null) {
throw new IllegalArgumentException("Peer with id= " + id + " is not connected"); throw new IllegalArgumentException("Peer with id= " + id + " is not connected");
} }
return this.peerClusters.get(id).getTableCFs(); return replicationPeer.getTableCFs();
} }
@Override @Override
public boolean getStatusOfPeer(String id) { public boolean getStatusOfPeer(String id) {
if (!this.peerClusters.containsKey(id)) { ReplicationPeer replicationPeer = this.peerClusters.get(id);
if (replicationPeer == null) {
throw new IllegalArgumentException("Peer with id= " + id + " is not connected"); throw new IllegalArgumentException("Peer with id= " + id + " is not connected");
} }
return this.peerClusters.get(id).getPeerState() == PeerState.ENABLED; return replicationPeer.getPeerState() == PeerState.ENABLED;
} }
@Override @Override
@ -359,7 +363,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
public void peerRemoved(String peerId) { public void peerRemoved(String peerId) {
ReplicationPeer rp = this.peerClusters.get(peerId); ReplicationPeer rp = this.peerClusters.get(peerId);
if (rp != null) { if (rp != null) {
this.peerClusters.remove(peerId); ((ConcurrentMap<String, ReplicationPeerZKImpl>) peerClusters).remove(peerId, rp);
} }
} }
@ -385,7 +389,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
if (peer == null) { if (peer == null) {
return false; return false;
} }
this.peerClusters.put(peerId, peer); ((ConcurrentMap<String, ReplicationPeerZKImpl>) peerClusters).putIfAbsent(peerId, peer);
LOG.info("Added new peer cluster " + peer.getPeerConfig().getClusterKey()); LOG.info("Added new peer cluster " + peer.getPeerConfig().getClusterKey());
return true; return true;
} }