HBASE-26730 Extend hbase shell 'status' command to support an option 'tasks' (#4095)

Signed-off-by: Geoffrey Jacoby <gjacoby@apache.org>
Signed-off-by: Viraj Jasani <vjasani@apache.org>
Author: Andrew Purtell, 2022-02-15 18:38:18 -08:00 (committed by GitHub)
parent 7c52895e8c
commit 407c6e7b22
14 changed files with 471 additions and 21 deletions

File: ClusterMetrics.java

@@ -161,6 +161,12 @@ public interface ClusterMetrics {
   */
  Map<TableName, RegionStatesCount> getTableRegionStatesCount();

  /**
   * Provide the list of master tasks
   */
  @Nullable
  List<ServerTask> getMasterTasks();

  /**
   * Kinds of ClusterMetrics
   */
@@ -213,5 +219,9 @@ public interface ClusterMetrics {
     * metrics about table to no of regions status count
     */
    TABLE_TO_REGIONS_COUNT,
    /**
     * metrics about monitored tasks
     */
    TASKS,
  }
}
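A minimal client-side sketch (not part of this commit) of how the new accessor can be consumed; the connection boilerplate is assumed, and getMasterTasks() is @Nullable, populated only when the TASKS option was requested:

    import java.util.EnumSet;
    import java.util.List;
    import org.apache.hadoop.hbase.ClusterMetrics;
    import org.apache.hadoop.hbase.ClusterMetrics.Option;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.ServerTask;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    public class MasterTasksSketch {
      public static void main(String[] args) throws Exception {
        try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
             Admin admin = conn.getAdmin()) {
          ClusterMetrics metrics = admin.getClusterMetrics(EnumSet.of(Option.TASKS));
          List<ServerTask> masterTasks = metrics.getMasterTasks();
          if (masterTasks != null) { // @Nullable: null unless TASKS was requested
            masterTasks.forEach(System.out::println);
          }
        }
      }
    }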

File: ClusterMetricsBuilder.java

@@ -83,6 +83,10 @@ public final class ClusterMetricsBuilder {
    if (metrics.getMasterName() != null) {
      builder.setMaster(ProtobufUtil.toServerName((metrics.getMasterName())));
    }
    if (metrics.getMasterTasks() != null) {
      builder.addAllMasterTasks(metrics.getMasterTasks().stream()
        .map(t -> ProtobufUtil.toServerTask(t)).collect(Collectors.toList()));
    }
    if (metrics.getBalancerOn() != null) {
      builder.setBalancerOn(metrics.getBalancerOn());
    }
@@ -122,7 +126,9 @@ public final class ClusterMetricsBuilder {
        proto.getTableRegionStatesCountList().stream()
          .collect(Collectors.toMap(
            e -> ProtobufUtil.toTableName(e.getTableName()),
            e -> ProtobufUtil.toTableRegionStatesCount(e.getRegionStatesCount()))))
      .setMasterTasks(proto.getMasterTasksList().stream()
        .map(t -> ProtobufUtil.getServerTask(t)).collect(Collectors.toList()));
    if (proto.hasClusterId()) {
      builder.setClusterId(ClusterId.convert(proto.getClusterId()).toString());
    }
@@ -164,6 +170,7 @@ public final class ClusterMetricsBuilder {
      case SERVERS_NAME: return ClusterMetrics.Option.SERVERS_NAME;
      case MASTER_INFO_PORT: return ClusterMetrics.Option.MASTER_INFO_PORT;
      case TABLE_TO_REGIONS_COUNT: return ClusterMetrics.Option.TABLE_TO_REGIONS_COUNT;
      case TASKS: return ClusterMetrics.Option.TASKS;
      // should not reach here
      default: throw new IllegalArgumentException("Invalid option: " + option);
    }
@@ -188,6 +195,7 @@ public final class ClusterMetricsBuilder {
      case SERVERS_NAME: return Option.SERVERS_NAME;
      case MASTER_INFO_PORT: return ClusterStatusProtos.Option.MASTER_INFO_PORT;
      case TABLE_TO_REGIONS_COUNT: return ClusterStatusProtos.Option.TABLE_TO_REGIONS_COUNT;
      case TASKS: return ClusterStatusProtos.Option.TASKS;
      // should not reach here
      default: throw new IllegalArgumentException("Invalid option: " + option);
    }
@@ -231,6 +239,8 @@ public final class ClusterMetricsBuilder {
  private int masterInfoPort;
  private List<ServerName> serversName = Collections.emptyList();
  private Map<TableName, RegionStatesCount> tableRegionStatesCount = Collections.emptyMap();
  @Nullable
  private List<ServerTask> masterTasks;

  private ClusterMetricsBuilder() {
  }
@@ -280,6 +290,10 @@ public final class ClusterMetricsBuilder {
    this.serversName = serversName;
    return this;
  }

  public ClusterMetricsBuilder setMasterTasks(List<ServerTask> masterTasks) {
    this.masterTasks = masterTasks;
    return this;
  }

  public ClusterMetricsBuilder setTableRegionStatesCount(
      Map<TableName, RegionStatesCount> tableRegionStatesCount) {
@@ -300,7 +314,8 @@ public final class ClusterMetricsBuilder {
      balancerOn,
      masterInfoPort,
      serversName,
      tableRegionStatesCount,
      masterTasks
    );
  }

  private static class ClusterMetricsImpl implements ClusterMetrics {
@@ -320,6 +335,7 @@ public final class ClusterMetricsBuilder {
    private final int masterInfoPort;
    private final List<ServerName> serversName;
    private final Map<TableName, RegionStatesCount> tableRegionStatesCount;
    private final List<ServerTask> masterTasks;

    ClusterMetricsImpl(String hbaseVersion, List<ServerName> deadServerNames,
        Map<ServerName, ServerMetrics> liveServerMetrics,
@@ -331,7 +347,8 @@ public final class ClusterMetricsBuilder {
        Boolean balancerOn,
        int masterInfoPort,
        List<ServerName> serversName,
        Map<TableName, RegionStatesCount> tableRegionStatesCount,
        List<ServerTask> masterTasks) {
      this.hbaseVersion = hbaseVersion;
      this.deadServerNames = Preconditions.checkNotNull(deadServerNames);
      this.liveServerMetrics = Preconditions.checkNotNull(liveServerMetrics);
@@ -344,6 +361,7 @@ public final class ClusterMetricsBuilder {
      this.masterInfoPort = masterInfoPort;
      this.serversName = serversName;
      this.tableRegionStatesCount = Preconditions.checkNotNull(tableRegionStatesCount);
      this.masterTasks = masterTasks;
    }

    @Override
@@ -406,6 +424,11 @@ public final class ClusterMetricsBuilder {
      return Collections.unmodifiableMap(tableRegionStatesCount);
    }

    @Override
    public List<ServerTask> getMasterTasks() {
      return masterTasks;
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder(1024);

File: ServerMetrics.java

@@ -124,4 +124,11 @@ public interface ServerMetrics {
   */
  long getLastReportTimestamp();

  /**
   * Called directly from clients such as the hbase shell
   * @return the active monitored tasks
   */
  @Nullable
  List<ServerTask> getTasks();
}

File: ServerMetricsBuilder.java

@@ -87,6 +87,8 @@ public final class ServerMetricsBuilder {
      .setReplicationLoadSink(serverLoadPB.hasReplLoadSink()
        ? ProtobufUtil.toReplicationLoadSink(serverLoadPB.getReplLoadSink())
        : null)
      .setTasks(serverLoadPB.getTasksList().stream()
        .map(ProtobufUtil::getServerTask).collect(Collectors.toList()))
      .setReportTimestamp(serverLoadPB.getReportEndTime())
      .setLastReportTimestamp(serverLoadPB.getReportStartTime()).setVersionNumber(versionNumber)
      .setVersion(version).build();
@@ -105,19 +107,24 @@ public final class ServerMetricsBuilder {
      .setInfoServerPort(metrics.getInfoServerPort())
      .setMaxHeapMB((int) metrics.getMaxHeapSize().get(Size.Unit.MEGABYTE))
      .setUsedHeapMB((int) metrics.getUsedHeapSize().get(Size.Unit.MEGABYTE))
      .addAllCoprocessors(toCoprocessor(metrics.getCoprocessorNames()))
      .addAllRegionLoads(
        metrics.getRegionMetrics().values().stream().map(RegionMetricsBuilder::toRegionLoad)
          .collect(Collectors.toList()))
      .addAllUserLoads(
        metrics.getUserMetrics().values().stream().map(UserMetricsBuilder::toUserMetrics)
          .collect(Collectors.toList()))
      .addAllReplLoadSource(
        metrics.getReplicationLoadSourceList().stream()
          .map(ProtobufUtil::toReplicationLoadSource).collect(Collectors.toList()))
      .addAllTasks(
        metrics.getTasks().stream().map(ProtobufUtil::toServerTask)
          .collect(Collectors.toList()))
      .setReportStartTime(metrics.getLastReportTimestamp())
      .setReportEndTime(metrics.getReportTimestamp());
    if (metrics.getReplicationLoadSink() != null) {
      builder.setReplLoadSink(ProtobufUtil.toReplicationLoadSink(metrics.getReplicationLoadSink()));
    }
    return builder.build();
  }
@@ -143,6 +150,8 @@ public final class ServerMetricsBuilder {
  private final Set<String> coprocessorNames = new TreeSet<>();
  private long reportTimestamp = EnvironmentEdgeManager.currentTime();
  private long lastReportTimestamp = 0;
  private final List<ServerTask> tasks = new ArrayList<>();

  private ServerMetricsBuilder(ServerName serverName) {
    this.serverName = serverName;
  }
@@ -228,6 +237,11 @@ public final class ServerMetricsBuilder {
    return this;
  }

  public ServerMetricsBuilder setTasks(List<ServerTask> tasks) {
    this.tasks.addAll(tasks);
    return this;
  }

  public ServerMetrics build() {
    return new ServerMetricsImpl(
      serverName,
@@ -246,7 +260,8 @@ public final class ServerMetricsBuilder {
      coprocessorNames,
      reportTimestamp,
      lastReportTimestamp,
      userMetrics,
      tasks);
  }

  private static class ServerMetricsImpl implements ServerMetrics {
@@ -268,13 +283,15 @@ public final class ServerMetricsBuilder {
    private final long reportTimestamp;
    private final long lastReportTimestamp;
    private final Map<byte[], UserMetrics> userMetrics;
    private final List<ServerTask> tasks;

    ServerMetricsImpl(ServerName serverName, int versionNumber, String version,
        long requestCountPerSecond, long requestCount, long readRequestsCount,
        long writeRequestsCount, Size usedHeapSize, Size maxHeapSize,
        int infoServerPort, List<ReplicationLoadSource> sources, ReplicationLoadSink sink,
        Map<byte[], RegionMetrics> regionStatus, Set<String> coprocessorNames,
        long reportTimestamp, long lastReportTimestamp, Map<byte[], UserMetrics> userMetrics,
        List<ServerTask> tasks) {
      this.serverName = Preconditions.checkNotNull(serverName);
      this.versionNumber = versionNumber;
      this.version = version;
@@ -292,6 +309,7 @@ public final class ServerMetricsBuilder {
      this.coprocessorNames = Preconditions.checkNotNull(coprocessorNames);
      this.reportTimestamp = reportTimestamp;
      this.lastReportTimestamp = lastReportTimestamp;
      this.tasks = tasks;
    }

    @Override
@@ -388,6 +406,11 @@ public final class ServerMetricsBuilder {
      return lastReportTimestamp;
    }

    @Override
    public List<ServerTask> getTasks() {
      return tasks;
    }

    @Override
    public String toString() {
      int storeCount = 0;

File: ServerTask.java (new file)

@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;

import org.apache.yetus.audience.InterfaceAudience;

/** Information about active monitored server tasks */
@InterfaceAudience.Public
public interface ServerTask {

  /** Task state */
  enum State {
    RUNNING,
    WAITING,
    COMPLETE,
    ABORTED;
  }

  /**
   * Get the task's description.
   * @return the task's description, typically a name
   */
  String getDescription();

  /**
   * Get the current status of the task.
   * @return the task's current status
   */
  String getStatus();

  /**
   * Get the current state of the task.
   * @return the task's current state
   */
  State getState();

  /**
   * Get the task start time.
   * @return the time when the task started, or 0 if it has not started yet
   */
  long getStartTime();

  /**
   * Get the task completion time.
   * @return the time when the task completed, or 0 if it has not completed yet
   */
  long getCompletionTime();
}

File: ServerTaskBuilder.java (new file)

@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;

import org.apache.yetus.audience.InterfaceAudience;

/** Builder for information about active monitored server tasks */
@InterfaceAudience.Private
public final class ServerTaskBuilder {

  public static ServerTaskBuilder newBuilder() {
    return new ServerTaskBuilder();
  }

  private String description = "";
  private String status = "";
  private ServerTask.State state = ServerTask.State.RUNNING;
  private long startTime;
  private long completionTime;

  private ServerTaskBuilder() { }

  private static final class ServerTaskImpl implements ServerTask {

    private final String description;
    private final String status;
    private final ServerTask.State state;
    private final long startTime;
    private final long completionTime;

    private ServerTaskImpl(final String description, final String status,
        final ServerTask.State state, final long startTime, final long completionTime) {
      this.description = description;
      this.status = status;
      this.state = state;
      this.startTime = startTime;
      this.completionTime = completionTime;
    }

    @Override
    public String getDescription() {
      return description;
    }

    @Override
    public String getStatus() {
      return status;
    }

    @Override
    public State getState() {
      return state;
    }

    @Override
    public long getStartTime() {
      return startTime;
    }

    @Override
    public long getCompletionTime() {
      return completionTime;
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder(512);
      sb.append(getDescription());
      sb.append(": status=");
      sb.append(getStatus());
      sb.append(", state=");
      sb.append(getState());
      sb.append(", startTime=");
      sb.append(getStartTime());
      sb.append(", completionTime=");
      sb.append(getCompletionTime());
      return sb.toString();
    }
  }

  public ServerTaskBuilder setDescription(final String description) {
    this.description = description;
    return this;
  }

  public ServerTaskBuilder setStatus(final String status) {
    this.status = status;
    return this;
  }

  public ServerTaskBuilder setState(final ServerTask.State state) {
    this.state = state;
    return this;
  }

  public ServerTaskBuilder setStartTime(final long startTime) {
    this.startTime = startTime;
    return this;
  }

  public ServerTaskBuilder setCompletionTime(final long completionTime) {
    this.completionTime = completionTime;
    return this;
  }

  public ServerTask build() {
    return new ServerTaskImpl(description, status, state, startTime, completionTime);
  }
}
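A usage sketch of the builder above (the description and status values here are illustrative):

    ServerTask task = ServerTaskBuilder.newBuilder()
        .setDescription("Flushing region")        // illustrative
        .setStatus("memstore flush in progress")  // illustrative
        .setState(ServerTask.State.RUNNING)
        .setStartTime(System.currentTimeMillis())
        .build();                                 // completionTime defaults to 0 (not completed)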

File: ProtobufUtil.java

@@ -67,6 +67,8 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ServerTask;
import org.apache.hadoop.hbase.ServerTaskBuilder;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.BalanceResponse;
@@ -3905,4 +3907,24 @@ public final class ProtobufUtil {
      .build();
  }

  public static ServerTask getServerTask(ClusterStatusProtos.ServerTask task) {
    return ServerTaskBuilder.newBuilder()
      .setDescription(task.getDescription())
      .setStatus(task.getStatus())
      .setState(ServerTask.State.valueOf(task.getState().name()))
      .setStartTime(task.getStartTime())
      .setCompletionTime(task.getCompletionTime())
      .build();
  }

  public static ClusterStatusProtos.ServerTask toServerTask(ServerTask task) {
    return ClusterStatusProtos.ServerTask.newBuilder()
      .setDescription(task.getDescription())
      .setStatus(task.getStatus())
      .setState(ClusterStatusProtos.ServerTask.State.valueOf(task.getState().name()))
      .setStartTime(task.getStartTime())
      .setCompletionTime(task.getCompletionTime())
      .build();
  }
}
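A round-trip sketch through the two helpers above (values illustrative); each direction copies the five fields, mapping the state enums by name:

    ServerTask original = ServerTaskBuilder.newBuilder()
        .setDescription("TEST TASK")
        .setStatus("testing")
        .setState(ServerTask.State.RUNNING)
        .build();
    ClusterStatusProtos.ServerTask proto = ProtobufUtil.toServerTask(original);
    ServerTask decoded = ProtobufUtil.getServerTask(proto);
    assert original.getDescription().equals(decoded.getDescription());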

File: ClusterStatus.proto

@@ -229,6 +229,21 @@ message ReplicationLoadSource {
  optional uint64 oPsShipped = 12;
}

message ServerTask {
  required string description = 1;
  required string status = 2;
  required State state = 3;
  optional uint64 startTime = 4;
  optional uint64 completionTime = 5;
  enum State {
    RUNNING = 0;
    WAITING = 1;
    COMPLETE = 2;
    ABORTED = 3;
  }
}

message ServerLoad {
  /** Number of requests since last report. */
  optional uint64 number_of_requests = 1;
@@ -295,6 +310,11 @@ message ServerLoad {
   * The metrics for write requests on this region server
   */
  optional uint64 write_requests_count = 14;

  /**
   * The active monitored tasks
   */
  repeated ServerTask tasks = 15;
}

message LiveServerInfo {
@@ -328,6 +348,7 @@ message ClusterStatus {
  optional int32 master_info_port = 10 [default = -1];
  repeated ServerName servers_name = 11;
  repeated TableRegionStatesCount table_region_states_count = 12;
  repeated ServerTask master_tasks = 13;
}

enum Option {
@@ -343,4 +364,5 @@ enum Option {
  MASTER_INFO_PORT = 9;
  SERVERS_NAME = 10;
  TABLE_TO_REGIONS_COUNT = 11;
  TASKS = 12;
}

File: HMaster.java

@@ -80,6 +80,8 @@ import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ServerTask;
import org.apache.hadoop.hbase.ServerTaskBuilder;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
@@ -2737,16 +2739,39 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements MasterServices {
      options = EnumSet.allOf(Option.class);
    }

    // TASKS and/or LIVE_SERVERS will populate this map, which will be given to the builder if
    // not null after option processing completes.
    Map<ServerName, ServerMetrics> serverMetricsMap = null;

    for (Option opt : options) {
      switch (opt) {
        case HBASE_VERSION: builder.setHBaseVersion(VersionInfo.getVersion()); break;
        case CLUSTER_ID: builder.setClusterId(getClusterId()); break;
        case MASTER: builder.setMasterName(getServerName()); break;
        case BACKUP_MASTERS: builder.setBackerMasterNames(getBackupMasters()); break;
        case TASKS: {
          // Master tasks
          builder.setMasterTasks(TaskMonitor.get().getTasks().stream()
            .map(task -> ServerTaskBuilder.newBuilder()
              .setDescription(task.getDescription())
              .setStatus(task.getStatus())
              .setState(ServerTask.State.valueOf(task.getState().name()))
              .setStartTime(task.getStartTime())
              .setCompletionTime(task.getCompletionTimestamp())
              .build())
            .collect(Collectors.toList()));
          // TASKS is also synonymous with LIVE_SERVERS for now because task information for
          // regionservers is carried in ServerLoad.
          // Add entries to serverMetricsMap for all live servers, if we haven't already done so
          if (serverMetricsMap == null) {
            serverMetricsMap = getOnlineServers();
          }
          break;
        }
        case LIVE_SERVERS: {
          // Add entries to serverMetricsMap for all live servers, if we haven't already done so
          if (serverMetricsMap == null) {
            serverMetricsMap = getOnlineServers();
          }
          break;
        }
@@ -2808,9 +2833,24 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements MasterServices {
        }
      }
    }

    if (serverMetricsMap != null) {
      builder.setLiveServerMetrics(serverMetricsMap);
    }

    return builder.build();
  }

  private Map<ServerName, ServerMetrics> getOnlineServers() {
    if (serverManager != null) {
      final Map<ServerName, ServerMetrics> map = new HashMap<>();
      serverManager.getOnlineServers().entrySet()
        .forEach(e -> map.put(e.getKey(), e.getValue()));
      return map;
    }
    return null;
  }

  /**
   * @return cluster status
   */
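As the comments above note, requesting TASKS also populates the live-server map, so a client asking only for tasks still receives per-regionserver metrics whose ServerLoad carries the task lists; a hedged sketch of the client-visible effect:

    ClusterMetrics cm = admin.getClusterMetrics(EnumSet.of(Option.TASKS));
    // Master tasks arrive via getMasterTasks(); regionserver tasks ride along
    // in the live server metrics that the TASKS option also fills in.
    cm.getLiveServerMetrics().values()
        .forEach(sm -> System.out.println(sm.getServerName() + " -> " + sm.getTasks()));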

File: HRegionServer.java

@@ -105,6 +105,7 @@ import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
import org.apache.hadoop.hbase.net.Address;
@@ -1195,6 +1196,15 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
      }
    }

    TaskMonitor.get().getTasks().forEach(task ->
      serverLoad.addTasks(ClusterStatusProtos.ServerTask.newBuilder()
        .setDescription(task.getDescription())
        .setStatus(task.getStatus() != null ? task.getStatus() : "")
        .setState(ClusterStatusProtos.ServerTask.State.valueOf(task.getState().name()))
        .setStartTime(task.getStartTime())
        .setCompletionTime(task.getCompletionTimestamp())
        .build()));

    return serverLoad.build();
  }

File: TestClientClusterMetrics.java

@@ -49,12 +49,14 @@ import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.filter.FilterAllFilter;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MetricsUserAggregateFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.AfterClass;
@@ -80,6 +82,18 @@ public class TestClientClusterMetrics {
  private static final TableName TABLE_NAME = TableName.valueOf("test");
  private static final byte[] CF = Bytes.toBytes("cf");

  // We need to promote the visibility of tryRegionServerReport for this test
  public static class MyRegionServer
      extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer {
    public MyRegionServer(Configuration conf) throws IOException, InterruptedException {
      super(conf);
    }
    @Override
    public void tryRegionServerReport(long reportStartTime, long reportEndTime)
        throws IOException {
      super.tryRegionServerReport(reportStartTime, reportEndTime);
    }
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
@@ -87,6 +101,7 @@ public class TestClientClusterMetrics {
    conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, MyObserver.class.getName());
    UTIL = new HBaseTestingUtil(conf);
    StartTestingClusterOption option = StartTestingClusterOption.builder()
      .rsClass(TestClientClusterMetrics.MyRegionServer.class)
      .numMasters(MASTERS).numRegionServers(SLAVES).numDataNodes(SLAVES).build();
    UTIL.startMiniCluster(option);
    CLUSTER = UTIL.getHBaseCluster();
@@ -305,7 +320,8 @@ public class TestClientClusterMetrics {
    Assert.assertEquals(MASTERS - 1, metrics.getBackupMasterNames().size());
  }

  @Test
  public void testUserMetrics() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    // If metrics for users is not enabled, this test doesn't make sense.
    if (!conf.getBoolean(MetricsUserAggregateFactory.METRIC_USER_ENABLED_CONF,
@@ -393,6 +409,48 @@ public class TestClientClusterMetrics {
    UTIL.deleteTable(TABLE_NAME);
  }

  @Test
  public void testServerTasks() throws Exception {
    // TaskMonitor is a singleton per VM, so will be shared among all minicluster "servers",
    // so we only need to look at the first live server's results to find it.
    final String testTaskName = "TEST TASK";
    TaskMonitor.get().createStatus(testTaskName).setStatus("Testing 1... 2... 3...");
    // Of course, first we must trigger regionserver reports.
    final long now = EnvironmentEdgeManager.currentTime();
    final long last = now - 1000; // fake a period, or someone might div by zero
    for (RegionServerThread rs: CLUSTER.getRegionServerThreads()) {
      ((MyRegionServer)rs.getRegionServer()).tryRegionServerReport(last, now);
    }
    // Get status now
    ClusterMetrics clusterMetrics = ADMIN.getClusterMetrics(EnumSet.of(Option.TASKS));
    // The test task will be in the master metrics list
    boolean found = false;
    for (ServerTask task: clusterMetrics.getMasterTasks()) {
      if (testTaskName.equals(task.getDescription())) {
        // Found it
        found = true;
        break;
      }
    }
    Assert.assertTrue("Expected task not found in master task list", found);
    // Get the tasks information (carried in server metrics)
    found = false;
    for (ServerMetrics serverMetrics: clusterMetrics.getLiveServerMetrics().values()) {
      if (serverMetrics.getTasks() != null) {
        for (ServerTask task: serverMetrics.getTasks()) {
          if (testTaskName.equals(task.getDescription())) {
            // Found it
            found = true;
            break;
          }
        }
      }
    }
    // We will fall through here if getClusterMetrics(TASKS) did not correctly process the
    // task list.
    Assert.assertTrue("Expected task not found in server load", found);
  }

  private RegionMetrics getMetaMetrics() throws IOException {
    for (ServerMetrics serverMetrics : ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
        .getLiveServerMetrics().values()) {

File: TestRegionsRecoveryChore.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ServerTask;
import org.apache.hadoop.hbase.Size;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
@@ -293,6 +294,11 @@ public class TestRegionsRecoveryChore {
        return null;
      }

      @Override
      public List<ServerTask> getMasterTasks() {
        return null;
      }
    };
    return clusterMetrics;
  }
@@ -387,6 +393,11 @@ public class TestRegionsRecoveryChore {
        return 0;
      }

      @Override
      public List<ServerTask> getTasks() {
        return null;
      }
    };
    return serverMetrics;
  }

File: admin.rb (hbase shell)

@@ -923,14 +923,17 @@ module Hbase
        for v in cluster_metrics.getRegionStatesInTransition
          puts(format(' %s', v))
        end
        master = cluster_metrics.getMaster
        unless master.nil?
          puts(format('active master: %s:%d %d', master.getHostname, master.getPort, master.getStartcode))
          for task in cluster_metrics.getMasterTasks
            puts(format(' %s', task.toString))
          end
        end
        puts(format('%d backup masters', cluster_metrics.getBackupMastersSize))
        for server in cluster_metrics.getBackupMasters
          puts(format(' %s:%d %d', server.getHostname, server.getPort, server.getStartcode))
        end
        master_coprocs = @admin.getMasterCoprocessorNames.toString
        unless master_coprocs.nil?
          puts(format('master coprocessors: %s', master_coprocs))
@@ -943,6 +946,9 @@ module Hbase
            puts(format(' %s', region.getNameAsString.dump))
            puts(format(' %s', region.toString))
          end
          for task in cluster_metrics.getLoad(server).getTasks
            puts(format(' %s', task.toString))
          end
        end
        puts(format('%d dead servers', cluster_metrics.getDeadServerNames.size))
        for server in cluster_metrics.getDeadServerNames
@@ -982,6 +988,33 @@ module Hbase
            puts(format('%<sink>s', sink: r_sink_string))
          end
        end
      elsif format == 'tasks'
        master = cluster_metrics.getMaster
        unless master.nil?
          puts(format('active master: %s:%d %d', master.getHostname, master.getPort, master.getStartcode))
          printed = false
          for task in cluster_metrics.getMasterTasks
            next unless task.getState.name == 'RUNNING'
            puts(format(' %s', task.toString))
            printed = true
          end
          if !printed
            puts(' no active tasks')
          end
        end
        puts(format('%d live servers', cluster_metrics.getServersSize))
        for server in cluster_metrics.getServers
          puts(format(' %s:%d %d', server.getHostname, server.getPort, server.getStartcode))
          printed = false
          for task in cluster_metrics.getLoad(server).getTasks
            next unless task.getState.name == 'RUNNING'
            puts(format(' %s', task.toString))
            printed = true
          end
          if !printed
            puts(' no active tasks')
          end
        end
      elsif format == 'simple'
        load = 0
        regions = 0
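Illustrative output shape for the new 'tasks' formatter (hostnames, timestamps, and the sample task are invented; the task line is ServerTask#toString as defined earlier):

    hbase> status 'tasks'
    active master: master.example.com:16000 1644974298123
     TEST TASK: status=Testing 1... 2... 3..., state=RUNNING, startTime=1644974301000, completionTime=0
    1 live servers
     rs1.example.com:16020 1644974300456
      no active tasks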

File: status.rb (hbase shell command)

@@ -22,13 +22,14 @@ module Shell
    class Status < Command
      def help
        <<-EOF
Show cluster status. Can be 'summary', 'simple', 'detailed', 'tasks', or 'replication'. The
default is 'summary'. Examples:

  hbase> status
  hbase> status 'summary'
  hbase> status 'simple'
  hbase> status 'detailed'
  hbase> status 'tasks'
  hbase> status 'replication'
  hbase> status 'replication', 'source'
  hbase> status 'replication', 'sink'