diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
index 6f54c319f92..956b4488c90 100644
--- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
@@ -29,32 +29,29 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
+import java.util.OptionalLong;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.MetaTableAccessor.DefaultVisitorBase;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableState;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.constraint.ConstraintException;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
@@ -63,6 +60,7 @@ import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerListener;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
+import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure2.Procedure;
@@ -72,6 +70,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
@@ -84,50 +83,44 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
-import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
-
/**
- * This is an implementation of {@link RSGroupInfoManager} which makes
- * use of an HBase table as the persistence store for the group information.
- * It also makes use of zookeeper to store group information needed
- * for bootstrapping during offline mode.
- *
- *
Concurrency
- * RSGroup state is kept locally in Maps. There is a rsgroup name to cached
- * RSGroupInfo Map at {@link #rsGroupMap} and a Map of tables to the name of the
- * rsgroup they belong too (in {@link #tableMap}). These Maps are persisted to the
- * hbase:rsgroup table (and cached in zk) on each modification.
- *
- * Mutations on state are synchronized but reads can continue without having
- * to wait on an instance monitor, mutations do wholesale replace of the Maps on
- * update -- Copy-On-Write; the local Maps of state are read-only, just-in-case
- * (see flushConfig).
- *
- *
Reads must not block else there is a danger we'll deadlock.
- *
- *
Clients of this class, the {@link RSGroupAdminEndpoint} for example, want to query and
- * then act on the results of the query modifying cache in zookeeper without another thread
- * making intermediate modifications. These clients synchronize on the 'this' instance so
- * no other has access concurrently. Reads must be able to continue concurrently.
+ * This is an implementation of {@link RSGroupInfoManager} which makes use of an HBase table as the
+ * persistence store for the group information. It also makes use of zookeeper to store group
+ * information needed for bootstrapping during offline mode.
+ *
Concurrency
RSGroup state is kept locally in Maps. There is a rsgroup name to cached
+ * RSGroupInfo Map at {@link #rsGroupMap} and a Map of tables to the name of the rsgroup they belong
+ * too (in {@link #tableMap}). These Maps are persisted to the hbase:rsgroup table (and cached in
+ * zk) on each modification.
+ *
+ * Mutations on state are synchronized but reads can continue without having to wait on an instance
+ * monitor, mutations do wholesale replace of the Maps on update -- Copy-On-Write; the local Maps of
+ * state are read-only, just-in-case (see flushConfig).
+ *
+ * Reads must not block else there is a danger we'll deadlock.
+ *
+ * Clients of this class, the {@link RSGroupAdminEndpoint} for example, want to query and then act
+ * on the results of the query modifying cache in zookeeper without another thread making
+ * intermediate modifications. These clients synchronize on the 'this' instance so no other has
+ * access concurrently. Reads must be able to continue concurrently.
*/
@InterfaceAudience.Private
final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
private static final Logger LOG = LoggerFactory.getLogger(RSGroupInfoManagerImpl.class);
/** Table descriptor for hbase:rsgroup
catalog table */
- private final static HTableDescriptor RSGROUP_TABLE_DESC;
+ private static final TableDescriptor RSGROUP_TABLE_DESC;
static {
- RSGROUP_TABLE_DESC = new HTableDescriptor(RSGROUP_TABLE_NAME);
- RSGROUP_TABLE_DESC.addFamily(new HColumnDescriptor(META_FAMILY_BYTES));
- RSGROUP_TABLE_DESC.setRegionSplitPolicyClassName(DisabledRegionSplitPolicy.class.getName());
+ TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(RSGROUP_TABLE_NAME)
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(META_FAMILY_BYTES))
+ .setRegionSplitPolicyClassName(DisabledRegionSplitPolicy.class.getName());
try {
- RSGROUP_TABLE_DESC.addCoprocessor(
- MultiRowMutationEndpoint.class.getName(),
- null, Coprocessor.PRIORITY_SYSTEM, null);
+ builder.setCoprocessor(
+ CoprocessorDescriptorBuilder.newBuilder(MultiRowMutationEndpoint.class.getName())
+ .setPriority(Coprocessor.PRIORITY_SYSTEM).build());
} catch (IOException ex) {
- throw new RuntimeException(ex);
+ throw new Error(ex);
}
+ RSGROUP_TABLE_DESC = builder.build();
}
// There two Maps are immutable and wholesale replaced on each modification
@@ -136,23 +129,24 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
private volatile Map tableMap = Collections.emptyMap();
private final MasterServices masterServices;
- private Table rsGroupTable;
- private final ClusterConnection conn;
+ private final Connection conn;
private final ZKWatcher watcher;
- private final RSGroupStartupWorker rsGroupStartupWorker = new RSGroupStartupWorker();
+ private final RSGroupStartupWorker rsGroupStartupWorker;
// contains list of groups that were last flushed to persistent store
private Set prevRSGroups = new HashSet<>();
private final ServerEventsListenerThread serverEventsListenerThread =
- new ServerEventsListenerThread();
+ new ServerEventsListenerThread();
private FailedOpenUpdaterThread failedOpenUpdaterThread;
private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException {
this.masterServices = masterServices;
this.watcher = masterServices.getZooKeeper();
- this.conn = masterServices.getClusterConnection();
+ this.conn = masterServices.getConnection();
+ this.rsGroupStartupWorker = new RSGroupStartupWorker();
}
- private synchronized void init() throws IOException{
+
+ private synchronized void init() throws IOException {
refresh();
serverEventsListenerThread.start();
masterServices.getServerManager().registerListener(serverEventsListenerThread);
@@ -167,7 +161,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
return instance;
}
- public void start(){
+ public void start() {
// create system table of rsgroup
rsGroupStartupWorker.start();
}
@@ -176,8 +170,8 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
public synchronized void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException {
checkGroupName(rsGroupInfo.getName());
if (rsGroupMap.get(rsGroupInfo.getName()) != null ||
- rsGroupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
- throw new DoNotRetryIOException("Group already exists: "+ rsGroupInfo.getName());
+ rsGroupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
+ throw new DoNotRetryIOException("Group already exists: " + rsGroupInfo.getName());
}
Map newGroupMap = Maps.newHashMap(rsGroupMap);
newGroupMap.put(rsGroupInfo.getName(), rsGroupInfo);
@@ -192,6 +186,22 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
return rsGroupInfo;
}
+ /**
+ * @param master the master to get online servers for
+ * @return Set of online Servers named for their hostname and port (not ServerName).
+ */
+ private static Set getOnlineServers(final MasterServices master) {
+ Set onlineServers = new HashSet();
+ if (master == null) {
+ return onlineServers;
+ }
+
+ for (ServerName server : master.getServerManager().getOnlineServers().keySet()) {
+ onlineServers.add(server.getAddress());
+ }
+ return onlineServers;
+ }
+
@Override
public synchronized Set moveServers(Set servers, String srcGroup,
String dstGroup) throws IOException {
@@ -200,9 +210,10 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
// If destination is 'default' rsgroup, only add servers that are online. If not online, drop
// it. If not 'default' group, add server to 'dst' rsgroup EVEN IF IT IS NOT online (could be a
// rsgroup of dead servers that are to come back later).
- Set onlineServers = dst.getName().equals(RSGroupInfo.DEFAULT_GROUP)?
- Utility.getOnlineServers(this.masterServices): null;
- for (Address el: servers) {
+ Set onlineServers =
+ dst.getName().equals(RSGroupInfo.DEFAULT_GROUP) ? getOnlineServers(this.masterServices)
+ : null;
+ for (Address el : servers) {
src.removeServer(el);
if (onlineServers != null) {
if (!onlineServers.contains(el)) {
@@ -214,7 +225,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
}
dst.addServer(el);
}
- Map newGroupMap = Maps.newHashMap(rsGroupMap);
+ Map newGroupMap = Maps.newHashMap(rsGroupMap);
newGroupMap.put(src.getName(), src);
newGroupMap.put(dst.getName(), dst);
flushConfig(newGroupMap);
@@ -223,7 +234,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
@Override
public RSGroupInfo getRSGroupOfServer(Address serverHostPort) throws IOException {
- for (RSGroupInfo info: rsGroupMap.values()) {
+ for (RSGroupInfo info : rsGroupMap.values()) {
if (info.containsServer(serverHostPort)) {
return info;
}
@@ -245,17 +256,17 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
public synchronized void moveTables(Set tableNames, String groupName)
throws IOException {
if (groupName != null && !rsGroupMap.containsKey(groupName)) {
- throw new DoNotRetryIOException("Group "+groupName+" does not exist");
+ throw new DoNotRetryIOException("Group " + groupName + " does not exist");
}
- Map newGroupMap = Maps.newHashMap(rsGroupMap);
- for(TableName tableName: tableNames) {
+ Map newGroupMap = Maps.newHashMap(rsGroupMap);
+ for (TableName tableName : tableNames) {
if (tableMap.containsKey(tableName)) {
RSGroupInfo src = new RSGroupInfo(newGroupMap.get(tableMap.get(tableName)));
src.removeTable(tableName);
newGroupMap.put(src.getName(), src);
}
- if(groupName != null) {
+ if (groupName != null) {
RSGroupInfo dst = new RSGroupInfo(newGroupMap.get(groupName));
dst.addTable(tableName);
newGroupMap.put(dst.getName(), dst);
@@ -267,10 +278,10 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
@Override
public synchronized void removeRSGroup(String groupName) throws IOException {
if (!rsGroupMap.containsKey(groupName) || groupName.equals(RSGroupInfo.DEFAULT_GROUP)) {
- throw new DoNotRetryIOException("Group " + groupName + " does not exist or is a reserved "
- + "group");
+ throw new DoNotRetryIOException(
+ "Group " + groupName + " does not exist or is a reserved " + "group");
}
- Map newGroupMap = Maps.newHashMap(rsGroupMap);
+ Map newGroupMap = Maps.newHashMap(rsGroupMap);
newGroupMap.remove(groupName);
flushConfig(newGroupMap);
}
@@ -286,25 +297,25 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
}
@Override
- public void moveServersAndTables(Set servers, Set tables,
- String srcGroup, String dstGroup) throws IOException {
- //get server's group
+ public void moveServersAndTables(Set servers, Set tables, String srcGroup,
+ String dstGroup) throws IOException {
+ // get server's group
RSGroupInfo srcGroupInfo = getRSGroupInfo(srcGroup);
RSGroupInfo dstGroupInfo = getRSGroupInfo(dstGroup);
- //move servers
- for (Address el: servers) {
+ // move servers
+ for (Address el : servers) {
srcGroupInfo.removeServer(el);
dstGroupInfo.addServer(el);
}
- //move tables
- for(TableName tableName: tables) {
+ // move tables
+ for (TableName tableName : tables) {
srcGroupInfo.removeTable(tableName);
dstGroupInfo.addTable(tableName);
}
- //flush changed groupinfo
- Map newGroupMap = Maps.newHashMap(rsGroupMap);
+ // flush changed groupinfo
+ Map newGroupMap = Maps.newHashMap(rsGroupMap);
newGroupMap.put(srcGroupInfo.getName(), srcGroupInfo);
newGroupMap.put(dstGroupInfo.getName(), dstGroupInfo);
flushConfig(newGroupMap);
@@ -313,7 +324,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
@Override
public synchronized void removeServers(Set servers) throws IOException {
Map rsGroupInfos = new HashMap();
- for (Address el: servers) {
+ for (Address el : servers) {
RSGroupInfo rsGroupInfo = getRSGroupOfServer(el);
if (rsGroupInfo != null) {
RSGroupInfo newRsGroupInfo = rsGroupInfos.get(rsGroupInfo.getName());
@@ -324,7 +335,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
newRsGroupInfo.removeServer(el);
rsGroupInfos.put(newRsGroupInfo.getName(), newRsGroupInfo);
}
- }else {
+ } else {
LOG.warn("Server " + el + " does not belong to any rsgroup.");
}
}
@@ -338,10 +349,17 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
List retrieveGroupListFromGroupTable() throws IOException {
List rsGroupInfoList = Lists.newArrayList();
- for (Result result : rsGroupTable.getScanner(new Scan())) {
- RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo.parseFrom(
- result.getValue(META_FAMILY_BYTES, META_QUALIFIER_BYTES));
- rsGroupInfoList.add(RSGroupProtobufUtil.toGroupInfo(proto));
+ try (Table table = conn.getTable(RSGROUP_TABLE_NAME);
+ ResultScanner scanner = table.getScanner(new Scan())) {
+ for (Result result;;) {
+ result = scanner.next();
+ if (result == null) {
+ break;
+ }
+ RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo
+ .parseFrom(result.getValue(META_FAMILY_BYTES, META_QUALIFIER_BYTES));
+ rsGroupInfoList.add(RSGroupProtobufUtil.toGroupInfo(proto));
+ }
}
return rsGroupInfoList;
}
@@ -349,27 +367,27 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
List retrieveGroupListFromZookeeper() throws IOException {
String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, rsGroupZNode);
List RSGroupInfoList = Lists.newArrayList();
- //Overwrite any info stored by table, this takes precedence
+ // Overwrite any info stored by table, this takes precedence
try {
- if(ZKUtil.checkExists(watcher, groupBasePath) != -1) {
+ if (ZKUtil.checkExists(watcher, groupBasePath) != -1) {
List children = ZKUtil.listChildrenAndWatchForNewChildren(watcher, groupBasePath);
if (children == null) {
return RSGroupInfoList;
}
- for(String znode: children) {
+ for (String znode : children) {
byte[] data = ZKUtil.getData(watcher, ZNodePaths.joinZNode(groupBasePath, znode));
- if(data.length > 0) {
+ if (data.length > 0) {
ProtobufUtil.expectPBMagicPrefix(data);
- ByteArrayInputStream bis = new ByteArrayInputStream(
- data, ProtobufUtil.lengthOfPBMagic(), data.length);
- RSGroupInfoList.add(RSGroupProtobufUtil.toGroupInfo(
- RSGroupProtos.RSGroupInfo.parseFrom(bis)));
+ ByteArrayInputStream bis =
+ new ByteArrayInputStream(data, ProtobufUtil.lengthOfPBMagic(), data.length);
+ RSGroupInfoList
+ .add(RSGroupProtobufUtil.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis)));
}
}
LOG.debug("Read ZK GroupInfo count:" + RSGroupInfoList.size());
}
- } catch (KeeperException|DeserializationException|InterruptedException e) {
- throw new IOException("Failed to read rsGroupZNode",e);
+ } catch (KeeperException | DeserializationException | InterruptedException e) {
+ throw new IOException("Failed to read rsGroupZNode", e);
}
return RSGroupInfoList;
}
@@ -380,8 +398,8 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
}
/**
- * Read rsgroup info from the source of truth, the hbase:rsgroup table.
- * Update zk cache. Called on startup of the manager.
+ * Read rsgroup info from the source of truth, the hbase:rsgroup table. Update zk cache. Called on
+ * startup of the manager.
*/
private synchronized void refresh(boolean forceOnline) throws IOException {
List groupList = new LinkedList<>();
@@ -390,9 +408,6 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
// if online read from GROUP table
if (forceOnline || isOnline()) {
LOG.debug("Refreshing in Online mode.");
- if (rsGroupTable == null) {
- rsGroupTable = conn.getTable(RSGROUP_TABLE_NAME);
- }
groupList.addAll(retrieveGroupListFromGroupTable());
} else {
LOG.debug("Refreshing in Offline mode.");
@@ -401,26 +416,25 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
// refresh default group, prune
NavigableSet orphanTables = new TreeSet<>();
- for(String entry: masterServices.getTableDescriptors().getAll().keySet()) {
+ for (String entry : masterServices.getTableDescriptors().getAll().keySet()) {
orphanTables.add(TableName.valueOf(entry));
}
- for (RSGroupInfo group: groupList) {
- if(!group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
+ for (RSGroupInfo group : groupList) {
+ if (!group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
orphanTables.removeAll(group.getTables());
}
}
// This is added to the last of the list so it overwrites the 'default' rsgroup loaded
// from region group table or zk
- groupList.add(new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers(),
- orphanTables));
+ groupList.add(new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers(), orphanTables));
// populate the data
HashMap newGroupMap = Maps.newHashMap();
HashMap newTableMap = Maps.newHashMap();
for (RSGroupInfo group : groupList) {
newGroupMap.put(group.getName(), group);
- for(TableName table: group.getTables()) {
+ for (TableName table : group.getTables()) {
newTableMap.put(table, group.getName());
}
}
@@ -428,43 +442,41 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
updateCacheOfRSGroups(rsGroupMap.keySet());
}
- private synchronized Map flushConfigTable(Map groupMap)
+ private synchronized Map flushConfigTable(Map groupMap)
throws IOException {
- Map newTableMap = Maps.newHashMap();
+ Map newTableMap = Maps.newHashMap();
List mutations = Lists.newArrayList();
// populate deletes
- for(String groupName : prevRSGroups) {
- if(!groupMap.containsKey(groupName)) {
+ for (String groupName : prevRSGroups) {
+ if (!groupMap.containsKey(groupName)) {
Delete d = new Delete(Bytes.toBytes(groupName));
mutations.add(d);
}
}
// populate puts
- for(RSGroupInfo RSGroupInfo : groupMap.values()) {
+ for (RSGroupInfo RSGroupInfo : groupMap.values()) {
RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo);
Put p = new Put(Bytes.toBytes(RSGroupInfo.getName()));
p.addColumn(META_FAMILY_BYTES, META_QUALIFIER_BYTES, proto.toByteArray());
mutations.add(p);
- for(TableName entry: RSGroupInfo.getTables()) {
+ for (TableName entry : RSGroupInfo.getTables()) {
newTableMap.put(entry, RSGroupInfo.getName());
}
}
- if(mutations.size() > 0) {
+ if (mutations.size() > 0) {
multiMutate(mutations);
}
return newTableMap;
}
- private synchronized void flushConfig()
- throws IOException {
+ private synchronized void flushConfig() throws IOException {
flushConfig(this.rsGroupMap);
}
- private synchronized void flushConfig(Map newGroupMap)
- throws IOException {
+ private synchronized void flushConfig(Map newGroupMap) throws IOException {
Map newTableMap;
// For offline mode persistence is still unavailable
@@ -474,7 +486,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
RSGroupInfo oldDefaultGroup = m.remove(RSGroupInfo.DEFAULT_GROUP);
RSGroupInfo newDefaultGroup = newGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
if (!m.equals(newGroupMap) ||
- !oldDefaultGroup.getTables().equals(newDefaultGroup.getTables())) {
+ !oldDefaultGroup.getTables().equals(newDefaultGroup.getTables())) {
throw new IOException("Only default servers can be updated during offline mode");
}
newGroupMap.put(RSGroupInfo.DEFAULT_GROUP, newDefaultGroup);
@@ -492,22 +504,21 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
ZKUtil.createAndFailSilent(watcher, groupBasePath, ProtobufMagic.PB_MAGIC);
List zkOps = new ArrayList<>(newGroupMap.size());
- for(String groupName : prevRSGroups) {
- if(!newGroupMap.containsKey(groupName)) {
+ for (String groupName : prevRSGroups) {
+ if (!newGroupMap.containsKey(groupName)) {
String znode = ZNodePaths.joinZNode(groupBasePath, groupName);
zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
}
}
-
for (RSGroupInfo RSGroupInfo : newGroupMap.values()) {
String znode = ZNodePaths.joinZNode(groupBasePath, RSGroupInfo.getName());
RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo);
- LOG.debug("Updating znode: "+znode);
+ LOG.debug("Updating znode: " + znode);
ZKUtil.createAndFailSilent(watcher, znode);
zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
zkOps.add(ZKUtil.ZKUtilOp.createAndFailSilent(znode,
- ProtobufUtil.prependPBMagic(proto.toByteArray())));
+ ProtobufUtil.prependPBMagic(proto.toByteArray())));
}
LOG.debug("Writing ZK GroupInfo count: " + zkOps.size());
@@ -515,14 +526,13 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
} catch (KeeperException e) {
LOG.error("Failed to write to rsGroupZNode", e);
masterServices.abort("Failed to write to rsGroupZNode", e);
- throw new IOException("Failed to write to rsGroupZNode",e);
+ throw new IOException("Failed to write to rsGroupZNode", e);
}
updateCacheOfRSGroups(newGroupMap.keySet());
}
/**
- * Make changes visible.
- * Caller must be synchronized on 'this'.
+ * Make changes visible. Caller must be synchronized on 'this'.
*/
private void resetRSGroupAndTableMaps(Map newRSGroupMap,
Map newTableMap) {
@@ -532,8 +542,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
}
/**
- * Update cache of rsgroups.
- * Caller must be synchronized on 'this'.
+ * Update cache of rsgroups. Caller must be synchronized on 'this'.
* @param currentGroups Current list of Groups.
*/
private void updateCacheOfRSGroups(final Set currentGroups) {
@@ -549,7 +558,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
LOG.debug("Reading online RS from zookeeper");
List servers = new LinkedList<>();
try {
- for (String el: ZKUtil.listChildrenNoWatch(watcher, watcher.getZNodePaths().rsZNode)) {
+ for (String el : ZKUtil.listChildrenNoWatch(watcher, watcher.getZNodePaths().rsZNode)) {
servers.add(ServerName.parseServerName(el));
}
} catch (KeeperException e) {
@@ -562,12 +571,10 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
private SortedSet getDefaultServers() throws IOException {
SortedSet defaultServers = Sets.newTreeSet();
for (ServerName serverName : getOnlineRS()) {
- Address server =
- Address.fromParts(serverName.getHostname(), serverName.getPort());
+ Address server = Address.fromParts(serverName.getHostname(), serverName.getPort());
boolean found = false;
- for(RSGroupInfo rsgi: listRSGroups()) {
- if(!RSGroupInfo.DEFAULT_GROUP.equals(rsgi.getName()) &&
- rsgi.containsServer(server)) {
+ for (RSGroupInfo rsgi : listRSGroups()) {
+ if (!RSGroupInfo.DEFAULT_GROUP.equals(rsgi.getName()) && rsgi.containsServer(server)) {
found = true;
break;
}
@@ -593,13 +600,13 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
private void updateFailedAssignments() {
// Kick all regions in FAILED_OPEN state
List stuckAssignments = Lists.newArrayList();
- for (RegionStateNode state:
- masterServices.getAssignmentManager().getRegionStates().getRegionsInTransition()) {
+ for (RegionStateNode state : masterServices.getAssignmentManager().getRegionStates()
+ .getRegionsInTransition()) {
if (state.isStuck()) {
stuckAssignments.add(state.getRegionInfo());
}
}
- for (RegionInfo region: stuckAssignments) {
+ for (RegionInfo region : stuckAssignments) {
LOG.info("Retrying assignment of " + region);
try {
masterServices.getAssignmentManager().unassign(region);
@@ -612,8 +619,8 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
/**
* Calls {@link RSGroupInfoManagerImpl#updateDefaultServers(SortedSet)} to update list of known
* servers. Notifications about server changes are received by registering {@link ServerListener}.
- * As a listener, we need to return immediately, so the real work of updating the servers is
- * done asynchronously in this thread.
+ * As a listener, we need to return immediately, so the real work of updating the servers is done
+ * asynchronously in this thread.
*/
private class ServerEventsListenerThread extends Thread implements ServerListener {
private final Logger LOG = LoggerFactory.getLogger(ServerEventsListenerThread.class);
@@ -642,14 +649,14 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
public void run() {
setName(ServerEventsListenerThread.class.getName() + "-" + masterServices.getServerName());
SortedSet prevDefaultServers = new TreeSet<>();
- while(isMasterRunning(masterServices)) {
+ while (isMasterRunning(masterServices)) {
try {
LOG.info("Updating default servers.");
SortedSet servers = RSGroupInfoManagerImpl.this.getDefaultServers();
if (!servers.equals(prevDefaultServers)) {
RSGroupInfoManagerImpl.this.updateDefaultServers(servers);
prevDefaultServers = servers;
- LOG.info("Updated with servers: "+servers.size());
+ LOG.info("Updated with servers: " + servers.size());
}
try {
synchronized (this) {
@@ -673,8 +680,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
private volatile boolean hasChanged = false;
public FailedOpenUpdaterThread(Configuration conf) {
- this.waitInterval = conf.getLong(REASSIGN_WAIT_INTERVAL_KEY,
- DEFAULT_REASSIGN_WAIT_INTERVAL);
+ this.waitInterval = conf.getLong(REASSIGN_WAIT_INTERVAL_KEY, DEFAULT_REASSIGN_WAIT_INTERVAL);
setDaemon(true);
}
@@ -734,106 +740,61 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
private volatile boolean online = false;
RSGroupStartupWorker() {
+ super(RSGroupStartupWorker.class.getName() + "-" + masterServices.getServerName());
setDaemon(true);
}
@Override
public void run() {
- setName(RSGroupStartupWorker.class.getName() + "-" + masterServices.getServerName());
if (waitForGroupTableOnline()) {
LOG.info("GroupBasedLoadBalancer is now online");
+ } else {
+ LOG.warn("Quit without making region group table online");
}
}
private boolean waitForGroupTableOnline() {
- final List foundRegions = new LinkedList<>();
- final List assignedRegions = new LinkedList<>();
- final AtomicBoolean found = new AtomicBoolean(false);
- final TableStateManager tsm = masterServices.getTableStateManager();
- boolean createSent = false;
- while (!found.get() && isMasterRunning(masterServices)) {
- foundRegions.clear();
- assignedRegions.clear();
- found.set(true);
+ while (isMasterRunning(masterServices)) {
try {
- boolean rootMetaFound =
- Utility.verifyMetaRegionLocation(conn, masterServices.getZooKeeper(), 1);
- if (rootMetaFound) {
- MetaTableAccessor.Visitor visitor = new DefaultVisitorBase() {
- @Override
- public boolean visitInternal(Result row) throws IOException {
- RegionInfo info = MetaTableAccessor.getRegionInfo(row);
- if (info != null) {
- Cell serverCell =
- row.getColumnLatestCell(HConstants.CATALOG_FAMILY,
- HConstants.SERVER_QUALIFIER);
- if (RSGROUP_TABLE_NAME.equals(info.getTable()) && serverCell != null) {
- ServerName sn =
- ServerName.parseVersionedServerName(CellUtil.cloneValue(serverCell));
- if (sn == null) {
- found.set(false);
- } else if (tsm.isTableState(RSGROUP_TABLE_NAME, TableState.State.ENABLED)) {
- try {
- ClientProtos.ClientService.BlockingInterface rs = conn.getClient(sn);
- ClientProtos.GetRequest request =
- RequestConverter.buildGetRequest(info.getRegionName(),
- new Get(ROW_KEY));
- rs.get(null, request);
- assignedRegions.add(info);
- } catch(Exception ex) {
- LOG.debug("Caught exception while verifying group region", ex);
- }
- }
- foundRegions.add(info);
- }
- }
- return true;
- }
- };
- MetaTableAccessor.fullScanRegions(conn, visitor);
- // if no regions in meta then we have to create the table
- if (foundRegions.size() < 1 && rootMetaFound && !createSent) {
- createRSGroupTable();
- createSent = true;
- }
- LOG.info("RSGroup table=" + RSGROUP_TABLE_NAME + " isOnline=" + found.get()
- + ", regionCount=" + foundRegions.size() + ", assignCount="
- + assignedRegions.size() + ", rootMetaFound=" + rootMetaFound);
- found.set(found.get() && assignedRegions.size() == foundRegions.size()
- && foundRegions.size() > 0);
- } else {
- LOG.info("Waiting for catalog tables to come online");
- found.set(false);
+ TableStateManager tsm = masterServices.getTableStateManager();
+ if (!tsm.isTablePresent(RSGROUP_TABLE_NAME)) {
+ createRSGroupTable();
}
- if (found.get()) {
- LOG.debug("With group table online, refreshing cached information.");
- RSGroupInfoManagerImpl.this.refresh(true);
- online = true;
- //flush any inconsistencies between ZK and HTable
- RSGroupInfoManagerImpl.this.flushConfig();
+ // try reading from the table
+ try (Table table = conn.getTable(RSGROUP_TABLE_NAME)) {
+ table.get(new Get(ROW_KEY));
}
- } catch (RuntimeException e) {
- throw e;
- } catch(Exception e) {
- found.set(false);
+ LOG.info(
+ "RSGroup table=" + RSGROUP_TABLE_NAME + " is online, refreshing cached information");
+ RSGroupInfoManagerImpl.this.refresh(true);
+ online = true;
+ // flush any inconsistencies between ZK and HTable
+ RSGroupInfoManagerImpl.this.flushConfig();
+ return true;
+ } catch (Exception e) {
LOG.warn("Failed to perform check", e);
- }
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- LOG.info("Sleep interrupted", e);
+ // 100ms is short so let's just ignore the interrupt
+ Threads.sleepWithoutInterrupt(100);
}
}
- return found.get();
+ return false;
}
private void createRSGroupTable() throws IOException {
- Long procId = masterServices.createSystemTable(RSGROUP_TABLE_DESC);
+ OptionalLong optProcId = masterServices.getProcedures().stream()
+ .filter(p -> p instanceof CreateTableProcedure).map(p -> (CreateTableProcedure) p)
+ .filter(p -> p.getTableName().equals(RSGROUP_TABLE_NAME)).mapToLong(Procedure::getProcId)
+ .findFirst();
+ long procId;
+ if (optProcId.isPresent()) {
+ procId = optProcId.getAsLong();
+ } else {
+ procId = masterServices.createSystemTable(RSGROUP_TABLE_DESC);
+ }
// wait for region to be online
int tries = 600;
- while (!(masterServices.getMasterProcedureExecutor().isFinished(procId))
- && masterServices.getMasterProcedureExecutor().isRunning()
- && tries > 0) {
+ while (!(masterServices.getMasterProcedureExecutor().isFinished(procId)) &&
+ masterServices.getMasterProcedureExecutor().isRunning() && tries > 0) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
@@ -841,13 +802,13 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
}
tries--;
}
- if(tries <= 0) {
+ if (tries <= 0) {
throw new IOException("Failed to create group table in a given time.");
} else {
Procedure> result = masterServices.getMasterProcedureExecutor().getResult(procId);
if (result != null && result.isFailed()) {
- throw new IOException("Failed to create group table. " +
- MasterProcedureUtil.unwrapRemoteIOException(result));
+ throw new IOException(
+ "Failed to create group table. " + MasterProcedureUtil.unwrapRemoteIOException(result));
}
}
}
@@ -862,31 +823,32 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
}
private void multiMutate(List mutations) throws IOException {
- CoprocessorRpcChannel channel = rsGroupTable.coprocessorService(ROW_KEY);
- MultiRowMutationProtos.MutateRowsRequest.Builder mmrBuilder
- = MultiRowMutationProtos.MutateRowsRequest.newBuilder();
- for (Mutation mutation : mutations) {
- if (mutation instanceof Put) {
- mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
+ try (Table table = conn.getTable(RSGROUP_TABLE_NAME)) {
+ CoprocessorRpcChannel channel = table.coprocessorService(ROW_KEY);
+ MultiRowMutationProtos.MutateRowsRequest.Builder mmrBuilder =
+ MultiRowMutationProtos.MutateRowsRequest.newBuilder();
+ for (Mutation mutation : mutations) {
+ if (mutation instanceof Put) {
+ mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT,
mutation));
- } else if (mutation instanceof Delete) {
- mmrBuilder.addMutationRequest(
- org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
- org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.
- MutationType.DELETE, mutation));
- } else {
- throw new DoNotRetryIOException("multiMutate doesn't support "
- + mutation.getClass().getName());
+ } else if (mutation instanceof Delete) {
+ mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
+ org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.DELETE,
+ mutation));
+ } else {
+ throw new DoNotRetryIOException(
+ "multiMutate doesn't support " + mutation.getClass().getName());
+ }
}
- }
- MultiRowMutationProtos.MultiRowMutationService.BlockingInterface service =
- MultiRowMutationProtos.MultiRowMutationService.newBlockingStub(channel);
- try {
- service.mutateRows(null, mmrBuilder.build());
- } catch (ServiceException ex) {
- ProtobufUtil.toIOException(ex);
+ MultiRowMutationProtos.MultiRowMutationService.BlockingInterface service =
+ MultiRowMutationProtos.MultiRowMutationService.newBlockingStub(channel);
+ try {
+ service.mutateRows(null, mmrBuilder.build());
+ } catch (ServiceException ex) {
+ ProtobufUtil.toIOException(ex);
+ }
}
}
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/Utility.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/Utility.java
deleted file mode 100644
index d5450c4dfce..00000000000
--- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/Utility.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/**
- * Copyright The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.rsgroup;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.NoRouteToHostException;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-import java.net.UnknownHostException;
-import java.util.HashSet;
-import java.util.Locale;
-import java.util.Set;
-import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionInfoBuilder;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
-import org.apache.hadoop.hbase.client.RetriesExhaustedException;
-import org.apache.hadoop.hbase.ipc.FailedServerException;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
-import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.hadoop.hbase.net.Address;
-import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-
-/**
- * Utility for this RSGroup package in hbase-rsgroup.
- */
-@InterfaceAudience.Private
-final class Utility {
-
- private static final Logger LOG = LoggerFactory.getLogger(Utility.class);
-
- private Utility() {
- }
-
- /**
- * @param master the master to get online servers for
- * @return Set of online Servers named for their hostname and port (not ServerName).
- */
- static Set getOnlineServers(final MasterServices master) {
- Set onlineServers = new HashSet();
- if (master == null) {
- return onlineServers;
- }
-
- for (ServerName server : master.getServerManager().getOnlineServers().keySet()) {
- onlineServers.add(server.getAddress());
- }
- return onlineServers;
- }
-
- /**
- * Verify hbase:meta
is deployed and accessible.
- * @param hConnection the connection to use
- * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
- * @param timeout How long to wait on zk for meta address (passed through to the internal call to
- * {@link #getMetaServerConnection}.
- * @return True if the hbase:meta
location is healthy.
- * @throws IOException if the number of retries for getting the connection is exceeded
- * @throws InterruptedException if waiting for the socket operation fails
- */
- public static boolean verifyMetaRegionLocation(ClusterConnection hConnection, ZKWatcher zkw,
- final long timeout) throws InterruptedException, IOException {
- return verifyMetaRegionLocation(hConnection, zkw, timeout, RegionInfo.DEFAULT_REPLICA_ID);
- }
-
- /**
- * Verify hbase:meta
is deployed and accessible.
- * @param connection the connection to use
- * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
- * @param timeout How long to wait on zk for meta address (passed through to
- * @param replicaId the ID of the replica
- * @return True if the hbase:meta
location is healthy.
- * @throws InterruptedException if waiting for the socket operation fails
- * @throws IOException if the number of retries for getting the connection is exceeded
- */
- public static boolean verifyMetaRegionLocation(ClusterConnection connection, ZKWatcher zkw,
- final long timeout, int replicaId) throws InterruptedException, IOException {
- AdminProtos.AdminService.BlockingInterface service = null;
- try {
- service = getMetaServerConnection(connection, zkw, timeout, replicaId);
- } catch (NotAllMetaRegionsOnlineException e) {
- // Pass
- } catch (ServerNotRunningYetException e) {
- // Pass -- remote server is not up so can't be carrying root
- } catch (UnknownHostException e) {
- // Pass -- server name doesn't resolve so it can't be assigned anything.
- } catch (RegionServerStoppedException e) {
- // Pass -- server name sends us to a server that is dying or already dead.
- }
- return (service != null) && verifyRegionLocation(connection, service,
- MetaTableLocator.getMetaRegionLocation(zkw, replicaId),
- RegionReplicaUtil.getRegionInfoForReplica(RegionInfoBuilder.FIRST_META_REGIONINFO, replicaId)
- .getRegionName());
- }
-
- /**
- * Verify we can connect to hostingServer
and that its carrying
- * regionName
.
- * @param hostingServer Interface to the server hosting regionName
- * @param address The servername that goes with the metaServer
interface. Used
- * logging.
- * @param regionName The regionname we are interested in.
- * @return True if we were able to verify the region located at other side of the interface.
- */
- // TODO: We should be able to get the ServerName from the AdminProtocol
- // rather than have to pass it in. Its made awkward by the fact that the
- // HRI is likely a proxy against remote server so the getServerName needs
- // to be fixed to go to a local method or to a cache before we can do this.
- private static boolean verifyRegionLocation(final ClusterConnection connection,
- AdminService.BlockingInterface hostingServer, final ServerName address,
- final byte[] regionName) {
- if (hostingServer == null) {
- LOG.info("Passed hostingServer is null");
- return false;
- }
- Throwable t;
- HBaseRpcController controller = connection.getRpcControllerFactory().newController();
- try {
- // Try and get regioninfo from the hosting server.
- return ProtobufUtil.getRegionInfo(controller, hostingServer, regionName) != null;
- } catch (ConnectException e) {
- t = e;
- } catch (RetriesExhaustedException e) {
- t = e;
- } catch (RemoteException e) {
- IOException ioe = e.unwrapRemoteException();
- t = ioe;
- } catch (IOException e) {
- Throwable cause = e.getCause();
- if (cause != null && cause instanceof EOFException) {
- t = cause;
- } else if (cause != null && cause.getMessage() != null &&
- cause.getMessage().contains("Connection reset")) {
- t = cause;
- } else {
- t = e;
- }
- }
- LOG.info("Failed verification of " + Bytes.toStringBinary(regionName) + " at address=" +
- address + ", exception=" + t.getMessage());
- return false;
- }
-
- /**
- * Gets a connection to the server hosting meta, as reported by ZooKeeper, waiting up to the
- * specified timeout for availability.
- *
- * WARNING: Does not retry. Use an {@link org.apache.hadoop.hbase.client.HTable} instead.
- * @param connection the connection to use
- * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
- * @param timeout How long to wait on meta location
- * @param replicaId the ID of the replica
- * @return connection to server hosting meta
- * @throws InterruptedException if waiting for the socket operation fails
- * @throws IOException if the number of retries for getting the connection is exceeded
- */
- private static AdminService.BlockingInterface getMetaServerConnection(
- ClusterConnection connection, ZKWatcher zkw, long timeout, int replicaId)
- throws InterruptedException, IOException {
- return getCachedConnection(connection,
- MetaTableLocator.waitMetaRegionLocation(zkw, replicaId, timeout));
- }
-
- /**
- * @param sn ServerName to get a connection against.
- * @return The AdminProtocol we got when we connected to sn
May have come from cache,
- * may not be good, may have been setup by this invocation, or may be null.
- * @throws IOException if the number of retries for getting the connection is exceeded
- */
- private static AdminService.BlockingInterface getCachedConnection(ClusterConnection connection,
- ServerName sn) throws IOException {
- if (sn == null) {
- return null;
- }
- AdminService.BlockingInterface service = null;
- try {
- service = connection.getAdmin(sn);
- } catch (RetriesExhaustedException e) {
- if (e.getCause() != null && e.getCause() instanceof ConnectException) {
- LOG.debug("Catch this; presume it means the cached connection has gone bad.");
- } else {
- throw e;
- }
- } catch (SocketTimeoutException e) {
- LOG.debug("Timed out connecting to " + sn);
- } catch (NoRouteToHostException e) {
- LOG.debug("Connecting to " + sn, e);
- } catch (SocketException e) {
- LOG.debug("Exception connecting to " + sn);
- } catch (UnknownHostException e) {
- LOG.debug("Unknown host exception connecting to " + sn);
- } catch (FailedServerException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Server " + sn + " is in failed server list.");
- }
- } catch (IOException ioe) {
- Throwable cause = ioe.getCause();
- if (ioe instanceof ConnectException) {
- LOG.debug("Catch. Connect refused.");
- } else if (cause != null && cause instanceof EOFException) {
- LOG.debug("Catch. Other end disconnected us.");
- } else if (cause != null && cause.getMessage() != null &&
- cause.getMessage().toLowerCase(Locale.ROOT).contains("connection reset")) {
- LOG.debug("Catch. Connection reset.");
- } else {
- throw ioe;
- }
-
- }
- return service;
- }
-}
diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java
index 94876276713..d0855ec8bd1 100644
--- a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java
+++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java
@@ -29,7 +29,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -43,8 +42,7 @@ import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-
+import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
@@ -57,7 +55,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
/**
* Test RSGroupBasedLoadBalancer with SimpleLoadBalancer as internal balancer
*/
-@Category(SmallTests.class)
+@Category(LargeTests.class)
public class TestRSGroupBasedLoadBalancer extends RSGroupableBalancerTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal.java
index 723a295b248..e588a7e198b 100644
--- a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal.java
+++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal.java
@@ -28,7 +28,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -42,7 +41,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -52,7 +51,7 @@ import org.junit.experimental.categories.Category;
/**
* Test RSGroupBasedLoadBalancer with StochasticLoadBalancer as internal balancer
*/
-@Category(SmallTests.class)
+@Category(LargeTests.class)
public class TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal
extends RSGroupableBalancerTestBase {
@ClassRule
diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestUtility.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestUtility.java
deleted file mode 100644
index 00656102ffa..00000000000
--- a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestUtility.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.rsgroup;
-
-import static org.junit.Assert.assertFalse;
-
-import java.io.IOException;
-import java.net.ConnectException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
-import org.apache.hadoop.hbase.client.RegionInfoBuilder;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
-import org.apache.hadoop.hbase.master.RegionState;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.MiscTests;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.zookeeper.KeeperException;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
-import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
-
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
-
-@Category({ MiscTests.class, MediumTests.class })
-public class TestUtility {
-
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestUtility.class);
-
- private static final Logger LOG = LoggerFactory.getLogger(TestUtility.class);
-
- private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
-
- private static final ServerName SN =
- ServerName.valueOf("example.org", 1234, System.currentTimeMillis());
-
- private ZKWatcher watcher;
-
- private Abortable abortable;
-
- @BeforeClass
- public static void beforeClass() throws Exception {
- // Set this down so tests run quicker
- UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
- UTIL.startMiniZKCluster();
- }
-
- @AfterClass
- public static void afterClass() throws IOException {
- UTIL.getZkCluster().shutdown();
- }
-
- @Before
- public void before() throws IOException {
- this.abortable = new Abortable() {
- @Override
- public void abort(String why, Throwable e) {
- LOG.info(why, e);
- }
-
- @Override
- public boolean isAborted() {
- return false;
- }
- };
- this.watcher =
- new ZKWatcher(UTIL.getConfiguration(), this.getClass().getSimpleName(), this.abortable, true);
- }
-
- @After
- public void after() {
- try {
- // Clean out meta location or later tests will be confused... they presume
- // start fresh in zk.
- MetaTableLocator.deleteMetaLocation(this.watcher);
- } catch (KeeperException e) {
- LOG.warn("Unable to delete hbase:meta location", e);
- }
-
- this.watcher.close();
- }
-
- /**
- * @param admin An {@link AdminProtos.AdminService.BlockingInterface} instance; you'll likely want
- * to pass a mocked HRS; can be null.
- * @param client A mocked ClientProtocol instance, can be null
- * @return Mock up a connection that returns a {@link Configuration} when
- * {@link org.apache.hadoop.hbase.client.ClusterConnection#getConfiguration()} is called,
- * a 'location' when
- * {@link org.apache.hadoop.hbase.client.RegionLocator#getRegionLocation(byte[], boolean)}
- * is called, and that returns the passed
- * {@link AdminProtos.AdminService.BlockingInterface} instance when
- * {@link org.apache.hadoop.hbase.client.ClusterConnection#getAdmin(ServerName)} is
- * called, returns the passed {@link ClientProtos.ClientService.BlockingInterface}
- * instance when
- * {@link org.apache.hadoop.hbase.client.ClusterConnection#getClient(ServerName)} is
- * called.
- */
- private ClusterConnection mockConnection(final AdminProtos.AdminService.BlockingInterface admin,
- final ClientProtos.ClientService.BlockingInterface client) throws IOException {
- ClusterConnection connection =
- HConnectionTestingUtility.getMockedConnection(UTIL.getConfiguration());
- Mockito.doNothing().when(connection).close();
- // Make it so we return any old location when asked.
- final HRegionLocation anyLocation =
- new HRegionLocation(RegionInfoBuilder.FIRST_META_REGIONINFO, SN);
- Mockito.when(connection.getRegionLocation((TableName) Mockito.any(), (byte[]) Mockito.any(),
- Mockito.anyBoolean())).thenReturn(anyLocation);
- Mockito.when(connection.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any()))
- .thenReturn(anyLocation);
- if (admin != null) {
- // If a call to getHRegionConnection, return this implementation.
- Mockito.when(connection.getAdmin(Mockito.any())).thenReturn(admin);
- }
- if (client != null) {
- // If a call to getClient, return this implementation.
- Mockito.when(connection.getClient(Mockito.any())).thenReturn(client);
- }
- return connection;
- }
-
- private void testVerifyMetaRegionLocationWithException(Exception ex)
- throws IOException, InterruptedException, KeeperException, ServiceException {
- // Mock an ClientProtocol.
- final ClientProtos.ClientService.BlockingInterface implementation =
- Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
-
- ClusterConnection connection = mockConnection(null, implementation);
-
- // If a 'get' is called on mocked interface, throw connection refused.
- Mockito.when(implementation.get((RpcController) Mockito.any(), (GetRequest) Mockito.any()))
- .thenThrow(new ServiceException(ex));
-
- long timeout = UTIL.getConfiguration().getLong("hbase.catalog.verification.timeout", 1000);
- MetaTableLocator.setMetaLocation(this.watcher, SN, RegionState.State.OPENING);
- assertFalse(Utility.verifyMetaRegionLocation(connection, watcher, timeout));
-
- MetaTableLocator.setMetaLocation(this.watcher, SN, RegionState.State.OPEN);
- assertFalse(Utility.verifyMetaRegionLocation(connection, watcher, timeout));
- }
-
- /**
- * Test get of meta region fails properly if nothing to connect to.
- */
- @Test
- public void testVerifyMetaRegionLocationFails()
- throws IOException, InterruptedException, KeeperException, ServiceException {
- ClusterConnection connection = Mockito.mock(ClusterConnection.class);
- ServiceException connectException =
- new ServiceException(new ConnectException("Connection refused"));
- final AdminProtos.AdminService.BlockingInterface implementation =
- Mockito.mock(AdminProtos.AdminService.BlockingInterface.class);
- Mockito.when(implementation.getRegionInfo((RpcController) Mockito.any(),
- (GetRegionInfoRequest) Mockito.any())).thenThrow(connectException);
- Mockito.when(connection.getAdmin(Mockito.any())).thenReturn(implementation);
- RpcControllerFactory controllerFactory = Mockito.mock(RpcControllerFactory.class);
- Mockito.when(controllerFactory.newController())
- .thenReturn(Mockito.mock(HBaseRpcController.class));
- Mockito.when(connection.getRpcControllerFactory()).thenReturn(controllerFactory);
-
- ServerName sn = ServerName.valueOf("example.com", 1234, System.currentTimeMillis());
- MetaTableLocator.setMetaLocation(this.watcher, sn, RegionState.State.OPENING);
- assertFalse(Utility.verifyMetaRegionLocation(connection, watcher, 100));
- MetaTableLocator.setMetaLocation(this.watcher, sn, RegionState.State.OPEN);
- assertFalse(Utility.verifyMetaRegionLocation(connection, watcher, 100));
- }
-
- /**
- * Test we survive a connection refused {@link ConnectException}
- */
- @Test
- public void testGetMetaServerConnectionFails()
- throws IOException, InterruptedException, KeeperException, ServiceException {
- testVerifyMetaRegionLocationWithException(new ConnectException("Connection refused"));
- }
-
- /**
- * Test that verifyMetaRegionLocation properly handles getting a ServerNotRunningException. See
- * HBASE-4470. Note this doesn't check the exact exception thrown in the HBASE-4470 as there it is
- * thrown from getHConnection() and here it is thrown from get() -- but those are both called from
- * the same function anyway, and this way is less invasive than throwing from getHConnection would
- * be.
- */
- @Test
- public void testVerifyMetaRegionServerNotRunning()
- throws IOException, InterruptedException, KeeperException, ServiceException {
- testVerifyMetaRegionLocationWithException(new ServerNotRunningYetException("mock"));
- }
-}