HBASE-17653 HBASE-17624 rsgroup synchronizations will (distributed) deadlock

This patch restores the regime instituted in original rsgroups patch
(HBASE-6721) where reading of rsgroup state runs unimpeded against
COW immutable Maps whereas mutation to state require exclusive locks
(updating the Maps of state when done). HBASE-17624 was
over-enthusiastic with its locking down of access making it likely
we'd deadlock.

Adds documentation on concurrency expectations.
This commit is contained in:
Michael Stack 2017-02-16 16:06:11 -08:00
parent d7325185ad
commit b392de3e31
5 changed files with 126 additions and 109 deletions

View File

@ -75,11 +75,15 @@ public class RSGroupAdminServer implements RSGroupAdmin {
@Override @Override
public RSGroupInfo getRSGroupInfoOfTable(TableName tableName) throws IOException { public RSGroupInfo getRSGroupInfoOfTable(TableName tableName) throws IOException {
// We are reading across two Maps in the below with out synchronizing across
// them; should be safe most of the time.
String groupName = getRSGroupInfoManager().getRSGroupOfTable(tableName); String groupName = getRSGroupInfoManager().getRSGroupOfTable(tableName);
return groupName == null? null: getRSGroupInfoManager().getRSGroup(groupName); return groupName == null? null: getRSGroupInfoManager().getRSGroup(groupName);
} }
private void checkOnlineServersOnly(Set<Address> servers) throws ConstraintException { private void checkOnlineServersOnly(Set<Address> servers) throws ConstraintException {
// This uglyness is because we only have Address, not ServerName.
// Online servers are keyed by ServerName.
Set<Address> onlineServers = new HashSet<Address>(); Set<Address> onlineServers = new HashSet<Address>();
for(ServerName server: master.getServerManager().getOnlineServers().keySet()) { for(ServerName server: master.getServerManager().getOnlineServers().keySet()) {
onlineServers.add(server.getAddress()); onlineServers.add(server.getAddress());
@ -152,29 +156,29 @@ public class RSGroupAdminServer implements RSGroupAdmin {
} }
RSGroupInfo targetGrp = getAndCheckRSGroupInfo(targetGroupName); RSGroupInfo targetGrp = getAndCheckRSGroupInfo(targetGroupName);
RSGroupInfoManager manager = getRSGroupInfoManager(); RSGroupInfoManager manager = getRSGroupInfoManager();
// Lock the manager during the below manipulations. // Hold a lock on the manager instance while moving servers to prevent
// another writer changing our state while we are working.
synchronized (manager) { synchronized (manager) {
if (master.getMasterCoprocessorHost() != null) { if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().preMoveServers(servers, targetGroupName); master.getMasterCoprocessorHost().preMoveServers(servers, targetGroupName);
} }
// Presume first server is the source group. Later we check all servers are from // Presume first server's source group. Later ensure all servers are from this group.
// this same group.
Address firstServer = servers.iterator().next(); Address firstServer = servers.iterator().next();
RSGroupInfo srcGrp = manager.getRSGroupOfServer(firstServer); RSGroupInfo srcGrp = manager.getRSGroupOfServer(firstServer);
if (srcGrp == null) { if (srcGrp == null) {
// Be careful. This message is tested for in TestRSGroupsBase... // Be careful. This exception message is tested for in TestRSGroupsBase...
throw new ConstraintException("Source RSGroup for server " + firstServer + " does not exist."); throw new ConstraintException("Source RSGroup for server " + firstServer + " does not exist.");
} }
if (srcGrp.getName().equals(targetGroupName)) { if (srcGrp.getName().equals(targetGroupName)) {
throw new ConstraintException( "Target RSGroup " + targetGroupName + throw new ConstraintException( "Target RSGroup " + targetGroupName +
" is same as source " + srcGrp + " RSGroup."); " is same as source " + srcGrp + " RSGroup.");
} }
// Only move online servers (when from 'default') or servers from other groups. // Only move online servers (when moving from 'default') or servers from other
// This prevents bogus servers from entering groups // groups. This prevents bogus servers from entering groups
if (RSGroupInfo.DEFAULT_GROUP.equals(srcGrp.getName())) { if (RSGroupInfo.DEFAULT_GROUP.equals(srcGrp.getName())) {
checkOnlineServersOnly(servers); checkOnlineServersOnly(servers);
} }
// Check all servers are of same rsgroup. // Ensure all servers are of same rsgroup.
for (Address server: servers) { for (Address server: servers) {
String tmpGroup = manager.getRSGroupOfServer(server).getName(); String tmpGroup = manager.getRSGroupOfServer(server).getName();
if (!tmpGroup.equals(srcGrp.getName())) { if (!tmpGroup.equals(srcGrp.getName())) {
@ -189,8 +193,6 @@ public class RSGroupAdminServer implements RSGroupAdmin {
// MovedServers may be < passed in 'servers'. // MovedServers may be < passed in 'servers'.
Set<Address> movedServers = manager.moveServers(servers, srcGrp.getName(), targetGroupName); Set<Address> movedServers = manager.moveServers(servers, srcGrp.getName(), targetGroupName);
// Appy makes note that if we were passed in a List of servers,
// we'd save having to do stuff like the below.
List<Address> editableMovedServers = Lists.newArrayList(movedServers); List<Address> editableMovedServers = Lists.newArrayList(movedServers);
boolean foundRegionsToUnassign; boolean foundRegionsToUnassign;
do { do {
@ -202,9 +204,9 @@ public class RSGroupAdminServer implements RSGroupAdmin {
// Unassign regions for a server // Unassign regions for a server
// TODO: This is problematic especially if hbase:meta is in the mix. // TODO: This is problematic especially if hbase:meta is in the mix.
// We need to update state in hbase:meta and if unassigned we hang // We need to update state in hbase:meta on Master and if unassigned we hang
// around in here. There is a silly sort on linked list done above // around in here. There is a silly sort on linked list done above
// in getRegions putting hbase:meta last which helps but probably holes. // in getRegions putting hbase:meta last which helps but probably has holes.
LOG.info("Unassigning " + regions.size() + LOG.info("Unassigning " + regions.size() +
" region(s) from " + rs + " for server move to " + targetGroupName); " region(s) from " + rs + " for server move to " + targetGroupName);
if (!regions.isEmpty()) { if (!regions.isEmpty()) {
@ -253,12 +255,12 @@ public class RSGroupAdminServer implements RSGroupAdmin {
return; return;
} }
RSGroupInfoManager manager = getRSGroupInfoManager(); RSGroupInfoManager manager = getRSGroupInfoManager();
// Lock the manager during below machinations. // Hold a lock on the manager instance while moving servers to prevent
// another writer changing our state while we are working.
synchronized (manager) { synchronized (manager) {
if (master.getMasterCoprocessorHost() != null) { if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().preMoveTables(tables, targetGroup); master.getMasterCoprocessorHost().preMoveTables(tables, targetGroup);
} }
if(targetGroup != null) { if(targetGroup != null) {
RSGroupInfo destGroup = manager.getRSGroup(targetGroup); RSGroupInfo destGroup = manager.getRSGroup(targetGroup);
if(destGroup == null) { if(destGroup == null) {
@ -315,22 +317,23 @@ public class RSGroupAdminServer implements RSGroupAdmin {
@Override @Override
public void removeRSGroup(String name) throws IOException { public void removeRSGroup(String name) throws IOException {
RSGroupInfoManager manager = getRSGroupInfoManager(); RSGroupInfoManager manager = getRSGroupInfoManager();
// Hold lock across coprocessor calls. // Hold a lock on the manager instance while moving servers to prevent
// another writer changing our state while we are working.
synchronized (manager) { synchronized (manager) {
if (master.getMasterCoprocessorHost() != null) { if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().preRemoveRSGroup(name); master.getMasterCoprocessorHost().preRemoveRSGroup(name);
} }
RSGroupInfo RSGroupInfo = manager.getRSGroup(name); RSGroupInfo rsgi = manager.getRSGroup(name);
if (RSGroupInfo == null) { if (rsgi == null) {
throw new ConstraintException("RSGroup " + name + " does not exist"); throw new ConstraintException("RSGroup " + name + " does not exist");
} }
int tableCount = RSGroupInfo.getTables().size(); int tableCount = rsgi.getTables().size();
if (tableCount > 0) { if (tableCount > 0) {
throw new ConstraintException("RSGroup " + name + " has " + tableCount + throw new ConstraintException("RSGroup " + name + " has " + tableCount +
" tables; you must remove these tables from the rsgroup before " + " tables; you must remove these tables from the rsgroup before " +
"the rsgroup can be removed."); "the rsgroup can be removed.");
} }
int serverCount = RSGroupInfo.getServers().size(); int serverCount = rsgi.getServers().size();
if (serverCount > 0) { if (serverCount > 0) {
throw new ConstraintException("RSGroup " + name + " has " + serverCount + throw new ConstraintException("RSGroup " + name + " has " + serverCount +
" servers; you must remove these servers from the RSGroup before" + " servers; you must remove these servers from the RSGroup before" +
@ -465,7 +468,7 @@ public class RSGroupAdminServer implements RSGroupAdmin {
Map<ServerName, List<HRegionInfo>> serverMap = Maps.newHashMap(); Map<ServerName, List<HRegionInfo>> serverMap = Maps.newHashMap();
for(ServerName serverName: master.getServerManager().getOnlineServers().keySet()) { for(ServerName serverName: master.getServerManager().getOnlineServers().keySet()) {
if(RSGroupInfo.getServers().contains(serverName.getAddress())) { if(RSGroupInfo.getServers().contains(serverName.getAddress())) {
serverMap.put(serverName, Collections.EMPTY_LIST); serverMap.put(serverName, Collections.emptyList());
} }
} }

View File

@ -79,15 +79,12 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc
private Configuration config; private Configuration config;
private ClusterStatus clusterStatus; private ClusterStatus clusterStatus;
private MasterServices masterServices; private MasterServices masterServices;
// Synchronize on access until we take the time to cmoe up with a finer-grained
// locking regime.
private volatile RSGroupInfoManager rsGroupInfoManager; private volatile RSGroupInfoManager rsGroupInfoManager;
private LoadBalancer internalBalancer; private LoadBalancer internalBalancer;
//used during reflection by LoadBalancerFactory //used during reflection by LoadBalancerFactory
@InterfaceAudience.Private @InterfaceAudience.Private
public RSGroupBasedLoadBalancer() { public RSGroupBasedLoadBalancer() {}
}
//This constructor should only be used for unit testing //This constructor should only be used for unit testing
@InterfaceAudience.Private @InterfaceAudience.Private
@ -323,9 +320,7 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc
if (assignedServer != null && if (assignedServer != null &&
(info == null || !info.containsServer(assignedServer.getAddress()))) { (info == null || !info.containsServer(assignedServer.getAddress()))) {
RSGroupInfo otherInfo = null; RSGroupInfo otherInfo = null;
synchronized (this.rsGroupInfoManager) { otherInfo = rsGroupInfoManager.getRSGroupOfServer(assignedServer.getAddress());
otherInfo = rsGroupInfoManager.getRSGroupOfServer(assignedServer.getAddress());
}
LOG.debug("Found misplaced region: " + regionInfo.getRegionNameAsString() + LOG.debug("Found misplaced region: " + regionInfo.getRegionNameAsString() +
" on server: " + assignedServer + " on server: " + assignedServer +
" found in group: " + otherInfo + " found in group: " + otherInfo +

View File

@ -91,16 +91,22 @@ import com.google.protobuf.ServiceException;
* for bootstrapping during offline mode. * for bootstrapping during offline mode.
* *
* <h2>Concurrency</h2> * <h2>Concurrency</h2>
* All methods are synchronized to protect against concurrent access on contained * RSGroup state is kept locally in Maps. There is a rsgroup name to cached
* Maps and so as only one writer at a time to the backing zookeeper cache and rsgroup table. * RSGroupInfo Map at this.rsGroupMap and a Map of tables to the name of the
* rsgroup they belong too (in this.tableMap). These Maps are persisted to the
* hbase:rsgroup table (and cached in zk) on each modification.
*
* <p>Mutations on state are synchronized but so reads can continue without having
* to wait on an instance monitor, mutations do wholesale replace of the Maps on
* update -- Copy-On-Write; the local Maps of state are read-only, just-in-case
* (see flushConfig).
*
* <p>Reads must not block else there is a danger we'll deadlock.
* *
* <p>Clients of this class, the {@link RSGroupAdminEndpoint} for example, want to query and * <p>Clients of this class, the {@link RSGroupAdminEndpoint} for example, want to query and
* then act on the results of the query modifying cache in zookeeper without another thread * then act on the results of the query modifying cache in zookeeper without another thread
* making intermediate modifications. These clients synchronize on the 'this' instance so * making intermediate modifications. These clients synchronize on the 'this' instance so
* no other has access concurrently. * no other has access concurrently. Reads must be able to continue concurrently.
*
* TODO: Spend time cleaning up this coarse locking that is prone to error if not carefully
* enforced everywhere.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListener { public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListener {
@ -121,38 +127,37 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
} }
} }
private Map<String, RSGroupInfo> rsGroupMap; // There two Maps are immutable and wholesale replaced on each modification
private Map<TableName, String> tableMap; // so are safe to access concurrently. See class comment.
private volatile Map<String, RSGroupInfo> rsGroupMap = Collections.emptyMap();
private volatile Map<TableName, String> tableMap = Collections.emptyMap();
private final MasterServices master; private final MasterServices master;
private Table rsGroupTable; private Table rsGroupTable;
private final ClusterConnection conn; private final ClusterConnection conn;
private final ZooKeeperWatcher watcher; private final ZooKeeperWatcher watcher;
private RSGroupStartupWorker rsGroupStartupWorker; private RSGroupStartupWorker rsGroupStartupWorker;
// contains list of groups that were last flushed to persistent store // contains list of groups that were last flushed to persistent store
private Set<String> prevRSGroups; private Set<String> prevRSGroups = new HashSet<String>();
private final RSGroupSerDe rsGroupSerDe; private final RSGroupSerDe rsGroupSerDe = new RSGroupSerDe();
private DefaultServerUpdater defaultServerUpdater; private DefaultServerUpdater defaultServerUpdater;
private boolean init = false; private boolean init = false;
public RSGroupInfoManagerImpl(MasterServices master) throws IOException { public RSGroupInfoManagerImpl(MasterServices master) throws IOException {
this.rsGroupMap = Collections.emptyMap();
this.tableMap = Collections.emptyMap();
rsGroupSerDe = new RSGroupSerDe();
this.master = master; this.master = master;
this.watcher = master.getZooKeeper(); this.watcher = master.getZooKeeper();
this.conn = master.getClusterConnection(); this.conn = master.getClusterConnection();
prevRSGroups = new HashSet<String>();
} }
public synchronized void init() throws IOException{ public synchronized void init() throws IOException{
if (this.init) return;
rsGroupStartupWorker = new RSGroupStartupWorker(this, master, conn); rsGroupStartupWorker = new RSGroupStartupWorker(this, master, conn);
refresh(); refresh();
rsGroupStartupWorker.start(); rsGroupStartupWorker.start();
defaultServerUpdater = new DefaultServerUpdater(this); defaultServerUpdater = new DefaultServerUpdater(this);
master.getServerManager().registerListener(this); master.getServerManager().registerListener(this);
defaultServerUpdater.start(); defaultServerUpdater.start();
init = true; this.init = true;
} }
synchronized boolean isInit() { synchronized boolean isInit() {
@ -196,14 +201,13 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
RSGroupInfo src = getRSGroupInfo(srcGroup); RSGroupInfo src = getRSGroupInfo(srcGroup);
RSGroupInfo dst = getRSGroupInfo(dstGroup); RSGroupInfo dst = getRSGroupInfo(dstGroup);
// If destination is 'default' rsgroup, only add servers that are online. If not online, drop it. // If destination is 'default' rsgroup, only add servers that are online. If not online, drop it.
// If not 'default' group, add server to dst group EVEN IF IT IS NOT online (could be a group // If not 'default' group, add server to 'dst' rsgroup EVEN IF IT IS NOT online (could be a rsgroup
// of dead servers that are to come back later). // of dead servers that are to come back later).
Set<Address> onlineServers = dst.getName().equals(RSGroupInfo.DEFAULT_GROUP)? Set<Address> onlineServers = dst.getName().equals(RSGroupInfo.DEFAULT_GROUP)?
getOnlineServers(this.master): null; getOnlineServers(this.master): null;
for (Address el: servers) { for (Address el: servers) {
src.removeServer(el); src.removeServer(el);
if (onlineServers != null) { if (onlineServers != null) {
// onlineServers is non-null if 'default' rsgroup. If the server is not online, drop it.
if (!onlineServers.contains(el)) { if (!onlineServers.contains(el)) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Dropping " + el + " during move-to-default rsgroup because not online"); LOG.debug("Dropping " + el + " during move-to-default rsgroup because not online");
@ -227,9 +231,9 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
* @return An instance of GroupInfo. * @return An instance of GroupInfo.
*/ */
@Override @Override
public synchronized RSGroupInfo getRSGroupOfServer(Address hostPort) public RSGroupInfo getRSGroupOfServer(Address hostPort)
throws IOException { throws IOException {
for (RSGroupInfo info : rsGroupMap.values()) { for (RSGroupInfo info: rsGroupMap.values()) {
if (info.containsServer(hostPort)) { if (info.containsServer(hostPort)) {
return info; return info;
} }
@ -245,14 +249,14 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
* @return An instance of GroupInfo * @return An instance of GroupInfo
*/ */
@Override @Override
public synchronized RSGroupInfo getRSGroup(String groupName) throws IOException { public RSGroupInfo getRSGroup(String groupName) throws IOException {
return this.rsGroupMap.get(groupName); return this.rsGroupMap.get(groupName);
} }
@Override @Override
public synchronized String getRSGroupOfTable(TableName tableName) throws IOException { public String getRSGroupOfTable(TableName tableName) throws IOException {
return tableMap.get(tableName); return tableMap.get(tableName);
} }
@ -298,12 +302,12 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
} }
@Override @Override
public synchronized List<RSGroupInfo> listRSGroups() throws IOException { public List<RSGroupInfo> listRSGroups() throws IOException {
return Lists.newLinkedList(rsGroupMap.values()); return Lists.newLinkedList(rsGroupMap.values());
} }
@Override @Override
public synchronized boolean isOnline() { public boolean isOnline() {
return rsGroupStartupWorker.isOnline(); return rsGroupStartupWorker.isOnline();
} }
@ -312,10 +316,16 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
refresh(false); refresh(false);
} }
/**
* Read rsgroup info from the source of truth, the hbase:rsgroup table.
* Update zk cache. Called on startup of the manager.
* @param forceOnline
* @throws IOException
*/
private synchronized void refresh(boolean forceOnline) throws IOException { private synchronized void refresh(boolean forceOnline) throws IOException {
List<RSGroupInfo> groupList = new LinkedList<RSGroupInfo>(); List<RSGroupInfo> groupList = new LinkedList<RSGroupInfo>();
// overwrite anything read from zk, group table is source of truth // Overwrite anything read from zk, group table is source of truth
// if online read from GROUP table // if online read from GROUP table
if (forceOnline || isOnline()) { if (forceOnline || isOnline()) {
LOG.debug("Refreshing in Online mode."); LOG.debug("Refreshing in Online mode.");
@ -324,7 +334,7 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
} }
groupList.addAll(rsGroupSerDe.retrieveGroupList(rsGroupTable)); groupList.addAll(rsGroupSerDe.retrieveGroupList(rsGroupTable));
} else { } else {
LOG.debug("Refershing in Offline mode."); LOG.debug("Refreshing in Offline mode.");
String groupBasePath = ZKUtil.joinZNode(watcher.znodePaths.baseZNode, rsGroupZNode); String groupBasePath = ZKUtil.joinZNode(watcher.znodePaths.baseZNode, rsGroupZNode);
groupList.addAll(rsGroupSerDe.retrieveGroupList(watcher, groupBasePath)); groupList.addAll(rsGroupSerDe.retrieveGroupList(watcher, groupBasePath));
} }
@ -347,20 +357,18 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
master.listTableNamesByNamespace(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR); master.listTableNamesByNamespace(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR);
} }
for(TableName table : specialTables) { for (TableName table : specialTables) {
orphanTables.add(table); orphanTables.add(table);
} }
for(RSGroupInfo group: groupList) { for (RSGroupInfo group: groupList) {
if(!group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) { if(!group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
orphanTables.removeAll(group.getTables()); orphanTables.removeAll(group.getTables());
} }
} }
// This is added to the last of the list // This is added to the last of the list so it overwrites the 'default' rsgroup loaded
// so it overwrites the default group loaded
// from region group table or zk // from region group table or zk
groupList.add(new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, groupList.add(new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers(),
Sets.newTreeSet(getDefaultServers()),
orphanTables)); orphanTables));
// populate the data // populate the data
@ -372,11 +380,8 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
newTableMap.put(table, group.getName()); newTableMap.put(table, group.getName());
} }
} }
rsGroupMap = Collections.unmodifiableMap(newGroupMap); installNewMaps(newGroupMap, newTableMap);
tableMap = Collections.unmodifiableMap(newTableMap); updateCacheOfRSGroups(rsGroupMap.keySet());
prevRSGroups.clear();
prevRSGroups.addAll(rsGroupMap.keySet());
} }
private synchronized Map<TableName,String> flushConfigTable(Map<String,RSGroupInfo> newGroupMap) private synchronized Map<TableName,String> flushConfigTable(Map<String,RSGroupInfo> newGroupMap)
@ -408,12 +413,14 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
} }
return newTableMap; return newTableMap;
} }
private synchronized void flushConfig() throws IOException {
flushConfig(rsGroupMap); private synchronized void flushConfig()
throws IOException {
flushConfig(this.rsGroupMap);
} }
// Called from RSGroupStartupWorker thread so synchronize private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap)
private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) throws IOException { throws IOException {
Map<TableName, String> newTableMap; Map<TableName, String> newTableMap;
// For offline mode persistence is still unavailable // For offline mode persistence is still unavailable
@ -433,11 +440,8 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
newTableMap = flushConfigTable(newGroupMap); newTableMap = flushConfigTable(newGroupMap);
// make changes visible since it has been // Make changes visible after having been persisted to the source of truth
// persisted in the source of truth installNewMaps(newGroupMap, newTableMap);
rsGroupMap = Collections.unmodifiableMap(newGroupMap);
tableMap = Collections.unmodifiableMap(newTableMap);
try { try {
String groupBasePath = ZKUtil.joinZNode(watcher.znodePaths.baseZNode, rsGroupZNode); String groupBasePath = ZKUtil.joinZNode(watcher.znodePaths.baseZNode, rsGroupZNode);
@ -469,9 +473,28 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
master.abort("Failed to write to rsGroupZNode", e); master.abort("Failed to write to rsGroupZNode", e);
throw new IOException("Failed to write to rsGroupZNode",e); throw new IOException("Failed to write to rsGroupZNode",e);
} }
updateCacheOfRSGroups(newGroupMap.keySet());
}
prevRSGroups.clear(); /**
prevRSGroups.addAll(newGroupMap.keySet()); * Make changes visible.
* Caller must be synchronized on 'this'.
*/
private void installNewMaps(Map<String, RSGroupInfo> newRSGroupMap,
Map<TableName, String> newTableMap) {
// Make maps Immutable.
this.rsGroupMap = Collections.unmodifiableMap(newRSGroupMap);
this.tableMap = Collections.unmodifiableMap(newTableMap);
}
/**
* Update cache of rsgroups.
* Caller must be synchronized on 'this'.
* @param currentGroups Current list of Groups.
*/
private void updateCacheOfRSGroups(final Set<String> currentGroups) {
this.prevRSGroups.clear();
this.prevRSGroups.addAll(currentGroups);
} }
// Called by getDefaultServers. Presume it has lock in place. // Called by getDefaultServers. Presume it has lock in place.
@ -494,18 +517,19 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
// Called by DefaultServerUpdater. Presume it has lock on this manager when it runs. // Called by DefaultServerUpdater. Presume it has lock on this manager when it runs.
private SortedSet<Address> getDefaultServers() throws IOException { private SortedSet<Address> getDefaultServers() throws IOException {
SortedSet<Address> defaultServers = Sets.newTreeSet(); SortedSet<Address> defaultServers = Sets.newTreeSet();
for (ServerName server : getOnlineRS()) { for (ServerName serverName : getOnlineRS()) {
Address hostPort = Address.fromParts(server.getHostname(), server.getPort()); Address server =
Address.fromParts(serverName.getHostname(), serverName.getPort());
boolean found = false; boolean found = false;
for(RSGroupInfo RSGroupInfo: listRSGroups()) { for(RSGroupInfo rsgi: listRSGroups()) {
if(!RSGroupInfo.DEFAULT_GROUP.equals(RSGroupInfo.getName()) && if(!RSGroupInfo.DEFAULT_GROUP.equals(rsgi.getName()) &&
RSGroupInfo.containsServer(hostPort)) { rsgi.containsServer(server)) {
found = true; found = true;
break; break;
} }
} }
if (!found) { if (!found) {
defaultServers.add(hostPort); defaultServers.add(server);
} }
} }
return defaultServers; return defaultServers;
@ -533,10 +557,12 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
defaultServerUpdater.serverChanged(); defaultServerUpdater.serverChanged();
} }
// TODO: Why do we need this extra thread? Why can't we just go
// fetch at balance time or admin time?
private static class DefaultServerUpdater extends Thread { private static class DefaultServerUpdater extends Thread {
private static final Log LOG = LogFactory.getLog(DefaultServerUpdater.class); private static final Log LOG = LogFactory.getLog(DefaultServerUpdater.class);
private final RSGroupInfoManagerImpl mgr; private final RSGroupInfoManagerImpl mgr;
private boolean hasChanged = false; private boolean changed = false;
public DefaultServerUpdater(RSGroupInfoManagerImpl mgr) { public DefaultServerUpdater(RSGroupInfoManagerImpl mgr) {
super("RSGroup.ServerUpdater"); super("RSGroup.ServerUpdater");
@ -550,20 +576,18 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
while(isMasterRunning(this.mgr.master)) { while(isMasterRunning(this.mgr.master)) {
try { try {
LOG.info("Updating default servers."); LOG.info("Updating default servers.");
synchronized (this.mgr) { SortedSet<Address> servers = mgr.getDefaultServers();
SortedSet<Address> servers = mgr.getDefaultServers(); if (!servers.equals(prevDefaultServers)) {
if (!servers.equals(prevDefaultServers)) { mgr.updateDefaultServers(servers);
mgr.updateDefaultServers(servers); prevDefaultServers = servers;
prevDefaultServers = servers; LOG.info("Updated with servers: " + servers.size());
LOG.info("Updated with servers: "+servers.size());
}
} }
try { try {
synchronized (this) { synchronized (this) {
if(!hasChanged) { if(!changed) {
wait(); wait();
} }
hasChanged = false; changed = false;
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.warn("Interrupted", e); LOG.warn("Interrupted", e);
@ -576,7 +600,7 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
public void serverChanged() { public void serverChanged() {
synchronized (this) { synchronized (this) {
hasChanged = true; changed = true;
this.notify(); this.notify();
} }
} }
@ -736,7 +760,7 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
} }
tries--; tries--;
} }
if(tries <= 0) { if (tries <= 0) {
throw new IOException("Failed to create group table in a given time."); throw new IOException("Failed to create group table in a given time.");
} else { } else {
ProcedureInfo result = masterServices.getMasterProcedureExecutor().getResult(procId); ProcedureInfo result = masterServices.getMasterProcedureExecutor().getResult(procId);
@ -777,8 +801,8 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
} }
private void checkGroupName(String groupName) throws ConstraintException { private void checkGroupName(String groupName) throws ConstraintException {
if(!groupName.matches("[a-zA-Z0-9_]+")) { if (!groupName.matches("[a-zA-Z0-9_]+")) {
throw new ConstraintException("Group name should only contain alphanumeric characters"); throw new ConstraintException("RSGroup name should only contain alphanumeric characters");
} }
} }
} }

View File

@ -60,7 +60,7 @@ public class TestRSGroups extends TestRSGroupsBase {
protected static final Log LOG = LogFactory.getLog(TestRSGroups.class); protected static final Log LOG = LogFactory.getLog(TestRSGroups.class);
private static HMaster master; private static HMaster master;
private static boolean INIT = false; private static boolean INIT = false;
private static RSGroupAdminEndpoint RSGroupAdminEndpoint; private static RSGroupAdminEndpoint rsGroupAdminEndpoint;
@BeforeClass @BeforeClass
@ -93,7 +93,7 @@ public class TestRSGroups extends TestRSGroupsBase {
admin.setBalancerRunning(false,true); admin.setBalancerRunning(false,true);
rsGroupAdmin = new VerifyingRSGroupAdminClient( rsGroupAdmin = new VerifyingRSGroupAdminClient(
new RSGroupAdminClient(TEST_UTIL.getConnection()), TEST_UTIL.getConfiguration()); new RSGroupAdminClient(TEST_UTIL.getConnection()), TEST_UTIL.getConfiguration());
RSGroupAdminEndpoint = rsGroupAdminEndpoint =
master.getMasterCoprocessorHost().findCoprocessors(RSGroupAdminEndpoint.class).get(0); master.getMasterCoprocessorHost().findCoprocessors(RSGroupAdminEndpoint.class).get(0);
} }
@ -236,11 +236,8 @@ public class TestRSGroups extends TestRSGroupsBase {
@Test @Test
public void testGroupInfoMultiAccessing() throws Exception { public void testGroupInfoMultiAccessing() throws Exception {
RSGroupInfoManager manager = RSGroupAdminEndpoint.getGroupInfoManager(); RSGroupInfoManager manager = rsGroupAdminEndpoint.getGroupInfoManager();
RSGroupInfo defaultGroup = null; RSGroupInfo defaultGroup = manager.getRSGroup("default");
synchronized (manager) {
defaultGroup = manager.getRSGroup("default");
}
// getRSGroup updates default group's server list // getRSGroup updates default group's server list
// this process must not affect other threads iterating the list // this process must not affect other threads iterating the list
Iterator<Address> it = defaultGroup.getServers().iterator(); Iterator<Address> it = defaultGroup.getServers().iterator();
@ -258,7 +255,7 @@ public class TestRSGroups extends TestRSGroupsBase {
TEST_UTIL.createMultiRegionTable(tableName, new byte[]{'f'}, 15); TEST_UTIL.createMultiRegionTable(tableName, new byte[]{'f'}, 15);
TEST_UTIL.waitUntilAllRegionsAssigned(tableName); TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
RSGroupAdminEndpoint.getGroupInfoManager() rsGroupAdminEndpoint.getGroupInfoManager()
.moveTables(Sets.newHashSet(tableName), RSGroupInfo.getName()); .moveTables(Sets.newHashSet(tableName), RSGroupInfo.getName());
assertTrue(rsGroupAdmin.balanceRSGroup(RSGroupInfo.getName())); assertTrue(rsGroupAdmin.balanceRSGroup(RSGroupInfo.getName()));

View File

@ -162,13 +162,11 @@ public class TestRSGroupsOfflineMode {
RSGroupInfoManager groupMgr = RSGroupAdminEndpoint.getGroupInfoManager(); RSGroupInfoManager groupMgr = RSGroupAdminEndpoint.getGroupInfoManager();
//make sure balancer is in offline mode, since this is what we're testing //make sure balancer is in offline mode, since this is what we're testing
synchronized (groupMgr) { assertFalse(groupMgr.isOnline());
assertFalse(groupMgr.isOnline()); //verify the group affiliation that's loaded from ZK instead of tables
//verify the group affiliation that's loaded from ZK instead of tables assertEquals(newGroup,
assertEquals(newGroup,
groupMgr.getRSGroupOfTable(RSGroupInfoManager.RSGROUP_TABLE_NAME)); groupMgr.getRSGroupOfTable(RSGroupInfoManager.RSGROUP_TABLE_NAME));
assertEquals(RSGroupInfo.DEFAULT_GROUP, groupMgr.getRSGroupOfTable(failoverTable)); assertEquals(RSGroupInfo.DEFAULT_GROUP, groupMgr.getRSGroupOfTable(failoverTable));
}
//kill final regionserver to see the failover happens for all tables //kill final regionserver to see the failover happens for all tables
//except GROUP table since it's group does not have any online RS //except GROUP table since it's group does not have any online RS
killRS.stop("die"); killRS.stop("die");