HBASE-21700 Simplify the implementation of RSGroupInfoManagerImpl

This commit is contained in:
zhangduo 2019-01-09 21:02:00 +08:00 committed by Duo Zhang
parent 8e39ec2c59
commit a1903ea8ab
5 changed files with 205 additions and 720 deletions

View File

@ -29,32 +29,29 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.OptionalLong;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.MetaTableAccessor.DefaultVisitorBase;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.constraint.ConstraintException;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
@ -63,6 +60,7 @@ import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerListener;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure2.Procedure;
@ -72,6 +70,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
@ -84,50 +83,44 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
/**
* This is an implementation of {@link RSGroupInfoManager} which makes
* use of an HBase table as the persistence store for the group information.
* It also makes use of zookeeper to store group information needed
* for bootstrapping during offline mode.
*
* <h2>Concurrency</h2>
* RSGroup state is kept locally in Maps. There is a rsgroup name to cached
* RSGroupInfo Map at {@link #rsGroupMap} and a Map of tables to the name of the
* rsgroup they belong too (in {@link #tableMap}). These Maps are persisted to the
* hbase:rsgroup table (and cached in zk) on each modification.
*
* <p>Mutations on state are synchronized but reads can continue without having
* to wait on an instance monitor, mutations do wholesale replace of the Maps on
* update -- Copy-On-Write; the local Maps of state are read-only, just-in-case
* (see flushConfig).
*
* <p>Reads must not block else there is a danger we'll deadlock.
*
* <p>Clients of this class, the {@link RSGroupAdminEndpoint} for example, want to query and
* then act on the results of the query modifying cache in zookeeper without another thread
* making intermediate modifications. These clients synchronize on the 'this' instance so
* no other has access concurrently. Reads must be able to continue concurrently.
* This is an implementation of {@link RSGroupInfoManager} which makes use of an HBase table as the
* persistence store for the group information. It also makes use of zookeeper to store group
* information needed for bootstrapping during offline mode.
* <h2>Concurrency</h2> RSGroup state is kept locally in Maps. There is a rsgroup name to cached
* RSGroupInfo Map at {@link #rsGroupMap} and a Map of tables to the name of the rsgroup they belong
* too (in {@link #tableMap}). These Maps are persisted to the hbase:rsgroup table (and cached in
* zk) on each modification.
* <p>
* Mutations on state are synchronized but reads can continue without having to wait on an instance
* monitor, mutations do wholesale replace of the Maps on update -- Copy-On-Write; the local Maps of
* state are read-only, just-in-case (see flushConfig).
* <p>
* Reads must not block else there is a danger we'll deadlock.
* <p>
* Clients of this class, the {@link RSGroupAdminEndpoint} for example, want to query and then act
* on the results of the query modifying cache in zookeeper without another thread making
* intermediate modifications. These clients synchronize on the 'this' instance so no other has
* access concurrently. Reads must be able to continue concurrently.
*/
@InterfaceAudience.Private
final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
private static final Logger LOG = LoggerFactory.getLogger(RSGroupInfoManagerImpl.class);
/** Table descriptor for <code>hbase:rsgroup</code> catalog table */
private final static HTableDescriptor RSGROUP_TABLE_DESC;
private static final TableDescriptor RSGROUP_TABLE_DESC;
static {
RSGROUP_TABLE_DESC = new HTableDescriptor(RSGROUP_TABLE_NAME);
RSGROUP_TABLE_DESC.addFamily(new HColumnDescriptor(META_FAMILY_BYTES));
RSGROUP_TABLE_DESC.setRegionSplitPolicyClassName(DisabledRegionSplitPolicy.class.getName());
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(RSGROUP_TABLE_NAME)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(META_FAMILY_BYTES))
.setRegionSplitPolicyClassName(DisabledRegionSplitPolicy.class.getName());
try {
RSGROUP_TABLE_DESC.addCoprocessor(
MultiRowMutationEndpoint.class.getName(),
null, Coprocessor.PRIORITY_SYSTEM, null);
builder.setCoprocessor(
CoprocessorDescriptorBuilder.newBuilder(MultiRowMutationEndpoint.class.getName())
.setPriority(Coprocessor.PRIORITY_SYSTEM).build());
} catch (IOException ex) {
throw new RuntimeException(ex);
throw new Error(ex);
}
RSGROUP_TABLE_DESC = builder.build();
}
// There two Maps are immutable and wholesale replaced on each modification
@ -136,23 +129,24 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
private volatile Map<TableName, String> tableMap = Collections.emptyMap();
private final MasterServices masterServices;
private Table rsGroupTable;
private final ClusterConnection conn;
private final Connection conn;
private final ZKWatcher watcher;
private final RSGroupStartupWorker rsGroupStartupWorker = new RSGroupStartupWorker();
private final RSGroupStartupWorker rsGroupStartupWorker;
// contains list of groups that were last flushed to persistent store
private Set<String> prevRSGroups = new HashSet<>();
private final ServerEventsListenerThread serverEventsListenerThread =
new ServerEventsListenerThread();
new ServerEventsListenerThread();
private FailedOpenUpdaterThread failedOpenUpdaterThread;
private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException {
this.masterServices = masterServices;
this.watcher = masterServices.getZooKeeper();
this.conn = masterServices.getClusterConnection();
this.conn = masterServices.getConnection();
this.rsGroupStartupWorker = new RSGroupStartupWorker();
}
private synchronized void init() throws IOException{
private synchronized void init() throws IOException {
refresh();
serverEventsListenerThread.start();
masterServices.getServerManager().registerListener(serverEventsListenerThread);
@ -167,7 +161,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
return instance;
}
public void start(){
public void start() {
// create system table of rsgroup
rsGroupStartupWorker.start();
}
@ -176,8 +170,8 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
public synchronized void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException {
checkGroupName(rsGroupInfo.getName());
if (rsGroupMap.get(rsGroupInfo.getName()) != null ||
rsGroupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
throw new DoNotRetryIOException("Group already exists: "+ rsGroupInfo.getName());
rsGroupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
throw new DoNotRetryIOException("Group already exists: " + rsGroupInfo.getName());
}
Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
newGroupMap.put(rsGroupInfo.getName(), rsGroupInfo);
@ -192,6 +186,22 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
return rsGroupInfo;
}
/**
* @param master the master to get online servers for
* @return Set of online Servers named for their hostname and port (not ServerName).
*/
private static Set<Address> getOnlineServers(final MasterServices master) {
Set<Address> onlineServers = new HashSet<Address>();
if (master == null) {
return onlineServers;
}
for (ServerName server : master.getServerManager().getOnlineServers().keySet()) {
onlineServers.add(server.getAddress());
}
return onlineServers;
}
@Override
public synchronized Set<Address> moveServers(Set<Address> servers, String srcGroup,
String dstGroup) throws IOException {
@ -200,9 +210,10 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
// If destination is 'default' rsgroup, only add servers that are online. If not online, drop
// it. If not 'default' group, add server to 'dst' rsgroup EVEN IF IT IS NOT online (could be a
// rsgroup of dead servers that are to come back later).
Set<Address> onlineServers = dst.getName().equals(RSGroupInfo.DEFAULT_GROUP)?
Utility.getOnlineServers(this.masterServices): null;
for (Address el: servers) {
Set<Address> onlineServers =
dst.getName().equals(RSGroupInfo.DEFAULT_GROUP) ? getOnlineServers(this.masterServices)
: null;
for (Address el : servers) {
src.removeServer(el);
if (onlineServers != null) {
if (!onlineServers.contains(el)) {
@ -214,7 +225,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
}
dst.addServer(el);
}
Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
newGroupMap.put(src.getName(), src);
newGroupMap.put(dst.getName(), dst);
flushConfig(newGroupMap);
@ -223,7 +234,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
@Override
public RSGroupInfo getRSGroupOfServer(Address serverHostPort) throws IOException {
for (RSGroupInfo info: rsGroupMap.values()) {
for (RSGroupInfo info : rsGroupMap.values()) {
if (info.containsServer(serverHostPort)) {
return info;
}
@ -245,17 +256,17 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
public synchronized void moveTables(Set<TableName> tableNames, String groupName)
throws IOException {
if (groupName != null && !rsGroupMap.containsKey(groupName)) {
throw new DoNotRetryIOException("Group "+groupName+" does not exist");
throw new DoNotRetryIOException("Group " + groupName + " does not exist");
}
Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
for(TableName tableName: tableNames) {
Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
for (TableName tableName : tableNames) {
if (tableMap.containsKey(tableName)) {
RSGroupInfo src = new RSGroupInfo(newGroupMap.get(tableMap.get(tableName)));
src.removeTable(tableName);
newGroupMap.put(src.getName(), src);
}
if(groupName != null) {
if (groupName != null) {
RSGroupInfo dst = new RSGroupInfo(newGroupMap.get(groupName));
dst.addTable(tableName);
newGroupMap.put(dst.getName(), dst);
@ -267,10 +278,10 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
@Override
public synchronized void removeRSGroup(String groupName) throws IOException {
if (!rsGroupMap.containsKey(groupName) || groupName.equals(RSGroupInfo.DEFAULT_GROUP)) {
throw new DoNotRetryIOException("Group " + groupName + " does not exist or is a reserved "
+ "group");
throw new DoNotRetryIOException(
"Group " + groupName + " does not exist or is a reserved " + "group");
}
Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
newGroupMap.remove(groupName);
flushConfig(newGroupMap);
}
@ -286,25 +297,25 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
}
@Override
public void moveServersAndTables(Set<Address> servers, Set<TableName> tables,
String srcGroup, String dstGroup) throws IOException {
//get server's group
public void moveServersAndTables(Set<Address> servers, Set<TableName> tables, String srcGroup,
String dstGroup) throws IOException {
// get server's group
RSGroupInfo srcGroupInfo = getRSGroupInfo(srcGroup);
RSGroupInfo dstGroupInfo = getRSGroupInfo(dstGroup);
//move servers
for (Address el: servers) {
// move servers
for (Address el : servers) {
srcGroupInfo.removeServer(el);
dstGroupInfo.addServer(el);
}
//move tables
for(TableName tableName: tables) {
// move tables
for (TableName tableName : tables) {
srcGroupInfo.removeTable(tableName);
dstGroupInfo.addTable(tableName);
}
//flush changed groupinfo
Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
// flush changed groupinfo
Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
newGroupMap.put(srcGroupInfo.getName(), srcGroupInfo);
newGroupMap.put(dstGroupInfo.getName(), dstGroupInfo);
flushConfig(newGroupMap);
@ -313,7 +324,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
@Override
public synchronized void removeServers(Set<Address> servers) throws IOException {
Map<String, RSGroupInfo> rsGroupInfos = new HashMap<String, RSGroupInfo>();
for (Address el: servers) {
for (Address el : servers) {
RSGroupInfo rsGroupInfo = getRSGroupOfServer(el);
if (rsGroupInfo != null) {
RSGroupInfo newRsGroupInfo = rsGroupInfos.get(rsGroupInfo.getName());
@ -324,7 +335,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
newRsGroupInfo.removeServer(el);
rsGroupInfos.put(newRsGroupInfo.getName(), newRsGroupInfo);
}
}else {
} else {
LOG.warn("Server " + el + " does not belong to any rsgroup.");
}
}
@ -338,10 +349,17 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
List<RSGroupInfo> retrieveGroupListFromGroupTable() throws IOException {
List<RSGroupInfo> rsGroupInfoList = Lists.newArrayList();
for (Result result : rsGroupTable.getScanner(new Scan())) {
RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo.parseFrom(
result.getValue(META_FAMILY_BYTES, META_QUALIFIER_BYTES));
rsGroupInfoList.add(RSGroupProtobufUtil.toGroupInfo(proto));
try (Table table = conn.getTable(RSGROUP_TABLE_NAME);
ResultScanner scanner = table.getScanner(new Scan())) {
for (Result result;;) {
result = scanner.next();
if (result == null) {
break;
}
RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo
.parseFrom(result.getValue(META_FAMILY_BYTES, META_QUALIFIER_BYTES));
rsGroupInfoList.add(RSGroupProtobufUtil.toGroupInfo(proto));
}
}
return rsGroupInfoList;
}
@ -349,27 +367,27 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException {
String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, rsGroupZNode);
List<RSGroupInfo> RSGroupInfoList = Lists.newArrayList();
//Overwrite any info stored by table, this takes precedence
// Overwrite any info stored by table, this takes precedence
try {
if(ZKUtil.checkExists(watcher, groupBasePath) != -1) {
if (ZKUtil.checkExists(watcher, groupBasePath) != -1) {
List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(watcher, groupBasePath);
if (children == null) {
return RSGroupInfoList;
}
for(String znode: children) {
for (String znode : children) {
byte[] data = ZKUtil.getData(watcher, ZNodePaths.joinZNode(groupBasePath, znode));
if(data.length > 0) {
if (data.length > 0) {
ProtobufUtil.expectPBMagicPrefix(data);
ByteArrayInputStream bis = new ByteArrayInputStream(
data, ProtobufUtil.lengthOfPBMagic(), data.length);
RSGroupInfoList.add(RSGroupProtobufUtil.toGroupInfo(
RSGroupProtos.RSGroupInfo.parseFrom(bis)));
ByteArrayInputStream bis =
new ByteArrayInputStream(data, ProtobufUtil.lengthOfPBMagic(), data.length);
RSGroupInfoList
.add(RSGroupProtobufUtil.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis)));
}
}
LOG.debug("Read ZK GroupInfo count:" + RSGroupInfoList.size());
}
} catch (KeeperException|DeserializationException|InterruptedException e) {
throw new IOException("Failed to read rsGroupZNode",e);
} catch (KeeperException | DeserializationException | InterruptedException e) {
throw new IOException("Failed to read rsGroupZNode", e);
}
return RSGroupInfoList;
}
@ -380,8 +398,8 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
}
/**
* Read rsgroup info from the source of truth, the hbase:rsgroup table.
* Update zk cache. Called on startup of the manager.
* Read rsgroup info from the source of truth, the hbase:rsgroup table. Update zk cache. Called on
* startup of the manager.
*/
private synchronized void refresh(boolean forceOnline) throws IOException {
List<RSGroupInfo> groupList = new LinkedList<>();
@ -390,9 +408,6 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
// if online read from GROUP table
if (forceOnline || isOnline()) {
LOG.debug("Refreshing in Online mode.");
if (rsGroupTable == null) {
rsGroupTable = conn.getTable(RSGROUP_TABLE_NAME);
}
groupList.addAll(retrieveGroupListFromGroupTable());
} else {
LOG.debug("Refreshing in Offline mode.");
@ -401,26 +416,25 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
// refresh default group, prune
NavigableSet<TableName> orphanTables = new TreeSet<>();
for(String entry: masterServices.getTableDescriptors().getAll().keySet()) {
for (String entry : masterServices.getTableDescriptors().getAll().keySet()) {
orphanTables.add(TableName.valueOf(entry));
}
for (RSGroupInfo group: groupList) {
if(!group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
for (RSGroupInfo group : groupList) {
if (!group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
orphanTables.removeAll(group.getTables());
}
}
// This is added to the last of the list so it overwrites the 'default' rsgroup loaded
// from region group table or zk
groupList.add(new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers(),
orphanTables));
groupList.add(new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers(), orphanTables));
// populate the data
HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap();
HashMap<TableName, String> newTableMap = Maps.newHashMap();
for (RSGroupInfo group : groupList) {
newGroupMap.put(group.getName(), group);
for(TableName table: group.getTables()) {
for (TableName table : group.getTables()) {
newTableMap.put(table, group.getName());
}
}
@ -428,43 +442,41 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
updateCacheOfRSGroups(rsGroupMap.keySet());
}
private synchronized Map<TableName,String> flushConfigTable(Map<String,RSGroupInfo> groupMap)
private synchronized Map<TableName, String> flushConfigTable(Map<String, RSGroupInfo> groupMap)
throws IOException {
Map<TableName,String> newTableMap = Maps.newHashMap();
Map<TableName, String> newTableMap = Maps.newHashMap();
List<Mutation> mutations = Lists.newArrayList();
// populate deletes
for(String groupName : prevRSGroups) {
if(!groupMap.containsKey(groupName)) {
for (String groupName : prevRSGroups) {
if (!groupMap.containsKey(groupName)) {
Delete d = new Delete(Bytes.toBytes(groupName));
mutations.add(d);
}
}
// populate puts
for(RSGroupInfo RSGroupInfo : groupMap.values()) {
for (RSGroupInfo RSGroupInfo : groupMap.values()) {
RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo);
Put p = new Put(Bytes.toBytes(RSGroupInfo.getName()));
p.addColumn(META_FAMILY_BYTES, META_QUALIFIER_BYTES, proto.toByteArray());
mutations.add(p);
for(TableName entry: RSGroupInfo.getTables()) {
for (TableName entry : RSGroupInfo.getTables()) {
newTableMap.put(entry, RSGroupInfo.getName());
}
}
if(mutations.size() > 0) {
if (mutations.size() > 0) {
multiMutate(mutations);
}
return newTableMap;
}
private synchronized void flushConfig()
throws IOException {
private synchronized void flushConfig() throws IOException {
flushConfig(this.rsGroupMap);
}
private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap)
throws IOException {
private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) throws IOException {
Map<TableName, String> newTableMap;
// For offline mode persistence is still unavailable
@ -474,7 +486,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
RSGroupInfo oldDefaultGroup = m.remove(RSGroupInfo.DEFAULT_GROUP);
RSGroupInfo newDefaultGroup = newGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
if (!m.equals(newGroupMap) ||
!oldDefaultGroup.getTables().equals(newDefaultGroup.getTables())) {
!oldDefaultGroup.getTables().equals(newDefaultGroup.getTables())) {
throw new IOException("Only default servers can be updated during offline mode");
}
newGroupMap.put(RSGroupInfo.DEFAULT_GROUP, newDefaultGroup);
@ -492,22 +504,21 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
ZKUtil.createAndFailSilent(watcher, groupBasePath, ProtobufMagic.PB_MAGIC);
List<ZKUtil.ZKUtilOp> zkOps = new ArrayList<>(newGroupMap.size());
for(String groupName : prevRSGroups) {
if(!newGroupMap.containsKey(groupName)) {
for (String groupName : prevRSGroups) {
if (!newGroupMap.containsKey(groupName)) {
String znode = ZNodePaths.joinZNode(groupBasePath, groupName);
zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
}
}
for (RSGroupInfo RSGroupInfo : newGroupMap.values()) {
String znode = ZNodePaths.joinZNode(groupBasePath, RSGroupInfo.getName());
RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo);
LOG.debug("Updating znode: "+znode);
LOG.debug("Updating znode: " + znode);
ZKUtil.createAndFailSilent(watcher, znode);
zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
zkOps.add(ZKUtil.ZKUtilOp.createAndFailSilent(znode,
ProtobufUtil.prependPBMagic(proto.toByteArray())));
ProtobufUtil.prependPBMagic(proto.toByteArray())));
}
LOG.debug("Writing ZK GroupInfo count: " + zkOps.size());
@ -515,14 +526,13 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
} catch (KeeperException e) {
LOG.error("Failed to write to rsGroupZNode", e);
masterServices.abort("Failed to write to rsGroupZNode", e);
throw new IOException("Failed to write to rsGroupZNode",e);
throw new IOException("Failed to write to rsGroupZNode", e);
}
updateCacheOfRSGroups(newGroupMap.keySet());
}
/**
* Make changes visible.
* Caller must be synchronized on 'this'.
* Make changes visible. Caller must be synchronized on 'this'.
*/
private void resetRSGroupAndTableMaps(Map<String, RSGroupInfo> newRSGroupMap,
Map<TableName, String> newTableMap) {
@ -532,8 +542,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
}
/**
* Update cache of rsgroups.
* Caller must be synchronized on 'this'.
* Update cache of rsgroups. Caller must be synchronized on 'this'.
* @param currentGroups Current list of Groups.
*/
private void updateCacheOfRSGroups(final Set<String> currentGroups) {
@ -549,7 +558,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
LOG.debug("Reading online RS from zookeeper");
List<ServerName> servers = new LinkedList<>();
try {
for (String el: ZKUtil.listChildrenNoWatch(watcher, watcher.getZNodePaths().rsZNode)) {
for (String el : ZKUtil.listChildrenNoWatch(watcher, watcher.getZNodePaths().rsZNode)) {
servers.add(ServerName.parseServerName(el));
}
} catch (KeeperException e) {
@ -562,12 +571,10 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
private SortedSet<Address> getDefaultServers() throws IOException {
SortedSet<Address> defaultServers = Sets.newTreeSet();
for (ServerName serverName : getOnlineRS()) {
Address server =
Address.fromParts(serverName.getHostname(), serverName.getPort());
Address server = Address.fromParts(serverName.getHostname(), serverName.getPort());
boolean found = false;
for(RSGroupInfo rsgi: listRSGroups()) {
if(!RSGroupInfo.DEFAULT_GROUP.equals(rsgi.getName()) &&
rsgi.containsServer(server)) {
for (RSGroupInfo rsgi : listRSGroups()) {
if (!RSGroupInfo.DEFAULT_GROUP.equals(rsgi.getName()) && rsgi.containsServer(server)) {
found = true;
break;
}
@ -593,13 +600,13 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
private void updateFailedAssignments() {
// Kick all regions in FAILED_OPEN state
List<RegionInfo> stuckAssignments = Lists.newArrayList();
for (RegionStateNode state:
masterServices.getAssignmentManager().getRegionStates().getRegionsInTransition()) {
for (RegionStateNode state : masterServices.getAssignmentManager().getRegionStates()
.getRegionsInTransition()) {
if (state.isStuck()) {
stuckAssignments.add(state.getRegionInfo());
}
}
for (RegionInfo region: stuckAssignments) {
for (RegionInfo region : stuckAssignments) {
LOG.info("Retrying assignment of " + region);
try {
masterServices.getAssignmentManager().unassign(region);
@ -612,8 +619,8 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
/**
* Calls {@link RSGroupInfoManagerImpl#updateDefaultServers(SortedSet)} to update list of known
* servers. Notifications about server changes are received by registering {@link ServerListener}.
* As a listener, we need to return immediately, so the real work of updating the servers is
* done asynchronously in this thread.
* As a listener, we need to return immediately, so the real work of updating the servers is done
* asynchronously in this thread.
*/
private class ServerEventsListenerThread extends Thread implements ServerListener {
private final Logger LOG = LoggerFactory.getLogger(ServerEventsListenerThread.class);
@ -642,14 +649,14 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
public void run() {
setName(ServerEventsListenerThread.class.getName() + "-" + masterServices.getServerName());
SortedSet<Address> prevDefaultServers = new TreeSet<>();
while(isMasterRunning(masterServices)) {
while (isMasterRunning(masterServices)) {
try {
LOG.info("Updating default servers.");
SortedSet<Address> servers = RSGroupInfoManagerImpl.this.getDefaultServers();
if (!servers.equals(prevDefaultServers)) {
RSGroupInfoManagerImpl.this.updateDefaultServers(servers);
prevDefaultServers = servers;
LOG.info("Updated with servers: "+servers.size());
LOG.info("Updated with servers: " + servers.size());
}
try {
synchronized (this) {
@ -673,8 +680,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
private volatile boolean hasChanged = false;
public FailedOpenUpdaterThread(Configuration conf) {
this.waitInterval = conf.getLong(REASSIGN_WAIT_INTERVAL_KEY,
DEFAULT_REASSIGN_WAIT_INTERVAL);
this.waitInterval = conf.getLong(REASSIGN_WAIT_INTERVAL_KEY, DEFAULT_REASSIGN_WAIT_INTERVAL);
setDaemon(true);
}
@ -734,106 +740,61 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
private volatile boolean online = false;
RSGroupStartupWorker() {
super(RSGroupStartupWorker.class.getName() + "-" + masterServices.getServerName());
setDaemon(true);
}
@Override
public void run() {
setName(RSGroupStartupWorker.class.getName() + "-" + masterServices.getServerName());
if (waitForGroupTableOnline()) {
LOG.info("GroupBasedLoadBalancer is now online");
} else {
LOG.warn("Quit without making region group table online");
}
}
private boolean waitForGroupTableOnline() {
final List<RegionInfo> foundRegions = new LinkedList<>();
final List<RegionInfo> assignedRegions = new LinkedList<>();
final AtomicBoolean found = new AtomicBoolean(false);
final TableStateManager tsm = masterServices.getTableStateManager();
boolean createSent = false;
while (!found.get() && isMasterRunning(masterServices)) {
foundRegions.clear();
assignedRegions.clear();
found.set(true);
while (isMasterRunning(masterServices)) {
try {
boolean rootMetaFound =
Utility.verifyMetaRegionLocation(conn, masterServices.getZooKeeper(), 1);
if (rootMetaFound) {
MetaTableAccessor.Visitor visitor = new DefaultVisitorBase() {
@Override
public boolean visitInternal(Result row) throws IOException {
RegionInfo info = MetaTableAccessor.getRegionInfo(row);
if (info != null) {
Cell serverCell =
row.getColumnLatestCell(HConstants.CATALOG_FAMILY,
HConstants.SERVER_QUALIFIER);
if (RSGROUP_TABLE_NAME.equals(info.getTable()) && serverCell != null) {
ServerName sn =
ServerName.parseVersionedServerName(CellUtil.cloneValue(serverCell));
if (sn == null) {
found.set(false);
} else if (tsm.isTableState(RSGROUP_TABLE_NAME, TableState.State.ENABLED)) {
try {
ClientProtos.ClientService.BlockingInterface rs = conn.getClient(sn);
ClientProtos.GetRequest request =
RequestConverter.buildGetRequest(info.getRegionName(),
new Get(ROW_KEY));
rs.get(null, request);
assignedRegions.add(info);
} catch(Exception ex) {
LOG.debug("Caught exception while verifying group region", ex);
}
}
foundRegions.add(info);
}
}
return true;
}
};
MetaTableAccessor.fullScanRegions(conn, visitor);
// if no regions in meta then we have to create the table
if (foundRegions.size() < 1 && rootMetaFound && !createSent) {
createRSGroupTable();
createSent = true;
}
LOG.info("RSGroup table=" + RSGROUP_TABLE_NAME + " isOnline=" + found.get()
+ ", regionCount=" + foundRegions.size() + ", assignCount="
+ assignedRegions.size() + ", rootMetaFound=" + rootMetaFound);
found.set(found.get() && assignedRegions.size() == foundRegions.size()
&& foundRegions.size() > 0);
} else {
LOG.info("Waiting for catalog tables to come online");
found.set(false);
TableStateManager tsm = masterServices.getTableStateManager();
if (!tsm.isTablePresent(RSGROUP_TABLE_NAME)) {
createRSGroupTable();
}
if (found.get()) {
LOG.debug("With group table online, refreshing cached information.");
RSGroupInfoManagerImpl.this.refresh(true);
online = true;
//flush any inconsistencies between ZK and HTable
RSGroupInfoManagerImpl.this.flushConfig();
// try reading from the table
try (Table table = conn.getTable(RSGROUP_TABLE_NAME)) {
table.get(new Get(ROW_KEY));
}
} catch (RuntimeException e) {
throw e;
} catch(Exception e) {
found.set(false);
LOG.info(
"RSGroup table=" + RSGROUP_TABLE_NAME + " is online, refreshing cached information");
RSGroupInfoManagerImpl.this.refresh(true);
online = true;
// flush any inconsistencies between ZK and HTable
RSGroupInfoManagerImpl.this.flushConfig();
return true;
} catch (Exception e) {
LOG.warn("Failed to perform check", e);
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
LOG.info("Sleep interrupted", e);
// 100ms is short so let's just ignore the interrupt
Threads.sleepWithoutInterrupt(100);
}
}
return found.get();
return false;
}
private void createRSGroupTable() throws IOException {
Long procId = masterServices.createSystemTable(RSGROUP_TABLE_DESC);
OptionalLong optProcId = masterServices.getProcedures().stream()
.filter(p -> p instanceof CreateTableProcedure).map(p -> (CreateTableProcedure) p)
.filter(p -> p.getTableName().equals(RSGROUP_TABLE_NAME)).mapToLong(Procedure::getProcId)
.findFirst();
long procId;
if (optProcId.isPresent()) {
procId = optProcId.getAsLong();
} else {
procId = masterServices.createSystemTable(RSGROUP_TABLE_DESC);
}
// wait for region to be online
int tries = 600;
while (!(masterServices.getMasterProcedureExecutor().isFinished(procId))
&& masterServices.getMasterProcedureExecutor().isRunning()
&& tries > 0) {
while (!(masterServices.getMasterProcedureExecutor().isFinished(procId)) &&
masterServices.getMasterProcedureExecutor().isRunning() && tries > 0) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
@ -841,13 +802,13 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
}
tries--;
}
if(tries <= 0) {
if (tries <= 0) {
throw new IOException("Failed to create group table in a given time.");
} else {
Procedure<?> result = masterServices.getMasterProcedureExecutor().getResult(procId);
if (result != null && result.isFailed()) {
throw new IOException("Failed to create group table. " +
MasterProcedureUtil.unwrapRemoteIOException(result));
throw new IOException(
"Failed to create group table. " + MasterProcedureUtil.unwrapRemoteIOException(result));
}
}
}
@ -862,31 +823,32 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
}
private void multiMutate(List<Mutation> mutations) throws IOException {
CoprocessorRpcChannel channel = rsGroupTable.coprocessorService(ROW_KEY);
MultiRowMutationProtos.MutateRowsRequest.Builder mmrBuilder
= MultiRowMutationProtos.MutateRowsRequest.newBuilder();
for (Mutation mutation : mutations) {
if (mutation instanceof Put) {
mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
try (Table table = conn.getTable(RSGROUP_TABLE_NAME)) {
CoprocessorRpcChannel channel = table.coprocessorService(ROW_KEY);
MultiRowMutationProtos.MutateRowsRequest.Builder mmrBuilder =
MultiRowMutationProtos.MutateRowsRequest.newBuilder();
for (Mutation mutation : mutations) {
if (mutation instanceof Put) {
mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT,
mutation));
} else if (mutation instanceof Delete) {
mmrBuilder.addMutationRequest(
org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.
MutationType.DELETE, mutation));
} else {
throw new DoNotRetryIOException("multiMutate doesn't support "
+ mutation.getClass().getName());
} else if (mutation instanceof Delete) {
mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.DELETE,
mutation));
} else {
throw new DoNotRetryIOException(
"multiMutate doesn't support " + mutation.getClass().getName());
}
}
}
MultiRowMutationProtos.MultiRowMutationService.BlockingInterface service =
MultiRowMutationProtos.MultiRowMutationService.newBlockingStub(channel);
try {
service.mutateRows(null, mmrBuilder.build());
} catch (ServiceException ex) {
ProtobufUtil.toIOException(ex);
MultiRowMutationProtos.MultiRowMutationService.BlockingInterface service =
MultiRowMutationProtos.MultiRowMutationService.newBlockingStub(channel);
try {
service.mutateRows(null, mmrBuilder.build());
} catch (ServiceException ex) {
ProtobufUtil.toIOException(ex);
}
}
}

View File

@ -1,245 +0,0 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.rsgroup;
import java.io.EOFException;
import java.io.IOException;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.ipc.FailedServerException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
/**
* Utility for this RSGroup package in hbase-rsgroup.
*/
@InterfaceAudience.Private
final class Utility {
private static final Logger LOG = LoggerFactory.getLogger(Utility.class);
private Utility() {
}
/**
* @param master the master to get online servers for
* @return Set of online Servers named for their hostname and port (not ServerName).
*/
static Set<Address> getOnlineServers(final MasterServices master) {
Set<Address> onlineServers = new HashSet<Address>();
if (master == null) {
return onlineServers;
}
for (ServerName server : master.getServerManager().getOnlineServers().keySet()) {
onlineServers.add(server.getAddress());
}
return onlineServers;
}
/**
* Verify <code>hbase:meta</code> is deployed and accessible.
* @param hConnection the connection to use
* @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
* @param timeout How long to wait on zk for meta address (passed through to the internal call to
* {@link #getMetaServerConnection}.
* @return True if the <code>hbase:meta</code> location is healthy.
* @throws IOException if the number of retries for getting the connection is exceeded
* @throws InterruptedException if waiting for the socket operation fails
*/
public static boolean verifyMetaRegionLocation(ClusterConnection hConnection, ZKWatcher zkw,
final long timeout) throws InterruptedException, IOException {
return verifyMetaRegionLocation(hConnection, zkw, timeout, RegionInfo.DEFAULT_REPLICA_ID);
}
/**
* Verify <code>hbase:meta</code> is deployed and accessible.
* @param connection the connection to use
* @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
* @param timeout How long to wait on zk for meta address (passed through to
* @param replicaId the ID of the replica
* @return True if the <code>hbase:meta</code> location is healthy.
* @throws InterruptedException if waiting for the socket operation fails
* @throws IOException if the number of retries for getting the connection is exceeded
*/
public static boolean verifyMetaRegionLocation(ClusterConnection connection, ZKWatcher zkw,
final long timeout, int replicaId) throws InterruptedException, IOException {
AdminProtos.AdminService.BlockingInterface service = null;
try {
service = getMetaServerConnection(connection, zkw, timeout, replicaId);
} catch (NotAllMetaRegionsOnlineException e) {
// Pass
} catch (ServerNotRunningYetException e) {
// Pass -- remote server is not up so can't be carrying root
} catch (UnknownHostException e) {
// Pass -- server name doesn't resolve so it can't be assigned anything.
} catch (RegionServerStoppedException e) {
// Pass -- server name sends us to a server that is dying or already dead.
}
return (service != null) && verifyRegionLocation(connection, service,
MetaTableLocator.getMetaRegionLocation(zkw, replicaId),
RegionReplicaUtil.getRegionInfoForReplica(RegionInfoBuilder.FIRST_META_REGIONINFO, replicaId)
.getRegionName());
}
/**
* Verify we can connect to <code>hostingServer</code> and that its carrying
* <code>regionName</code>.
* @param hostingServer Interface to the server hosting <code>regionName</code>
* @param address The servername that goes with the <code>metaServer</code> interface. Used
* logging.
* @param regionName The regionname we are interested in.
* @return True if we were able to verify the region located at other side of the interface.
*/
// TODO: We should be able to get the ServerName from the AdminProtocol
// rather than have to pass it in. Its made awkward by the fact that the
// HRI is likely a proxy against remote server so the getServerName needs
// to be fixed to go to a local method or to a cache before we can do this.
private static boolean verifyRegionLocation(final ClusterConnection connection,
AdminService.BlockingInterface hostingServer, final ServerName address,
final byte[] regionName) {
if (hostingServer == null) {
LOG.info("Passed hostingServer is null");
return false;
}
Throwable t;
HBaseRpcController controller = connection.getRpcControllerFactory().newController();
try {
// Try and get regioninfo from the hosting server.
return ProtobufUtil.getRegionInfo(controller, hostingServer, regionName) != null;
} catch (ConnectException e) {
t = e;
} catch (RetriesExhaustedException e) {
t = e;
} catch (RemoteException e) {
IOException ioe = e.unwrapRemoteException();
t = ioe;
} catch (IOException e) {
Throwable cause = e.getCause();
if (cause != null && cause instanceof EOFException) {
t = cause;
} else if (cause != null && cause.getMessage() != null &&
cause.getMessage().contains("Connection reset")) {
t = cause;
} else {
t = e;
}
}
LOG.info("Failed verification of " + Bytes.toStringBinary(regionName) + " at address=" +
address + ", exception=" + t.getMessage());
return false;
}
/**
* Gets a connection to the server hosting meta, as reported by ZooKeeper, waiting up to the
* specified timeout for availability.
* <p>
* WARNING: Does not retry. Use an {@link org.apache.hadoop.hbase.client.HTable} instead.
* @param connection the connection to use
* @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
* @param timeout How long to wait on meta location
* @param replicaId the ID of the replica
* @return connection to server hosting meta
* @throws InterruptedException if waiting for the socket operation fails
* @throws IOException if the number of retries for getting the connection is exceeded
*/
private static AdminService.BlockingInterface getMetaServerConnection(
ClusterConnection connection, ZKWatcher zkw, long timeout, int replicaId)
throws InterruptedException, IOException {
return getCachedConnection(connection,
MetaTableLocator.waitMetaRegionLocation(zkw, replicaId, timeout));
}
/**
* @param sn ServerName to get a connection against.
* @return The AdminProtocol we got when we connected to <code>sn</code> May have come from cache,
* may not be good, may have been setup by this invocation, or may be null.
* @throws IOException if the number of retries for getting the connection is exceeded
*/
private static AdminService.BlockingInterface getCachedConnection(ClusterConnection connection,
ServerName sn) throws IOException {
if (sn == null) {
return null;
}
AdminService.BlockingInterface service = null;
try {
service = connection.getAdmin(sn);
} catch (RetriesExhaustedException e) {
if (e.getCause() != null && e.getCause() instanceof ConnectException) {
LOG.debug("Catch this; presume it means the cached connection has gone bad.");
} else {
throw e;
}
} catch (SocketTimeoutException e) {
LOG.debug("Timed out connecting to " + sn);
} catch (NoRouteToHostException e) {
LOG.debug("Connecting to " + sn, e);
} catch (SocketException e) {
LOG.debug("Exception connecting to " + sn);
} catch (UnknownHostException e) {
LOG.debug("Unknown host exception connecting to " + sn);
} catch (FailedServerException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Server " + sn + " is in failed server list.");
}
} catch (IOException ioe) {
Throwable cause = ioe.getCause();
if (ioe instanceof ConnectException) {
LOG.debug("Catch. Connect refused.");
} else if (cause != null && cause instanceof EOFException) {
LOG.debug("Catch. Other end disconnected us.");
} else if (cause != null && cause.getMessage() != null &&
cause.getMessage().toLowerCase(Locale.ROOT).contains("connection reset")) {
LOG.debug("Catch. Connection reset.");
} else {
throw ioe;
}
}
return service;
}
}

View File

@ -29,7 +29,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@ -43,8 +42,7 @@ import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
@ -57,7 +55,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
/**
* Test RSGroupBasedLoadBalancer with SimpleLoadBalancer as internal balancer
*/
@Category(SmallTests.class)
@Category(LargeTests.class)
public class TestRSGroupBasedLoadBalancer extends RSGroupableBalancerTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =

View File

@ -28,7 +28,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@ -42,7 +41,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@ -52,7 +51,7 @@ import org.junit.experimental.categories.Category;
/**
* Test RSGroupBasedLoadBalancer with StochasticLoadBalancer as internal balancer
*/
@Category(SmallTests.class)
@Category(LargeTests.class)
public class TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal
extends RSGroupableBalancerTestBase {
@ClassRule

View File

@ -1,229 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.rsgroup;
import static org.junit.Assert.assertFalse;
import java.io.IOException;
import java.net.ConnectException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
@Category({ MiscTests.class, MediumTests.class })
public class TestUtility {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestUtility.class);
private static final Logger LOG = LoggerFactory.getLogger(TestUtility.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static final ServerName SN =
ServerName.valueOf("example.org", 1234, System.currentTimeMillis());
private ZKWatcher watcher;
private Abortable abortable;
@BeforeClass
public static void beforeClass() throws Exception {
// Set this down so tests run quicker
UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
UTIL.startMiniZKCluster();
}
@AfterClass
public static void afterClass() throws IOException {
UTIL.getZkCluster().shutdown();
}
@Before
public void before() throws IOException {
this.abortable = new Abortable() {
@Override
public void abort(String why, Throwable e) {
LOG.info(why, e);
}
@Override
public boolean isAborted() {
return false;
}
};
this.watcher =
new ZKWatcher(UTIL.getConfiguration(), this.getClass().getSimpleName(), this.abortable, true);
}
@After
public void after() {
try {
// Clean out meta location or later tests will be confused... they presume
// start fresh in zk.
MetaTableLocator.deleteMetaLocation(this.watcher);
} catch (KeeperException e) {
LOG.warn("Unable to delete hbase:meta location", e);
}
this.watcher.close();
}
/**
* @param admin An {@link AdminProtos.AdminService.BlockingInterface} instance; you'll likely want
* to pass a mocked HRS; can be null.
* @param client A mocked ClientProtocol instance, can be null
* @return Mock up a connection that returns a {@link Configuration} when
* {@link org.apache.hadoop.hbase.client.ClusterConnection#getConfiguration()} is called,
* a 'location' when
* {@link org.apache.hadoop.hbase.client.RegionLocator#getRegionLocation(byte[], boolean)}
* is called, and that returns the passed
* {@link AdminProtos.AdminService.BlockingInterface} instance when
* {@link org.apache.hadoop.hbase.client.ClusterConnection#getAdmin(ServerName)} is
* called, returns the passed {@link ClientProtos.ClientService.BlockingInterface}
* instance when
* {@link org.apache.hadoop.hbase.client.ClusterConnection#getClient(ServerName)} is
* called.
*/
private ClusterConnection mockConnection(final AdminProtos.AdminService.BlockingInterface admin,
final ClientProtos.ClientService.BlockingInterface client) throws IOException {
ClusterConnection connection =
HConnectionTestingUtility.getMockedConnection(UTIL.getConfiguration());
Mockito.doNothing().when(connection).close();
// Make it so we return any old location when asked.
final HRegionLocation anyLocation =
new HRegionLocation(RegionInfoBuilder.FIRST_META_REGIONINFO, SN);
Mockito.when(connection.getRegionLocation((TableName) Mockito.any(), (byte[]) Mockito.any(),
Mockito.anyBoolean())).thenReturn(anyLocation);
Mockito.when(connection.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any()))
.thenReturn(anyLocation);
if (admin != null) {
// If a call to getHRegionConnection, return this implementation.
Mockito.when(connection.getAdmin(Mockito.any())).thenReturn(admin);
}
if (client != null) {
// If a call to getClient, return this implementation.
Mockito.when(connection.getClient(Mockito.any())).thenReturn(client);
}
return connection;
}
private void testVerifyMetaRegionLocationWithException(Exception ex)
throws IOException, InterruptedException, KeeperException, ServiceException {
// Mock an ClientProtocol.
final ClientProtos.ClientService.BlockingInterface implementation =
Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
ClusterConnection connection = mockConnection(null, implementation);
// If a 'get' is called on mocked interface, throw connection refused.
Mockito.when(implementation.get((RpcController) Mockito.any(), (GetRequest) Mockito.any()))
.thenThrow(new ServiceException(ex));
long timeout = UTIL.getConfiguration().getLong("hbase.catalog.verification.timeout", 1000);
MetaTableLocator.setMetaLocation(this.watcher, SN, RegionState.State.OPENING);
assertFalse(Utility.verifyMetaRegionLocation(connection, watcher, timeout));
MetaTableLocator.setMetaLocation(this.watcher, SN, RegionState.State.OPEN);
assertFalse(Utility.verifyMetaRegionLocation(connection, watcher, timeout));
}
/**
* Test get of meta region fails properly if nothing to connect to.
*/
@Test
public void testVerifyMetaRegionLocationFails()
throws IOException, InterruptedException, KeeperException, ServiceException {
ClusterConnection connection = Mockito.mock(ClusterConnection.class);
ServiceException connectException =
new ServiceException(new ConnectException("Connection refused"));
final AdminProtos.AdminService.BlockingInterface implementation =
Mockito.mock(AdminProtos.AdminService.BlockingInterface.class);
Mockito.when(implementation.getRegionInfo((RpcController) Mockito.any(),
(GetRegionInfoRequest) Mockito.any())).thenThrow(connectException);
Mockito.when(connection.getAdmin(Mockito.any())).thenReturn(implementation);
RpcControllerFactory controllerFactory = Mockito.mock(RpcControllerFactory.class);
Mockito.when(controllerFactory.newController())
.thenReturn(Mockito.mock(HBaseRpcController.class));
Mockito.when(connection.getRpcControllerFactory()).thenReturn(controllerFactory);
ServerName sn = ServerName.valueOf("example.com", 1234, System.currentTimeMillis());
MetaTableLocator.setMetaLocation(this.watcher, sn, RegionState.State.OPENING);
assertFalse(Utility.verifyMetaRegionLocation(connection, watcher, 100));
MetaTableLocator.setMetaLocation(this.watcher, sn, RegionState.State.OPEN);
assertFalse(Utility.verifyMetaRegionLocation(connection, watcher, 100));
}
/**
* Test we survive a connection refused {@link ConnectException}
*/
@Test
public void testGetMetaServerConnectionFails()
throws IOException, InterruptedException, KeeperException, ServiceException {
testVerifyMetaRegionLocationWithException(new ConnectException("Connection refused"));
}
/**
* Test that verifyMetaRegionLocation properly handles getting a ServerNotRunningException. See
* HBASE-4470. Note this doesn't check the exact exception thrown in the HBASE-4470 as there it is
* thrown from getHConnection() and here it is thrown from get() -- but those are both called from
* the same function anyway, and this way is less invasive than throwing from getHConnection would
* be.
*/
@Test
public void testVerifyMetaRegionServerNotRunning()
throws IOException, InterruptedException, KeeperException, ServiceException {
testVerifyMetaRegionLocationWithException(new ServerNotRunningYetException("mock"));
}
}