HBASE-19131 Add the ClusterStatus hook and cleanup other hooks which can be replaced by ClusterStatus hook

This commit is contained in:
Chia-Ping Tsai 2017-11-04 22:10:35 +08:00
parent 51b65707b3
commit 56fd75e8e2
20 changed files with 481 additions and 1448 deletions

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase;
import com.google.common.base.Objects;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -194,14 +195,14 @@ public class ClusterStatus extends VersionedWritable {
if (!(o instanceof ClusterStatus)) {
return false;
}
return (getVersion() == ((ClusterStatus)o).getVersion()) &&
getHBaseVersion().equals(((ClusterStatus)o).getHBaseVersion()) &&
this.liveServers.equals(((ClusterStatus)o).liveServers) &&
this.deadServers.containsAll(((ClusterStatus)o).deadServers) &&
Arrays.equals(this.masterCoprocessors,
((ClusterStatus)o).masterCoprocessors) &&
this.master.equals(((ClusterStatus)o).master) &&
this.backupMasters.containsAll(((ClusterStatus)o).backupMasters);
ClusterStatus other = (ClusterStatus) o;
return Objects.equal(getHBaseVersion(), other.getHBaseVersion()) &&
Objects.equal(this.liveServers, other.liveServers) &&
getDeadServerNames().containsAll(other.getDeadServerNames()) &&
Arrays.equals(getMasterCoprocessors(), other.getMasterCoprocessors()) &&
Objects.equal(getMaster(), other.getMaster()) &&
getBackupMasters().containsAll(other.getBackupMasters()) &&
Objects.equal(getClusterId(), other.getClusterId());
}
/**

View File

@ -20,6 +20,7 @@
package org.apache.hadoop.hbase;
import com.google.common.base.Objects;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -329,4 +330,35 @@ public class ServerLoad {
public long getReportTime() {
return reportTime;
}
@Override
public int hashCode() {
return Objects.hashCode(stores, storefiles, storeUncompressedSizeMB,
storefileSizeMB, memstoreSizeMB, storefileIndexSizeMB, readRequestsCount,
writeRequestsCount, rootIndexSizeKB, totalStaticIndexSizeKB,
totalStaticBloomSizeKB, totalCompactingKVs, currentCompactedKVs);
}
@Override
public boolean equals(Object other) {
if (other == this) {
return true;
}
if (other instanceof ServerLoad) {
ServerLoad sl = ((ServerLoad) other);
return stores == sl.stores && storefiles == sl.storefiles
&& storeUncompressedSizeMB == sl.storeUncompressedSizeMB
&& storefileSizeMB == sl.storefileSizeMB
&& memstoreSizeMB == sl.memstoreSizeMB
&& storefileIndexSizeMB == sl.storefileIndexSizeMB
&& readRequestsCount == sl.readRequestsCount
&& writeRequestsCount == sl.writeRequestsCount
&& rootIndexSizeKB == sl.rootIndexSizeKB
&& totalStaticIndexSizeKB == sl.totalStaticIndexSizeKB
&& totalStaticBloomSizeKB == sl.totalStaticBloomSizeKB
&& totalCompactingKVs == sl.totalCompactingKVs
&& currentCompactedKVs == sl.currentCompactedKVs;
}
return false;
}
}

View File

@ -1807,13 +1807,6 @@ class ConnectionManager {
return stub.clearDeadServers(controller, request);
}
@Override
public MasterProtos.ListDeadServersResponse listDeadServers(
RpcController controller,
MasterProtos.ListDeadServersRequest request) throws ServiceException {
return stub.listDeadServers(controller, request);
}
@Override
public AddColumnResponse addColumn(RpcController controller, AddColumnRequest request)
throws ServiceException {
@ -2721,7 +2714,7 @@ class ConnectionManager {
public boolean hasCellBlockSupport() {
return this.rpcClient.hasCellBlockSupport();
}
@Override
public ConnectionConfiguration getConnectionConfiguration() {
return this.connectionConfig;

View File

@ -136,7 +136,6 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshot
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListDeadServersRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListProceduresRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
@ -4903,14 +4902,7 @@ public class HBaseAdmin implements Admin {
@Override
public List<ServerName> listDeadServers() throws IOException {
return executeCallable(new MasterCallable<List<ServerName>>(getConnection()) {
@Override
public List<ServerName> call(int callTimeout) throws ServiceException {
ListDeadServersRequest req = ListDeadServersRequest.newBuilder().build();
return ProtobufUtil.toServerNameList(
master.listDeadServers(null, req).getServerNameList());
}
});
return new ArrayList<>(getClusterStatus().getDeadServerNames());
}
@Override

View File

@ -556,13 +556,6 @@ message SecurityCapabilitiesResponse {
repeated Capability capabilities = 1;
}
message ListDeadServersRequest {
}
message ListDeadServersResponse {
repeated ServerName server_name = 1;
}
message ClearDeadServersRequest {
repeated ServerName server_name = 1;
}
@ -867,9 +860,4 @@ service MasterService {
/** clear dead servers from master*/
rpc ClearDeadServers(ClearDeadServersRequest)
returns(ClearDeadServersResponse);
/** Returns a list of Dead Servers. */
rpc ListDeadServers(ListDeadServersRequest)
returns(ListDeadServersResponse);
}

View File

@ -31,6 +31,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HColumnDescriptor;
@ -953,15 +954,13 @@ public class RSGroupAdminEndpoint extends RSGroupAdminService
}
@Override
public void preListDeadServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
public void preGetClusterStatus(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
}
@Override
public void postListDeadServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
public void postGetClusterStatus(ObserverContext<MasterCoprocessorEnvironment> ctx,
ClusterStatus status) throws IOException {
}
@Override

View File

@ -54,7 +54,8 @@ MasterAddressTracker masterAddressTracker = master.getMasterAddressTracker();
<th>Start Time</th>
</tr>
<%java>
Collection<ServerName> backup_masters = master.getClusterStatus().getBackupMasters();
Collection<ServerName> backup_masters
= master.getClusterStatusWithoutCoprocessor().getBackupMasters();
ServerName [] backupServerNames = backup_masters.toArray(new ServerName[backup_masters.size()]);
Arrays.sort(backupServerNames);
for (ServerName serverName : backupServerNames) {

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.coprocessor;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@ -67,12 +68,13 @@ public class BaseMasterAndRegionObserver extends BaseRegionObserver
}
@Override
public void preListDeadServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
public void preGetClusterStatus(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
}
@Override
public void postListDeadServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
public void postGetClusterStatus(ObserverContext<MasterCoprocessorEnvironment> ctx,
ClusterStatus status)
throws IOException {
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.coprocessor;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@ -78,13 +79,14 @@ public class BaseMasterObserver implements MasterObserver {
}
@Override
public void preListDeadServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
public void preGetClusterStatus(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
}
@Override
public void postListDeadServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
public void postGetClusterStatus(ObserverContext<MasterCoprocessorEnvironment> ctx,
ClusterStatus status)
throws IOException {
}
@Override
@ -474,7 +476,7 @@ public class BaseMasterObserver implements MasterObserver {
public void postListSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
final SnapshotDescription snapshot) throws IOException {
}
@Override
public void preCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.Coprocessor;
@ -674,7 +675,7 @@ public interface MasterObserver extends Coprocessor {
*/
void postListSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
final SnapshotDescription snapshot) throws IOException;
/**
* Called before a snapshot is cloned.
* Called as part of restoreSnapshot RPC call.
@ -921,7 +922,7 @@ public interface MasterObserver extends Coprocessor {
*/
void postTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
final TableName tableName) throws IOException;
/**
* Called before the quota for the user is stored.
* @param ctx the environment to interact with the framework and master
@ -1027,7 +1028,7 @@ public interface MasterObserver extends Coprocessor {
final String namespace, final Quotas quotas) throws IOException;
/**
* Called before dispatching region merge request.
* Called before dispatching region merge request.
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
* @param ctx coprocessor environment
* @param regionA first region to be merged
@ -1036,7 +1037,7 @@ public interface MasterObserver extends Coprocessor {
*/
void preDispatchMerge(final ObserverContext<MasterCoprocessorEnvironment> ctx,
HRegionInfo regionA, HRegionInfo regionB) throws IOException;
/**
* called after dispatching the region merge request.
* @param c coprocessor environment
@ -1048,14 +1049,15 @@ public interface MasterObserver extends Coprocessor {
final HRegionInfo regionA, final HRegionInfo regionB) throws IOException;
/**
* Called before list dead region servers.
* Called before get cluster status.
*/
void preListDeadServers(ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException;
void preGetClusterStatus(ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException;
/**
* Called after list dead region servers.
* Called after get cluster status.
*/
void postListDeadServers(ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException;
void postGetClusterStatus(ObserverContext<MasterCoprocessorEnvironment> ctx,
ClusterStatus status) throws IOException;
/**
* Called before clear dead region servers.

View File

@ -828,7 +828,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
//initialize load balancer
this.balancer.setMasterServices(this);
this.balancer.setClusterStatus(getClusterStatus());
this.balancer.setClusterStatus(getClusterStatusWithoutCoprocessor());
this.balancer.initialize();
// Check if master is shutting down because of some issue
@ -860,7 +860,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
this.assignmentManager.joinCluster();
// set cluster status again after user regions are assigned
this.balancer.setClusterStatus(getClusterStatus());
this.balancer.setClusterStatus(getClusterStatusWithoutCoprocessor());
// Start balancer and meta catalog janitor after meta and regions have been assigned.
status.setStatus("Starting balancer and catalog janitor");
@ -1488,7 +1488,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
List<RegionPlan> plans = new ArrayList<RegionPlan>();
//Give the balancer the current cluster state.
this.balancer.setClusterStatus(getClusterStatus());
this.balancer.setClusterStatus(getClusterStatusWithoutCoprocessor());
for (Entry<TableName, Map<ServerName, List<HRegionInfo>>> e : assignmentsByTable.entrySet()) {
List<RegionPlan> partialPlans = this.balancer.balanceCluster(e.getKey(), e.getValue());
if (partialPlans != null) plans.addAll(partialPlans);
@ -2404,10 +2404,22 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
}
}
public ClusterStatus getClusterStatus() throws IOException {
if (cpHost != null) {
cpHost.preGetClusterStatus();
}
ClusterStatus status = getClusterStatusWithoutCoprocessor();
LOG.info(getClientIdAuditPrefix() + " get ClusterStatus, status=" + status);
if (cpHost != null) {
cpHost.postGetClusterStatus(status);
}
return status;
}
/**
* @return cluster status
*/
public ClusterStatus getClusterStatus() throws InterruptedIOException {
public ClusterStatus getClusterStatusWithoutCoprocessor() throws InterruptedIOException {
// Build Set of backup masters from ZK nodes
List<String> backupMasterStrings;
try {
@ -3154,12 +3166,12 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
@Override
public long getLastMajorCompactionTimestamp(TableName table) throws IOException {
return getClusterStatus().getLastMajorCompactionTsForTable(table);
return getClusterStatusWithoutCoprocessor().getLastMajorCompactionTsForTable(table);
}
@Override
public long getLastMajorCompactionTimestampForRegion(byte[] regionName) throws IOException {
return getClusterStatus().getLastMajorCompactionTsForRegion(regionName);
return getClusterStatusWithoutCoprocessor().getLastMajorCompactionTsForRegion(regionName);
}
/**

View File

@ -27,6 +27,7 @@ import org.apache.commons.lang.ClassUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
@ -919,7 +920,7 @@ public class MasterCoprocessorHost
}
});
}
public void preCloneSnapshot(final SnapshotDescription snapshot,
final HTableDescriptor hTableDescriptor) throws IOException {
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
@ -1071,7 +1072,7 @@ public class MasterCoprocessorHost
}
});
}
public void preSetUserQuota(final String user, final Quotas quotas) throws IOException {
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
@Override
@ -1166,7 +1167,7 @@ public class MasterCoprocessorHost
});
}
public void postSetNamespaceQuota(final String namespace, final Quotas quotas)
public void postSetNamespaceQuota(final String namespace, final Quotas quotas)
throws IOException {
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
@Override
@ -1177,22 +1178,22 @@ public class MasterCoprocessorHost
});
}
public void preListDeadServers() throws IOException {
public void preGetClusterStatus() throws IOException {
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
@Override
public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
oserver.preListDeadServers(ctx);
oserver.preGetClusterStatus(ctx);
}
});
}
public void postListDeadServers() throws IOException {
public void postGetClusterStatus(final ClusterStatus status) throws IOException {
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
@Override
public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
oserver.postListDeadServers(ctx);
oserver.postGetClusterStatus(ctx, status);
}
});
}

View File

@ -125,8 +125,6 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshot
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListDeadServersRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListDeadServersResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListProceduresRequest;
@ -1140,33 +1138,6 @@ public class MasterRpcServices extends RSRpcServices
}
}
@Override
public ListDeadServersResponse listDeadServers(RpcController controller,
ListDeadServersRequest request) throws ServiceException {
LOG.debug(master.getClientIdAuditPrefix() + " list dead region servers.");
ListDeadServersResponse.Builder response = ListDeadServersResponse.newBuilder();
try {
master.checkInitialized();
if (master.cpHost != null) {
master.cpHost.preListDeadServers();
}
Set<ServerName> servers = master.getServerManager().getDeadServers().copyServerNames();
for (ServerName server : servers) {
response.addServerName(ProtobufUtil.toServerName(server));
}
if (master.cpHost != null) {
master.cpHost.postListDeadServers();
}
} catch (IOException io) {
throw new ServiceException(io);
}
return response.build();
}
@Override
public ClearDeadServersResponse clearDeadServers(RpcController controller,
ClearDeadServersRequest request) throws ServiceException {

View File

@ -46,7 +46,7 @@ public class ClusterStatusChore extends ScheduledChore {
@Override
protected void chore() {
try {
balancer.setClusterStatus(master.getClusterStatus());
balancer.setClusterStatus(master.getClusterStatusWithoutCoprocessor());
} catch (InterruptedIOException e) {
LOG.warn("Ignoring interruption", e);
}

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
@ -770,7 +771,7 @@ public class AccessController extends BaseMasterAndRegionObserver
}
}
} else if (entry.getValue() == null) {
get.addFamily(col);
get.addFamily(col);
} else {
throw new RuntimeException("Unhandled collection type " +
entry.getValue().getClass().getName());
@ -1338,7 +1339,7 @@ public class AccessController extends BaseMasterAndRegionObserver
requirePermission("listSnapshot " + snapshot.getName(), Action.ADMIN);
}
}
@Override
public void preCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
@ -1412,7 +1413,7 @@ public class AccessController extends BaseMasterAndRegionObserver
@Override
public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
NamespaceDescriptor ns) throws IOException {
// We require only global permission so that
// We require only global permission so that
// a user with NS admin cannot altering namespace configurations. i.e. namespace quota
requireGlobalPermission("modifyNamespace", Action.ADMIN, ns.getName());
}
@ -2605,14 +2606,6 @@ public class AccessController extends BaseMasterAndRegionObserver
Action.ADMIN);
}
@Override
public void preListDeadServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException { }
@Override
public void postListDeadServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException { }
@Override
public void preClearDeadServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
@ -2676,7 +2669,7 @@ public class AccessController extends BaseMasterAndRegionObserver
public void postReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
List<WALEntry> entries, CellScanner cells) throws IOException {
}
@Override
public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
final String userName, final Quotas quotas) throws IOException {
@ -2742,4 +2735,10 @@ public class AccessController extends BaseMasterAndRegionObserver
String groupName) throws IOException {
requirePermission("balanceRSGroup", Action.ADMIN);
}
@Override
public void preGetClusterStatus(final ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
requirePermission("getClusterStatus", Action.ADMIN);
}
}

View File

@ -0,0 +1,188 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Test the ClusterStatus.
*/
@Category(SmallTests.class)
public class TestClientClusterStatus {
private static HBaseTestingUtility UTIL;
private static HBaseAdmin ADMIN;
private final static int SLAVES = 5;
private final static int MASTERS = 3;
private static MiniHBaseCluster CLUSTER;
private static HRegionServer DEAD;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, MyObserver.class.getName());
UTIL = new HBaseTestingUtility(conf);
UTIL.startMiniCluster(MASTERS, SLAVES);
CLUSTER = UTIL.getHBaseCluster();
CLUSTER.waitForActiveAndReadyMaster();
ADMIN = UTIL.getHBaseAdmin();
// Kill one region server
List<RegionServerThread> rsts = CLUSTER.getLiveRegionServerThreads();
RegionServerThread rst = rsts.get(rsts.size() - 1);
DEAD = rst.getRegionServer();
DEAD.stop("Test dead servers status");
while (rst.isAlive()) {
Thread.sleep(500);
}
}
@Test
public void testDefaults() throws Exception {
ClusterStatus origin = ADMIN.getClusterStatus();
ClusterStatus defaults = ADMIN.getClusterStatus();
Assert.assertEquals(origin.getHBaseVersion(), defaults.getHBaseVersion());
Assert.assertEquals(origin.getClusterId(), defaults.getClusterId());
Assert.assertTrue(origin.getAverageLoad() == defaults.getAverageLoad());
Assert.assertTrue(origin.getBackupMastersSize() == defaults.getBackupMastersSize());
Assert.assertTrue(origin.getDeadServers() == defaults.getDeadServers());
Assert.assertTrue(origin.getRegionsCount() == defaults.getRegionsCount());
Assert.assertTrue(origin.getServersSize() == defaults.getServersSize());
Assert.assertTrue(origin.equals(defaults));
}
@Test
public void testLiveAndDeadServersStatus() throws Exception {
List<RegionServerThread> regionserverThreads = CLUSTER.getLiveRegionServerThreads();
int numRs = 0;
int len = regionserverThreads.size();
for (int i = 0; i < len; i++) {
if (regionserverThreads.get(i).isAlive()) {
numRs++;
}
}
// Retrieve live servers and dead servers info.
ClusterStatus status = ADMIN.getClusterStatus();
Assert.assertNotNull(status);
Assert.assertNotNull(status.getServers());
// exclude a dead region server
Assert.assertEquals(SLAVES -1, numRs);
// live servers = nums of regionservers
// By default, HMaster don't carry any regions so it won't report its load.
// Hence, it won't be in the server list.
Assert.assertEquals(status.getServers().size(), numRs);
Assert.assertTrue(status.getRegionsCount() > 0);
Assert.assertNotNull(status.getDeadServerNames());
Assert.assertEquals(1, status.getDeadServers());
ServerName deadServerName = status.getDeadServerNames().iterator().next();
Assert.assertEquals(DEAD.getServerName(), deadServerName);
}
@Test
public void testMasterAndBackupMastersStatus() throws Exception {
// get all the master threads
List<MasterThread> masterThreads = CLUSTER.getMasterThreads();
int numActive = 0;
int activeIndex = 0;
ServerName activeName = null;
HMaster active = null;
for (int i = 0; i < masterThreads.size(); i++) {
if (masterThreads.get(i).getMaster().isActiveMaster()) {
numActive++;
activeIndex = i;
active = masterThreads.get(activeIndex).getMaster();
activeName = active.getServerName();
}
}
Assert.assertNotNull(active);
Assert.assertEquals(1, numActive);
Assert.assertEquals(MASTERS, masterThreads.size());
// Retrieve master and backup masters infos only.
ClusterStatus status = ADMIN.getClusterStatus();
Assert.assertTrue(status.getMaster().equals(activeName));
Assert.assertEquals(MASTERS - 1, status.getBackupMastersSize());
}
@Test
public void testOtherStatusInfos() throws Exception {
ClusterStatus status = ADMIN.getClusterStatus();
Assert.assertTrue(status.getMasterCoprocessors().length == 1);
Assert.assertNotNull(status.getHBaseVersion());
Assert.assertNotNull(status.getClusterId());
Assert.assertNotNull(status.getBalancerOn());
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
if (ADMIN != null) ADMIN.close();
UTIL.shutdownMiniCluster();
}
@Test
public void testObserver() throws IOException {
int preCount = MyObserver.PRE_COUNT.get();
int postCount = MyObserver.POST_COUNT.get();
boolean find = false;
for (String s : ADMIN.getClusterStatus().getMasterCoprocessors()) {
if (s.equals(MyObserver.class.getSimpleName())) {
find = true;
}
}
Assert.assertTrue(find);
Assert.assertEquals(preCount + 1, MyObserver.PRE_COUNT.get());
Assert.assertEquals(postCount + 1, MyObserver.POST_COUNT.get());
}
public static class MyObserver extends BaseMasterObserver {
private static final AtomicInteger PRE_COUNT = new AtomicInteger(0);
private static final AtomicInteger POST_COUNT = new AtomicInteger(0);
@Override
public void preGetClusterStatus(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
PRE_COUNT.incrementAndGet();
}
@Override public void postGetClusterStatus(ObserverContext<MasterCoprocessorEnvironment> ctx,
ClusterStatus status) throws IOException {
POST_COUNT.incrementAndGet();
}
}
}

View File

@ -36,6 +36,7 @@ import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
@ -272,13 +273,13 @@ public class TestMasterObserver {
}
@Override
public void preListDeadServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
public void preGetClusterStatus(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
}
@Override
public void postListDeadServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
public void postGetClusterStatus(ObserverContext<MasterCoprocessorEnvironment> ctx,
ClusterStatus status) throws IOException {
}
@Override

View File

@ -1568,7 +1568,7 @@ public class TestSplitTransactionOnCluster {
}
}
private void waitUntilRegionServerDead() throws InterruptedException, InterruptedIOException {
private void waitUntilRegionServerDead() throws InterruptedException, IOException {
// Wait until the master processes the RS shutdown
for (int i=0; cluster.getMaster().getClusterStatus().
getServers().size() > NB_SERVERS && i<100; i++) {

View File

@ -3040,4 +3040,18 @@ public class TestAccessController extends SecureTestUtil {
verifyAllowed(action1, SUPERUSER, USER_ADMIN);
verifyDenied(action1, USER_CREATE, USER_RW, USER_RO, USER_NONE, USER_OWNER);
}
@Test
public void testGetClusterStatus() throws Exception {
AccessTestAction action = new AccessTestAction() {
@Override
public Object run() throws Exception {
ACCESS_CONTROLLER.preGetClusterStatus(ObserverContext.createAndPrepare(CP_ENV, null));
return null;
}
};
verifyAllowed(action, SUPERUSER, USER_ADMIN);
verifyDenied(action, USER_CREATE, USER_RW, USER_RO, USER_NONE, USER_OWNER);
}
}