HBASE-21700 Simplify the implementation of RSGroupInfoManagerImpl
This commit is contained in:
parent
2cf13d04a1
commit
58b11dcb1c
|
@ -29,32 +29,29 @@ import java.util.LinkedList;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableSet;
|
||||
import java.util.OptionalLong;
|
||||
import java.util.Set;
|
||||
import java.util.SortedSet;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.Coprocessor;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.MetaTableAccessor;
|
||||
import org.apache.hadoop.hbase.MetaTableAccessor.DefaultVisitorBase;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Mutation;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TableState;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.constraint.ConstraintException;
|
||||
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
|
@ -63,6 +60,7 @@ import org.apache.hadoop.hbase.master.MasterServices;
|
|||
import org.apache.hadoop.hbase.master.ServerListener;
|
||||
import org.apache.hadoop.hbase.master.TableStateManager;
|
||||
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
|
||||
import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
|
||||
import org.apache.hadoop.hbase.net.Address;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
|
@ -72,6 +70,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
|
|||
import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
|
||||
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
|
||||
|
@ -84,50 +83,44 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
|||
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
|
||||
|
||||
/**
|
||||
* This is an implementation of {@link RSGroupInfoManager} which makes
|
||||
* use of an HBase table as the persistence store for the group information.
|
||||
* It also makes use of zookeeper to store group information needed
|
||||
* for bootstrapping during offline mode.
|
||||
*
|
||||
* <h2>Concurrency</h2>
|
||||
* RSGroup state is kept locally in Maps. There is a rsgroup name to cached
|
||||
* RSGroupInfo Map at {@link #rsGroupMap} and a Map of tables to the name of the
|
||||
* rsgroup they belong too (in {@link #tableMap}). These Maps are persisted to the
|
||||
* hbase:rsgroup table (and cached in zk) on each modification.
|
||||
*
|
||||
* <p>Mutations on state are synchronized but reads can continue without having
|
||||
* to wait on an instance monitor, mutations do wholesale replace of the Maps on
|
||||
* update -- Copy-On-Write; the local Maps of state are read-only, just-in-case
|
||||
* (see flushConfig).
|
||||
*
|
||||
* <p>Reads must not block else there is a danger we'll deadlock.
|
||||
*
|
||||
* <p>Clients of this class, the {@link RSGroupAdminEndpoint} for example, want to query and
|
||||
* then act on the results of the query modifying cache in zookeeper without another thread
|
||||
* making intermediate modifications. These clients synchronize on the 'this' instance so
|
||||
* no other has access concurrently. Reads must be able to continue concurrently.
|
||||
* This is an implementation of {@link RSGroupInfoManager} which makes use of an HBase table as the
|
||||
* persistence store for the group information. It also makes use of zookeeper to store group
|
||||
* information needed for bootstrapping during offline mode.
|
||||
* <h2>Concurrency</h2> RSGroup state is kept locally in Maps. There is a rsgroup name to cached
|
||||
* RSGroupInfo Map at {@link #rsGroupMap} and a Map of tables to the name of the rsgroup they belong
|
||||
* too (in {@link #tableMap}). These Maps are persisted to the hbase:rsgroup table (and cached in
|
||||
* zk) on each modification.
|
||||
* <p>
|
||||
* Mutations on state are synchronized but reads can continue without having to wait on an instance
|
||||
* monitor, mutations do wholesale replace of the Maps on update -- Copy-On-Write; the local Maps of
|
||||
* state are read-only, just-in-case (see flushConfig).
|
||||
* <p>
|
||||
* Reads must not block else there is a danger we'll deadlock.
|
||||
* <p>
|
||||
* Clients of this class, the {@link RSGroupAdminEndpoint} for example, want to query and then act
|
||||
* on the results of the query modifying cache in zookeeper without another thread making
|
||||
* intermediate modifications. These clients synchronize on the 'this' instance so no other has
|
||||
* access concurrently. Reads must be able to continue concurrently.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(RSGroupInfoManagerImpl.class);
|
||||
|
||||
/** Table descriptor for <code>hbase:rsgroup</code> catalog table */
|
||||
private final static HTableDescriptor RSGROUP_TABLE_DESC;
|
||||
private static final TableDescriptor RSGROUP_TABLE_DESC;
|
||||
static {
|
||||
RSGROUP_TABLE_DESC = new HTableDescriptor(RSGROUP_TABLE_NAME);
|
||||
RSGROUP_TABLE_DESC.addFamily(new HColumnDescriptor(META_FAMILY_BYTES));
|
||||
RSGROUP_TABLE_DESC.setRegionSplitPolicyClassName(DisabledRegionSplitPolicy.class.getName());
|
||||
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(RSGROUP_TABLE_NAME)
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(META_FAMILY_BYTES))
|
||||
.setRegionSplitPolicyClassName(DisabledRegionSplitPolicy.class.getName());
|
||||
try {
|
||||
RSGROUP_TABLE_DESC.addCoprocessor(
|
||||
MultiRowMutationEndpoint.class.getName(),
|
||||
null, Coprocessor.PRIORITY_SYSTEM, null);
|
||||
builder.setCoprocessor(
|
||||
CoprocessorDescriptorBuilder.newBuilder(MultiRowMutationEndpoint.class.getName())
|
||||
.setPriority(Coprocessor.PRIORITY_SYSTEM).build());
|
||||
} catch (IOException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
throw new Error(ex);
|
||||
}
|
||||
RSGROUP_TABLE_DESC = builder.build();
|
||||
}
|
||||
|
||||
// There two Maps are immutable and wholesale replaced on each modification
|
||||
|
@ -136,23 +129,24 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
private volatile Map<TableName, String> tableMap = Collections.emptyMap();
|
||||
|
||||
private final MasterServices masterServices;
|
||||
private Table rsGroupTable;
|
||||
private final ClusterConnection conn;
|
||||
private final Connection conn;
|
||||
private final ZKWatcher watcher;
|
||||
private final RSGroupStartupWorker rsGroupStartupWorker = new RSGroupStartupWorker();
|
||||
private final RSGroupStartupWorker rsGroupStartupWorker;
|
||||
// contains list of groups that were last flushed to persistent store
|
||||
private Set<String> prevRSGroups = new HashSet<>();
|
||||
private final ServerEventsListenerThread serverEventsListenerThread =
|
||||
new ServerEventsListenerThread();
|
||||
new ServerEventsListenerThread();
|
||||
private FailedOpenUpdaterThread failedOpenUpdaterThread;
|
||||
|
||||
private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException {
|
||||
this.masterServices = masterServices;
|
||||
this.watcher = masterServices.getZooKeeper();
|
||||
this.conn = masterServices.getClusterConnection();
|
||||
this.conn = masterServices.getConnection();
|
||||
this.rsGroupStartupWorker = new RSGroupStartupWorker();
|
||||
}
|
||||
|
||||
private synchronized void init() throws IOException{
|
||||
|
||||
private synchronized void init() throws IOException {
|
||||
refresh();
|
||||
serverEventsListenerThread.start();
|
||||
masterServices.getServerManager().registerListener(serverEventsListenerThread);
|
||||
|
@ -167,7 +161,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
return instance;
|
||||
}
|
||||
|
||||
public void start(){
|
||||
public void start() {
|
||||
// create system table of rsgroup
|
||||
rsGroupStartupWorker.start();
|
||||
}
|
||||
|
@ -176,8 +170,8 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
public synchronized void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException {
|
||||
checkGroupName(rsGroupInfo.getName());
|
||||
if (rsGroupMap.get(rsGroupInfo.getName()) != null ||
|
||||
rsGroupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
|
||||
throw new DoNotRetryIOException("Group already exists: "+ rsGroupInfo.getName());
|
||||
rsGroupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
|
||||
throw new DoNotRetryIOException("Group already exists: " + rsGroupInfo.getName());
|
||||
}
|
||||
Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
|
||||
newGroupMap.put(rsGroupInfo.getName(), rsGroupInfo);
|
||||
|
@ -192,6 +186,22 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
return rsGroupInfo;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param master the master to get online servers for
|
||||
* @return Set of online Servers named for their hostname and port (not ServerName).
|
||||
*/
|
||||
private static Set<Address> getOnlineServers(final MasterServices master) {
|
||||
Set<Address> onlineServers = new HashSet<Address>();
|
||||
if (master == null) {
|
||||
return onlineServers;
|
||||
}
|
||||
|
||||
for (ServerName server : master.getServerManager().getOnlineServers().keySet()) {
|
||||
onlineServers.add(server.getAddress());
|
||||
}
|
||||
return onlineServers;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized Set<Address> moveServers(Set<Address> servers, String srcGroup,
|
||||
String dstGroup) throws IOException {
|
||||
|
@ -200,9 +210,10 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
// If destination is 'default' rsgroup, only add servers that are online. If not online, drop
|
||||
// it. If not 'default' group, add server to 'dst' rsgroup EVEN IF IT IS NOT online (could be a
|
||||
// rsgroup of dead servers that are to come back later).
|
||||
Set<Address> onlineServers = dst.getName().equals(RSGroupInfo.DEFAULT_GROUP)?
|
||||
Utility.getOnlineServers(this.masterServices): null;
|
||||
for (Address el: servers) {
|
||||
Set<Address> onlineServers =
|
||||
dst.getName().equals(RSGroupInfo.DEFAULT_GROUP) ? getOnlineServers(this.masterServices)
|
||||
: null;
|
||||
for (Address el : servers) {
|
||||
src.removeServer(el);
|
||||
if (onlineServers != null) {
|
||||
if (!onlineServers.contains(el)) {
|
||||
|
@ -214,7 +225,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
}
|
||||
dst.addServer(el);
|
||||
}
|
||||
Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
|
||||
Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
|
||||
newGroupMap.put(src.getName(), src);
|
||||
newGroupMap.put(dst.getName(), dst);
|
||||
flushConfig(newGroupMap);
|
||||
|
@ -223,7 +234,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
|
||||
@Override
|
||||
public RSGroupInfo getRSGroupOfServer(Address serverHostPort) throws IOException {
|
||||
for (RSGroupInfo info: rsGroupMap.values()) {
|
||||
for (RSGroupInfo info : rsGroupMap.values()) {
|
||||
if (info.containsServer(serverHostPort)) {
|
||||
return info;
|
||||
}
|
||||
|
@ -245,17 +256,17 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
public synchronized void moveTables(Set<TableName> tableNames, String groupName)
|
||||
throws IOException {
|
||||
if (groupName != null && !rsGroupMap.containsKey(groupName)) {
|
||||
throw new DoNotRetryIOException("Group "+groupName+" does not exist");
|
||||
throw new DoNotRetryIOException("Group " + groupName + " does not exist");
|
||||
}
|
||||
|
||||
Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
|
||||
for(TableName tableName: tableNames) {
|
||||
Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
|
||||
for (TableName tableName : tableNames) {
|
||||
if (tableMap.containsKey(tableName)) {
|
||||
RSGroupInfo src = new RSGroupInfo(newGroupMap.get(tableMap.get(tableName)));
|
||||
src.removeTable(tableName);
|
||||
newGroupMap.put(src.getName(), src);
|
||||
}
|
||||
if(groupName != null) {
|
||||
if (groupName != null) {
|
||||
RSGroupInfo dst = new RSGroupInfo(newGroupMap.get(groupName));
|
||||
dst.addTable(tableName);
|
||||
newGroupMap.put(dst.getName(), dst);
|
||||
|
@ -267,10 +278,10 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
@Override
|
||||
public synchronized void removeRSGroup(String groupName) throws IOException {
|
||||
if (!rsGroupMap.containsKey(groupName) || groupName.equals(RSGroupInfo.DEFAULT_GROUP)) {
|
||||
throw new DoNotRetryIOException("Group " + groupName + " does not exist or is a reserved "
|
||||
+ "group");
|
||||
throw new DoNotRetryIOException(
|
||||
"Group " + groupName + " does not exist or is a reserved " + "group");
|
||||
}
|
||||
Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
|
||||
Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
|
||||
newGroupMap.remove(groupName);
|
||||
flushConfig(newGroupMap);
|
||||
}
|
||||
|
@ -286,25 +297,25 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void moveServersAndTables(Set<Address> servers, Set<TableName> tables,
|
||||
String srcGroup, String dstGroup) throws IOException {
|
||||
//get server's group
|
||||
public void moveServersAndTables(Set<Address> servers, Set<TableName> tables, String srcGroup,
|
||||
String dstGroup) throws IOException {
|
||||
// get server's group
|
||||
RSGroupInfo srcGroupInfo = getRSGroupInfo(srcGroup);
|
||||
RSGroupInfo dstGroupInfo = getRSGroupInfo(dstGroup);
|
||||
|
||||
//move servers
|
||||
for (Address el: servers) {
|
||||
// move servers
|
||||
for (Address el : servers) {
|
||||
srcGroupInfo.removeServer(el);
|
||||
dstGroupInfo.addServer(el);
|
||||
}
|
||||
//move tables
|
||||
for(TableName tableName: tables) {
|
||||
// move tables
|
||||
for (TableName tableName : tables) {
|
||||
srcGroupInfo.removeTable(tableName);
|
||||
dstGroupInfo.addTable(tableName);
|
||||
}
|
||||
|
||||
//flush changed groupinfo
|
||||
Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
|
||||
// flush changed groupinfo
|
||||
Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
|
||||
newGroupMap.put(srcGroupInfo.getName(), srcGroupInfo);
|
||||
newGroupMap.put(dstGroupInfo.getName(), dstGroupInfo);
|
||||
flushConfig(newGroupMap);
|
||||
|
@ -313,7 +324,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
@Override
|
||||
public synchronized void removeServers(Set<Address> servers) throws IOException {
|
||||
Map<String, RSGroupInfo> rsGroupInfos = new HashMap<String, RSGroupInfo>();
|
||||
for (Address el: servers) {
|
||||
for (Address el : servers) {
|
||||
RSGroupInfo rsGroupInfo = getRSGroupOfServer(el);
|
||||
if (rsGroupInfo != null) {
|
||||
RSGroupInfo newRsGroupInfo = rsGroupInfos.get(rsGroupInfo.getName());
|
||||
|
@ -324,7 +335,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
newRsGroupInfo.removeServer(el);
|
||||
rsGroupInfos.put(newRsGroupInfo.getName(), newRsGroupInfo);
|
||||
}
|
||||
}else {
|
||||
} else {
|
||||
LOG.warn("Server " + el + " does not belong to any rsgroup.");
|
||||
}
|
||||
}
|
||||
|
@ -338,10 +349,17 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
|
||||
List<RSGroupInfo> retrieveGroupListFromGroupTable() throws IOException {
|
||||
List<RSGroupInfo> rsGroupInfoList = Lists.newArrayList();
|
||||
for (Result result : rsGroupTable.getScanner(new Scan())) {
|
||||
RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo.parseFrom(
|
||||
result.getValue(META_FAMILY_BYTES, META_QUALIFIER_BYTES));
|
||||
rsGroupInfoList.add(RSGroupProtobufUtil.toGroupInfo(proto));
|
||||
try (Table table = conn.getTable(RSGROUP_TABLE_NAME);
|
||||
ResultScanner scanner = table.getScanner(new Scan())) {
|
||||
for (Result result;;) {
|
||||
result = scanner.next();
|
||||
if (result == null) {
|
||||
break;
|
||||
}
|
||||
RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo
|
||||
.parseFrom(result.getValue(META_FAMILY_BYTES, META_QUALIFIER_BYTES));
|
||||
rsGroupInfoList.add(RSGroupProtobufUtil.toGroupInfo(proto));
|
||||
}
|
||||
}
|
||||
return rsGroupInfoList;
|
||||
}
|
||||
|
@ -349,27 +367,27 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException {
|
||||
String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, rsGroupZNode);
|
||||
List<RSGroupInfo> RSGroupInfoList = Lists.newArrayList();
|
||||
//Overwrite any info stored by table, this takes precedence
|
||||
// Overwrite any info stored by table, this takes precedence
|
||||
try {
|
||||
if(ZKUtil.checkExists(watcher, groupBasePath) != -1) {
|
||||
if (ZKUtil.checkExists(watcher, groupBasePath) != -1) {
|
||||
List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(watcher, groupBasePath);
|
||||
if (children == null) {
|
||||
return RSGroupInfoList;
|
||||
}
|
||||
for(String znode: children) {
|
||||
for (String znode : children) {
|
||||
byte[] data = ZKUtil.getData(watcher, ZNodePaths.joinZNode(groupBasePath, znode));
|
||||
if(data.length > 0) {
|
||||
if (data.length > 0) {
|
||||
ProtobufUtil.expectPBMagicPrefix(data);
|
||||
ByteArrayInputStream bis = new ByteArrayInputStream(
|
||||
data, ProtobufUtil.lengthOfPBMagic(), data.length);
|
||||
RSGroupInfoList.add(RSGroupProtobufUtil.toGroupInfo(
|
||||
RSGroupProtos.RSGroupInfo.parseFrom(bis)));
|
||||
ByteArrayInputStream bis =
|
||||
new ByteArrayInputStream(data, ProtobufUtil.lengthOfPBMagic(), data.length);
|
||||
RSGroupInfoList
|
||||
.add(RSGroupProtobufUtil.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis)));
|
||||
}
|
||||
}
|
||||
LOG.debug("Read ZK GroupInfo count:" + RSGroupInfoList.size());
|
||||
}
|
||||
} catch (KeeperException|DeserializationException|InterruptedException e) {
|
||||
throw new IOException("Failed to read rsGroupZNode",e);
|
||||
} catch (KeeperException | DeserializationException | InterruptedException e) {
|
||||
throw new IOException("Failed to read rsGroupZNode", e);
|
||||
}
|
||||
return RSGroupInfoList;
|
||||
}
|
||||
|
@ -380,8 +398,8 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
}
|
||||
|
||||
/**
|
||||
* Read rsgroup info from the source of truth, the hbase:rsgroup table.
|
||||
* Update zk cache. Called on startup of the manager.
|
||||
* Read rsgroup info from the source of truth, the hbase:rsgroup table. Update zk cache. Called on
|
||||
* startup of the manager.
|
||||
*/
|
||||
private synchronized void refresh(boolean forceOnline) throws IOException {
|
||||
List<RSGroupInfo> groupList = new LinkedList<>();
|
||||
|
@ -390,9 +408,6 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
// if online read from GROUP table
|
||||
if (forceOnline || isOnline()) {
|
||||
LOG.debug("Refreshing in Online mode.");
|
||||
if (rsGroupTable == null) {
|
||||
rsGroupTable = conn.getTable(RSGROUP_TABLE_NAME);
|
||||
}
|
||||
groupList.addAll(retrieveGroupListFromGroupTable());
|
||||
} else {
|
||||
LOG.debug("Refreshing in Offline mode.");
|
||||
|
@ -401,26 +416,25 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
|
||||
// refresh default group, prune
|
||||
NavigableSet<TableName> orphanTables = new TreeSet<>();
|
||||
for(String entry: masterServices.getTableDescriptors().getAll().keySet()) {
|
||||
for (String entry : masterServices.getTableDescriptors().getAll().keySet()) {
|
||||
orphanTables.add(TableName.valueOf(entry));
|
||||
}
|
||||
for (RSGroupInfo group: groupList) {
|
||||
if(!group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
|
||||
for (RSGroupInfo group : groupList) {
|
||||
if (!group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
|
||||
orphanTables.removeAll(group.getTables());
|
||||
}
|
||||
}
|
||||
|
||||
// This is added to the last of the list so it overwrites the 'default' rsgroup loaded
|
||||
// from region group table or zk
|
||||
groupList.add(new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers(),
|
||||
orphanTables));
|
||||
groupList.add(new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers(), orphanTables));
|
||||
|
||||
// populate the data
|
||||
HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap();
|
||||
HashMap<TableName, String> newTableMap = Maps.newHashMap();
|
||||
for (RSGroupInfo group : groupList) {
|
||||
newGroupMap.put(group.getName(), group);
|
||||
for(TableName table: group.getTables()) {
|
||||
for (TableName table : group.getTables()) {
|
||||
newTableMap.put(table, group.getName());
|
||||
}
|
||||
}
|
||||
|
@ -428,43 +442,41 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
updateCacheOfRSGroups(rsGroupMap.keySet());
|
||||
}
|
||||
|
||||
private synchronized Map<TableName,String> flushConfigTable(Map<String,RSGroupInfo> groupMap)
|
||||
private synchronized Map<TableName, String> flushConfigTable(Map<String, RSGroupInfo> groupMap)
|
||||
throws IOException {
|
||||
Map<TableName,String> newTableMap = Maps.newHashMap();
|
||||
Map<TableName, String> newTableMap = Maps.newHashMap();
|
||||
List<Mutation> mutations = Lists.newArrayList();
|
||||
|
||||
// populate deletes
|
||||
for(String groupName : prevRSGroups) {
|
||||
if(!groupMap.containsKey(groupName)) {
|
||||
for (String groupName : prevRSGroups) {
|
||||
if (!groupMap.containsKey(groupName)) {
|
||||
Delete d = new Delete(Bytes.toBytes(groupName));
|
||||
mutations.add(d);
|
||||
}
|
||||
}
|
||||
|
||||
// populate puts
|
||||
for(RSGroupInfo RSGroupInfo : groupMap.values()) {
|
||||
for (RSGroupInfo RSGroupInfo : groupMap.values()) {
|
||||
RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo);
|
||||
Put p = new Put(Bytes.toBytes(RSGroupInfo.getName()));
|
||||
p.addColumn(META_FAMILY_BYTES, META_QUALIFIER_BYTES, proto.toByteArray());
|
||||
mutations.add(p);
|
||||
for(TableName entry: RSGroupInfo.getTables()) {
|
||||
for (TableName entry : RSGroupInfo.getTables()) {
|
||||
newTableMap.put(entry, RSGroupInfo.getName());
|
||||
}
|
||||
}
|
||||
|
||||
if(mutations.size() > 0) {
|
||||
if (mutations.size() > 0) {
|
||||
multiMutate(mutations);
|
||||
}
|
||||
return newTableMap;
|
||||
}
|
||||
|
||||
private synchronized void flushConfig()
|
||||
throws IOException {
|
||||
private synchronized void flushConfig() throws IOException {
|
||||
flushConfig(this.rsGroupMap);
|
||||
}
|
||||
|
||||
private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap)
|
||||
throws IOException {
|
||||
private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) throws IOException {
|
||||
Map<TableName, String> newTableMap;
|
||||
|
||||
// For offline mode persistence is still unavailable
|
||||
|
@ -474,7 +486,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
RSGroupInfo oldDefaultGroup = m.remove(RSGroupInfo.DEFAULT_GROUP);
|
||||
RSGroupInfo newDefaultGroup = newGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
|
||||
if (!m.equals(newGroupMap) ||
|
||||
!oldDefaultGroup.getTables().equals(newDefaultGroup.getTables())) {
|
||||
!oldDefaultGroup.getTables().equals(newDefaultGroup.getTables())) {
|
||||
throw new IOException("Only default servers can be updated during offline mode");
|
||||
}
|
||||
newGroupMap.put(RSGroupInfo.DEFAULT_GROUP, newDefaultGroup);
|
||||
|
@ -492,22 +504,21 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
ZKUtil.createAndFailSilent(watcher, groupBasePath, ProtobufMagic.PB_MAGIC);
|
||||
|
||||
List<ZKUtil.ZKUtilOp> zkOps = new ArrayList<>(newGroupMap.size());
|
||||
for(String groupName : prevRSGroups) {
|
||||
if(!newGroupMap.containsKey(groupName)) {
|
||||
for (String groupName : prevRSGroups) {
|
||||
if (!newGroupMap.containsKey(groupName)) {
|
||||
String znode = ZNodePaths.joinZNode(groupBasePath, groupName);
|
||||
zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
for (RSGroupInfo RSGroupInfo : newGroupMap.values()) {
|
||||
String znode = ZNodePaths.joinZNode(groupBasePath, RSGroupInfo.getName());
|
||||
RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo);
|
||||
LOG.debug("Updating znode: "+znode);
|
||||
LOG.debug("Updating znode: " + znode);
|
||||
ZKUtil.createAndFailSilent(watcher, znode);
|
||||
zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
|
||||
zkOps.add(ZKUtil.ZKUtilOp.createAndFailSilent(znode,
|
||||
ProtobufUtil.prependPBMagic(proto.toByteArray())));
|
||||
ProtobufUtil.prependPBMagic(proto.toByteArray())));
|
||||
}
|
||||
LOG.debug("Writing ZK GroupInfo count: " + zkOps.size());
|
||||
|
||||
|
@ -515,14 +526,13 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
} catch (KeeperException e) {
|
||||
LOG.error("Failed to write to rsGroupZNode", e);
|
||||
masterServices.abort("Failed to write to rsGroupZNode", e);
|
||||
throw new IOException("Failed to write to rsGroupZNode",e);
|
||||
throw new IOException("Failed to write to rsGroupZNode", e);
|
||||
}
|
||||
updateCacheOfRSGroups(newGroupMap.keySet());
|
||||
}
|
||||
|
||||
/**
|
||||
* Make changes visible.
|
||||
* Caller must be synchronized on 'this'.
|
||||
* Make changes visible. Caller must be synchronized on 'this'.
|
||||
*/
|
||||
private void resetRSGroupAndTableMaps(Map<String, RSGroupInfo> newRSGroupMap,
|
||||
Map<TableName, String> newTableMap) {
|
||||
|
@ -532,8 +542,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
}
|
||||
|
||||
/**
|
||||
* Update cache of rsgroups.
|
||||
* Caller must be synchronized on 'this'.
|
||||
* Update cache of rsgroups. Caller must be synchronized on 'this'.
|
||||
* @param currentGroups Current list of Groups.
|
||||
*/
|
||||
private void updateCacheOfRSGroups(final Set<String> currentGroups) {
|
||||
|
@ -549,7 +558,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
LOG.debug("Reading online RS from zookeeper");
|
||||
List<ServerName> servers = new LinkedList<>();
|
||||
try {
|
||||
for (String el: ZKUtil.listChildrenNoWatch(watcher, watcher.getZNodePaths().rsZNode)) {
|
||||
for (String el : ZKUtil.listChildrenNoWatch(watcher, watcher.getZNodePaths().rsZNode)) {
|
||||
servers.add(ServerName.parseServerName(el));
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
|
@ -562,12 +571,10 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
private SortedSet<Address> getDefaultServers() throws IOException {
|
||||
SortedSet<Address> defaultServers = Sets.newTreeSet();
|
||||
for (ServerName serverName : getOnlineRS()) {
|
||||
Address server =
|
||||
Address.fromParts(serverName.getHostname(), serverName.getPort());
|
||||
Address server = Address.fromParts(serverName.getHostname(), serverName.getPort());
|
||||
boolean found = false;
|
||||
for(RSGroupInfo rsgi: listRSGroups()) {
|
||||
if(!RSGroupInfo.DEFAULT_GROUP.equals(rsgi.getName()) &&
|
||||
rsgi.containsServer(server)) {
|
||||
for (RSGroupInfo rsgi : listRSGroups()) {
|
||||
if (!RSGroupInfo.DEFAULT_GROUP.equals(rsgi.getName()) && rsgi.containsServer(server)) {
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
|
@ -593,13 +600,13 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
private void updateFailedAssignments() {
|
||||
// Kick all regions in FAILED_OPEN state
|
||||
List<RegionInfo> stuckAssignments = Lists.newArrayList();
|
||||
for (RegionStateNode state:
|
||||
masterServices.getAssignmentManager().getRegionStates().getRegionsInTransition()) {
|
||||
for (RegionStateNode state : masterServices.getAssignmentManager().getRegionStates()
|
||||
.getRegionsInTransition()) {
|
||||
if (state.isStuck()) {
|
||||
stuckAssignments.add(state.getRegionInfo());
|
||||
}
|
||||
}
|
||||
for (RegionInfo region: stuckAssignments) {
|
||||
for (RegionInfo region : stuckAssignments) {
|
||||
LOG.info("Retrying assignment of " + region);
|
||||
try {
|
||||
masterServices.getAssignmentManager().unassign(region);
|
||||
|
@ -612,8 +619,8 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
/**
|
||||
* Calls {@link RSGroupInfoManagerImpl#updateDefaultServers(SortedSet)} to update list of known
|
||||
* servers. Notifications about server changes are received by registering {@link ServerListener}.
|
||||
* As a listener, we need to return immediately, so the real work of updating the servers is
|
||||
* done asynchronously in this thread.
|
||||
* As a listener, we need to return immediately, so the real work of updating the servers is done
|
||||
* asynchronously in this thread.
|
||||
*/
|
||||
private class ServerEventsListenerThread extends Thread implements ServerListener {
|
||||
private final Logger LOG = LoggerFactory.getLogger(ServerEventsListenerThread.class);
|
||||
|
@ -642,14 +649,14 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
public void run() {
|
||||
setName(ServerEventsListenerThread.class.getName() + "-" + masterServices.getServerName());
|
||||
SortedSet<Address> prevDefaultServers = new TreeSet<>();
|
||||
while(isMasterRunning(masterServices)) {
|
||||
while (isMasterRunning(masterServices)) {
|
||||
try {
|
||||
LOG.info("Updating default servers.");
|
||||
SortedSet<Address> servers = RSGroupInfoManagerImpl.this.getDefaultServers();
|
||||
if (!servers.equals(prevDefaultServers)) {
|
||||
RSGroupInfoManagerImpl.this.updateDefaultServers(servers);
|
||||
prevDefaultServers = servers;
|
||||
LOG.info("Updated with servers: "+servers.size());
|
||||
LOG.info("Updated with servers: " + servers.size());
|
||||
}
|
||||
try {
|
||||
synchronized (this) {
|
||||
|
@ -673,8 +680,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
private volatile boolean hasChanged = false;
|
||||
|
||||
public FailedOpenUpdaterThread(Configuration conf) {
|
||||
this.waitInterval = conf.getLong(REASSIGN_WAIT_INTERVAL_KEY,
|
||||
DEFAULT_REASSIGN_WAIT_INTERVAL);
|
||||
this.waitInterval = conf.getLong(REASSIGN_WAIT_INTERVAL_KEY, DEFAULT_REASSIGN_WAIT_INTERVAL);
|
||||
setDaemon(true);
|
||||
}
|
||||
|
||||
|
@ -734,130 +740,61 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
private volatile boolean online = false;
|
||||
|
||||
RSGroupStartupWorker() {
|
||||
super(RSGroupStartupWorker.class.getName() + "-" + masterServices.getServerName());
|
||||
setDaemon(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
setName(RSGroupStartupWorker.class.getName() + "-" + masterServices.getServerName());
|
||||
if (waitForGroupTableOnline()) {
|
||||
LOG.info("GroupBasedLoadBalancer is now online");
|
||||
} else {
|
||||
LOG.warn("Quit without making region group table online");
|
||||
}
|
||||
}
|
||||
|
||||
private boolean waitForGroupTableOnline() {
|
||||
final List<RegionInfo> foundRegions = new LinkedList<>();
|
||||
final List<RegionInfo> assignedRegions = new LinkedList<>();
|
||||
final AtomicBoolean found = new AtomicBoolean(false);
|
||||
final TableStateManager tsm = masterServices.getTableStateManager();
|
||||
boolean createSent = false;
|
||||
while (!found.get() && isMasterRunning(masterServices)) {
|
||||
foundRegions.clear();
|
||||
assignedRegions.clear();
|
||||
found.set(true);
|
||||
while (isMasterRunning(masterServices)) {
|
||||
try {
|
||||
boolean rootMetaFound =
|
||||
Utility.verifyMetaRegionLocation(conn, masterServices.getZooKeeper(), 1);
|
||||
final AtomicBoolean nsFound = new AtomicBoolean(false);
|
||||
if (rootMetaFound) {
|
||||
MetaTableAccessor.Visitor visitor = new DefaultVisitorBase() {
|
||||
@Override
|
||||
public boolean visitInternal(Result row) throws IOException {
|
||||
RegionInfo info = MetaTableAccessor.getRegionInfo(row);
|
||||
if (info != null) {
|
||||
Cell serverCell =
|
||||
row.getColumnLatestCell(HConstants.CATALOG_FAMILY,
|
||||
HConstants.SERVER_QUALIFIER);
|
||||
if (RSGROUP_TABLE_NAME.equals(info.getTable()) && serverCell != null) {
|
||||
ServerName sn =
|
||||
ServerName.parseVersionedServerName(CellUtil.cloneValue(serverCell));
|
||||
if (sn == null) {
|
||||
found.set(false);
|
||||
} else if (tsm.isTableState(RSGROUP_TABLE_NAME, TableState.State.ENABLED)) {
|
||||
try {
|
||||
ClientProtos.ClientService.BlockingInterface rs = conn.getClient(sn);
|
||||
ClientProtos.GetRequest request =
|
||||
RequestConverter.buildGetRequest(info.getRegionName(),
|
||||
new Get(ROW_KEY));
|
||||
rs.get(null, request);
|
||||
assignedRegions.add(info);
|
||||
} catch(Exception ex) {
|
||||
LOG.debug("Caught exception while verifying group region", ex);
|
||||
}
|
||||
}
|
||||
foundRegions.add(info);
|
||||
}
|
||||
if (TableName.NAMESPACE_TABLE_NAME.equals(info.getTable())) {
|
||||
Cell cell = row.getColumnLatestCell(HConstants.CATALOG_FAMILY,
|
||||
HConstants.SERVER_QUALIFIER);
|
||||
ServerName sn = null;
|
||||
if(cell != null) {
|
||||
sn = ServerName.parseVersionedServerName(CellUtil.cloneValue(cell));
|
||||
}
|
||||
if (sn == null) {
|
||||
nsFound.set(false);
|
||||
} else if (tsm.isTableState(TableName.NAMESPACE_TABLE_NAME,
|
||||
TableState.State.ENABLED)) {
|
||||
try {
|
||||
ClientProtos.ClientService.BlockingInterface rs = conn.getClient(sn);
|
||||
ClientProtos.GetRequest request =
|
||||
RequestConverter.buildGetRequest(info.getRegionName(),
|
||||
new Get(ROW_KEY));
|
||||
rs.get(null, request);
|
||||
nsFound.set(true);
|
||||
} catch(Exception ex) {
|
||||
LOG.debug("Caught exception while verifying group region", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
};
|
||||
MetaTableAccessor.fullScanRegions(conn, visitor);
|
||||
// if no regions in meta then we have to create the table
|
||||
if (foundRegions.size() < 1 && rootMetaFound && !createSent && nsFound.get()) {
|
||||
createRSGroupTable();
|
||||
createSent = true;
|
||||
}
|
||||
LOG.info("RSGroup table=" + RSGROUP_TABLE_NAME + " isOnline=" + found.get()
|
||||
+ ", regionCount=" + foundRegions.size() + ", assignCount="
|
||||
+ assignedRegions.size() + ", rootMetaFound=" + rootMetaFound);
|
||||
found.set(found.get() && assignedRegions.size() == foundRegions.size()
|
||||
&& foundRegions.size() > 0);
|
||||
} else {
|
||||
LOG.info("Waiting for catalog tables to come online");
|
||||
found.set(false);
|
||||
TableStateManager tsm = masterServices.getTableStateManager();
|
||||
if (!tsm.isTablePresent(RSGROUP_TABLE_NAME)) {
|
||||
createRSGroupTable();
|
||||
}
|
||||
if (found.get()) {
|
||||
LOG.debug("With group table online, refreshing cached information.");
|
||||
RSGroupInfoManagerImpl.this.refresh(true);
|
||||
online = true;
|
||||
//flush any inconsistencies between ZK and HTable
|
||||
RSGroupInfoManagerImpl.this.flushConfig();
|
||||
// try reading from the table
|
||||
try (Table table = conn.getTable(RSGROUP_TABLE_NAME)) {
|
||||
table.get(new Get(ROW_KEY));
|
||||
}
|
||||
} catch (RuntimeException e) {
|
||||
throw e;
|
||||
} catch(Exception e) {
|
||||
found.set(false);
|
||||
LOG.info(
|
||||
"RSGroup table=" + RSGROUP_TABLE_NAME + " is online, refreshing cached information");
|
||||
RSGroupInfoManagerImpl.this.refresh(true);
|
||||
online = true;
|
||||
// flush any inconsistencies between ZK and HTable
|
||||
RSGroupInfoManagerImpl.this.flushConfig();
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to perform check", e);
|
||||
}
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("Sleep interrupted", e);
|
||||
// 100ms is short so let's just ignore the interrupt
|
||||
Threads.sleepWithoutInterrupt(100);
|
||||
}
|
||||
}
|
||||
return found.get();
|
||||
return false;
|
||||
}
|
||||
|
||||
private void createRSGroupTable() throws IOException {
|
||||
Long procId = masterServices.createSystemTable(RSGROUP_TABLE_DESC);
|
||||
OptionalLong optProcId = masterServices.getProcedures().stream()
|
||||
.filter(p -> p instanceof CreateTableProcedure).map(p -> (CreateTableProcedure) p)
|
||||
.filter(p -> p.getTableName().equals(RSGROUP_TABLE_NAME)).mapToLong(Procedure::getProcId)
|
||||
.findFirst();
|
||||
long procId;
|
||||
if (optProcId.isPresent()) {
|
||||
procId = optProcId.getAsLong();
|
||||
} else {
|
||||
procId = masterServices.createSystemTable(RSGROUP_TABLE_DESC);
|
||||
}
|
||||
// wait for region to be online
|
||||
int tries = 600;
|
||||
while (!(masterServices.getMasterProcedureExecutor().isFinished(procId))
|
||||
&& masterServices.getMasterProcedureExecutor().isRunning()
|
||||
&& tries > 0) {
|
||||
while (!(masterServices.getMasterProcedureExecutor().isFinished(procId)) &&
|
||||
masterServices.getMasterProcedureExecutor().isRunning() && tries > 0) {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException e) {
|
||||
|
@ -865,13 +802,13 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
}
|
||||
tries--;
|
||||
}
|
||||
if(tries <= 0) {
|
||||
if (tries <= 0) {
|
||||
throw new IOException("Failed to create group table in a given time.");
|
||||
} else {
|
||||
Procedure<?> result = masterServices.getMasterProcedureExecutor().getResult(procId);
|
||||
if (result != null && result.isFailed()) {
|
||||
throw new IOException("Failed to create group table. " +
|
||||
MasterProcedureUtil.unwrapRemoteIOException(result));
|
||||
throw new IOException(
|
||||
"Failed to create group table. " + MasterProcedureUtil.unwrapRemoteIOException(result));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -886,31 +823,32 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
}
|
||||
|
||||
private void multiMutate(List<Mutation> mutations) throws IOException {
|
||||
CoprocessorRpcChannel channel = rsGroupTable.coprocessorService(ROW_KEY);
|
||||
MultiRowMutationProtos.MutateRowsRequest.Builder mmrBuilder
|
||||
= MultiRowMutationProtos.MutateRowsRequest.newBuilder();
|
||||
for (Mutation mutation : mutations) {
|
||||
if (mutation instanceof Put) {
|
||||
mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
|
||||
try (Table table = conn.getTable(RSGROUP_TABLE_NAME)) {
|
||||
CoprocessorRpcChannel channel = table.coprocessorService(ROW_KEY);
|
||||
MultiRowMutationProtos.MutateRowsRequest.Builder mmrBuilder =
|
||||
MultiRowMutationProtos.MutateRowsRequest.newBuilder();
|
||||
for (Mutation mutation : mutations) {
|
||||
if (mutation instanceof Put) {
|
||||
mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
|
||||
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT,
|
||||
mutation));
|
||||
} else if (mutation instanceof Delete) {
|
||||
mmrBuilder.addMutationRequest(
|
||||
org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
|
||||
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.
|
||||
MutationType.DELETE, mutation));
|
||||
} else {
|
||||
throw new DoNotRetryIOException("multiMutate doesn't support "
|
||||
+ mutation.getClass().getName());
|
||||
} else if (mutation instanceof Delete) {
|
||||
mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
|
||||
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.DELETE,
|
||||
mutation));
|
||||
} else {
|
||||
throw new DoNotRetryIOException(
|
||||
"multiMutate doesn't support " + mutation.getClass().getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
MultiRowMutationProtos.MultiRowMutationService.BlockingInterface service =
|
||||
MultiRowMutationProtos.MultiRowMutationService.newBlockingStub(channel);
|
||||
try {
|
||||
service.mutateRows(null, mmrBuilder.build());
|
||||
} catch (ServiceException ex) {
|
||||
ProtobufUtil.toIOException(ex);
|
||||
MultiRowMutationProtos.MultiRowMutationService.BlockingInterface service =
|
||||
MultiRowMutationProtos.MultiRowMutationService.newBlockingStub(channel);
|
||||
try {
|
||||
service.mutateRows(null, mmrBuilder.build());
|
||||
} catch (ServiceException ex) {
|
||||
ProtobufUtil.toIOException(ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,245 +0,0 @@
|
|||
/**
|
||||
* Copyright The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.rsgroup;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.net.ConnectException;
|
||||
import java.net.NoRouteToHostException;
|
||||
import java.net.SocketException;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.HashSet;
|
||||
import java.util.Locale;
|
||||
import java.util.Set;
|
||||
import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
|
||||
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
|
||||
import org.apache.hadoop.hbase.ipc.FailedServerException;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.net.Address;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
|
||||
|
||||
/**
|
||||
* Utility for this RSGroup package in hbase-rsgroup.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
final class Utility {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(Utility.class);
|
||||
|
||||
private Utility() {
|
||||
}
|
||||
|
||||
/**
|
||||
* @param master the master to get online servers for
|
||||
* @return Set of online Servers named for their hostname and port (not ServerName).
|
||||
*/
|
||||
static Set<Address> getOnlineServers(final MasterServices master) {
|
||||
Set<Address> onlineServers = new HashSet<Address>();
|
||||
if (master == null) {
|
||||
return onlineServers;
|
||||
}
|
||||
|
||||
for (ServerName server : master.getServerManager().getOnlineServers().keySet()) {
|
||||
onlineServers.add(server.getAddress());
|
||||
}
|
||||
return onlineServers;
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify <code>hbase:meta</code> is deployed and accessible.
|
||||
* @param hConnection the connection to use
|
||||
* @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
|
||||
* @param timeout How long to wait on zk for meta address (passed through to the internal call to
|
||||
* {@link #getMetaServerConnection}.
|
||||
* @return True if the <code>hbase:meta</code> location is healthy.
|
||||
* @throws IOException if the number of retries for getting the connection is exceeded
|
||||
* @throws InterruptedException if waiting for the socket operation fails
|
||||
*/
|
||||
public static boolean verifyMetaRegionLocation(ClusterConnection hConnection, ZKWatcher zkw,
|
||||
final long timeout) throws InterruptedException, IOException {
|
||||
return verifyMetaRegionLocation(hConnection, zkw, timeout, RegionInfo.DEFAULT_REPLICA_ID);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify <code>hbase:meta</code> is deployed and accessible.
|
||||
* @param connection the connection to use
|
||||
* @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
|
||||
* @param timeout How long to wait on zk for meta address (passed through to
|
||||
* @param replicaId the ID of the replica
|
||||
* @return True if the <code>hbase:meta</code> location is healthy.
|
||||
* @throws InterruptedException if waiting for the socket operation fails
|
||||
* @throws IOException if the number of retries for getting the connection is exceeded
|
||||
*/
|
||||
public static boolean verifyMetaRegionLocation(ClusterConnection connection, ZKWatcher zkw,
|
||||
final long timeout, int replicaId) throws InterruptedException, IOException {
|
||||
AdminProtos.AdminService.BlockingInterface service = null;
|
||||
try {
|
||||
service = getMetaServerConnection(connection, zkw, timeout, replicaId);
|
||||
} catch (NotAllMetaRegionsOnlineException e) {
|
||||
// Pass
|
||||
} catch (ServerNotRunningYetException e) {
|
||||
// Pass -- remote server is not up so can't be carrying root
|
||||
} catch (UnknownHostException e) {
|
||||
// Pass -- server name doesn't resolve so it can't be assigned anything.
|
||||
} catch (RegionServerStoppedException e) {
|
||||
// Pass -- server name sends us to a server that is dying or already dead.
|
||||
}
|
||||
return (service != null) && verifyRegionLocation(connection, service,
|
||||
MetaTableLocator.getMetaRegionLocation(zkw, replicaId),
|
||||
RegionReplicaUtil.getRegionInfoForReplica(RegionInfoBuilder.FIRST_META_REGIONINFO, replicaId)
|
||||
.getRegionName());
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify we can connect to <code>hostingServer</code> and that its carrying
|
||||
* <code>regionName</code>.
|
||||
* @param hostingServer Interface to the server hosting <code>regionName</code>
|
||||
* @param address The servername that goes with the <code>metaServer</code> interface. Used
|
||||
* logging.
|
||||
* @param regionName The regionname we are interested in.
|
||||
* @return True if we were able to verify the region located at other side of the interface.
|
||||
*/
|
||||
// TODO: We should be able to get the ServerName from the AdminProtocol
|
||||
// rather than have to pass it in. Its made awkward by the fact that the
|
||||
// HRI is likely a proxy against remote server so the getServerName needs
|
||||
// to be fixed to go to a local method or to a cache before we can do this.
|
||||
private static boolean verifyRegionLocation(final ClusterConnection connection,
|
||||
AdminService.BlockingInterface hostingServer, final ServerName address,
|
||||
final byte[] regionName) {
|
||||
if (hostingServer == null) {
|
||||
LOG.info("Passed hostingServer is null");
|
||||
return false;
|
||||
}
|
||||
Throwable t;
|
||||
HBaseRpcController controller = connection.getRpcControllerFactory().newController();
|
||||
try {
|
||||
// Try and get regioninfo from the hosting server.
|
||||
return ProtobufUtil.getRegionInfo(controller, hostingServer, regionName) != null;
|
||||
} catch (ConnectException e) {
|
||||
t = e;
|
||||
} catch (RetriesExhaustedException e) {
|
||||
t = e;
|
||||
} catch (RemoteException e) {
|
||||
IOException ioe = e.unwrapRemoteException();
|
||||
t = ioe;
|
||||
} catch (IOException e) {
|
||||
Throwable cause = e.getCause();
|
||||
if (cause != null && cause instanceof EOFException) {
|
||||
t = cause;
|
||||
} else if (cause != null && cause.getMessage() != null &&
|
||||
cause.getMessage().contains("Connection reset")) {
|
||||
t = cause;
|
||||
} else {
|
||||
t = e;
|
||||
}
|
||||
}
|
||||
LOG.info("Failed verification of " + Bytes.toStringBinary(regionName) + " at address=" +
|
||||
address + ", exception=" + t.getMessage());
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets a connection to the server hosting meta, as reported by ZooKeeper, waiting up to the
|
||||
* specified timeout for availability.
|
||||
* <p>
|
||||
* WARNING: Does not retry. Use an {@link org.apache.hadoop.hbase.client.HTable} instead.
|
||||
* @param connection the connection to use
|
||||
* @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
|
||||
* @param timeout How long to wait on meta location
|
||||
* @param replicaId the ID of the replica
|
||||
* @return connection to server hosting meta
|
||||
* @throws InterruptedException if waiting for the socket operation fails
|
||||
* @throws IOException if the number of retries for getting the connection is exceeded
|
||||
*/
|
||||
private static AdminService.BlockingInterface getMetaServerConnection(
|
||||
ClusterConnection connection, ZKWatcher zkw, long timeout, int replicaId)
|
||||
throws InterruptedException, IOException {
|
||||
return getCachedConnection(connection,
|
||||
MetaTableLocator.waitMetaRegionLocation(zkw, replicaId, timeout));
|
||||
}
|
||||
|
||||
/**
|
||||
* @param sn ServerName to get a connection against.
|
||||
* @return The AdminProtocol we got when we connected to <code>sn</code> May have come from cache,
|
||||
* may not be good, may have been setup by this invocation, or may be null.
|
||||
* @throws IOException if the number of retries for getting the connection is exceeded
|
||||
*/
|
||||
private static AdminService.BlockingInterface getCachedConnection(ClusterConnection connection,
|
||||
ServerName sn) throws IOException {
|
||||
if (sn == null) {
|
||||
return null;
|
||||
}
|
||||
AdminService.BlockingInterface service = null;
|
||||
try {
|
||||
service = connection.getAdmin(sn);
|
||||
} catch (RetriesExhaustedException e) {
|
||||
if (e.getCause() != null && e.getCause() instanceof ConnectException) {
|
||||
LOG.debug("Catch this; presume it means the cached connection has gone bad.");
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
} catch (SocketTimeoutException e) {
|
||||
LOG.debug("Timed out connecting to " + sn);
|
||||
} catch (NoRouteToHostException e) {
|
||||
LOG.debug("Connecting to " + sn, e);
|
||||
} catch (SocketException e) {
|
||||
LOG.debug("Exception connecting to " + sn);
|
||||
} catch (UnknownHostException e) {
|
||||
LOG.debug("Unknown host exception connecting to " + sn);
|
||||
} catch (FailedServerException e) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Server " + sn + " is in failed server list.");
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
Throwable cause = ioe.getCause();
|
||||
if (ioe instanceof ConnectException) {
|
||||
LOG.debug("Catch. Connect refused.");
|
||||
} else if (cause != null && cause instanceof EOFException) {
|
||||
LOG.debug("Catch. Other end disconnected us.");
|
||||
} else if (cause != null && cause.getMessage() != null &&
|
||||
cause.getMessage().toLowerCase(Locale.ROOT).contains("connection reset")) {
|
||||
LOG.debug("Catch. Connection reset.");
|
||||
} else {
|
||||
throw ioe;
|
||||
}
|
||||
|
||||
}
|
||||
return service;
|
||||
}
|
||||
}
|
|
@ -29,7 +29,6 @@ import java.util.Iterator;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
|
@ -43,7 +42,7 @@ import org.apache.hadoop.hbase.master.RegionPlan;
|
|||
import org.apache.hadoop.hbase.net.Address;
|
||||
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
|
||||
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
|
@ -55,7 +54,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
|
|||
/**
|
||||
* Test RSGroupBasedLoadBalancer with SimpleLoadBalancer as internal balancer
|
||||
*/
|
||||
@Category(SmallTests.class)
|
||||
@Category(LargeTests.class)
|
||||
public class TestRSGroupBasedLoadBalancer extends RSGroupableBalancerTestBase {
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
|
|
|
@ -28,7 +28,6 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.ClusterMetrics;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
|
@ -42,7 +41,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
|
|||
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
|
||||
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
|
@ -52,7 +51,7 @@ import org.junit.experimental.categories.Category;
|
|||
/**
|
||||
* Test RSGroupBasedLoadBalancer with StochasticLoadBalancer as internal balancer
|
||||
*/
|
||||
@Category(SmallTests.class)
|
||||
@Category(LargeTests.class)
|
||||
public class TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal
|
||||
extends RSGroupableBalancerTestBase {
|
||||
@ClassRule
|
||||
|
|
|
@ -1,229 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.rsgroup;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.ConnectException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
|
||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
|
||||
import org.apache.hadoop.hbase.master.RegionState;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
||||
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.Mockito;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
|
||||
|
||||
@Category({ MiscTests.class, MediumTests.class })
|
||||
public class TestUtility {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestUtility.class);
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestUtility.class);
|
||||
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
|
||||
private static final ServerName SN =
|
||||
ServerName.valueOf("example.org", 1234, System.currentTimeMillis());
|
||||
|
||||
private ZKWatcher watcher;
|
||||
|
||||
private Abortable abortable;
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() throws Exception {
|
||||
// Set this down so tests run quicker
|
||||
UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
|
||||
UTIL.startMiniZKCluster();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterClass() throws IOException {
|
||||
UTIL.getZkCluster().shutdown();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void before() throws IOException {
|
||||
this.abortable = new Abortable() {
|
||||
@Override
|
||||
public void abort(String why, Throwable e) {
|
||||
LOG.info(why, e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAborted() {
|
||||
return false;
|
||||
}
|
||||
};
|
||||
this.watcher =
|
||||
new ZKWatcher(UTIL.getConfiguration(), this.getClass().getSimpleName(), this.abortable, true);
|
||||
}
|
||||
|
||||
@After
|
||||
public void after() {
|
||||
try {
|
||||
// Clean out meta location or later tests will be confused... they presume
|
||||
// start fresh in zk.
|
||||
MetaTableLocator.deleteMetaLocation(this.watcher);
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn("Unable to delete hbase:meta location", e);
|
||||
}
|
||||
|
||||
this.watcher.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param admin An {@link AdminProtos.AdminService.BlockingInterface} instance; you'll likely want
|
||||
* to pass a mocked HRS; can be null.
|
||||
* @param client A mocked ClientProtocol instance, can be null
|
||||
* @return Mock up a connection that returns a {@link Configuration} when
|
||||
* {@link org.apache.hadoop.hbase.client.ClusterConnection#getConfiguration()} is called,
|
||||
* a 'location' when
|
||||
* {@link org.apache.hadoop.hbase.client.RegionLocator#getRegionLocation(byte[], boolean)}
|
||||
* is called, and that returns the passed
|
||||
* {@link AdminProtos.AdminService.BlockingInterface} instance when
|
||||
* {@link org.apache.hadoop.hbase.client.ClusterConnection#getAdmin(ServerName)} is
|
||||
* called, returns the passed {@link ClientProtos.ClientService.BlockingInterface}
|
||||
* instance when
|
||||
* {@link org.apache.hadoop.hbase.client.ClusterConnection#getClient(ServerName)} is
|
||||
* called.
|
||||
*/
|
||||
private ClusterConnection mockConnection(final AdminProtos.AdminService.BlockingInterface admin,
|
||||
final ClientProtos.ClientService.BlockingInterface client) throws IOException {
|
||||
ClusterConnection connection =
|
||||
HConnectionTestingUtility.getMockedConnection(UTIL.getConfiguration());
|
||||
Mockito.doNothing().when(connection).close();
|
||||
// Make it so we return any old location when asked.
|
||||
final HRegionLocation anyLocation =
|
||||
new HRegionLocation(RegionInfoBuilder.FIRST_META_REGIONINFO, SN);
|
||||
Mockito.when(connection.getRegionLocation((TableName) Mockito.any(), (byte[]) Mockito.any(),
|
||||
Mockito.anyBoolean())).thenReturn(anyLocation);
|
||||
Mockito.when(connection.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any()))
|
||||
.thenReturn(anyLocation);
|
||||
if (admin != null) {
|
||||
// If a call to getHRegionConnection, return this implementation.
|
||||
Mockito.when(connection.getAdmin(Mockito.any())).thenReturn(admin);
|
||||
}
|
||||
if (client != null) {
|
||||
// If a call to getClient, return this implementation.
|
||||
Mockito.when(connection.getClient(Mockito.any())).thenReturn(client);
|
||||
}
|
||||
return connection;
|
||||
}
|
||||
|
||||
private void testVerifyMetaRegionLocationWithException(Exception ex)
|
||||
throws IOException, InterruptedException, KeeperException, ServiceException {
|
||||
// Mock an ClientProtocol.
|
||||
final ClientProtos.ClientService.BlockingInterface implementation =
|
||||
Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
|
||||
|
||||
ClusterConnection connection = mockConnection(null, implementation);
|
||||
|
||||
// If a 'get' is called on mocked interface, throw connection refused.
|
||||
Mockito.when(implementation.get((RpcController) Mockito.any(), (GetRequest) Mockito.any()))
|
||||
.thenThrow(new ServiceException(ex));
|
||||
|
||||
long timeout = UTIL.getConfiguration().getLong("hbase.catalog.verification.timeout", 1000);
|
||||
MetaTableLocator.setMetaLocation(this.watcher, SN, RegionState.State.OPENING);
|
||||
assertFalse(Utility.verifyMetaRegionLocation(connection, watcher, timeout));
|
||||
|
||||
MetaTableLocator.setMetaLocation(this.watcher, SN, RegionState.State.OPEN);
|
||||
assertFalse(Utility.verifyMetaRegionLocation(connection, watcher, timeout));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test get of meta region fails properly if nothing to connect to.
|
||||
*/
|
||||
@Test
|
||||
public void testVerifyMetaRegionLocationFails()
|
||||
throws IOException, InterruptedException, KeeperException, ServiceException {
|
||||
ClusterConnection connection = Mockito.mock(ClusterConnection.class);
|
||||
ServiceException connectException =
|
||||
new ServiceException(new ConnectException("Connection refused"));
|
||||
final AdminProtos.AdminService.BlockingInterface implementation =
|
||||
Mockito.mock(AdminProtos.AdminService.BlockingInterface.class);
|
||||
Mockito.when(implementation.getRegionInfo((RpcController) Mockito.any(),
|
||||
(GetRegionInfoRequest) Mockito.any())).thenThrow(connectException);
|
||||
Mockito.when(connection.getAdmin(Mockito.any())).thenReturn(implementation);
|
||||
RpcControllerFactory controllerFactory = Mockito.mock(RpcControllerFactory.class);
|
||||
Mockito.when(controllerFactory.newController())
|
||||
.thenReturn(Mockito.mock(HBaseRpcController.class));
|
||||
Mockito.when(connection.getRpcControllerFactory()).thenReturn(controllerFactory);
|
||||
|
||||
ServerName sn = ServerName.valueOf("example.com", 1234, System.currentTimeMillis());
|
||||
MetaTableLocator.setMetaLocation(this.watcher, sn, RegionState.State.OPENING);
|
||||
assertFalse(Utility.verifyMetaRegionLocation(connection, watcher, 100));
|
||||
MetaTableLocator.setMetaLocation(this.watcher, sn, RegionState.State.OPEN);
|
||||
assertFalse(Utility.verifyMetaRegionLocation(connection, watcher, 100));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test we survive a connection refused {@link ConnectException}
|
||||
*/
|
||||
@Test
|
||||
public void testGetMetaServerConnectionFails()
|
||||
throws IOException, InterruptedException, KeeperException, ServiceException {
|
||||
testVerifyMetaRegionLocationWithException(new ConnectException("Connection refused"));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that verifyMetaRegionLocation properly handles getting a ServerNotRunningException. See
|
||||
* HBASE-4470. Note this doesn't check the exact exception thrown in the HBASE-4470 as there it is
|
||||
* thrown from getHConnection() and here it is thrown from get() -- but those are both called from
|
||||
* the same function anyway, and this way is less invasive than throwing from getHConnection would
|
||||
* be.
|
||||
*/
|
||||
@Test
|
||||
public void testVerifyMetaRegionServerNotRunning()
|
||||
throws IOException, InterruptedException, KeeperException, ServiceException {
|
||||
testVerifyMetaRegionLocationWithException(new ServerNotRunningYetException("mock"));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue